Coverage for webapp/handlers.py: 91%

145 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-14 22:07 +0000

1import socket 

2from urllib.parse import unquote, urlparse, urlunparse 

3 

4import base64 

5import hashlib 

6import re 

7import sentry_sdk 

8 

9import flask 

10from flask import render_template, request 

11import webapp.template_utils as template_utils 

12from canonicalwebteam import image_template 

13from webapp import authentication 

14import webapp.helpers as helpers 

15from webapp.config import ( 

16 BSI_URL, 

17 LOGIN_URL, 

18 SENTRY_DSN, 

19 COMMIT_ID, 

20 ENVIRONMENT, 

21 WEBAPP_CONFIG, 

22 DNS_VERIFICATION_SALT, 

23 IS_DEVELOPMENT, 

24 VITE_PORT, 

25) 

26 

27from canonicalwebteam.exceptions import ( 

28 StoreApiError, 

29 StoreApiConnectionError, 

30 StoreApiResourceNotFound, 

31 StoreApiResponseDecodeError, 

32 StoreApiResponseError, 

33 StoreApiResponseErrorList, 

34 StoreApiTimeoutError, 

35 PublisherAgreementNotSigned, 

36 PublisherMacaroonRefreshRequired, 

37 PublisherMissingUsername, 

38) 

39 

40from webapp.api.exceptions import ( 

41 ApiError, 

42 ApiConnectionError, 

43 ApiResponseErrorList, 

44 ApiTimeoutError, 

45 ApiResponseDecodeError, 

46 ApiResponseError, 

47) 

48 

49from datetime import datetime 

50 

51 

# Base Content-Security-Policy, keyed by directive name. Each value is the
# list of sources allowed for that directive.
CSP = {
    "default-src": ["'self'"],
    # The "*" wildcard is needed to allow images from
    # https://www.google.*/ads/ga-audiences to load.
    "img-src": ["data: blob:", "*"],
    "script-src-elem": [
        "'self'",
        "assets.ubuntu.com",
        "www.googletagmanager.com",
        "www.youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "plausible.io",
        "script.crazyegg.com",
        "w.usabilla.com",
        "connect.facebook.net",
        "snap.licdn.com",
        # This is necessary for Google Tag Manager to function properly.
        "'unsafe-inline'",
    ],
    "font-src": ["'self'", "assets.ubuntu.com"],
    # Left empty here; the value is filled in when a response is processed.
    "script-src": [],
    "connect-src": [
        "'self'",
        "ubuntu.com",
        "analytics.google.com",
        "stats.g.doubleclick.net",
        "www.googletagmanager.com",
        "sentry.is.canonical.com",
        "www.google-analytics.com",
        "plausible.io",
        "*.crazyegg.com",
        "www.facebook.com",
        "px.ads.linkedin.com",
    ],
    "frame-src": [
        "'self'",
        "td.doubleclick.net",
        "www.youtube.com/",
        "asciinema.org",
        "player.vimeo.com",
        "snapcraft.io",
        "www.facebook.com",
        "snap:",
    ],
    "style-src": ["'self'", "'unsafe-inline'"],
    "media-src": ["'self'", "res.cloudinary.com"],
}

112 

# Fixed sources for the "script-src" directive; per-response hashes are
# appended to a copy of this list when the header is built.
CSP_SCRIPT_SRC = ["'self'", "blob:", "'unsafe-eval'", "'unsafe-hashes'"]

119 

# Vite integration: in development, also allow the local Vite server
# (scripts, styles, HTTP and websocket connections).
if IS_DEVELOPMENT:
    CSP["script-src-elem"].append(f"localhost:{VITE_PORT}")
    CSP["connect-src"].extend(
        [f"localhost:{VITE_PORT}", f"ws://localhost:{VITE_PORT}"]
    )
    CSP["style-src"].append(f"localhost:{VITE_PORT}")
    CSP_SCRIPT_SRC.append(f"localhost:{VITE_PORT}")

127 

128 

def refresh_redirect(path):
    """
    Refresh the macaroon discharge kept in the session, then redirect.

    The discharge stored under ``flask.session["macaroon_discharge"]`` is
    passed to ``authentication.get_refreshed_discharge``. On success the
    session is updated with the new value and a redirect to ``path`` is
    returned.

    An ``ApiResponseError`` with status code 401 redirects to the
    ``login.logout`` endpoint; any other ``ApiResponseError`` or
    ``ApiError`` aborts the request with a 502 carrying the error text.
    """
    try:
        refreshed_discharge = authentication.get_refreshed_discharge(
            flask.session["macaroon_discharge"]
        )
    except ApiResponseError as response_error:
        if response_error.status_code != 401:
            return flask.abort(502, str(response_error))
        return flask.redirect(flask.url_for("login.logout"))
    except ApiError as generic_error:
        return flask.abort(502, str(generic_error))
    else:
        flask.session["macaroon_discharge"] = refreshed_discharge
        return flask.redirect(path)

144 

145 

def snapcraft_utility_processor():
    """
    Build the dictionary of values and helper functions handed to templates.

    Publisher details (full name, ``is_canonical`` flag and stores) are read
    from ``flask.session["publisher"]`` when the session is authenticated;
    otherwise they fall back to ``None``, ``False`` and an empty list.
    """
    # Defaults for an unauthenticated session.
    user_name = None
    user_is_canonical = False
    stores = []

    if authentication.is_authenticated(flask.session):
        publisher = flask.session["publisher"]
        user_name = publisher["fullname"]
        user_is_canonical = publisher.get("is_canonical", False)
        stores = publisher.get("stores")

    page_slug = template_utils.generate_slug(flask.request.path)

    # Variables
    template_variables = {
        "LOGIN_URL": LOGIN_URL,
        "SENTRY_DSN": SENTRY_DSN,
        "COMMIT_ID": COMMIT_ID,
        "ENVIRONMENT": ENVIRONMENT,
        "host_url": flask.request.host_url,
        "path": flask.request.path,
        "page_slug": page_slug,
        "user_name": user_name,
        "VERIFIED_PUBLISHER": "verified",
        "STAR_DEVELOPER": "starred",
        "webapp_config": WEBAPP_CONFIG,
        "BSI_URL": BSI_URL,
        "now": datetime.now(),
        "user_is_canonical": user_is_canonical,
    }

    # Functions (plus a few remaining values, kept in their original order)
    template_functions = {
        "contains": template_utils.contains,
        "join": template_utils.join,
        "static_url": template_utils.static_url,
        "IS_DEVELOPMENT": IS_DEVELOPMENT,
        "vite_import": template_utils.vite_import,
        "vite_dev_tools": template_utils.vite_dev_tools,
        "format_number": template_utils.format_number,
        "format_display_name": template_utils.format_display_name,
        "display_name": template_utils.display_name,
        "install_snippet": template_utils.install_snippet,
        "format_date": template_utils.format_date,
        "format_member_role": template_utils.format_member_role,
        "image": image_template,
        "stores": stores,
        "format_link": template_utils.format_link,
        "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
    }

    return {**template_variables, **template_functions}

194 

195 

def set_handlers(app):
    """
    Register the template context processor, the error handlers and the
    before/after request hooks on ``app``.

    :param app: the application object; it must provide the
        ``context_processor``, ``errorhandler``, ``before_request`` and
        ``after_request`` decorators and a ``testing`` attribute.
    """

    @app.context_processor
    def utility_processor():
        """
        This defines the set of properties and functions that will be added
        to the default context for processing templates. All these items
        can be used in all templates
        """

        return snapcraft_utility_processor()

    # Error handlers
    # ===
    @app.errorhandler(500)
    @app.errorhandler(501)
    @app.errorhandler(502)
    @app.errorhandler(504)
    @app.errorhandler(505)
    def internal_error(error):
        """
        Render the generic 50X page, using the error's own ``name`` and
        ``code`` when it has them (falling back to the class name and 500).
        The exception is reported to Sentry unless the app is in testing
        mode.
        """
        error_name = getattr(error, "name", type(error).__name__)
        return_code = getattr(error, "code", 500)

        if not app.testing:
            sentry_sdk.capture_exception()

        return (
            flask.render_template("50X.html", error_name=error_name),
            return_code,
        )

    @app.errorhandler(503)
    def service_unavailable(error):
        """Render the dedicated 503 page."""
        return render_template("503.html"), 503

    @app.errorhandler(404)
    @app.errorhandler(StoreApiResourceNotFound)
    def handle_resource_not_found(error):
        """Render the 404 page with the error text as its message."""
        return render_template("404.html", message=str(error)), 404

    @app.errorhandler(ApiTimeoutError)
    @app.errorhandler(StoreApiTimeoutError)
    def handle_connection_timeout(error):
        """Render the 50X page with a 504 status for API timeouts."""
        status_code = 504
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseDecodeError)
    @app.errorhandler(ApiResponseError)
    @app.errorhandler(ApiConnectionError)
    @app.errorhandler(StoreApiResponseDecodeError)
    @app.errorhandler(StoreApiResponseError)
    @app.errorhandler(StoreApiConnectionError)
    @app.errorhandler(ApiError)
    @app.errorhandler(StoreApiError)
    def store_api_error(error):
        """Render the 50X page with a 502 status for other API errors."""
        status_code = 502
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseErrorList)
    @app.errorhandler(StoreApiResponseErrorList)
    def handle_api_error_list(error):
        """
        Handle API responses that carry a list of errors.

        - status 404: abort with / render a 404
        - a single macaroon permission/authorization error: empty the
          session and redirect to the login page
        - anything else: render the 50X page (502) listing every
          ``code: message`` pair
        """
        if error.status_code == 404:
            # NOTE(review): this tests for the literal substring "snap_name"
            # in the URL path - confirm that is what is intended.
            if "snap_name" in request.path:
                return flask.abort(404, "Snap not found!")
            else:
                return (
                    render_template("404.html", message="Entity not found"),
                    404,
                )
        if len(error.errors) == 1 and error.errors[0]["code"] in [
            "macaroon-permission-required",
            "macaroon-authorization-required",
        ]:
            authentication.empty_session(flask.session)
            return flask.redirect(f"/login?next={flask.request.path}")

        status_code = 502
        # Distinct loop variable so the handler's ``error`` is not shadowed.
        codes = [
            f"{api_error['code']}: {api_error.get('message', 'No message')}"
            for api_error in error.errors
        ]

        error_msg = ", ".join(codes)
        return (
            render_template(
                "50X.html", error_message=error_msg, status_code=status_code
            ),
            status_code,
        )

    # Publisher error
    @app.errorhandler(PublisherMissingUsername)
    def handle_publisher_missing_name(error):
        """Redirect to the ``account.get_account_name`` endpoint."""
        return flask.redirect(flask.url_for("account.get_account_name"))

    @app.errorhandler(PublisherAgreementNotSigned)
    def handle_publisher_agreement_not_signed(error):
        """Redirect to the ``account.get_agreement`` endpoint."""
        return flask.redirect(flask.url_for("account.get_agreement"))

    @app.errorhandler(PublisherMacaroonRefreshRequired)
    def handle_publisher_macaroon_refresh_required(error):
        """Refresh the macaroon discharge and retry the current path."""
        return refresh_redirect(flask.request.path)

    # Global tasks for all requests
    # ===
    @app.before_request
    def clear_trailing():
        """
        Remove trailing slashes from all routes
        We like our URLs without slashes
        """

        parsed_url = urlparse(unquote(flask.request.url))
        path = parsed_url.path

        if path != "/" and path.endswith("/"):
            new_uri = urlunparse(parsed_url._replace(path=path[:-1]))

            return flask.redirect(new_uri)

    # Calculate the SHA256 hash of the script content and encode it in base64.
    def calculate_sha256_base64(script_content):
        sha256_hash = hashlib.sha256(script_content.encode()).digest()
        return "sha256-" + base64.b64encode(sha256_hash).decode()

    def get_csp_directive(content, regex):
        """
        Return the quoted CSP hash source for every distinct match of
        ``regex`` in ``content``, in order of first appearance.
        """
        hash_sources = (
            f"'{calculate_sha256_base64(matched_content)}'"
            for matched_content in re.findall(regex, content)
        )
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # resulting header is the same in every process (iterating a set of
        # strings is not).
        return list(dict.fromkeys(hash_sources))

    # Find all script elements in the response and add their hashes to the CSP.
    def add_script_hashes_to_csp(response):
        response.freeze()
        decoded_content = b"".join(response.response).decode(
            "utf-8", errors="replace"
        )

        onclick_hashes = get_csp_directive(
            decoded_content, r'onclick\s*=\s*"(.*?)"'
        )
        # Return a per-response policy rather than writing into the
        # module-level CSP dict, which is shared by every request. The
        # "script-src" key already exists in CSP, so its position is kept.
        return {**CSP, "script-src": CSP_SCRIPT_SRC + onclick_hashes}

    @app.after_request
    def add_headers(response):
        """
        Generic rules for headers to add to all requests

        - X-Hostname: Mention the name of the host/pod running the application
        - Cache-Control: Add cache-control headers for public and private pages
        - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
        Images) and URLs
        - Referrer-Policy: Limit referrer data for security while preserving
        full referrer for same-origin requests
        - Cross-Origin-Embedder-Policy: allows embedding cross-origin
        resources
        - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
        maintaining same-origin policy
        - Cross-Origin-Resource-Policy: allowing cross-origin requests to
        access the resource
        - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
        resources
        """

        response.headers["X-Hostname"] = socket.gethostname()

        if response.status_code == 200:
            if flask.session:
                response.headers["Cache-Control"] = "private"
            else:
                # Only add caching headers to successful responses
                if not response.headers.get("Cache-Control"):
                    # A tuple (not a set) keeps the directive order stable.
                    response.headers["Cache-Control"] = ", ".join(
                        (
                            "public",
                            "max-age=61",
                            "stale-while-revalidate=300",
                            "stale-if-error=86400",
                        )
                    )
        csp = add_script_hashes_to_csp(response)
        response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
            csp
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
        response.headers["Cross-Origin-Opener-Policy"] = (
            "same-origin-allow-popups"
        )
        response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
        response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
        return response