Coverage for webapp/handlers.py: 91%
145 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 22:07 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 22:07 +0000
1import socket
2from urllib.parse import unquote, urlparse, urlunparse
4import base64
5import hashlib
6import re
7import sentry_sdk
9import flask
10from flask import render_template, request
11import webapp.template_utils as template_utils
12from canonicalwebteam import image_template
13from webapp import authentication
14import webapp.helpers as helpers
15from webapp.config import (
16 BSI_URL,
17 LOGIN_URL,
18 SENTRY_DSN,
19 COMMIT_ID,
20 ENVIRONMENT,
21 WEBAPP_CONFIG,
22 DNS_VERIFICATION_SALT,
23 IS_DEVELOPMENT,
24 VITE_PORT,
25)
27from canonicalwebteam.exceptions import (
28 StoreApiError,
29 StoreApiConnectionError,
30 StoreApiResourceNotFound,
31 StoreApiResponseDecodeError,
32 StoreApiResponseError,
33 StoreApiResponseErrorList,
34 StoreApiTimeoutError,
35 PublisherAgreementNotSigned,
36 PublisherMacaroonRefreshRequired,
37 PublisherMissingUsername,
38)
40from webapp.api.exceptions import (
41 ApiError,
42 ApiConnectionError,
43 ApiResponseErrorList,
44 ApiTimeoutError,
45 ApiResponseDecodeError,
46 ApiResponseError,
47)
49from datetime import datetime
52CSP = {
53 "default-src": ["'self'"],
54 "img-src": [
55 "data: blob:",
56 # This is needed to allow images from
57 # https://www.google.*/ads/ga-audiences to load.
58 "*",
59 ],
60 "script-src-elem": [
61 "'self'",
62 "assets.ubuntu.com",
63 "www.googletagmanager.com",
64 "www.youtube.com",
65 "asciinema.org",
66 "player.vimeo.com",
67 "plausible.io",
68 "script.crazyegg.com",
69 "w.usabilla.com",
70 "connect.facebook.net",
71 "snap.licdn.com",
72 # This is necessary for Google Tag Manager to function properly.
73 "'unsafe-inline'",
74 ],
75 "font-src": [
76 "'self'",
77 "assets.ubuntu.com",
78 ],
79 "script-src": [],
80 "connect-src": [
81 "'self'",
82 "ubuntu.com",
83 "analytics.google.com",
84 "stats.g.doubleclick.net",
85 "www.googletagmanager.com",
86 "sentry.is.canonical.com",
87 "www.google-analytics.com",
88 "plausible.io",
89 "*.crazyegg.com",
90 "www.facebook.com",
91 "px.ads.linkedin.com",
92 "*.snapcraft.io",
93 ],
94 "frame-src": [
95 "'self'",
96 "td.doubleclick.net",
97 "www.youtube.com/",
98 "asciinema.org",
99 "player.vimeo.com",
100 "snapcraft.io",
101 "www.facebook.com",
102 "snap:",
103 ],
104 "style-src": [
105 "'self'",
106 "'unsafe-inline'",
107 ],
108 "media-src": [
109 "'self'",
110 "res.cloudinary.com",
111 ],
112}
114CSP_SCRIPT_SRC = [
115 "'self'",
116 "blob:",
117 "'unsafe-eval'",
118 "'unsafe-hashes'",
119]
121# Vite integration
122if IS_DEVELOPMENT:
123 CSP["script-src-elem"].append(f"localhost:{VITE_PORT}")
124 CSP["connect-src"].append(f"localhost:{VITE_PORT}")
125 CSP["connect-src"].append(f"ws://localhost:{VITE_PORT}")
126 CSP["style-src"].append(f"localhost:{VITE_PORT}")
127 CSP_SCRIPT_SRC.append(f"localhost:{VITE_PORT}")
130def refresh_redirect(path):
131 try:
132 macaroon_discharge = authentication.get_refreshed_discharge(
133 flask.session["macaroon_discharge"]
134 )
135 except ApiResponseError as api_response_error:
136 if api_response_error.status_code == 401:
137 return flask.redirect(flask.url_for("login.logout"))
138 else:
139 return flask.abort(502, str(api_response_error))
140 except ApiError as api_error:
141 return flask.abort(502, str(api_error))
143 flask.session["macaroon_discharge"] = macaroon_discharge
144 return flask.redirect(path)
147def snapcraft_utility_processor():
148 if authentication.is_authenticated(flask.session):
149 user_name = flask.session["publisher"]["fullname"]
150 user_is_canonical = flask.session["publisher"].get(
151 "is_canonical", False
152 )
153 stores = flask.session["publisher"].get("stores")
154 else:
155 user_name = None
156 user_is_canonical = False
157 stores = []
159 page_slug = template_utils.generate_slug(flask.request.path)
161 return {
162 # Variables
163 "LOGIN_URL": LOGIN_URL,
164 "SENTRY_DSN": SENTRY_DSN,
165 "COMMIT_ID": COMMIT_ID,
166 "ENVIRONMENT": ENVIRONMENT,
167 "host_url": flask.request.host_url,
168 "path": flask.request.path,
169 "page_slug": page_slug,
170 "user_name": user_name,
171 "VERIFIED_PUBLISHER": "verified",
172 "STAR_DEVELOPER": "starred",
173 "webapp_config": WEBAPP_CONFIG,
174 "BSI_URL": BSI_URL,
175 "now": datetime.now(),
176 "user_is_canonical": user_is_canonical,
177 # Functions
178 "contains": template_utils.contains,
179 "join": template_utils.join,
180 "static_url": template_utils.static_url,
181 "IS_DEVELOPMENT": IS_DEVELOPMENT,
182 "vite_import": template_utils.vite_import,
183 "vite_dev_tools": template_utils.vite_dev_tools,
184 "format_number": template_utils.format_number,
185 "format_display_name": template_utils.format_display_name,
186 "display_name": template_utils.display_name,
187 "install_snippet": template_utils.install_snippet,
188 "format_date": template_utils.format_date,
189 "format_member_role": template_utils.format_member_role,
190 "image": image_template,
191 "stores": stores,
192 "format_link": template_utils.format_link,
193 "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
194 }
197def set_handlers(app):
198 @app.context_processor
199 def utility_processor():
200 """
201 This defines the set of properties and functions that will be added
202 to the default context for processing templates. All these items
203 can be used in all templates
204 """
206 return snapcraft_utility_processor()
208 # Error handlers
209 # ===
210 @app.errorhandler(500)
211 @app.errorhandler(501)
212 @app.errorhandler(502)
213 @app.errorhandler(504)
214 @app.errorhandler(505)
215 def internal_error(error):
216 error_name = getattr(error, "name", type(error).__name__)
217 return_code = getattr(error, "code", 500)
219 if not app.testing:
220 sentry_sdk.capture_exception()
222 return (
223 flask.render_template("50X.html", error_name=error_name),
224 return_code,
225 )
227 @app.errorhandler(503)
228 def service_unavailable(error):
229 return render_template("503.html"), 503
231 @app.errorhandler(404)
232 @app.errorhandler(StoreApiResourceNotFound)
233 def handle_resource_not_found(error):
234 return render_template("404.html", message=str(error)), 404
236 @app.errorhandler(ApiTimeoutError)
237 @app.errorhandler(StoreApiTimeoutError)
238 def handle_connection_timeout(error):
239 status_code = 504
240 return (
241 render_template(
242 "50X.html", error_message=str(error), status_code=status_code
243 ),
244 status_code,
245 )
247 @app.errorhandler(ApiResponseDecodeError)
248 @app.errorhandler(ApiResponseError)
249 @app.errorhandler(ApiConnectionError)
250 @app.errorhandler(StoreApiResponseDecodeError)
251 @app.errorhandler(StoreApiResponseError)
252 @app.errorhandler(StoreApiConnectionError)
253 @app.errorhandler(ApiError)
254 @app.errorhandler(StoreApiError)
255 def store_api_error(error):
256 status_code = 502
257 return (
258 render_template(
259 "50X.html", error_message=str(error), status_code=status_code
260 ),
261 status_code,
262 )
264 @app.errorhandler(ApiResponseErrorList)
265 @app.errorhandler(StoreApiResponseErrorList)
266 def handle_api_error_list(error):
267 if error.status_code == 404:
268 if "snap_name" in request.path:
269 return flask.abort(404, "Snap not found!")
270 else:
271 return (
272 render_template("404.html", message="Entity not found"),
273 404,
274 )
275 if len(error.errors) == 1 and error.errors[0]["code"] in [
276 "macaroon-permission-required",
277 "macaroon-authorization-required",
278 ]:
279 authentication.empty_session(flask.session)
280 return flask.redirect(f"/login?next={flask.request.path}")
282 status_code = 502
283 codes = [
284 f"{error['code']}: {error.get('message', 'No message')}"
285 for error in error.errors
286 ]
288 error_msg = ", ".join(codes)
289 return (
290 render_template(
291 "50X.html", error_message=error_msg, status_code=status_code
292 ),
293 status_code,
294 )
296 # Publisher error
297 @app.errorhandler(PublisherMissingUsername)
298 def handle_publisher_missing_name(error):
299 return flask.redirect(flask.url_for("account.get_account_name"))
301 @app.errorhandler(PublisherAgreementNotSigned)
302 def handle_publisher_agreement_not_signed(error):
303 return flask.redirect(flask.url_for("account.get_agreement"))
305 @app.errorhandler(PublisherMacaroonRefreshRequired)
306 def handle_publisher_macaroon_refresh_required(error):
307 return refresh_redirect(flask.request.path)
309 # Global tasks for all requests
310 # ===
311 @app.before_request
312 def clear_trailing():
313 """
314 Remove trailing slashes from all routes
315 We like our URLs without slashes
316 """
318 parsed_url = urlparse(unquote(flask.request.url))
319 path = parsed_url.path
321 if path != "/" and path.endswith("/"):
322 new_uri = urlunparse(parsed_url._replace(path=path[:-1]))
324 return flask.redirect(new_uri)
326 # Calculate the SHA256 hash of the script content and encode it in base64.
327 def calculate_sha256_base64(script_content):
328 sha256_hash = hashlib.sha256(script_content.encode()).digest()
329 return "sha256-" + base64.b64encode(sha256_hash).decode()
331 def get_csp_directive(content, regex):
332 directive_items = set()
333 pattern = re.compile(regex)
334 matched_contents = pattern.findall(content)
335 for matched_content in matched_contents:
336 hash_value = f"'{calculate_sha256_base64(matched_content)}'"
337 directive_items.add(hash_value)
338 return list(directive_items)
340 # Find all script elements in the response and add their hashes to the CSP.
341 def add_script_hashes_to_csp(response):
342 response.freeze()
343 decoded_content = b"".join(response.response).decode(
344 "utf-8", errors="replace"
345 )
347 CSP["script-src"] = CSP_SCRIPT_SRC + get_csp_directive(
348 decoded_content, r'onclick\s*=\s*"(.*?)"'
349 )
350 return CSP
352 @app.after_request
353 def add_headers(response):
354 """
355 Generic rules for headers to add to all requests
357 - X-Hostname: Mention the name of the host/pod running the application
358 - Cache-Control: Add cache-control headers for public and private pages
359 - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
360 Images) and URLs
361 - Referrer-Policy: Limit referrer data for security while preserving
362 full referrer for same-origin requests
363 - Cross-Origin-Embedder-Policy: allows embedding cross-origin
364 resources
365 - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
366 maintaining same-origin policy
367 - Cross-Origin-Resource-Policy: allowing cross-origin requests to
368 access the resource
369 - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
370 resources
371 """
373 response.headers["X-Hostname"] = socket.gethostname()
375 if response.status_code == 200:
376 if flask.session:
377 response.headers["Cache-Control"] = "private"
378 else:
379 # Only add caching headers to successful responses
380 if not response.headers.get("Cache-Control"):
381 response.headers["Cache-Control"] = ", ".join(
382 {
383 "public",
384 "max-age=61",
385 "stale-while-revalidate=300",
386 "stale-if-error=86400",
387 }
388 )
389 csp = add_script_hashes_to_csp(response)
390 response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
391 csp
392 )
393 response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
394 response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
395 response.headers["Cross-Origin-Opener-Policy"] = (
396 "same-origin-allow-popups"
397 )
398 response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
399 response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
400 return response