Coverage for webapp/store/logic.py : 72%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import datetime
2import random
3import re
4from urllib.parse import parse_qs, urlparse
6import humanize
7from dateutil import parser
8from webapp import helpers
11def get_n_random_snaps(snaps, choice_number):
12 if len(snaps) > choice_number:
13 return random.sample(snaps, choice_number)
15 return snaps
18def get_snap_banner_url(snap_result):
19 """Get snaps banner url from media object
21 :param snap_result: the snap dictionnary
22 :returns: the snap dict with banner key
23 """
24 for media in snap_result["media"]:
25 if media["type"] == "banner":
26 snap_result["banner_url"] = media["url"]
27 break
29 return snap_result
32def get_pages_details(url, links):
33 """Transform returned navigation links from search API from limit/offset
34 to size/page
36 :param url: The url to build
37 :param links: The links returned by the API
39 :returns: A dictionnary with all the navigation links
40 """
41 links_result = {}
43 if "first" in links:
44 links_result["first"] = convert_navigation_url(
45 url, links["first"]["href"]
46 )
48 if "last" in links:
49 links_result["last"] = convert_navigation_url(
50 url, links["last"]["href"]
51 )
53 if "next" in links:
54 links_result["next"] = convert_navigation_url(
55 url, links["next"]["href"]
56 )
58 if "prev" in links:
59 links_result["prev"] = convert_navigation_url(
60 url, links["prev"]["href"]
61 )
63 if "self" in links:
64 links_result["self"] = convert_navigation_url(
65 url, links["self"]["href"]
66 )
68 return links_result
71def convert_navigation_url(url, link):
72 """Convert navigation link from offest/limit to size/page
74 Example:
75 - input: http://example.com?q=test&category=finance&size=10&page=3
76 - output: http://example2.com?q=test&category=finance&limit=10&offset=30
78 :param url: The new url
79 :param link: The navigation url returned by the API
81 :returns: The new navigation link
82 """
83 url_parsed = urlparse(link)
84 host_url = "{base_url}" "?q={q}&limit={limit}&offset={offset}"
86 url_queries = parse_qs(url_parsed.query)
88 if "q" in url_queries:
89 q = url_queries["q"][0]
90 else:
91 q = ""
93 if "section" in url_queries:
94 category = url_queries["section"][0]
95 else:
96 category = ""
98 size = int(url_queries["size"][0])
99 page = int(url_queries["page"][0])
101 url = host_url.format(
102 base_url=url, q=q, limit=size, offset=size * (page - 1)
103 )
105 if category != "":
106 url += "&category=" + category
108 return url
111def build_pagination_link(snap_searched, snap_category, page):
112 """Build pagination link
114 :param snap_searched: Name of the search query
115 :param snap_category: The category being searched in
116 :param page: The page of results
118 :returns: A url string
119 """
120 params = []
122 if snap_searched:
123 params.append("q=" + snap_searched)
125 if snap_category:
126 params.append("category=" + snap_category)
128 if page:
129 params.append("page=" + str(page))
131 return "/search?" + "&".join(params)
134def convert_channel_maps(channel_map):
135 """Converts channel maps list to format easier to manipulate
137 Example:
138 - Input:
139 [
140 {
141 'architecture': 'arch'
142 'map': [{'info': 'release', ...}, ...],
143 'track': 'track 1'
144 },
145 ...
146 ]
147 - Output:
148 {
149 'arch': {
150 'track 1': [{'info': 'release', ...}, ...],
151 ...
152 },
153 ...
154 }
156 :param channel_maps_list: The channel maps list returned by the API
158 :returns: The channel maps reshaped
159 """
160 channel_map_restruct = {}
162 for channel in channel_map:
163 arch = channel.get("channel").get("architecture")
164 track = channel.get("channel").get("track")
165 if arch not in channel_map_restruct:
166 channel_map_restruct[arch] = {}
167 if track not in channel_map_restruct[arch]:
168 channel_map_restruct[arch][track] = []
170 info = {
171 "released-at": convert_date(channel["channel"].get("released-at")),
172 "version": channel.get("version"),
173 "channel": channel["channel"].get("name"),
174 "risk": channel["channel"].get("risk"),
175 "confinement": channel.get("confinement"),
176 "size": channel["download"].get("size"),
177 }
179 channel_map_restruct[arch][track].append(info)
181 return channel_map_restruct
184def convert_date(date_to_convert):
185 """Convert date to human readable format: Month Day Year
187 If date is less than a day return: today or yesterday
189 Format of date to convert: 2019-01-12T16:48:41.821037+00:00
190 Output: Jan 12 2019
192 :param date_to_convert: Date to convert
193 :returns: Readable date
194 """
195 local_timezone = datetime.datetime.utcnow().tzinfo
196 date_parsed = parser.parse(date_to_convert).replace(tzinfo=local_timezone)
197 delta = datetime.datetime.utcnow() - datetime.timedelta(days=1)
199 if delta < date_parsed:
200 return humanize.naturalday(date_parsed).title()
201 else:
202 return date_parsed.strftime("%-d %B %Y")
205categories_list = [
206 "development",
207 "games",
208 "social",
209 "productivity",
210 "utilities",
211 "photo-and-video",
212 "server-and-cloud",
213 "security",
214 "devices-and-iot",
215 "music-and-audio",
216 "entertainment",
217 "art-and-design",
218]
220blacklist = ["featured"]
223def format_category_name(slug):
224 """Format category name into a standard title format
226 :param slug: The hypen spaced, lowercase slug to be formatted
227 :return: The formatted string
228 """
229 return (
230 slug.title()
231 .replace("-", " ")
232 .replace("And", "and")
233 .replace("Iot", "IoT")
234 )
237def get_categories(categories_json):
238 """Retrieve and flatten the nested array from the legacy API response.
240 :param categories_json: The returned json
241 :returns: A list of categories
242 """
244 categories = []
246 if "categories" in categories_json:
247 for cat in categories_json["categories"]:
248 if cat["name"] not in categories_list:
249 if cat["name"] not in blacklist:
250 categories_list.append(cat["name"])
252 for category in categories_list:
253 categories.append(
254 {"slug": category, "name": format_category_name(category)}
255 )
257 return categories
260def get_snap_categories(snap_categories):
261 """Retrieve list of categories with names for a snap.
263 :param snap_categories: List of snap categories from snap info API
264 :returns: A list of categories with names
265 """
266 categories = []
268 for cat in snap_categories:
269 if cat["name"] not in blacklist:
270 categories.append(
271 {
272 "slug": cat["name"],
273 "name": format_category_name(cat["name"]),
274 }
275 )
277 return categories
280def get_latest_versions(channel_maps, default_track, lowest_risk):
281 """Get the latest versions of both default/stable and the latest of
282 all other channels, unless it's default/stable
284 :param channel_map: Channel map list
286 :returns: A tuple of default/stable, track/risk channel map objects
287 """
288 ordered_versions = get_last_updated_versions(channel_maps)
290 default_stable = None
291 other = None
293 for channel in ordered_versions:
294 if (
295 channel["track"] == default_track
296 and channel["risk"] == lowest_risk
297 ):
298 if not default_stable:
299 default_stable = channel
300 elif not other:
301 other = channel
303 if default_stable:
304 default_stable["released-at-display"] = convert_date(
305 default_stable["released-at"]
306 )
307 if other:
308 other["released-at-display"] = convert_date(other["released-at"])
309 return default_stable, other
312def get_last_updated_versions(channel_maps):
313 """Get all channels in order of updates
315 :param channel_map: Channel map list
317 :returns: A list of channels ordered by last updated time
318 """
319 releases = []
320 for channel_map in channel_maps:
321 releases.append(channel_map["channel"])
323 return list(reversed(sorted(releases, key=lambda c: c["released-at"])))
326def get_last_updated_version(channel_maps):
327 """Get the oldest channel that was created
329 :param channel_map: Channel map list
331 :returns: The latest stable version, if no stable, the latest risk updated
332 """
333 newest_channel = None
334 for channel_map in channel_maps:
335 if not newest_channel:
336 newest_channel = channel_map
337 else:
338 if channel_map["channel"]["risk"] == "stable":
339 newest_channel = channel_map
341 if newest_channel["channel"]["risk"] == "stable":
342 break
344 return newest_channel
347def has_stable(channel_maps_list):
348 """Use the channel map to find out if the snap has a stable release
350 :param channel_maps_list: Channel map list
352 :returns: True or False
353 """
354 if channel_maps_list:
355 for arch in channel_maps_list:
356 for track in channel_maps_list[arch]:
357 for release in channel_maps_list[arch][track]:
358 if release["risk"] == "stable":
359 return True
361 return False
364def get_lowest_available_risk(channel_map, track):
365 """Get the lowest available risk for the default track
367 :param channel_map: Channel map list
368 :param track: The track of the channel
370 :returns: The lowest available risk
371 """
372 risk_order = ["stable", "candidate", "beta", "edge"]
373 lowest_available_risk = None
374 for arch in channel_map:
375 if arch in channel_map and track in channel_map[arch]:
376 releases = channel_map[arch][track]
377 for release in releases:
378 if not lowest_available_risk:
379 lowest_available_risk = release["risk"]
380 else:
381 risk_index = risk_order.index(release["risk"])
382 lowest_index = risk_order.index(lowest_available_risk)
383 if risk_index < lowest_index:
384 lowest_available_risk = release["risk"]
386 return lowest_available_risk
389def extract_info_channel_map(channel_map, track, risk):
390 """Get the confinement and version for a channel
392 :param channel_map: Channel map list
393 :param track: The track of the channel
394 :param risk: The risk of the channel
396 :returns: Dict containing confinement and version
397 """
398 context = {
399 "confinement": None,
400 "version": None,
401 }
403 for arch in channel_map:
404 if track in channel_map[arch]:
405 releases = channel_map[arch][track]
406 for release in releases:
407 if release["risk"] == risk:
408 context["confinement"] = release.get("confinement")
409 context["version"] = release.get("version")
411 return context
413 return context
416def get_video_embed_code(url):
417 """Get the embed code for videos
419 :param url: The url of the video
421 :returns: Embed code
422 """
423 if "youtube" in url:
424 return {
425 "type": "youtube",
426 "url": url.replace("watch?v=", "embed/"),
427 "id": url.rsplit("?v=", 1)[-1],
428 }
429 if "youtu.be" in url:
430 return {
431 "type": "youtube",
432 "url": url.replace("youtu.be/", "youtube.com/embed/"),
433 "id": url.rsplit("/", 1)[-1],
434 }
435 if "vimeo" in url:
436 return {
437 "type": "vimeo",
438 "url": url.replace("vimeo.com/", "player.vimeo.com/video/"),
439 "id": url.rsplit("/", 1)[-1],
440 }
441 if "asciinema" in url:
442 return {
443 "type": "asciinema",
444 "url": url + ".js",
445 "id": url.rsplit("/", 1)[-1],
446 }
449def filter_screenshots(media):
450 banner_regex = r"/banner(\-icon)?(_.*)?\.(png|jpg)"
452 return [
453 m
454 for m in media
455 if m["type"] == "screenshot" and not re.search(banner_regex, m["url"])
456 ][:5]
459def get_video(media):
460 video = None
461 for m in media:
462 if m["type"] == "video":
463 video = get_video_embed_code(m["url"])
464 break
465 return video
468def promote_snap_with_icon(snaps):
469 """Move the first snap with an icon to the front of the list
471 :param snaps: The list of snaps
473 :returns: A list of snaps
474 """
475 try:
476 snap_with_icon = next(snap for snap in snaps if snap["icon_url"] != "")
478 if snap_with_icon:
479 snap_with_icon_index = snaps.index(snap_with_icon)
481 snaps.insert(0, snaps.pop(snap_with_icon_index))
482 except StopIteration:
483 pass
485 return snaps
488def get_snap_developer(snap_name):
489 """Is this a special snap published by Canonical?
490 Show some developer information
492 :param snap_name: The name of a snap
494 :returns: a list of [display_name, url]
496 """
497 filename = "store/content/developers/snaps.yaml"
498 snaps = helpers.get_yaml(filename, typ="rt")
500 if snaps and snap_name in snaps:
501 return snaps[snap_name]
503 return None