Coverage for webapp/handlers.py: 92%

159 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-17 22:07 +0000

1import socket 

2from urllib.parse import unquote, urlparse, urlunparse 

3 

4import base64 

5import hashlib 

6import re 

7 

8import flask 

9from flask import render_template, request 

10import prometheus_client 

11import user_agents 

12import webapp.template_utils as template_utils 

13from canonicalwebteam import image_template 

14from webapp import authentication 

15import webapp.helpers as helpers 

16from webapp.config import ( 

17 BSI_URL, 

18 LOGIN_URL, 

19 SENTRY_DSN, 

20 COMMIT_ID, 

21 ENVIRONMENT, 

22 WEBAPP_CONFIG, 

23 DNS_VERIFICATION_SALT, 

24 IS_DEVELOPMENT, 

25 VITE_PORT, 

26) 

27 

28from canonicalwebteam.exceptions import ( 

29 StoreApiError, 

30 StoreApiConnectionError, 

31 StoreApiResourceNotFound, 

32 StoreApiResponseDecodeError, 

33 StoreApiResponseError, 

34 StoreApiResponseErrorList, 

35 StoreApiTimeoutError, 

36 PublisherAgreementNotSigned, 

37 PublisherMacaroonRefreshRequired, 

38 PublisherMissingUsername, 

39) 

40 

41from webapp.api.exceptions import ( 

42 ApiError, 

43 ApiConnectionError, 

44 ApiResponseErrorList, 

45 ApiTimeoutError, 

46 ApiResponseDecodeError, 

47 ApiResponseError, 

48) 

49 

50from datetime import datetime 

51 

52badge_counter = prometheus_client.Counter( 

53 "badge_counter", "A counter of badges requests" 

54) 

55 

56badge_logged_in_counter = prometheus_client.Counter( 

57 "badge_logged_in_counter", 

58 "A counter of badges requests of logged in users", 

59) 

60 

61accept_encoding_counter = prometheus_client.Counter( 

62 "accept_encoding_counter", 

63 "A counter for Accept-Encoding headers, split by browser", 

64 ["accept_encoding", "browser_family"], 

65) 

66 

67CSP = { 

68 "default-src": ["'self'"], 

69 "img-src": [ 

70 "data: blob:", 

71 # This is needed to allow images from 

72 # https://www.google.*/ads/ga-audiences to load. 

73 "*", 

74 ], 

75 "script-src-elem": [ 

76 "'self'", 

77 "assets.ubuntu.com", 

78 "www.googletagmanager.com", 

79 "www.youtube.com", 

80 "asciinema.org", 

81 "player.vimeo.com", 

82 "plausible.io", 

83 "script.crazyegg.com", 

84 "w.usabilla.com", 

85 "connect.facebook.net", 

86 "snap.licdn.com", 

87 # This is necessary for Google Tag Manager to function properly. 

88 "'unsafe-inline'", 

89 ], 

90 "font-src": [ 

91 "'self'", 

92 "assets.ubuntu.com", 

93 ], 

94 "script-src": [], 

95 "connect-src": [ 

96 "'self'", 

97 "ubuntu.com", 

98 "analytics.google.com", 

99 "stats.g.doubleclick.net", 

100 "www.googletagmanager.com", 

101 "sentry.is.canonical.com", 

102 "www.google-analytics.com", 

103 "plausible.io", 

104 "*.crazyegg.com", 

105 "www.facebook.com", 

106 "px.ads.linkedin.com", 

107 ], 

108 "frame-src": [ 

109 "'self'", 

110 "td.doubleclick.net", 

111 "www.youtube.com/", 

112 "asciinema.org", 

113 "player.vimeo.com", 

114 "snapcraft.io", 

115 "www.facebook.com", 

116 "snap:", 

117 ], 

118 "style-src": [ 

119 "'self'", 

120 "'unsafe-inline'", 

121 ], 

122 "media-src": [ 

123 "'self'", 

124 "res.cloudinary.com", 

125 ], 

126} 

127 

128CSP_SCRIPT_SRC = [ 

129 "'self'", 

130 "blob:", 

131 "'unsafe-eval'", 

132 "'unsafe-hashes'", 

133] 

134 

135# Vite integration 

136if IS_DEVELOPMENT: 

137 CSP["script-src-elem"].append(f"localhost:{VITE_PORT}") 

138 CSP["connect-src"].append(f"localhost:{VITE_PORT}") 

139 CSP["connect-src"].append(f"ws://localhost:{VITE_PORT}") 

140 CSP["style-src"].append(f"localhost:{VITE_PORT}") 

141 CSP_SCRIPT_SRC.append(f"localhost:{VITE_PORT}") 

142 

143 

144def refresh_redirect(path): 

145 try: 

146 macaroon_discharge = authentication.get_refreshed_discharge( 

147 flask.session["macaroon_discharge"] 

148 ) 

149 except ApiResponseError as api_response_error: 

150 if api_response_error.status_code == 401: 

151 return flask.redirect(flask.url_for("login.logout")) 

152 else: 

153 return flask.abort(502, str(api_response_error)) 

154 except ApiError as api_error: 

155 return flask.abort(502, str(api_error)) 

156 

157 flask.session["macaroon_discharge"] = macaroon_discharge 

158 return flask.redirect(path) 

159 

160 

161def snapcraft_utility_processor(): 

162 if authentication.is_authenticated(flask.session): 

163 user_name = flask.session["publisher"]["fullname"] 

164 user_is_canonical = flask.session["publisher"].get( 

165 "is_canonical", False 

166 ) 

167 stores = flask.session["publisher"].get("stores") 

168 else: 

169 user_name = None 

170 user_is_canonical = False 

171 stores = [] 

172 

173 page_slug = template_utils.generate_slug(flask.request.path) 

174 

175 return { 

176 # Variables 

177 "LOGIN_URL": LOGIN_URL, 

178 "SENTRY_DSN": SENTRY_DSN, 

179 "COMMIT_ID": COMMIT_ID, 

180 "ENVIRONMENT": ENVIRONMENT, 

181 "host_url": flask.request.host_url, 

182 "path": flask.request.path, 

183 "page_slug": page_slug, 

184 "user_name": user_name, 

185 "VERIFIED_PUBLISHER": "verified", 

186 "STAR_DEVELOPER": "starred", 

187 "webapp_config": WEBAPP_CONFIG, 

188 "BSI_URL": BSI_URL, 

189 "now": datetime.now(), 

190 "user_is_canonical": user_is_canonical, 

191 # Functions 

192 "contains": template_utils.contains, 

193 "join": template_utils.join, 

194 "static_url": template_utils.static_url, 

195 "IS_DEVELOPMENT": IS_DEVELOPMENT, 

196 "vite_import": template_utils.vite_import, 

197 "vite_dev_tools": template_utils.vite_dev_tools, 

198 "format_number": template_utils.format_number, 

199 "format_display_name": template_utils.format_display_name, 

200 "display_name": template_utils.display_name, 

201 "install_snippet": template_utils.install_snippet, 

202 "format_date": template_utils.format_date, 

203 "format_member_role": template_utils.format_member_role, 

204 "image": image_template, 

205 "stores": stores, 

206 "format_link": template_utils.format_link, 

207 "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT, 

208 } 

209 

210 

211def set_handlers(app): 

212 @app.context_processor 

213 def utility_processor(): 

214 """ 

215 This defines the set of properties and functions that will be added 

216 to the default context for processing templates. All these items 

217 can be used in all templates 

218 """ 

219 

220 return snapcraft_utility_processor() 

221 

222 # Error handlers 

223 # === 

224 @app.errorhandler(500) 

225 @app.errorhandler(501) 

226 @app.errorhandler(502) 

227 @app.errorhandler(504) 

228 @app.errorhandler(505) 

229 def internal_error(error): 

230 error_name = getattr(error, "name", type(error).__name__) 

231 return_code = getattr(error, "code", 500) 

232 

233 if not app.testing: 

234 app.extensions["sentry"].captureException() 

235 

236 return ( 

237 flask.render_template("50X.html", error_name=error_name), 

238 return_code, 

239 ) 

240 

241 @app.errorhandler(503) 

242 def service_unavailable(error): 

243 return render_template("503.html"), 503 

244 

245 @app.errorhandler(404) 

246 @app.errorhandler(StoreApiResourceNotFound) 

247 def handle_resource_not_found(error): 

248 return render_template("404.html", message=str(error)), 404 

249 

250 @app.errorhandler(ApiTimeoutError) 

251 @app.errorhandler(StoreApiTimeoutError) 

252 def handle_connection_timeout(error): 

253 status_code = 504 

254 return ( 

255 render_template( 

256 "50X.html", error_message=str(error), status_code=status_code 

257 ), 

258 status_code, 

259 ) 

260 

261 @app.errorhandler(ApiResponseDecodeError) 

262 @app.errorhandler(ApiResponseError) 

263 @app.errorhandler(ApiConnectionError) 

264 @app.errorhandler(StoreApiResponseDecodeError) 

265 @app.errorhandler(StoreApiResponseError) 

266 @app.errorhandler(StoreApiConnectionError) 

267 @app.errorhandler(ApiError) 

268 @app.errorhandler(StoreApiError) 

269 def store_api_error(error): 

270 status_code = 502 

271 return ( 

272 render_template( 

273 "50X.html", error_message=str(error), status_code=status_code 

274 ), 

275 status_code, 

276 ) 

277 

278 @app.errorhandler(ApiResponseErrorList) 

279 @app.errorhandler(StoreApiResponseErrorList) 

280 def handle_api_error_list(error): 

281 if error.status_code == 404: 

282 if "snap_name" in request.path: 

283 return flask.abort(404, "Snap not found!") 

284 else: 

285 return ( 

286 render_template("404.html", message="Entity not found"), 

287 404, 

288 ) 

289 if len(error.errors) == 1 and error.errors[0]["code"] in [ 

290 "macaroon-permission-required", 

291 "macaroon-authorization-required", 

292 ]: 

293 authentication.empty_session(flask.session) 

294 return flask.redirect(f"/login?next={flask.request.path}") 

295 

296 status_code = 502 

297 codes = [ 

298 f"{error['code']}: {error.get('message', 'No message')}" 

299 for error in error.errors 

300 ] 

301 

302 error_msg = ", ".join(codes) 

303 return ( 

304 render_template( 

305 "50X.html", error_message=error_msg, status_code=status_code 

306 ), 

307 status_code, 

308 ) 

309 

310 # Publisher error 

311 @app.errorhandler(PublisherMissingUsername) 

312 def handle_publisher_missing_name(error): 

313 return flask.redirect(flask.url_for("account.get_account_name")) 

314 

315 @app.errorhandler(PublisherAgreementNotSigned) 

316 def handle_publisher_agreement_not_signed(error): 

317 return flask.redirect(flask.url_for("account.get_agreement")) 

318 

319 @app.errorhandler(PublisherMacaroonRefreshRequired) 

320 def handle_publisher_macaroon_refresh_required(error): 

321 return refresh_redirect(flask.request.path) 

322 

323 # Global tasks for all requests 

324 # === 

325 @app.before_request 

326 def clear_trailing(): 

327 """ 

328 Remove trailing slashes from all routes 

329 We like our URLs without slashes 

330 """ 

331 

332 parsed_url = urlparse(unquote(flask.request.url)) 

333 path = parsed_url.path 

334 

335 if path != "/" and path.endswith("/"): 

336 new_uri = urlunparse(parsed_url._replace(path=path[:-1])) 

337 

338 return flask.redirect(new_uri) 

339 

340 @app.before_request 

341 def prometheus_metrics(): 

342 # Accept-encoding counter 

343 # === 

344 agent_string = flask.request.headers.get("User-Agent") 

345 

346 # Exclude probes, which happen behind the cache 

347 if agent_string and not agent_string.startswith( 

348 ("kube-probe", "Prometheus") 

349 ): 

350 agent = user_agents.parse(agent_string or "") 

351 

352 accept_encoding_counter.labels( 

353 accept_encoding=flask.request.headers.get("Accept-Encoding"), 

354 browser_family=agent.browser.family, 

355 ).inc() 

356 

357 # Badge counters 

358 # === 

359 if "/static/images/badges" in flask.request.url: 

360 if flask.session: 

361 badge_logged_in_counter.inc() 

362 else: 

363 badge_counter.inc() 

364 

365 # Calculate the SHA256 hash of the script content and encode it in base64. 

366 def calculate_sha256_base64(script_content): 

367 sha256_hash = hashlib.sha256(script_content.encode()).digest() 

368 return "sha256-" + base64.b64encode(sha256_hash).decode() 

369 

370 def get_csp_directive(content, regex): 

371 directive_items = set() 

372 pattern = re.compile(regex) 

373 matched_contents = pattern.findall(content) 

374 for matched_content in matched_contents: 

375 hash_value = f"'{calculate_sha256_base64(matched_content)}'" 

376 directive_items.add(hash_value) 

377 return list(directive_items) 

378 

379 # Find all script elements in the response and add their hashes to the CSP. 

380 def add_script_hashes_to_csp(response): 

381 response.freeze() 

382 decoded_content = b"".join(response.response).decode( 

383 "utf-8", errors="replace" 

384 ) 

385 

386 CSP["script-src"] = CSP_SCRIPT_SRC + get_csp_directive( 

387 decoded_content, r'onclick\s*=\s*"(.*?)"' 

388 ) 

389 return CSP 

390 

391 @app.after_request 

392 def add_headers(response): 

393 """ 

394 Generic rules for headers to add to all requests 

395 

396 - X-Hostname: Mention the name of the host/pod running the application 

397 - Cache-Control: Add cache-control headers for public and private pages 

398 - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS, 

399 Images) and URLs 

400 - Referrer-Policy: Limit referrer data for security while preserving 

401 full referrer for same-origin requests 

402 - Cross-Origin-Embedder-Policy: allows embedding cross-origin 

403 resources 

404 - Cross-Origin-Opener-Policy: enable the page to open pop-ups while 

405 maintaining same-origin policy 

406 - Cross-Origin-Resource-Policy: allowing cross-origin requests to 

407 access the resource 

408 - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to 

409 resources 

410 """ 

411 

412 response.headers["X-Hostname"] = socket.gethostname() 

413 

414 if response.status_code == 200: 

415 if flask.session: 

416 response.headers["Cache-Control"] = "private" 

417 else: 

418 # Only add caching headers to successful responses 

419 if not response.headers.get("Cache-Control"): 

420 response.headers["Cache-Control"] = ", ".join( 

421 { 

422 "public", 

423 "max-age=61", 

424 "stale-while-revalidate=300", 

425 "stale-if-error=86400", 

426 } 

427 ) 

428 csp = add_script_hashes_to_csp(response) 

429 response.headers["Content-Security-Policy"] = helpers.get_csp_as_str( 

430 csp 

431 ) 

432 response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" 

433 response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none" 

434 response.headers["Cross-Origin-Opener-Policy"] = ( 

435 "same-origin-allow-popups" 

436 ) 

437 response.headers["Cross-Origin-Resource-Policy"] = "cross-origin" 

438 response.headers["X-Permitted-Cross-Domain-Policies"] = "none" 

439 return response