Coverage for webapp/handlers.py: 92%
153 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
1import socket
2from urllib.parse import unquote, urlparse, urlunparse
4import base64
5import hashlib
6import re
8import flask
9from flask import render_template, request
10import prometheus_client
11import user_agents
12import webapp.template_utils as template_utils
13from canonicalwebteam import image_template
14from webapp import authentication
15import webapp.helpers as helpers
16from webapp.config import (
17 BSI_URL,
18 LOGIN_URL,
19 SENTRY_DSN,
20 COMMIT_ID,
21 ENVIRONMENT,
22 WEBAPP_CONFIG,
23 DNS_VERIFICATION_SALT,
24)
26from canonicalwebteam.exceptions import (
27 StoreApiError,
28 StoreApiConnectionError,
29 StoreApiResourceNotFound,
30 StoreApiResponseDecodeError,
31 StoreApiResponseError,
32 StoreApiResponseErrorList,
33 StoreApiTimeoutError,
34 PublisherAgreementNotSigned,
35 PublisherMacaroonRefreshRequired,
36 PublisherMissingUsername,
37)
39from webapp.api.exceptions import (
40 ApiError,
41 ApiConnectionError,
42 ApiResponseErrorList,
43 ApiTimeoutError,
44 ApiResponseDecodeError,
45 ApiResponseError,
46)
48from datetime import datetime
# Prometheus metrics incremented from the request hooks registered in
# set_handlers().

# Badge image requests made without a session.
badge_counter = prometheus_client.Counter(
    name="badge_counter",
    documentation="A counter of badges requests",
)

# Badge image requests made with a non-empty session.
badge_logged_in_counter = prometheus_client.Counter(
    name="badge_logged_in_counter",
    documentation="A counter of badges requests of logged in users",
)

# One time series per (Accept-Encoding header, browser family) pair.
accept_encoding_counter = prometheus_client.Counter(
    name="accept_encoding_counter",
    documentation="A counter for Accept-Encoding headers, split by browser",
    labelnames=["accept_encoding", "browser_family"],
)
# Base Content-Security-Policy, keyed by directive name. Each value is the
# list of allowed sources for that directive. "script-src" is intentionally
# empty here: it is filled in per response from CSP_SCRIPT_SRC plus the
# hashes of inline handlers found in the response body.
CSP = {
    "default-src": ["'self'"],
    "img-src": [
        "data: blob:",
        # The wildcard is needed to allow images from
        # https://www.google.*/ads/ga-audiences to load.
        "*",
    ],
    "script-src-elem": [
        "'self'",
        "assets.ubuntu.com",
        "www.googletagmanager.com",
        "www.youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "plausible.io",
        "script.crazyegg.com",
        "w.usabilla.com",
        "connect.facebook.net",
        "snap.licdn.com",
        # This is necessary for Google Tag Manager to function properly.
        "'unsafe-inline'",
    ],
    "font-src": ["'self'", "assets.ubuntu.com"],
    "script-src": [],
    "connect-src": [
        "'self'",
        "ubuntu.com",
        "analytics.google.com",
        "stats.g.doubleclick.net",
        "www.googletagmanager.com",
        "sentry.is.canonical.com",
        "www.google-analytics.com",
        "plausible.io",
        "*.crazyegg.com",
        "www.facebook.com",
        "px.ads.linkedin.com",
    ],
    "frame-src": [
        "'self'",
        "td.doubleclick.net",
        "www.youtube.com/",
        "asciinema.org",
        "player.vimeo.com",
        "snapcraft.io",
        "www.facebook.com",
        "snap:",
    ],
    "style-src": ["'self'", "'unsafe-inline'"],
    "media-src": ["'self'", "res.cloudinary.com"],
}
# Fixed part of the "script-src" directive. The sha256 hashes of inline
# onclick handlers found in each response are appended to this list when the
# Content-Security-Policy header is built.
CSP_SCRIPT_SRC = ["'self'", "blob:", "'unsafe-eval'", "'unsafe-hashes'"]
def refresh_redirect(path):
    """
    Refresh the macaroon discharge stored in the session, then redirect.

    :param path: Location to redirect to once the discharge is refreshed.
    :returns: A redirect to ``path`` on success, or a redirect to the
        ``login.logout`` endpoint when the refresh is answered with a 401.

    Any other API failure aborts the request with a 502 carrying the
    error's string representation.
    """
    try:
        refreshed_discharge = authentication.get_refreshed_discharge(
            flask.session["macaroon_discharge"]
        )
    except ApiResponseError as response_error:
        # Anything other than "unauthorized" is reported as a bad gateway.
        if response_error.status_code != 401:
            return flask.abort(502, str(response_error))
        return flask.redirect(flask.url_for("login.logout"))
    except ApiError as generic_error:
        return flask.abort(502, str(generic_error))

    flask.session["macaroon_discharge"] = refreshed_discharge
    return flask.redirect(path)
def snapcraft_utility_processor():
    """
    Build the values and helper functions exposed to every template.

    Reads the current Flask session and request. When the session is
    authenticated, the publisher's full name, ``is_canonical`` flag and
    stores are taken from ``flask.session["publisher"]``; otherwise they
    default to ``None``, ``False`` and an empty list.

    :returns: A dict mapping template variable names to values/callables.
    """
    if authentication.is_authenticated(flask.session):
        publisher = flask.session["publisher"]
        user_name = publisher["fullname"]
        user_is_canonical = publisher.get("is_canonical", False)
        # A publisher without a "stores" entry used to yield None here while
        # anonymous users got []; normalise so templates always get a list.
        stores = publisher.get("stores") or []
    else:
        user_name = None
        user_is_canonical = False
        stores = []

    page_slug = template_utils.generate_slug(flask.request.path)

    return {
        # Variables
        "LOGIN_URL": LOGIN_URL,
        "SENTRY_DSN": SENTRY_DSN,
        "COMMIT_ID": COMMIT_ID,
        "ENVIRONMENT": ENVIRONMENT,
        "host_url": flask.request.host_url,
        "path": flask.request.path,
        "page_slug": page_slug,
        "user_name": user_name,
        "VERIFIED_PUBLISHER": "verified",
        "STAR_DEVELOPER": "starred",
        "webapp_config": WEBAPP_CONFIG,
        "BSI_URL": BSI_URL,
        "now": datetime.now(),
        "user_is_canonical": user_is_canonical,
        # Functions
        "contains": template_utils.contains,
        "join": template_utils.join,
        "static_url": template_utils.static_url,
        "format_number": template_utils.format_number,
        "format_display_name": template_utils.format_display_name,
        "display_name": template_utils.display_name,
        "install_snippet": template_utils.install_snippet,
        "format_date": template_utils.format_date,
        "format_member_role": template_utils.format_member_role,
        "image": image_template,
        "stores": stores,
        "format_link": template_utils.format_link,
        "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
    }
def set_handlers(app):
    """
    Register the template context processor, the error handlers and the
    before/after request hooks on ``app``.

    :param app: The Flask application to configure. It is modified in place
        through its decorators; nothing is returned.
    """

    @app.context_processor
    def utility_processor():
        """
        This defines the set of properties and functions that will be added
        to the default context for processing templates. All these items
        can be used in all templates
        """

        return snapcraft_utility_processor()

    # Error handlers
    # ===
    @app.errorhandler(500)
    @app.errorhandler(501)
    @app.errorhandler(502)
    @app.errorhandler(504)
    @app.errorhandler(505)
    def internal_error(error):
        # Fall back to the exception class name / 500 when the error object
        # does not carry "name" / "code" attributes.
        error_name = getattr(error, "name", type(error).__name__)
        return_code = getattr(error, "code", 500)

        if not app.testing:
            app.extensions["sentry"].captureException()

        return (
            flask.render_template("50X.html", error_name=error_name),
            return_code,
        )

    @app.errorhandler(503)
    def service_unavailable(error):
        return render_template("503.html"), 503

    @app.errorhandler(404)
    @app.errorhandler(StoreApiResourceNotFound)
    def handle_resource_not_found(error):
        return render_template("404.html", message=str(error)), 404

    @app.errorhandler(ApiTimeoutError)
    @app.errorhandler(StoreApiTimeoutError)
    def handle_connection_timeout(error):
        status_code = 504
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseDecodeError)
    @app.errorhandler(ApiResponseError)
    @app.errorhandler(ApiConnectionError)
    @app.errorhandler(StoreApiResponseDecodeError)
    @app.errorhandler(StoreApiResponseError)
    @app.errorhandler(StoreApiConnectionError)
    @app.errorhandler(ApiError)
    @app.errorhandler(StoreApiError)
    def store_api_error(error):
        status_code = 502
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseErrorList)
    @app.errorhandler(StoreApiResponseErrorList)
    def handle_api_error_list(error):
        if error.status_code == 404:
            if "snap_name" in request.path:
                return flask.abort(404, "Snap not found!")
            else:
                return (
                    render_template("404.html", message="Entity not found"),
                    404,
                )
        # A single macaroon permission/authorization error means the session
        # is no longer usable: clear it and send the user back to login.
        if len(error.errors) == 1 and error.errors[0]["code"] in [
            "macaroon-permission-required",
            "macaroon-authorization-required",
        ]:
            authentication.empty_session(flask.session)
            return flask.redirect(f"/login?next={flask.request.path}")

        status_code = 502
        # "item" rather than "error" so the loop variable does not shadow
        # the handler's argument.
        codes = [
            f"{item['code']}: {item.get('message', 'No message')}"
            for item in error.errors
        ]

        error_msg = ", ".join(codes)
        return (
            render_template(
                "50X.html", error_message=error_msg, status_code=status_code
            ),
            status_code,
        )

    # Publisher error
    @app.errorhandler(PublisherMissingUsername)
    def handle_publisher_missing_name(error):
        return flask.redirect(flask.url_for("account.get_account_name"))

    @app.errorhandler(PublisherAgreementNotSigned)
    def handle_publisher_agreement_not_signed(error):
        return flask.redirect(flask.url_for("account.get_agreement"))

    @app.errorhandler(PublisherMacaroonRefreshRequired)
    def handle_publisher_macaroon_refresh_required(error):
        return refresh_redirect(flask.request.path)

    # Global tasks for all requests
    # ===
    @app.before_request
    def clear_trailing():
        """
        Remove trailing slashes from all routes
        We like our URLs without slashes
        """

        parsed_url = urlparse(unquote(flask.request.url))
        path = parsed_url.path

        if path != "/" and path.endswith("/"):
            new_uri = urlunparse(parsed_url._replace(path=path[:-1]))

            return flask.redirect(new_uri)

    @app.before_request
    def prometheus_metrics():
        # Accept-encoding counter
        # ===
        agent_string = flask.request.headers.get("User-Agent")

        # Exclude probes, which happen behind the cache
        if agent_string and not agent_string.startswith(
            ("kube-probe", "Prometheus")
        ):
            # agent_string is known to be non-empty inside this branch.
            agent = user_agents.parse(agent_string)

            accept_encoding_counter.labels(
                accept_encoding=flask.request.headers.get("Accept-Encoding"),
                browser_family=agent.browser.family,
            ).inc()

        # Badge counters
        # ===
        if "/static/images/badges" in flask.request.url:
            if flask.session:
                badge_logged_in_counter.inc()
            else:
                badge_counter.inc()

    # Calculate the SHA256 hash of the script content and encode it in base64.
    def calculate_sha256_base64(script_content):
        sha256_hash = hashlib.sha256(script_content.encode()).digest()
        return "sha256-" + base64.b64encode(sha256_hash).decode()

    # Return the quoted, de-duplicated CSP hash sources for every match of
    # ``regex`` in ``content``.
    def get_csp_directive(content, regex):
        directive_items = {
            f"'{calculate_sha256_base64(matched_content)}'"
            for matched_content in re.findall(regex, content)
        }
        # Sorted so the resulting header is identical for identical content,
        # whatever the process' string hash seed is.
        return sorted(directive_items)

    # Find all inline onclick handlers in the response and return a CSP whose
    # "script-src" directive includes their hashes.
    def add_script_hashes_to_csp(response):
        response.freeze()
        decoded_content = b"".join(response.response).decode(
            "utf-8", errors="replace"
        )

        # Work on a per-response copy: writing the hashes into the
        # module-level CSP dict would share them between concurrent requests.
        # The copy is shallow, which is enough because only the "script-src"
        # entry is replaced and no list is mutated in place.
        csp = dict(CSP)
        csp["script-src"] = CSP_SCRIPT_SRC + get_csp_directive(
            decoded_content, r'onclick\s*=\s*"(.*?)"'
        )
        return csp

    @app.after_request
    def add_headers(response):
        """
        Generic rules for headers to add to all requests

        - X-Hostname: Mention the name of the host/pod running the application
        - Cache-Control: Add cache-control headers for public and private pages
        - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
        Images) and URLs
        - Referrer-Policy: Limit referrer data for security while preserving
        full referrer for same-origin requests
        - Cross-Origin-Embedder-Policy: allows embedding cross-origin
        resources
        - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
        maintaining same-origin policy
        - Cross-Origin-Resource-Policy: allowing cross-origin requests to
        access the resource
        - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
        resources
        """

        response.headers["X-Hostname"] = socket.gethostname()

        if response.status_code == 200:
            if flask.session:
                response.headers["Cache-Control"] = "private"
            else:
                # Only add caching headers to successful responses
                if not response.headers.get("Cache-Control"):
                    # A tuple (not a set) keeps the directive order stable
                    # across processes.
                    response.headers["Cache-Control"] = ", ".join(
                        (
                            "public",
                            "max-age=61",
                            "stale-while-revalidate=300",
                            "stale-if-error=86400",
                        )
                    )
        csp = add_script_hashes_to_csp(response)
        response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
            csp
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
        response.headers["Cross-Origin-Opener-Policy"] = (
            "same-origin-allow-popups"
        )
        response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
        response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
        return response