Coverage for webapp / packages / logic.py: 16%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-22 22:07 +0000

1import datetime 

2 

3from flask import make_response 

4from typing import List, Dict, TypedDict, Any, Union 

5 

6from canonicalwebteam.exceptions import StoreApiError 

7from canonicalwebteam.store_api.devicegw import DeviceGW 

8 

9from webapp.endpoints.utils import get_item_details_cache_key 

10from webapp.helpers import get_icon 

11from webapp.helpers import api_session 

12from cache.cache_utility import redis_cache 

13 

14 

15device_gateway = DeviceGW("snap", api_session) 

16 

17Packages = TypedDict( 

18 "Packages", 

19 { 

20 "packages": List[ 

21 Dict[ 

22 str, 

23 Union[Dict[str, Union[str, List[str]]], List[Dict[str, str]]], 

24 ] 

25 ] 

26 }, 

27) 

28 

29Package = TypedDict( 

30 "Package", 

31 { 

32 "package": Dict[ 

33 str, Union[Dict[str, str], List[str], List[Dict[str, str]]] 

34 ] 

35 }, 

36) 

37 

38 

39def fetch_packages(fields: List[str], query_params) -> Packages: 

40 """ 

41 Fetches packages from the store API based on the specified fields. 

42 

43 :param: fields (List[str]): A list of fields to include in the package 

44 data. 

45 :param: query_params: A search query 

46 

47 :returns: a dictionary containing the list of fetched packages. 

48 """ 

49 

50 category = query_params.get("categories", "") 

51 query = query_params.get("q", "") 

52 package_type = query_params.get("type", None) 

53 platform = query_params.get("platforms", "") 

54 architecture = query_params.get("architecture", "") 

55 provides = query_params.get("provides", None) 

56 requires = query_params.get("requires", None) 

57 

58 if package_type == "all": 

59 package_type = None 

60 

61 args = { 

62 "category": category, 

63 "fields": fields, 

64 "query": query, 

65 } 

66 

67 if package_type: 

68 args["type"] = package_type 

69 

70 if provides: 

71 provides = provides.split(",") 

72 args["provides"] = provides 

73 

74 if requires: 

75 requires = requires.split(",") 

76 args["requires"] = requires 

77 

78 packages = device_gateway.find(**args).get("results", []) 

79 

80 if platform and platform != "all": 

81 filtered_packages = [] 

82 for p in packages: 

83 platforms = p["result"].get("deployable-on", []) 

84 if not platforms: 

85 platforms = ["vm"] 

86 if platform in platforms: 

87 filtered_packages.append(p) 

88 packages = filtered_packages 

89 

90 if architecture and architecture != "all": 

91 args["architecture"] = architecture 

92 packages = device_gateway.find(**args).get("results", []) 

93 

94 return packages 

95 

96 

97def fetch_package(package_name: str, fields: List[str]) -> Package: 

98 """ 

99 Fetches a package from the store API based on the specified package name. 

100 

101 :param: package_name (str): The name of the package to fetch. 

102 :param: fields (List[str]): A list of fields to include in the package 

103 

104 :returns: a dictionary containing the fetched package. 

105 """ 

106 get_item_details_key = get_item_details_cache_key(package_name) 

107 cached_package = redis_cache.get(get_item_details_key, expected_type=dict) 

108 if cached_package: 

109 package = cached_package 

110 else: 

111 package = device_gateway.get_item_details( 

112 name=package_name, 

113 fields=fields, 

114 api_version=2, 

115 ) 

116 redis_cache.set(get_item_details_key, package, ttl=300) 

117 response = make_response({"package": package}) 

118 response.cache_control.max_age = 3600 

119 return response.json 

120 

121 

122def parse_package_for_card(package: Dict[str, Any]) -> Package: 

123 """ 

124 Parses a snap and returns the formatted package 

125 based on the given card schema. 

126 

127 :param: package (Dict[str, Any]): The package to be parsed. 

128 :returns: a dictionary containing the formatted package. 

129 

130 note: 

131 - This function has to be refactored to be more generic, 

132 so we won't have to check for the package type before parsing. 

133 

134 """ 

135 resp = { 

136 "package": { 

137 "description": "", 

138 "display_name": "", 

139 "icon_url": "", 

140 "name": "", 

141 "platforms": [], 

142 "type": "", 

143 "channel": { 

144 "name": "", 

145 "risk": "", 

146 "track": "", 

147 }, 

148 }, 

149 "publisher": {"display_name": "", "name": "", "validation": ""}, 

150 "categories": [], 

151 # hardcoded temporarily until we have this data from the API 

152 "ratings": {"value": "0", "count": "0"}, 

153 } 

154 

155 snap = package.get("snap", {}) 

156 publisher = snap.get("publisher", {}) 

157 resp["package"]["description"] = snap.get("summary", "") 

158 resp["package"]["display_name"] = snap.get("title", "") 

159 resp["package"]["type"] = "snap" 

160 resp["package"]["name"] = package.get("name", "") 

161 # platform to be fetched 

162 resp["publisher"]["display_name"] = publisher.get("display-name", "") 

163 resp["publisher"]["name"] = publisher.get("username", "") 

164 resp["publisher"]["validation"] = publisher.get("validation", "") 

165 resp["categories"] = snap.get("categories", []) 

166 resp["package"]["icon_url"] = get_icon(package["snap"]["media"]) 

167 

168 return resp 

169 

170 

171def paginate( 

172 packages: List[Packages], page: int, size: int, total_pages: int 

173) -> List[Packages]: 

174 """ 

175 Paginates a list of packages based on the specified page and size. 

176 

177 :param: packages (List[Packages]): The list of packages to paginate. 

178 :param: page (int): The current page number. 

179 :param: size (int): The number of packages to include in each page. 

180 :param: total_pages (int): The total number of pages. 

181 :returns: a list of paginated packages. 

182 

183 note: 

184 - If the provided page exceeds the total number of pages, the last 

185 page will be returned. 

186 - If the provided page is less than 1, the first page will be returned. 

187 """ 

188 

189 if page > total_pages: 

190 page = total_pages 

191 if page < 1: 

192 page = 1 

193 

194 start = (page - 1) * size 

195 end = start + size 

196 if end > len(packages): 

197 end = len(packages) 

198 

199 return packages[start:end] 

200 

201 

202def get_packages( 

203 fields: List[str], 

204 size: int = 10, 

205 query_params: Dict[str, Any] = {}, 

206) -> List[Dict[str, Any]]: 

207 """ 

208 Retrieves a list of packages from the store based on the specified 

209 parameters.The returned packages are paginated and parsed using the 

210 card schema. 

211 

212 :param: store: The store object. 

213 :param: fields (List[str]): A list of fields to include in the 

214 package data. 

215 :param: size (int, optional): The number of packages to include 

216 in each page. Defaults to 10. 

217 :param: page (int, optional): The current page number. Defaults to 1. 

218 :param: query (str, optional): The search query. 

219 :param: filters (Dict, optional): The filter parameters. Defaults to {}. 

220 :returns: a dictionary containing the list of parsed packages and 

221 the total pages 

222 """ 

223 

224 packages = fetch_packages(fields, query_params) 

225 

226 total_pages = -(len(packages) // -size) 

227 

228 total_pages = -(len(packages) // -size) 

229 total_items = len(packages) 

230 page = int(query_params.get("page", 1)) 

231 packages_per_page = paginate(packages, page, size, total_pages) 

232 parsed_packages = [] 

233 for package in packages_per_page: 

234 parsed_packages.append(parse_package_for_card(package)) 

235 res = parsed_packages 

236 

237 categories = get_store_categories() 

238 

239 return { 

240 "packages": res, 

241 "total_pages": total_pages, 

242 "total_items": total_items, 

243 "categories": categories, 

244 } 

245 

246 

247def format_slug(slug): 

248 """Format category name into a standard title format 

249 

250 :param slug: The hypen spaced, lowercase slug to be formatted 

251 :return: The formatted string 

252 """ 

253 return ( 

254 slug.title() 

255 .replace("-", " ") 

256 .replace("And", "and") 

257 .replace("Iot", "IoT") 

258 ) 

259 

260 

261def parse_categories( 

262 categories_json: Dict[str, List[Dict[str, str]]] 

263) -> List[Dict[str, str]]: 

264 """ 

265 :param categories_json: The returned json from store_api.get_categories() 

266 :returns: A list of categories in the format: 

267 [{"name": "Category", "slug": "category"}] 

268 """ 

269 

270 categories = [] 

271 

272 if "categories" in categories_json: 

273 for category in categories_json["categories"]: 

274 categories.append( 

275 {"slug": category, "name": format_slug(category)} 

276 ) 

277 

278 return categories 

279 

280 

281def get_store_categories() -> List[Dict[str, str]]: 

282 """ 

283 Fetches all store categories. 

284 

285 :returns: A list of categories in the format: 

286 [{"name": "Category", "slug": "category"}] 

287 """ 

288 try: 

289 all_categories = device_gateway.get_categories() 

290 except StoreApiError: 

291 all_categories = [] 

292 

293 for cat in all_categories["categories"]: 

294 cat["display_name"] = format_slug(cat["name"]) 

295 

296 categories = list( 

297 filter( 

298 lambda cat: cat["name"] != "featured", all_categories["categories"] 

299 ) 

300 ) 

301 

302 return categories 

303 

304 

305def get_snaps_account_info(account_info): 

306 """Get snaps from the account information of a user 

307 

308 :param account_info: The account informations 

309 

310 :return: A list of snaps 

311 :return: A list of registred snaps 

312 """ 

313 user_snaps = {} 

314 registered_snaps = {} 

315 if "16" in account_info["snaps"]: 

316 snaps = account_info["snaps"]["16"] 

317 for snap in snaps.keys(): 

318 if snaps[snap]["status"] != "Revoked": 

319 if not snaps[snap]["latest_revisions"]: 

320 registered_snaps[snap] = snaps[snap] 

321 else: 

322 user_snaps[snap] = snaps[snap] 

323 

324 now = datetime.datetime.utcnow() 

325 

326 for snap in user_snaps: 

327 snap_info = user_snaps[snap] 

328 for revision in snap_info["latest_revisions"]: 

329 if len(revision["channels"]) > 0: 

330 snap_info["latest_release"] = revision 

331 break 

332 

333 if len(user_snaps) == 1: 

334 for snap in user_snaps: 

335 snap_info = user_snaps[snap] 

336 revisions = snap_info["latest_revisions"] 

337 

338 revision_since = datetime.datetime.strptime( 

339 revisions[-1]["since"], "%Y-%m-%dT%H:%M:%SZ" 

340 ) 

341 

342 if abs((revision_since - now).days) < 30 and ( 

343 not revisions[0]["channels"] 

344 or revisions[0]["channels"][0] == "edge" 

345 ): 

346 snap_info["is_new"] = True 

347 

348 return user_snaps, registered_snaps 

349 

350 

351def get_package( 

352 package_name: str, 

353 fields: List[str], 

354) -> Package: 

355 """Get a package by name 

356 

357 :param store: The store object. 

358 :param store_name: The name of the store. 

359 :param package_name: The name of the package. 

360 :param fields: The fields to fetch. 

361 

362 :return: A dictionary containing the package. 

363 """ 

364 package = fetch_package(package_name, fields).get("package", {}) 

365 resp = parse_package_for_card(package) 

366 return {"package": resp}