Coverage for webapp/store/snap_details_views.py: 81%

239 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-29 22:26 +0000

1import flask 

2from flask import Response 

3import requests 

4 

5import logging 

6import humanize 

7import os 

8 

9import webapp.helpers as helpers 

10import webapp.metrics.helper as metrics_helper 

11import webapp.metrics.metrics as metrics 

12import webapp.store.logic as logic 

13from webapp import authentication 

14from webapp.markdown import parse_markdown_description 

15from cache.cache_utility import redis_cache 

16 

17from canonicalwebteam.flask_base.decorators import ( 

18 exclude_xframe_options_header, 

19) 

20from canonicalwebteam.exceptions import StoreApiError 

21from canonicalwebteam.store_api.devicegw import DeviceGW 

22from pybadges import badge 

23 

# Store gateway client constructed for "snap"; it reuses the shared API
# session from webapp.helpers. The views below use it for snap details,
# publisher snaps, public metrics and featured items.
device_gateway = DeviceGW("snap", helpers.api_session)
# Second gateway client constructed for "sbom"; only used below to build
# SBOM download endpoint URLs.
device_gateway_sbom = DeviceGW("sbom", helpers.api_session)

logger = logging.getLogger(__name__)


# Fields passed as `fields=` to `device_gateway.get_item_details` when the
# shared snap details context is built.
FIELDS = [
    "title",
    "summary",
    "description",
    "license",
    "contact",
    "website",
    "publisher",
    "media",
    "download",
    "version",
    "created-at",
    "confinement",
    "categories",
    "trending",
    "unlisted",
    "links",
    "revision",
]

# Fields passed as `fields=` to `device_gateway.get_snap_details` by the
# main details view, which reads the snap's command aliases from the result.
FIELDS_EXTRA_DETAILS = [
    "aliases",
]

53 

54 

def snap_details_views(store):
    """
    Register the snap details routes on `store`.

    `store` is the object whose `route` decorator is applied to the view
    functions defined in the rest of this function body.
    """
    # Snap names in URLs: lowercase letters, digits and hyphens, with at
    # least one letter.
    snap_regex = "[a-z0-9-]*[a-z][a-z0-9-]*"
    # Same shape but also accepting uppercase letters; used by the route
    # that redirects to the lowercase details URL.
    snap_regex_upercase = "[A-Za-z0-9-]*[A-Za-z][A-Za-z0-9-]*"

58 

def _get_context_snap_details(snap_name, supported_architectures=None):
    """
    Build the template context shared by the snap details views.

    Looks the snap up through the device gateway, aborts with a 404 when
    the snap is published by "snap-quarantine" or has no channel map, and
    returns a dict of values for the templates.

    `supported_architectures` is forwarded unchanged to
    `logic.get_latest_versions`.
    """
    details = device_gateway.get_item_details(
        snap_name, fields=FIELDS, api_version=2
    )
    snap_data = details["snap"]
    publisher_username = snap_data["publisher"]["username"]

    # 404 for any snap under quarantine
    if publisher_username == "snap-quarantine":
        flask.abort(404, "No snap named {}".format(snap_name))

    # The API keeps answering with data for a snap whose channel maps have
    # all been removed (for example: mir-kiosk-browser). Treat that as a
    # missing snap instead of failing further down.
    channel_map = details.get("channel-map")
    if not channel_map:
        flask.abort(404, "No snap named {}".format(snap_name))

    formatted_description = parse_markdown_description(
        snap_data.get("description", "")
    )

    channel_maps_list = logic.convert_channel_maps(channel_map)
    latest_channel = logic.get_last_updated_version(channel_map)
    revisions = logic.get_revisions(channel_map)

    default_track = details.get("default-track") or "latest"

    lowest_risk_available = logic.get_lowest_available_risk(
        channel_maps_list, default_track
    )
    extracted_info = logic.extract_info_channel_map(
        channel_maps_list, default_track, lowest_risk_available
    )

    last_updated = latest_channel["channel"]["released-at"]
    updates = logic.get_latest_versions(
        channel_map,
        default_track,
        lowest_risk_available,
        supported_architectures,
    )

    # updates[0] is the stable channel, updates[1] is the most recent
    # non-stable one; keep the later of the two release dates.
    stable_update, other_update = updates[0], updates[1]
    if stable_update and other_update:
        stable_date = stable_update.get("released-at")
        other_date = other_update.get("released-at")
        if stable_date and other_date:
            most_recent_update = max(stable_date, other_date)
        else:
            most_recent_update = stable_date or other_date
    elif stable_update or other_update:
        most_recent_update = (stable_update or other_update).get(
            "released-at"
        )
    else:
        most_recent_update = None

    binary_filesize = latest_channel["download"]["size"]

    media = snap_data.get("media", [])

    # filter out banner and banner-icon images from screenshots
    screenshots = logic.filter_screenshots(media)
    icon_url = helpers.get_icon(media)

    publisher_pages_dir = flask.current_app.config["CONTENT_DIRECTORY"][
        "PUBLISHER_PAGES"
    ]
    publisher_info = helpers.get_yaml(
        "{}{}.yaml".format(publisher_pages_dir, publisher_username),
        typ="safe",
    )

    publisher_snaps = []
    publisher_featured_snaps = None

    if publisher_info:
        publisher_results = logic.get_publisher_snaps(
            device_gateway, publisher_username
        )

        snaps_by_name = {
            result["name"]: {
                "package_name": result["name"],
                "title": result["snap"].get("title"),
                "summary": result["snap"].get("summary"),
                "icon_url": helpers.get_icon(
                    result["snap"].get("media", [])
                ),
            }
            for result in publisher_results
        }

        publisher_featured_snaps = logic.hydrate_featured_snaps(
            publisher_info.get("featured_snaps"), snaps_by_name
        )

        # The "More from publisher" list excludes featured snaps and
        # the snap currently being viewed.
        excluded_names = {snap_name} | {
            featured["package_name"]
            for featured in publisher_featured_snaps
        }
        available_snaps = [
            entry
            for name, entry in snaps_by_name.items()
            if name not in excluded_names
        ]

        publisher_snaps = logic.get_n_random_snaps(available_snaps, 4)

    video = logic.get_video(media)

    is_users_snap = bool(
        authentication.is_authenticated(flask.session)
        and flask.session.get("publisher").get("nickname")
        == publisher_username
    )

    # build list of categories of a snap
    categories = logic.get_snap_categories(snap_data.get("categories", []))

    developer = logic.get_snap_developer(details["name"])

    is_last_updated_old = logic.is_snap_old(last_updated)

    turnstile_enabled = flask.current_app.config.get("TURNSTILE_SECRET_KEY")

    return {
        "snap_id": details.get("snap-id"),
        # Data direct from details API
        "snap_title": snap_data["title"],
        "package_name": details["name"],
        "categories": categories,
        "icon_url": icon_url,
        "version": extracted_info["version"],
        "license": snap_data["license"],
        "publisher": snap_data["publisher"]["display-name"],
        "username": snap_data["publisher"]["username"],
        "screenshots": screenshots,
        "video": video,
        "publisher_snaps": publisher_snaps,
        "publisher_featured_snaps": publisher_featured_snaps,
        "has_publisher_page": publisher_info is not None,
        "contact": snap_data.get("contact"),
        "website": snap_data.get("website"),
        "summary": snap_data["summary"],
        "description": formatted_description,
        "channel_map": channel_maps_list,
        "has_stable": logic.has_stable(channel_maps_list),
        "developer_validation": snap_data["publisher"]["validation"],
        "default_track": default_track,
        "lowest_risk_available": lowest_risk_available,
        "confinement": extracted_info["confinement"],
        "trending": snap_data.get("trending", False),
        # Transformed API data
        "filesize": humanize.naturalsize(binary_filesize),
        "last_updated": logic.convert_date(last_updated),
        "last_updated_raw": last_updated,
        "is_snap_old": logic.is_snap_old(most_recent_update),
        "is_last_updated_old": is_last_updated_old,
        "is_users_snap": is_users_snap,
        "unlisted": snap_data.get("unlisted", False),
        "developer": developer,
        # TODO: This is horrible and hacky
        "appliances": {
            "adguard-home": "adguard",
            "mosquitto": "mosquitto",
            "nextcloud": "nextcloud",
            "plexmediaserver": "plex",
            "openhab": "openhab",
        },
        "links": snap_data.get("links"),
        "updates": updates,
        "revisions": revisions,
        # Only expose the site key when a secret key is configured too.
        "turnstile_site_key": (
            flask.current_app.config.get("TURNSTILE_SITE_KEY", "")
            if turnstile_enabled
            else ""
        ),
    }

255 

def verify_turnstile(turnstile_response):
    """
    Check a Turnstile token against the configured verification URL.

    Returns True when no TURNSTILE_SECRET_KEY is configured (verification
    disabled) or when the verification reply reports success. Returns
    False when the token is empty, the HTTP call fails, the reply is not
    OK or not JSON, or the reply does not report success.
    """
    secret_key = flask.current_app.config.get("TURNSTILE_SECRET_KEY", "")
    if not secret_key:
        return True

    if not turnstile_response:
        logger.warning("Turnstile token missing from report form")
        return False

    try:
        reply = requests.post(
            flask.current_app.config["TURNSTILE_VERIFY_URL"],
            data={"secret": secret_key, "response": turnstile_response},
            timeout=10,
        )
        if not reply.ok:
            logger.warning(
                "Turnstile verification returned %s", reply.status_code
            )
            return False
        outcome = reply.json()
    except (requests.RequestException, ValueError):
        # ValueError covers a reply body that is not valid JSON.
        logger.exception("Turnstile verification failed")
        return False

    if outcome.get("success"):
        return True

    logger.warning(
        "Turnstile verification denied report: %s",
        outcome.get("error-codes", []),
    )
    return False

297 

def snap_has_sboms(revisions, snap_id):
    """
    Report whether an SBOM is available for the first listed revision.

    Sends a HEAD request to the SBOM download endpoint built from
    `snap_id` and `revisions[0]`. Returns False when `revisions` is empty
    or None, when the request fails, or when the status is neither 200
    nor 302.
    """
    if not revisions:
        return False

    sbom_path = f"download/sbom_snap_{snap_id}_{revisions[0]}.spdx2.3.json"
    endpoint = device_gateway_sbom.get_endpoint_url(sbom_path)

    try:
        # Bound the wait: this check runs while rendering the details
        # page, and requests never times out on its own.
        res = requests.head(endpoint, timeout=10)
    except requests.RequestException:
        # The SBOM link is optional; a failing backend must not take the
        # whole page down with it.
        logger.exception("SBOM availability check failed")
        return False

    # backend returns 302 instead of 200 for a successful request
    # adding the check for 200 in case this is changed without us knowing
    return res.status_code in (200, 302)

313 

@store.route("/download/sbom_snap_<snap_id>_<revision>.spdx2.3.json")
def get_sbom(snap_id, revision):
    """
    Proxy the SBOM document for a snap revision as JSON.

    Fetches the document from the SBOM gateway endpoint and returns its
    JSON body. Aborts with 404 when the upstream answers 404, and with
    502 when the request fails, the upstream answers with any other
    error status, or the body is not valid JSON.
    """
    sbom_path = f"download/sbom_snap_{snap_id}_{revision}.spdx2.3.json"
    endpoint = device_gateway_sbom.get_endpoint_url(sbom_path)

    try:
        # requests has no default timeout; bound the wait explicitly.
        res = requests.get(endpoint, timeout=10)
    except requests.RequestException:
        logger.exception("SBOM download request failed")
        flask.abort(502)

    if not res.ok:
        # Do not relay an upstream error body with a 200 status.
        logger.warning("SBOM download returned %s", res.status_code)
        flask.abort(404 if res.status_code == 404 else 502)

    try:
        sbom = res.json()
    except ValueError:
        logger.exception("SBOM download returned invalid JSON")
        flask.abort(502)

    return flask.jsonify(sbom)

322 

@store.route('/<regex("' + snap_regex + '"):snap_name>')
def snap_details(snap_name):
    """
    Render the snap details page for a single snap.

    Combines the shared details context with the snap's aliases, the
    public country and operating-system metrics, and SBOM availability,
    then renders store/snap-details.html with status 200.
    """
    context = _get_context_snap_details(snap_name)

    try:
        # the empty string channel makes the store API not filter by
        # the default channel 'latest/stable', which gives errors for
        # snaps that don't use that channel
        extra_details = device_gateway.get_snap_details(
            snap_name, channel="", fields=FIELDS_EXTRA_DETAILS
        )
    except Exception:
        # Aliases are optional: log and carry on without them.
        logger.exception("Details endpoint returned an error")
        extra_details = None

    if extra_details and extra_details["aliases"]:
        package_name = extra_details["package_name"]
        context["aliases"] = [
            [f"{package_name}.{alias['target']}", alias["name"]]
            for alias in extra_details["aliases"]
        ]

    country_metric_name = "weekly_installed_base_by_country_percent"
    os_metric_name = "weekly_installed_base_by_operating_system_normalized"

    # Both filters cover a single day: the last processed metrics date.
    end = metrics_helper.get_last_metrics_processed_date()
    snap_id = context["snap_id"]

    metrics_query_json = [
        metrics_helper.get_filter(
            metric_name=metric_name,
            snap_id=snap_id,
            start=end,
            end=end,
        )
        for metric_name in (country_metric_name, os_metric_name)
    ]

    metrics_response = device_gateway.get_public_metrics(metrics_query_json)

    os_metrics = country_devices = None
    if metrics_response:
        oses = metrics_helper.find_metric(metrics_response, os_metric_name)
        os_metrics = metrics.OsMetric(
            name=oses["metric_name"],
            series=oses["series"],
            buckets=oses["buckets"],
            status=oses["status"],
        )

        territories = metrics_helper.find_metric(
            metrics_response, country_metric_name
        )
        country_devices = metrics.CountryDevices(
            name=territories["metric_name"],
            series=territories["series"],
            buckets=territories["buckets"],
            status=territories["status"],
            private=False,
        )

    has_sboms = snap_has_sboms(context["revisions"], context["snap_id"])

    user_agent = flask.request.headers.get("User-Agent", "")

    context.update(
        {
            "countries": (
                country_devices.country_data if country_devices else None
            ),
            "normalized_os": os_metrics.os if os_metrics else None,
            # Context info
            "is_linux": "Linux" in user_agent
            and "Android" not in user_agent,
            "error_info": {},
            "has_sboms": has_sboms,
        }
    )

    return flask.render_template("store/snap-details.html", **context), 200

427 

@store.route('/<regex("' + snap_regex + '"):snap_name>/embedded')
@exclude_xframe_options_header
def snap_details_embedded(snap_name):
    """
    Render the embeddable card for a single snap.

    Reads the `button`, `channels`, `summary` and `screenshot` query
    arguments and renders store/snap-embedded-card.html with status 200.
    """
    context = _get_context_snap_details(snap_name)

    # An unrecognised button variant falls back to "black"; a missing one
    # stays unset.
    button = flask.request.args.get("button")
    if button and button not in ("black", "white"):
        button = "black"

    architectures = list(context["channel_map"].keys())
    if "amd64" in architectures:
        default_architecture = "amd64"
    else:
        default_architecture = architectures[0]

    context.update(
        default_architecture=default_architecture,
        button=button,
        show_channels=flask.request.args.get("channels"),
        show_summary=flask.request.args.get("summary"),
        show_screenshot=flask.request.args.get("screenshot"),
    )

    return (
        flask.render_template("store/snap-embedded-card.html", **context),
        200,
    )

465 

@store.route('/<regex("' + snap_regex_upercase + '"):snap_name>')
def snap_details_case_sensitive(snap_name):
    """
    Redirect a snap URL containing uppercase letters to its lowercase form.
    """
    lowercase_url = flask.url_for(".snap_details", snap_name=snap_name.lower())
    return flask.redirect(lowercase_url)

471 

def get_badge_svg(snap_name, left_text, right_text, color="#0e8420"):
    """
    Build a badge SVG whose two halves link to the snap's page.

    The left text is blanked when the `name` query argument parses to 0;
    it defaults to 1. `color` is used for the right half of the badge.
    """
    show_name = flask.request.args.get("name", default=1, type=int)
    snap_link = flask.request.url_root + snap_name

    # Inline SVG logo passed to the badge as a data URI.
    logo = (
        "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' "
        "viewBox='0 0 32 32'%3E%3Cdefs%3E%3Cstyle%3E.cls-1%7Bfill:%23f"
        "ff%7D%3C/style%3E%3C/defs%3E%3Cpath class='cls-1' d='M18.03 1"
        "8.03l5.95-5.95-5.95-2.65v8.6zM6.66 29.4l10.51-10.51-3.21-3.18"
        "-7.3 13.69zM2.5 3.6l15.02 14.94V9.03L2.5 3.6zM27.03 9.03h-8.6"
        "5l11.12 4.95-2.47-4.95z'/%3E%3C/svg%3E"
    )

    if not show_name:
        left_text = ""

    return badge(
        left_text=left_text,
        right_text=right_text,
        right_color=color,
        left_link=snap_link,
        right_link=snap_link,
        logo=logo,
    )

492 

@store.route('/<regex("' + snap_regex + '"):snap_name>/badge.svg')
def snap_details_badge(snap_name):
    """
    Serve an SVG badge showing the snap title, channel and version.
    """
    context = _get_context_snap_details(snap_name)

    # channel with safest risk available in default track
    snap_channel = (
        context["default_track"] + "/" + context["lowest_risk_available"]
    )
    badge_label = snap_channel + " " + context["version"]

    svg = get_badge_svg(
        snap_name=snap_name,
        left_text=context["snap_title"],
        right_text=badge_label,
    )

    return svg, 200, {"Content-Type": "image/svg+xml"}

509 

@store.route("/<lang>/<theme>/install.svg")
def snap_install_badge(lang, theme):
    """
    Serve the static "install" badge SVG for a language and theme.

    Responds 400 when `lang` is not one of the badge folders. The "light"
    theme maps to the white badge; any other theme maps to the black one.
    Responds 404 with an empty SVG when the file is missing or its path
    falls outside the badges folder.
    """
    base_path = "static/images/badges/"

    if lang not in helpers.list_folders(base_path):
        return Response("Invalid language", status=400)

    if theme == "light":
        file_name = "snap-store-white.svg"
    else:
        file_name = "snap-store-black.svg"

    svg_path = os.path.normpath(os.path.join(base_path, lang, file_name))

    # Only serve files that resolve inside the base path and exist.
    if svg_path.startswith(base_path) and os.path.exists(svg_path):
        with open(svg_path, "r") as svg_file:
            svg_content = svg_file.read()
        return Response(svg_content, mimetype="image/svg+xml")

    return Response(
        '<svg height="20" width="1" '
        'xmlns="http://www.w3.org/2000/svg" '
        'xmlns:xlink="http://www.w3.org/1999/xlink"></svg>',
        mimetype="image/svg+xml",
        status=404,
    )

539 

@store.route('/<regex("' + snap_regex + '"):snap_name>/trending.svg')
def snap_details_badge_trending(snap_name):
    """
    Serve the "Trending this week" badge, or an empty SVG.

    The badge is drawn when the snap is trending, or when the `preview`
    query argument is non-zero and the session is authenticated.
    """
    is_preview = flask.request.args.get("preview", default=0, type=int)
    context = _get_context_snap_details(snap_name)

    # publishers can see preview of trending badge of their own snaps
    # on Publicise page
    show_as_preview = bool(
        is_preview and authentication.is_authenticated(flask.session)
    )

    if context["trending"] or show_as_preview:
        svg = get_badge_svg(
            snap_name=snap_name,
            left_text=context["snap_title"],
            right_text="Trending this week",
            color="#FA7041",
        )
    else:
        # default to empty SVG
        svg = (
            '<svg height="20" width="1" xmlns="http://www.w3.org/2000/svg" '
            'xmlns:xlink="http://www.w3.org/1999/xlink"></svg>'
        )

    return svg, 200, {"Content-Type": "image/svg+xml"}

566 

@store.route('/install/<regex("' + snap_regex + '"):snap_name>/<distro>')
def snap_distro_install(snap_name, distro):
    """
    Render the "install <snap> on <distro>" page.

    Aborts with 404 when no YAML data exists for `distro`, and renders the
    404 template when the snap has no build for any architecture the
    distro supports. The page also lists up to 12 featured snaps, never
    including the snap being viewed.
    """
    filename = f"store/content/distros/{distro}.yaml"
    distro_data = helpers.get_yaml(filename)

    if not distro_data:
        flask.abort(404)

    supported_archs = distro_data["supported-archs"]
    context = _get_context_snap_details(snap_name, supported_archs)

    if all(arch not in context["channel_map"] for arch in supported_archs):
        return flask.render_template("404.html"), 404

    context.update(
        {
            "distro": distro,
            "distro_name": distro_data["name"],
            "distro_logo": distro_data["logo"],
            "distro_logo_mono": distro_data["logo-mono"],
            "distro_color_1": distro_data["color-1"],
            "distro_color_2": distro_data["color-2"],
            "distro_install_steps": distro_data["install"],
        }
    )

    # The cache key is shared by every install page, so it must hold the
    # unfiltered pool. Filtering before caching would leak one snap's
    # exclusion into every other snap's page.
    cache_key = "featured_snaps_install_pages"
    featured_pool = redis_cache.get(cache_key, expected_type=list)

    if not featured_pool:
        try:
            # One more than displayed, so 12 remain after excluding the
            # current snap.
            featured_pool = device_gateway.get_featured_items(
                size=13, page=1
            ).get("results", [])
        except StoreApiError:
            featured_pool = []

        for snap in featured_pool:
            snap["icon_url"] = helpers.get_icon(snap["media"])

        # An empty list is read back as a cache miss, so storing it after
        # an API error would gain nothing.
        if featured_pool:
            redis_cache.set(cache_key, featured_pool, ttl=3600)

    context["featured_snaps"] = [
        snap for snap in featured_pool if snap["package_name"] != snap_name
    ][:12]

    return flask.render_template("store/snap-distro-install.html", **context)

622 

@store.route("/report", methods=["POST"])
def report_snap():
    """
    Forward a "report this snap" form submission to the report webhook.

    Responds 503 when REPORT_SHEET_URL is not configured, 400 when the
    Turnstile check fails, 502 when the webhook request fails or answers
    with an error status, and 200 with {"ok": true} otherwise.
    Submissions that include the honeypot field get the 200 response
    without being forwarded.
    """
    form_url = flask.current_app.config.get("REPORT_SHEET_URL")
    if not form_url:
        logger.warning("REPORT_SHEET_URL is not configured")
        return flask.jsonify({"error": "report_url_missing"}), 503

    fields = flask.request.form

    # If the honeypot is activated (hidden field populated),
    # silently reject to avoid spam
    if "confirm" in fields:
        return flask.jsonify({"ok": True}), 200

    if not verify_turnstile(fields.get("cf-turnstile-response", "")):
        return flask.jsonify({"error": "turnstile_failed"}), 400

    payload = {
        "snap_name": fields.get("snap_name", ""),
        "reason": fields.get("reason", ""),
        "comment": fields.get("comment", ""),
        "email": fields.get("email", ""),
    }

    try:
        # requests never times out by default; use the same bound as the
        # Turnstile call so a stalled webhook cannot hold the worker.
        response = requests.post(form_url, data=payload, timeout=10)
        if not response.ok:
            logger.warning(
                "Report sheet webhook returned %s",
                response.status_code,
            )
            return flask.jsonify({"error": "report_failed"}), 502
    except requests.RequestException:
        logger.exception("Report sheet webhook request failed")
        return flask.jsonify({"error": "report_failed"}), 502

    return flask.jsonify({"ok": True}), 200