Coverage for webapp/packages/logic.py: 16%

138 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 22:43 +0000

1import datetime 

2 

3from flask import make_response 

4from typing import List, Dict, TypedDict, Any, Union 

5 

6from canonicalwebteam.exceptions import StoreApiError 

7from canonicalwebteam.store_api.devicegw import DeviceGW 

8 

9from webapp.endpoints.utils import get_item_details_cache_key 

10from webapp.helpers import get_icon 

11from webapp.helpers import api_session 

12from cache.cache_utility import redis_cache 

13 

14device_gateway = DeviceGW("snap", api_session) 

15 

16Packages = TypedDict( 

17 "Packages", 

18 { 

19 "packages": List[ 

20 Dict[ 

21 str, 

22 Union[Dict[str, Union[str, List[str]]], List[Dict[str, str]]], 

23 ] 

24 ] 

25 }, 

26) 

27 

28Package = TypedDict( 

29 "Package", 

30 { 

31 "package": Dict[ 

32 str, Union[Dict[str, str], List[str], List[Dict[str, str]]] 

33 ] 

34 }, 

35) 

36 

37 

38def fetch_packages(fields: List[str], query_params) -> Packages: 

39 """ 

40 Fetches packages from the store API based on the specified fields. 

41 

42 :param: fields (List[str]): A list of fields to include in the package 

43 data. 

44 :param: query_params: A search query 

45 

46 :returns: a dictionary containing the list of fetched packages. 

47 """ 

48 

49 category = query_params.get("categories", "") 

50 query = query_params.get("q", "") 

51 package_type = query_params.get("type", None) 

52 platform = query_params.get("platforms", "") 

53 architecture = query_params.get("architecture", "") 

54 provides = query_params.get("provides", None) 

55 requires = query_params.get("requires", None) 

56 

57 if package_type == "all": 

58 package_type = None 

59 

60 args = { 

61 "category": category, 

62 "fields": fields, 

63 "query": query, 

64 } 

65 

66 if package_type: 

67 args["type"] = package_type 

68 

69 if provides: 

70 provides = provides.split(",") 

71 args["provides"] = provides 

72 

73 if requires: 

74 requires = requires.split(",") 

75 args["requires"] = requires 

76 

77 packages = device_gateway.find(**args).get("results", []) 

78 

79 if platform and platform != "all": 

80 filtered_packages = [] 

81 for p in packages: 

82 platforms = p["result"].get("deployable-on", []) 

83 if not platforms: 

84 platforms = ["vm"] 

85 if platform in platforms: 

86 filtered_packages.append(p) 

87 packages = filtered_packages 

88 

89 if architecture and architecture != "all": 

90 args["architecture"] = architecture 

91 packages = device_gateway.find(**args).get("results", []) 

92 

93 return packages 

94 

95 

96def fetch_package(package_name: str, fields: List[str]) -> Package: 

97 """ 

98 Fetches a package from the store API based on the specified package name. 

99 

100 :param: package_name (str): The name of the package to fetch. 

101 :param: fields (List[str]): A list of fields to include in the package 

102 

103 :returns: a dictionary containing the fetched package. 

104 """ 

105 get_item_details_key = get_item_details_cache_key(package_name) 

106 cached_package = redis_cache.get(get_item_details_key, expected_type=dict) 

107 if cached_package: 

108 package = cached_package 

109 else: 

110 package = device_gateway.get_item_details( 

111 name=package_name, 

112 fields=fields, 

113 api_version=2, 

114 ) 

115 redis_cache.set(get_item_details_key, package, ttl=300) 

116 response = make_response({"package": package}) 

117 response.cache_control.max_age = 3600 

118 return response.json 

119 

120 

121def parse_package_for_card(package: Dict[str, Any]) -> Package: 

122 """ 

123 Parses a snap and returns the formatted package 

124 based on the given card schema. 

125 

126 :param: package (Dict[str, Any]): The package to be parsed. 

127 :returns: a dictionary containing the formatted package. 

128 

129 note: 

130 - This function has to be refactored to be more generic, 

131 so we won't have to check for the package type before parsing. 

132 

133 """ 

134 resp = { 

135 "package": { 

136 "description": "", 

137 "display_name": "", 

138 "icon_url": "", 

139 "name": "", 

140 "platforms": [], 

141 "type": "", 

142 "channel": { 

143 "name": "", 

144 "risk": "", 

145 "track": "", 

146 }, 

147 }, 

148 "publisher": {"display_name": "", "name": "", "validation": ""}, 

149 "categories": [], 

150 # hardcoded temporarily until we have this data from the API 

151 "ratings": {"value": "0", "count": "0"}, 

152 } 

153 

154 snap = package.get("snap", {}) 

155 publisher = snap.get("publisher", {}) 

156 resp["package"]["description"] = snap.get("summary", "") 

157 resp["package"]["display_name"] = snap.get("title", "") 

158 resp["package"]["type"] = "snap" 

159 resp["package"]["name"] = package.get("name", "") 

160 # platform to be fetched 

161 resp["publisher"]["display_name"] = publisher.get("display-name", "") 

162 resp["publisher"]["name"] = publisher.get("username", "") 

163 resp["publisher"]["validation"] = publisher.get("validation", "") 

164 resp["categories"] = snap.get("categories", []) 

165 resp["package"]["icon_url"] = get_icon(package["snap"]["media"]) 

166 

167 return resp 

168 

169 

170def paginate( 

171 packages: List[Packages], page: int, size: int, total_pages: int 

172) -> List[Packages]: 

173 """ 

174 Paginates a list of packages based on the specified page and size. 

175 

176 :param: packages (List[Packages]): The list of packages to paginate. 

177 :param: page (int): The current page number. 

178 :param: size (int): The number of packages to include in each page. 

179 :param: total_pages (int): The total number of pages. 

180 :returns: a list of paginated packages. 

181 

182 note: 

183 - If the provided page exceeds the total number of pages, the last 

184 page will be returned. 

185 - If the provided page is less than 1, the first page will be returned. 

186 """ 

187 

188 if page > total_pages: 

189 page = total_pages 

190 if page < 1: 

191 page = 1 

192 

193 start = (page - 1) * size 

194 end = start + size 

195 if end > len(packages): 

196 end = len(packages) 

197 

198 return packages[start:end] 

199 

200 

201def get_packages( 

202 fields: List[str], 

203 size: int = 10, 

204 query_params: Dict[str, Any] = {}, 

205) -> List[Dict[str, Any]]: 

206 """ 

207 Retrieves a list of packages from the store based on the specified 

208 parameters.The returned packages are paginated and parsed using the 

209 card schema. 

210 

211 :param: store: The store object. 

212 :param: fields (List[str]): A list of fields to include in the 

213 package data. 

214 :param: size (int, optional): The number of packages to include 

215 in each page. Defaults to 10. 

216 :param: page (int, optional): The current page number. Defaults to 1. 

217 :param: query (str, optional): The search query. 

218 :param: filters (Dict, optional): The filter parameters. Defaults to {}. 

219 :returns: a dictionary containing the list of parsed packages and 

220 the total pages 

221 """ 

222 

223 packages = fetch_packages(fields, query_params) 

224 

225 total_pages = -(len(packages) // -size) 

226 

227 total_pages = -(len(packages) // -size) 

228 total_items = len(packages) 

229 page = int(query_params.get("page", 1)) 

230 packages_per_page = paginate(packages, page, size, total_pages) 

231 parsed_packages = [] 

232 for package in packages_per_page: 

233 parsed_packages.append(parse_package_for_card(package)) 

234 res = parsed_packages 

235 

236 categories = get_store_categories() 

237 

238 return { 

239 "packages": res, 

240 "total_pages": total_pages, 

241 "total_items": total_items, 

242 "categories": categories, 

243 } 

244 

245 

246def format_slug(slug): 

247 """Format category name into a standard title format 

248 

249 :param slug: The hypen spaced, lowercase slug to be formatted 

250 :return: The formatted string 

251 """ 

252 return ( 

253 slug.title() 

254 .replace("-", " ") 

255 .replace("And", "and") 

256 .replace("Iot", "IoT") 

257 ) 

258 

259 

260def parse_categories( 

261 categories_json: Dict[str, List[Dict[str, str]]], 

262) -> List[Dict[str, str]]: 

263 """ 

264 :param categories_json: The returned json from store_api.get_categories() 

265 :returns: A list of categories in the format: 

266 [{"name": "Category", "slug": "category"}] 

267 """ 

268 

269 categories = [] 

270 

271 if "categories" in categories_json: 

272 for category in categories_json["categories"]: 

273 categories.append( 

274 {"slug": category, "name": format_slug(category)} 

275 ) 

276 

277 return categories 

278 

279 

280def get_store_categories() -> List[Dict[str, str]]: 

281 """ 

282 Fetches all store categories. 

283 

284 :returns: A list of categories in the format: 

285 [{"name": "Category", "slug": "category"}] 

286 """ 

287 try: 

288 all_categories = device_gateway.get_categories() 

289 except StoreApiError: 

290 all_categories = [] 

291 

292 for cat in all_categories["categories"]: 

293 cat["display_name"] = format_slug(cat["name"]) 

294 

295 categories = list( 

296 filter( 

297 lambda cat: cat["name"] != "featured", all_categories["categories"] 

298 ) 

299 ) 

300 

301 return categories 

302 

303 

304def get_snaps_account_info(account_info): 

305 """Get snaps from the account information of a user 

306 

307 :param account_info: The account informations 

308 

309 :return: A list of snaps 

310 :return: A list of registred snaps 

311 """ 

312 user_snaps = {} 

313 registered_snaps = {} 

314 if "16" in account_info["snaps"]: 

315 snaps = account_info["snaps"]["16"] 

316 for snap in snaps.keys(): 

317 if snaps[snap]["status"] != "Revoked": 

318 if not snaps[snap]["latest_revisions"]: 

319 registered_snaps[snap] = snaps[snap] 

320 else: 

321 user_snaps[snap] = snaps[snap] 

322 

323 now = datetime.datetime.utcnow() 

324 

325 for snap in user_snaps: 

326 snap_info = user_snaps[snap] 

327 for revision in snap_info["latest_revisions"]: 

328 if len(revision["channels"]) > 0: 

329 snap_info["latest_release"] = revision 

330 break 

331 

332 if len(user_snaps) == 1: 

333 for snap in user_snaps: 

334 snap_info = user_snaps[snap] 

335 revisions = snap_info["latest_revisions"] 

336 

337 revision_since = datetime.datetime.strptime( 

338 revisions[-1]["since"], "%Y-%m-%dT%H:%M:%SZ" 

339 ) 

340 

341 if abs((revision_since - now).days) < 30 and ( 

342 not revisions[0]["channels"] 

343 or revisions[0]["channels"][0] == "edge" 

344 ): 

345 snap_info["is_new"] = True 

346 

347 return user_snaps, registered_snaps 

348 

349 

350def get_package( 

351 package_name: str, 

352 fields: List[str], 

353) -> Package: 

354 """Get a package by name 

355 

356 :param store: The store object. 

357 :param store_name: The name of the store. 

358 :param package_name: The name of the package. 

359 :param fields: The fields to fetch. 

360 

361 :return: A dictionary containing the package. 

362 """ 

363 package = fetch_package(package_name, fields).get("package", {}) 

364 resp = parse_package_for_card(package) 

365 return {"package": resp}