Coverage for webapp/store/logic.py: 59%

259 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-27 22:07 +0000

1import sys 

2import datetime 

3from collections import OrderedDict 

4import re 

5import humanize 

6from dateutil import parser 

7from mistune import html 

8from canonicalwebteam.docstring_extractor import get_docstrings 

9from webapp.helpers import get_soup, modify_headers 

10from webapp.helpers import ( 

11 discourse_api, 

12 get_yaml_loader, 

13) 

14from webapp.observability.utils import trace_function 

15 

16yaml = get_yaml_loader() 

17 

18PLATFORMS = { 

19 "ubuntu": "Ubuntu", 

20 "centos": "CentOS", 

21} 

22 

23ARCHITECTURES = ["amd64", "arm64", "ppc64el", "riscv64", "s390x"] 

24 

25 

26@trace_function 

27def add_description_and_summary(package): 

28 if package["type"] == "bundle": 

29 description = ( 

30 package.get("store_front", {}) 

31 .get("bundle", {}) 

32 .get("description", None) 

33 ) 

34 summary = ( 

35 package.get("store_front", {}) 

36 .get("bundle", {}) 

37 .get("summary", None) 

38 ) 

39 else: 

40 description = ( 

41 package.get("store_front", {}) 

42 .get("metadata", {}) 

43 .get("description", None) 

44 ) 

45 summary = ( 

46 package.get("store_front", {}) 

47 .get("metadata", {}) 

48 .get("summary", None) 

49 ) 

50 return description, summary 

51 

52 

53@trace_function 

54def get_banner_url(media): 

55 """ 

56 Get banner url from media object 

57 

58 :param media: the media dictionnary 

59 :returns: the banner url 

60 """ 

61 for m in media: 

62 if m["type"] == "banner": 

63 return m["url"] 

64 

65 return None 

66 

67 

68@trace_function 

69def get_channel_map(channel_map): 

70 """ 

71 Reformat channel map to return a channel map 

72 with unique risk 

73 

74 :param channel_map: the channel map from the api 

75 :returns: the channel map reformatted 

76 """ 

77 new_map = [] 

78 for channel in channel_map: 

79 for res in new_map: 

80 if channel["channel"]["name"] == res["channel"]["name"]: 

81 break 

82 else: 

83 new_map.append(channel) 

84 

85 return new_map 

86 

87 

88@trace_function 

89def convert_channel_maps(channel_map): 

90 """ 

91 Converts channel maps list to format easier to manipulate 

92 

93 :param channel_maps: The channel maps list returned by the API 

94 

95 :returns: The channel maps reshaped 

96 """ 

97 result = {} 

98 track_order = {"latest": 1} 

99 risk_order = {"stable": 1, "candidate": 2, "beta": 3, "edge": 4} 

100 for channel in channel_map: 

101 track = channel["channel"].get("track", "latest") 

102 risk = channel["channel"]["risk"] 

103 revision_number = channel["revision"]["revision"] 

104 

105 if track not in result: 

106 result[track] = {} 

107 

108 if risk not in result[track]: 

109 result[track][risk] = {"latest": None, "releases": {}} 

110 

111 # same revision but for a different arch 

112 if revision_number in result[track][risk]["releases"]: 

113 arch = channel["channel"]["base"]["architecture"] 

114 

115 if arch == "all": 

116 result[track][risk]["releases"][revision_number][ 

117 "architectures" 

118 ].update(ARCHITECTURES) 

119 else: 

120 result[track][risk]["releases"][revision_number][ 

121 "architectures" 

122 ].add(arch) 

123 continue 

124 

125 info = { 

126 "released_at": channel["channel"]["released-at"], 

127 "release_date": convert_date(channel["channel"]["released-at"]), 

128 "version": channel["revision"]["version"], 

129 "channel": channel["channel"]["name"], 

130 "risk": channel["channel"]["risk"], 

131 "size": channel["revision"]["download"]["size"], 

132 "bases": extract_series(channel, True), 

133 "channel_bases": extract_bases(channel), 

134 "revision": process_revision(channel["revision"]), 

135 "architectures": set(), 

136 } 

137 

138 if channel["channel"]["base"]: 

139 arch = channel["channel"]["base"]["architecture"] 

140 if arch == "all": 

141 info["architectures"].update(ARCHITECTURES) 

142 else: 

143 info["architectures"].add(arch) 

144 

145 result[track][risk]["releases"][revision_number] = info 

146 

147 # Order tracks (latest track first) 

148 result = OrderedDict( 

149 sorted( 

150 result.items(), key=lambda x: track_order.get(x[0], sys.maxsize) 

151 ) 

152 ) 

153 

154 # Order risks (stable, candidate, beta, edge) 

155 for track, track_data in result.items(): 

156 result[track] = OrderedDict( 

157 sorted( 

158 track_data.items(), 

159 key=lambda x: risk_order.get(x[0], sys.maxsize), 

160 ) 

161 ) 

162 

163 # Order releases by revision 

164 for risk, data in result[track].items(): 

165 result[track][risk]["releases"] = OrderedDict( 

166 sorted( 

167 result[track][risk]["releases"].items(), 

168 key=lambda release: release[1]["released_at"], 

169 reverse=True, 

170 ) 

171 ) 

172 

173 # Collect all the bases available across all releases 

174 

175 base_names = sorted( 

176 list( 

177 set( 

178 base 

179 for release in result[track][risk]["releases"].values() 

180 for base in release["bases"] 

181 ) 

182 ), 

183 reverse=True, 

184 ) 

185 

186 result[track][risk]["all_bases"] = [ 

187 { 

188 "name": base, 

189 "architectures": sorted( 

190 list( 

191 set( 

192 arch 

193 for release in result[track][risk][ 

194 "releases" 

195 ].values() 

196 if base in release["bases"] 

197 for arch in release["architectures"] 

198 ) 

199 ) 

200 ), 

201 } 

202 for base in base_names 

203 ] 

204 

205 result[track][risk]["latest"] = result[track][risk]["releases"][ 

206 max(result[track][risk]["releases"].keys()) 

207 ] 

208 return result 

209 

210 

211@trace_function 

212def process_revision(revision): 

213 bases = [] 

214 

215 for base in revision["bases"]: 

216 if base and base.get("architecture") == "all": 

217 for arch in ARCHITECTURES: 

218 bases.append({**base, "architecture": arch}) 

219 else: 

220 bases.append(base) 

221 return {**revision, "bases": bases} 

222 

223 

224@trace_function 

225def extract_resources(channel): 

226 """ 

227 Extract resources from channel map 

228 

229 :param channel_maps: The channel maps list returned by the API 

230 

231 :returns: Charm resource names 

232 """ 

233 resources = [] 

234 

235 channel_resources = channel["resources"] 

236 

237 for resource in channel_resources: 

238 resources.append( 

239 {"name": resource["name"], "revision": resource["revision"]} 

240 ) 

241 

242 return resources 

243 

244 

245@trace_function 

246def extract_default_release_architectures(channel): 

247 architectures = set() 

248 

249 for base in channel["revision"]["bases"]: 

250 if not base or base["architecture"] in architectures: 

251 continue 

252 

253 arch = base["architecture"] 

254 if arch == "all": 

255 architectures.update(ARCHITECTURES) 

256 else: 

257 architectures.add(arch) 

258 

259 return sorted(architectures) 

260 

261 

262@trace_function 

263def extract_all_arch(channel_map, parent_dict): 

264 all_archy = set() 

265 all_channel_bases = {} 

266 platforms = {} 

267 

268 for version_data in channel_map.values(): 

269 channel_map_all = list(version_data.items()) 

270 for _, channel_data in channel_map_all: 

271 for release in channel_data["releases"].values(): 

272 all_archy = all_archy.union(release["architectures"]) 

273 

274 for base in release["channel_bases"]: 

275 for series in base["channels"]: 

276 platform = PLATFORMS.get(base["name"], base["name"]) 

277 

278 if base["name"] not in platforms: 

279 platforms[base["name"]] = set() 

280 platforms[base["name"]].add(series) 

281 

282 all_channel_bases[base["name"] + series] = ( 

283 f"{platform} {series}" 

284 ) 

285 

286 parent_dict["all_architectures"] = sorted(all_archy) 

287 parent_dict["all_platforms"] = platforms 

288 parent_dict["all_channel_bases"] = dict( 

289 sorted(all_channel_bases.items(), reverse=True) 

290 ) 

291 

292 return 

293 

294 

295@trace_function 

296def extract_series(channel, long_name=False): 

297 """ 

298 Extract ubuntu series from channel map 

299 

300 :param channel_maps: The channel maps list returned by the API 

301 

302 :returns: Ubuntu series number 

303 """ 

304 series = set() 

305 

306 for base in channel["revision"]["bases"]: 

307 if not base or base["channel"] in series: 

308 continue 

309 platform = PLATFORMS.get(base["name"], base["name"]) 

310 series.add( 

311 f"{platform} {base['channel']}" if long_name else base["channel"] 

312 ) 

313 

314 return sorted(series, reverse=True) 

315 

316 

317@trace_function 

318def extract_bases(channel): 

319 bases = channel["revision"]["bases"] 

320 channel_bases = [] 

321 

322 for i in bases: 

323 if i is None: 

324 return [] 

325 

326 has_base = False 

327 

328 for b in channel_bases: 

329 if b["name"] == i["name"]: 

330 has_base = True 

331 

332 if not has_base: 

333 channel_bases.append( 

334 { 

335 "name": i["name"], 

336 "channels": set(), 

337 } 

338 ) 

339 

340 for i in channel_bases: 

341 for b in bases: 

342 if b["name"] == i["name"]: 

343 i["channels"].add(b["channel"]) 

344 

345 i["channels"] = sorted(i["channels"], reverse=True) 

346 

347 return channel_bases 

348 

349 

350@trace_function 

351def convert_date(date_to_convert): 

352 """ 

353 Convert date to human readable format: Month Day Year 

354 

355 If date is less than a day return: today or yesterday 

356 

357 Format of date to convert: 2019-01-12T16:48:41.821037+00:00 

358 Output: Jan 12 2019 

359 

360 :param date_to_convert: Date to convert 

361 :returns: Readable date 

362 """ 

363 date_parsed = parser.parse(date_to_convert).replace(tzinfo=None) 

364 delta = datetime.datetime.now() - datetime.timedelta(days=1) 

365 if delta < date_parsed: 

366 return humanize.naturalday(date_parsed).title() 

367 else: 

368 return date_parsed.strftime("%d %b %Y") 

369 

370 

371@trace_function 

372def get_icons(package): 

373 media = package["result"]["media"] 

374 return [m["url"] for m in media if m["type"] == "icon"] 

375 

376 

377@trace_function 

378def get_docs_topic_id(metadata_yaml): 

379 """ 

380 Return discourse topic ID or None 

381 """ 

382 base_url = discourse_api.base_url 

383 docs_link = metadata_yaml.get("docs") 

384 

385 if docs_link: 

386 if docs_link.startswith(base_url): 

387 docs_link_parts = docs_link[len(base_url) :].split("/") 

388 

389 if len(docs_link_parts) > 2: 

390 topic_id = docs_link_parts[-1] 

391 

392 if topic_id.isnumeric(): 

393 return topic_id 

394 

395 return None 

396 

397 

398@trace_function 

399def convert_categories(api_categories): 

400 """ 

401 The name property in the API response has a slug 

402 like format, e.g., big-data 

403 

404 This method will return the desired name and an 

405 extra slug property with the value from the API 

406 """ 

407 result = [] 

408 

409 for category in api_categories: 

410 category["slug"] = category["name"] 

411 category["name"] = format_slug(category["slug"]) 

412 result.append(category) 

413 

414 return result 

415 

416 

417@trace_function 

418def add_store_front_data(package, details=False): 

419 extra = {} 

420 

421 extra["icons"] = get_icons(package) 

422 

423 if package["result"]["deployable-on"]: 

424 extra["deployable-on"] = package["result"]["deployable-on"] 

425 else: 

426 extra["deployable-on"] = ["vm"] 

427 

428 extra["categories"] = convert_categories(package["result"]["categories"]) 

429 

430 if "title" in package["result"] and package["result"]["title"]: 

431 extra["display-name"] = package["result"]["title"] 

432 else: 

433 extra["display-name"] = format_slug(package["name"]) 

434 

435 if details: 

436 extra["metadata"] = yaml.load( 

437 package["default-release"]["revision"]["metadata-yaml"] 

438 ) 

439 extra["config"] = yaml.load( 

440 package["default-release"]["revision"]["config-yaml"] 

441 ) 

442 extra["actions"] = yaml.load( 

443 package["default-release"]["revision"]["actions-yaml"] 

444 ) 

445 

446 if package["type"] == "bundle": 

447 extra["bundle"] = yaml.load( 

448 package["default-release"]["revision"]["bundle-yaml"] 

449 ) 

450 

451 # Get bundle docs 

452 extra["docs_topic"] = get_docs_topic_id(extra["bundle"]) 

453 

454 # List charms 

455 extra["bundle"]["charms"] = get_bundle_charms( 

456 extra["bundle"].get( 

457 "applications", extra["bundle"].get("services") 

458 ) 

459 ) 

460 else: 

461 # Get charm docs 

462 extra["docs_topic"] = get_docs_topic_id(extra["metadata"]) 

463 

464 # Reshape channel maps 

465 extra["channel_map"] = convert_channel_maps(package["channel-map"]) 

466 extra["resources"] = extract_resources(package["default-release"]) 

467 

468 # Extract all supported series 

469 extra["architectures"] = extract_default_release_architectures( 

470 package["default-release"] 

471 ) 

472 # extract all architecture based on series 

473 extract_all_arch(extra["channel_map"], extra) 

474 extra["series"] = extract_series(package["default-release"]) 

475 extra["channel_bases"] = extract_bases(package["default-release"]) 

476 

477 # Some needed fields 

478 extra["publisher_name"] = package["result"]["publisher"][ 

479 "display-name" 

480 ] 

481 extra["username"] = package["result"]["publisher"]["username"] 

482 

483 if "summary" in package["result"]: 

484 extra["summary"] = package["result"]["summary"] 

485 

486 # Handle issues and website keys 

487 if "issues" in extra["metadata"]: 

488 if not isinstance(extra["metadata"]["issues"], list): 

489 extra["metadata"]["issues"] = [extra["metadata"]["issues"]] 

490 

491 if "website" in extra["metadata"]: 

492 if not isinstance(extra["metadata"]["website"], list): 

493 extra["metadata"]["website"] = [extra["metadata"]["website"]] 

494 

495 package["store_front"] = extra 

496 return package 

497 

498 

499@trace_function 

500def get_bundle_charms(charm_apps): 

501 result = [] 

502 

503 if charm_apps: 

504 for app_name, data in charm_apps.items(): 

505 # Charm names could be with the old prefix/suffix 

506 # Like: cs:~charmed-osm/mariadb-k8s-35 

507 name = data["charm"] 

508 if name.startswith("cs:") or name.startswith("ch:"): 

509 name = re.match(r"(?:cs:|ch:)(?:.+/)?(\S*?)(?:-\d+)?$", name)[ 

510 1 

511 ] 

512 

513 charm = {"title": format_slug(name), "name": name} 

514 

515 result.append(charm) 

516 

517 return result 

518 

519 

520@trace_function 

521def process_python_docs(library, module_name): 

522 """Process libraries response from the API 

523 to generate the HTML output""" 

524 

525 # Obtain Python docstrings 

526 docstrings = get_docstrings(library["content"], module_name) 

527 

528 bs_soup = get_soup(html(docstrings["docstring_text"])) 

529 docstrings["html"] = modify_headers(bs_soup, 3) 

530 

531 return docstrings 

532 

533 

534@trace_function 

535def process_libraries(libraries): 

536 """Process the libraries response from the API""" 

537 

538 result = [] 

539 

540 for lib in libraries["libraries"]: 

541 data = { 

542 "id": lib["library-id"], 

543 "name": lib["library-name"], 

544 "hash": lib["hash"], 

545 "created_at": lib["created-at"], 

546 } 

547 

548 result.append(data) 

549 

550 return result 

551 

552 

553@trace_function 

554def get_library(library_name, libraries): 

555 library = next( 

556 (lib for lib in libraries if lib.get("name") == library_name), 

557 None, 

558 ) 

559 

560 if not library: 

561 return None 

562 

563 return library["id"] 

564 

565 

566@trace_function 

567def filter_charm(charm, categories=["all"], base="all"): 

568 """ 

569 This filter will be done in the API soon. 

570 :returns: boolean 

571 """ 

572 # When all is present there is no need to filter 

573 if categories and "all" not in categories: 

574 charm_categories = [ 

575 cat["slug"] for cat in charm["store_front"]["categories"] 

576 ] 

577 

578 if not any(x in categories for x in charm_categories): 

579 return False 

580 

581 # Filter platforms 

582 if base != "all" and base not in charm["store_front"]["base"]: 

583 return False 

584 

585 return True 

586 

587 

588@trace_function 

589def format_slug(slug): 

590 """Format slug name into a standard title format 

591 :param slug: The hypen spaced, lowercase slug to be formatted 

592 :return: The formatted string 

593 """ 

594 

595 return ( 

596 slug.title() 

597 .replace("-", " ") 

598 .replace("_", " ") 

599 .replace("And", "and") 

600 .replace("Iot", "IoT") 

601 )