Coverage for webapp/handlers.py: 92%

153 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-28 22:05 +0000

1import socket 

2from urllib.parse import unquote, urlparse, urlunparse 

3 

4import base64 

5import hashlib 

6import re 

7 

8import flask 

9from flask import render_template, request 

10import prometheus_client 

11import user_agents 

12import webapp.template_utils as template_utils 

13from canonicalwebteam import image_template 

14from webapp import authentication 

15import webapp.helpers as helpers 

16from webapp.config import ( 

17 BSI_URL, 

18 LOGIN_URL, 

19 SENTRY_DSN, 

20 COMMIT_ID, 

21 ENVIRONMENT, 

22 WEBAPP_CONFIG, 

23 DNS_VERIFICATION_SALT, 

24) 

25 

26from canonicalwebteam.exceptions import ( 

27 StoreApiError, 

28 StoreApiConnectionError, 

29 StoreApiResourceNotFound, 

30 StoreApiResponseDecodeError, 

31 StoreApiResponseError, 

32 StoreApiResponseErrorList, 

33 StoreApiTimeoutError, 

34 PublisherAgreementNotSigned, 

35 PublisherMacaroonRefreshRequired, 

36 PublisherMissingUsername, 

37) 

38 

39from webapp.api.exceptions import ( 

40 ApiError, 

41 ApiConnectionError, 

42 ApiResponseErrorList, 

43 ApiTimeoutError, 

44 ApiResponseDecodeError, 

45 ApiResponseError, 

46) 

47 

48from datetime import datetime 

49 

# Prometheus metrics exposed by this module.

# Incremented for badge image requests that carry no session data.
badge_counter = prometheus_client.Counter(
    name="badge_counter",
    documentation="A counter of badges requests",
)

# Incremented for badge image requests that carry session data.
badge_logged_in_counter = prometheus_client.Counter(
    name="badge_logged_in_counter",
    documentation="A counter of badges requests of logged in users",
)

# Labelled by the raw Accept-Encoding header and the parsed browser family.
accept_encoding_counter = prometheus_client.Counter(
    name="accept_encoding_counter",
    documentation="A counter for Accept-Encoding headers, split by browser",
    labelnames=["accept_encoding", "browser_family"],
)

64 

# Base Content-Security-Policy, keyed by directive name. Each value is the
# list of sources allowed for that directive.
CSP = {
    "default-src": ["'self'"],
    # "*" is needed to allow images from
    # https://www.google.*/ads/ga-audiences to load.
    "img-src": ["data: blob:", "*"],
    "script-src-elem": [
        "'self'",
        "assets.ubuntu.com",
        "www.googletagmanager.com",
        "www.youtube.com",
        "asciinema.org",
        "player.vimeo.com",
        "plausible.io",
        "script.crazyegg.com",
        "w.usabilla.com",
        "connect.facebook.net",
        "snap.licdn.com",
        # This is necessary for Google Tag Manager to function properly.
        "'unsafe-inline'",
    ],
    "font-src": ["'self'", "assets.ubuntu.com"],
    # Placeholder: the effective value is computed for every response from
    # CSP_SCRIPT_SRC plus the hashes of inline onclick handlers.
    "script-src": [],
    "connect-src": [
        "'self'",
        "ubuntu.com",
        "analytics.google.com",
        "stats.g.doubleclick.net",
        "www.googletagmanager.com",
        "sentry.is.canonical.com",
        "www.google-analytics.com",
        "plausible.io",
        "*.crazyegg.com",
        "www.facebook.com",
        "px.ads.linkedin.com",
    ],
    "frame-src": [
        "'self'",
        "td.doubleclick.net",
        "www.youtube.com/",
        "asciinema.org",
        "player.vimeo.com",
        "snapcraft.io",
        "www.facebook.com",
        "snap:",
    ],
    "style-src": ["'self'", "'unsafe-inline'"],
    "media-src": ["'self'", "res.cloudinary.com"],
}

125 

# Static part of the "script-src" directive. Per-response hashes of inline
# onclick handlers are appended to these sources when the header is built.
CSP_SCRIPT_SRC = ["'self'", "blob:", "'unsafe-eval'", "'unsafe-hashes'"]

132 

133 

def refresh_redirect(path):
    """
    Refresh the macaroon discharge stored in the session and redirect
    the user to `path`.

    - If the refresh fails with an ApiResponseError whose status code is
      401, the user is redirected to the "login.logout" endpoint.
    - Any other ApiResponseError, or any ApiError, aborts the request
      with a 502 carrying the error text.
    """
    try:
        refreshed_discharge = authentication.get_refreshed_discharge(
            flask.session["macaroon_discharge"]
        )
    except ApiResponseError as response_error:
        if response_error.status_code != 401:
            return flask.abort(502, str(response_error))
        return flask.redirect(flask.url_for("login.logout"))
    except ApiError as generic_error:
        return flask.abort(502, str(generic_error))

    # Only reached when the refresh succeeded.
    flask.session["macaroon_discharge"] = refreshed_discharge
    return flask.redirect(path)

149 

150 

def snapcraft_utility_processor():
    """
    Build the dictionary of variables and helper functions that is
    injected into the template context.

    User-related values ("user_name", "user_is_canonical", "stores") are
    read from the "publisher" entry of the session when the session is
    authenticated; otherwise they default to None, False and an empty
    list respectively.
    """
    if authentication.is_authenticated(flask.session):
        publisher = flask.session["publisher"]
        user_name = publisher["fullname"]
        user_is_canonical = publisher.get("is_canonical", False)
        # Fall back to an empty list so that "stores" has the same type
        # as in the unauthenticated branch, even when the session holds
        # no "stores" entry (or holds None).
        stores = publisher.get("stores") or []
    else:
        user_name = None
        user_is_canonical = False
        stores = []

    page_slug = template_utils.generate_slug(flask.request.path)

    return {
        # Variables
        "LOGIN_URL": LOGIN_URL,
        "SENTRY_DSN": SENTRY_DSN,
        "COMMIT_ID": COMMIT_ID,
        "ENVIRONMENT": ENVIRONMENT,
        "host_url": flask.request.host_url,
        "path": flask.request.path,
        "page_slug": page_slug,
        "user_name": user_name,
        "VERIFIED_PUBLISHER": "verified",
        "STAR_DEVELOPER": "starred",
        "webapp_config": WEBAPP_CONFIG,
        "BSI_URL": BSI_URL,
        "now": datetime.now(),
        "user_is_canonical": user_is_canonical,
        # Functions
        "contains": template_utils.contains,
        "join": template_utils.join,
        "static_url": template_utils.static_url,
        "format_number": template_utils.format_number,
        "format_display_name": template_utils.format_display_name,
        "display_name": template_utils.display_name,
        "install_snippet": template_utils.install_snippet,
        "format_date": template_utils.format_date,
        "format_member_role": template_utils.format_member_role,
        "image": image_template,
        "stores": stores,
        "format_link": template_utils.format_link,
        "DNS_VERIFICATION_SALT": DNS_VERIFICATION_SALT,
    }

196 

197 

def set_handlers(app):
    """
    Register the template context processor, the error handlers and the
    before/after-request hooks on the given Flask application.

    :param app: The Flask application to attach the handlers to.
    """

    @app.context_processor
    def utility_processor():
        """
        This defines the set of properties and functions that will be added
        to the default context for processing templates. All these items
        can be used in all templates
        """

        return snapcraft_utility_processor()

    # Error handlers
    # ===
    @app.errorhandler(500)
    @app.errorhandler(501)
    @app.errorhandler(502)
    @app.errorhandler(504)
    @app.errorhandler(505)
    def internal_error(error):
        """
        Render the generic 50X page, reporting the exception to Sentry
        unless the application is in testing mode.
        """
        # Fall back to the exception class name and to 500 when the error
        # object does not expose "name" / "code" attributes.
        error_name = getattr(error, "name", type(error).__name__)
        return_code = getattr(error, "code", 500)

        if not app.testing:
            app.extensions["sentry"].captureException()

        return (
            flask.render_template("50X.html", error_name=error_name),
            return_code,
        )

    @app.errorhandler(503)
    def service_unavailable(error):
        """
        Render the dedicated 503 page.
        """
        return render_template("503.html"), 503

    @app.errorhandler(404)
    @app.errorhandler(StoreApiResourceNotFound)
    def handle_resource_not_found(error):
        """
        Render the 404 page with the error text as message.
        """
        return render_template("404.html", message=str(error)), 404

    @app.errorhandler(ApiTimeoutError)
    @app.errorhandler(StoreApiTimeoutError)
    def handle_connection_timeout(error):
        """
        Render the 50X page with a 504 status for API timeouts.
        """
        status_code = 504
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseDecodeError)
    @app.errorhandler(ApiResponseError)
    @app.errorhandler(ApiConnectionError)
    @app.errorhandler(StoreApiResponseDecodeError)
    @app.errorhandler(StoreApiResponseError)
    @app.errorhandler(StoreApiConnectionError)
    @app.errorhandler(ApiError)
    @app.errorhandler(StoreApiError)
    def store_api_error(error):
        """
        Render the 50X page with a 502 status for the remaining API errors.
        """
        status_code = 502
        return (
            render_template(
                "50X.html", error_message=str(error), status_code=status_code
            ),
            status_code,
        )

    @app.errorhandler(ApiResponseErrorList)
    @app.errorhandler(StoreApiResponseErrorList)
    def handle_api_error_list(error):
        """
        Handle API responses that carry a list of errors.

        - 404 responses become a 404 (aborting with "Snap not found!" when
          the request path contains "snap_name").
        - A single macaroon permission/authorization error empties the
          session and redirects to the login page.
        - Anything else renders the 50X page with a 502 status, listing
          every error code and message.
        """
        if error.status_code == 404:
            if "snap_name" in request.path:
                return flask.abort(404, "Snap not found!")
            else:
                return (
                    render_template("404.html", message="Entity not found"),
                    404,
                )
        if len(error.errors) == 1 and error.errors[0]["code"] in [
            "macaroon-permission-required",
            "macaroon-authorization-required",
        ]:
            authentication.empty_session(flask.session)
            return flask.redirect(f"/login?next={flask.request.path}")

        status_code = 502
        # The loop variable is deliberately not called "error" so that it
        # does not shadow the handler's parameter.
        codes = [
            f"{item['code']}: {item.get('message', 'No message')}"
            for item in error.errors
        ]

        error_msg = ", ".join(codes)
        return (
            render_template(
                "50X.html", error_message=error_msg, status_code=status_code
            ),
            status_code,
        )

    # Publisher error
    @app.errorhandler(PublisherMissingUsername)
    def handle_publisher_missing_name(error):
        """
        Redirect to the "account.get_account_name" endpoint.
        """
        return flask.redirect(flask.url_for("account.get_account_name"))

    @app.errorhandler(PublisherAgreementNotSigned)
    def handle_publisher_agreement_not_signed(error):
        """
        Redirect to the "account.get_agreement" endpoint.
        """
        return flask.redirect(flask.url_for("account.get_agreement"))

    @app.errorhandler(PublisherMacaroonRefreshRequired)
    def handle_publisher_macaroon_refresh_required(error):
        """
        Refresh the macaroon discharge and retry the current path.
        """
        return refresh_redirect(flask.request.path)

    # Global tasks for all requests
    # ===
    @app.before_request
    def clear_trailing():
        """
        Remove trailing slashes from all routes
        We like our URLs without slashes
        """

        parsed_url = urlparse(unquote(flask.request.url))
        path = parsed_url.path

        if path != "/" and path.endswith("/"):
            new_uri = urlunparse(parsed_url._replace(path=path[:-1]))

            return flask.redirect(new_uri)

    @app.before_request
    def prometheus_metrics():
        """
        Update the Prometheus counters for the current request.
        """
        # Accept-encoding counter
        # ===
        agent_string = flask.request.headers.get("User-Agent")

        # Exclude probes, which happen behind the cache
        if agent_string and not agent_string.startswith(
            ("kube-probe", "Prometheus")
        ):
            agent = user_agents.parse(agent_string)

            accept_encoding_counter.labels(
                accept_encoding=flask.request.headers.get("Accept-Encoding"),
                browser_family=agent.browser.family,
            ).inc()

        # Badge counters
        # ===
        if "/static/images/badges" in flask.request.url:
            if flask.session:
                badge_logged_in_counter.inc()
            else:
                badge_counter.inc()

    # Calculate the SHA256 hash of the script content and encode it in base64.
    def calculate_sha256_base64(script_content):
        """
        Return "sha256-<base64 digest>" for the given script text.
        """
        sha256_hash = hashlib.sha256(script_content.encode()).digest()
        return "sha256-" + base64.b64encode(sha256_hash).decode()

    def get_csp_directive(content, regex):
        """
        Return the quoted CSP hash sources for every distinct match of
        `regex` in `content`.

        The result is sorted so that the generated header is identical
        for identical content.
        """
        directive_items = {
            f"'{calculate_sha256_base64(matched_content)}'"
            for matched_content in re.compile(regex).findall(content)
        }
        return sorted(directive_items)

    # Find all script elements in the response and add their hashes to the CSP.
    def add_script_hashes_to_csp(response):
        """
        Return the CSP for `response`: the base policy with "script-src"
        set to CSP_SCRIPT_SRC plus the hashes of the inline onclick
        handlers found in the response body.

        A new dict is returned for every response. The module-level CSP
        is left untouched so that concurrent requests cannot see each
        other's hashes.
        """
        response.freeze()
        decoded_content = b"".join(response.response).decode(
            "utf-8", errors="replace"
        )

        # "script-src" already exists in CSP, so overriding it here keeps
        # its position in the directive order.
        return {
            **CSP,
            "script-src": CSP_SCRIPT_SRC
            + get_csp_directive(decoded_content, r'onclick\s*=\s*"(.*?)"'),
        }

    @app.after_request
    def add_headers(response):
        """
        Generic rules for headers to add to all requests

        - X-Hostname: Mention the name of the host/pod running the application
        - Cache-Control: Add cache-control headers for public and private pages
        - Content-Security-Policy: Restrict resources (e.g., JavaScript, CSS,
        Images) and URLs
        - Referrer-Policy: Limit referrer data for security while preserving
        full referrer for same-origin requests
        - Cross-Origin-Embedder-Policy: allows embedding cross-origin
        resources
        - Cross-Origin-Opener-Policy: enable the page to open pop-ups while
        maintaining same-origin policy
        - Cross-Origin-Resource-Policy: allowing cross-origin requests to
        access the resource
        - X-Permitted-Cross-Domain-Policies: disallows cross-domain access to
        resources
        """

        response.headers["X-Hostname"] = socket.gethostname()

        if response.status_code == 200:
            if flask.session:
                response.headers["Cache-Control"] = "private"
            else:
                # Only add caching headers to successful responses
                if not response.headers.get("Cache-Control"):
                    # A tuple (not a set) keeps the directive order stable
                    # across processes and restarts.
                    response.headers["Cache-Control"] = ", ".join(
                        (
                            "public",
                            "max-age=61",
                            "stale-while-revalidate=300",
                            "stale-if-error=86400",
                        )
                    )
        csp = add_script_hashes_to_csp(response)
        response.headers["Content-Security-Policy"] = helpers.get_csp_as_str(
            csp
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "unsafe-none"
        response.headers["Cross-Origin-Opener-Policy"] = (
            "same-origin-allow-popups"
        )
        response.headers["Cross-Origin-Resource-Policy"] = "cross-origin"
        response.headers["X-Permitted-Cross-Domain-Policies"] = "none"
        return response