Coverage for webapp/handlers.py: 92%

145 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 22:43 +0000

1import socket 

2from urllib.parse import unquote, urlparse, urlunparse 

3 

4import base64 

5import hashlib 

6import re 

7import sentry_sdk 

8 

9import flask 

10from flask import render_template, request 

11import webapp.template_utils as template_utils 

12from canonicalwebteam import image_template 

13from webapp import authentication 

14import webapp.helpers as helpers 

15from webapp.config import ( 

16 BSI_URL, 

17 LOGIN_URL, 

18 SENTRY_DSN, 

19 COMMIT_ID, 

20 ENVIRONMENT, 

21 WEBAPP_CONFIG, 

22 DNS_VERIFICATION_SALT, 

23 IS_DEVELOPMENT, 

24 VITE_PORT, 

25 ANALYTICS_ENDPOINT, 

26 DEFAULT_ICON_URL, 

27) 

28 

29from canonicalwebteam.exceptions import ( 

30 StoreApiError, 

31 StoreApiConnectionError, 

32 StoreApiResourceNotFound, 

33 StoreApiResponseDecodeError, 

34 StoreApiResponseError, 

35 StoreApiResponseErrorList, 

36 StoreApiTimeoutError, 

37 PublisherAgreementNotSigned, 

38 PublisherMacaroonRefreshRequired, 

39 PublisherMissingUsername, 

40) 

41 

42from webapp.api.exceptions import ( 

43 ApiError, 

44 ApiConnectionError, 

45 ApiResponseErrorList, 

46 ApiTimeoutError, 

47 ApiResponseDecodeError, 

48 ApiResponseError, 

49) 

50 

51from datetime import datetime 

52 

# Base Content-Security-Policy, keyed by directive name. Each value is the
# list of sources allowed for that directive. "script-src" starts empty and
# is filled in per response by the after-request hook in set_handlers().
CSP = {
    "default-src": ["'self'"],
    "img-src": [
        "data: blob:",
        # Wildcard required so that images served from
        # https://www.google.*/ads/ga-audiences are allowed to load.
        "*",
    ],
    "script-src-elem": [
        "'self'",
        "data:",
        "assets.ubuntu.com",
        "www.googletagmanager.com",
        "www.youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "plausible.io",
        "script.crazyegg.com",
        "w.usabilla.com",
        "connect.facebook.net",
        "snap.licdn.com",
        "challenges.cloudflare.com",
        # Google Tag Manager does not function properly without this.
        "'unsafe-inline'",
    ],
    "font-src": [
        "'self'",
        "assets.ubuntu.com",
    ],
    "script-src": [],
    "connect-src": [
        "'self'",
        "ubuntu.com",
        "analytics.google.com",
        "*.analytics.google.com",
        "stats.g.doubleclick.net",
        "www.googletagmanager.com",
        "sentry.is.canonical.com",
        "www.google-analytics.com",
        "plausible.io",
        "*.crazyegg.com",
        "www.facebook.com",
        "px.ads.linkedin.com",
        "*.snapcraft.io",
        "*.snapcraftcontent.com",
        "marketplace-analytics.staging.canonical.com",
        "marketplace-analytics.canonical.com",
        "challenges.cloudflare.com",
        "www.google.com",
    ],
    "frame-src": [
        "'self'",
        "td.doubleclick.net",
        "www.youtube.com",
        "youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "snapcraft.io",
        "www.facebook.com",
        "challenges.cloudflare.com",
        "snap:",
    ],
    "style-src": [
        "'self'",
        "'unsafe-inline'",
    ],
    "media-src": [
        "'self'",
        "res.cloudinary.com",
    ],
}

124 

# Static part of the "script-src" directive. The after-request hook in
# set_handlers() combines this list with hashes computed from each response.
CSP_SCRIPT_SRC = [
    "'self'",
    "data:",
    "blob:",
    "'unsafe-eval'",
    "'unsafe-hashes'",
]

132 

# Vite integration: in development, additionally allow the local server on
# VITE_PORT as a source for scripts, styles and connections (plus its ws://
# origin, presumably used by Vite's dev websocket).
if IS_DEVELOPMENT:
    CSP["script-src-elem"].extend([f"localhost:{VITE_PORT}"])
    CSP["connect-src"].extend(
        [
            f"localhost:{VITE_PORT}",
            f"ws://localhost:{VITE_PORT}",
        ]
    )
    CSP["style-src"].extend([f"localhost:{VITE_PORT}"])
    CSP_SCRIPT_SRC.extend([f"localhost:{VITE_PORT}"])

140 

141 

def refresh_redirect():
    """Refresh the session's macaroon discharge and replay the request.

    The discharge stored under ``flask.session["macaroon_discharge"]`` is
    passed to ``authentication.get_refreshed_discharge``. On success the
    refreshed value is written back to the session and the client is
    redirected to the current endpoint, rebuilt from the current view
    arguments and query-string arguments.

    Failure handling:
    - ``ApiResponseError`` with status 401: redirect to ``login.logout``.
    - any other ``ApiResponseError`` or ``ApiError``: abort with 502 and
      the error text.
    """
    try:
        refreshed_discharge = authentication.get_refreshed_discharge(
            flask.session["macaroon_discharge"]
        )
    except ApiResponseError as response_error:
        if response_error.status_code != 401:
            return flask.abort(502, str(response_error))
        # A 401 means the discharge can no longer be refreshed: log out.
        return flask.redirect(flask.url_for("login.logout"))
    except ApiError as generic_error:
        return flask.abort(502, str(generic_error))

    flask.session["macaroon_discharge"] = refreshed_discharge

    # Send the client back to the same endpoint with the same arguments.
    same_page = flask.url_for(
        flask.request.endpoint,
        **flask.request.view_args,
        **flask.request.args,
    )
    return flask.redirect(same_page)

163 

164 

def snapcraft_utility_processor():
    """Build the dictionary of values and helpers exposed to templates.

    User-related entries (``user_name``, ``user_is_canonical``, ``stores``)
    are read from ``flask.session["publisher"]`` when the session is
    authenticated; otherwise they are ``None``, ``False`` and ``[]``.

    Returns:
        dict mapping template variable names to configuration values,
        request details and ``template_utils`` helper functions.
    """
    # Anonymous defaults, overridden below for an authenticated session.
    user_name = None
    user_is_canonical = False
    stores = []

    if authentication.is_authenticated(flask.session):
        user_name = flask.session["publisher"]["fullname"]
        user_is_canonical = flask.session["publisher"].get(
            "is_canonical", False
        )
        # No default here: this is None when the publisher has no "stores"
        # key, unlike the [] used for anonymous sessions.
        stores = flask.session["publisher"].get("stores")

    page_slug = template_utils.generate_slug(flask.request.path)

    # Variables
    context = {
        "LOGIN_URL": LOGIN_URL,
        "SENTRY_DSN": SENTRY_DSN,
        "COMMIT_ID": COMMIT_ID,
        "ENVIRONMENT": ENVIRONMENT,
        "host_url": flask.request.host_url,
        "path": flask.request.path,
        "page_slug": page_slug,
        "user_name": user_name,
        "VERIFIED_PUBLISHER": "verified",
        "STAR_DEVELOPER": "starred",
        "webapp_config": WEBAPP_CONFIG,
        "BSI_URL": BSI_URL,
        "now": datetime.now(),
        "user_is_canonical": user_is_canonical,
    }

    # Functions (plus a few late-added values, kept in their original order)
    context.update(
        {
            "contains": template_utils.contains,
            "join": template_utils.join,
            "static_url": template_utils.static_url,
            "IS_DEVELOPMENT": IS_DEVELOPMENT,
            "format_number": template_utils.format_number,
            "format_display_name": template_utils.format_display_name,
            "display_name": template_utils.display_name,
            "install_snippet": template_utils.install_snippet,
            "format_date": template_utils.format_date,
            "format_member_role": template_utils.format_member_role,
            "image": image_template,
            "stores": stores,
            "format_link": template_utils.format_link,
            "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
            "ANALYTICS_ENDPOINT": ANALYTICS_ENDPOINT,
            "DEFAULT_ICON_URL": DEFAULT_ICON_URL,
        }
    )
    return context

213 

214 

def set_handlers(app):
    """Register the template context, error handlers and request hooks.

    Everything is attached to *app* through its decorators
    (``context_processor``, ``errorhandler``, ``before_request`` and
    ``after_request``); nothing is returned.

    Args:
        app: the Flask application to configure.
    """

    @app.context_processor
    def utility_processor():
        """
        This defines the set of properties and functions that will be added
        to the default context for processing templates. All these items
        can be used in all templates
        """

        return snapcraft_utility_processor()

    # Error handlers
    # ===
    @app.errorhandler(500)
    @app.errorhandler(501)
    @app.errorhandler(502)
    @app.errorhandler(504)
    @app.errorhandler(505)
    def internal_error(error):
        """Render the generic 50X page, keeping the error's own status code.

        Falls back to the exception class name and to 500 when the error
        has no ``name`` / ``code`` attribute.
        """
        error_name = getattr(error, "name", type(error).__name__)
        return_code = getattr(error, "code", 500)

        # Do not report to Sentry while running under test.
        if not app.testing:
            sentry_sdk.capture_exception()

        return (
            flask.render_template("50X.html", error_name=error_name),
            return_code,
        )

    @app.errorhandler(503)
    def service_unavailable(error):
        """Render the dedicated 503 page."""
        return render_template("503.html"), 503

    @app.errorhandler(404)
    @app.errorhandler(StoreApiResourceNotFound)
    def handle_resource_not_found(error):
        """Render the 404 page with the error text as its message."""
        return render_template("404.html", message=str(error)), 404

    @app.errorhandler(ApiTimeoutError)
    @app.errorhandler(StoreApiTimeoutError)
    def handle_connection_timeout(error):
        """Render the 50X page with status 504 for API timeouts."""
        status_code = 504
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseDecodeError)
    @app.errorhandler(ApiResponseError)
    @app.errorhandler(ApiConnectionError)
    @app.errorhandler(StoreApiResponseDecodeError)
    @app.errorhandler(StoreApiResponseError)
    @app.errorhandler(StoreApiConnectionError)
    @app.errorhandler(ApiError)
    @app.errorhandler(StoreApiError)
    def store_api_error(error):
        """Render the 50X page with status 502 for other API failures."""
        status_code = 502
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseErrorList)
    @app.errorhandler(StoreApiResponseErrorList)
    def handle_api_error_list(error):
        """Handle API errors that carry a list of error dicts.

        - status 404: a "not found" response;
        - exactly one macaroon permission/authorization error: the auth
          session is reset and the user is redirected to log in again;
        - anything else: the 50X page with status 502, listing every
          error as "code: message".
        """
        if error.status_code == 404:
            # NOTE(review): this tests the literal text "snap_name" in the
            # URL path, not the presence of a snap_name view argument;
            # confirm that is the intent.
            if "snap_name" in request.path:
                return flask.abort(404, "Snap not found!")
            else:
                return (
                    render_template("404.html", message="Entity not found"),
                    404,
                )
        if len(error.errors) == 1 and error.errors[0]["code"] in [
            "macaroon-permission-required",
            "macaroon-authorization-required",
        ]:
            authentication.reset_auth_session(flask.session)
            return flask.redirect(
                flask.url_for("login.login_handler", next=flask.request.path)
            )

        status_code = 502
        # Distinct loop variable so the handler's `error` is not shadowed.
        codes = [
            f"{item['code']}: {item.get('message', 'No message')}"
            for item in error.errors
        ]

        error_msg = ", ".join(codes)
        return (
            render_template(
                "50X.html", error_message=error_msg, status_code=status_code
            ),
            status_code,
        )

    # Publisher error
    @app.errorhandler(PublisherMissingUsername)
    def handle_publisher_missing_name(error):
        """Redirect to the page where the account name is set."""
        return flask.redirect(flask.url_for("account.get_account_name"))

    @app.errorhandler(PublisherAgreementNotSigned)
    def handle_publisher_agreement_not_signed(error):
        """Redirect to the publisher agreement page."""
        return flask.redirect(flask.url_for("account.get_agreement"))

    @app.errorhandler(PublisherMacaroonRefreshRequired)
    def handle_publisher_macaroon_refresh_required(error):
        """Refresh the macaroon discharge and replay the request."""
        return refresh_redirect()

    # Global tasks for all requests
    # ===
    @app.before_request
    def clear_trailing():
        """
        Remove trailing slashes from all routes
        We like our URLs without slashes
        """

        # NOTE(review): the whole URL is unquoted before being rebuilt, so
        # percent-encoded characters are decoded in the redirect target;
        # confirm this is intended.
        parsed_url = urlparse(unquote(flask.request.url))
        path = parsed_url.path

        if path != "/" and path.endswith("/"):
            new_uri = urlunparse(parsed_url._replace(path=path[:-1]))

            return flask.redirect(new_uri)

    # Calculate the SHA256 hash of the script content and encode it in base64.
    def calculate_sha256_base64(script_content):
        sha256_hash = hashlib.sha256(script_content.encode()).digest()
        return "sha256-" + base64.b64encode(sha256_hash).decode()

    def get_csp_directive(content, regex):
        """Return quoted sha256 sources for every *regex* match in *content*.

        Duplicates are collapsed and the result is sorted so that the same
        content always produces the same header value.
        """
        matched_contents = re.compile(regex).findall(content)
        return sorted(
            {
                f"'{calculate_sha256_base64(matched_content)}'"
                for matched_content in matched_contents
            }
        )

    # Find all inline onclick handlers in the response and add their hashes
    # to the CSP.
    def add_script_hashes_to_csp(response):
        """Return the CSP for *response*, including its inline-handler hashes.

        A new dict is returned for every response. The module-level CSP is
        shared by all requests and must not be mutated here, otherwise
        hashes computed for one response could end up in the header of
        another response handled concurrently.
        """
        response.freeze()
        decoded_content = b"".join(response.response).decode(
            "utf-8", errors="replace"
        )

        inline_handler_hashes = get_csp_directive(
            decoded_content, r'onclick\s*=\s*"(.*?)"'
        )
        # Overriding an existing key keeps its position, so the directive
        # order of the base policy is preserved.
        return {**CSP, "script-src": CSP_SCRIPT_SRC + inline_handler_hashes}

    @app.after_request
    def add_headers(response):
        """
        Generic rules for headers to add to all requests

        - X-Hostname: Mention the name of the host/pod running the application
        - Cache-Control: Add cache-control headers for public and private pages
        - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
        Images) and URLs
        - Referrer-Policy: Limit referrer data for security while preserving
        full referrer for same-origin requests
        - Cross-Origin-Embedder-Policy: allows embedding cross-origin
        resources
        - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
        maintaining same-origin policy
        - Cross-Origin-Resource-Policy: allowing cross-origin requests to
        access the resource
        - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
        resources
        """

        response.headers["X-Hostname"] = socket.gethostname()

        if response.status_code == 200:
            if flask.session:
                response.headers["Cache-Control"] = "private"
            else:
                # Only add caching headers to successful responses
                if not response.headers.get("Cache-Control"):
                    # A tuple (not a set) keeps the directive order stable
                    # across processes.
                    response.headers["Cache-Control"] = ", ".join(
                        (
                            "public",
                            "max-age=61",
                            "stale-while-revalidate=300",
                            "stale-if-error=86400",
                        )
                    )
        csp = add_script_hashes_to_csp(response)
        response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
            csp
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
        response.headers["Cross-Origin-Opener-Policy"] = (
            "same-origin-allow-popups"
        )
        response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
        response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
        return response