Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import datetime 

2 

3import talisker 

4from flask import make_response 

5from typing import List, Dict, TypedDict, Any, Union 

6 

7from canonicalwebteam.store_api.exceptions import StoreApiError 

8 

9from webapp.helpers import get_icon 

10 

11 

12Packages = TypedDict( 

13 "Packages", 

14 { 

15 "packages": List[ 

16 Dict[ 

17 str, 

18 Union[Dict[str, Union[str, List[str]]], List[Dict[str, str]]], 

19 ] 

20 ] 

21 }, 

22) 

23 

24Package = TypedDict( 

25 "Package", 

26 { 

27 "package": Dict[ 

28 str, Union[Dict[str, str], List[str], List[Dict[str, str]]] 

29 ] 

30 }, 

31) 

32 

33 

34def fetch_packages(store_api, fields: List[str], query_params) -> Packages: 

35 """ 

36 Fetches packages from the store API based on the specified fields. 

37 

38 :param: store_api: The specific store API object. 

39 :param: fields (List[str]): A list of fields to include in the package 

40 data. 

41 :param: query_params: A search query 

42 

43 :returns: a dictionary containing the list of fetched packages. 

44 """ 

45 store = store_api(talisker.requests.get_session()) 

46 

47 category = query_params.get("categories", "") 

48 query = query_params.get("q", "") 

49 package_type = query_params.get("type", None) 

50 platform = query_params.get("platforms", "") 

51 architecture = query_params.get("architecture", "") 

52 provides = query_params.get("provides", None) 

53 requires = query_params.get("requires", None) 

54 

55 if package_type == "all": 

56 package_type = None 

57 

58 args = { 

59 "category": category, 

60 "fields": fields, 

61 "query": query, 

62 } 

63 

64 if package_type: 

65 args["type"] = package_type 

66 

67 if provides: 

68 provides = provides.split(",") 

69 args["provides"] = provides 

70 

71 if requires: 

72 requires = requires.split(",") 

73 args["requires"] = requires 

74 

75 packages = store.find(**args).get("results", []) 

76 

77 if platform and platform != "all": 

78 filtered_packages = [] 

79 for p in packages: 

80 platforms = p["result"].get("deployable-on", []) 

81 if not platforms: 

82 platforms = ["vm"] 

83 if platform in platforms: 

84 filtered_packages.append(p) 

85 packages = filtered_packages 

86 

87 if architecture and architecture != "all": 

88 args["architecture"] = architecture 

89 packages = store.find(**args).get("results", []) 

90 

91 return packages 

92 

93 

94def fetch_package(store_api, package_name: str, fields: List[str]) -> Package: 

95 """ 

96 Fetches a package from the store API based on the specified package name. 

97 

98 :param: store_api: The specific store API object. 

99 :param: package_name (str): The name of the package to fetch. 

100 :param: fields (List[str]): A list of fields to include in the package 

101 

102 :returns: a dictionary containing the fetched package. 

103 """ 

104 store = store_api(talisker.requests.get_session()) 

105 package = store.get_item_details( 

106 name=package_name, 

107 fields=fields, 

108 api_version=2, 

109 ) 

110 response = make_response({"package": package}) 

111 response.cache_control.max_age = 3600 

112 return response.json 

113 

114 

115def parse_package_for_card( 

116 package: Dict[str, Any], 

117 store_name: str, 

118 store_api: Any, 

119 publisher_api: Any, 

120 libraries: bool = False, 

121) -> Package: 

122 """ 

123 Parses a snap and returns the formatted package 

124 based on the given card schema. 

125 

126 :param: package (Dict[str, Any]): The package to be parsed. 

127 :returns: a dictionary containing the formatted package. 

128 

129 note: 

130 - This function has to be refactored to be more generic, 

131 so we won't have to check for the package type before parsing. 

132 

133 """ 

134 publisher_api = publisher_api(talisker.requests.get_session()) 

135 resp = { 

136 "package": { 

137 "description": "", 

138 "display_name": "", 

139 "icon_url": "", 

140 "name": "", 

141 "platforms": [], 

142 "type": "", 

143 "channel": { 

144 "name": "", 

145 "risk": "", 

146 "track": "", 

147 }, 

148 }, 

149 "publisher": {"display_name": "", "name": "", "validation": ""}, 

150 "categories": [], 

151 # hardcoded temporarily until we have this data from the API 

152 "ratings": {"value": "0", "count": "0"}, 

153 } 

154 

155 snap = package.get("snap", {}) 

156 publisher = snap.get("publisher", {}) 

157 resp["package"]["description"] = snap.get("summary", "") 

158 resp["package"]["display_name"] = snap.get("title", "") 

159 resp["package"]["type"] = "snap" 

160 resp["package"]["name"] = package.get("name", "") 

161 # platform to be fetched 

162 resp["publisher"]["display_name"] = publisher.get("display-name", "") 

163 resp["publisher"]["name"] = publisher.get("username", "") 

164 resp["publisher"]["validation"] = publisher.get("validation", "") 

165 resp["categories"] = snap.get("categories", []) 

166 resp["package"]["icon_url"] = get_icon(package["snap"]["media"]) 

167 

168 return resp 

169 

170 

171def paginate( 

172 packages: List[Packages], page: int, size: int, total_pages: int 

173) -> List[Packages]: 

174 """ 

175 Paginates a list of packages based on the specified page and size. 

176 

177 :param: packages (List[Packages]): The list of packages to paginate. 

178 :param: page (int): The current page number. 

179 :param: size (int): The number of packages to include in each page. 

180 :param: total_pages (int): The total number of pages. 

181 :returns: a list of paginated packages. 

182 

183 note: 

184 - If the provided page exceeds the total number of pages, the last 

185 page will be returned. 

186 - If the provided page is less than 1, the first page will be returned. 

187 """ 

188 

189 if page > total_pages: 

190 page = total_pages 

191 if page < 1: 

192 page = 1 

193 

194 start = (page - 1) * size 

195 end = start + size 

196 if end > len(packages): 

197 end = len(packages) 

198 

199 return packages[start:end] 

200 

201 

202def get_packages( 

203 store, 

204 publisher: Any, 

205 store_name: str, 

206 libraries: bool, 

207 fields: List[str], 

208 size: int = 10, 

209 query_params: Dict[str, Any] = {}, 

210) -> List[Dict[str, Any]]: 

211 """ 

212 Retrieves a list of packages from the store based on the specified 

213 parameters.The returned packages are paginated and parsed using the 

214 card schema. 

215 

216 :param: store: The store object. 

217 :param: fields (List[str]): A list of fields to include in the 

218 package data. 

219 :param: size (int, optional): The number of packages to include 

220 in each page. Defaults to 10. 

221 :param: page (int, optional): The current page number. Defaults to 1. 

222 :param: query (str, optional): The search query. 

223 :param: filters (Dict, optional): The filter parameters. Defaults to {}. 

224 :returns: a dictionary containing the list of parsed packages and 

225 the total pages 

226 """ 

227 

228 packages = fetch_packages(store, fields, query_params) 

229 

230 total_pages = -(len(packages) // -size) 

231 

232 total_pages = -(len(packages) // -size) 

233 total_items = len(packages) 

234 page = int(query_params.get("page", 1)) 

235 packages_per_page = paginate(packages, page, size, total_pages) 

236 parsed_packages = [] 

237 for package in packages_per_page: 

238 parsed_packages.append( 

239 parse_package_for_card( 

240 package, store_name, store, publisher, libraries 

241 ) 

242 ) 

243 res = parsed_packages 

244 

245 categories = get_store_categories(store) 

246 

247 return { 

248 "packages": res, 

249 "total_pages": total_pages, 

250 "total_items": total_items, 

251 "categories": categories, 

252 } 

253 

254 

255def format_slug(slug): 

256 """Format category name into a standard title format 

257 

258 :param slug: The hypen spaced, lowercase slug to be formatted 

259 :return: The formatted string 

260 """ 

261 return ( 

262 slug.title() 

263 .replace("-", " ") 

264 .replace("And", "and") 

265 .replace("Iot", "IoT") 

266 ) 

267 

268 

269def parse_categories( 

270 categories_json: Dict[str, List[Dict[str, str]]] 

271) -> List[Dict[str, str]]: 

272 """ 

273 :param categories_json: The returned json from store_api.get_categories() 

274 :returns: A list of categories in the format: 

275 [{"name": "Category", "slug": "category"}] 

276 """ 

277 

278 categories = [] 

279 

280 if "categories" in categories_json: 

281 for category in categories_json["categories"]: 

282 categories.append( 

283 {"slug": category, "name": format_slug(category)} 

284 ) 

285 

286 return categories 

287 

288 

289def get_store_categories(store_api) -> List[Dict[str, str]]: 

290 """ 

291 Fetches all store categories. 

292 

293 :param: store_api: The store API object used to fetch the categories. 

294 :returns: A list of categories in the format: 

295 [{"name": "Category", "slug": "category"}] 

296 """ 

297 store = store_api(talisker.requests.get_session()) 

298 try: 

299 all_categories = store.get_categories() 

300 except StoreApiError: 

301 all_categories = [] 

302 

303 for cat in all_categories["categories"]: 

304 cat["display_name"] = format_slug(cat["name"]) 

305 

306 categories = list( 

307 filter( 

308 lambda cat: cat["name"] != "featured", all_categories["categories"] 

309 ) 

310 ) 

311 

312 return categories 

313 

314 

315def get_snaps_account_info(account_info): 

316 """Get snaps from the account information of a user 

317 

318 :param account_info: The account informations 

319 

320 :return: A list of snaps 

321 :return: A list of registred snaps 

322 """ 

323 user_snaps = {} 

324 registered_snaps = {} 

325 if "16" in account_info["snaps"]: 

326 snaps = account_info["snaps"]["16"] 

327 for snap in snaps.keys(): 

328 if snaps[snap]["status"] != "Revoked": 

329 if not snaps[snap]["latest_revisions"]: 

330 registered_snaps[snap] = snaps[snap] 

331 else: 

332 user_snaps[snap] = snaps[snap] 

333 

334 now = datetime.datetime.utcnow() 

335 

336 for snap in user_snaps: 

337 snap_info = user_snaps[snap] 

338 for revision in snap_info["latest_revisions"]: 

339 if len(revision["channels"]) > 0: 

340 snap_info["latest_release"] = revision 

341 break 

342 

343 if len(user_snaps) == 1: 

344 for snap in user_snaps: 

345 snap_info = user_snaps[snap] 

346 revisions = snap_info["latest_revisions"] 

347 

348 revision_since = datetime.datetime.strptime( 

349 revisions[-1]["since"], "%Y-%m-%dT%H:%M:%SZ" 

350 ) 

351 

352 if abs((revision_since - now).days) < 30 and ( 

353 not revisions[0]["channels"] 

354 or revisions[0]["channels"][0] == "edge" 

355 ): 

356 snap_info["is_new"] = True 

357 

358 return user_snaps, registered_snaps 

359 

360 

361def get_package( 

362 store, 

363 publisher_api, 

364 store_name: str, 

365 package_name: str, 

366 fields: List[str], 

367 libraries: bool, 

368) -> Package: 

369 """Get a package by name 

370 

371 :param store: The store object. 

372 :param store_name: The name of the store. 

373 :param package_name: The name of the package. 

374 :param fields: The fields to fetch. 

375 

376 :return: A dictionary containing the package. 

377 """ 

378 package = fetch_package(store, package_name, fields).get("package", {}) 

379 resp = parse_package_for_card( 

380 package, store_name, store, publisher_api, libraries 

381 ) 

382 return {"package": resp}