Coverage for webapp/handlers.py: 92%
159 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-17 22:07 +0000
1import socket
2from urllib.parse import unquote, urlparse, urlunparse
4import base64
5import hashlib
6import re
8import flask
9from flask import render_template, request
10import prometheus_client
11import user_agents
12import webapp.template_utils as template_utils
13from canonicalwebteam import image_template
14from webapp import authentication
15import webapp.helpers as helpers
16from webapp.config import (
17 BSI_URL,
18 LOGIN_URL,
19 SENTRY_DSN,
20 COMMIT_ID,
21 ENVIRONMENT,
22 WEBAPP_CONFIG,
23 DNS_VERIFICATION_SALT,
24 IS_DEVELOPMENT,
25 VITE_PORT,
26)
28from canonicalwebteam.exceptions import (
29 StoreApiError,
30 StoreApiConnectionError,
31 StoreApiResourceNotFound,
32 StoreApiResponseDecodeError,
33 StoreApiResponseError,
34 StoreApiResponseErrorList,
35 StoreApiTimeoutError,
36 PublisherAgreementNotSigned,
37 PublisherMacaroonRefreshRequired,
38 PublisherMissingUsername,
39)
41from webapp.api.exceptions import (
42 ApiError,
43 ApiConnectionError,
44 ApiResponseErrorList,
45 ApiTimeoutError,
46 ApiResponseDecodeError,
47 ApiResponseError,
48)
50from datetime import datetime
# Prometheus metrics exported by this module.

# Counts requests for badge images.
badge_counter = prometheus_client.Counter(
    name="badge_counter",
    documentation="A counter of badges requests",
)

# Counts badge requests for which a session is present.
badge_logged_in_counter = prometheus_client.Counter(
    name="badge_logged_in_counter",
    documentation="A counter of badges requests of logged in users",
)

# Counts Accept-Encoding header values, labelled by header value and by the
# browser family parsed from the User-Agent.
accept_encoding_counter = prometheus_client.Counter(
    name="accept_encoding_counter",
    documentation="A counter for Accept-Encoding headers, split by browser",
    labelnames=["accept_encoding", "browser_family"],
)
# Base Content-Security-Policy: maps each directive name to its list of
# allowed sources. "script-src" starts empty here and is filled in when the
# response headers are built.
CSP = {
    "default-src": ["'self'"],
    # The wildcard is needed to allow images from
    # https://www.google.*/ads/ga-audiences to load.
    "img-src": ["data: blob:", "*"],
    "script-src-elem": [
        "'self'",
        "assets.ubuntu.com",
        "www.googletagmanager.com",
        "www.youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "plausible.io",
        "script.crazyegg.com",
        "w.usabilla.com",
        "connect.facebook.net",
        "snap.licdn.com",
        # This is necessary for Google Tag Manager to function properly.
        "'unsafe-inline'",
    ],
    "font-src": ["'self'", "assets.ubuntu.com"],
    # Placeholder: populated per response.
    "script-src": [],
    "connect-src": [
        "'self'",
        "ubuntu.com",
        "analytics.google.com",
        "stats.g.doubleclick.net",
        "www.googletagmanager.com",
        "sentry.is.canonical.com",
        "www.google-analytics.com",
        "plausible.io",
        "*.crazyegg.com",
        "www.facebook.com",
        "px.ads.linkedin.com",
    ],
    "frame-src": [
        "'self'",
        "td.doubleclick.net",
        "www.youtube.com/",
        "asciinema.org",
        "player.vimeo.com",
        "snapcraft.io",
        "www.facebook.com",
        "snap:",
    ],
    "style-src": ["'self'", "'unsafe-inline'"],
    "media-src": ["'self'", "res.cloudinary.com"],
}
# Static sources for the "script-src" directive; hashes computed from the
# response body are appended to these when the header is built.
CSP_SCRIPT_SRC = ["'self'", "blob:", "'unsafe-eval'", "'unsafe-hashes'"]
# Vite integration: in development, also allow the local Vite server as a
# source for scripts, styles and (web socket) connections.
if IS_DEVELOPMENT:
    vite_origin = f"localhost:{VITE_PORT}"

    for directive in ("script-src-elem", "connect-src"):
        CSP[directive].append(vite_origin)
    CSP["connect-src"].append(f"ws://{vite_origin}")
    CSP["style-src"].append(vite_origin)
    CSP_SCRIPT_SRC.append(vite_origin)
def refresh_redirect(path):
    """
    Refresh the macaroon discharge kept in the session, then redirect to
    `path`.

    The value stored under "macaroon_discharge" in the Flask session is
    passed to `authentication.get_refreshed_discharge` and replaced by the
    result.

    - ApiResponseError with status code 401: redirect to "login.logout"
    - Any other ApiResponseError, or any ApiError: abort with a 502 carrying
      the error text
    """
    current_discharge = flask.session["macaroon_discharge"]

    try:
        refreshed_discharge = authentication.get_refreshed_discharge(
            current_discharge
        )
    except ApiResponseError as response_error:
        if response_error.status_code != 401:
            return flask.abort(502, str(response_error))
        return flask.redirect(flask.url_for("login.logout"))
    except ApiError as error:
        return flask.abort(502, str(error))
    else:
        flask.session["macaroon_discharge"] = refreshed_discharge
        return flask.redirect(path)
def snapcraft_utility_processor():
    """
    Build the dictionary of variables and helper functions that is added to
    the context of every template.

    User-related values ("user_name", "user_is_canonical", "stores") are read
    from the "publisher" entry of the Flask session when the session is
    authenticated; otherwise they fall back to None / False / [].

    Returns:
        dict: template variable name -> value or callable.
    """
    if authentication.is_authenticated(flask.session):
        publisher = flask.session["publisher"]
        user_name = publisher["fullname"]
        user_is_canonical = publisher.get("is_canonical", False)
        # Normalise a missing (or None) "stores" entry to an empty list so
        # templates get the same type as in the anonymous branch below.
        stores = publisher.get("stores") or []
    else:
        user_name = None
        user_is_canonical = False
        stores = []

    page_slug = template_utils.generate_slug(flask.request.path)

    return {
        # Variables
        "LOGIN_URL": LOGIN_URL,
        "SENTRY_DSN": SENTRY_DSN,
        "COMMIT_ID": COMMIT_ID,
        "ENVIRONMENT": ENVIRONMENT,
        "host_url": flask.request.host_url,
        "path": flask.request.path,
        "page_slug": page_slug,
        "user_name": user_name,
        "VERIFIED_PUBLISHER": "verified",
        "STAR_DEVELOPER": "starred",
        "webapp_config": WEBAPP_CONFIG,
        "BSI_URL": BSI_URL,
        "now": datetime.now(),
        "user_is_canonical": user_is_canonical,
        # Functions
        "contains": template_utils.contains,
        "join": template_utils.join,
        "static_url": template_utils.static_url,
        "IS_DEVELOPMENT": IS_DEVELOPMENT,
        "vite_import": template_utils.vite_import,
        "vite_dev_tools": template_utils.vite_dev_tools,
        "format_number": template_utils.format_number,
        "format_display_name": template_utils.format_display_name,
        "display_name": template_utils.display_name,
        "install_snippet": template_utils.install_snippet,
        "format_date": template_utils.format_date,
        "format_member_role": template_utils.format_member_role,
        "image": image_template,
        "stores": stores,
        "format_link": template_utils.format_link,
        "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
    }
def set_handlers(app):
    """
    Register the shared template context processor, the error handlers and
    the before/after request hooks on the given Flask application.
    """

    @app.context_processor
    def utility_processor():
        """
        This defines the set of properties and functions that will be added
        to the default context for processing templates. All these items
        can be used in all templates
        """

        return snapcraft_utility_processor()

    # Error handlers
    # ===
    @app.errorhandler(500)
    @app.errorhandler(501)
    @app.errorhandler(502)
    @app.errorhandler(504)
    @app.errorhandler(505)
    def internal_error(error):
        """
        Render the generic 50X page, using the error's own name and code
        when it has them (defaults: the exception class name and 500).
        """
        error_name = getattr(error, "name", type(error).__name__)
        return_code = getattr(error, "code", 500)

        if not app.testing:
            # Look the extension up defensively: a missing "sentry" entry
            # must not raise from inside the error handler itself.
            sentry = app.extensions.get("sentry")
            if sentry is not None:
                sentry.captureException()

        return (
            flask.render_template("50X.html", error_name=error_name),
            return_code,
        )

    @app.errorhandler(503)
    def service_unavailable(error):
        """Render the dedicated 503 page."""
        return render_template("503.html"), 503

    @app.errorhandler(404)
    @app.errorhandler(StoreApiResourceNotFound)
    def handle_resource_not_found(error):
        """Render the 404 page with the error text as its message."""
        return render_template("404.html", message=str(error)), 404

    @app.errorhandler(ApiTimeoutError)
    @app.errorhandler(StoreApiTimeoutError)
    def handle_connection_timeout(error):
        """Render the 50X page with status 504 for API timeouts."""
        status_code = 504
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseDecodeError)
    @app.errorhandler(ApiResponseError)
    @app.errorhandler(ApiConnectionError)
    @app.errorhandler(StoreApiResponseDecodeError)
    @app.errorhandler(StoreApiResponseError)
    @app.errorhandler(StoreApiConnectionError)
    @app.errorhandler(ApiError)
    @app.errorhandler(StoreApiError)
    def store_api_error(error):
        """Render the 50X page with status 502 for API failures."""
        status_code = 502
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseErrorList)
    @app.errorhandler(StoreApiResponseErrorList)
    def handle_api_error_list(error):
        """
        Handle an API response that carries a list of errors.

        - status 404: abort with 404 when "snap_name" is in the request
          path, otherwise render the 404 page
        - a single macaroon permission/authorization error: empty the
          session and redirect to the login page
        - anything else: render the 50X page with status 502 and a summary
          of every error
        """
        if error.status_code == 404:
            if "snap_name" in request.path:
                return flask.abort(404, "Snap not found!")
            else:
                return (
                    render_template("404.html", message="Entity not found"),
                    404,
                )
        if len(error.errors) == 1 and error.errors[0]["code"] in [
            "macaroon-permission-required",
            "macaroon-authorization-required",
        ]:
            authentication.empty_session(flask.session)
            return flask.redirect(f"/login?next={flask.request.path}")

        status_code = 502
        # `item` (not `error`) so the handler's argument is not shadowed.
        codes = [
            f"{item['code']}: {item.get('message', 'No message')}"
            for item in error.errors
        ]

        error_msg = ", ".join(codes)
        return (
            render_template(
                "50X.html", error_message=error_msg, status_code=status_code
            ),
            status_code,
        )

    # Publisher error
    @app.errorhandler(PublisherMissingUsername)
    def handle_publisher_missing_name(error):
        """Redirect to the page where the account name is set."""
        return flask.redirect(flask.url_for("account.get_account_name"))

    @app.errorhandler(PublisherAgreementNotSigned)
    def handle_publisher_agreement_not_signed(error):
        """Redirect to the publisher agreement page."""
        return flask.redirect(flask.url_for("account.get_agreement"))

    @app.errorhandler(PublisherMacaroonRefreshRequired)
    def handle_publisher_macaroon_refresh_required(error):
        """Refresh the macaroon discharge and retry the current path."""
        return refresh_redirect(flask.request.path)

    # Global tasks for all requests
    # ===
    @app.before_request
    def clear_trailing():
        """
        Remove trailing slashes from all routes
        We like our URLs without slashes
        """

        # NOTE(review): the URL is unquoted before it is parsed, so
        # percent-encoded "?", "#" or "/" become structural characters.
        # Confirm this is intended.
        parsed_url = urlparse(unquote(flask.request.url))
        path = parsed_url.path

        if path != "/" and path.endswith("/"):
            new_uri = urlunparse(parsed_url._replace(path=path[:-1]))

            return flask.redirect(new_uri)

    @app.before_request
    def prometheus_metrics():
        """Update the Accept-Encoding and badge counters for the request."""
        # Accept-encoding counter
        # ===
        agent_string = flask.request.headers.get("User-Agent")

        # Exclude probes, which happen behind the cache
        if agent_string and not agent_string.startswith(
            ("kube-probe", "Prometheus")
        ):
            agent = user_agents.parse(agent_string)

            accept_encoding_counter.labels(
                accept_encoding=flask.request.headers.get("Accept-Encoding"),
                browser_family=agent.browser.family,
            ).inc()

        # Badge counters
        # ===
        if "/static/images/badges" in flask.request.url:
            if flask.session:
                badge_logged_in_counter.inc()
            else:
                badge_counter.inc()

    # Calculate the SHA256 hash of the script content and encode it in base64.
    def calculate_sha256_base64(script_content):
        sha256_hash = hashlib.sha256(script_content.encode()).digest()
        return "sha256-" + base64.b64encode(sha256_hash).decode()

    def get_csp_directive(content, regex):
        """
        Return the quoted CSP hash sources for every `regex` match in
        `content`, de-duplicated and sorted so the header is stable.
        """
        pattern = re.compile(regex)
        directive_items = {
            f"'{calculate_sha256_base64(matched_content)}'"
            for matched_content in pattern.findall(content)
        }
        return sorted(directive_items)

    # Find all inline onclick handlers in the response and add their hashes
    # to the CSP.
    def add_script_hashes_to_csp(response):
        response.freeze()
        decoded_content = b"".join(response.response).decode(
            "utf-8", errors="replace"
        )

        script_src = CSP_SCRIPT_SRC + get_csp_directive(
            decoded_content, r'onclick\s*=\s*"(.*?)"'
        )
        # Return a per-response copy: writing into the module-level CSP
        # would let concurrent requests see each other's hashes.
        return {**CSP, "script-src": script_src}

    @app.after_request
    def add_headers(response):
        """
        Generic rules for headers to add to all requests

        - X-Hostname: Mention the name of the host/pod running the application
        - Cache-Control: Add cache-control headers for public and private pages
        - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
        Images) and URLs
        - Referrer-Policy: Limit referrer data for security while preserving
        full referrer for same-origin requests
        - Cross-Origin-Embedder-Policy: allows embedding cross-origin
        resources
        - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
        maintaining same-origin policy
        - Cross-Origin-Resource-Policy: allowing cross-origin requests to
        access the resource
        - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
        resources
        """

        response.headers["X-Hostname"] = socket.gethostname()

        # Only add caching headers to successful responses
        if response.status_code == 200:
            if flask.session:
                response.headers["Cache-Control"] = "private"
            elif not response.headers.get("Cache-Control"):
                # A tuple (not a set) keeps the directive order stable.
                response.headers["Cache-Control"] = ", ".join(
                    (
                        "public",
                        "max-age=61",
                        "stale-while-revalidate=300",
                        "stale-if-error=86400",
                    )
                )
        csp = add_script_hashes_to_csp(response)
        response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
            csp
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
        response.headers["Cross-Origin-Opener-Policy"] = (
            "same-origin-allow-popups"
        )
        response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
        response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
        return response