Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import datetime 

2import random 

3import re 

4from urllib.parse import parse_qs, urlparse 

5 

6import humanize 

7from dateutil import parser 

8from webapp import helpers 

9 

10 

11def get_n_random_snaps(snaps, choice_number): 

12 if len(snaps) > choice_number: 

13 return random.sample(snaps, choice_number) 

14 

15 return snaps 

16 

17 

18def get_snap_banner_url(snap_result): 

19 """Get snaps banner url from media object 

20 

21 :param snap_result: the snap dictionnary 

22 :returns: the snap dict with banner key 

23 """ 

24 for media in snap_result["media"]: 

25 if media["type"] == "banner": 

26 snap_result["banner_url"] = media["url"] 

27 break 

28 

29 return snap_result 

30 

31 

32def get_pages_details(url, links): 

33 """Transform returned navigation links from search API from limit/offset 

34 to size/page 

35 

36 :param url: The url to build 

37 :param links: The links returned by the API 

38 

39 :returns: A dictionnary with all the navigation links 

40 """ 

41 links_result = {} 

42 

43 if "first" in links: 

44 links_result["first"] = convert_navigation_url( 

45 url, links["first"]["href"] 

46 ) 

47 

48 if "last" in links: 

49 links_result["last"] = convert_navigation_url( 

50 url, links["last"]["href"] 

51 ) 

52 

53 if "next" in links: 

54 links_result["next"] = convert_navigation_url( 

55 url, links["next"]["href"] 

56 ) 

57 

58 if "prev" in links: 

59 links_result["prev"] = convert_navigation_url( 

60 url, links["prev"]["href"] 

61 ) 

62 

63 if "self" in links: 

64 links_result["self"] = convert_navigation_url( 

65 url, links["self"]["href"] 

66 ) 

67 

68 return links_result 

69 

70 

71def convert_navigation_url(url, link): 

72 """Convert navigation link from offest/limit to size/page 

73 

74 Example: 

75 - input: http://example.com?q=test&category=finance&size=10&page=3 

76 - output: http://example2.com?q=test&category=finance&limit=10&offset=30 

77 

78 :param url: The new url 

79 :param link: The navigation url returned by the API 

80 

81 :returns: The new navigation link 

82 """ 

83 url_parsed = urlparse(link) 

84 host_url = "{base_url}" "?q={q}&limit={limit}&offset={offset}" 

85 

86 url_queries = parse_qs(url_parsed.query) 

87 

88 if "q" in url_queries: 

89 q = url_queries["q"][0] 

90 else: 

91 q = "" 

92 

93 if "section" in url_queries: 

94 category = url_queries["section"][0] 

95 else: 

96 category = "" 

97 

98 size = int(url_queries["size"][0]) 

99 page = int(url_queries["page"][0]) 

100 

101 url = host_url.format( 

102 base_url=url, q=q, limit=size, offset=size * (page - 1) 

103 ) 

104 

105 if category != "": 

106 url += "&category=" + category 

107 

108 return url 

109 

110 

111def build_pagination_link(snap_searched, snap_category, page): 

112 """Build pagination link 

113 

114 :param snap_searched: Name of the search query 

115 :param snap_category: The category being searched in 

116 :param page: The page of results 

117 

118 :returns: A url string 

119 """ 

120 params = [] 

121 

122 if snap_searched: 

123 params.append("q=" + snap_searched) 

124 

125 if snap_category: 

126 params.append("category=" + snap_category) 

127 

128 if page: 

129 params.append("page=" + str(page)) 

130 

131 return "/search?" + "&".join(params) 

132 

133 

134def convert_channel_maps(channel_map): 

135 """Converts channel maps list to format easier to manipulate 

136 

137 Example: 

138 - Input: 

139 [ 

140 { 

141 'architecture': 'arch' 

142 'map': [{'info': 'release', ...}, ...], 

143 'track': 'track 1' 

144 }, 

145 ... 

146 ] 

147 - Output: 

148 { 

149 'arch': { 

150 'track 1': [{'info': 'release', ...}, ...], 

151 ... 

152 }, 

153 ... 

154 } 

155 

156 :param channel_maps_list: The channel maps list returned by the API 

157 

158 :returns: The channel maps reshaped 

159 """ 

160 channel_map_restruct = {} 

161 

162 for channel in channel_map: 

163 arch = channel.get("channel").get("architecture") 

164 track = channel.get("channel").get("track") 

165 if arch not in channel_map_restruct: 

166 channel_map_restruct[arch] = {} 

167 if track not in channel_map_restruct[arch]: 

168 channel_map_restruct[arch][track] = [] 

169 

170 info = { 

171 "released-at": convert_date(channel["channel"].get("released-at")), 

172 "version": channel.get("version"), 

173 "channel": channel["channel"].get("name"), 

174 "risk": channel["channel"].get("risk"), 

175 "confinement": channel.get("confinement"), 

176 "size": channel["download"].get("size"), 

177 } 

178 

179 channel_map_restruct[arch][track].append(info) 

180 

181 return channel_map_restruct 

182 

183 

184def convert_date(date_to_convert): 

185 """Convert date to human readable format: Month Day Year 

186 

187 If date is less than a day return: today or yesterday 

188 

189 Format of date to convert: 2019-01-12T16:48:41.821037+00:00 

190 Output: Jan 12 2019 

191 

192 :param date_to_convert: Date to convert 

193 :returns: Readable date 

194 """ 

195 local_timezone = datetime.datetime.utcnow().tzinfo 

196 date_parsed = parser.parse(date_to_convert).replace(tzinfo=local_timezone) 

197 delta = datetime.datetime.utcnow() - datetime.timedelta(days=1) 

198 

199 if delta < date_parsed: 

200 return humanize.naturalday(date_parsed).title() 

201 else: 

202 return date_parsed.strftime("%-d %B %Y") 

203 

204 

205categories_list = [ 

206 "development", 

207 "games", 

208 "social", 

209 "productivity", 

210 "utilities", 

211 "photo-and-video", 

212 "server-and-cloud", 

213 "security", 

214 "devices-and-iot", 

215 "music-and-audio", 

216 "entertainment", 

217 "art-and-design", 

218] 

219 

220blacklist = ["featured"] 

221 

222 

223def format_category_name(slug): 

224 """Format category name into a standard title format 

225 

226 :param slug: The hypen spaced, lowercase slug to be formatted 

227 :return: The formatted string 

228 """ 

229 return ( 

230 slug.title() 

231 .replace("-", " ") 

232 .replace("And", "and") 

233 .replace("Iot", "IoT") 

234 ) 

235 

236 

237def get_categories(categories_json): 

238 """Retrieve and flatten the nested array from the legacy API response. 

239 

240 :param categories_json: The returned json 

241 :returns: A list of categories 

242 """ 

243 

244 categories = [] 

245 

246 if "categories" in categories_json: 

247 for cat in categories_json["categories"]: 

248 if cat["name"] not in categories_list: 

249 if cat["name"] not in blacklist: 

250 categories_list.append(cat["name"]) 

251 

252 for category in categories_list: 

253 categories.append( 

254 {"slug": category, "name": format_category_name(category)} 

255 ) 

256 

257 return categories 

258 

259 

260def get_snap_categories(snap_categories): 

261 """Retrieve list of categories with names for a snap. 

262 

263 :param snap_categories: List of snap categories from snap info API 

264 :returns: A list of categories with names 

265 """ 

266 categories = [] 

267 

268 for cat in snap_categories: 

269 if cat["name"] not in blacklist: 

270 categories.append( 

271 { 

272 "slug": cat["name"], 

273 "name": format_category_name(cat["name"]), 

274 } 

275 ) 

276 

277 return categories 

278 

279 

280def get_latest_versions(channel_maps, default_track, lowest_risk): 

281 """Get the latest versions of both default/stable and the latest of 

282 all other channels, unless it's default/stable 

283 

284 :param channel_map: Channel map list 

285 

286 :returns: A tuple of default/stable, track/risk channel map objects 

287 """ 

288 ordered_versions = get_last_updated_versions(channel_maps) 

289 

290 default_stable = None 

291 other = None 

292 

293 for channel in ordered_versions: 

294 if ( 

295 channel["track"] == default_track 

296 and channel["risk"] == lowest_risk 

297 ): 

298 if not default_stable: 

299 default_stable = channel 

300 elif not other: 

301 other = channel 

302 

303 if default_stable: 

304 default_stable["released-at-display"] = convert_date( 

305 default_stable["released-at"] 

306 ) 

307 if other: 

308 other["released-at-display"] = convert_date(other["released-at"]) 

309 return default_stable, other 

310 

311 

312def get_last_updated_versions(channel_maps): 

313 """Get all channels in order of updates 

314 

315 :param channel_map: Channel map list 

316 

317 :returns: A list of channels ordered by last updated time 

318 """ 

319 releases = [] 

320 for channel_map in channel_maps: 

321 releases.append(channel_map["channel"]) 

322 

323 return list(reversed(sorted(releases, key=lambda c: c["released-at"]))) 

324 

325 

326def get_last_updated_version(channel_maps): 

327 """Get the oldest channel that was created 

328 

329 :param channel_map: Channel map list 

330 

331 :returns: The latest stable version, if no stable, the latest risk updated 

332 """ 

333 newest_channel = None 

334 for channel_map in channel_maps: 

335 if not newest_channel: 

336 newest_channel = channel_map 

337 else: 

338 if channel_map["channel"]["risk"] == "stable": 

339 newest_channel = channel_map 

340 

341 if newest_channel["channel"]["risk"] == "stable": 

342 break 

343 

344 return newest_channel 

345 

346 

347def has_stable(channel_maps_list): 

348 """Use the channel map to find out if the snap has a stable release 

349 

350 :param channel_maps_list: Channel map list 

351 

352 :returns: True or False 

353 """ 

354 if channel_maps_list: 

355 for arch in channel_maps_list: 

356 for track in channel_maps_list[arch]: 

357 for release in channel_maps_list[arch][track]: 

358 if release["risk"] == "stable": 

359 return True 

360 

361 return False 

362 

363 

364def get_lowest_available_risk(channel_map, track): 

365 """Get the lowest available risk for the default track 

366 

367 :param channel_map: Channel map list 

368 :param track: The track of the channel 

369 

370 :returns: The lowest available risk 

371 """ 

372 risk_order = ["stable", "candidate", "beta", "edge"] 

373 lowest_available_risk = None 

374 for arch in channel_map: 

375 if arch in channel_map and track in channel_map[arch]: 

376 releases = channel_map[arch][track] 

377 for release in releases: 

378 if not lowest_available_risk: 

379 lowest_available_risk = release["risk"] 

380 else: 

381 risk_index = risk_order.index(release["risk"]) 

382 lowest_index = risk_order.index(lowest_available_risk) 

383 if risk_index < lowest_index: 

384 lowest_available_risk = release["risk"] 

385 

386 return lowest_available_risk 

387 

388 

389def extract_info_channel_map(channel_map, track, risk): 

390 """Get the confinement and version for a channel 

391 

392 :param channel_map: Channel map list 

393 :param track: The track of the channel 

394 :param risk: The risk of the channel 

395 

396 :returns: Dict containing confinement and version 

397 """ 

398 context = { 

399 "confinement": None, 

400 "version": None, 

401 } 

402 

403 for arch in channel_map: 

404 if track in channel_map[arch]: 

405 releases = channel_map[arch][track] 

406 for release in releases: 

407 if release["risk"] == risk: 

408 context["confinement"] = release.get("confinement") 

409 context["version"] = release.get("version") 

410 

411 return context 

412 

413 return context 

414 

415 

416def get_video_embed_code(url): 

417 """Get the embed code for videos 

418 

419 :param url: The url of the video 

420 

421 :returns: Embed code 

422 """ 

423 if "youtube" in url: 

424 return { 

425 "type": "youtube", 

426 "url": url.replace("watch?v=", "embed/"), 

427 "id": url.rsplit("?v=", 1)[-1], 

428 } 

429 if "youtu.be" in url: 

430 return { 

431 "type": "youtube", 

432 "url": url.replace("youtu.be/", "youtube.com/embed/"), 

433 "id": url.rsplit("/", 1)[-1], 

434 } 

435 if "vimeo" in url: 

436 return { 

437 "type": "vimeo", 

438 "url": url.replace("vimeo.com/", "player.vimeo.com/video/"), 

439 "id": url.rsplit("/", 1)[-1], 

440 } 

441 if "asciinema" in url: 

442 return { 

443 "type": "asciinema", 

444 "url": url + ".js", 

445 "id": url.rsplit("/", 1)[-1], 

446 } 

447 

448 

449def filter_screenshots(media): 

450 banner_regex = r"/banner(\-icon)?(_.*)?\.(png|jpg)" 

451 

452 return [ 

453 m 

454 for m in media 

455 if m["type"] == "screenshot" and not re.search(banner_regex, m["url"]) 

456 ][:5] 

457 

458 

459def get_video(media): 

460 video = None 

461 for m in media: 

462 if m["type"] == "video": 

463 video = get_video_embed_code(m["url"]) 

464 break 

465 return video 

466 

467 

468def promote_snap_with_icon(snaps): 

469 """Move the first snap with an icon to the front of the list 

470 

471 :param snaps: The list of snaps 

472 

473 :returns: A list of snaps 

474 """ 

475 try: 

476 snap_with_icon = next(snap for snap in snaps if snap["icon_url"] != "") 

477 

478 if snap_with_icon: 

479 snap_with_icon_index = snaps.index(snap_with_icon) 

480 

481 snaps.insert(0, snaps.pop(snap_with_icon_index)) 

482 except StopIteration: 

483 pass 

484 

485 return snaps 

486 

487 

488def get_snap_developer(snap_name): 

489 """Is this a special snap published by Canonical? 

490 Show some developer information 

491 

492 :param snap_name: The name of a snap 

493 

494 :returns: a list of [display_name, url] 

495 

496 """ 

497 filename = "store/content/developers/snaps.yaml" 

498 snaps = helpers.get_yaml(filename, typ="rt") 

499 

500 if snaps and snap_name in snaps: 

501 return snaps[snap_name] 

502 

503 return None