Coverage for webapp/store/logic.py: 74%
209 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 22:43 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 22:43 +0000
1import datetime
2import random
3import re
4from urllib.parse import parse_qs, urlparse
6import humanize
7from dateutil import parser
8from dateutil.relativedelta import relativedelta
9from webapp import helpers
def get_n_random_snaps(snaps, choice_number):
    """Pick at most ``choice_number`` snaps at random

    :param snaps: The list of snaps to choose from
    :param choice_number: The maximum number of snaps to return

    :returns: ``snaps`` itself when it holds no more than
        ``choice_number`` items, otherwise a new list of
        ``choice_number`` items drawn with ``random.sample``
    """
    if len(snaps) <= choice_number:
        # Nothing to trim: hand back the caller's own list object.
        return snaps

    return random.sample(snaps, choice_number)
def get_snap_banner_url(snap_result):
    """Copy the url of the first banner media onto the snap

    The dictionary is modified in place: when one of its "media" entries
    has the type "banner", that entry's url is stored under "banner_url".
    Without a banner the dictionary is left untouched.

    :param snap_result: The snap dictionary, holding a "media" list
    :returns: The same snap dictionary
    """
    first_banner = next(
        (item for item in snap_result["media"] if item["type"] == "banner"),
        None,
    )

    if first_banner is not None:
        snap_result["banner_url"] = first_banner["url"]

    return snap_result
def get_pages_details(url, links):
    """Rewrite the navigation links returned by the search API

    Each of the "first", "last", "next", "prev" and "self" links present
    in ``links`` is passed through ``convert_navigation_url``; any other
    key is ignored.

    :param url: The url to build the new links on
    :param links: The links returned by the API, each holding an "href"

    :returns: A dictionary with the converted navigation links
    """
    # Order matters only for the insertion order of the resulting dict.
    relations = ("first", "last", "next", "prev", "self")

    return {
        relation: convert_navigation_url(url, links[relation]["href"])
        for relation in relations
        if relation in links
    }
def convert_navigation_url(url, link):
    """Convert a navigation link from size/page to limit/offset

    ``link`` is paginated with ``size`` and a 1-based ``page``; the
    returned url is paginated with ``limit`` and ``offset``. The
    ``section`` parameter of ``link``, when present, is emitted as
    ``category``.

    Example:
    - url: http://example2.com
    - link: http://example.com?q=test&section=finance&size=10&page=3
    - output:
      http://example2.com?q=test&limit=10&offset=20&category=finance

    :param url: The base url of the new link
    :param link: The navigation url returned by the API

    :returns: The new navigation link
    :raises KeyError: If ``link`` has no ``size`` or ``page`` parameter
    :raises ValueError: If ``size`` or ``page`` is not an integer
    """
    # Local import: keeps this function independent of the module's
    # top-level import list.
    from urllib.parse import quote_plus

    url_queries = parse_qs(urlparse(link).query)

    # parse_qs returns percent-decoded values, so they have to be encoded
    # again before being placed in a query string; otherwise characters
    # such as "&", "#" or spaces would corrupt the generated link.
    q = quote_plus(url_queries["q"][0]) if "q" in url_queries else ""

    if "section" in url_queries:
        category = quote_plus(url_queries["section"][0])
    else:
        category = ""

    size = int(url_queries["size"][0])
    page = int(url_queries["page"][0])

    # Pages are 1-based: page 1 starts at offset 0.
    converted = "{base_url}?q={q}&limit={limit}&offset={offset}".format(
        base_url=url, q=q, limit=size, offset=size * (page - 1)
    )

    if category != "":
        converted += "&category=" + category

    return converted
def build_pagination_link(snap_searched, snap_category, page):
    """Build the /search link for a page of results

    Only the arguments that are truthy end up in the query string, in the
    order ``q``, ``category``, ``page``. Values are inserted as given,
    without any url-encoding.

    :param snap_searched: Name of the search query
    :param snap_category: The category being searched in
    :param page: The page of results

    :returns: A url string
    """
    candidates = (
        "q=" + snap_searched if snap_searched else None,
        "category=" + snap_category if snap_category else None,
        "page=" + str(page) if page else None,
    )
    query_string = "&".join(
        candidate for candidate in candidates if candidate is not None
    )

    return "/search?" + query_string
def convert_channel_maps(channel_map):
    """Regroup a channel map list by architecture, then by track

    Example:
    - Input:
    [
      {
        'channel': {
          'architecture': 'arch',
          'track': 'track 1',
          'name': ..., 'risk': ..., 'released-at': ...
        },
        'download': {'size': ...},
        'version': ..., 'confinement': ..., 'revision': ...
      },
      ...
    ]
    - Output:
    {
      'arch': {
        'track 1': [
          {
            'released-at': ..., 'version': ..., 'channel': ...,
            'risk': ..., 'confinement': ..., 'size': ...,
            'revision': ...
          },
          ...
        ],
        ...
      },
      ...
    }

    :param channel_map: The channel map list returned by the API

    :returns: The channel maps reshaped
    """
    reshaped = {}

    for entry in channel_map:
        channel_details = entry.get("channel")
        arch = channel_details.get("architecture")
        track = channel_details.get("track")

        # Create the arch/track buckets on first sight.
        releases = reshaped.setdefault(arch, {}).setdefault(track, [])

        releases.append(
            {
                # The release date is stored in its display form.
                "released-at": convert_date(
                    channel_details.get("released-at")
                ),
                "version": entry.get("version"),
                "channel": channel_details.get("name"),
                "risk": channel_details.get("risk"),
                "confinement": entry.get("confinement"),
                "size": entry["download"].get("size"),
                "revision": entry["revision"],
            }
        )

    return reshaped
def convert_date(date_to_convert):
    """Convert a date to a human readable format

    Dates more recent than 24 hours ago are rendered with
    ``humanize.naturalday`` and title-cased (e.g. "Today", "Yesterday");
    older dates are rendered as "day month year".

    Format of date to convert: 2019-01-12T16:48:41.821037+00:00
    Output: 12 January 2019

    Dates carrying a utc offset are converted to UTC before being
    compared; dates without one are taken to be UTC already.

    :param date_to_convert: Date to convert
    :returns: Readable date
    """
    utc = datetime.timezone.utc

    date_parsed = parser.parse(date_to_convert)
    if date_parsed.tzinfo is not None:
        # Shift to UTC first: merely dropping the offset would compare a
        # local wall-clock time against the UTC clock below.
        date_parsed = date_parsed.astimezone(utc)
    # Work with naive UTC values from here on, on both sides of the
    # comparison.
    date_parsed = date_parsed.replace(tzinfo=None)

    now_utc = datetime.datetime.now(utc).replace(tzinfo=None)
    one_day_ago = now_utc - datetime.timedelta(days=1)

    if one_day_ago < date_parsed:
        return humanize.naturalday(date_parsed).title()

    # Built by hand rather than with "%-d", which only some platforms'
    # strftime understand.
    return "{day} {month_year}".format(
        day=date_parsed.day, month_year=date_parsed.strftime("%B %Y")
    )
def is_snap_old(last_updated_date, old_threshold_years=2.0):
    """Check if a snap is considered 'old' based on its last update date

    A snap is considered old if it hasn't been updated in the specified
    number of years (default: 2 years). Completed months count as
    twelfths of a year, so fractional thresholds such as 1.5 work as
    expected.

    :param last_updated_date: The last updated date string in ISO format
    :param old_threshold_years: Number of years to consider a snap old
    (default: 2)
    :returns: True if snap is old, False otherwise (also when the date
    is missing or cannot be handled)
    """
    if not last_updated_date:
        return False

    try:
        date_parsed = parser.parse(last_updated_date)
        if date_parsed.tzinfo is None:
            # Dates without an offset are taken to be UTC.
            date_parsed = date_parsed.replace(tzinfo=datetime.timezone.utc)

        now = datetime.datetime.now(datetime.timezone.utc)

        delta = relativedelta(now, date_parsed)
        # delta.years alone is a whole number; add the completed months
        # so that the comparison is meaningful for fractional thresholds.
        years_since_update = delta.years + delta.months / 12

        return years_since_update >= old_threshold_years
    except (ValueError, TypeError, OverflowError):
        # If we can't parse the date, assume it's not old
        return False
# Category slugs that are always offered, in display order.
# NOTE: get_categories() appends to this list at runtime, so it has to
# remain a mutable list.
categories_list = [
    "development",
    "games",
    "social",
    "productivity",
    "utilities",
    "photo-and-video",
    "server-and-cloud",
    "security",
    "devices-and-iot",
    "music-and-audio",
    "entertainment",
    "art-and-design",
]

# Category slugs that are never listed as categories.
blacklist = ["featured"]
def format_category_name(slug):
    """Format category name into a standard title format

    Hyphens become spaces and every word is capitalised, except that the
    word "and" stays lowercase and the word "iot" is written "IoT".
    Only whole words are special-cased: "android" becomes "Android",
    not "android".

    :param slug: The hyphen spaced, lowercase slug to be formatted
    :return: The formatted string
    """
    special_words = {"And": "and", "Iot": "IoT"}

    def restore_special_word(match):
        word = match.group(0)
        return special_words.get(word, word)

    titled = slug.title().replace("-", " ")

    # Substitute per run of letters, so that a special word is only
    # rewritten when it stands alone and never as part of a longer word.
    return re.sub(r"[^\W\d_]+", restore_special_word, titled)
def get_categories(categories_json):
    """Build the list of categories from the legacy API response

    Names found under the "categories" key that are neither known yet
    nor blacklisted are appended to the module-level ``categories_list``.

    NOTE(review): because ``categories_list`` is module-level state, the
    names added here stay in it for all later calls.

    :param categories_json: The returned json
    :returns: A list of categories, each with a "slug" and a "name"
    """
    if "categories" in categories_json:
        api_categories = categories_json["categories"]
    else:
        api_categories = []

    for api_category in api_categories:
        name = api_category["name"]
        if name in categories_list or name in blacklist:
            continue
        categories_list.append(name)

    return [
        {"slug": slug, "name": format_category_name(slug)}
        for slug in categories_list
    ]
def get_snap_categories(snap_categories):
    """Retrieve list of categories with names for a snap.

    Categories whose name is in ``blacklist`` are left out.

    :param snap_categories: List of snap categories from snap info API
    :returns: A list of categories, each with a "slug" and a "name"
    """
    return [
        {
            "slug": snap_category["name"],
            "name": format_category_name(snap_category["name"]),
        }
        for snap_category in snap_categories
        if snap_category["name"] not in blacklist
    ]
def get_latest_versions(
    channel_maps, default_track, lowest_risk, supported_architectures=None
):
    """Get the most recently released default channel and the most
    recently released channel that is not the default one

    The default channel is the one on ``default_track`` with the risk
    ``lowest_risk``. Both returned channels get a "released-at-display"
    key holding the human readable release date.

    :param channel_maps: Channel map list
    :param default_track: The track of the default channel
    :param lowest_risk: The risk of the default channel
    :param supported_architectures: When given and not empty, only
    channels for one of these architectures are considered

    :returns: A tuple of default, other channel objects; either may be
    None
    """
    default_stable = None
    other = None

    for channel in get_last_updated_versions(channel_maps):
        if (
            supported_architectures
            and channel["architecture"] not in supported_architectures
        ):
            continue

        is_default_channel = (
            channel["track"] == default_track
            and channel["risk"] == lowest_risk
        )

        if is_default_channel:
            # Channels come most recent first: keep the first match only.
            if not default_stable:
                default_stable = channel
        elif not other:
            other = channel

    for selected in (default_stable, other):
        if selected:
            selected["released-at-display"] = convert_date(
                selected["released-at"]
            )

    return default_stable, other
def get_revisions(channel_maps: list) -> list:
    """Gets the unique revisions, highest first

    :param channel_maps: Channel map list

    :returns: A list of unique revisions sorted in descending order
    """
    unique_revisions = set()
    for channel_map in channel_maps:
        unique_revisions.add(channel_map["revision"])

    return sorted(unique_revisions, reverse=True)
def get_last_updated_versions(channel_maps):
    """Get all channels in order of updates

    :param channel_maps: Channel map list

    :returns: The "channel" object of every channel map, sorted by its
    "released-at" value, highest first
    """
    releases = [channel_map["channel"] for channel_map in channel_maps]

    # Sort ascending and then flip, rather than sorting with
    # reverse=True: the two differ in how equal dates are ordered.
    releases.sort(key=lambda release: release["released-at"])
    releases.reverse()

    return releases
def get_last_updated_version(channel_maps):
    """Get the first stable channel map, or else the first channel map

    :param channel_maps: Channel map list

    :returns: The first channel map whose risk is "stable"; when there
    is none, the first channel map of the list; None for an empty list
    """
    fallback = None

    for channel_map in channel_maps:
        if channel_map["channel"]["risk"] == "stable":
            return channel_map

        if fallback is None:
            # Remember the very first entry in case nothing is stable.
            fallback = channel_map

    return fallback
def has_stable(channel_maps_list):
    """Use the channel map to find out if the snap has a stable release

    :param channel_maps_list: Channel maps grouped by architecture and
    then by track, each track holding a list of releases

    :returns: True if any release has the risk "stable", False otherwise
    (also when the channel map is empty or None)
    """
    if not channel_maps_list:
        return False

    return any(
        release["risk"] == "stable"
        for arch in channel_maps_list
        for track in channel_maps_list[arch]
        for release in channel_maps_list[arch][track]
    )
def get_lowest_available_risk(channel_map, track):
    """Get the lowest risk released on a track, across architectures

    Risks are ranked stable < candidate < beta < edge.

    :param channel_map: Channel maps grouped by architecture and then by
    track, each track holding a list of releases
    :param track: The track of the channel

    :returns: The lowest available risk, or None when no architecture
    has a release on the track
    """
    risk_order = ["stable", "candidate", "beta", "edge"]
    lowest_available_risk = None

    for arch in channel_map:
        tracks = channel_map[arch]
        if track not in tracks:
            continue

        for release in tracks[track]:
            risk = release["risk"]
            # The first risk seen is taken as is; later ones replace it
            # only when they rank lower.
            if not lowest_available_risk or (
                risk_order.index(risk)
                < risk_order.index(lowest_available_risk)
            ):
                lowest_available_risk = risk

    return lowest_available_risk
def extract_info_channel_map(channel_map, track, risk):
    """Get the confinement and version for a channel

    The first release found on ``track`` with the given ``risk``, going
    through the architectures in order, provides the values.

    :param channel_map: Channel maps grouped by architecture and then by
    track, each track holding a list of releases
    :param track: The track of the channel
    :param risk: The risk of the channel

    :returns: Dict containing confinement and version, both None when no
    release matches
    """
    for arch in channel_map:
        tracks = channel_map[arch]
        if track not in tracks:
            continue

        for release in tracks[track]:
            if release["risk"] == risk:
                return {
                    "confinement": release.get("confinement"),
                    "version": release.get("version"),
                }

    return {"confinement": None, "version": None}
def get_video_embed_code(url):
    """Get the embed details for a video

    The provider is recognised by looking for "youtube", "youtu.be",
    "vimeo" or "asciinema" in the url, in that order.

    :param url: The url of the video

    :returns: A dict with the provider "type", the embed "url" and the
    video "id", or None when the provider is not recognised
    """
    if "youtube" in url:
        provider = "youtube"
        embed_url = url.replace("watch?v=", "embed/")
        video_id = url.rsplit("?v=", 1)[-1]
    elif "youtu.be" in url:
        provider = "youtube"
        embed_url = url.replace("youtu.be/", "youtube.com/embed/")
        video_id = url.rsplit("/", 1)[-1]
    elif "vimeo" in url:
        provider = "vimeo"
        embed_url = url.replace("vimeo.com/", "player.vimeo.com/video/")
        video_id = url.rsplit("/", 1)[-1]
    elif "asciinema" in url:
        provider = "asciinema"
        embed_url = url + ".js"
        video_id = url.rsplit("/", 1)[-1]
    else:
        return None

    return {"type": provider, "url": embed_url, "id": video_id}
def filter_screenshots(media):
    """Get the first five screenshots that are not banner images

    :param media: The list of media objects of a snap

    :returns: At most five media objects of type "screenshot" whose url
    does not look like a banner or banner-icon file
    """
    banner_pattern = re.compile(r"/banner(\-icon)?(_.*)?\.(png|jpg)")

    screenshots = []
    for item in media:
        if item["type"] != "screenshot":
            continue
        if banner_pattern.search(item["url"]):
            continue
        screenshots.append(item)

    return screenshots[:5]
def get_video(media):
    """Get the embed details of the first video in a snap's media

    :param media: The list of media objects of a snap

    :returns: The result of ``get_video_embed_code`` for the first media
    of type "video", or None when there is no video
    """
    first_video = next(
        (item for item in media if item["type"] == "video"), None
    )

    if first_video is None:
        return None

    return get_video_embed_code(first_video["url"])
def promote_snap_with_icon(snaps):
    """Move the first snap with an icon to the front of the list

    The list is reordered in place; it is left as is when no snap has a
    non-empty "icon_url".

    :param snaps: The list of snaps

    :returns: The same list of snaps
    """
    for position, snap in enumerate(snaps):
        if snap["icon_url"] != "":
            snaps.insert(0, snaps.pop(position))
            break

    return snaps
def get_snap_developer(snap_name):
    """Look up the developer information recorded for a snap

    The information is read from store/content/developers/snaps.yaml
    through ``helpers.get_yaml``.

    :param snap_name: The name of a snap

    :returns: The entry stored under the snap's name (presumably a list
    of [display_name, url] -- verify against the yaml file), or None
    when the file yields nothing or has no entry for the snap
    """
    snaps = helpers.get_yaml(
        "store/content/developers/snaps.yaml", typ="rt"
    )

    if not snaps or snap_name not in snaps:
        return None

    return snaps[snap_name]