Coverage for webapp / store / logic.py: 73%
195 statements
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-29 22:06 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-29 22:06 +0000
1import datetime
2import random
3import re
4from urllib.parse import parse_qs, urlparse
6import humanize
7from dateutil import parser
8from webapp import helpers
def get_n_random_snaps(snaps, choice_number):
    """Pick at most ``choice_number`` snaps at random

    :param snaps: The sequence of snaps to choose from
    :param choice_number: The maximum number of snaps to return

    :returns: A random sample of ``choice_number`` snaps, or ``snaps``
        itself (same object, same order) when it does not hold more than
        ``choice_number`` items
    """
    # Nothing to trim: hand the input back untouched.
    if len(snaps) <= choice_number:
        return snaps

    return random.sample(snaps, choice_number)
def get_snap_banner_url(snap_result):
    """Expose the url of the first banner media item on the snap

    :param snap_result: the snap dictionary, holding a "media" list
    :returns: the same snap dict, with a "banner_url" key added when one
        of its media items has the type "banner"
    """
    banners = (
        item for item in snap_result["media"] if item["type"] == "banner"
    )
    first_banner = next(banners, None)

    if first_banner is not None:
        snap_result["banner_url"] = first_banner["url"]

    return snap_result
def get_pages_details(url, links):
    """Rewrite the navigation links returned by the search API so that
    they point at ``url``

    :param url: The url to build the links on
    :param links: The links returned by the API, keyed by relation
        ("first", "last", "next", "prev", "self"); each one holds an
        "href"

    :returns: A dictionary with the converted link of every relation
        present in ``links``
    """
    relations = ("first", "last", "next", "prev", "self")

    return {
        relation: convert_navigation_url(url, links[relation]["href"])
        for relation in relations
        if relation in links
    }
def convert_navigation_url(url, link):
    """Convert navigation link from size/page to limit/offset

    Example:
    - url: http://example2.com
    - link: http://example.com?q=test&section=finance&size=10&page=3
    - output: http://example2.com?q=test&limit=10&offset=20&category=finance

    :param url: The new url
    :param link: The navigation url returned by the API; it must carry
        the "size" and "page" query parameters

    :returns: The new navigation link
    :raises KeyError: if "size" or "page" is missing from ``link``
    :raises ValueError: if "size" or "page" is not an integer
    """
    # Local import: only this function needs it.
    from urllib.parse import urlencode

    url_queries = parse_qs(urlparse(link).query)

    size = int(url_queries["size"][0])
    page = int(url_queries["page"][0])

    params = [
        ("q", url_queries.get("q", [""])[0]),
        ("limit", size),
        # Pages are 1-based, offsets are 0-based.
        ("offset", size * (page - 1)),
    ]

    category = url_queries.get("section", [""])[0]
    if category != "":
        params.append(("category", category))

    # parse_qs decoded the values, so they have to be encoded again:
    # a raw "&", "#" or space would otherwise corrupt the new link.
    return url + "?" + urlencode(params)
def build_pagination_link(snap_searched, snap_category, page):
    """Build pagination link

    Empty or missing values (including page 0) are left out of the link.

    :param snap_searched: Name of the search query
    :param snap_category: The category being searched in
    :param page: The page of results

    :returns: A url string
    """
    # Local import: only this function needs it.
    from urllib.parse import urlencode

    params = []

    if snap_searched:
        params.append(("q", snap_searched))

    if snap_category:
        params.append(("category", snap_category))

    if page:
        params.append(("page", page))

    # urlencode escapes the values, so a search such as "a&b" stays a
    # single "q" parameter instead of breaking the query string.
    return "/search?" + urlencode(params)
def convert_channel_maps(channel_map):
    """Group the channel map entries by architecture, then by track

    Example:
    - Input:
    [
      {
        'channel': {
          'architecture': 'arch',
          'track': 'track 1',
          'name': ..., 'risk': ..., 'released-at': ...
        },
        'download': {'size': ...},
        'version': ..., 'confinement': ..., 'revision': ...
      },
      ...
    ]
    - Output:
    {
      'arch': {
        'track 1': [{'risk': ..., 'version': ..., ...}, ...],
        ...
      },
      ...
    }

    :param channel_map: The channel map list returned by the API

    :returns: The channel maps reshaped; "released-at" is the date after
        it went through convert_date
    """
    by_arch = {}

    for entry in channel_map:
        channel_details = entry.get("channel")
        arch = channel_details.get("architecture")
        track = channel_details.get("track")

        # Create the arch and track levels the first time they show up.
        releases = by_arch.setdefault(arch, {}).setdefault(track, [])

        releases.append(
            {
                "released-at": convert_date(
                    channel_details.get("released-at")
                ),
                "version": entry.get("version"),
                "channel": channel_details.get("name"),
                "risk": channel_details.get("risk"),
                "confinement": entry.get("confinement"),
                "size": entry["download"].get("size"),
                "revision": entry["revision"],
            }
        )

    return by_arch
def convert_date(date_to_convert):
    """Convert date to human readable format: Day Month Year

    If date is less than a day old, return the title-cased result of
    humanize.naturalday (e.g. Today or Yesterday)

    Format of date to convert: 2019-01-12T16:48:41.821037+00:00
    Output: 12 January 2019

    :param date_to_convert: Date to convert
    :returns: Readable date
    """
    utc = datetime.timezone.utc

    date_parsed = parser.parse(date_to_convert)
    if date_parsed.tzinfo is not None:
        # Convert to UTC rather than just dropping the offset, so dates
        # carrying another offset are compared at the right instant.
        date_parsed = date_parsed.astimezone(utc)
    # Naive UTC on both sides of the comparison; dates without an offset
    # are taken as UTC.
    date_parsed = date_parsed.replace(tzinfo=None)

    now_utc = datetime.datetime.now(utc).replace(tzinfo=None)
    one_day_ago = now_utc - datetime.timedelta(days=1)

    if one_day_ago < date_parsed:
        return humanize.naturalday(date_parsed).title()

    # "%-d" is a glibc extension; take the unpadded day from the
    # attribute so the format works on every platform.
    return "{} {}".format(date_parsed.day, date_parsed.strftime("%B %Y"))
# Known category slugs, in the order get_categories lists them.
categories_list = [
    "development",
    "games",
    "social",
    "productivity",
    "utilities",
    "photo-and-video",
    "server-and-cloud",
    "security",
    "devices-and-iot",
    "music-and-audio",
    "entertainment",
    "art-and-design",
]

# Category names that get_categories and get_snap_categories leave out.
blacklist = ["featured"]
def format_category_name(slug):
    """Format category name into a standard title format

    Every word is capitalised, except that "and" stays lowercase and
    "iot" becomes "IoT", e.g. "devices-and-iot" -> "Devices and IoT".

    :param slug: The hyphen spaced, lowercase slug to be formatted
    :return: The formatted string
    """
    name = slug.title().replace("-", " ")

    # Match whole words only, so that names merely containing them
    # ("Android", "Iota") keep their capitalisation.
    name = re.sub(r"\bAnd\b", "and", name)
    name = re.sub(r"\bIot\b", "IoT", name)

    return name
def get_categories(categories_json):
    """Build the list of categories: the known ones first, followed by
    any additional category found in the API response.

    Names listed in ``blacklist`` are left out, and a category is never
    listed twice.

    :param categories_json: The returned json
    :returns: A list of categories, each one a dict with "slug" and "name"
    """
    # Work on a copy: appending to the module-level list would leak the
    # categories of one response into every later call.
    slugs = list(categories_list)

    if "categories" in categories_json:
        for cat in categories_json["categories"]:
            name = cat["name"]
            if name not in slugs and name not in blacklist:
                slugs.append(name)

    return [
        {"slug": slug, "name": format_category_name(slug)} for slug in slugs
    ]
def get_snap_categories(snap_categories):
    """Build the displayable categories of a snap, skipping blacklisted
    ones.

    :param snap_categories: List of snap categories from snap info API
    :returns: A list of dicts holding the "slug" and the formatted "name"
        of each category
    """
    return [
        {
            "slug": category["name"],
            "name": format_category_name(category["name"]),
        }
        for category in snap_categories
        if category["name"] not in blacklist
    ]
def get_latest_versions(
    channel_maps, default_track, lowest_risk, supported_architectures=None
):
    """Find the most recently released channel on the default track and
    lowest risk, and the most recently released channel among all the
    others

    Both returned channels get a "released-at-display" key holding their
    release date formatted by convert_date.

    :param channel_maps: Channel map list
    :param default_track: The track of the default channel
    :param lowest_risk: The risk of the default channel
    :param supported_architectures: When given and not empty, channels
        for other architectures are ignored

    :returns: A tuple of default/stable, track/risk channel map objects;
        either one is None when no channel matched
    """
    default_stable = None
    other = None

    for channel in get_last_updated_versions(channel_maps):
        if (
            supported_architectures
            and channel["architecture"] not in supported_architectures
        ):
            continue

        is_default_channel = (
            channel["track"] == default_track
            and channel["risk"] == lowest_risk
        )

        # Channels come newest first: keep the first hit of each kind.
        if is_default_channel:
            if not default_stable:
                default_stable = channel
        elif not other:
            other = channel

    for latest in (default_stable, other):
        if latest:
            latest["released-at-display"] = convert_date(
                latest["released-at"]
            )

    return default_stable, other
def get_revisions(channel_maps: list) -> list:
    """List every distinct revision, highest first

    :param channel_maps: Channel map list

    :returns: The unique revisions, sorted in descending order
    """
    unique_revisions = set()
    for channel_map in channel_maps:
        unique_revisions.add(channel_map["revision"])

    return sorted(unique_revisions, reverse=True)
def get_last_updated_versions(channel_maps):
    """Collect the "channel" of every channel map, most recent release
    first

    :param channel_maps: Channel map list

    :returns: A list of channels ordered by last updated time
    """
    releases = [channel_map["channel"] for channel_map in channel_maps]

    # Sort ascending and then flip, so channels sharing a release date
    # end up in the reverse of their input order.
    releases.sort(key=lambda release: release["released-at"])
    releases.reverse()

    return releases
def get_last_updated_version(channel_maps):
    """Pick the first stable channel map of the list, falling back to the
    very first channel map when none is stable

    :param channel_maps: Channel map list

    :returns: The first channel map whose risk is "stable"; without any,
        the first channel map of the list; None for an empty list
    """
    selected = None

    for candidate in channel_maps:
        if not selected or candidate["channel"]["risk"] == "stable":
            selected = candidate

        # A stable channel map cannot be beaten: stop looking.
        if selected["channel"]["risk"] == "stable":
            break

    return selected
def has_stable(channel_maps_list):
    """Tell whether any release of the channel map has the "stable" risk

    :param channel_maps_list: Channel maps keyed by architecture, then by
        track, each track holding a list of releases

    :returns: True or False
    """
    if not channel_maps_list:
        return False

    return any(
        release["risk"] == "stable"
        for arch in channel_maps_list
        for track in channel_maps_list[arch]
        for release in channel_maps_list[arch][track]
    )
def get_lowest_available_risk(channel_map, track):
    """Find the lowest risk released on a track, across architectures

    Risks rank, from lowest to highest: stable, candidate, beta, edge.

    :param channel_map: Channel maps keyed by architecture, then by track
    :param track: The track of the channel

    :returns: The lowest available risk, None when the track has no
        release
    """
    risk_order = ["stable", "candidate", "beta", "edge"]
    lowest = None

    for arch in channel_map:
        tracks = channel_map[arch]
        if track not in tracks:
            continue

        for release in tracks[track]:
            candidate = release["risk"]

            # The first risk met is taken as is.
            if not lowest:
                lowest = candidate
                continue

            if risk_order.index(candidate) < risk_order.index(lowest):
                lowest = candidate

    return lowest
def extract_info_channel_map(channel_map, track, risk):
    """Look up the confinement and version of the first release found for
    a track and risk

    :param channel_map: Channel maps keyed by architecture, then by track
    :param track: The track of the channel
    :param risk: The risk of the channel

    :returns: Dict containing confinement and version, both None when no
        release matches
    """
    for arch in channel_map:
        tracks = channel_map[arch]
        if track not in tracks:
            continue

        for release in tracks[track]:
            if release["risk"] == risk:
                return {
                    "confinement": release.get("confinement"),
                    "version": release.get("version"),
                }

    return {"confinement": None, "version": None}
def get_video_embed_code(url):
    """Work out how to embed a video from its url

    The provider is recognised by a substring of the url: "youtube",
    "youtu.be", "vimeo" or "asciinema", checked in that order.

    :param url: The url of the video

    :returns: A dict with the provider "type", the embed "url" and the
        video "id"; None when the provider is not recognised
    """
    if "youtube" in url:
        provider = "youtube"
        embed_url = url.replace("watch?v=", "embed/")
        video_id = url.rsplit("?v=", 1)[-1]
    elif "youtu.be" in url:
        provider = "youtube"
        embed_url = url.replace("youtu.be/", "youtube.com/embed/")
        video_id = url.rsplit("/", 1)[-1]
    elif "vimeo" in url:
        provider = "vimeo"
        embed_url = url.replace("vimeo.com/", "player.vimeo.com/video/")
        video_id = url.rsplit("/", 1)[-1]
    elif "asciinema" in url:
        provider = "asciinema"
        embed_url = url + ".js"
        video_id = url.rsplit("/", 1)[-1]
    else:
        return None

    return {"type": provider, "url": embed_url, "id": video_id}
def filter_screenshots(media):
    """Keep the first five screenshots that are not banner images

    A screenshot counts as a banner when its url matches
    /banner[-icon][_...].png or .jpg

    :param media: The list of media items, each with a "type" and a "url"

    :returns: A list of at most five screenshot media items
    """
    banner_pattern = re.compile(r"/banner(\-icon)?(_.*)?\.(png|jpg)")

    screenshots = []
    for item in media:
        if item["type"] != "screenshot":
            continue
        if banner_pattern.search(item["url"]):
            continue
        screenshots.append(item)

    return screenshots[:5]
def get_video(media):
    """Get the embed details of the first video found in the media list

    :param media: The list of media items, each with a "type"

    :returns: The result of get_video_embed_code for the url of the first
        "video" item, None when the list holds no video
    """
    first_video = next((item for item in media if item["type"] == "video"), None)

    if first_video is None:
        return None

    return get_video_embed_code(first_video["url"])
def promote_snap_with_icon(snaps):
    """Move the first snap with an icon to the front of the list

    The list is modified in place; it is left as is when no snap has a
    non-empty "icon_url".

    :param snaps: The list of snaps

    :returns: The same list of snaps
    """
    for position, snap in enumerate(snaps):
        if snap["icon_url"] != "":
            snaps.insert(0, snaps.pop(position))
            break

    return snaps
def get_snap_developer(snap_name):
    """Look up the developer information recorded for a snap in
    store/content/developers/snaps.yaml

    :param snap_name: The name of a snap

    :returns: The entry stored under the snap name (documented as a list
        of [display_name, url]), None when the snap is not listed or the
        file yields nothing

    """
    snaps = helpers.get_yaml(
        "store/content/developers/snaps.yaml", typ="rt"
    )

    if not snaps or snap_name not in snaps:
        return None

    return snaps[snap_name]