Coverage for webapp/store/logic.py: 81%
274 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-08 22:07 +0000
1import sys
2import datetime
3import json
4from collections import OrderedDict
5import re
6import humanize
7from dateutil import parser
8from mistune import html
9from canonicalwebteam.docstring_extractor import get_docstrings
10from webapp.helpers import (
11 discourse_api,
12 get_yaml_loader,
13 markdown_to_html,
14 get_soup,
15 modify_headers,
16)
17from webapp.observability.utils import trace_function
# Shared YAML loader for parsing the revision YAML blobs (metadata/config/...)
yaml = get_yaml_loader()

# Map API platform slugs to their display names
PLATFORMS = {
    "ubuntu": "Ubuntu",
    "centos": "CentOS",
}

# Architectures a base declared with architecture "all" expands to
ARCHITECTURES = ["amd64", "arm64", "ppc64el", "riscv64", "s390x"]
@trace_function
def get_summary(package):
    """
    Return the package summary from its store_front data.

    Bundles keep the summary under "bundle", everything else under
    "metadata"; missing keys yield None.

    :param package: the package object with store_front data attached
    :returns: the summary string, or None
    """
    section = "bundle" if package["type"] == "bundle" else "metadata"
    return package.get("store_front", {}).get(section, {}).get("summary", None)
@trace_function
def get_description(package, parse_to_html=False):
    """
    Return the package description from its store_front data.

    Bundles keep the description under "bundle", everything else under
    "metadata"; missing keys yield None.

    :param package: the package object with store_front data attached
    :param parse_to_html: when True, render the markdown description to HTML
    :returns: the description (markdown string, HTML, or None)
    """
    section = "bundle" if package["type"] == "bundle" else "metadata"
    description = (
        package.get("store_front", {})
        .get(section, {})
        .get("description", None)
    )
    if parse_to_html:
        return markdown_to_html(description)
    return description
@trace_function
def get_banner_url(media):
    """
    Get banner url from media object

    :param media: the media dictionary
    :returns: the url of the first banner entry, or None if there is none
    """
    return next((item["url"] for item in media if item["type"] == "banner"), None)
@trace_function
def get_channel_map(channel_map):
    """
    Reformat channel map to return a channel map
    with unique risk

    Keeps only the first entry seen for each channel name.

    :param channel_map: the channel map from the api
    :returns: the channel map reformatted
    """
    seen_names = set()
    unique_channels = []

    for entry in channel_map:
        name = entry["channel"]["name"]
        if name not in seen_names:
            seen_names.add(name)
            unique_channels.append(entry)

    return unique_channels
@trace_function
def convert_channel_maps(channel_map):
    """
    Converts channel maps list to format easier to manipulate

    :param channel_map: The channel maps list returned by the API

    :returns: The channel maps reshaped as
        {track: {risk: {"latest": ..., "releases": {...}, "all_bases": [...]}}}
    """
    result = {}
    # Known keys sort by these weights; unknown tracks/risks go last
    # (sys.maxsize) in their original relative order
    track_order = {"latest": 1}
    risk_order = {"stable": 1, "candidate": 2, "beta": 3, "edge": 4}
    for channel in channel_map:
        track = channel["channel"].get("track", "latest")
        risk = channel["channel"]["risk"]
        revision_number = channel["revision"]["revision"]

        if track not in result:
            result[track] = {}

        if risk not in result[track]:
            result[track][risk] = {"latest": None, "releases": {}}

        # same revision but for a different arch: just extend the
        # architecture set of the already-recorded release
        if revision_number in result[track][risk]["releases"]:
            arch = channel["channel"]["base"]["architecture"]

            if arch == "all":
                result[track][risk]["releases"][revision_number][
                    "architectures"
                ].update(ARCHITECTURES)
            else:
                result[track][risk]["releases"][revision_number][
                    "architectures"
                ].add(arch)
            continue

        # First sighting of this revision in this track/risk
        info = {
            "released_at": channel["channel"]["released-at"],
            "release_date": convert_date(channel["channel"]["released-at"]),
            "version": channel["revision"]["version"],
            "channel": channel["channel"]["name"],
            "risk": channel["channel"]["risk"],
            "size": channel["revision"]["download"]["size"],
            "bases": extract_series(channel, True),
            "channel_bases": extract_bases(channel),
            "revision": process_revision(channel["revision"]),
            "architectures": set(),
        }

        if channel["channel"]["base"]:
            arch = channel["channel"]["base"]["architecture"]
            if arch == "all":
                info["architectures"].update(ARCHITECTURES)
            else:
                info["architectures"].add(arch)

        result[track][risk]["releases"][revision_number] = info

    # Order tracks (latest track first)
    result = OrderedDict(
        sorted(
            result.items(), key=lambda x: track_order.get(x[0], sys.maxsize)
        )
    )

    # Order risks (stable, candidate, beta, edge)
    for track, track_data in result.items():
        result[track] = OrderedDict(
            sorted(
                track_data.items(),
                key=lambda x: risk_order.get(x[0], sys.maxsize),
            )
        )

        # Order releases by release date, newest first
        for risk, data in result[track].items():
            result[track][risk]["releases"] = OrderedDict(
                sorted(
                    result[track][risk]["releases"].items(),
                    key=lambda release: release[1]["released_at"],
                    reverse=True,
                )
            )

            # Collect all the bases available across all releases
            base_names = sorted(
                list(
                    set(
                        base
                        for release in result[track][risk]["releases"].values()
                        for base in release["bases"]
                    )
                ),
                reverse=True,
            )

            result[track][risk]["all_bases"] = [
                {
                    "name": base,
                    "architectures": sorted(
                        list(
                            set(
                                arch
                                for release in result[track][risk][
                                    "releases"
                                ].values()
                                if base in release["bases"]
                                for arch in release["architectures"]
                            )
                        )
                    ),
                }
                for base in base_names
            ]

            # NOTE(review): "latest" is the release with the highest
            # revision number, not the newest released_at — confirm
            # this is intentional
            result[track][risk]["latest"] = result[track][risk]["releases"][
                max(result[track][risk]["releases"].keys())
            ]
    return result
@trace_function
def process_revision(revision):
    """
    Return a copy of a revision whose "all"-architecture bases are
    expanded into one base entry per architecture in ARCHITECTURES.

    :param revision: a revision dict from the API (with a "bases" list)
    :returns: a shallow copy of the revision with the expanded bases
    """
    expanded = []

    for base in revision["bases"]:
        if base and base.get("architecture") == "all":
            expanded.extend(
                {**base, "architecture": arch} for arch in ARCHITECTURES
            )
        else:
            expanded.append(base)

    return {**revision, "bases": expanded}
@trace_function
def extract_resources(channel):
    """
    Extract resources from channel map

    :param channel: a channel map entry returned by the API
    :returns: list of {"name", "revision"} dicts, one per resource
    """
    return [
        {"name": resource["name"], "revision": resource["revision"]}
        for resource in channel["resources"]
    ]
@trace_function
def extract_default_release_architectures(channel):
    """
    Return the sorted set of architectures supported by a channel's
    revision bases; "all" expands to every known architecture.

    :param channel: a channel map entry with revision base data
    :returns: sorted list of architecture names
    """
    architectures = set()

    for base in channel["revision"]["bases"]:
        if not base:
            continue
        arch = base["architecture"]
        if arch == "all":
            architectures.update(ARCHITECTURES)
        else:
            architectures.add(arch)

    return sorted(architectures)
@trace_function
def extract_all_arch(channel_map, parent_dict):
    """
    Aggregate architectures, platforms and channel bases across every
    release of a converted channel map.

    Mutates parent_dict in place, setting the keys "all_architectures",
    "all_platforms" and "all_channel_bases".

    :param channel_map: the map produced by convert_channel_maps
    :param parent_dict: dict (store_front data) to write the results into
    :returns: None — results are written into parent_dict
    """
    all_archy = set()
    all_channel_bases = {}
    platforms = {}

    for version_data in channel_map.values():
        channel_map_all = list(version_data.items())
        for _, channel_data in channel_map_all:
            for release in channel_data["releases"].values():
                all_archy = all_archy.union(release["architectures"])

                for base in release["channel_bases"]:
                    for series in base["channels"]:
                        # Display name, e.g. "ubuntu" -> "Ubuntu"
                        platform = PLATFORMS.get(base["name"], base["name"])

                        if base["name"] not in platforms:
                            platforms[base["name"]] = set()
                        platforms[base["name"]].add(series)

                        # e.g. key "ubuntu20.04" -> label "Ubuntu 20.04"
                        all_channel_bases[base["name"] + series] = (
                            f"{platform} {series}"
                        )

    parent_dict["all_architectures"] = sorted(all_archy)
    parent_dict["all_platforms"] = platforms
    parent_dict["all_channel_bases"] = dict(
        sorted(all_channel_bases.items(), reverse=True)
    )

    return
@trace_function
def extract_series(channel, long_name=False):
    """
    Extract ubuntu series from channel map

    :param channel: a channel map entry returned by the API
    :param long_name: when True, prefix each series with its platform name
    :returns: sorted (descending) list of series, e.g. ["20.04", "18.04"]
    """
    names = set()

    for base in channel["revision"]["bases"]:
        if not base:
            continue
        if long_name:
            platform = PLATFORMS.get(base["name"], base["name"])
            names.add(f"{platform} {base['channel']}")
        else:
            names.add(base["channel"])

    return sorted(names, reverse=True)
@trace_function
def extract_bases(channel):
    """
    Group a revision's bases by OS name.

    :param channel: a channel map entry returned by the API
    :returns: list of {"name", "channels"} dicts (channels sorted
        descending), or [] if any base entry is None
    """
    bases = channel["revision"]["bases"]

    # A missing base anywhere invalidates the whole list
    if any(base is None for base in bases):
        return []

    # Insertion-ordered grouping: name -> set of channels
    grouped = {}
    for base in bases:
        grouped.setdefault(base["name"], set()).add(base["channel"])

    return [
        {"name": name, "channels": sorted(channels, reverse=True)}
        for name, channels in grouped.items()
    ]
@trace_function
def convert_date(date_to_convert):
    """
    Convert date to human readable format: Day Month Year

    If date is less than a day old return: Today or Yesterday

    Format of date to convert: 2019-01-12T16:48:41.821037+00:00
    Output: 12 Jan 2019

    :param date_to_convert: Date to convert
    :returns: Readable date
    """
    # Strip the timezone so the value can be compared with the naive
    # datetime.now() below — assumes timestamps are close to local/UTC;
    # NOTE(review): the "today" cutoff can be off by the UTC offset
    date_parsed = parser.parse(date_to_convert).replace(tzinfo=None)
    delta = datetime.datetime.now() - datetime.timedelta(days=1)
    if delta < date_parsed:
        # Less than a day old: humanize gives "today"/"yesterday"
        return humanize.naturalday(date_parsed).title()
    else:
        return date_parsed.strftime("%d %b %Y")
@trace_function
def get_icons(package):
    """
    Return the URLs of every "icon" entry in the package's media list.

    :param package: the package object returned by the store API
    :returns: list of icon URLs (possibly empty)
    """
    urls = []
    for item in package["result"]["media"]:
        if item["type"] == "icon":
            urls.append(item["url"])
    return urls
@trace_function
def get_docs_topic_id(metadata_yaml):
    """
    Return discourse topic ID or None

    The "docs" link must live under the discourse base URL and end in a
    numeric topic ID, e.g. <base_url>/t/some-slug/1234 -> "1234".
    """
    base_url = discourse_api.base_url
    docs_link = metadata_yaml.get("docs")

    if not docs_link or not docs_link.startswith(base_url):
        return None

    path_parts = docs_link[len(base_url) :].split("/")
    if len(path_parts) > 2 and path_parts[-1].isnumeric():
        return path_parts[-1]

    return None
@trace_function
def convert_categories(api_categories):
    """
    The name property in the API response has a slug
    like format, e.g., big-data

    This method will return the desired name and an
    extra slug property with the value from the API
    (the category dicts are updated in place).
    """
    converted = []

    for category in api_categories:
        # Keep the API slug, replace the name with its display form
        category["slug"], category["name"] = (
            category["name"],
            format_slug(category["name"]),
        )
        converted.append(category)

    return converted
@trace_function
def add_store_front_data(package, details=False):
    """
    Attach a "store_front" dict of presentation data to a package.

    Always adds icons, deployable-on, categories and display-name.
    With details=True also parses the revision's YAML blobs and adds
    bundle data (bundles) or channel/series/base data (charms).

    :param package: the package object returned by the store API
    :param details: when True, include the detail-page data
    :returns: the same package object, with package["store_front"] set
    """
    extra = {}

    extra["icons"] = get_icons(package)

    # Default to "vm" when the API gives no deployable-on targets
    if package["result"]["deployable-on"]:
        extra["deployable-on"] = package["result"]["deployable-on"]
    else:
        extra["deployable-on"] = ["vm"]

    extra["categories"] = convert_categories(package["result"]["categories"])

    # Prefer the store title; fall back to a prettified package name
    if "title" in package["result"] and package["result"]["title"]:
        extra["display-name"] = package["result"]["title"]
    else:
        extra["display-name"] = format_slug(package["name"])

    if details:
        # Parse the raw YAML blobs shipped with the default release
        extra["metadata"] = yaml.load(
            package["default-release"]["revision"]["metadata-yaml"]
        )
        extra["config"] = yaml.load(
            package["default-release"]["revision"]["config-yaml"]
        )
        extra["actions"] = yaml.load(
            package["default-release"]["revision"]["actions-yaml"]
        )

        if package["type"] == "bundle":
            extra["bundle"] = yaml.load(
                package["default-release"]["revision"]["bundle-yaml"]
            )

            # Get bundle docs
            extra["docs_topic"] = get_docs_topic_id(extra["bundle"])

            # List charms ("services" is the legacy key for "applications")
            extra["bundle"]["charms"] = get_bundle_charms(
                extra["bundle"].get(
                    "applications", extra["bundle"].get("services")
                )
            )
        else:
            # Get charm docs
            extra["docs_topic"] = get_docs_topic_id(extra["metadata"])

            # Reshape channel maps
            extra["channel_map"] = convert_channel_maps(package["channel-map"])
            extra["resources"] = extract_resources(package["default-release"])

            # Extract all supported series
            extra["architectures"] = extract_default_release_architectures(
                package["default-release"]
            )
            # extract all architecture based on series
            extract_all_arch(extra["channel_map"], extra)
            extra["series"] = extract_series(package["default-release"])
            extra["channel_bases"] = extract_bases(package["default-release"])

            # Some needed fields
            extra["publisher_name"] = package["result"]["publisher"][
                "display-name"
            ]
            extra["username"] = package["result"]["publisher"]["username"]

            if "summary" in package["result"]:
                extra["summary"] = package["result"]["summary"]

        # Handle issues and website keys: normalise scalars to lists
        if "issues" in extra["metadata"]:
            if not isinstance(extra["metadata"]["issues"], list):
                extra["metadata"]["issues"] = [extra["metadata"]["issues"]]

        if "website" in extra["metadata"]:
            if not isinstance(extra["metadata"]["website"], list):
                extra["metadata"]["website"] = [extra["metadata"]["website"]]

    package["store_front"] = extra
    return package
@trace_function
def get_bundle_charms(charm_apps):
    """
    Build {"title", "name"} entries for every charm in a bundle's
    applications/services mapping.

    :param charm_apps: mapping of app name -> app data (may be falsy)
    :returns: list of charm dicts (empty when charm_apps is falsy)
    """
    charms = []

    if not charm_apps:
        return charms

    # Charm names could be with the old prefix/suffix
    # Like: cs:~charmed-osm/mariadb-k8s-35
    store_name_re = re.compile(r"(?:cs:|ch:)(?:.+/)?(\S*?)(?:-\d+)?$")

    for data in charm_apps.values():
        name = data["charm"]
        if name.startswith(("cs:", "ch:")):
            name = store_name_re.match(name)[1]

        charms.append({"title": format_slug(name), "name": name})

    return charms
@trace_function
def process_python_docs(library, module_name):
    """Process libraries response from the API
    to generate the HTML output

    :param library: a library dict with the raw "content" source
    :param module_name: the module name to extract docstrings for
    :returns: the docstrings dict with an added "html" rendering
    """

    # Obtain Python docstrings
    docstrings = get_docstrings(library["content"], module_name)

    # Render the docstring markdown to HTML, then demote its headers
    # (level 3) so they nest under the page's own heading structure
    bs_soup = get_soup(html(docstrings["docstring_text"]))
    docstrings["html"] = modify_headers(bs_soup, 3)

    return docstrings
@trace_function
def process_libraries(libraries):
    """Process the libraries response from the API

    :param libraries: the API response with a "libraries" list
    :returns: list of dicts with id/name/hash/created_at per library
    """
    return [
        {
            "id": lib["library-id"],
            "name": lib["library-name"],
            "hash": lib["hash"],
            "created_at": lib["created-at"],
        }
        for lib in libraries["libraries"]
    ]
@trace_function
def get_library(library_name, libraries):
    """
    Return the ID of the first library whose name matches, or None.

    :param library_name: the library name to look up
    :param libraries: list of processed library dicts
    :returns: the library "id", or None when not found
    """
    for lib in libraries:
        if lib.get("name") == library_name:
            return lib["id"]
    return None
@trace_function
def filter_charm(charm, categories=("all",), base="all"):
    """
    Check whether a charm passes the category and base filters.

    This filter will be done in the API soon.

    :param charm: package with store_front data attached
    :param categories: category slugs to keep ("all" disables the filter).
        The default is an immutable tuple to avoid the shared
        mutable-default-argument pitfall; behavior is unchanged since it
        is only membership-tested.
    :param base: platform name to keep ("all" disables the filter)
    :returns: boolean
    """
    # When "all" is present there is no need to filter
    if categories and "all" not in categories:
        charm_categories = [
            cat["slug"] for cat in charm["store_front"]["categories"]
        ]

        if not any(x in categories for x in charm_categories):
            return False

    # Filter platforms
    if base != "all" and base not in charm["store_front"]["base"]:
        return False

    return True
@trace_function
def format_slug(slug):
    """Format slug name into a standard title format
    :param slug: The hyphen spaced, lowercase slug to be formatted
    :return: The formatted string
    """
    formatted = slug.title()
    # Applied in order: separators to spaces, then fix-ups for the
    # words Title Case gets wrong ("And" -> "and", "Iot" -> "IoT")
    for old, new in (("-", " "), ("_", " "), ("And", "and"), ("Iot", "IoT")):
        formatted = formatted.replace(old, new)
    return formatted
# Load the hard-coded per-package overlay data once at import time
# (consumed by add_overlay_data); path is relative to the app's CWD
with open("webapp/store/overlay.json") as overlay_file:
    overlay = json.load(overlay_file)
@trace_function
def add_overlay_data(package):
    """
    Adds custom hard-coded overlay.json data to the package object

    :param package: The package object retrieved from the snapcraft API
    :return: The package object with an additional "overlay_data" key
        containing extra info (only when an overlay entry exists)
    """
    entry = overlay.get(package["name"])
    if entry is not None:
        # Copy so later mutations of the package never touch the
        # module-level overlay dict
        package["overlay_data"] = entry.copy()

    return package
@trace_function
def get_doc_link(package):
    """
    Returns the documentation link of a package

    :param package: the package object returned by the store API
    :returns: the first "docs" link, or None when absent
    """
    links = package.get("result", {}).get("links", {})
    docs = links.get("docs", [])
    if docs:
        return docs[0]
    return None