Coverage for webapp / handlers.py: 92%

157 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-05 22:09 +0000

1import socket 

2from urllib.parse import unquote, urlparse, urlunparse 

3 

4import base64 

5import copy 

6import hashlib 

7import re 

8import secrets 

9import sentry_sdk 

10 

11import flask 

12from flask import render_template, request 

13import webapp.template_utils as template_utils 

14from canonicalwebteam import image_template 

15from webapp import authentication 

16import webapp.helpers as helpers 

17from webapp.config import ( 

18 BSI_URL, 

19 LOGIN_URL, 

20 SENTRY_DSN, 

21 COMMIT_ID, 

22 ENVIRONMENT, 

23 WEBAPP_CONFIG, 

24 DNS_VERIFICATION_SALT, 

25 IS_DEVELOPMENT, 

26 VITE_PORT, 

27) 

28 

29from canonicalwebteam.exceptions import ( 

30 StoreApiError, 

31 StoreApiConnectionError, 

32 StoreApiResourceNotFound, 

33 StoreApiResponseDecodeError, 

34 StoreApiResponseError, 

35 StoreApiResponseErrorList, 

36 StoreApiTimeoutError, 

37 PublisherAgreementNotSigned, 

38 PublisherMacaroonRefreshRequired, 

39 PublisherMissingUsername, 

40) 

41 

42from webapp.api.exceptions import ( 

43 ApiError, 

44 ApiConnectionError, 

45 ApiResponseErrorList, 

46 ApiTimeoutError, 

47 ApiResponseDecodeError, 

48 ApiResponseError, 

49) 

50 

51from datetime import datetime 

52 

53 

# Baseline Content-Security-Policy, keyed by directive name. Each value is the
# list of sources allowed for that directive. The lists are mutable on
# purpose: other module-level code may append to them.
CSP = {
    "default-src": ["'self'"],
    "img-src": [
        "data: blob:",
        # This is needed to allow images from
        # https://www.google.*/ads/ga-audiences to load.
        "*",
    ],
    # Hosts that <script> elements may be loaded from.
    "script-src-elem": [
        "'self'",
        "assets.ubuntu.com",
        "www.googletagmanager.com",
        "www.youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "plausible.io",
        "script.crazyegg.com",
        "w.usabilla.com",
        "connect.facebook.net",
        "snap.licdn.com",
    ],
    "font-src": ["'self'", "assets.ubuntu.com"],
    # Intentionally empty placeholder; keeps the directive's position in
    # the mapping.
    "script-src": [],
    # Endpoints that fetch/XHR/WebSocket style connections may target.
    "connect-src": [
        "'self'",
        "ubuntu.com",
        "*.analytics.google.com",
        "stats.g.doubleclick.net",
        "www.googletagmanager.com",
        "sentry.is.canonical.com",
        "www.google-analytics.com",
        "plausible.io",
        "*.crazyegg.com",
        "www.facebook.com",
        "px.ads.linkedin.com",
        "*.snapcraft.io",
        "*.snapcraftcontent.com",
        "www.google.com",
    ],
    # Origins (and the "snap:" scheme) that may be framed.
    "frame-src": [
        "'self'",
        "td.doubleclick.net",
        "www.youtube.com",
        "youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "snapcraft.io",
        "www.facebook.com",
        "snap:",
    ],
    "style-src": ["'self'", "'unsafe-inline'"],
    "media-src": ["'self'", "res.cloudinary.com"],
}

116 

# Base source list for the "script-src" directive. Kept as a mutable list
# because it may be extended after definition.
CSP_SCRIPT_SRC = ["'self'", "blob:", "'unsafe-eval'", "'unsafe-hashes'"]

123 

# Vite integration
# In development, additionally allow localhost on VITE_PORT for scripts,
# styles and connections (including the ws:// scheme).
if IS_DEVELOPMENT:
    _vite_host = f"localhost:{VITE_PORT}"
    CSP["script-src-elem"].append(_vite_host)
    CSP["connect-src"].extend((_vite_host, f"ws://{_vite_host}"))
    CSP["style-src"].append(_vite_host)
    CSP_SCRIPT_SRC.append(_vite_host)
    # Do not leave the temporary name in the module namespace.
    del _vite_host

131 

132 

def refresh_redirect(path):
    """
    Refresh the macaroon discharge stored in the session, then redirect.

    The current ``macaroon_discharge`` from the Flask session is passed to
    ``authentication.get_refreshed_discharge``. On success the returned value
    replaces the one in the session and the client is redirected to ``path``.

    - ``ApiResponseError`` with status 401: redirect to the ``login.logout``
      endpoint.
    - Any other ``ApiResponseError`` or ``ApiError``: abort with a 502 whose
      description is the error's string form.
    """
    try:
        refreshed_discharge = authentication.get_refreshed_discharge(
            flask.session["macaroon_discharge"]
        )
    except ApiResponseError as response_error:
        if response_error.status_code != 401:
            return flask.abort(502, str(response_error))
        return flask.redirect(flask.url_for("login.logout"))
    except ApiError as generic_error:
        return flask.abort(502, str(generic_error))

    flask.session["macaroon_discharge"] = refreshed_discharge
    return flask.redirect(path)

148 

149 

def snapcraft_utility_processor():
    """
    Build the dictionary of values and helpers exposed to templates.

    For an authenticated session the publisher's full name, ``is_canonical``
    flag and stores are read from ``flask.session["publisher"]``; otherwise
    they default to ``None``, ``False`` and ``[]``.

    Returns:
        dict: template context containing configuration constants,
        request-derived values (host URL, path, page slug, CSP nonce),
        user information and formatting helper functions.
    """
    if authentication.is_authenticated(flask.session):
        publisher = flask.session["publisher"]
        user_name = publisher["fullname"]
        user_is_canonical = publisher.get("is_canonical", False)
        # A publisher entry without "stores" (or with an empty value) must
        # yield an empty list, matching the unauthenticated branch, so that
        # templates can always iterate over ``stores``.
        stores = publisher.get("stores") or []
    else:
        user_name = None
        user_is_canonical = False
        stores = []

    page_slug = template_utils.generate_slug(flask.request.path)

    return {
        # Variables
        "LOGIN_URL": LOGIN_URL,
        "SENTRY_DSN": SENTRY_DSN,
        "COMMIT_ID": COMMIT_ID,
        "ENVIRONMENT": ENVIRONMENT,
        "host_url": flask.request.host_url,
        "path": flask.request.path,
        "page_slug": page_slug,
        "user_name": user_name,
        "VERIFIED_PUBLISHER": "verified",
        "STAR_DEVELOPER": "starred",
        "webapp_config": WEBAPP_CONFIG,
        "BSI_URL": BSI_URL,
        "now": datetime.now(),
        "user_is_canonical": user_is_canonical,
        # Empty string when no nonce has been attached to the request.
        "CSP_NONCE": getattr(request, "CSP_NONCE", ""),
        "IS_DEVELOPMENT": IS_DEVELOPMENT,
        "stores": stores,
        "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
        # Functions
        "contains": template_utils.contains,
        "join": template_utils.join,
        "static_url": template_utils.static_url,
        "format_number": template_utils.format_number,
        "format_display_name": template_utils.format_display_name,
        "display_name": template_utils.display_name,
        "install_snippet": template_utils.install_snippet,
        "format_date": template_utils.format_date,
        "format_member_role": template_utils.format_member_role,
        "image": image_template,
        "format_link": template_utils.format_link,
    }

197 

198 

def set_handlers(app):
    """
    Register the template context processor, error handlers and
    before/after request hooks on ``app``.

    Everything is attached through ``app``'s decorators
    (``context_processor``, ``errorhandler``, ``before_request``,
    ``after_request``); nothing is returned.
    """

    @app.context_processor
    def utility_processor():
        """
        This defines the set of properties and functions that will be added
        to the default context for processing templates. All these items
        can be used in all templates
        """

        return snapcraft_utility_processor()

    # Error handlers
    # ===
    @app.errorhandler(500)
    @app.errorhandler(501)
    @app.errorhandler(502)
    @app.errorhandler(504)
    @app.errorhandler(505)
    def internal_error(error):
        """
        Render the generic 50X page, using the error's own name and code
        when it has them (falling back to the class name and 500).
        """
        error_name = getattr(error, "name", type(error).__name__)
        return_code = getattr(error, "code", 500)

        # Only report to Sentry outside of tests.
        if not app.testing:
            sentry_sdk.capture_exception()

        return (
            flask.render_template("50X.html", error_name=error_name),
            return_code,
        )

    @app.errorhandler(503)
    def service_unavailable(error):
        """Render the dedicated 503 page."""
        return render_template("503.html"), 503

    @app.errorhandler(404)
    @app.errorhandler(StoreApiResourceNotFound)
    def handle_resource_not_found(error):
        """Render the 404 page with the error's string form as message."""
        return render_template("404.html", message=str(error)), 404

    @app.errorhandler(ApiTimeoutError)
    @app.errorhandler(StoreApiTimeoutError)
    def handle_connection_timeout(error):
        """Render the 50X page with a 504 status for API timeouts."""
        status_code = 504
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseDecodeError)
    @app.errorhandler(ApiResponseError)
    @app.errorhandler(ApiConnectionError)
    @app.errorhandler(StoreApiResponseDecodeError)
    @app.errorhandler(StoreApiResponseError)
    @app.errorhandler(StoreApiConnectionError)
    @app.errorhandler(ApiError)
    @app.errorhandler(StoreApiError)
    def store_api_error(error):
        """Render the 50X page with a 502 status for API failures."""
        status_code = 502
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseErrorList)
    @app.errorhandler(StoreApiResponseErrorList)
    def handle_api_error_list(error):
        """
        Handle an API response that carries a list of errors.

        - status 404: abort with "Snap not found!" when the request path
          contains "snap_name", otherwise render the 404 page.
        - exactly one error whose code is a macaroon permission or
          authorization requirement: empty the session and redirect to
          the login page with the current path as ``next``.
        - anything else: render the 50X page with a 502 status and a
          message listing every error code and message.
        """
        if error.status_code == 404:
            if "snap_name" in request.path:
                return flask.abort(404, "Snap not found!")
            else:
                return (
                    render_template("404.html", message="Entity not found"),
                    404,
                )
        if len(error.errors) == 1 and error.errors[0]["code"] in [
            "macaroon-permission-required",
            "macaroon-authorization-required",
        ]:
            authentication.empty_session(flask.session)
            return flask.redirect(f"/login?next={flask.request.path}")

        status_code = 502
        # Use a distinct loop name so the handler's ``error`` argument is
        # not shadowed inside the comprehension.
        codes = [
            f"{api_error['code']}: {api_error.get('message', 'No message')}"
            for api_error in error.errors
        ]

        error_msg = ", ".join(codes)
        return (
            render_template(
                "50X.html", error_message=error_msg, status_code=status_code
            ),
            status_code,
        )

    # Publisher error
    @app.errorhandler(PublisherMissingUsername)
    def handle_publisher_missing_name(error):
        """Redirect to the ``account.get_account_name`` endpoint."""
        return flask.redirect(flask.url_for("account.get_account_name"))

    @app.errorhandler(PublisherAgreementNotSigned)
    def handle_publisher_agreement_not_signed(error):
        """Redirect to the ``account.get_agreement`` endpoint."""
        return flask.redirect(flask.url_for("account.get_agreement"))

    @app.errorhandler(PublisherMacaroonRefreshRequired)
    def handle_publisher_macaroon_refresh_required(error):
        """Refresh the macaroon discharge and retry the current path."""
        return refresh_redirect(flask.request.path)

    # Global tasks for all requests
    # ===
    @app.before_request
    def generate_nonce():
        """
        Generate a cryptographically secure random nonce for CSP
        """
        request.CSP_NONCE = secrets.token_urlsafe(16)

    @app.before_request
    def clear_trailing():
        """
        Remove trailing slashes from all routes
        We like our URLs without slashes
        """

        parsed_url = urlparse(unquote(flask.request.url))
        path = parsed_url.path

        # The root path is the only one allowed to end with a slash.
        if path != "/" and path.endswith("/"):
            new_uri = urlunparse(parsed_url._replace(path=path[:-1]))

            return flask.redirect(new_uri)

    # Calculate the SHA256 hash of the script content and encode it in base64.
    def calculate_sha256_base64(script_content):
        sha256_hash = hashlib.sha256(script_content.encode()).digest()
        return "sha256-" + base64.b64encode(sha256_hash).decode()

    def get_csp_directive(content, regex):
        """
        Return the quoted CSP hash sources for every match of ``regex`` in
        ``content``.

        Duplicates are removed and the result is sorted so that the
        generated header is identical for identical content, regardless of
        set iteration order.
        """
        directive_items = {
            f"'{calculate_sha256_base64(matched_content)}'"
            for matched_content in re.findall(regex, content)
        }
        return sorted(directive_items)

    # Find all script elements in the response and add their hashes to the CSP.
    # Also add nonce for use in inline script elements
    def add_script_hashes_and_nonce_to_csp(response):
        """
        Build the per-request CSP mapping for ``response``.

        The response body is scanned for double-quoted ``onclick``
        attribute values; their hashes, and the request nonce when one is
        set, are added to ``script-src``. The nonce is also added to
        ``script-src-elem``. The module-level ``CSP`` is never mutated.
        """
        # Freeze so that the body can be read here and still be sent.
        response.freeze()
        decoded_content = b"".join(response.response).decode(
            "utf-8", errors="replace"
        )

        # Create a fresh copy of CSP for this request to
        # prevent multiple nonces in the CSP headers
        request_csp = copy.deepcopy(CSP)

        # Handle cases where CSP_NONCE might not be set
        csp_nonce = getattr(request, "CSP_NONCE", "")
        csp_nonce_value = f"'nonce-{csp_nonce}'" if csp_nonce else ""

        # Include CSP_NONCE in script-src along with other directives
        script_src_values = CSP_SCRIPT_SRC + get_csp_directive(
            decoded_content, r'onclick\s*=\s*"(.*?)"'
        )

        if csp_nonce_value:
            request_csp["script-src-elem"] = CSP["script-src-elem"] + [
                csp_nonce_value
            ]
            script_src_values.append(csp_nonce_value)

        request_csp["script-src"] = script_src_values
        return request_csp

    @app.after_request
    def add_headers(response):
        """
        Generic rules for headers to add to all requests

        - X-Hostname: Mention the name of the host/pod running the application
        - Cache-Control: Add cache-control headers for public and private pages
        - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
        Images) and URLs
        - Referrer-Policy: Limit referrer data for security while preserving
        full referrer for same-origin requests
        - Cross-Origin-Embedder-Policy: allows embedding cross-origin
        resources
        - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
        maintaining same-origin policy
        - Cross-Origin-Resource-Policy: allowing cross-origin requests to
        access the resource
        - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
        resources
        """

        response.headers["X-Hostname"] = socket.gethostname()

        if response.status_code == 200:
            if flask.session:
                response.headers["Cache-Control"] = "private"
            else:
                # Only add caching headers to successful responses
                if not response.headers.get("Cache-Control"):
                    # A tuple (not a set) keeps the directive order stable
                    # across processes and restarts.
                    response.headers["Cache-Control"] = ", ".join(
                        (
                            "public",
                            "max-age=61",
                            "stale-while-revalidate=300",
                            "stale-if-error=86400",
                        )
                    )
        csp = add_script_hashes_and_nonce_to_csp(response)
        response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
            csp
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
        response.headers["Cross-Origin-Opener-Policy"] = (
            "same-origin-allow-popups"
        )
        response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
        response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
        return response