Coverage for webapp/store/logic.py: 72%
192 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 22:06 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 22:06 +0000
1import datetime
2import random
3import re
4from urllib.parse import parse_qs, urlparse
6import humanize
7from dateutil import parser
8from webapp import helpers
11def get_n_random_snaps(snaps, choice_number):
12 if len(snaps) > choice_number:
13 return random.sample(snaps, choice_number)
15 return snaps
18def get_snap_banner_url(snap_result):
19 """Get snaps banner url from media object
21 :param snap_result: the snap dictionnary
22 :returns: the snap dict with banner key
23 """
24 for media in snap_result["media"]:
25 if media["type"] == "banner":
26 snap_result["banner_url"] = media["url"]
27 break
29 return snap_result
32def get_pages_details(url, links):
33 """Transform returned navigation links from search API from limit/offset
34 to size/page
36 :param url: The url to build
37 :param links: The links returned by the API
39 :returns: A dictionnary with all the navigation links
40 """
41 links_result = {}
43 if "first" in links:
44 links_result["first"] = convert_navigation_url(
45 url, links["first"]["href"]
46 )
48 if "last" in links:
49 links_result["last"] = convert_navigation_url(
50 url, links["last"]["href"]
51 )
53 if "next" in links:
54 links_result["next"] = convert_navigation_url(
55 url, links["next"]["href"]
56 )
58 if "prev" in links:
59 links_result["prev"] = convert_navigation_url(
60 url, links["prev"]["href"]
61 )
63 if "self" in links:
64 links_result["self"] = convert_navigation_url(
65 url, links["self"]["href"]
66 )
68 return links_result
71def convert_navigation_url(url, link):
72 """Convert navigation link from offest/limit to size/page
74 Example:
75 - input: http://example.com?q=test&category=finance&size=10&page=3
76 - output: http://example2.com?q=test&category=finance&limit=10&offset=30
78 :param url: The new url
79 :param link: The navigation url returned by the API
81 :returns: The new navigation link
82 """
83 url_parsed = urlparse(link)
84 host_url = "{base_url}" "?q={q}&limit={limit}&offset={offset}"
86 url_queries = parse_qs(url_parsed.query)
88 if "q" in url_queries:
89 q = url_queries["q"][0]
90 else:
91 q = ""
93 if "section" in url_queries:
94 category = url_queries["section"][0]
95 else:
96 category = ""
98 size = int(url_queries["size"][0])
99 page = int(url_queries["page"][0])
101 url = host_url.format(
102 base_url=url, q=q, limit=size, offset=size * (page - 1)
103 )
105 if category != "":
106 url += "&category=" + category
108 return url
111def build_pagination_link(snap_searched, snap_category, page):
112 """Build pagination link
114 :param snap_searched: Name of the search query
115 :param snap_category: The category being searched in
116 :param page: The page of results
118 :returns: A url string
119 """
120 params = []
122 if snap_searched:
123 params.append("q=" + snap_searched)
125 if snap_category:
126 params.append("category=" + snap_category)
128 if page:
129 params.append("page=" + str(page))
131 return "/search?" + "&".join(params)
134def convert_channel_maps(channel_map):
135 """Converts channel maps list to format easier to manipulate
137 Example:
138 - Input:
139 [
140 {
141 'architecture': 'arch'
142 'map': [{'info': 'release', ...}, ...],
143 'track': 'track 1'
144 },
145 ...
146 ]
147 - Output:
148 {
149 'arch': {
150 'track 1': [{'info': 'release', ...}, ...],
151 ...
152 },
153 ...
154 }
156 :param channel_maps_list: The channel maps list returned by the API
158 :returns: The channel maps reshaped
159 """
160 channel_map_restruct = {}
162 for channel in channel_map:
163 arch = channel.get("channel").get("architecture")
164 track = channel.get("channel").get("track")
165 if arch not in channel_map_restruct:
166 channel_map_restruct[arch] = {}
167 if track not in channel_map_restruct[arch]:
168 channel_map_restruct[arch][track] = []
170 info = {
171 "released-at": convert_date(channel["channel"].get("released-at")),
172 "version": channel.get("version"),
173 "channel": channel["channel"].get("name"),
174 "risk": channel["channel"].get("risk"),
175 "confinement": channel.get("confinement"),
176 "size": channel["download"].get("size"),
177 }
179 channel_map_restruct[arch][track].append(info)
181 return channel_map_restruct
184def convert_date(date_to_convert):
185 """Convert date to human readable format: Month Day Year
187 If date is less than a day return: today or yesterday
189 Format of date to convert: 2019-01-12T16:48:41.821037+00:00
190 Output: Jan 12 2019
192 :param date_to_convert: Date to convert
193 :returns: Readable date
194 """
195 local_timezone = datetime.datetime.utcnow().tzinfo
196 date_parsed = parser.parse(date_to_convert).replace(tzinfo=local_timezone)
197 delta = datetime.datetime.utcnow() - datetime.timedelta(days=1)
199 if delta < date_parsed:
200 return humanize.naturalday(date_parsed).title()
201 else:
202 return date_parsed.strftime("%-d %B %Y")
205categories_list = [
206 "development",
207 "games",
208 "social",
209 "productivity",
210 "utilities",
211 "photo-and-video",
212 "server-and-cloud",
213 "security",
214 "devices-and-iot",
215 "music-and-audio",
216 "entertainment",
217 "art-and-design",
218]
220blacklist = ["featured"]
223def format_category_name(slug):
224 """Format category name into a standard title format
226 :param slug: The hypen spaced, lowercase slug to be formatted
227 :return: The formatted string
228 """
229 return (
230 slug.title()
231 .replace("-", " ")
232 .replace("And", "and")
233 .replace("Iot", "IoT")
234 )
237def get_categories(categories_json):
238 """Retrieve and flatten the nested array from the legacy API response.
240 :param categories_json: The returned json
241 :returns: A list of categories
242 """
244 categories = []
246 if "categories" in categories_json:
247 for cat in categories_json["categories"]:
248 if cat["name"] not in categories_list:
249 if cat["name"] not in blacklist:
250 categories_list.append(cat["name"])
252 for category in categories_list:
253 categories.append(
254 {"slug": category, "name": format_category_name(category)}
255 )
257 return categories
260def get_snap_categories(snap_categories):
261 """Retrieve list of categories with names for a snap.
263 :param snap_categories: List of snap categories from snap info API
264 :returns: A list of categories with names
265 """
266 categories = []
268 for cat in snap_categories:
269 if cat["name"] not in blacklist:
270 categories.append(
271 {
272 "slug": cat["name"],
273 "name": format_category_name(cat["name"]),
274 }
275 )
277 return categories
280def get_latest_versions(
281 channel_maps, default_track, lowest_risk, supported_architectures=None
282):
283 """Get the latest versions of both default/stable and the latest of
284 all other channels, unless it's default/stable
286 :param channel_map: Channel map list
288 :returns: A tuple of default/stable, track/risk channel map objects
289 """
290 ordered_versions = get_last_updated_versions(channel_maps)
292 default_stable = None
293 other = None
294 for channel in ordered_versions:
295 if (
296 not supported_architectures
297 or channel["architecture"] in supported_architectures
298 ):
299 if (
300 channel["track"] == default_track
301 and channel["risk"] == lowest_risk
302 ):
303 if not default_stable:
304 default_stable = channel
305 elif not other:
306 other = channel
308 if default_stable:
309 default_stable["released-at-display"] = convert_date(
310 default_stable["released-at"]
311 )
312 if other:
313 other["released-at-display"] = convert_date(other["released-at"])
314 return default_stable, other
317def get_last_updated_versions(channel_maps):
318 """Get all channels in order of updates
320 :param channel_map: Channel map list
322 :returns: A list of channels ordered by last updated time
323 """
324 releases = []
325 for channel_map in channel_maps:
326 releases.append(channel_map["channel"])
328 return list(reversed(sorted(releases, key=lambda c: c["released-at"])))
331def get_last_updated_version(channel_maps):
332 """Get the oldest channel that was created
334 :param channel_map: Channel map list
336 :returns: The latest stable version, if no stable, the latest risk updated
337 """
338 newest_channel = None
339 for channel_map in channel_maps:
340 if not newest_channel:
341 newest_channel = channel_map
342 else:
343 if channel_map["channel"]["risk"] == "stable":
344 newest_channel = channel_map
346 if newest_channel["channel"]["risk"] == "stable":
347 break
349 return newest_channel
352def has_stable(channel_maps_list):
353 """Use the channel map to find out if the snap has a stable release
355 :param channel_maps_list: Channel map list
357 :returns: True or False
358 """
359 if channel_maps_list:
360 for arch in channel_maps_list:
361 for track in channel_maps_list[arch]:
362 for release in channel_maps_list[arch][track]:
363 if release["risk"] == "stable":
364 return True
366 return False
369def get_lowest_available_risk(channel_map, track):
370 """Get the lowest available risk for the default track
372 :param channel_map: Channel map list
373 :param track: The track of the channel
375 :returns: The lowest available risk
376 """
377 risk_order = ["stable", "candidate", "beta", "edge"]
378 lowest_available_risk = None
379 for arch in channel_map:
380 if arch in channel_map and track in channel_map[arch]:
381 releases = channel_map[arch][track]
382 for release in releases:
383 if not lowest_available_risk:
384 lowest_available_risk = release["risk"]
385 else:
386 risk_index = risk_order.index(release["risk"])
387 lowest_index = risk_order.index(lowest_available_risk)
388 if risk_index < lowest_index:
389 lowest_available_risk = release["risk"]
391 return lowest_available_risk
394def extract_info_channel_map(channel_map, track, risk):
395 """Get the confinement and version for a channel
397 :param channel_map: Channel map list
398 :param track: The track of the channel
399 :param risk: The risk of the channel
401 :returns: Dict containing confinement and version
402 """
403 context = {
404 "confinement": None,
405 "version": None,
406 }
408 for arch in channel_map:
409 if track in channel_map[arch]:
410 releases = channel_map[arch][track]
411 for release in releases:
412 if release["risk"] == risk:
413 context["confinement"] = release.get("confinement")
414 context["version"] = release.get("version")
416 return context
418 return context
421def get_video_embed_code(url):
422 """Get the embed code for videos
424 :param url: The url of the video
426 :returns: Embed code
427 """
428 if "youtube" in url:
429 return {
430 "type": "youtube",
431 "url": url.replace("watch?v=", "embed/"),
432 "id": url.rsplit("?v=", 1)[-1],
433 }
434 if "youtu.be" in url:
435 return {
436 "type": "youtube",
437 "url": url.replace("youtu.be/", "youtube.com/embed/"),
438 "id": url.rsplit("/", 1)[-1],
439 }
440 if "vimeo" in url:
441 return {
442 "type": "vimeo",
443 "url": url.replace("vimeo.com/", "player.vimeo.com/video/"),
444 "id": url.rsplit("/", 1)[-1],
445 }
446 if "asciinema" in url:
447 return {
448 "type": "asciinema",
449 "url": url + ".js",
450 "id": url.rsplit("/", 1)[-1],
451 }
454def filter_screenshots(media):
455 banner_regex = r"/banner(\-icon)?(_.*)?\.(png|jpg)"
457 return [
458 m
459 for m in media
460 if m["type"] == "screenshot" and not re.search(banner_regex, m["url"])
461 ][:5]
464def get_video(media):
465 video = None
466 for m in media:
467 if m["type"] == "video":
468 video = get_video_embed_code(m["url"])
469 break
470 return video
473def promote_snap_with_icon(snaps):
474 """Move the first snap with an icon to the front of the list
476 :param snaps: The list of snaps
478 :returns: A list of snaps
479 """
480 try:
481 snap_with_icon = next(snap for snap in snaps if snap["icon_url"] != "")
483 if snap_with_icon:
484 snap_with_icon_index = snaps.index(snap_with_icon)
486 snaps.insert(0, snaps.pop(snap_with_icon_index))
487 except StopIteration:
488 pass
490 return snaps
493def get_snap_developer(snap_name):
494 """Is this a special snap published by Canonical?
495 Show some developer information
497 :param snap_name: The name of a snap
499 :returns: a list of [display_name, url]
501 """
502 filename = "store/content/developers/snaps.yaml"
503 snaps = helpers.get_yaml(filename, typ="rt")
505 if snaps and snap_name in snaps:
506 return snaps[snap_name]
508 return None