Coverage for webapp/store/snap_details_views.py: 80%

230 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 22:43 +0000

1import flask 

2from flask import Response 

3import requests 

4 

5import logging 

6import humanize 

7import os 

8 

9import webapp.helpers as helpers 

10import webapp.metrics.helper as metrics_helper 

11import webapp.metrics.metrics as metrics 

12import webapp.store.logic as logic 

13from webapp import authentication 

14from webapp.markdown import parse_markdown_description 

15from cache.cache_utility import redis_cache 

16 

17from canonicalwebteam.flask_base.decorators import ( 

18 exclude_xframe_options_header, 

19) 

20from canonicalwebteam.exceptions import StoreApiError 

21from canonicalwebteam.store_api.devicegw import DeviceGW 

22from pybadges import badge 

23 

# Store client used by the views below for snap details, extra details,
# public metrics and featured items.
device_gateway = DeviceGW("snap", helpers.api_session)
# Store client used only to build SBOM download endpoint URLs.
device_gateway_sbom = DeviceGW("sbom", helpers.api_session)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

28 

29 

# Fields requested when fetching the main snap details
# (passed as ``fields=`` to ``device_gateway.get_item_details``).
FIELDS = [
    # Listing text
    "title",
    "summary",
    "description",
    "license",
    # Publisher and contact information
    "contact",
    "website",
    "publisher",
    # Media and release data
    "media",
    "download",
    "version",
    "created-at",
    "confinement",
    # Store classification
    "categories",
    "trending",
    "unlisted",
    "links",
    "revision",
]

# Fields requested by the separate, best-effort lookup made in
# ``snap_details`` (passed to ``device_gateway.get_snap_details``).
FIELDS_EXTRA_DETAILS = ["aliases"]

53 

54 

55def snap_details_views(store): 

# URL pattern for a snap name: lower-case letters, digits and hyphens,
# with at least one letter.
snap_regex = "[a-z0-9-]*[a-z][a-z0-9-]*"
# Same pattern but also accepting upper-case letters; used by the route
# that redirects to the lower-cased details URL.
snap_regex_upercase = "[A-Za-z0-9-]*[A-Za-z][A-Za-z0-9-]*"

58 

def _get_context_snap_details(snap_name, supported_architectures=None):
    """Build the template context shared by the snap views.

    Fetches the snap through ``device_gateway.get_item_details`` and
    derives the values used by the details, embedded, badge and
    distro-install views.

    Args:
        snap_name: Name of the snap to look up.
        supported_architectures: Optional value forwarded unchanged to
            ``logic.get_latest_versions``.

    Returns:
        dict: The context to pass to the snap templates.

    Aborts with a 404 (``flask.abort``) when the snap is published by
    the ``snap-quarantine`` account or has no channel map.
    """
    details = device_gateway.get_item_details(
        snap_name, fields=FIELDS, api_version=2
    )
    snap = details["snap"]
    publisher = snap["publisher"]
    publisher_username = publisher["username"]

    # 404 for any snap under quarantine
    if publisher_username == "snap-quarantine":
        flask.abort(404, "No snap named {}".format(snap_name))

    # When all the channel maps of an existing snap are removed, the API
    # still responds as if the snap exists, with data.
    # Return a 404 if there is no channel map, to avoid an error later.
    # For example: mir-kiosk-browser
    channel_map = details.get("channel-map")
    if not channel_map:
        flask.abort(404, "No snap named {}".format(snap_name))

    formatted_description = parse_markdown_description(
        snap.get("description", "")
    )

    channel_maps_list = logic.convert_channel_maps(channel_map)
    latest_channel = logic.get_last_updated_version(channel_map)
    revisions = logic.get_revisions(channel_map)

    default_track = details.get("default-track") or "latest"

    lowest_risk_available = logic.get_lowest_available_risk(
        channel_maps_list, default_track
    )
    extracted_info = logic.extract_info_channel_map(
        channel_maps_list, default_track, lowest_risk_available
    )

    last_updated = latest_channel["channel"]["released-at"]
    updates = logic.get_latest_versions(
        channel_map,
        default_track,
        lowest_risk_available,
        supported_architectures,
    )
    most_recent_update = _most_recent_release_date(updates)

    binary_filesize = latest_channel["download"]["size"]

    media = snap.get("media", [])

    # filter out banner and banner-icon images from screenshots
    screenshots = logic.filter_screenshots(media)
    icon_url = helpers.get_icon(media)

    publisher_pages = flask.current_app.config["CONTENT_DIRECTORY"][
        "PUBLISHER_PAGES"
    ]
    publisher_info = helpers.get_yaml(
        "{}{}.yaml".format(publisher_pages, publisher_username),
        typ="safe",
    )
    publisher_snaps = helpers.get_yaml(
        "{}{}-snaps.yaml".format(publisher_pages, publisher_username),
        typ="safe",
    )

    publisher_featured_snaps = None
    if publisher_info:
        publisher_featured_snaps = publisher_info.get("featured_snaps")
        # The "-snaps" file may be missing or lack a "snaps" entry even
        # when the publisher page exists; fall back to an empty list
        # instead of failing on a subscript of None.
        available_snaps = (publisher_snaps or {}).get("snaps") or []
        publisher_snaps = logic.get_n_random_snaps(available_snaps, 4)

    video = logic.get_video(media)

    is_users_snap = bool(
        authentication.is_authenticated(flask.session)
        and flask.session.get("publisher").get("nickname")
        == publisher_username
    )

    # build list of categories of a snap
    categories = logic.get_snap_categories(snap.get("categories", []))

    developer = logic.get_snap_developer(details["name"])

    is_last_updated_old = logic.is_snap_old(last_updated)

    # Only expose the Turnstile site key when a secret key is configured
    app_config = flask.current_app.config
    if app_config.get("TURNSTILE_SECRET_KEY"):
        turnstile_site_key = app_config.get("TURNSTILE_SITE_KEY", "")
    else:
        turnstile_site_key = ""

    context = {
        "snap_id": details.get("snap-id"),
        # Data direct from details API
        "snap_title": snap["title"],
        "package_name": details["name"],
        "categories": categories,
        "icon_url": icon_url,
        "version": extracted_info["version"],
        "license": snap["license"],
        "publisher": publisher["display-name"],
        "username": publisher_username,
        "screenshots": screenshots,
        "video": video,
        "publisher_snaps": publisher_snaps,
        "publisher_featured_snaps": publisher_featured_snaps,
        "has_publisher_page": publisher_info is not None,
        "contact": snap.get("contact"),
        "website": snap.get("website"),
        "summary": snap["summary"],
        "description": formatted_description,
        "channel_map": channel_maps_list,
        "has_stable": logic.has_stable(channel_maps_list),
        "developer_validation": publisher["validation"],
        "default_track": default_track,
        "lowest_risk_available": lowest_risk_available,
        "confinement": extracted_info["confinement"],
        "trending": snap.get("trending", False),
        # Transformed API data
        "filesize": humanize.naturalsize(binary_filesize),
        "last_updated": logic.convert_date(last_updated),
        "last_updated_raw": last_updated,
        "is_snap_old": logic.is_snap_old(most_recent_update),
        "is_last_updated_old": is_last_updated_old,
        "is_users_snap": is_users_snap,
        "unlisted": snap.get("unlisted", False),
        "developer": developer,
        # TODO: This is horrible and hacky
        "appliances": {
            "adguard-home": "adguard",
            "mosquitto": "mosquitto",
            "nextcloud": "nextcloud",
            "plexmediaserver": "plex",
            "openhab": "openhab",
        },
        "links": snap.get("links"),
        "updates": updates,
        "revisions": revisions,
        "turnstile_site_key": turnstile_site_key,
    }
    return context


def _most_recent_release_date(updates):
    """Return the newest "released-at" value among the first two updates.

    ``updates[0]`` is the stable channel and ``updates[1]`` the most
    recent non-stable one. Returns None when neither entry is set.
    """
    stable, non_stable = updates[0], updates[1]

    if stable and non_stable:
        stable_date = stable.get("released-at")
        non_stable_date = non_stable.get("released-at")
        if stable_date and non_stable_date:
            # presumably ISO-formatted timestamps, so max() picks the
            # later one -- TODO confirm
            return max(stable_date, non_stable_date)
        return stable_date or non_stable_date

    if stable:
        return stable.get("released-at")
    if non_stable:
        return non_stable.get("released-at")
    return None

237 

def verify_turnstile(turnstile_response):
    """Validate a Turnstile token against the configured verify URL.

    Args:
        turnstile_response: Token submitted with the report form.

    Returns:
        bool: True when no ``TURNSTILE_SECRET_KEY`` is configured or
        when the verification reply reports success. False when the
        token is missing, the request fails, the reply is not OK or not
        JSON, or verification is denied.
    """
    config = flask.current_app.config
    secret_key = config.get("TURNSTILE_SECRET_KEY", "")

    # Without a secret key there is nothing to verify against
    if not secret_key:
        return True

    if not turnstile_response:
        logger.warning("Turnstile token missing from report form")
        return False

    try:
        reply = requests.post(
            config["TURNSTILE_VERIFY_URL"],
            data={"secret": secret_key, "response": turnstile_response},
            timeout=10,
        )
        if not reply.ok:
            logger.warning(
                "Turnstile verification returned %s", reply.status_code
            )
            return False
        outcome = reply.json()
    except (requests.RequestException, ValueError):
        # ValueError covers a reply body that is not valid JSON
        logger.exception("Turnstile verification failed")
        return False

    if outcome.get("success"):
        return True

    logger.warning(
        "Turnstile verification denied report: %s",
        outcome.get("error-codes", []),
    )
    return False

279 

def snap_has_sboms(revisions, snap_id):
    """Report whether an SBOM exists for the first listed revision.

    Sends a HEAD request to the SBOM endpoint built from ``snap_id``
    and ``revisions[0]``.

    Args:
        revisions: Sequence of revisions; only the first is checked.
        snap_id: Store identifier of the snap.

    Returns:
        bool: True when the endpoint answers 200 or 302. False when
        ``revisions`` is empty, the request fails or times out, or any
        other status is returned.
    """
    if not revisions:
        return False

    sbom_path = f"download/sbom_snap_{snap_id}_{revisions[0]}.spdx2.3.json"
    endpoint = device_gateway_sbom.get_endpoint_url(sbom_path)

    try:
        # Bounded wait: this check runs while rendering the details page
        # and must not hang or break it when the backend is unreachable
        res = requests.head(endpoint, timeout=10)
    except requests.RequestException:
        logger.exception("SBOM availability check failed")
        return False

    # backend returns 302 instead of 200 for a successful request
    # adding the check for 200 in case this is changed without us knowing
    return res.status_code in (200, 302)

295 

@store.route("/download/sbom_snap_<snap_id>_<revision>.spdx2.3.json")
def get_sbom(snap_id, revision):
    """Fetch the SBOM document for a snap revision and return it as JSON.

    Args:
        snap_id: Store identifier of the snap (from the URL).
        revision: Revision of the snap (from the URL).

    Returns:
        A JSON response with the SBOM content.

    Aborts with 404 when the backend reports the SBOM as missing, and
    with 502 when the backend cannot be reached, answers with another
    error status, or returns a body that is not valid JSON.
    """
    sbom_path = f"download/sbom_snap_{snap_id}_{revision}.spdx2.3.json"
    endpoint = device_gateway_sbom.get_endpoint_url(sbom_path)

    try:
        res = requests.get(endpoint, timeout=10)
    except requests.RequestException:
        logger.exception("SBOM download request failed")
        flask.abort(502)

    if res.status_code == 404:
        flask.abort(404)
    if not res.ok:
        logger.warning("SBOM download returned %s", res.status_code)
        flask.abort(502)

    try:
        sbom = res.json()
    except ValueError:
        logger.exception("SBOM download returned invalid JSON")
        flask.abort(502)

    return flask.jsonify(sbom)

304 

@store.route('/<regex("' + snap_regex + '"):snap_name>')
def snap_details(snap_name):
    """
    Render the snap details page for a specific snap.

    Combines the shared snap context with aliases, public metrics
    (countries and operating systems) and SBOM availability, then
    renders the store/snap-details.html template.
    """
    context = _get_context_snap_details(snap_name)

    # Aliases are best-effort: a failing lookup is logged and the page
    # is rendered without them.
    try:
        # the empty string channel makes the store API not filter by
        # the default channel 'latest/stable', which gives errors for
        # snaps that don't use that channel
        extra_details = device_gateway.get_snap_details(
            snap_name, channel="", fields=FIELDS_EXTRA_DETAILS
        )
    except Exception:
        logger.exception("Details endpoint returned an error")
        extra_details = None

    if extra_details and extra_details["aliases"]:
        aliases = []
        for alias_obj in extra_details["aliases"]:
            command = f"{extra_details['package_name']}.{alias_obj['target']}"
            aliases.append([command, alias_obj["name"]])
        context["aliases"] = aliases

    country_metric_name = "weekly_installed_base_by_country_percent"
    os_metric_name = "weekly_installed_base_by_operating_system_normalized"

    # Both filters cover only the last processed metrics date
    end = metrics_helper.get_last_metrics_processed_date()
    metrics_query_json = [
        metrics_helper.get_filter(
            metric_name=metric_name,
            snap_id=context["snap_id"],
            start=end,
            end=end,
        )
        for metric_name in (country_metric_name, os_metric_name)
    ]

    metrics_response = device_gateway.get_public_metrics(
        metrics_query_json
    )

    os_metrics = None
    country_devices = None
    if metrics_response:
        oses = metrics_helper.find_metric(metrics_response, os_metric_name)
        os_metrics = metrics.OsMetric(
            name=oses["metric_name"],
            series=oses["series"],
            buckets=oses["buckets"],
            status=oses["status"],
        )

        territories = metrics_helper.find_metric(
            metrics_response, country_metric_name
        )
        country_devices = metrics.CountryDevices(
            name=territories["metric_name"],
            series=territories["series"],
            buckets=territories["buckets"],
            status=territories["status"],
            private=False,
        )

    has_sboms = snap_has_sboms(context["revisions"], context["snap_id"])

    user_agent = flask.request.headers.get("User-Agent", "")

    context.update(
        {
            "countries": (
                country_devices.country_data if country_devices else None
            ),
            "normalized_os": os_metrics.os if os_metrics else None,
            # Context info
            "is_linux": "Linux" in user_agent
            and "Android" not in user_agent,
            "error_info": {},
            "has_sboms": has_sboms,
        }
    )

    return flask.render_template("store/snap-details.html", **context), 200

409 

@store.route('/<regex("' + snap_regex + '"):snap_name>/embedded')
@exclude_xframe_options_header
def snap_details_embedded(snap_name):
    """
    Render the embeddable card for a specific snap.

    Extends the shared snap context with the display options read from
    the query string and renders store/snap-embedded-card.html.
    """
    context = _get_context_snap_details(snap_name)
    query = flask.request.args

    # An unrecognised button variant falls back to "black"; a missing
    # one is passed through as None
    button = query.get("button")
    if button and button not in ["black", "white"]:
        button = "black"

    architectures = list(context["channel_map"].keys())
    if "amd64" in architectures:
        default_architecture = "amd64"
    else:
        default_architecture = architectures[0]

    context["default_architecture"] = default_architecture
    context["button"] = button
    context["show_channels"] = query.get("channels")
    context["show_summary"] = query.get("summary")
    context["show_screenshot"] = query.get("screenshot")

    return flask.render_template("store/snap-embedded-card.html", **context), 200

447 

@store.route('/<regex("' + snap_regex_upercase + '"):snap_name>')
def snap_details_case_sensitive(snap_name):
    """Redirect to the details page of the lower-cased snap name."""
    lowercase_url = flask.url_for(
        ".snap_details", snap_name=snap_name.lower()
    )
    return flask.redirect(lowercase_url)

453 

def get_badge_svg(snap_name, left_text, right_text, color="#0e8420"):
    """Build a badge SVG whose two halves link to the snap's page.

    Args:
        snap_name: Snap name, appended to the request's root URL to form
            the link target.
        left_text: Left-hand label; replaced by an empty string when the
            ``name`` query parameter is 0.
        right_text: Right-hand label.
        color: Colour of the right-hand side.

    Returns:
        The value returned by ``pybadges.badge``.
    """
    logo_data_uri = (
        "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' "
        "viewBox='0 0 32 32'%3E%3Cdefs%3E%3Cstyle%3E.cls-1%7Bfill:%23f"
        "ff%7D%3C/style%3E%3C/defs%3E%3Cpath class='cls-1' d='M18.03 1"
        "8.03l5.95-5.95-5.95-2.65v8.6zM6.66 29.4l10.51-10.51-3.21-3.18"
        "-7.3 13.69zM2.5 3.6l15.02 14.94V9.03L2.5 3.6zM27.03 9.03h-8.6"
        "5l11.12 4.95-2.47-4.95z'/%3E%3C/svg%3E"
    )

    show_name = flask.request.args.get("name", default=1, type=int)
    snap_link = flask.request.url_root + snap_name

    if show_name:
        label = left_text
    else:
        label = ""

    return badge(
        left_text=label,
        right_text=right_text,
        right_color=color,
        left_link=snap_link,
        right_link=snap_link,
        logo=logo_data_uri,
    )

474 

@store.route('/<regex("' + snap_regex + '"):snap_name>/badge.svg')
def snap_details_badge(snap_name):
    """Serve an SVG badge showing the snap's channel and version."""
    context = _get_context_snap_details(snap_name)

    # channel with safest risk available in default track
    snap_channel = (
        context["default_track"] + "/" + context["lowest_risk_available"]
    )
    badge_text = snap_channel + " " + context["version"]

    svg = get_badge_svg(
        snap_name=snap_name,
        left_text=context["snap_title"],
        right_text=badge_text,
    )

    headers = {"Content-Type": "image/svg+xml"}
    return svg, 200, headers

491 

@store.route("/<lang>/<theme>/install.svg")
def snap_install_badge(lang, theme):
    """Serve the static install badge for a language and theme.

    Args:
        lang: Badge language; must be one of the folders found under
            ``static/images/badges/``.
        theme: ``"light"`` selects the white badge; any other value
            selects the black one.

    Returns:
        The badge SVG; a 400 response for an unknown language; or an
        empty SVG with status 404 when the badge file cannot be read.
    """
    base_path = "static/images/badges/"
    allowed_langs = helpers.list_folders(base_path)

    if lang not in allowed_langs:
        return Response("Invalid language", status=400)

    if theme == "light":
        file_name = "snap-store-white.svg"
    else:
        file_name = "snap-store-black.svg"

    svg_path = os.path.normpath(os.path.join(base_path, lang, file_name))

    svg_content = None
    # Ensure the path is within the base path
    if svg_path.startswith(base_path):
        try:
            # Explicit UTF-8 so decoding does not depend on the server
            # locale. Opening directly (instead of checking existence
            # first) also covers files that vanish or cannot be read.
            with open(svg_path, "r", encoding="utf-8") as svg_file:
                svg_content = svg_file.read()
        except OSError:
            svg_content = None

    if svg_content is None:
        return Response(
            '<svg height="20" width="1" '
            'xmlns="http://www.w3.org/2000/svg" '
            'xmlns:xlink="http://www.w3.org/1999/xlink"></svg>',
            mimetype="image/svg+xml",
            status=404,
        )

    return Response(svg_content, mimetype="image/svg+xml")

521 

@store.route('/<regex("' + snap_regex + '"):snap_name>/trending.svg')
def snap_details_badge_trending(snap_name):
    """Serve the "Trending this week" badge, or an empty SVG.

    The badge is returned when the snap is trending, or when
    ``?preview`` is set to a non-zero integer by an authenticated user.
    """
    is_preview = flask.request.args.get("preview", default=0, type=int)
    context = _get_context_snap_details(snap_name)

    # publishers can see preview of trending badge of their own snaps
    # on Publicise page
    show_as_preview = bool(
        is_preview and authentication.is_authenticated(flask.session)
    )

    if context["trending"] or show_as_preview:
        svg = get_badge_svg(
            snap_name=snap_name,
            left_text=context["snap_title"],
            right_text="Trending this week",
            color="#FA7041",
        )
    else:
        # default to empty SVG
        svg = (
            '<svg height="20" width="1" xmlns="http://www.w3.org/2000/svg" '
            'xmlns:xlink="http://www.w3.org/1999/xlink"></svg>'
        )

    return svg, 200, {"Content-Type": "image/svg+xml"}

548 

@store.route('/install/<regex("' + snap_regex + '"):snap_name>/<distro>')
def snap_distro_install(snap_name, distro):
    """Render the per-distribution install page for a snap.

    Args:
        snap_name: Name of the snap (from the URL).
        distro: Distribution identifier; selects the YAML file
            ``store/content/distros/<distro>.yaml``.

    Returns:
        The rendered store/snap-distro-install.html page, or the 404
        page when the snap is not available on any architecture the
        distribution supports.

    Aborts with 404 when no data is found for the distribution.
    """
    filename = f"store/content/distros/{distro}.yaml"
    distro_data = helpers.get_yaml(filename)

    if not distro_data:
        flask.abort(404)

    supported_archs = distro_data["supported-archs"]
    context = _get_context_snap_details(snap_name, supported_archs)

    if all(arch not in context["channel_map"] for arch in supported_archs):
        return flask.render_template("404.html"), 404

    context.update(
        {
            "distro": distro,
            "distro_name": distro_data["name"],
            "distro_logo": distro_data["logo"],
            "distro_logo_mono": distro_data["logo-mono"],
            "distro_color_1": distro_data["color-1"],
            "distro_color_2": distro_data["color-2"],
            "distro_install_steps": distro_data["install"],
        }
    )

    cache_key = "featured_snaps_install_pages"
    featured_pool = redis_cache.get(cache_key, expected_type=list)

    if not featured_pool:
        try:
            # One more than is displayed, so that dropping the current
            # snap still leaves twelve entries
            featured_pool = device_gateway.get_featured_items(
                size=13, page=1
            ).get("results", [])
        except StoreApiError:
            featured_pool = []

        for snap in featured_pool:
            snap["icon_url"] = helpers.get_icon(snap["media"])

        # The cache key is shared by every snap's install page, so the
        # unfiltered list is stored and the current snap is removed per
        # request below. Empty results are not cached.
        if featured_pool:
            redis_cache.set(cache_key, featured_pool, ttl=3600)

    context["featured_snaps"] = [
        snap for snap in featured_pool if snap["package_name"] != snap_name
    ][:12]

    return flask.render_template(
        "store/snap-distro-install.html", **context
    )

604 

@store.route("/report", methods=["POST"])
def report_snap():
    """Forward a snap report form submission to the report webhook.

    Reads ``snap_name``, ``reason``, ``comment`` and ``email`` from the
    posted form and sends them to the URL configured as
    ``REPORT_SHEET_URL``.

    Returns:
        A JSON body and status code:
        ``{"ok": True}`` / 200 on success or when the honeypot field is
        present; ``report_url_missing`` / 503 when no webhook URL is
        configured; ``turnstile_failed`` / 400 when Turnstile
        verification fails; ``report_failed`` / 502 when the webhook
        request fails, times out or returns a non-OK status.
    """
    form_url = flask.current_app.config.get("REPORT_SHEET_URL")
    if not form_url:
        logger.warning("REPORT_SHEET_URL is not configured")
        return flask.jsonify({"error": "report_url_missing"}), 503

    fields = flask.request.form

    # If the honeypot is activated (hidden field populated),
    # silently reject to avoid spam
    if "confirm" in fields:
        return flask.jsonify({"ok": True}), 200

    if not verify_turnstile(fields.get("cf-turnstile-response", "")):
        return flask.jsonify({"error": "turnstile_failed"}), 400

    payload = {
        "snap_name": fields.get("snap_name", ""),
        "reason": fields.get("reason", ""),
        "comment": fields.get("comment", ""),
        "email": fields.get("email", ""),
    }

    try:
        # Bounded wait, matching the Turnstile verification request, so
        # a stalled webhook cannot hold the request open indefinitely
        response = requests.post(form_url, data=payload, timeout=10)
    except requests.RequestException:
        logger.exception("Report sheet webhook request failed")
        return flask.jsonify({"error": "report_failed"}), 502

    if not response.ok:
        logger.warning(
            "Report sheet webhook returned %s",
            response.status_code,
        )
        return flask.jsonify({"error": "report_failed"}), 502

    return flask.jsonify({"ok": True}), 200