Coverage for webapp / store / logic.py: 73%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-29 22:06 +0000

1import datetime 

2import random 

3import re 

4from urllib.parse import parse_qs, urlparse 

5 

6import humanize 

7from dateutil import parser 

8from webapp import helpers 

9 

10 

def get_n_random_snaps(snaps, choice_number):
    """Pick at most ``choice_number`` snaps at random.

    :param snaps: The list of snaps to choose from
    :param choice_number: The maximum number of snaps to return

    :returns: ``snaps`` itself when it holds ``choice_number`` items or
        fewer, otherwise a new list of ``choice_number`` sampled snaps
    """
    # Nothing to sample when the list is already small enough.
    if len(snaps) <= choice_number:
        return snaps

    return random.sample(snaps, choice_number)

16 

17 

def get_snap_banner_url(snap_result):
    """Expose the url of the first banner media as ``banner_url``.

    The dictionary is updated in place; when no media entry has the type
    "banner" it is left untouched.

    :param snap_result: the snap dictionary, holding a "media" list
    :returns: the same snap dictionary
    """
    first_banner = next(
        (item for item in snap_result["media"] if item["type"] == "banner"),
        None,
    )

    if first_banner is not None:
        snap_result["banner_url"] = first_banner["url"]

    return snap_result

30 

31 

def get_pages_details(url, links):
    """Rebuild the navigation links returned by the search API.

    Each of the "first", "last", "next", "prev" and "self" links present
    in ``links`` is converted with ``convert_navigation_url``.

    :param url: The url to build
    :param links: The links returned by the API, keyed by relation, each
        holding an "href"

    :returns: A dictionary with all the navigation links that were present
    """
    relations = ("first", "last", "next", "prev", "self")

    return {
        relation: convert_navigation_url(url, links[relation]["href"])
        for relation in relations
        if relation in links
    }

69 

70 

def convert_navigation_url(url, link):
    """Convert a navigation link from size/page to limit/offset.

    The search term ("q") and the category ("section" in the API link) are
    percent-encoded again when the new link is built, so values holding
    characters such as "&" or spaces do not corrupt the query string.

    Example:
    - url: http://example2.com
    - link: http://example.com?q=test&section=finance&size=10&page=3
    - output:
      http://example2.com?q=test&limit=10&offset=20&category=finance

    :param url: The new url
    :param link: The navigation url returned by the API; its query string
        must hold integer "size" and "page" values

    :returns: The new navigation link
    :raises KeyError: if "size" or "page" is missing from ``link``
    :raises ValueError: if "size" or "page" is not an integer
    """
    # Local import: keeps this block self-contained.
    from urllib.parse import urlencode

    url_queries = parse_qs(urlparse(link).query)

    size = int(url_queries["size"][0])
    page = int(url_queries["page"][0])

    params = {
        # parse_qs drops blank values, so a missing "q" becomes "".
        "q": url_queries.get("q", [""])[0],
        "limit": size,
        # Pages are 1-based, offsets are 0-based.
        "offset": size * (page - 1),
    }

    category = url_queries.get("section", [""])[0]
    if category:
        params["category"] = category

    # parse_qs decoded the values, urlencode escapes them again.
    return "{}?{}".format(url, urlencode(params))

109 

110 

def build_pagination_link(snap_searched, snap_category, page):
    """Build the link to a page of search results.

    Empty (falsy) arguments are left out of the query string. Values are
    inserted as given, without any escaping.

    :param snap_searched: Name of the search query
    :param snap_category: The category being searched in
    :param page: The page of results

    :returns: A url string
    """
    query = []

    for key, value in (("q", snap_searched), ("category", snap_category)):
        if value:
            query.append(key + "=" + value)

    if page:
        query.append("page=" + str(page))

    return "/search?" + "&".join(query)

132 

133 

def convert_channel_maps(channel_map):
    """Group channel map entries by architecture, then by track.

    Example:
    - Input:
    [
        {
            'channel': {
                'architecture': 'arch',
                'track': 'track 1',
                'name': ..., 'risk': ..., 'released-at': ...
            },
            'version': ..., 'confinement': ..., 'revision': ...,
            'download': {'size': ...}
        },
        ...
    ]
    - Output:
    {
        'arch': {
            'track 1': [
                {'released-at': ..., 'version': ..., 'channel': ...,
                 'risk': ..., 'confinement': ..., 'size': ...,
                 'revision': ...},
                ...
            ],
            ...
        },
        ...
    }

    :param channel_map: The channel map list returned by the API

    :returns: The channel map reshaped
    """
    grouped = {}

    for entry in channel_map:
        details = entry.get("channel")
        arch = details.get("architecture")
        track = details.get("track")

        # Create the arch/track buckets on first sight.
        releases = grouped.setdefault(arch, {}).setdefault(track, [])

        releases.append(
            {
                "released-at": convert_date(details.get("released-at")),
                "version": entry.get("version"),
                "channel": details.get("name"),
                "risk": details.get("risk"),
                "confinement": entry.get("confinement"),
                "size": entry["download"].get("size"),
                "revision": entry["revision"],
            }
        )

    return grouped

183 

184 

def convert_date(date_to_convert):
    """Convert date to human readable format: Day Month Year

    Dates less than a day old are rendered with ``humanize.naturalday``
    and title-cased (presumably "Today" or "Yesterday" -- see the humanize
    documentation).

    Format of date to convert: 2019-01-12T16:48:41.821037+00:00
    Output: 12 January 2019

    Dates carrying a UTC offset are converted to UTC first; dates without
    one are taken to be UTC already.

    :param date_to_convert: Date to convert
    :returns: Readable date
    """
    utc = datetime.timezone.utc
    date_parsed = parser.parse(date_to_convert)

    # Work with naive UTC datetimes on both sides of the comparison.
    # Dropping the offset without converting would shift non-UTC dates.
    if date_parsed.tzinfo is not None:
        date_parsed = date_parsed.astimezone(utc).replace(tzinfo=None)

    now = datetime.datetime.now(utc).replace(tzinfo=None)
    one_day_ago = now - datetime.timedelta(days=1)

    if one_day_ago < date_parsed:
        return humanize.naturalday(date_parsed).title()

    # "%-d" (day without zero padding) is a platform extension of
    # strftime, so the day number is formatted separately.
    return "{} {}".format(date_parsed.day, date_parsed.strftime("%B %Y"))

204 

205 

# Category slugs known up front, in the order they are listed.
categories_list = [
    "development",
    "games",
    "social",
    "productivity",
    "utilities",
    "photo-and-video",
    "server-and-cloud",
    "security",
    "devices-and-iot",
    "music-and-audio",
    "entertainment",
    "art-and-design",
]

# Category names that get_categories and get_snap_categories leave out.
blacklist = ["featured"]

222 

223 

def format_category_name(slug):
    """Format category name into a standard title format

    Hyphens become spaces and every word is capitalised, except that the
    word "and" stays lowercase and the word "iot" is written "IoT".

    Example: "devices-and-iot" -> "Devices and IoT"

    :param slug: The hyphen spaced, lowercase slug to be formatted
    :return: The formatted string
    """
    name = slug.title().replace("-", " ")

    # Match whole words only, so that e.g. "Android" is not turned into
    # "android" and "Iota" is not turned into "IoTa".
    name = re.sub(r"\bAnd\b", "and", name)
    return re.sub(r"\bIot\b", "IoT", name)

236 

237 

def get_categories(categories_json):
    """Retrieve and flatten the nested array from the legacy API response.

    The result starts with the slugs of ``categories_list``, followed by
    any category of the response that is neither already listed nor in
    ``blacklist``.

    :param categories_json: The returned json
    :returns: A list of categories, each a dict with "slug" and "name"
    """
    # Work on a copy: appending to the module-level list would make the
    # categories of one response leak into every later call.
    slugs = list(categories_list)

    if "categories" in categories_json:
        for cat in categories_json["categories"]:
            name = cat["name"]
            if name not in slugs and name not in blacklist:
                slugs.append(name)

    return [
        {"slug": slug, "name": format_category_name(slug)} for slug in slugs
    ]

259 

260 

def get_snap_categories(snap_categories):
    """Build the displayable categories of a snap.

    Categories whose name is in ``blacklist`` are skipped.

    :param snap_categories: List of snap categories from snap info API
    :returns: A list of dicts with the category "slug" and its formatted
        "name"
    """
    return [
        {
            "slug": cat["name"],
            "name": format_category_name(cat["name"]),
        }
        for cat in snap_categories
        if cat["name"] not in blacklist
    ]

279 

280 

def get_latest_versions(
    channel_maps, default_track, lowest_risk, supported_architectures=None
):
    """Find the most recently released default channel and the most
    recently released channel that is not the default one.

    Channels are walked in the order given by
    ``get_last_updated_versions``. When ``supported_architectures`` is
    given and not empty, channels of other architectures are ignored.

    The selected channel dicts gain a "released-at-display" key holding
    the readable version of "released-at".

    :param channel_maps: Channel map list
    :param default_track: The track of the default channel
    :param lowest_risk: The risk of the default channel
    :param supported_architectures: Optional collection of architectures
        to restrict the search to

    :returns: A tuple of default, other channel objects (either may be
        None)
    """
    default_stable = None
    other = None

    for channel in get_last_updated_versions(channel_maps):
        if (
            supported_architectures
            and channel["architecture"] not in supported_architectures
        ):
            continue

        is_default = (
            channel["track"] == default_track
            and channel["risk"] == lowest_risk
        )

        if is_default:
            # Only the first (most recent) match is kept.
            if not default_stable:
                default_stable = channel
        elif not other:
            other = channel

    for selected in (default_stable, other):
        if selected:
            selected["released-at-display"] = convert_date(
                selected["released-at"]
            )

    return default_stable, other

316 

317 

def get_revisions(channel_maps: list) -> list:
    """Collect the revisions of a channel map, without duplicates.

    :param channel_maps: Channel map list

    :returns: The unique revisions, highest first
    """
    unique_revisions = set()

    for entry in channel_maps:
        unique_revisions.add(entry["revision"])

    return sorted(unique_revisions, reverse=True)

327 

328 

def get_last_updated_versions(channel_maps):
    """List the "channel" objects of a channel map, newest release first.

    :param channel_maps: Channel map list

    :returns: A list of channels ordered by "released-at", latest first
    """
    releases = [entry["channel"] for entry in channel_maps]

    # Sort ascending and then flip the list. This is not the same as
    # sorting with reverse=True, which would keep ties in input order.
    releases.sort(key=lambda release: release["released-at"])
    releases.reverse()

    return releases

341 

342 

def get_last_updated_version(channel_maps):
    """Pick the first stable entry of a channel map.

    When no entry has the "stable" risk, the first entry of the list is
    returned instead.

    :param channel_maps: Channel map list

    :returns: The first stable entry, if no stable, the first entry;
        None when the list is empty
    """
    fallback = None

    for entry in channel_maps:
        if entry["channel"]["risk"] == "stable":
            return entry

        if fallback is None:
            fallback = entry

    return fallback

362 

363 

def has_stable(channel_maps_list):
    """Tell whether any release of the channel map has the stable risk.

    :param channel_maps_list: Channel map, as a dict of architectures
        holding dicts of tracks holding lists of releases

    :returns: True or False
    """
    if not channel_maps_list:
        return False

    return any(
        release["risk"] == "stable"
        for tracks in channel_maps_list.values()
        for releases in tracks.values()
        for release in releases
    )

379 

380 

def get_lowest_available_risk(channel_map, track):
    """Find the lowest risk released on a track, across architectures.

    Risks rank, from lowest to highest: stable, candidate, beta, edge.

    :param channel_map: Channel map, as a dict of architectures holding
        dicts of tracks holding lists of releases
    :param track: The track of the channel

    :returns: The lowest available risk, None if the track has no release
    """
    ranking = ["stable", "candidate", "beta", "edge"]
    lowest = None

    for tracks in channel_map.values():
        if track not in tracks:
            continue

        for release in tracks[track]:
            risk = release["risk"]

            if not lowest:
                # The first risk met is the starting point.
                lowest = risk
            elif ranking.index(risk) < ranking.index(lowest):
                lowest = risk

    return lowest

404 

405 

def extract_info_channel_map(channel_map, track, risk):
    """Look up the confinement and version of a channel.

    The first release found with the given track and risk wins.

    :param channel_map: Channel map, as a dict of architectures holding
        dicts of tracks holding lists of releases
    :param track: The track of the channel
    :param risk: The risk of the channel

    :returns: Dict containing confinement and version, both None when no
        release matches
    """
    for tracks in channel_map.values():
        if track not in tracks:
            continue

        for release in tracks[track]:
            if release["risk"] == risk:
                return {
                    "confinement": release.get("confinement"),
                    "version": release.get("version"),
                }

    return {"confinement": None, "version": None}

431 

432 

def get_video_embed_code(url):
    """Describe how to embed a video, based on the site in its url.

    Urls containing "youtube", "youtu.be", "vimeo" or "asciinema" are
    recognised, checked in that order.

    :param url: The url of the video

    :returns: A dict with the "type" of video, the embed "url" and the
        video "id", or None when the url is not recognised
    """
    # Text after the last "/", the video id for most sites.
    last_segment = url.rpartition("/")[2]

    if "youtube" in url:
        embed_url = url.replace("watch?v=", "embed/")
        video_id = url.rpartition("?v=")[2]
        return {"type": "youtube", "url": embed_url, "id": video_id}

    if "youtu.be" in url:
        embed_url = url.replace("youtu.be/", "youtube.com/embed/")
        return {"type": "youtube", "url": embed_url, "id": last_segment}

    if "vimeo" in url:
        embed_url = url.replace("vimeo.com/", "player.vimeo.com/video/")
        return {"type": "vimeo", "url": embed_url, "id": last_segment}

    if "asciinema" in url:
        return {"type": "asciinema", "url": url + ".js", "id": last_segment}

    return None

464 

465 

def filter_screenshots(media):
    """Keep the first five screenshots that are not banner images.

    :param media: The list of media dicts, each with a "type" and a "url"

    :returns: At most five media of type "screenshot" whose url does not
        look like a banner file
    """
    banner_regex = r"/banner(\-icon)?(_.*)?\.(png|jpg)"
    screenshots = []

    for item in media:
        if item["type"] != "screenshot":
            continue

        if re.search(banner_regex, item["url"]):
            continue

        screenshots.append(item)

    return screenshots[:5]

474 

475 

def get_video(media):
    """Get the embed details of the first video found in the media.

    :param media: The list of media dicts, each with a "type" and a "url"

    :returns: The result of ``get_video_embed_code`` for the first media
        of type "video", None when there is no video
    """
    for item in media:
        if item["type"] == "video":
            return get_video_embed_code(item["url"])

    return None

483 

484 

def promote_snap_with_icon(snaps):
    """Move the first snap with an icon to the front of the list

    The list is reordered in place; it is left as is when no snap has a
    non-empty "icon_url".

    :param snaps: The list of snaps

    :returns: The same list of snaps
    """
    for position, snap in enumerate(snaps):
        if snap["icon_url"] != "":
            snaps.insert(0, snaps.pop(position))
            break

    return snaps

503 

504 

def get_snap_developer(snap_name):
    """Look up the developer information recorded for a snap.

    The information is read from "store/content/developers/snaps.yaml",
    which presumably lists the special snaps published by Canonical.

    :param snap_name: The name of a snap

    :returns: The entry stored for the snap (documented as a list of
        [display_name, url]), None when the snap is not listed
    """
    snaps = helpers.get_yaml("store/content/developers/snaps.yaml", typ="rt")

    if not snaps or snap_name not in snaps:
        return None

    return snaps[snap_name]