Coverage for webapp/publisher/snaps/logic.py: 69%

158 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-28 22:05 +0000

1import datetime 

2import hashlib 

3from json import dumps 

4 

5from dateutil import parser 

6 

7 

def get_snaps_account_info(account_info):
    """Split the snaps of an account into user snaps and registered names.

    Revoked snaps are ignored. A snap without any revision is a registered
    name; every other snap is a user snap. Each user snap gets a
    ``latest_release`` entry holding its first listed revision that is in at
    least one channel (when there is one). When the account has exactly one
    user snap, it is flagged with ``is_new`` if the ``since`` date of its
    last listed revision is less than 30 days away from now and its first
    listed revision has no channel or has ``edge`` as first channel.

    Note: the snap dictionaries found in ``account_info`` are updated in
    place.

    :param account_info: The account information

    :return: A tuple ``(user_snaps, registered_snaps)`` of dictionaries
        keyed by snap name
    """
    user_snaps = {}
    registered_snaps = {}

    if "16" in account_info["snaps"]:
        for name, snap_info in account_info["snaps"]["16"].items():
            if snap_info["status"] == "Revoked":
                continue

            if snap_info["latest_revisions"]:
                user_snaps[name] = snap_info
            else:
                registered_snaps[name] = snap_info

    for snap_info in user_snaps.values():
        for revision in snap_info["latest_revisions"]:
            if revision["channels"]:
                snap_info["latest_release"] = revision
                break

    if len(user_snaps) == 1:
        (snap_info,) = user_snaps.values()
        revisions = snap_info["latest_revisions"]

        # The "Z" suffix marks a UTC timestamp, so compare aware datetimes.
        revision_since = datetime.datetime.strptime(
            revisions[-1]["since"], "%Y-%m-%dT%H:%M:%SZ"
        ).replace(tzinfo=datetime.timezone.utc)
        now = datetime.datetime.now(datetime.timezone.utc)

        # Compare whole timedeltas: ``timedelta.days`` is floored, so
        # ``abs(delta.days)`` would count a revision from 29.5 days ago as
        # 30 days old.
        is_recent = abs(now - revision_since) < datetime.timedelta(days=30)
        first_channels = revisions[0]["channels"]

        if is_recent and (not first_channels or first_channels[0] == "edge"):
            snap_info["is_new"] = True

    return user_snaps, registered_snaps

52 

53 

def get_stores(stores, roles):
    """Keep the stores where the user holds at least one requested role

    :param stores: The account stores
    :param roles: The requested roles to filter

    :return: A list of the matching stores, in their original order
    """
    return [
        store
        for store in stores
        if set(roles).intersection(store["roles"])
    ]

69 

70 

def get_snap_names_by_ownership(account_info):
    """Sort the names of the user snaps into owned and shared ones

    A snap is owned when the username of its publisher is the username of
    the account, it is shared otherwise.

    :param account_info: The account informations

    :return: A list of owned snaps names
    :return: A list of shared snaps names
    """
    user_snaps, _ = get_snaps_account_info(account_info)

    owned, shared = [], []

    for name, details in user_snaps.items():
        publisher = details["publisher"]["username"]
        is_owner = publisher == account_info["username"]
        (owned if is_owner else shared).append(name)

    return owned, shared

92 

93 

def verify_base_metrics(active_devices):
    """Check the requested base metric against the available metrics

    :param active_devices: The base metric

    :return: The base metric if it's available, 'version' if not
    """
    available_metrics = ("version", "os", "channel", "architecture")

    if active_devices in available_metrics:
        return active_devices

    return "version"

106 

107 

def extract_metrics_period(metric_period):
    """Extract the different values from the period requested.

    The expected format of the metric_period is: [0-9]+[dmy]
    If the numeric part is missing or is not a decimal number (or if the
    value is not a string at all), the whole period defaults to 30d.
    If only the unit is unknown, the bucket defaults to 'd' while 'period'
    and 'int' are kept as requested.

    Input:
      30d

    Output:
      {
        'period': '30d',
        'int': 30,
        'bucket': 'd'
      }

    :param metric_period: The metric period requested

    :returns: A dictionary with the different values of the period
    """
    allowed_periods = ("d", "m", "y")

    # str.isdecimal() is used rather than str.isdigit(): isdigit() also
    # accepts characters such as "²" that int() refuses to parse.
    if (
        not isinstance(metric_period, str)
        or not metric_period[:-1].isdecimal()
    ):
        metric_period = "30d"

    metric_period_int = int(metric_period[:-1])
    metric_bucket = metric_period[-1:]
    if metric_bucket not in allowed_periods:
        metric_bucket = "d"

    return {
        "period": metric_period,
        "int": metric_period_int,
        "bucket": metric_bucket,
    }

142 

143 

def get_installed_based_metric(installed_base_metric):
    """Give the name of the weekly installed base metric of a base metric

    :param installed_base_metric: The base metric ("version", "os",
        "channel" or "architecture")

    :return: The name of the metric, None if the base metric is unknown
    """
    metric_names = {
        "version": "weekly_installed_base_by_version",
        "os": "weekly_installed_base_by_operating_system",
        "channel": "weekly_installed_base_by_channel",
        "architecture": "weekly_installed_base_by_architecture",
    }

    return metric_names.get(installed_base_metric)

153 

154 

def is_snap_on_stable(channel_maps_list):
    """Checks if the snap is on a stable channel

    The snap is on stable as soon as one entry of one of the maps has
    "stable" as channel and a truthy "info" value.

    :param channel_maps_list: The channel maps list of a snap

    :return: True if stable, False if not
    """
    # any() stops at the first match and always yields a real boolean,
    # whatever the type of the "info" value is.
    return any(
        series_map.get("channel") == "stable"
        and bool(series_map.get("info"))
        for series in channel_maps_list
        for series_map in series["map"]
    )

173 

174 

def build_image_info(image, image_type):
    """
    Describe an image to upload for the api

    The whole content of the image is read to compute its SHA-256 hash,
    then the image is rewound to its start.

    :param image: The uploaded image (readable, seekable, with a filename)
    :param image_type: The type of the image

    :return: Dictionary with the key, type, filename and hash of the image
    """
    digest = hashlib.sha256()
    digest.update(image.read())
    content_hash = digest.hexdigest()

    # Rewind so the content can be read again by the next consumer
    image.seek(0)

    image_info = {}
    image_info["key"] = image.filename
    image_info["type"] = image_type
    image_info["filename"] = image.filename
    image_info["hash"] = content_hash

    return image_info

190 

191 

def remove_invalid_characters(description):
    """Turn the "\\r\\n" line endings of a description into "\\n"

    :param description: The description

    :return: The description without the invalid characters"""
    lines = description.split("\r\n")
    return "\n".join(lines)

199 

200 

def build_changed_images(
    changed_screenshots,
    current_screenshots,
    icon,
    new_screenshots,
    banner_background,
):
    """Filter and build images to upload.

    :param changed_screenshots: The changed screenshots, in the order to
        keep. Falsy entries are skipped.
    :param current_screenshots: The current screenshots
    :param icon: The uploaded icon, None if there isn't any
    :param new_screenshots: The uploaded screenshots
    :param banner_background: The uploaded banner, None if there isn't any

    :return: The json to send to the store and the list images to upload"""

    info = []
    images_files = []

    # Get screenshots info (existing and new) while keeping the order received
    for changed_screenshot in changed_screenshots:
        # Empty entries can't be matched against anything
        if not changed_screenshot:
            continue

        for current_screenshot in current_screenshots:
            if (
                changed_screenshot["url"] == current_screenshot["url"]
                and current_screenshot not in info
            ):
                info.append(current_screenshot)
                break

        for new_screenshot in new_screenshots:
            if not new_screenshot:
                continue

            is_same = (
                changed_screenshot["status"] == "new"
                and changed_screenshot["name"] == new_screenshot.filename
            )

            if is_same:
                image_built = build_image_info(new_screenshot, "screenshot")
                # Only stop at an upload that is not listed yet, so that
                # uploads sharing a filename are each matched once
                if image_built not in info:
                    info.append(image_built)
                    images_files.append(new_screenshot)
                    break

    # Add new icon
    if icon is not None:
        info.append(build_image_info(icon, "icon"))
        images_files.append(icon)

    # Add new banner background
    if banner_background is not None:
        info.append(build_image_info(banner_background, "banner"))
        images_files.append(banner_background)

    images_json = {"info": dumps(info)}

    return images_json, images_files

262 

263 

def filter_changes_data(changes):
    """Filter the changes posted to keep the valid fields

    :param changes: Dictionary of all the changes

    :return: Dictionary with the changes filtered"""
    # Each accepted field is listed once; the order of this tuple is the
    # order of the keys in the result.
    whitelist = (
        "title",
        "summary",
        "description",
        "keywords",
        "license",
        "private",
        "unlisted",
        "blacklist_countries",
        "whitelist_countries",
        "public_metrics_enabled",
        "public_metrics_blacklist",
        "video_urls",
        "categories",
        "update_metadata_on_release",
        "links",
    )

    return {key: changes[key] for key in whitelist if key in changes}

292 

293 

def invalid_field_errors(errors):
    """Split errors in invalid fields and other errors

    An error is a field error when its code is "invalid-field" or "required"
    and its "extra" data names the field, under "name" or else "field".
    Every other error, including an invalid field error that doesn't name
    its field, is reported with the other errors.

    :param errors: List of errors

    :return: Dictionary of the fields errors (message by field name) and
        list of other errors"""
    field_errors = {}
    other_errors = []

    for error in errors:
        if error["code"] not in ("invalid-field", "required"):
            other_errors.append(error)
            continue

        extra = error.get("extra") or {}

        if "name" in extra:
            name = extra["name"]
        elif "field" in extra:
            name = extra["field"]
        else:
            # Without a field name the message can't be attached to a field
            # (and must not replace the message of a previous field)
            other_errors.append(error)
            continue

        field_errors[name] = error["message"]

    return field_errors, other_errors

314 

315 

def replace_reserved_categories_key(categories):
    """Rename the `items` key returned by the API to `categories`, as
    `items` is a reserved word in jinja2.

    The dict received is updated in place.

    :param categories: Dict of categories

    :return: Dict of categories"""
    categories["categories"] = categories.pop("items")

    return categories

329 

330 

def filter_categories(categories):
    """Drop the featured category from the list of categories on a snap

    The dict received is updated in place.

    :param categories: Dict of categories

    :return: Dict of categories"""
    categories["categories"] = [
        category
        for category in categories["categories"]
        if category["name"] != "featured"
    ]

    return categories

347 

348 

def filter_available_stores(stores):
    """Keep the stores the user can access, leaving out the stores that
    are publicly available

    :param stores: List of stores as per the account endpoint

    :return: List of stores
    """
    public_stores = ("ubuntu", "LimeNET", "LimeSDR", "orange-pi")

    return [
        store
        for store in stores
        if "access" in store["roles"] and store["id"] not in public_stores
    ]

364 

365 

def convert_date(date_to_convert):
    """Convert date to human readable format: Month Year
    Format of date to convert: 2019-01-12T16:48:41.821037+00:00

    Output: January 2019
    :param date_to_convert: Date to convert
    :returns: Readable date
    """
    # The UTC offset is dropped, not converted: the month and the year are
    # the ones written in the date received.
    date_parsed = parser.parse(date_to_convert).replace(tzinfo=None)
    return date_parsed.strftime("%B %Y")

377 

378 

def categorise_media(media):
    """Sort the urls of the media by type of media: icons, screenshots
    and banner images. Media of any other type are left out.

    :param media: a list of media dicts
    :returns: Separate lists of urls for the icons, the screenshots and
        the banners"""

    urls_by_type = {"icon": [], "screenshot": [], "banner": []}

    for item in media:
        urls = urls_by_type.get(item["type"])
        if urls is not None:
            urls.append(item["url"])

    return (
        urls_by_type["icon"],
        urls_by_type["screenshot"],
        urls_by_type["banner"],
    )

399 

400 

def get_store_name(store_id, stores):
    """Find the name of a store among the available stores

    :param store_id: The id of the store
    :param stores: List of stores as per the account endpoint

    :return: The name of the first available store with this id,
        "Global" if there isn't any
    """
    for store in filter_available_stores(stores):
        if store["id"] == store_id:
            return store["name"]

    return "Global"