Coverage for webapp/store/logic.py: 74%

209 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 22:43 +0000

1import datetime 

2import random 

3import re 

4from urllib.parse import parse_qs, urlparse 

5 

6import humanize 

7from dateutil import parser 

8from dateutil.relativedelta import relativedelta 

9from webapp import helpers 

10 

11 

12def get_n_random_snaps(snaps, choice_number): 

13 if len(snaps) > choice_number: 

14 return random.sample(snaps, choice_number) 

15 

16 return snaps 

17 

18 

19def get_snap_banner_url(snap_result): 

20 """Get snaps banner url from media object 

21 

22 :param snap_result: the snap dictionnary 

23 :returns: the snap dict with banner key 

24 """ 

25 for media in snap_result["media"]: 

26 if media["type"] == "banner": 

27 snap_result["banner_url"] = media["url"] 

28 break 

29 

30 return snap_result 

31 

32 

33def get_pages_details(url, links): 

34 """Transform returned navigation links from search API from limit/offset 

35 to size/page 

36 

37 :param url: The url to build 

38 :param links: The links returned by the API 

39 

40 :returns: A dictionnary with all the navigation links 

41 """ 

42 links_result = {} 

43 

44 if "first" in links: 

45 links_result["first"] = convert_navigation_url( 

46 url, links["first"]["href"] 

47 ) 

48 

49 if "last" in links: 

50 links_result["last"] = convert_navigation_url( 

51 url, links["last"]["href"] 

52 ) 

53 

54 if "next" in links: 

55 links_result["next"] = convert_navigation_url( 

56 url, links["next"]["href"] 

57 ) 

58 

59 if "prev" in links: 

60 links_result["prev"] = convert_navigation_url( 

61 url, links["prev"]["href"] 

62 ) 

63 

64 if "self" in links: 

65 links_result["self"] = convert_navigation_url( 

66 url, links["self"]["href"] 

67 ) 

68 

69 return links_result 

70 

71 

72def convert_navigation_url(url, link): 

73 """Convert navigation link from offest/limit to size/page 

74 

75 Example: 

76 - input: http://example.com?q=test&category=finance&size=10&page=3 

77 - output: http://example2.com?q=test&category=finance&limit=10&offset=30 

78 

79 :param url: The new url 

80 :param link: The navigation url returned by the API 

81 

82 :returns: The new navigation link 

83 """ 

84 url_parsed = urlparse(link) 

85 host_url = "{base_url}" "?q={q}&limit={limit}&offset={offset}" 

86 

87 url_queries = parse_qs(url_parsed.query) 

88 

89 if "q" in url_queries: 

90 q = url_queries["q"][0] 

91 else: 

92 q = "" 

93 

94 if "section" in url_queries: 

95 category = url_queries["section"][0] 

96 else: 

97 category = "" 

98 

99 size = int(url_queries["size"][0]) 

100 page = int(url_queries["page"][0]) 

101 

102 url = host_url.format( 

103 base_url=url, q=q, limit=size, offset=size * (page - 1) 

104 ) 

105 

106 if category != "": 

107 url += "&category=" + category 

108 

109 return url 

110 

111 

112def build_pagination_link(snap_searched, snap_category, page): 

113 """Build pagination link 

114 

115 :param snap_searched: Name of the search query 

116 :param snap_category: The category being searched in 

117 :param page: The page of results 

118 

119 :returns: A url string 

120 """ 

121 params = [] 

122 

123 if snap_searched: 

124 params.append("q=" + snap_searched) 

125 

126 if snap_category: 

127 params.append("category=" + snap_category) 

128 

129 if page: 

130 params.append("page=" + str(page)) 

131 

132 return "/search?" + "&".join(params) 

133 

134 

135def convert_channel_maps(channel_map): 

136 """Converts channel maps list to format easier to manipulate 

137 

138 Example: 

139 - Input: 

140 [ 

141 { 

142 'architecture': 'arch' 

143 'map': [{'info': 'release', ...}, ...], 

144 'track': 'track 1' 

145 }, 

146 ... 

147 ] 

148 - Output: 

149 { 

150 'arch': { 

151 'track 1': [{'info': 'release', ...}, ...], 

152 ... 

153 }, 

154 ... 

155 } 

156 

157 :param channel_maps_list: The channel maps list returned by the API 

158 

159 :returns: The channel maps reshaped 

160 """ 

161 channel_map_restruct = {} 

162 

163 for channel in channel_map: 

164 arch = channel.get("channel").get("architecture") 

165 track = channel.get("channel").get("track") 

166 if arch not in channel_map_restruct: 

167 channel_map_restruct[arch] = {} 

168 if track not in channel_map_restruct[arch]: 

169 channel_map_restruct[arch][track] = [] 

170 

171 info = { 

172 "released-at": convert_date(channel["channel"].get("released-at")), 

173 "version": channel.get("version"), 

174 "channel": channel["channel"].get("name"), 

175 "risk": channel["channel"].get("risk"), 

176 "confinement": channel.get("confinement"), 

177 "size": channel["download"].get("size"), 

178 "revision": channel["revision"], 

179 } 

180 

181 channel_map_restruct[arch][track].append(info) 

182 

183 return channel_map_restruct 

184 

185 

186def convert_date(date_to_convert): 

187 """Convert date to human readable format: Month Day Year 

188 

189 If date is less than a day return: today or yesterday 

190 

191 Format of date to convert: 2019-01-12T16:48:41.821037+00:00 

192 Output: Jan 12 2019 

193 

194 :param date_to_convert: Date to convert 

195 :returns: Readable date 

196 """ 

197 local_timezone = datetime.datetime.utcnow().tzinfo 

198 date_parsed = parser.parse(date_to_convert).replace(tzinfo=local_timezone) 

199 delta = datetime.datetime.utcnow() - datetime.timedelta(days=1) 

200 

201 if delta < date_parsed: 

202 return humanize.naturalday(date_parsed).title() 

203 else: 

204 return date_parsed.strftime("%-d %B %Y") 

205 

206 

207def is_snap_old(last_updated_date, old_threshold_years=2.0): 

208 """Check if a snap is considered 'old' based on its last update date 

209 

210 A snap is considered old if it hasn't been updated in the specified 

211 number of years (default: 2 years). 

212 

213 :param last_updated_date: The last updated date string in ISO format 

214 :param old_threshold_years: Number of years to consider a snap old 

215 (default: 2) 

216 :returns: True if snap is old, False otherwise 

217 """ 

218 if not last_updated_date: 

219 return False 

220 

221 try: 

222 date_parsed = parser.parse(last_updated_date) 

223 if date_parsed.tzinfo is None: 

224 date_parsed = date_parsed.replace(tzinfo=datetime.timezone.utc) 

225 

226 now = datetime.datetime.now(datetime.timezone.utc) 

227 

228 delta = relativedelta(now, date_parsed) 

229 years_since_update = delta.years 

230 

231 return years_since_update >= old_threshold_years 

232 except (ValueError, TypeError): 

233 # If we can't parse the date, assume it's not old 

234 return False 

235 

236 

237categories_list = [ 

238 "development", 

239 "games", 

240 "social", 

241 "productivity", 

242 "utilities", 

243 "photo-and-video", 

244 "server-and-cloud", 

245 "security", 

246 "devices-and-iot", 

247 "music-and-audio", 

248 "entertainment", 

249 "art-and-design", 

250] 

251 

252blacklist = ["featured"] 

253 

254 

255def format_category_name(slug): 

256 """Format category name into a standard title format 

257 

258 :param slug: The hypen spaced, lowercase slug to be formatted 

259 :return: The formatted string 

260 """ 

261 return ( 

262 slug.title() 

263 .replace("-", " ") 

264 .replace("And", "and") 

265 .replace("Iot", "IoT") 

266 ) 

267 

268 

269def get_categories(categories_json): 

270 """Retrieve and flatten the nested array from the legacy API response. 

271 

272 :param categories_json: The returned json 

273 :returns: A list of categories 

274 """ 

275 

276 categories = [] 

277 

278 if "categories" in categories_json: 

279 for cat in categories_json["categories"]: 

280 if cat["name"] not in categories_list: 

281 if cat["name"] not in blacklist: 

282 categories_list.append(cat["name"]) 

283 

284 for category in categories_list: 

285 categories.append( 

286 {"slug": category, "name": format_category_name(category)} 

287 ) 

288 

289 return categories 

290 

291 

292def get_snap_categories(snap_categories): 

293 """Retrieve list of categories with names for a snap. 

294 

295 :param snap_categories: List of snap categories from snap info API 

296 :returns: A list of categories with names 

297 """ 

298 categories = [] 

299 

300 for cat in snap_categories: 

301 if cat["name"] not in blacklist: 

302 categories.append( 

303 { 

304 "slug": cat["name"], 

305 "name": format_category_name(cat["name"]), 

306 } 

307 ) 

308 

309 return categories 

310 

311 

312def get_latest_versions( 

313 channel_maps, default_track, lowest_risk, supported_architectures=None 

314): 

315 """Get the latest versions of both default/stable and the latest of 

316 all other channels, unless it's default/stable 

317 

318 :param channel_map: Channel map list 

319 

320 :returns: A tuple of default/stable, track/risk channel map objects 

321 """ 

322 ordered_versions = get_last_updated_versions(channel_maps) 

323 

324 default_stable = None 

325 other = None 

326 for channel in ordered_versions: 

327 if ( 

328 not supported_architectures 

329 or channel["architecture"] in supported_architectures 

330 ): 

331 if ( 

332 channel["track"] == default_track 

333 and channel["risk"] == lowest_risk 

334 ): 

335 if not default_stable: 

336 default_stable = channel 

337 elif not other: 

338 other = channel 

339 

340 if default_stable: 

341 default_stable["released-at-display"] = convert_date( 

342 default_stable["released-at"] 

343 ) 

344 if other: 

345 other["released-at-display"] = convert_date(other["released-at"]) 

346 return default_stable, other 

347 

348 

349def get_revisions(channel_maps: list) -> list: 

350 """Gets a sorted list of unique revisions 

351 

352 :param channel_map: Channel map list 

353 

354 :returns: A sorted list of unique revisions 

355 """ 

356 revisions = {channel_map["revision"] for channel_map in channel_maps} 

357 return list(reversed(sorted(revisions))) 

358 

359 

360def get_last_updated_versions(channel_maps): 

361 """Get all channels in order of updates 

362 

363 :param channel_map: Channel map list 

364 

365 :returns: A list of channels ordered by last updated time 

366 """ 

367 releases = [] 

368 for channel_map in channel_maps: 

369 releases.append(channel_map["channel"]) 

370 

371 return list(reversed(sorted(releases, key=lambda c: c["released-at"]))) 

372 

373 

374def get_last_updated_version(channel_maps): 

375 """Get the oldest channel that was created 

376 

377 :param channel_map: Channel map list 

378 

379 :returns: The latest stable version, if no stable, the latest risk updated 

380 """ 

381 newest_channel = None 

382 for channel_map in channel_maps: 

383 if not newest_channel: 

384 newest_channel = channel_map 

385 else: 

386 if channel_map["channel"]["risk"] == "stable": 

387 newest_channel = channel_map 

388 

389 if newest_channel["channel"]["risk"] == "stable": 

390 break 

391 

392 return newest_channel 

393 

394 

395def has_stable(channel_maps_list): 

396 """Use the channel map to find out if the snap has a stable release 

397 

398 :param channel_maps_list: Channel map list 

399 

400 :returns: True or False 

401 """ 

402 if channel_maps_list: 

403 for arch in channel_maps_list: 

404 for track in channel_maps_list[arch]: 

405 for release in channel_maps_list[arch][track]: 

406 if release["risk"] == "stable": 

407 return True 

408 

409 return False 

410 

411 

412def get_lowest_available_risk(channel_map, track): 

413 """Get the lowest available risk for the default track 

414 

415 :param channel_map: Channel map list 

416 :param track: The track of the channel 

417 

418 :returns: The lowest available risk 

419 """ 

420 risk_order = ["stable", "candidate", "beta", "edge"] 

421 lowest_available_risk = None 

422 for arch in channel_map: 

423 if arch in channel_map and track in channel_map[arch]: 

424 releases = channel_map[arch][track] 

425 for release in releases: 

426 if not lowest_available_risk: 

427 lowest_available_risk = release["risk"] 

428 else: 

429 risk_index = risk_order.index(release["risk"]) 

430 lowest_index = risk_order.index(lowest_available_risk) 

431 if risk_index < lowest_index: 

432 lowest_available_risk = release["risk"] 

433 

434 return lowest_available_risk 

435 

436 

437def extract_info_channel_map(channel_map, track, risk): 

438 """Get the confinement and version for a channel 

439 

440 :param channel_map: Channel map list 

441 :param track: The track of the channel 

442 :param risk: The risk of the channel 

443 

444 :returns: Dict containing confinement and version 

445 """ 

446 context = { 

447 "confinement": None, 

448 "version": None, 

449 } 

450 

451 for arch in channel_map: 

452 if track in channel_map[arch]: 

453 releases = channel_map[arch][track] 

454 for release in releases: 

455 if release["risk"] == risk: 

456 context["confinement"] = release.get("confinement") 

457 context["version"] = release.get("version") 

458 

459 return context 

460 

461 return context 

462 

463 

464def get_video_embed_code(url): 

465 """Get the embed code for videos 

466 

467 :param url: The url of the video 

468 

469 :returns: Embed code 

470 """ 

471 if "youtube" in url: 

472 return { 

473 "type": "youtube", 

474 "url": url.replace("watch?v=", "embed/"), 

475 "id": url.rsplit("?v=", 1)[-1], 

476 } 

477 if "youtu.be" in url: 

478 return { 

479 "type": "youtube", 

480 "url": url.replace("youtu.be/", "youtube.com/embed/"), 

481 "id": url.rsplit("/", 1)[-1], 

482 } 

483 if "vimeo" in url: 

484 return { 

485 "type": "vimeo", 

486 "url": url.replace("vimeo.com/", "player.vimeo.com/video/"), 

487 "id": url.rsplit("/", 1)[-1], 

488 } 

489 if "asciinema" in url: 

490 return { 

491 "type": "asciinema", 

492 "url": url + ".js", 

493 "id": url.rsplit("/", 1)[-1], 

494 } 

495 

496 

497def filter_screenshots(media): 

498 banner_regex = r"/banner(\-icon)?(_.*)?\.(png|jpg)" 

499 

500 return [ 

501 m 

502 for m in media 

503 if m["type"] == "screenshot" and not re.search(banner_regex, m["url"]) 

504 ][:5] 

505 

506 

507def get_video(media): 

508 video = None 

509 for m in media: 

510 if m["type"] == "video": 

511 video = get_video_embed_code(m["url"]) 

512 break 

513 return video 

514 

515 

516def promote_snap_with_icon(snaps): 

517 """Move the first snap with an icon to the front of the list 

518 

519 :param snaps: The list of snaps 

520 

521 :returns: A list of snaps 

522 """ 

523 try: 

524 snap_with_icon = next(snap for snap in snaps if snap["icon_url"] != "") 

525 

526 if snap_with_icon: 

527 snap_with_icon_index = snaps.index(snap_with_icon) 

528 

529 snaps.insert(0, snaps.pop(snap_with_icon_index)) 

530 except StopIteration: 

531 pass 

532 

533 return snaps 

534 

535 

536def get_snap_developer(snap_name): 

537 """Is this a special snap published by Canonical? 

538 Show some developer information 

539 

540 :param snap_name: The name of a snap 

541 

542 :returns: a list of [display_name, url] 

543 

544 """ 

545 filename = "store/content/developers/snaps.yaml" 

546 snaps = helpers.get_yaml(filename, typ="rt") 

547 

548 if snaps and snap_name in snaps: 

549 return snaps[snap_name] 

550 

551 return None