Coverage for webapp/store/logic.py: 81%

274 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-08 22:07 +0000

1import sys 

2import datetime 

3import json 

4from collections import OrderedDict 

5import re 

6import humanize 

7from dateutil import parser 

8from mistune import html 

9from canonicalwebteam.docstring_extractor import get_docstrings 

10from webapp.helpers import ( 

11 discourse_api, 

12 get_yaml_loader, 

13 markdown_to_html, 

14 get_soup, 

15 modify_headers, 

16) 

17from webapp.observability.utils import trace_function 

18 

19yaml = get_yaml_loader() 

20 

21PLATFORMS = { 

22 "ubuntu": "Ubuntu", 

23 "centos": "CentOS", 

24} 

25 

26ARCHITECTURES = ["amd64", "arm64", "ppc64el", "riscv64", "s390x"] 

27 

28 

29@trace_function 

30def get_summary(package): 

31 if package["type"] == "bundle": 

32 summary = ( 

33 package.get("store_front", {}) 

34 .get("bundle", {}) 

35 .get("summary", None) 

36 ) 

37 else: 

38 summary = ( 

39 package.get("store_front", {}) 

40 .get("metadata", {}) 

41 .get("summary", None) 

42 ) 

43 return summary 

44 

45 

46@trace_function 

47def get_description(package, parse_to_html=False): 

48 if package["type"] == "bundle": 

49 description = ( 

50 package.get("store_front", {}) 

51 .get("bundle", {}) 

52 .get("description", None) 

53 ) 

54 else: 

55 description = ( 

56 package.get("store_front", {}) 

57 .get("metadata", {}) 

58 .get("description", None) 

59 ) 

60 return markdown_to_html(description) if parse_to_html else description 

61 

62 

63@trace_function 

64def get_banner_url(media): 

65 """ 

66 Get banner url from media object 

67 

68 :param media: the media dictionnary 

69 :returns: the banner url 

70 """ 

71 for m in media: 

72 if m["type"] == "banner": 

73 return m["url"] 

74 

75 return None 

76 

77 

78@trace_function 

79def get_channel_map(channel_map): 

80 """ 

81 Reformat channel map to return a channel map 

82 with unique risk 

83 

84 :param channel_map: the channel map from the api 

85 :returns: the channel map reformatted 

86 """ 

87 new_map = [] 

88 for channel in channel_map: 

89 for res in new_map: 

90 if channel["channel"]["name"] == res["channel"]["name"]: 

91 break 

92 else: 

93 new_map.append(channel) 

94 

95 return new_map 

96 

97 

98@trace_function 

99def convert_channel_maps(channel_map): 

100 """ 

101 Converts channel maps list to format easier to manipulate 

102 

103 :param channel_maps: The channel maps list returned by the API 

104 

105 :returns: The channel maps reshaped 

106 """ 

107 result = {} 

108 track_order = {"latest": 1} 

109 risk_order = {"stable": 1, "candidate": 2, "beta": 3, "edge": 4} 

110 for channel in channel_map: 

111 track = channel["channel"].get("track", "latest") 

112 risk = channel["channel"]["risk"] 

113 revision_number = channel["revision"]["revision"] 

114 

115 if track not in result: 

116 result[track] = {} 

117 

118 if risk not in result[track]: 

119 result[track][risk] = {"latest": None, "releases": {}} 

120 

121 # same revision but for a different arch 

122 if revision_number in result[track][risk]["releases"]: 

123 arch = channel["channel"]["base"]["architecture"] 

124 

125 if arch == "all": 

126 result[track][risk]["releases"][revision_number][ 

127 "architectures" 

128 ].update(ARCHITECTURES) 

129 else: 

130 result[track][risk]["releases"][revision_number][ 

131 "architectures" 

132 ].add(arch) 

133 continue 

134 

135 info = { 

136 "released_at": channel["channel"]["released-at"], 

137 "release_date": convert_date(channel["channel"]["released-at"]), 

138 "version": channel["revision"]["version"], 

139 "channel": channel["channel"]["name"], 

140 "risk": channel["channel"]["risk"], 

141 "size": channel["revision"]["download"]["size"], 

142 "bases": extract_series(channel, True), 

143 "channel_bases": extract_bases(channel), 

144 "revision": process_revision(channel["revision"]), 

145 "architectures": set(), 

146 } 

147 

148 if channel["channel"]["base"]: 

149 arch = channel["channel"]["base"]["architecture"] 

150 if arch == "all": 

151 info["architectures"].update(ARCHITECTURES) 

152 else: 

153 info["architectures"].add(arch) 

154 

155 result[track][risk]["releases"][revision_number] = info 

156 

157 # Order tracks (latest track first) 

158 result = OrderedDict( 

159 sorted( 

160 result.items(), key=lambda x: track_order.get(x[0], sys.maxsize) 

161 ) 

162 ) 

163 

164 # Order risks (stable, candidate, beta, edge) 

165 for track, track_data in result.items(): 

166 result[track] = OrderedDict( 

167 sorted( 

168 track_data.items(), 

169 key=lambda x: risk_order.get(x[0], sys.maxsize), 

170 ) 

171 ) 

172 

173 # Order releases by revision 

174 for risk, data in result[track].items(): 

175 result[track][risk]["releases"] = OrderedDict( 

176 sorted( 

177 result[track][risk]["releases"].items(), 

178 key=lambda release: release[1]["released_at"], 

179 reverse=True, 

180 ) 

181 ) 

182 

183 # Collect all the bases available across all releases 

184 

185 base_names = sorted( 

186 list( 

187 set( 

188 base 

189 for release in result[track][risk]["releases"].values() 

190 for base in release["bases"] 

191 ) 

192 ), 

193 reverse=True, 

194 ) 

195 

196 result[track][risk]["all_bases"] = [ 

197 { 

198 "name": base, 

199 "architectures": sorted( 

200 list( 

201 set( 

202 arch 

203 for release in result[track][risk][ 

204 "releases" 

205 ].values() 

206 if base in release["bases"] 

207 for arch in release["architectures"] 

208 ) 

209 ) 

210 ), 

211 } 

212 for base in base_names 

213 ] 

214 

215 result[track][risk]["latest"] = result[track][risk]["releases"][ 

216 max(result[track][risk]["releases"].keys()) 

217 ] 

218 return result 

219 

220 

221@trace_function 

222def process_revision(revision): 

223 bases = [] 

224 

225 for base in revision["bases"]: 

226 if base and base.get("architecture") == "all": 

227 for arch in ARCHITECTURES: 

228 bases.append({**base, "architecture": arch}) 

229 else: 

230 bases.append(base) 

231 return {**revision, "bases": bases} 

232 

233 

234@trace_function 

235def extract_resources(channel): 

236 """ 

237 Extract resources from channel map 

238 

239 :param channel_maps: The channel maps list returned by the API 

240 

241 :returns: Charm resource names 

242 """ 

243 resources = [] 

244 

245 channel_resources = channel["resources"] 

246 

247 for resource in channel_resources: 

248 resources.append( 

249 {"name": resource["name"], "revision": resource["revision"]} 

250 ) 

251 

252 return resources 

253 

254 

255@trace_function 

256def extract_default_release_architectures(channel): 

257 architectures = set() 

258 

259 for base in channel["revision"]["bases"]: 

260 if not base or base["architecture"] in architectures: 

261 continue 

262 

263 arch = base["architecture"] 

264 if arch == "all": 

265 architectures.update(ARCHITECTURES) 

266 else: 

267 architectures.add(arch) 

268 

269 return sorted(architectures) 

270 

271 

272@trace_function 

273def extract_all_arch(channel_map, parent_dict): 

274 all_archy = set() 

275 all_channel_bases = {} 

276 platforms = {} 

277 

278 for version_data in channel_map.values(): 

279 channel_map_all = list(version_data.items()) 

280 for _, channel_data in channel_map_all: 

281 for release in channel_data["releases"].values(): 

282 all_archy = all_archy.union(release["architectures"]) 

283 

284 for base in release["channel_bases"]: 

285 for series in base["channels"]: 

286 platform = PLATFORMS.get(base["name"], base["name"]) 

287 

288 if base["name"] not in platforms: 

289 platforms[base["name"]] = set() 

290 platforms[base["name"]].add(series) 

291 

292 all_channel_bases[base["name"] + series] = ( 

293 f"{platform} {series}" 

294 ) 

295 

296 parent_dict["all_architectures"] = sorted(all_archy) 

297 parent_dict["all_platforms"] = platforms 

298 parent_dict["all_channel_bases"] = dict( 

299 sorted(all_channel_bases.items(), reverse=True) 

300 ) 

301 

302 return 

303 

304 

305@trace_function 

306def extract_series(channel, long_name=False): 

307 """ 

308 Extract ubuntu series from channel map 

309 

310 :param channel_maps: The channel maps list returned by the API 

311 

312 :returns: Ubuntu series number 

313 """ 

314 series = set() 

315 

316 for base in channel["revision"]["bases"]: 

317 if not base or base["channel"] in series: 

318 continue 

319 platform = PLATFORMS.get(base["name"], base["name"]) 

320 series.add( 

321 f"{platform} {base['channel']}" if long_name else base["channel"] 

322 ) 

323 

324 return sorted(series, reverse=True) 

325 

326 

327@trace_function 

328def extract_bases(channel): 

329 bases = channel["revision"]["bases"] 

330 channel_bases = [] 

331 

332 for i in bases: 

333 if i is None: 

334 return [] 

335 

336 has_base = False 

337 

338 for b in channel_bases: 

339 if b["name"] == i["name"]: 

340 has_base = True 

341 

342 if not has_base: 

343 channel_bases.append( 

344 { 

345 "name": i["name"], 

346 "channels": set(), 

347 } 

348 ) 

349 

350 for i in channel_bases: 

351 for b in bases: 

352 if b["name"] == i["name"]: 

353 i["channels"].add(b["channel"]) 

354 

355 i["channels"] = sorted(i["channels"], reverse=True) 

356 

357 return channel_bases 

358 

359 

360@trace_function 

361def convert_date(date_to_convert): 

362 """ 

363 Convert date to human readable format: Month Day Year 

364 

365 If date is less than a day return: today or yesterday 

366 

367 Format of date to convert: 2019-01-12T16:48:41.821037+00:00 

368 Output: Jan 12 2019 

369 

370 :param date_to_convert: Date to convert 

371 :returns: Readable date 

372 """ 

373 date_parsed = parser.parse(date_to_convert).replace(tzinfo=None) 

374 delta = datetime.datetime.now() - datetime.timedelta(days=1) 

375 if delta < date_parsed: 

376 return humanize.naturalday(date_parsed).title() 

377 else: 

378 return date_parsed.strftime("%d %b %Y") 

379 

380 

381@trace_function 

382def get_icons(package): 

383 media = package["result"]["media"] 

384 return [m["url"] for m in media if m["type"] == "icon"] 

385 

386 

387@trace_function 

388def get_docs_topic_id(metadata_yaml): 

389 """ 

390 Return discourse topic ID or None 

391 """ 

392 base_url = discourse_api.base_url 

393 docs_link = metadata_yaml.get("docs") 

394 

395 if docs_link: 

396 if docs_link.startswith(base_url): 

397 docs_link_parts = docs_link[len(base_url) :].split("/") 

398 

399 if len(docs_link_parts) > 2: 

400 topic_id = docs_link_parts[-1] 

401 

402 if topic_id.isnumeric(): 

403 return topic_id 

404 

405 return None 

406 

407 

408@trace_function 

409def convert_categories(api_categories): 

410 """ 

411 The name property in the API response has a slug 

412 like format, e.g., big-data 

413 

414 This method will return the desired name and an 

415 extra slug property with the value from the API 

416 """ 

417 result = [] 

418 

419 for category in api_categories: 

420 category["slug"] = category["name"] 

421 category["name"] = format_slug(category["slug"]) 

422 result.append(category) 

423 

424 return result 

425 

426 

427@trace_function 

428def add_store_front_data(package, details=False): 

429 extra = {} 

430 

431 extra["icons"] = get_icons(package) 

432 

433 if package["result"]["deployable-on"]: 

434 extra["deployable-on"] = package["result"]["deployable-on"] 

435 else: 

436 extra["deployable-on"] = ["vm"] 

437 

438 extra["categories"] = convert_categories(package["result"]["categories"]) 

439 

440 if "title" in package["result"] and package["result"]["title"]: 

441 extra["display-name"] = package["result"]["title"] 

442 else: 

443 extra["display-name"] = format_slug(package["name"]) 

444 

445 if details: 

446 extra["metadata"] = yaml.load( 

447 package["default-release"]["revision"]["metadata-yaml"] 

448 ) 

449 extra["config"] = yaml.load( 

450 package["default-release"]["revision"]["config-yaml"] 

451 ) 

452 extra["actions"] = yaml.load( 

453 package["default-release"]["revision"]["actions-yaml"] 

454 ) 

455 

456 if package["type"] == "bundle": 

457 extra["bundle"] = yaml.load( 

458 package["default-release"]["revision"]["bundle-yaml"] 

459 ) 

460 

461 # Get bundle docs 

462 extra["docs_topic"] = get_docs_topic_id(extra["bundle"]) 

463 

464 # List charms 

465 extra["bundle"]["charms"] = get_bundle_charms( 

466 extra["bundle"].get( 

467 "applications", extra["bundle"].get("services") 

468 ) 

469 ) 

470 else: 

471 # Get charm docs 

472 extra["docs_topic"] = get_docs_topic_id(extra["metadata"]) 

473 

474 # Reshape channel maps 

475 extra["channel_map"] = convert_channel_maps(package["channel-map"]) 

476 extra["resources"] = extract_resources(package["default-release"]) 

477 

478 # Extract all supported series 

479 extra["architectures"] = extract_default_release_architectures( 

480 package["default-release"] 

481 ) 

482 # extract all architecture based on series 

483 extract_all_arch(extra["channel_map"], extra) 

484 extra["series"] = extract_series(package["default-release"]) 

485 extra["channel_bases"] = extract_bases(package["default-release"]) 

486 

487 # Some needed fields 

488 extra["publisher_name"] = package["result"]["publisher"][ 

489 "display-name" 

490 ] 

491 extra["username"] = package["result"]["publisher"]["username"] 

492 

493 if "summary" in package["result"]: 

494 extra["summary"] = package["result"]["summary"] 

495 

496 # Handle issues and website keys 

497 if "issues" in extra["metadata"]: 

498 if not isinstance(extra["metadata"]["issues"], list): 

499 extra["metadata"]["issues"] = [extra["metadata"]["issues"]] 

500 

501 if "website" in extra["metadata"]: 

502 if not isinstance(extra["metadata"]["website"], list): 

503 extra["metadata"]["website"] = [extra["metadata"]["website"]] 

504 

505 package["store_front"] = extra 

506 return package 

507 

508 

509@trace_function 

510def get_bundle_charms(charm_apps): 

511 result = [] 

512 

513 if charm_apps: 

514 for app_name, data in charm_apps.items(): 

515 # Charm names could be with the old prefix/suffix 

516 # Like: cs:~charmed-osm/mariadb-k8s-35 

517 name = data["charm"] 

518 if name.startswith("cs:") or name.startswith("ch:"): 

519 name = re.match(r"(?:cs:|ch:)(?:.+/)?(\S*?)(?:-\d+)?$", name)[ 

520 1 

521 ] 

522 

523 charm = {"title": format_slug(name), "name": name} 

524 

525 result.append(charm) 

526 

527 return result 

528 

529 

530@trace_function 

531def process_python_docs(library, module_name): 

532 """Process libraries response from the API 

533 to generate the HTML output""" 

534 

535 # Obtain Python docstrings 

536 docstrings = get_docstrings(library["content"], module_name) 

537 

538 bs_soup = get_soup(html(docstrings["docstring_text"])) 

539 docstrings["html"] = modify_headers(bs_soup, 3) 

540 

541 return docstrings 

542 

543 

544@trace_function 

545def process_libraries(libraries): 

546 """Process the libraries response from the API""" 

547 

548 result = [] 

549 

550 for lib in libraries["libraries"]: 

551 data = { 

552 "id": lib["library-id"], 

553 "name": lib["library-name"], 

554 "hash": lib["hash"], 

555 "created_at": lib["created-at"], 

556 } 

557 

558 result.append(data) 

559 

560 return result 

561 

562 

563@trace_function 

564def get_library(library_name, libraries): 

565 library = next( 

566 (lib for lib in libraries if lib.get("name") == library_name), 

567 None, 

568 ) 

569 

570 if not library: 

571 return None 

572 

573 return library["id"] 

574 

575 

576@trace_function 

577def filter_charm(charm, categories=["all"], base="all"): 

578 """ 

579 This filter will be done in the API soon. 

580 :returns: boolean 

581 """ 

582 # When all is present there is no need to filter 

583 if categories and "all" not in categories: 

584 charm_categories = [ 

585 cat["slug"] for cat in charm["store_front"]["categories"] 

586 ] 

587 

588 if not any(x in categories for x in charm_categories): 

589 return False 

590 

591 # Filter platforms 

592 if base != "all" and base not in charm["store_front"]["base"]: 

593 return False 

594 

595 return True 

596 

597 

598@trace_function 

599def format_slug(slug): 

600 """Format slug name into a standard title format 

601 :param slug: The hypen spaced, lowercase slug to be formatted 

602 :return: The formatted string 

603 """ 

604 

605 return ( 

606 slug.title() 

607 .replace("-", " ") 

608 .replace("_", " ") 

609 .replace("And", "and") 

610 .replace("Iot", "IoT") 

611 ) 

612 

613 

614with open("webapp/store/overlay.json") as overlay_file: 

615 overlay = json.load(overlay_file) 

616 

617 

618@trace_function 

619def add_overlay_data(package): 

620 """ 

621 Adds custom hard-coded overlay.json data to the package object 

622 :param package: The package object retrieved from the snapcraft API 

623 :return: The package object with an additional "overlay" key 

624 containing extra info 

625 """ 

626 

627 if overlay.get(package["name"]) is not None: 

628 package["overlay_data"] = overlay[package["name"]].copy() 

629 

630 return package 

631 

632 

633@trace_function 

634def get_doc_link(package): 

635 """ 

636 Returns the documentation link of a package 

637 """ 

638 docs = package.get("result", {}).get("links", {}).get("docs", []) 

639 return docs[0] if docs else None