Coverage for webapp/handlers.py: 91%
145 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 22:07 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 22:07 +0000
1import socket
2from urllib.parse import unquote, urlparse, urlunparse
4import base64
5import hashlib
6import re
7import sentry_sdk
9import flask
10from flask import render_template, request
11import webapp.template_utils as template_utils
12from canonicalwebteam import image_template
13from webapp import authentication
14import webapp.helpers as helpers
15from webapp.config import (
16 BSI_URL,
17 LOGIN_URL,
18 SENTRY_DSN,
19 COMMIT_ID,
20 ENVIRONMENT,
21 WEBAPP_CONFIG,
22 DNS_VERIFICATION_SALT,
23 IS_DEVELOPMENT,
24 VITE_PORT,
25)
27from canonicalwebteam.exceptions import (
28 StoreApiError,
29 StoreApiConnectionError,
30 StoreApiResourceNotFound,
31 StoreApiResponseDecodeError,
32 StoreApiResponseError,
33 StoreApiResponseErrorList,
34 StoreApiTimeoutError,
35 PublisherAgreementNotSigned,
36 PublisherMacaroonRefreshRequired,
37 PublisherMissingUsername,
38)
40from webapp.api.exceptions import (
41 ApiError,
42 ApiConnectionError,
43 ApiResponseErrorList,
44 ApiTimeoutError,
45 ApiResponseDecodeError,
46 ApiResponseError,
47)
49from datetime import datetime
52CSP = {
53 "default-src": ["'self'"],
54 "img-src": [
55 "data: blob:",
56 # This is needed to allow images from
57 # https://www.google.*/ads/ga-audiences to load.
58 "*",
59 ],
60 "script-src-elem": [
61 "'self'",
62 "assets.ubuntu.com",
63 "www.googletagmanager.com",
64 "www.youtube.com",
65 "asciinema.org",
66 "player.vimeo.com",
67 "plausible.io",
68 "script.crazyegg.com",
69 "w.usabilla.com",
70 "connect.facebook.net",
71 "snap.licdn.com",
72 # This is necessary for Google Tag Manager to function properly.
73 "'unsafe-inline'",
74 ],
75 "font-src": [
76 "'self'",
77 "assets.ubuntu.com",
78 ],
79 "script-src": [],
80 "connect-src": [
81 "'self'",
82 "ubuntu.com",
83 "analytics.google.com",
84 "stats.g.doubleclick.net",
85 "www.googletagmanager.com",
86 "sentry.is.canonical.com",
87 "www.google-analytics.com",
88 "plausible.io",
89 "*.crazyegg.com",
90 "www.facebook.com",
91 "px.ads.linkedin.com",
92 ],
93 "frame-src": [
94 "'self'",
95 "td.doubleclick.net",
96 "www.youtube.com/",
97 "asciinema.org",
98 "player.vimeo.com",
99 "snapcraft.io",
100 "www.facebook.com",
101 "snap:",
102 ],
103 "style-src": [
104 "'self'",
105 "'unsafe-inline'",
106 ],
107 "media-src": [
108 "'self'",
109 "res.cloudinary.com",
110 ],
111}
113CSP_SCRIPT_SRC = [
114 "'self'",
115 "blob:",
116 "'unsafe-eval'",
117 "'unsafe-hashes'",
118]
120# Vite integration
121if IS_DEVELOPMENT:
122 CSP["script-src-elem"].append(f"localhost:{VITE_PORT}")
123 CSP["connect-src"].append(f"localhost:{VITE_PORT}")
124 CSP["connect-src"].append(f"ws://localhost:{VITE_PORT}")
125 CSP["style-src"].append(f"localhost:{VITE_PORT}")
126 CSP_SCRIPT_SRC.append(f"localhost:{VITE_PORT}")
129def refresh_redirect(path):
130 try:
131 macaroon_discharge = authentication.get_refreshed_discharge(
132 flask.session["macaroon_discharge"]
133 )
134 except ApiResponseError as api_response_error:
135 if api_response_error.status_code == 401:
136 return flask.redirect(flask.url_for("login.logout"))
137 else:
138 return flask.abort(502, str(api_response_error))
139 except ApiError as api_error:
140 return flask.abort(502, str(api_error))
142 flask.session["macaroon_discharge"] = macaroon_discharge
143 return flask.redirect(path)
146def snapcraft_utility_processor():
147 if authentication.is_authenticated(flask.session):
148 user_name = flask.session["publisher"]["fullname"]
149 user_is_canonical = flask.session["publisher"].get(
150 "is_canonical", False
151 )
152 stores = flask.session["publisher"].get("stores")
153 else:
154 user_name = None
155 user_is_canonical = False
156 stores = []
158 page_slug = template_utils.generate_slug(flask.request.path)
160 return {
161 # Variables
162 "LOGIN_URL": LOGIN_URL,
163 "SENTRY_DSN": SENTRY_DSN,
164 "COMMIT_ID": COMMIT_ID,
165 "ENVIRONMENT": ENVIRONMENT,
166 "host_url": flask.request.host_url,
167 "path": flask.request.path,
168 "page_slug": page_slug,
169 "user_name": user_name,
170 "VERIFIED_PUBLISHER": "verified",
171 "STAR_DEVELOPER": "starred",
172 "webapp_config": WEBAPP_CONFIG,
173 "BSI_URL": BSI_URL,
174 "now": datetime.now(),
175 "user_is_canonical": user_is_canonical,
176 # Functions
177 "contains": template_utils.contains,
178 "join": template_utils.join,
179 "static_url": template_utils.static_url,
180 "IS_DEVELOPMENT": IS_DEVELOPMENT,
181 "vite_import": template_utils.vite_import,
182 "vite_dev_tools": template_utils.vite_dev_tools,
183 "format_number": template_utils.format_number,
184 "format_display_name": template_utils.format_display_name,
185 "display_name": template_utils.display_name,
186 "install_snippet": template_utils.install_snippet,
187 "format_date": template_utils.format_date,
188 "format_member_role": template_utils.format_member_role,
189 "image": image_template,
190 "stores": stores,
191 "format_link": template_utils.format_link,
192 "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
193 }
196def set_handlers(app):
197 @app.context_processor
198 def utility_processor():
199 """
200 This defines the set of properties and functions that will be added
201 to the default context for processing templates. All these items
202 can be used in all templates
203 """
205 return snapcraft_utility_processor()
207 # Error handlers
208 # ===
209 @app.errorhandler(500)
210 @app.errorhandler(501)
211 @app.errorhandler(502)
212 @app.errorhandler(504)
213 @app.errorhandler(505)
214 def internal_error(error):
215 error_name = getattr(error, "name", type(error).__name__)
216 return_code = getattr(error, "code", 500)
218 if not app.testing:
219 sentry_sdk.capture_exception()
221 return (
222 flask.render_template("50X.html", error_name=error_name),
223 return_code,
224 )
226 @app.errorhandler(503)
227 def service_unavailable(error):
228 return render_template("503.html"), 503
230 @app.errorhandler(404)
231 @app.errorhandler(StoreApiResourceNotFound)
232 def handle_resource_not_found(error):
233 return render_template("404.html", message=str(error)), 404
235 @app.errorhandler(ApiTimeoutError)
236 @app.errorhandler(StoreApiTimeoutError)
237 def handle_connection_timeout(error):
238 status_code = 504
239 return (
240 render_template(
241 "50X.html", error_message=str(error), status_code=status_code
242 ),
243 status_code,
244 )
246 @app.errorhandler(ApiResponseDecodeError)
247 @app.errorhandler(ApiResponseError)
248 @app.errorhandler(ApiConnectionError)
249 @app.errorhandler(StoreApiResponseDecodeError)
250 @app.errorhandler(StoreApiResponseError)
251 @app.errorhandler(StoreApiConnectionError)
252 @app.errorhandler(ApiError)
253 @app.errorhandler(StoreApiError)
254 def store_api_error(error):
255 status_code = 502
256 return (
257 render_template(
258 "50X.html", error_message=str(error), status_code=status_code
259 ),
260 status_code,
261 )
263 @app.errorhandler(ApiResponseErrorList)
264 @app.errorhandler(StoreApiResponseErrorList)
265 def handle_api_error_list(error):
266 if error.status_code == 404:
267 if "snap_name" in request.path:
268 return flask.abort(404, "Snap not found!")
269 else:
270 return (
271 render_template("404.html", message="Entity not found"),
272 404,
273 )
274 if len(error.errors) == 1 and error.errors[0]["code"] in [
275 "macaroon-permission-required",
276 "macaroon-authorization-required",
277 ]:
278 authentication.empty_session(flask.session)
279 return flask.redirect(f"/login?next={flask.request.path}")
281 status_code = 502
282 codes = [
283 f"{error['code']}: {error.get('message', 'No message')}"
284 for error in error.errors
285 ]
287 error_msg = ", ".join(codes)
288 return (
289 render_template(
290 "50X.html", error_message=error_msg, status_code=status_code
291 ),
292 status_code,
293 )
295 # Publisher error
296 @app.errorhandler(PublisherMissingUsername)
297 def handle_publisher_missing_name(error):
298 return flask.redirect(flask.url_for("account.get_account_name"))
300 @app.errorhandler(PublisherAgreementNotSigned)
301 def handle_publisher_agreement_not_signed(error):
302 return flask.redirect(flask.url_for("account.get_agreement"))
304 @app.errorhandler(PublisherMacaroonRefreshRequired)
305 def handle_publisher_macaroon_refresh_required(error):
306 return refresh_redirect(flask.request.path)
308 # Global tasks for all requests
309 # ===
310 @app.before_request
311 def clear_trailing():
312 """
313 Remove trailing slashes from all routes
314 We like our URLs without slashes
315 """
317 parsed_url = urlparse(unquote(flask.request.url))
318 path = parsed_url.path
320 if path != "/" and path.endswith("/"):
321 new_uri = urlunparse(parsed_url._replace(path=path[:-1]))
323 return flask.redirect(new_uri)
325 # Calculate the SHA256 hash of the script content and encode it in base64.
326 def calculate_sha256_base64(script_content):
327 sha256_hash = hashlib.sha256(script_content.encode()).digest()
328 return "sha256-" + base64.b64encode(sha256_hash).decode()
330 def get_csp_directive(content, regex):
331 directive_items = set()
332 pattern = re.compile(regex)
333 matched_contents = pattern.findall(content)
334 for matched_content in matched_contents:
335 hash_value = f"'{calculate_sha256_base64(matched_content)}'"
336 directive_items.add(hash_value)
337 return list(directive_items)
339 # Find all script elements in the response and add their hashes to the CSP.
340 def add_script_hashes_to_csp(response):
341 response.freeze()
342 decoded_content = b"".join(response.response).decode(
343 "utf-8", errors="replace"
344 )
346 CSP["script-src"] = CSP_SCRIPT_SRC + get_csp_directive(
347 decoded_content, r'onclick\s*=\s*"(.*?)"'
348 )
349 return CSP
351 @app.after_request
352 def add_headers(response):
353 """
354 Generic rules for headers to add to all requests
356 - X-Hostname: Mention the name of the host/pod running the application
357 - Cache-Control: Add cache-control headers for public and private pages
358 - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
359 Images) and URLs
360 - Referrer-Policy: Limit referrer data for security while preserving
361 full referrer for same-origin requests
362 - Cross-Origin-Embedder-Policy: allows embedding cross-origin
363 resources
364 - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
365 maintaining same-origin policy
366 - Cross-Origin-Resource-Policy: allowing cross-origin requests to
367 access the resource
368 - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
369 resources
370 """
372 response.headers["X-Hostname"] = socket.gethostname()
374 if response.status_code == 200:
375 if flask.session:
376 response.headers["Cache-Control"] = "private"
377 else:
378 # Only add caching headers to successful responses
379 if not response.headers.get("Cache-Control"):
380 response.headers["Cache-Control"] = ", ".join(
381 {
382 "public",
383 "max-age=61",
384 "stale-while-revalidate=300",
385 "stale-if-error=86400",
386 }
387 )
388 csp = add_script_hashes_to_csp(response)
389 response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
390 csp
391 )
392 response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
393 response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
394 response.headers["Cross-Origin-Opener-Policy"] = (
395 "same-origin-allow-popups"
396 )
397 response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
398 response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
399 return response