Coverage for webapp/store/logic.py: 90%

116 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-17 22:07 +0000

1import humanize 

2from typing import List, Dict, TypedDict, Any, Union 

3import datetime 

4from dateutil import parser 

5 

6from canonicalwebteam.store_api.devicegw import DeviceGW 

7 

8from webapp.helpers import get_yaml_loader, is_date_format 

9from cache.cache_utility import redis_cache as cache 

10 

# Architecture identifiers. Not referenced by the code visible in this module.
ARCHITECTURES = ["amd64", "arm64", "ppc64el", "riscv64", "s390x"]

# Fields requested from ``device_gw.find`` when searching/listing rocks.
FIND_FIELDS = [
    "contact", "description", "license",
    "summary", "title", "website",
    "publisher", "categories", "links",
]

# Fields requested from ``device_gw.get_item_details`` for a single rock.
DETAILS_FIELDS = [
    "categories", "contact", "description", "license",
    "links", "media", "private", "publisher",
    "summary", "title", "website", "created-at",
    "download", "version", "revision", "channel-map",
]

41 

# YAML loader supplied by the project's helper module. It is not used by the
# code visible in this module.
yaml = get_yaml_loader()

# Store client used by fetch_rocks() and get_rock() below.
# NOTE(review): ``staging=True`` is hard-coded here; confirm this is intended
# for every deployment environment.
device_gw = DeviceGW("rock", staging=True)

44 

class Packages(TypedDict):
    """Typed mapping with a single ``packages`` key holding a list of dicts."""

    packages: List[
        Dict[
            str,
            Union[Dict[str, Union[str, List[str]]], List[Dict[str, str]]],
        ]
    ]

56 

class Package(TypedDict):
    """Typed mapping with a single ``package`` key holding one package dict."""

    package: Dict[
        str, Union[Dict[str, str], List[str], List[Dict[str, str]]]
    ]

65 

66 

def convert_date(date_to_convert):
    """
    Turn a datetime string into a short display string.

    Dates newer than 24 hours before "now" are rendered through
    ``humanize.naturalday`` and title-cased (e.g. 'Today', 'Yesterday');
    anything older is rendered as '12 Jan 2023'.
    """
    # The timezone is dropped (not converted) before comparing with the
    # naive local clock.
    # NOTE(review): presumably inputs are UTC and the server clock is UTC
    # too; confirm, otherwise the 24-hour cutoff is shifted by the offset.
    parsed = parser.parse(date_to_convert).replace(tzinfo=None)
    one_day_ago = datetime.datetime.now() - datetime.timedelta(days=1)

    if parsed <= one_day_ago:
        return parsed.strftime("%d %b %Y")
    return humanize.naturalday(parsed).title()

78 

79 

def format_relative_date(date_str: str) -> str:
    """
    Return a relative time string from an ISO datetime string.
    (e.g. '2 weeks ago', '25 Apr 2025').

    The age is measured in whole days against the current UTC time:
    'today', 'yesterday', 'N days ago' (under a week), 'N week(s) ago'
    (under 30 days), 'N month(s) ago' (under 90 days), otherwise the date
    formatted as '25 Apr 2025'. Dates ahead of now give 'in the future'.

    Timestamps without a UTC offset are interpreted as UTC. Previously they
    could not be subtracted from the timezone-aware "now" and were reported
    as invalid.

    Input that cannot be parsed yields 'Invalid date: <reason>' instead of
    raising.
    """
    # Only parsing is expected to fail; keep the try body minimal so
    # unrelated errors are not mislabelled as an invalid date.
    try:
        given_date = datetime.datetime.fromisoformat(date_str)
    except (TypeError, ValueError) as e:
        return f"Invalid date: {e}"

    if given_date.tzinfo is None:
        given_date = given_date.replace(tzinfo=datetime.timezone.utc)

    now = datetime.datetime.now(datetime.timezone.utc)
    days = (now - given_date).days

    if days < 0:
        return "in the future"
    if days == 0:
        return "today"
    if days == 1:
        return "yesterday"
    if days < 7:
        return f"{days} days ago"
    if days < 30:
        weeks = days // 7
        return f"{weeks} week{'s' if weeks > 1 else ''} ago"
    if days < 90:
        months = days // 30
        return f"{months} month{'s' if months > 1 else ''} ago"
    return given_date.strftime("%d %b %Y")

109 

110 

def get_icons(package):
    """
    Collect the URL of every media entry of type "icon".

    The media list is read from ``package["result"]["media"]``; the URLs are
    returned in the order the entries appear.
    """
    icon_urls = []
    for entry in package["result"]["media"]:
        if entry["type"] == "icon":
            icon_urls.append(entry["url"])
    return icon_urls

117 

118 

# Title-cased words that get a special spelling in display names.
_SLUG_WORD_OVERRIDES = {"And": "and", "Iot": "IoT"}


def format_slug(slug):
    """
    Converts a slug to a title-like string.
    (e.g. 'my-rock-name' to 'My Rock Name').

    Hyphens and underscores become spaces, every word is title-cased, and
    the words "And" / "Iot" are rewritten as "and" / "IoT". The rewrite is
    applied only when no letter follows, so longer words that merely start
    with those letters are left alone ('android' -> 'Android', not
    'android'; 'iota' -> 'Iota', not 'IoTa').

    An empty or missing slug yields an empty string.
    """
    # Local import keeps this block self-contained; ``re`` caches the
    # compiled pattern, so repeated calls stay cheap.
    import re

    if not slug:
        return ""

    spaced = slug.title().replace("-", " ").replace("_", " ")
    # ``[^\W\d_]`` matches any letter; the negative lookahead therefore
    # rejects matches that continue into a longer alphabetic word.
    return re.sub(
        r"(And|Iot)(?![^\W\d_])",
        lambda match: _SLUG_WORD_OVERRIDES[match.group(1)],
        spaced,
    )

131 

132 

def get_icon(media):
    """
    Return the URL of the first media entry of type "icon".

    Returns an empty string when ``media`` contains no icon entry.
    """
    icon_urls = []
    for entry in media:
        if entry["type"] == "icon":
            icon_urls.append(entry["url"])
    return icon_urls[0] if icon_urls else ""

138 

139 

def parse_package_for_card(
    package: Dict[str, Any],
) -> Package:
    """
    Parses a package dictionary into a simplified schema for card display.

    Returns a dict with two keys:
      * "package": summary, display_name, icon_url, name, platforms,
        website, contact, support, cves and last_updated. The fields
        platforms, support, cves and last_updated are always returned
        empty by this function.
      * "publisher": display_name, name and validation.

    Every lookup tolerates missing keys. The summary falls back to the
    description, and the display name falls back to the metadata title when
    the package has no name. Previously a package without "summary" or
    "description" metadata raised KeyError, and one without name and title
    passed None to format_slug.
    """
    metadata = package.get("metadata", {})
    publisher = metadata.get("publisher", {})
    name = package.get("name", "")

    summary = metadata.get("summary") or metadata.get("description") or ""
    display_source = name or metadata.get("title") or ""

    # NOTE(review): the icon is looked up under metadata["links"]["media"],
    # whereas parse_rock_details reads metadata["media"]; confirm which
    # shape the find endpoint actually returns.
    icon_url = get_icon(metadata.get("links", {}).get("media", []))

    return {
        "package": {
            "summary": summary,
            "display_name": format_slug(display_source),
            "icon_url": icon_url,
            "name": name,
            "platforms": [],
            "website": metadata.get("website", ""),
            "contact": metadata.get("contact", ""),
            "support": "",
            "cves": "",
            "last_updated": "",
        },
        "publisher": {
            "display_name": publisher.get("display-name", ""),
            "name": publisher.get("username", ""),
            "validation": publisher.get("validation", ""),
        },
    }

181 

182 

def paginate(packages: List[Packages], page: int, size: int) -> List[Packages]:
    """
    Paginate the given packages list based on current page and size.

    ``page`` is 1-based and is clamped into the valid range: values below 1
    select the first page and values past the end select the last page.
    An empty list always yields an empty page.

    A ``size`` below 1 returns an empty list. Previously ``size == 0``
    raised ZeroDivisionError and negative sizes produced a meaningless
    slice.
    """
    if size < 1:
        return []

    total_items = len(packages)
    total_pages = (total_items + size - 1) // size  # ceiling division

    # Clamp to [1, total_pages]; the lower bound wins when there are no
    # pages at all, which then produces an empty slice.
    page = max(1, min(page, total_pages))

    start = (page - 1) * size
    return packages[start : start + size]

199 

200 

def _release_sort_key(released_at):
    """Return a naive datetime (UTC when an offset is known) for ordering."""
    if not released_at:
        return datetime.datetime.min
    released = parser.parse(released_at)
    if released.tzinfo is not None:
        released = released.astimezone(datetime.timezone.utc)
    return released.replace(tzinfo=None)


def parse_rock_details(rock):
    """
    Parses detailed rock metadata into a structured format for internal use.

    ``rock`` must contain a "metadata" dict; every other field is optional
    and falls back to an empty value, except each channel's
    ``revision["revision"]``, which is still required.

    The result holds the display fields (display_name, name, description,
    summary, icon_url, license), a "metadata" sub-dict, categories,
    publisher, a "channels" list with one entry per channel-map item, and
    "latest_channel": the channel with the most recent "released-at"
    timestamp, or None when the rock has no channels.

    The latest channel is chosen from the raw timestamps. Comparing the
    display strings ('Today' vs '12 Jan 2023') mixed str and datetime keys
    and raised TypeError, and an empty channel list made max() raise
    ValueError.
    """
    metadata = rock["metadata"]
    publisher = metadata.get("publisher", {})
    name = rock.get("name", "")
    license_name = metadata.get("license", "")

    parsed_rock = {
        "display_name": format_slug(name),
        "name": name,
        "description": metadata.get("description", ""),
        "summary": metadata.get("summary", ""),
        "icon_url": get_icon(metadata.get("media", [])),
        "metadata": {
            "license": license_name,
            "links": metadata.get("links", {}),
            "private": metadata.get("private", False),
            "upstream_details": {},
            "related_rocks": [],
            "downstream_artifacts": {},
        },
        "categories": metadata.get("categories", []),
        "publisher": {
            "name": format_slug(publisher.get("display-name", "")),
            "username": publisher.get("username", ""),
            "validation": publisher.get("validation", ""),
        },
        "channels": [],
        "license": license_name,
    }

    # Build channel info, tracking the most recently released channel.
    # The sort key is kept outside the channel dicts so the parsed result
    # contains only plain strings/numbers.
    latest_channel = None
    latest_released = None
    for channel in rock.get("channel-map", []):
        channel_data = channel.get("channel", {})
        revision_data = channel.get("revision", {})
        version = revision_data.get("version", "")
        released_raw = channel_data.get("released-at", "")

        # NOTE(review): only one "0" is appended, so "1" becomes "1.0"
        # rather than "1.0.0"; confirm whether padding to three parts
        # was intended.
        version_parts = version.split(".")
        if len(version_parts) < 3:
            version_parts.append("0")

        parsed_channel = {
            "workload_version": ".".join(version_parts),
            "risk": channel_data.get("risk", ""),
            # A channel without a release timestamp gets empty date fields
            # instead of crashing the whole page.
            "last_updated": (
                format_relative_date(released_raw) if released_raw else ""
            ),
            "released_at": convert_date(released_raw) if released_raw else "",
            "revision": revision_data["revision"],
            "version": version,
            "track": channel_data.get("track", ""),
        }
        parsed_rock["channels"].append(parsed_channel)

        released = _release_sort_key(released_raw)
        # Strict ">" keeps the first channel on ties, like max() does.
        if latest_released is None or released > latest_released:
            latest_released = released
            latest_channel = parsed_channel

    parsed_rock["latest_channel"] = latest_channel
    return parsed_rock

263 

264 

def fetch_rocks(query_string):
    """
    Return the card-formatted rocks matching ``query_string``.

    A truthy cached list is returned as-is. Otherwise the store is queried
    (an empty query is sent as "%"), each result is converted with
    ``parse_package_for_card``, and the list is cached with ``ttl=600``.
    A falsy cache value, including an empty list, counts as a miss.
    """
    cache_key = ("fetch_rocks", {"q": query_string})
    cached = cache.get(cache_key, expected_type=list)
    if cached:
        return cached

    search_term = query_string
    if query_string == "":
        search_term = "%"

    response = device_gw.find(search_term, fields=FIND_FIELDS)
    cards = []
    for result in response.get("results", []):
        cards.append(parse_package_for_card(result))

    cache.set(cache_key, cards, ttl=600)
    return cards

276 

277 

def get_rocks(
    size: int = 10, query_string: str = "", page: int = 0
) -> Dict[str, Any]:
    """
    Fetches paginated and parsed rock packages using DeviceGW.

    Returns a dict with:
      * "packages": the rocks on the requested page,
      * "total_pages": number of pages at the given ``size``,
      * "total_items": number of rocks matching ``query_string``.

    A ``size`` below 1 yields no packages and zero pages. Previously
    ``size == 0`` raised ZeroDivisionError.
    """
    rocks = fetch_rocks(query_string)
    total_items = len(rocks)

    if size < 1:
        return {
            "packages": [],
            "total_pages": 0,
            "total_items": total_items,
        }

    return {
        "packages": paginate(rocks, page, size),
        "total_pages": (total_items + size - 1) // size,  # ceiling division
        "total_items": total_items,
    }

293 

294 

def get_rock(
    entity_name: str,
) -> Dict[str, Any]:
    """
    Return the parsed details of the rock named ``entity_name``.

    A truthy cached dict is returned directly. Otherwise the details are
    requested from the store, parsed with ``parse_rock_details`` and cached
    with ``ttl=600``.
    """
    cache_key = f"get_rock:{entity_name}"
    cached = cache.get(cache_key, expected_type=dict)
    if cached:
        return cached

    details = device_gw.get_item_details(entity_name, fields=DETAILS_FIELDS)
    parsed = parse_rock_details(details)
    cache.set(cache_key, parsed, ttl=600)
    return parsed