Coverage for webapp/store/logic.py: 59%
259 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-27 22:07 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-27 22:07 +0000
1import sys
2import datetime
3from collections import OrderedDict
4import re
5import humanize
6from dateutil import parser
7from mistune import html
8from canonicalwebteam.docstring_extractor import get_docstrings
9from webapp.helpers import get_soup, modify_headers
10from webapp.helpers import (
11 discourse_api,
12 get_yaml_loader,
13)
14from webapp.observability.utils import trace_function
16yaml = get_yaml_loader()
18PLATFORMS = {
19 "ubuntu": "Ubuntu",
20 "centos": "CentOS",
21}
23ARCHITECTURES = ["amd64", "arm64", "ppc64el", "riscv64", "s390x"]
26@trace_function
27def add_description_and_summary(package):
28 if package["type"] == "bundle":
29 description = (
30 package.get("store_front", {})
31 .get("bundle", {})
32 .get("description", None)
33 )
34 summary = (
35 package.get("store_front", {})
36 .get("bundle", {})
37 .get("summary", None)
38 )
39 else:
40 description = (
41 package.get("store_front", {})
42 .get("metadata", {})
43 .get("description", None)
44 )
45 summary = (
46 package.get("store_front", {})
47 .get("metadata", {})
48 .get("summary", None)
49 )
50 return description, summary
53@trace_function
54def get_banner_url(media):
55 """
56 Get banner url from media object
58 :param media: the media dictionnary
59 :returns: the banner url
60 """
61 for m in media:
62 if m["type"] == "banner":
63 return m["url"]
65 return None
68@trace_function
69def get_channel_map(channel_map):
70 """
71 Reformat channel map to return a channel map
72 with unique risk
74 :param channel_map: the channel map from the api
75 :returns: the channel map reformatted
76 """
77 new_map = []
78 for channel in channel_map:
79 for res in new_map:
80 if channel["channel"]["name"] == res["channel"]["name"]:
81 break
82 else:
83 new_map.append(channel)
85 return new_map
88@trace_function
89def convert_channel_maps(channel_map):
90 """
91 Converts channel maps list to format easier to manipulate
93 :param channel_maps: The channel maps list returned by the API
95 :returns: The channel maps reshaped
96 """
97 result = {}
98 track_order = {"latest": 1}
99 risk_order = {"stable": 1, "candidate": 2, "beta": 3, "edge": 4}
100 for channel in channel_map:
101 track = channel["channel"].get("track", "latest")
102 risk = channel["channel"]["risk"]
103 revision_number = channel["revision"]["revision"]
105 if track not in result:
106 result[track] = {}
108 if risk not in result[track]:
109 result[track][risk] = {"latest": None, "releases": {}}
111 # same revision but for a different arch
112 if revision_number in result[track][risk]["releases"]:
113 arch = channel["channel"]["base"]["architecture"]
115 if arch == "all":
116 result[track][risk]["releases"][revision_number][
117 "architectures"
118 ].update(ARCHITECTURES)
119 else:
120 result[track][risk]["releases"][revision_number][
121 "architectures"
122 ].add(arch)
123 continue
125 info = {
126 "released_at": channel["channel"]["released-at"],
127 "release_date": convert_date(channel["channel"]["released-at"]),
128 "version": channel["revision"]["version"],
129 "channel": channel["channel"]["name"],
130 "risk": channel["channel"]["risk"],
131 "size": channel["revision"]["download"]["size"],
132 "bases": extract_series(channel, True),
133 "channel_bases": extract_bases(channel),
134 "revision": process_revision(channel["revision"]),
135 "architectures": set(),
136 }
138 if channel["channel"]["base"]:
139 arch = channel["channel"]["base"]["architecture"]
140 if arch == "all":
141 info["architectures"].update(ARCHITECTURES)
142 else:
143 info["architectures"].add(arch)
145 result[track][risk]["releases"][revision_number] = info
147 # Order tracks (latest track first)
148 result = OrderedDict(
149 sorted(
150 result.items(), key=lambda x: track_order.get(x[0], sys.maxsize)
151 )
152 )
154 # Order risks (stable, candidate, beta, edge)
155 for track, track_data in result.items():
156 result[track] = OrderedDict(
157 sorted(
158 track_data.items(),
159 key=lambda x: risk_order.get(x[0], sys.maxsize),
160 )
161 )
163 # Order releases by revision
164 for risk, data in result[track].items():
165 result[track][risk]["releases"] = OrderedDict(
166 sorted(
167 result[track][risk]["releases"].items(),
168 key=lambda release: release[1]["released_at"],
169 reverse=True,
170 )
171 )
173 # Collect all the bases available across all releases
175 base_names = sorted(
176 list(
177 set(
178 base
179 for release in result[track][risk]["releases"].values()
180 for base in release["bases"]
181 )
182 ),
183 reverse=True,
184 )
186 result[track][risk]["all_bases"] = [
187 {
188 "name": base,
189 "architectures": sorted(
190 list(
191 set(
192 arch
193 for release in result[track][risk][
194 "releases"
195 ].values()
196 if base in release["bases"]
197 for arch in release["architectures"]
198 )
199 )
200 ),
201 }
202 for base in base_names
203 ]
205 result[track][risk]["latest"] = result[track][risk]["releases"][
206 max(result[track][risk]["releases"].keys())
207 ]
208 return result
211@trace_function
212def process_revision(revision):
213 bases = []
215 for base in revision["bases"]:
216 if base and base.get("architecture") == "all":
217 for arch in ARCHITECTURES:
218 bases.append({**base, "architecture": arch})
219 else:
220 bases.append(base)
221 return {**revision, "bases": bases}
224@trace_function
225def extract_resources(channel):
226 """
227 Extract resources from channel map
229 :param channel_maps: The channel maps list returned by the API
231 :returns: Charm resource names
232 """
233 resources = []
235 channel_resources = channel["resources"]
237 for resource in channel_resources:
238 resources.append(
239 {"name": resource["name"], "revision": resource["revision"]}
240 )
242 return resources
245@trace_function
246def extract_default_release_architectures(channel):
247 architectures = set()
249 for base in channel["revision"]["bases"]:
250 if not base or base["architecture"] in architectures:
251 continue
253 arch = base["architecture"]
254 if arch == "all":
255 architectures.update(ARCHITECTURES)
256 else:
257 architectures.add(arch)
259 return sorted(architectures)
262@trace_function
263def extract_all_arch(channel_map, parent_dict):
264 all_archy = set()
265 all_channel_bases = {}
266 platforms = {}
268 for version_data in channel_map.values():
269 channel_map_all = list(version_data.items())
270 for _, channel_data in channel_map_all:
271 for release in channel_data["releases"].values():
272 all_archy = all_archy.union(release["architectures"])
274 for base in release["channel_bases"]:
275 for series in base["channels"]:
276 platform = PLATFORMS.get(base["name"], base["name"])
278 if base["name"] not in platforms:
279 platforms[base["name"]] = set()
280 platforms[base["name"]].add(series)
282 all_channel_bases[base["name"] + series] = (
283 f"{platform} {series}"
284 )
286 parent_dict["all_architectures"] = sorted(all_archy)
287 parent_dict["all_platforms"] = platforms
288 parent_dict["all_channel_bases"] = dict(
289 sorted(all_channel_bases.items(), reverse=True)
290 )
292 return
295@trace_function
296def extract_series(channel, long_name=False):
297 """
298 Extract ubuntu series from channel map
300 :param channel_maps: The channel maps list returned by the API
302 :returns: Ubuntu series number
303 """
304 series = set()
306 for base in channel["revision"]["bases"]:
307 if not base or base["channel"] in series:
308 continue
309 platform = PLATFORMS.get(base["name"], base["name"])
310 series.add(
311 f"{platform} {base['channel']}" if long_name else base["channel"]
312 )
314 return sorted(series, reverse=True)
317@trace_function
318def extract_bases(channel):
319 bases = channel["revision"]["bases"]
320 channel_bases = []
322 for i in bases:
323 if i is None:
324 return []
326 has_base = False
328 for b in channel_bases:
329 if b["name"] == i["name"]:
330 has_base = True
332 if not has_base:
333 channel_bases.append(
334 {
335 "name": i["name"],
336 "channels": set(),
337 }
338 )
340 for i in channel_bases:
341 for b in bases:
342 if b["name"] == i["name"]:
343 i["channels"].add(b["channel"])
345 i["channels"] = sorted(i["channels"], reverse=True)
347 return channel_bases
350@trace_function
351def convert_date(date_to_convert):
352 """
353 Convert date to human readable format: Month Day Year
355 If date is less than a day return: today or yesterday
357 Format of date to convert: 2019-01-12T16:48:41.821037+00:00
358 Output: Jan 12 2019
360 :param date_to_convert: Date to convert
361 :returns: Readable date
362 """
363 date_parsed = parser.parse(date_to_convert).replace(tzinfo=None)
364 delta = datetime.datetime.now() - datetime.timedelta(days=1)
365 if delta < date_parsed:
366 return humanize.naturalday(date_parsed).title()
367 else:
368 return date_parsed.strftime("%d %b %Y")
371@trace_function
372def get_icons(package):
373 media = package["result"]["media"]
374 return [m["url"] for m in media if m["type"] == "icon"]
377@trace_function
378def get_docs_topic_id(metadata_yaml):
379 """
380 Return discourse topic ID or None
381 """
382 base_url = discourse_api.base_url
383 docs_link = metadata_yaml.get("docs")
385 if docs_link:
386 if docs_link.startswith(base_url):
387 docs_link_parts = docs_link[len(base_url) :].split("/")
389 if len(docs_link_parts) > 2:
390 topic_id = docs_link_parts[-1]
392 if topic_id.isnumeric():
393 return topic_id
395 return None
398@trace_function
399def convert_categories(api_categories):
400 """
401 The name property in the API response has a slug
402 like format, e.g., big-data
404 This method will return the desired name and an
405 extra slug property with the value from the API
406 """
407 result = []
409 for category in api_categories:
410 category["slug"] = category["name"]
411 category["name"] = format_slug(category["slug"])
412 result.append(category)
414 return result
417@trace_function
418def add_store_front_data(package, details=False):
419 extra = {}
421 extra["icons"] = get_icons(package)
423 if package["result"]["deployable-on"]:
424 extra["deployable-on"] = package["result"]["deployable-on"]
425 else:
426 extra["deployable-on"] = ["vm"]
428 extra["categories"] = convert_categories(package["result"]["categories"])
430 if "title" in package["result"] and package["result"]["title"]:
431 extra["display-name"] = package["result"]["title"]
432 else:
433 extra["display-name"] = format_slug(package["name"])
435 if details:
436 extra["metadata"] = yaml.load(
437 package["default-release"]["revision"]["metadata-yaml"]
438 )
439 extra["config"] = yaml.load(
440 package["default-release"]["revision"]["config-yaml"]
441 )
442 extra["actions"] = yaml.load(
443 package["default-release"]["revision"]["actions-yaml"]
444 )
446 if package["type"] == "bundle":
447 extra["bundle"] = yaml.load(
448 package["default-release"]["revision"]["bundle-yaml"]
449 )
451 # Get bundle docs
452 extra["docs_topic"] = get_docs_topic_id(extra["bundle"])
454 # List charms
455 extra["bundle"]["charms"] = get_bundle_charms(
456 extra["bundle"].get(
457 "applications", extra["bundle"].get("services")
458 )
459 )
460 else:
461 # Get charm docs
462 extra["docs_topic"] = get_docs_topic_id(extra["metadata"])
464 # Reshape channel maps
465 extra["channel_map"] = convert_channel_maps(package["channel-map"])
466 extra["resources"] = extract_resources(package["default-release"])
468 # Extract all supported series
469 extra["architectures"] = extract_default_release_architectures(
470 package["default-release"]
471 )
472 # extract all architecture based on series
473 extract_all_arch(extra["channel_map"], extra)
474 extra["series"] = extract_series(package["default-release"])
475 extra["channel_bases"] = extract_bases(package["default-release"])
477 # Some needed fields
478 extra["publisher_name"] = package["result"]["publisher"][
479 "display-name"
480 ]
481 extra["username"] = package["result"]["publisher"]["username"]
483 if "summary" in package["result"]:
484 extra["summary"] = package["result"]["summary"]
486 # Handle issues and website keys
487 if "issues" in extra["metadata"]:
488 if not isinstance(extra["metadata"]["issues"], list):
489 extra["metadata"]["issues"] = [extra["metadata"]["issues"]]
491 if "website" in extra["metadata"]:
492 if not isinstance(extra["metadata"]["website"], list):
493 extra["metadata"]["website"] = [extra["metadata"]["website"]]
495 package["store_front"] = extra
496 return package
499@trace_function
500def get_bundle_charms(charm_apps):
501 result = []
503 if charm_apps:
504 for app_name, data in charm_apps.items():
505 # Charm names could be with the old prefix/suffix
506 # Like: cs:~charmed-osm/mariadb-k8s-35
507 name = data["charm"]
508 if name.startswith("cs:") or name.startswith("ch:"):
509 name = re.match(r"(?:cs:|ch:)(?:.+/)?(\S*?)(?:-\d+)?$", name)[
510 1
511 ]
513 charm = {"title": format_slug(name), "name": name}
515 result.append(charm)
517 return result
520@trace_function
521def process_python_docs(library, module_name):
522 """Process libraries response from the API
523 to generate the HTML output"""
525 # Obtain Python docstrings
526 docstrings = get_docstrings(library["content"], module_name)
528 bs_soup = get_soup(html(docstrings["docstring_text"]))
529 docstrings["html"] = modify_headers(bs_soup, 3)
531 return docstrings
534@trace_function
535def process_libraries(libraries):
536 """Process the libraries response from the API"""
538 result = []
540 for lib in libraries["libraries"]:
541 data = {
542 "id": lib["library-id"],
543 "name": lib["library-name"],
544 "hash": lib["hash"],
545 "created_at": lib["created-at"],
546 }
548 result.append(data)
550 return result
553@trace_function
554def get_library(library_name, libraries):
555 library = next(
556 (lib for lib in libraries if lib.get("name") == library_name),
557 None,
558 )
560 if not library:
561 return None
563 return library["id"]
566@trace_function
567def filter_charm(charm, categories=["all"], base="all"):
568 """
569 This filter will be done in the API soon.
570 :returns: boolean
571 """
572 # When all is present there is no need to filter
573 if categories and "all" not in categories:
574 charm_categories = [
575 cat["slug"] for cat in charm["store_front"]["categories"]
576 ]
578 if not any(x in categories for x in charm_categories):
579 return False
581 # Filter platforms
582 if base != "all" and base not in charm["store_front"]["base"]:
583 return False
585 return True
588@trace_function
589def format_slug(slug):
590 """Format slug name into a standard title format
591 :param slug: The hypen spaced, lowercase slug to be formatted
592 :return: The formatted string
593 """
595 return (
596 slug.title()
597 .replace("-", " ")
598 .replace("_", " ")
599 .replace("And", "and")
600 .replace("Iot", "IoT")
601 )