Coverage for webapp/publisher/snaps/logic.py: 69%
158 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
1import datetime
2import hashlib
3from json import dumps
5from dateutil import parser
def get_snaps_account_info(account_info):
    """Split the snaps of an account into uploaded and registered-only snaps.

    Only the "16" entry of ``account_info["snaps"]`` is read, and snaps
    whose status is "Revoked" are skipped. The snap dicts are annotated in
    place:

    - "latest_release": the first entry of "latest_revisions" that has at
      least one channel (not set when no revision has a channel);
    - "is_new": set to True only when there is exactly one uploaded snap,
      the "since" timestamp of its last listed revision is less than 30
      days away from now, and its first listed revision has no channel or
      has "edge" as first channel.

    :param account_info: The account informations

    :return: A dict of snaps that have at least one revision
    :return: A dict of registered snaps that have no revision
    """
    user_snaps = {}
    registered_snaps = {}

    if "16" in account_info["snaps"]:
        for name, details in account_info["snaps"]["16"].items():
            if details["status"] == "Revoked":
                continue
            if details["latest_revisions"]:
                user_snaps[name] = details
            else:
                registered_snaps[name] = details

    # "since" is parsed below into a naive datetime, so keep "now" naive
    # too (expressed in UTC) to allow the subtraction.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    for snap_info in user_snaps.values():
        for revision in snap_info["latest_revisions"]:
            if revision["channels"]:
                snap_info["latest_release"] = revision
                break

    if len(user_snaps) == 1:
        (snap_info,) = user_snaps.values()
        revisions = snap_info["latest_revisions"]

        revision_since = datetime.datetime.strptime(
            revisions[-1]["since"], "%Y-%m-%dT%H:%M:%SZ"
        )

        # Compare whole timedeltas: ``.days`` of a negative timedelta is
        # floored, which made the previous check cut off after 29 days.
        is_recent = abs(revision_since - now) < datetime.timedelta(days=30)
        first_channels = revisions[0]["channels"]

        if is_recent and (not first_channels or first_channels[0] == "edge"):
            snap_info["is_new"] = True

    return user_snaps, registered_snaps
def get_stores(stores, roles):
    """Keep the stores in which the user holds at least one requested role

    :param stores: The account stores
    :param roles: The requested roles to filter

    :return: A list of the stores sharing a role with ``roles``
    """
    return [
        store
        for store in stores
        if not set(roles).isdisjoint(store["roles"])
    ]
def get_snap_names_by_ownership(account_info):
    """Sort the names of the user's uploaded snaps by ownership

    A snap is owned when its publisher username equals the account
    username; any other snap is counted as shared.

    :param account_info: The account informations

    :return: A list of owned snaps names
    :return: A list of shared snaps names
    """
    snaps, _registered = get_snaps_account_info(account_info)

    owned, shared = [], []
    for name, details in snaps.items():
        is_owner = (
            details["publisher"]["username"] == account_info["username"]
        )
        (owned if is_owner else shared).append(name)

    return owned, shared
def verify_base_metrics(active_devices):
    """Fall back to 'version' when the base metric is not a known one

    :param active_devices: The base metric

    :return: The base metric if it's available, 'version' if not
    """
    known_metrics = ("version", "os", "channel", "architecture")
    return active_devices if active_devices in known_metrics else "version"
def extract_metrics_period(metric_period):
    """Extract the different values from the period requested. The format of
    the metric_period should be: [0-9]+[dmy]
    If the amount is not a number the default value 30d is used; if only
    the unit is invalid it is replaced by 'd'.

    Input:
      30d

    Output:
      {
        'period': '30d',
        'int': 30,
        'bucket': 'd'
      }

    :param metric_period: The metric period requested

    :returns: A dictionary with the different values of the period
    """
    allowed_buckets = ("d", "m", "y")

    amount = metric_period[:-1]
    # str.isdigit() alone also accepts characters such as "²" that int()
    # refuses, so restrict the amount to ASCII digits.
    if not (amount.isascii() and amount.isdigit()):
        metric_period = "30d"
        amount = "30"

    bucket = metric_period[-1:]
    if bucket not in allowed_buckets:
        bucket = "d"
        # Keep 'period' in agreement with the corrected bucket
        metric_period = amount + bucket

    return {
        "period": metric_period,
        "int": int(amount),
        "bucket": bucket,
    }
def get_installed_based_metric(installed_base_metric):
    """Map a base metric to the name of its weekly installed base metric

    :param installed_base_metric: One of 'version', 'os', 'channel' or
        'architecture'

    :return: The matching metric name, None for any other value
    """
    metric_names = (
        ("version", "weekly_installed_base_by_version"),
        ("os", "weekly_installed_base_by_operating_system"),
        ("channel", "weekly_installed_base_by_channel"),
        ("architecture", "weekly_installed_base_by_architecture"),
    )

    for base_metric, metric_name in metric_names:
        if installed_base_metric == base_metric:
            return metric_name

    return None
def is_snap_on_stable(channel_maps_list):
    """Checks if the snap is on a stable channel

    An entry counts when its "channel" is "stable" and its "info" value is
    truthy.

    :param channel_maps_list: The channel maps list of a snap; each item
        holds its entries under the "map" key

    :return: True if stable, False if not
    """
    # bool() so that callers get a real boolean rather than the raw "info"
    # value (or None) of the entry that was evaluated.
    return any(
        bool(
            "channel" in series_map
            and series_map["channel"] == "stable"
            and series_map["info"]
        )
        for series in channel_maps_list
        for series_map in series["map"]
    )
def build_image_info(image, image_type):
    """
    Describe an image to upload for the api

    The whole content of the image is read to compute its SHA-256 hex
    digest, then the image is rewound to its start so that it can be read
    again.

    :param image: The uploaded image (needs read(), seek() and filename)
    :param image_type: The value stored under "type"

    :return: Dict with the keys "key", "type", "filename" and "hash"
    """
    digest = hashlib.sha256(image.read()).hexdigest()
    image.seek(0)

    image_info = {"key": image.filename, "type": image_type}
    image_info["filename"] = image.filename
    image_info["hash"] = digest

    return image_info
def remove_invalid_characters(description):
    """Replace every "\\r\\n" sequence of the description with "\\n"

    :param description: The description

    :return: The description without the invalid characters"""
    return "\n".join(description.split("\r\n"))
def build_changed_images(
    changed_screenshots,
    current_screenshots,
    icon,
    new_screenshots,
    banner_background,
):
    """Filter and build images to upload.

    :param changed_screenshots: List of all the changed screenshots, in the
        order to keep; falsy entries are ignored
    :param current_screenshots: List of the current screenshots
    :param icon: The uploaded icon (None if there isn't one)
    :param new_screenshots: The uploaded screenshots
    :param banner_background: The uploaded banner (None if there isn't one)

    :return: The json to send to the store and the list of images to upload"""

    info = []
    images_files = []

    # Get screenshots info (existing and new) while keeping the order received
    for changed_screenshot in changed_screenshots:
        # A falsy entry has nothing to match on; subscripting it below
        # would raise.
        if not changed_screenshot:
            continue

        # Existing screenshot: reuse its current info, at most once
        for current_screenshot in current_screenshots:
            if (
                changed_screenshot["url"] == current_screenshot["url"]
                and current_screenshot not in info
            ):
                info.append(current_screenshot)
                break

        # New screenshot: find the uploaded file carrying the same name
        for new_screenshot in new_screenshots:
            if not new_screenshot:
                continue

            is_same = (
                changed_screenshot["status"] == "new"
                and changed_screenshot["name"] == new_screenshot.filename
            )

            if is_same:
                image_built = build_image_info(new_screenshot, "screenshot")
                if image_built not in info:
                    info.append(image_built)
                    images_files.append(new_screenshot)
                break

    # Add new icon
    if icon is not None:
        info.append(build_image_info(icon, "icon"))
        images_files.append(icon)

    # Add new banner background
    if banner_background is not None:
        info.append(build_image_info(banner_background, "banner"))
        images_files.append(banner_background)

    images_json = {"info": dumps(info)}

    return images_json, images_files
def filter_changes_data(changes):
    """Filter the changes posted to keep the valid fields

    The result follows the order of the list of valid fields, not the
    order of ``changes``.

    :param changes: Dictionary of all the changes

    :return: Dictionary with the changes filtered"""
    allowed_fields = (
        "title",
        "summary",
        "description",
        "keywords",
        "license",
        "private",
        "unlisted",
        "blacklist_countries",
        "whitelist_countries",
        "public_metrics_enabled",
        "public_metrics_blacklist",
        "video_urls",
        "categories",
        "update_metadata_on_release",
        "links",
    )

    filtered = {}
    for field in allowed_fields:
        if field in changes:
            filtered[field] = changes[field]

    return filtered
def invalid_field_errors(errors):
    """Split errors in invalid fields and other errors

    An error is a field error when its code is "invalid-field" or
    "required" and its "extra" dict names the field under "name" (checked
    first) or "field". Errors with one of these codes that do not name a
    field are returned with the other errors.

    :param errors: List of errors

    :return: Dict of error messages by field name and list of other
        errors"""
    field_errors = {}
    other_errors = []

    for error in errors:
        field_key = None

        if error["code"] in ("invalid-field", "required"):
            extra = error.get("extra") or {}
            if "name" in extra:
                field_key = "name"
            elif "field" in extra:
                field_key = "field"

        if field_key is None:
            # Without a field name the message can't be attached to a
            # field; reusing the name of a previous error would overwrite
            # that field's message.
            other_errors.append(error)
        else:
            field_errors[extra[field_key]] = error["message"]

    return field_errors, other_errors
def replace_reserved_categories_key(categories):
    """The API returns `items` which is a reserved word in jinja2.
    This method moves the value of that key to the `categories` key.

    The dict received is modified in place and returned.

    :param categories: Dict of categories

    :return: Dict of categories"""
    categories["categories"] = categories.pop("items")

    return categories
def filter_categories(categories):
    """Filter featured category out of the list of categories on a snap

    The dict received is modified in place and returned.

    :param categories: Dict of categories

    :return: Dict of categories"""
    categories["categories"] = [
        category
        for category in categories["categories"]
        if category["name"] != "featured"
    ]

    return categories
def filter_available_stores(stores):
    """Keep the stores the user can access that aren't public stores

    A store is kept when "access" is one of its roles and its id is not
    in the list of public stores.

    :param stores: List of stores as per the account endpoint

    :return: List of stores
    """
    public_stores = ["ubuntu", "LimeNET", "LimeSDR", "orange-pi"]

    return [
        store
        for store in stores
        if "access" in store["roles"] and store["id"] not in public_stores
    ]
def convert_date(date_to_convert):
    """Convert date to human readable format: Month Year

    Format of date to convert: 2019-01-12T16:48:41.821037+00:00

    Output: January 2019

    :param date_to_convert: Date to convert
    :returns: Readable date
    """
    # The timezone is dropped without any conversion, so the month and the
    # year are the ones written in the input.
    date_parsed = parser.parse(date_to_convert).replace(tzinfo=None)
    return date_parsed.strftime("%B %Y")
def categorise_media(media):
    """Media comes in many forms, method splits their urls into:
    Icons, Screenshots and Banner images

    Media of any other type are left out.

    :param media: a list of media dicts
    :returns: Separate lists of urls for icons, screenshots and banners"""
    urls_by_type = {"icon": [], "screenshot": [], "banner": []}

    for item in media:
        media_type = item["type"]
        if media_type in ("banner", "icon", "screenshot"):
            urls_by_type[media_type].append(item["url"])

    return (
        urls_by_type["icon"],
        urls_by_type["screenshot"],
        urls_by_type["banner"],
    )
def get_store_name(store_id, stores):
    """Get the name of a store among the available stores

    :param store_id: The id of the store to look for
    :param stores: List of stores as per the account endpoint

    :return: The name of the first available store with this id,
        "Global" if there is none
    """
    for candidate in filter_available_stores(stores):
        if candidate["id"] == store_id:
            return candidate["name"]

    return "Global"