Coverage for webapp / handlers.py: 92%
157 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-05 22:09 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-05 22:09 +0000
import base64
import copy
import hashlib
import re
import secrets
import socket
from datetime import datetime
from urllib.parse import quote, unquote, urlparse, urlunparse

import flask
import sentry_sdk
from canonicalwebteam import image_template
from canonicalwebteam.exceptions import (
    StoreApiError,
    StoreApiConnectionError,
    StoreApiResourceNotFound,
    StoreApiResponseDecodeError,
    StoreApiResponseError,
    StoreApiResponseErrorList,
    StoreApiTimeoutError,
    PublisherAgreementNotSigned,
    PublisherMacaroonRefreshRequired,
    PublisherMissingUsername,
)
from flask import render_template, request

import webapp.helpers as helpers
import webapp.template_utils as template_utils
from webapp import authentication
from webapp.api.exceptions import (
    ApiError,
    ApiConnectionError,
    ApiResponseErrorList,
    ApiTimeoutError,
    ApiResponseDecodeError,
    ApiResponseError,
)
from webapp.config import (
    BSI_URL,
    LOGIN_URL,
    SENTRY_DSN,
    COMMIT_ID,
    ENVIRONMENT,
    WEBAPP_CONFIG,
    DNS_VERIFICATION_SALT,
    IS_DEVELOPMENT,
    VITE_PORT,
)
# Base Content-Security-Policy, keyed by directive name. Each value is the
# list of sources allowed for that directive. Treat this as a template:
# add_script_hashes_and_nonce_to_csp() deep-copies it for every response
# before adding per-request values.
CSP = {
    "default-src": ["'self'"],
    # The "*" entry is needed to allow images from
    # https://www.google.*/ads/ga-audiences to load.
    "img-src": ["data: blob:", "*"],
    "script-src-elem": [
        "'self'",
        "assets.ubuntu.com",
        "www.googletagmanager.com",
        "www.youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "plausible.io",
        "script.crazyegg.com",
        "w.usabilla.com",
        "connect.facebook.net",
        "snap.licdn.com",
    ],
    "font-src": ["'self'", "assets.ubuntu.com"],
    # Intentionally empty here: the value is computed per response from
    # CSP_SCRIPT_SRC, the inline-handler hashes and the request nonce.
    "script-src": [],
    "connect-src": [
        "'self'",
        "ubuntu.com",
        "*.analytics.google.com",
        "stats.g.doubleclick.net",
        "www.googletagmanager.com",
        "sentry.is.canonical.com",
        "www.google-analytics.com",
        "plausible.io",
        "*.crazyegg.com",
        "www.facebook.com",
        "px.ads.linkedin.com",
        "*.snapcraft.io",
        "*.snapcraftcontent.com",
        "www.google.com",
    ],
    "frame-src": [
        "'self'",
        "td.doubleclick.net",
        "www.youtube.com",
        "youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "snapcraft.io",
        "www.facebook.com",
        "snap:",
    ],
    "style-src": ["'self'", "'unsafe-inline'"],
    "media-src": ["'self'", "res.cloudinary.com"],
}

# Static part of the "script-src" directive; the per-response hashes and
# nonce are appended to a copy of this list.
CSP_SCRIPT_SRC = ["'self'", "blob:", "'unsafe-eval'", "'unsafe-hashes'"]
# Vite integration: in development, additionally allow the server running
# on localhost:VITE_PORT as a source for scripts, styles and connections
# (both plain and ws://).
if IS_DEVELOPMENT:
    CSP["script-src-elem"].extend([f"localhost:{VITE_PORT}"])
    CSP["connect-src"].extend(
        [f"localhost:{VITE_PORT}", f"ws://localhost:{VITE_PORT}"]
    )
    CSP["style-src"].extend([f"localhost:{VITE_PORT}"])
    CSP_SCRIPT_SRC.extend([f"localhost:{VITE_PORT}"])
def refresh_redirect(path):
    """
    Refresh the discharge macaroon stored in the session, then redirect.

    The current ``macaroon_discharge`` session value is passed to
    ``authentication.get_refreshed_discharge``; on success the result is
    written back to the session and the client is redirected to ``path``.

    :param path: the location to redirect to once the refresh succeeded

    :returns: a redirect to ``path``, or a redirect to ``login.logout``
        when the refresh is rejected with a 401

    Any other API failure aborts the request with a 502.
    """
    try:
        refreshed_discharge = authentication.get_refreshed_discharge(
            flask.session["macaroon_discharge"]
        )
    except ApiResponseError as response_error:
        # Anything but "unauthorized" is reported as a bad gateway
        if response_error.status_code != 401:
            return flask.abort(502, str(response_error))
        return flask.redirect(flask.url_for("login.logout"))
    except ApiError as generic_error:
        return flask.abort(502, str(generic_error))

    flask.session["macaroon_discharge"] = refreshed_discharge
    return flask.redirect(path)
def snapcraft_utility_processor():
    """
    Build the dictionary of values and helper functions that is exposed
    to templates.

    Publisher details (full name, ``is_canonical`` flag and stores) are
    read from the session when it is authenticated; otherwise they fall
    back to ``None``, ``False`` and an empty list.

    :returns: a dict mapping template names to values or callables
    """
    session = flask.session

    if not authentication.is_authenticated(session):
        user_name, user_is_canonical, stores = None, False, []
    else:
        publisher = session["publisher"]
        user_name = publisher["fullname"]
        user_is_canonical = publisher.get("is_canonical", False)
        # NOTE(review): this is None (not []) when the publisher has no
        # "stores" key - confirm templates cope with that
        stores = publisher.get("stores")

    current_path = flask.request.path

    return {
        # Variables
        "LOGIN_URL": LOGIN_URL,
        "SENTRY_DSN": SENTRY_DSN,
        "COMMIT_ID": COMMIT_ID,
        "ENVIRONMENT": ENVIRONMENT,
        "host_url": flask.request.host_url,
        "path": current_path,
        "page_slug": template_utils.generate_slug(current_path),
        "user_name": user_name,
        "VERIFIED_PUBLISHER": "verified",
        "STAR_DEVELOPER": "starred",
        "webapp_config": WEBAPP_CONFIG,
        "BSI_URL": BSI_URL,
        "now": datetime.now(),
        "user_is_canonical": user_is_canonical,
        # Empty string when no nonce was attached to the request
        "CSP_NONCE": getattr(flask.request, "CSP_NONCE", ""),
        # Functions
        "contains": template_utils.contains,
        "join": template_utils.join,
        "static_url": template_utils.static_url,
        "IS_DEVELOPMENT": IS_DEVELOPMENT,
        "format_number": template_utils.format_number,
        "format_display_name": template_utils.format_display_name,
        "display_name": template_utils.display_name,
        "install_snippet": template_utils.install_snippet,
        "format_date": template_utils.format_date,
        "format_member_role": template_utils.format_member_role,
        "image": image_template,
        "stores": stores,
        "format_link": template_utils.format_link,
        "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
    }
def set_handlers(app):
    """
    Register the template context processor, the error handlers and the
    before/after request hooks on the given application.

    :param app: the Flask application; its ``context_processor``,
        ``errorhandler``, ``before_request`` and ``after_request``
        decorators and its ``testing`` attribute are used
    """

    @app.context_processor
    def utility_processor():
        """
        This defines the set of properties and functions that will be added
        to the default context for processing templates. All these items
        can be used in all templates
        """

        return snapcraft_utility_processor()

    # Error handlers
    # ===
    @app.errorhandler(500)
    @app.errorhandler(501)
    @app.errorhandler(502)
    @app.errorhandler(504)
    @app.errorhandler(505)
    def internal_error(error):
        """
        Render the generic 50X page, keeping the status code of the error
        (500 when the error carries none).
        """
        error_name = getattr(error, "name", type(error).__name__)
        return_code = getattr(error, "code", 500)

        # Do not report to Sentry while running the test suite
        if not app.testing:
            sentry_sdk.capture_exception()

        return (
            flask.render_template("50X.html", error_name=error_name),
            return_code,
        )

    @app.errorhandler(503)
    def service_unavailable(error):
        """
        Render the dedicated 503 page.
        """
        return render_template("503.html"), 503

    @app.errorhandler(404)
    @app.errorhandler(StoreApiResourceNotFound)
    def handle_resource_not_found(error):
        """
        Render the 404 page with the error text as message.
        """
        return render_template("404.html", message=str(error)), 404

    @app.errorhandler(ApiTimeoutError)
    @app.errorhandler(StoreApiTimeoutError)
    def handle_connection_timeout(error):
        """
        Report API timeouts as 504 using the 50X page.
        """
        status_code = 504
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseDecodeError)
    @app.errorhandler(ApiResponseError)
    @app.errorhandler(ApiConnectionError)
    @app.errorhandler(StoreApiResponseDecodeError)
    @app.errorhandler(StoreApiResponseError)
    @app.errorhandler(StoreApiConnectionError)
    @app.errorhandler(ApiError)
    @app.errorhandler(StoreApiError)
    def store_api_error(error):
        """
        Report the remaining API errors as 502 using the 50X page.
        """
        status_code = 502
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseErrorList)
    @app.errorhandler(StoreApiResponseErrorList)
    def handle_api_error_list(error):
        """
        Handle API responses that carry a list of errors.

        - a 404 status renders the 404 page
        - a single macaroon permission/authorization error empties the
          session and redirects to the login page
        - anything else is rendered as a 502 listing every error
        """
        if error.status_code == 404:
            # Render the page here instead of calling flask.abort(): an
            # exception raised from inside an error handler is always
            # turned into a 500 by Flask, never dispatched to the 404
            # handler.
            # NOTE(review): this tests for the literal text "snap_name" in
            # the URL path - confirm that is the intended condition
            if "snap_name" in request.path:
                message = "Snap not found!"
            else:
                message = "Entity not found"
            return render_template("404.html", message=message), 404

        if len(error.errors) == 1 and error.errors[0]["code"] in (
            "macaroon-permission-required",
            "macaroon-authorization-required",
        ):
            authentication.empty_session(flask.session)
            # Percent-encode the path so characters such as "&", "?" or
            # "#" cannot alter the query string of the login URL
            next_path = quote(flask.request.path)
            return flask.redirect(f"/login?next={next_path}")

        status_code = 502
        codes = [
            f"{item['code']}: {item.get('message', 'No message')}"
            for item in error.errors
        ]

        error_msg = ", ".join(codes)
        return (
            render_template(
                "50X.html", error_message=error_msg, status_code=status_code
            ),
            status_code,
        )

    # Publisher error
    @app.errorhandler(PublisherMissingUsername)
    def handle_publisher_missing_name(error):
        """
        Redirect to the ``account.get_account_name`` view.
        """
        return flask.redirect(flask.url_for("account.get_account_name"))

    @app.errorhandler(PublisherAgreementNotSigned)
    def handle_publisher_agreement_not_signed(error):
        """
        Redirect to the ``account.get_agreement`` view.
        """
        return flask.redirect(flask.url_for("account.get_agreement"))

    @app.errorhandler(PublisherMacaroonRefreshRequired)
    def handle_publisher_macaroon_refresh_required(error):
        """
        Refresh the macaroon and redirect back to the current path.
        """
        return refresh_redirect(flask.request.path)

    # Global tasks for all requests
    # ===
    @app.before_request
    def generate_nonce():
        """
        Generate a cryptographically secure random nonce for CSP
        """
        request.CSP_NONCE = secrets.token_urlsafe(16)

    @app.before_request
    def clear_trailing():
        """
        Remove trailing slashes from all routes
        We like our URLs without slashes
        """

        parsed_url = urlparse(unquote(flask.request.url))
        path = parsed_url.path

        # The root path is the only one allowed to end with a slash
        if path != "/" and path.endswith("/"):
            new_uri = urlunparse(parsed_url._replace(path=path[:-1]))

            return flask.redirect(new_uri)

    # Calculate the SHA256 hash of the script content and encode it in base64.
    def calculate_sha256_base64(script_content):
        sha256_hash = hashlib.sha256(script_content.encode()).digest()
        return "sha256-" + base64.b64encode(sha256_hash).decode()

    # Hash every capture matched by `regex` in `content` and return the
    # quoted, de-duplicated hashes. They are sorted so that the resulting
    # header is identical for identical content.
    def get_csp_directive(content, regex):
        pattern = re.compile(regex)
        directive_items = {
            f"'{calculate_sha256_base64(matched_content)}'"
            for matched_content in pattern.findall(content)
        }
        return sorted(directive_items)

    # Find all inline onclick handlers in the response and add their hashes
    # to the CSP. Also add nonce for use in inline script elements
    def add_script_hashes_and_nonce_to_csp(response):
        response.freeze()
        decoded_content = b"".join(response.response).decode(
            "utf-8", errors="replace"
        )

        # Create a fresh copy of CSP for this request to
        # prevent multiple nonces in the CSP headers
        request_csp = copy.deepcopy(CSP)

        # Handle cases where CSP_NONCE might not be set
        csp_nonce = getattr(request, "CSP_NONCE", "")
        csp_nonce_value = f"'nonce-{csp_nonce}'" if csp_nonce else ""

        # Include CSP_NONCE in script-src along with other directives
        script_src_values = CSP_SCRIPT_SRC + get_csp_directive(
            decoded_content, r'onclick\s*=\s*"(.*?)"'
        )

        if csp_nonce_value:
            request_csp["script-src-elem"].append(csp_nonce_value)
            script_src_values.append(csp_nonce_value)

        request_csp["script-src"] = script_src_values
        return request_csp

    @app.after_request
    def add_headers(response):
        """
        Generic rules for headers to add to all requests

        - X-Hostname: Mention the name of the host/pod running the application
        - Cache-Control: Add cache-control headers for public and private pages
        - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
        Images) and URLs
        - Referrer-Policy: Limit referrer data for security while preserving
        full referrer for same-origin requests
        - Cross-Origin-Embedder-Policy: allows embedding cross-origin
        resources
        - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
        maintaining same-origin policy
        - Cross-Origin-Resource-Policy: allowing cross-origin requests to
        access the resource
        - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
        resources
        """

        response.headers["X-Hostname"] = socket.gethostname()

        # Only add caching headers to successful responses
        if response.status_code == 200:
            if flask.session:
                response.headers["Cache-Control"] = "private"
            elif not response.headers.get("Cache-Control"):
                # A tuple (not a set) keeps the directive order stable
                response.headers["Cache-Control"] = ", ".join(
                    (
                        "public",
                        "max-age=61",
                        "stale-while-revalidate=300",
                        "stale-if-error=86400",
                    )
                )

        csp = add_script_hashes_and_nonce_to_csp(response)
        response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
            csp
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
        response.headers["Cross-Origin-Opener-Policy"] = (
            "same-origin-allow-popups"
        )
        response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
        response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
        return response