Coverage for webapp/handlers.py: 92%
145 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 22:43 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 22:43 +0000
1import socket
2from urllib.parse import unquote, urlparse, urlunparse
4import base64
5import hashlib
6import re
7import sentry_sdk
9import flask
10from flask import render_template, request
11import webapp.template_utils as template_utils
12from canonicalwebteam import image_template
13from webapp import authentication
14import webapp.helpers as helpers
15from webapp.config import (
16 BSI_URL,
17 LOGIN_URL,
18 SENTRY_DSN,
19 COMMIT_ID,
20 ENVIRONMENT,
21 WEBAPP_CONFIG,
22 DNS_VERIFICATION_SALT,
23 IS_DEVELOPMENT,
24 VITE_PORT,
25 ANALYTICS_ENDPOINT,
26 DEFAULT_ICON_URL,
27)
29from canonicalwebteam.exceptions import (
30 StoreApiError,
31 StoreApiConnectionError,
32 StoreApiResourceNotFound,
33 StoreApiResponseDecodeError,
34 StoreApiResponseError,
35 StoreApiResponseErrorList,
36 StoreApiTimeoutError,
37 PublisherAgreementNotSigned,
38 PublisherMacaroonRefreshRequired,
39 PublisherMissingUsername,
40)
42from webapp.api.exceptions import (
43 ApiError,
44 ApiConnectionError,
45 ApiResponseErrorList,
46 ApiTimeoutError,
47 ApiResponseDecodeError,
48 ApiResponseError,
49)
51from datetime import datetime
53CSP = {
54 "default-src": ["'self'"],
55 "img-src": [
56 "data: blob:",
57 # This is needed to allow images from
58 # https://www.google.*/ads/ga-audiences to load.
59 "*",
60 ],
61 "script-src-elem": [
62 "'self'",
63 "data:",
64 "assets.ubuntu.com",
65 "www.googletagmanager.com",
66 "www.youtube.com",
67 "asciinema.org",
68 "player.vimeo.com",
69 "plausible.io",
70 "script.crazyegg.com",
71 "w.usabilla.com",
72 "connect.facebook.net",
73 "snap.licdn.com",
74 "challenges.cloudflare.com",
75 # This is necessary for Google Tag Manager to function properly.
76 "'unsafe-inline'",
77 ],
78 "font-src": [
79 "'self'",
80 "assets.ubuntu.com",
81 ],
82 "script-src": [],
83 "connect-src": [
84 "'self'",
85 "ubuntu.com",
86 "analytics.google.com",
87 "*.analytics.google.com",
88 "stats.g.doubleclick.net",
89 "www.googletagmanager.com",
90 "sentry.is.canonical.com",
91 "www.google-analytics.com",
92 "plausible.io",
93 "*.crazyegg.com",
94 "www.facebook.com",
95 "px.ads.linkedin.com",
96 "*.snapcraft.io",
97 "*.snapcraftcontent.com",
98 "marketplace-analytics.staging.canonical.com",
99 "marketplace-analytics.canonical.com",
100 "challenges.cloudflare.com",
101 "www.google.com",
102 ],
103 "frame-src": [
104 "'self'",
105 "td.doubleclick.net",
106 "www.youtube.com",
107 "youtube.com",
108 "asciinema.org",
109 "player.vimeo.com",
110 "snapcraft.io",
111 "www.facebook.com",
112 "challenges.cloudflare.com",
113 "snap:",
114 ],
115 "style-src": [
116 "'self'",
117 "'unsafe-inline'",
118 ],
119 "media-src": [
120 "'self'",
121 "res.cloudinary.com",
122 ],
123}
125CSP_SCRIPT_SRC = [
126 "'self'",
127 "data:",
128 "blob:",
129 "'unsafe-eval'",
130 "'unsafe-hashes'",
131]
133# Vite integration
134if IS_DEVELOPMENT:
135 CSP["script-src-elem"].append(f"localhost:{VITE_PORT}")
136 CSP["connect-src"].append(f"localhost:{VITE_PORT}")
137 CSP["connect-src"].append(f"ws://localhost:{VITE_PORT}")
138 CSP["style-src"].append(f"localhost:{VITE_PORT}")
139 CSP_SCRIPT_SRC.append(f"localhost:{VITE_PORT}")
142def refresh_redirect():
143 try:
144 macaroon_discharge = authentication.get_refreshed_discharge(
145 flask.session["macaroon_discharge"]
146 )
147 except ApiResponseError as api_response_error:
148 if api_response_error.status_code == 401:
149 return flask.redirect(flask.url_for("login.logout"))
150 else:
151 return flask.abort(502, str(api_response_error))
152 except ApiError as api_error:
153 return flask.abort(502, str(api_error))
155 flask.session["macaroon_discharge"] = macaroon_discharge
156 return flask.redirect(
157 flask.url_for(
158 flask.request.endpoint,
159 **flask.request.view_args,
160 **flask.request.args,
161 )
162 )
165def snapcraft_utility_processor():
166 if authentication.is_authenticated(flask.session):
167 user_name = flask.session["publisher"]["fullname"]
168 user_is_canonical = flask.session["publisher"].get(
169 "is_canonical", False
170 )
171 stores = flask.session["publisher"].get("stores")
172 else:
173 user_name = None
174 user_is_canonical = False
175 stores = []
177 page_slug = template_utils.generate_slug(flask.request.path)
179 return {
180 # Variables
181 "LOGIN_URL": LOGIN_URL,
182 "SENTRY_DSN": SENTRY_DSN,
183 "COMMIT_ID": COMMIT_ID,
184 "ENVIRONMENT": ENVIRONMENT,
185 "host_url": flask.request.host_url,
186 "path": flask.request.path,
187 "page_slug": page_slug,
188 "user_name": user_name,
189 "VERIFIED_PUBLISHER": "verified",
190 "STAR_DEVELOPER": "starred",
191 "webapp_config": WEBAPP_CONFIG,
192 "BSI_URL": BSI_URL,
193 "now": datetime.now(),
194 "user_is_canonical": user_is_canonical,
195 # Functions
196 "contains": template_utils.contains,
197 "join": template_utils.join,
198 "static_url": template_utils.static_url,
199 "IS_DEVELOPMENT": IS_DEVELOPMENT,
200 "format_number": template_utils.format_number,
201 "format_display_name": template_utils.format_display_name,
202 "display_name": template_utils.display_name,
203 "install_snippet": template_utils.install_snippet,
204 "format_date": template_utils.format_date,
205 "format_member_role": template_utils.format_member_role,
206 "image": image_template,
207 "stores": stores,
208 "format_link": template_utils.format_link,
209 "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
210 "ANALYTICS_ENDPOINT": ANALYTICS_ENDPOINT,
211 "DEFAULT_ICON_URL": DEFAULT_ICON_URL,
212 }
215def set_handlers(app):
216 @app.context_processor
217 def utility_processor():
218 """
219 This defines the set of properties and functions that will be added
220 to the default context for processing templates. All these items
221 can be used in all templates
222 """
224 return snapcraft_utility_processor()
226 # Error handlers
227 # ===
228 @app.errorhandler(500)
229 @app.errorhandler(501)
230 @app.errorhandler(502)
231 @app.errorhandler(504)
232 @app.errorhandler(505)
233 def internal_error(error):
234 error_name = getattr(error, "name", type(error).__name__)
235 return_code = getattr(error, "code", 500)
237 if not app.testing:
238 sentry_sdk.capture_exception()
240 return (
241 flask.render_template("50X.html", error_name=error_name),
242 return_code,
243 )
245 @app.errorhandler(503)
246 def service_unavailable(error):
247 return render_template("503.html"), 503
249 @app.errorhandler(404)
250 @app.errorhandler(StoreApiResourceNotFound)
251 def handle_resource_not_found(error):
252 return render_template("404.html", message=str(error)), 404
254 @app.errorhandler(ApiTimeoutError)
255 @app.errorhandler(StoreApiTimeoutError)
256 def handle_connection_timeout(error):
257 status_code = 504
258 return (
259 render_template(
260 "50X.html", error_message=str(error), status_code=status_code
261 ),
262 status_code,
263 )
265 @app.errorhandler(ApiResponseDecodeError)
266 @app.errorhandler(ApiResponseError)
267 @app.errorhandler(ApiConnectionError)
268 @app.errorhandler(StoreApiResponseDecodeError)
269 @app.errorhandler(StoreApiResponseError)
270 @app.errorhandler(StoreApiConnectionError)
271 @app.errorhandler(ApiError)
272 @app.errorhandler(StoreApiError)
273 def store_api_error(error):
274 status_code = 502
275 return (
276 render_template(
277 "50X.html", error_message=str(error), status_code=status_code
278 ),
279 status_code,
280 )
282 @app.errorhandler(ApiResponseErrorList)
283 @app.errorhandler(StoreApiResponseErrorList)
284 def handle_api_error_list(error):
285 if error.status_code == 404:
286 if "snap_name" in request.path:
287 return flask.abort(404, "Snap not found!")
288 else:
289 return (
290 render_template("404.html", message="Entity not found"),
291 404,
292 )
293 if len(error.errors) == 1 and error.errors[0]["code"] in [
294 "macaroon-permission-required",
295 "macaroon-authorization-required",
296 ]:
297 authentication.reset_auth_session(flask.session)
298 return flask.redirect(
299 flask.url_for("login.login_handler", next=flask.request.path)
300 )
302 status_code = 502
303 codes = [
304 f"{error['code']}: {error.get('message', 'No message')}"
305 for error in error.errors
306 ]
308 error_msg = ", ".join(codes)
309 return (
310 render_template(
311 "50X.html", error_message=error_msg, status_code=status_code
312 ),
313 status_code,
314 )
316 # Publisher error
317 @app.errorhandler(PublisherMissingUsername)
318 def handle_publisher_missing_name(error):
319 return flask.redirect(flask.url_for("account.get_account_name"))
321 @app.errorhandler(PublisherAgreementNotSigned)
322 def handle_publisher_agreement_not_signed(error):
323 return flask.redirect(flask.url_for("account.get_agreement"))
325 @app.errorhandler(PublisherMacaroonRefreshRequired)
326 def handle_publisher_macaroon_refresh_required(error):
327 return refresh_redirect()
329 # Global tasks for all requests
330 # ===
331 @app.before_request
332 def clear_trailing():
333 """
334 Remove trailing slashes from all routes
335 We like our URLs without slashes
336 """
338 parsed_url = urlparse(unquote(flask.request.url))
339 path = parsed_url.path
341 if path != "/" and path.endswith("/"):
342 new_uri = urlunparse(parsed_url._replace(path=path[:-1]))
344 return flask.redirect(new_uri)
346 # Calculate the SHA256 hash of the script content and encode it in base64.
347 def calculate_sha256_base64(script_content):
348 sha256_hash = hashlib.sha256(script_content.encode()).digest()
349 return "sha256-" + base64.b64encode(sha256_hash).decode()
351 def get_csp_directive(content, regex):
352 directive_items = set()
353 pattern = re.compile(regex)
354 matched_contents = pattern.findall(content)
355 for matched_content in matched_contents:
356 hash_value = f"'{calculate_sha256_base64(matched_content)}'"
357 directive_items.add(hash_value)
358 return list(directive_items)
360 # Find all script elements in the response and add their hashes to the CSP.
361 def add_script_hashes_to_csp(response):
362 response.freeze()
363 decoded_content = b"".join(response.response).decode(
364 "utf-8", errors="replace"
365 )
367 CSP["script-src"] = CSP_SCRIPT_SRC + get_csp_directive(
368 decoded_content, r'onclick\s*=\s*"(.*?)"'
369 )
370 return CSP
372 @app.after_request
373 def add_headers(response):
374 """
375 Generic rules for headers to add to all requests
377 - X-Hostname: Mention the name of the host/pod running the application
378 - Cache-Control: Add cache-control headers for public and private pages
379 - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
380 Images) and URLs
381 - Referrer-Policy: Limit referrer data for security while preserving
382 full referrer for same-origin requests
383 - Cross-Origin-Embedder-Policy: allows embedding cross-origin
384 resources
385 - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
386 maintaining same-origin policy
387 - Cross-Origin-Resource-Policy: allowing cross-origin requests to
388 access the resource
389 - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
390 resources
391 """
393 response.headers["X-Hostname"] = socket.gethostname()
395 if response.status_code == 200:
396 if flask.session:
397 response.headers["Cache-Control"] = "private"
398 else:
399 # Only add caching headers to successful responses
400 if not response.headers.get("Cache-Control"):
401 response.headers["Cache-Control"] = ", ".join(
402 {
403 "public",
404 "max-age=61",
405 "stale-while-revalidate=300",
406 "stale-if-error=86400",
407 }
408 )
409 csp = add_script_hashes_to_csp(response)
410 response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
411 csp
412 )
413 response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
414 response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
415 response.headers["Cross-Origin-Opener-Policy"] = (
416 "same-origin-allow-popups"
417 )
418 response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
419 response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
420 return response