Coverage for webapp/handlers.py: 91%

145 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 22:07 +0000

1import socket 

2from urllib.parse import unquote, urlparse, urlunparse 

3 

4import base64 

5import hashlib 

6import re 

7import sentry_sdk 

8 

9import flask 

10from flask import render_template, request 

11import webapp.template_utils as template_utils 

12from canonicalwebteam import image_template 

13from webapp import authentication 

14import webapp.helpers as helpers 

15from webapp.config import ( 

16 BSI_URL, 

17 LOGIN_URL, 

18 SENTRY_DSN, 

19 COMMIT_ID, 

20 ENVIRONMENT, 

21 WEBAPP_CONFIG, 

22 DNS_VERIFICATION_SALT, 

23 IS_DEVELOPMENT, 

24 VITE_PORT, 

25) 

26 

27from canonicalwebteam.exceptions import ( 

28 StoreApiError, 

29 StoreApiConnectionError, 

30 StoreApiResourceNotFound, 

31 StoreApiResponseDecodeError, 

32 StoreApiResponseError, 

33 StoreApiResponseErrorList, 

34 StoreApiTimeoutError, 

35 PublisherAgreementNotSigned, 

36 PublisherMacaroonRefreshRequired, 

37 PublisherMissingUsername, 

38) 

39 

40from webapp.api.exceptions import ( 

41 ApiError, 

42 ApiConnectionError, 

43 ApiResponseErrorList, 

44 ApiTimeoutError, 

45 ApiResponseDecodeError, 

46 ApiResponseError, 

47) 

48 

49from datetime import datetime 

50 

51 

# Content-Security-Policy configuration: maps each directive name to the list
# of sources allowed for that directive.
CSP = {
    "default-src": ["'self'"],
    "img-src": [
        "data: blob:",
        # The wildcard is required so that images served from
        # https://www.google.*/ads/ga-audiences can load.
        "*",
    ],
    "script-src-elem": [
        "'self'",
        "assets.ubuntu.com",
        "www.googletagmanager.com",
        "www.youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "plausible.io",
        "script.crazyegg.com",
        "w.usabilla.com",
        "connect.facebook.net",
        "snap.licdn.com",
        # Google Tag Manager does not work without inline scripts.
        "'unsafe-inline'",
    ],
    "font-src": [
        "'self'",
        "assets.ubuntu.com",
    ],
    # Placeholder: the effective "script-src" value is computed per response
    # in add_script_hashes_to_csp (see set_handlers).
    "script-src": [],
    "connect-src": [
        "'self'",
        "ubuntu.com",
        "analytics.google.com",
        "stats.g.doubleclick.net",
        "www.googletagmanager.com",
        "sentry.is.canonical.com",
        "www.google-analytics.com",
        "plausible.io",
        "*.crazyegg.com",
        "www.facebook.com",
        "px.ads.linkedin.com",
        "*.snapcraft.io",
    ],
    "frame-src": [
        "'self'",
        "td.doubleclick.net",
        "www.youtube.com/",
        "asciinema.org",
        "player.vimeo.com",
        "snapcraft.io",
        "www.facebook.com",
        "snap:",
    ],
    "style-src": [
        "'self'",
        "'unsafe-inline'",
    ],
    "media-src": [
        "'self'",
        "res.cloudinary.com",
    ],
}

113 

# Base sources for the "script-src" directive. add_script_hashes_to_csp
# (see set_handlers) combines this list with per-response hashes.
CSP_SCRIPT_SRC = [
    "'self'",
    "blob:",
    "'unsafe-eval'",
    "'unsafe-hashes'",
]

120 

# Vite integration: in development, allow the local Vite server (HTTP and
# websocket) in the directives it is used from.
if IS_DEVELOPMENT:
    vite_origin = f"localhost:{VITE_PORT}"
    CSP["script-src-elem"].append(vite_origin)
    CSP["connect-src"].extend([vite_origin, f"ws://localhost:{VITE_PORT}"])
    CSP["style-src"].append(vite_origin)
    CSP_SCRIPT_SRC.append(vite_origin)

128 

129 

def refresh_redirect(path):
    """
    Refresh the macaroon discharge stored in the session, then redirect
    to `path`.

    If the refresh fails with an ApiResponseError whose status code is
    401, redirect to the "login.logout" endpoint instead. Any other
    ApiResponseError, or any ApiError, aborts the request with a 502
    carrying the error text.
    """
    try:
        current_discharge = flask.session["macaroon_discharge"]
        refreshed_discharge = authentication.get_refreshed_discharge(
            current_discharge
        )
    except ApiResponseError as response_error:
        if response_error.status_code != 401:
            return flask.abort(502, str(response_error))
        return flask.redirect(flask.url_for("login.logout"))
    except ApiError as generic_error:
        return flask.abort(502, str(generic_error))
    else:
        # Only store the new discharge once the refresh has succeeded.
        flask.session["macaroon_discharge"] = refreshed_discharge
        return flask.redirect(path)

145 

146 

def snapcraft_utility_processor():
    """
    Build the dictionary of variables and helper functions that
    set_handlers exposes to templates through its context processor.

    User-related values come from the "publisher" entry of the session
    when the session is authenticated, and fall back to anonymous
    defaults otherwise.
    """
    # Anonymous defaults, overridden below for an authenticated session.
    user_name = None
    user_is_canonical = False
    stores = []

    if authentication.is_authenticated(flask.session):
        publisher = flask.session["publisher"]
        user_name = publisher["fullname"]
        user_is_canonical = publisher.get("is_canonical", False)
        # No default here: a publisher without "stores" yields None.
        stores = publisher.get("stores")

    current_path = flask.request.path
    page_slug = template_utils.generate_slug(current_path)

    context = {
        # Variables
        "LOGIN_URL": LOGIN_URL,
        "SENTRY_DSN": SENTRY_DSN,
        "COMMIT_ID": COMMIT_ID,
        "ENVIRONMENT": ENVIRONMENT,
        "host_url": flask.request.host_url,
        "path": current_path,
        "page_slug": page_slug,
        "user_name": user_name,
        "VERIFIED_PUBLISHER": "verified",
        "STAR_DEVELOPER": "starred",
        "webapp_config": WEBAPP_CONFIG,
        "BSI_URL": BSI_URL,
        "now": datetime.now(),
        "user_is_canonical": user_is_canonical,
        # Functions
        "contains": template_utils.contains,
        "join": template_utils.join,
        "static_url": template_utils.static_url,
        "IS_DEVELOPMENT": IS_DEVELOPMENT,
        "vite_import": template_utils.vite_import,
        "vite_dev_tools": template_utils.vite_dev_tools,
        "format_number": template_utils.format_number,
        "format_display_name": template_utils.format_display_name,
        "display_name": template_utils.display_name,
        "install_snippet": template_utils.install_snippet,
        "format_date": template_utils.format_date,
        "format_member_role": template_utils.format_member_role,
        "image": image_template,
        "stores": stores,
        "format_link": template_utils.format_link,
        "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
    }
    return context

195 

196 

def set_handlers(app):
    """
    Register the template context processor, the error handlers and the
    before/after request hooks on `app`.

    `app` is the Flask application; it is only used through its
    `context_processor`, `errorhandler`, `before_request` and
    `after_request` decorators and its `testing` flag.
    """
    # Imported here so that this function is self-contained and does not
    # rely on a change to the module-level import list.
    from urllib.parse import quote

    # Matches the value of double-quoted inline `onclick` attributes.
    # Compiled once at registration time instead of on every response.
    onclick_pattern = re.compile(r'onclick\s*=\s*"(.*?)"')

    @app.context_processor
    def utility_processor():
        """
        This defines the set of properties and functions that will be added
        to the default context for processing templates. All these items
        can be used in all templates
        """

        return snapcraft_utility_processor()

    # Error handlers
    # ===
    @app.errorhandler(500)
    @app.errorhandler(501)
    @app.errorhandler(502)
    @app.errorhandler(504)
    @app.errorhandler(505)
    def internal_error(error):
        """
        Render the generic 50X page for server errors, reporting the
        exception to Sentry unless the app is in testing mode.
        """
        # Fall back to the exception class name / 500 when `error` does
        # not carry `name` / `code` attributes.
        error_name = getattr(error, "name", type(error).__name__)
        return_code = getattr(error, "code", 500)

        if not app.testing:
            sentry_sdk.capture_exception()

        return (
            flask.render_template("50X.html", error_name=error_name),
            return_code,
        )

    @app.errorhandler(503)
    def service_unavailable(error):
        """Render the dedicated 503 page."""
        return render_template("503.html"), 503

    @app.errorhandler(404)
    @app.errorhandler(StoreApiResourceNotFound)
    def handle_resource_not_found(error):
        """Render the 404 page with the error text as its message."""
        return render_template("404.html", message=str(error)), 404

    @app.errorhandler(ApiTimeoutError)
    @app.errorhandler(StoreApiTimeoutError)
    def handle_connection_timeout(error):
        """Render the 50X page with a 504 status for API timeouts."""
        status_code = 504
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseDecodeError)
    @app.errorhandler(ApiResponseError)
    @app.errorhandler(ApiConnectionError)
    @app.errorhandler(StoreApiResponseDecodeError)
    @app.errorhandler(StoreApiResponseError)
    @app.errorhandler(StoreApiConnectionError)
    @app.errorhandler(ApiError)
    @app.errorhandler(StoreApiError)
    def store_api_error(error):
        """Render the 50X page with a 502 status for API errors."""
        status_code = 502
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseErrorList)
    @app.errorhandler(StoreApiResponseErrorList)
    def handle_api_error_list(error):
        """
        Handle API errors that carry a list of error entries.

        - status 404: abort with "Snap not found!" when the request path
          contains "snap_name", otherwise render the 404 page;
        - a single macaroon permission/authorization error: empty the
          session and redirect to the login page with `next` set to the
          current path;
        - anything else: render the 50X page with a 502 status and the
          joined "code: message" entries.
        """
        if error.status_code == 404:
            if "snap_name" in request.path:
                return flask.abort(404, "Snap not found!")
            else:
                return (
                    render_template("404.html", message="Entity not found"),
                    404,
                )
        if len(error.errors) == 1 and error.errors[0]["code"] in [
            "macaroon-permission-required",
            "macaroon-authorization-required",
        ]:
            authentication.empty_session(flask.session)
            # Percent-encode the path so characters such as "&", "#" or
            # "?" cannot alter the query string of the login URL.
            next_path = quote(flask.request.path)
            return flask.redirect(f"/login?next={next_path}")

        status_code = 502
        # `item` rather than `error`, so the handler argument is not
        # shadowed inside the comprehension.
        codes = [
            f"{item['code']}: {item.get('message', 'No message')}"
            for item in error.errors
        ]

        error_msg = ", ".join(codes)
        return (
            render_template(
                "50X.html", error_message=error_msg, status_code=status_code
            ),
            status_code,
        )

    # Publisher error
    @app.errorhandler(PublisherMissingUsername)
    def handle_publisher_missing_name(error):
        """Redirect to the account name page."""
        return flask.redirect(flask.url_for("account.get_account_name"))

    @app.errorhandler(PublisherAgreementNotSigned)
    def handle_publisher_agreement_not_signed(error):
        """Redirect to the publisher agreement page."""
        return flask.redirect(flask.url_for("account.get_agreement"))

    @app.errorhandler(PublisherMacaroonRefreshRequired)
    def handle_publisher_macaroon_refresh_required(error):
        """Refresh the macaroon discharge and retry the current path."""
        return refresh_redirect(flask.request.path)

    # Global tasks for all requests
    # ===
    @app.before_request
    def clear_trailing():
        """
        Remove trailing slashes from all routes
        We like our URLs without slashes
        """

        parsed_url = urlparse(unquote(flask.request.url))
        path = parsed_url.path

        # The root path "/" is left untouched. When no redirect is
        # needed the function returns None and the request proceeds.
        if path != "/" and path.endswith("/"):
            new_uri = urlunparse(parsed_url._replace(path=path[:-1]))

            return flask.redirect(new_uri)

    # Calculate the SHA256 hash of the script content and encode it in base64.
    def calculate_sha256_base64(script_content):
        sha256_hash = hashlib.sha256(script_content.encode()).digest()
        return "sha256-" + base64.b64encode(sha256_hash).decode()

    def get_csp_directive(content, regex):
        """
        Return the quoted CSP hash sources for every match of `regex` in
        `content`.

        `regex` may be a pattern string or an already compiled pattern.
        Duplicate matches are collapsed and the result is sorted so the
        generated header is deterministic.
        """
        pattern = re.compile(regex)
        directive_items = {
            f"'{calculate_sha256_base64(matched_content)}'"
            for matched_content in pattern.findall(content)
        }
        return sorted(directive_items)

    # Find all script elements in the response and add their hashes to the CSP.
    def add_script_hashes_to_csp(response):
        """
        Return the CSP mapping to use for `response`.

        The result is a per-response copy of the module-level CSP whose
        "script-src" entry is CSP_SCRIPT_SRC plus the hashes of the
        inline `onclick` handlers found in the response body. The shared
        CSP dictionary itself is not modified, so concurrent requests
        cannot observe each other's hashes.
        """
        # freeze() buffers the body so it can be read here and still be
        # sent to the client afterwards.
        response.freeze()
        decoded_content = b"".join(response.response).decode(
            "utf-8", errors="replace"
        )

        # Shallow copy: only the "script-src" key is rebound, the other
        # source lists are shared read-only with the module-level CSP.
        csp = dict(CSP)
        csp["script-src"] = CSP_SCRIPT_SRC + get_csp_directive(
            decoded_content, onclick_pattern
        )
        return csp

    @app.after_request
    def add_headers(response):
        """
        Generic rules for headers to add to all requests

        - X-Hostname: Mention the name of the host/pod running the application
        - Cache-Control: Add cache-control headers for public and private pages
        - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
        Images) and URLs
        - Referrer-Policy: Limit referrer data for security while preserving
        full referrer for same-origin requests
        - Cross-Origin-Embedder-Policy: allows embedding cross-origin
        resources
        - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
        maintaining same-origin policy
        - Cross-Origin-Resource-Policy: allowing cross-origin requests to
        access the resource
        - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
        resources
        """

        response.headers["X-Hostname"] = socket.gethostname()

        # Only add caching headers to successful responses
        if response.status_code == 200:
            if flask.session:
                response.headers["Cache-Control"] = "private"
            elif not response.headers.get("Cache-Control"):
                # A tuple (not a set) keeps the directive order stable
                # across processes and runs.
                response.headers["Cache-Control"] = ", ".join(
                    (
                        "public",
                        "max-age=61",
                        "stale-while-revalidate=300",
                        "stale-if-error=86400",
                    )
                )
        csp = add_script_hashes_to_csp(response)
        response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
            csp
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
        response.headers["Cross-Origin-Opener-Policy"] = (
            "same-origin-allow-popups"
        )
        response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
        response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
        return response
398 response.headers["Cross-Origin-Resource-Policy"] = "cross-origin" 

399 response.headers["X-Permitted-Cross-Domain-Policies"] = "none" 

400 return response