Coverage for webapp/store/views.py: 44%

228 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-28 22:05 +0000

1from math import floor 

2import talisker.requests 

3import flask 

4from dateutil import parser 

5from webapp.decorators import exchange_required, login_required 

6import webapp.helpers as helpers 

7import webapp.store.logic as logic 

8from webapp.api import requests 

9 

10from canonicalwebteam.exceptions import StoreApiError 

11from canonicalwebteam.store_api.dashboard import Dashboard 

12from canonicalwebteam.store_api.publishergw import PublisherGW 

13from canonicalwebteam.store_api.devicegw import DeviceGW 

14 

15from webapp.api.exceptions import ApiError 

16from webapp.store.snap_details_views import snap_details_views 

17from webapp.helpers import api_publisher_session, api_session 

18from flask.json import jsonify 

19import os 

20from webapp.extensions import csrf 

21 

22session = talisker.requests.get_session(requests.Session) 

23 

24YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY") 

25dashboard = Dashboard(api_session) 

26publisher_gateway = PublisherGW("snap", api_publisher_session) 

27device_gateway = DeviceGW("snap", api_session) 

28 

29 

30def store_blueprint(store_query=None): 

31 store = flask.Blueprint( 

32 "store", 

33 __name__, 

34 template_folder="/templates", 

35 static_folder="/static", 

36 ) 

37 snap_details_views(store) 

38 

39 def format_validation_set(validation_set): 

40 return validation_set["headers"] 

41 

42 @store.route("/api/validation-sets") 

43 @login_required 

44 def get_validation_sets(): 

45 res = {} 

46 

47 try: 

48 validation_sets = dashboard.get_validation_sets(flask.session) 

49 res["success"] = True 

50 

51 if len(validation_sets["assertions"]) > 0: 

52 res["data"] = [ 

53 format_validation_set(item) 

54 for item in validation_sets["assertions"] 

55 ] 

56 else: 

57 res["data"] = [] 

58 

59 response = flask.make_response(res, 200) 

60 response.cache_control.max_age = "3600" 

61 except StoreApiError as error_list: 

62 error_messages = [ 

63 f"{error.get('message', 'An error occurred')}" 

64 for error in error_list.errors 

65 ] 

66 

67 res["message"] = " ".join(error_messages) 

68 res["success"] = False 

69 response = flask.make_response(res, 500) 

70 

71 return response 

72 

73 @store.route("/api/validation-sets/<validation_set_id>") 

74 @login_required 

75 def get_validation_set(validation_set_id): 

76 res = {} 

77 

78 try: 

79 validation_set = dashboard.get_validation_set( 

80 flask.session, validation_set_id 

81 ) 

82 res["success"] = True 

83 

84 if len(validation_set["assertions"]) > 0: 

85 res["data"] = [ 

86 format_validation_set(item) 

87 for item in validation_set["assertions"] 

88 ] 

89 else: 

90 res["data"] = [] 

91 

92 response = flask.make_response(res, 200) 

93 response.cache_control.max_age = "3600" 

94 except StoreApiError as error_list: 

95 error_messages = [ 

96 f"{error.get('message', 'An error occurred')}" 

97 for error in error_list.errors 

98 ] 

99 

100 res["message"] = " ".join(error_messages) 

101 res["success"] = False 

102 response = flask.make_response(res, 500) 

103 

104 return response 

105 

106 @store.route("/validation-sets", defaults={"path": ""}) 

107 @store.route("/validation-sets/<path:path>") 

108 @login_required 

109 def validation_sets(path): 

110 return flask.render_template("store/publisher.html") 

111 

112 @store.route("/discover") 

113 def discover(): 

114 return flask.redirect(flask.url_for(".homepage")) 

115 

116 def brand_store_view(): 

117 error_info = {} 

118 status_code = 200 

119 

120 try: 

121 snaps = device_gateway.get_all_items(size=16)["results"] 

122 except (StoreApiError, ApiError): 

123 snaps = [] 

124 

125 for snap in snaps: 

126 if "media" in snap: 

127 snap["icon_url"] = helpers.get_icon(snap["media"]) 

128 

129 return ( 

130 flask.render_template( 

131 "brand-store/store.html", snaps=snaps, error_info=error_info 

132 ), 

133 status_code, 

134 ) 

135 

136 def brand_search_snap(): 

137 status_code = 200 

138 snap_searched = flask.request.args.get("q", default="", type=str) 

139 

140 if not snap_searched: 

141 return flask.redirect(flask.url_for(".homepage")) 

142 

143 size = flask.request.args.get("limit", default=25, type=int) 

144 offset = flask.request.args.get("offset", default=0, type=int) 

145 

146 try: 

147 page = floor(offset / size) + 1 

148 except ZeroDivisionError: 

149 size = 10 

150 page = floor(offset / size) + 1 

151 

152 error_info = {} 

153 searched_results = [] 

154 

155 searched_results = device_gateway.search( 

156 snap_searched, size=size, page=page 

157 ) 

158 

159 snaps_results = searched_results["results"] 

160 

161 for snap in snaps_results: 

162 snap["icon_url"] = helpers.get_icon(snap["media"]) 

163 

164 links = logic.get_pages_details( 

165 flask.request.base_url, 

166 ( 

167 searched_results["_links"] 

168 if "_links" in searched_results 

169 else [] 

170 ), 

171 ) 

172 

173 context = { 

174 "query": snap_searched, 

175 "snaps": snaps_results, 

176 "links": links, 

177 "error_info": error_info, 

178 } 

179 

180 return ( 

181 flask.render_template("brand-store/search.html", **context), 

182 status_code, 

183 ) 

184 

185 @store.route("/store") 

186 def store_view(): 

187 return flask.render_template("store/store.html") 

188 

189 @store.route("/youtube", methods=["POST"]) 

190 def get_video_thumbnail_data(): 

191 body = flask.request.form 

192 thumbnail_url = "https://www.googleapis.com/youtube/v3/videos" 

193 thumbnail_data = session.get( 

194 ( 

195 f"{thumbnail_url}?id={body['videoId']}" 

196 f"&part=snippet&key={YOUTUBE_API_KEY}" 

197 ) 

198 ) 

199 

200 if thumbnail_data: 

201 return thumbnail_data.json() 

202 

203 return {} 

204 

205 @store.route("/publisher/<regex('[a-z0-9-]*[a-z][a-z0-9-]*'):publisher>") 

206 def publisher_details(publisher): 

207 """ 

208 A view to display the publisher details page for specific publisher. 

209 """ 

210 

211 # 404 for the snap-quarantine publisher 

212 if publisher == "snap-quarantine": 

213 flask.abort(404) 

214 

215 publisher_content_path = flask.current_app.config["CONTENT_DIRECTORY"][ 

216 "PUBLISHER_PAGES" 

217 ] 

218 

219 if publisher in ["kde", "snapcrafters", "jetbrains"]: 

220 context = helpers.get_yaml( 

221 publisher_content_path + publisher + ".yaml", typ="safe" 

222 ) 

223 

224 if not context: 

225 flask.abort(404) 

226 

227 popular_snaps = helpers.get_yaml( 

228 publisher_content_path + publisher + "-snaps.yaml", 

229 typ="safe", 

230 ) 

231 

232 context["popular_snaps"] = ( 

233 popular_snaps["snaps"] if popular_snaps else [] 

234 ) 

235 

236 if "publishers" in context: 

237 context["snaps"] = [] 

238 for publisher in context["publishers"]: 

239 snaps_results = [] 

240 try: 

241 snaps_results = device_gateway.get_publisher_items( 

242 publisher, size=500, page=1 

243 )["_embedded"]["clickindex:package"] 

244 except StoreApiError: 

245 pass 

246 

247 for snap in snaps_results: 

248 snap["icon_url"] = helpers.get_icon(snap["media"]) 

249 

250 context["snaps"].extend( 

251 [snap for snap in snaps_results if snap["apps"]] 

252 ) 

253 

254 featured_snaps = [ 

255 snap["package_name"] for snap in context["featured_snaps"] 

256 ] 

257 

258 context["snaps"] = [ 

259 snap 

260 for snap in context["snaps"] 

261 if snap["package_name"] not in featured_snaps 

262 ] 

263 

264 context["snaps_count"] = len(context["snaps"]) + len( 

265 featured_snaps 

266 ) 

267 

268 return flask.render_template( 

269 "store/publisher-details.html", **context 

270 ) 

271 

272 status_code = 200 

273 error_info = {} 

274 snaps_results = [] 

275 snaps = [] 

276 snaps_count = 0 

277 publisher_details = {"display-name": publisher, "username": publisher} 

278 

279 snaps_results = device_gateway.find( 

280 publisher=publisher, 

281 fields=[ 

282 "title", 

283 "summary", 

284 "media", 

285 "publisher", 

286 ], 

287 )["results"] 

288 

289 for snap in snaps_results: 

290 item = snap["snap"] 

291 item["package_name"] = snap["name"] 

292 item["icon_url"] = helpers.get_icon(item["media"]) 

293 snaps.append(item) 

294 

295 snaps_count = len(snaps) 

296 

297 if snaps_count > 0: 

298 publisher_details = snaps[0]["publisher"] 

299 

300 context = { 

301 "snaps": snaps, 

302 "snaps_count": snaps_count, 

303 "publisher": publisher_details, 

304 "error_info": error_info, 

305 } 

306 

307 return ( 

308 flask.render_template( 

309 "store/community-publisher-details.html", **context 

310 ), 

311 status_code, 

312 ) 

313 

314 @store.route("/store/categories/<category>") 

315 def store_category(category): 

316 status_code = 200 

317 error_info = {} 

318 snaps_results = [] 

319 

320 snaps_results = device_gateway.get_category_items( 

321 category=category, size=10, page=1 

322 )["results"] 

323 for snap in snaps_results: 

324 snap["icon_url"] = helpers.get_icon(snap["media"]) 

325 

326 # if the first snap (banner snap) doesn't have an icon, remove the last 

327 # snap from the list to avoid a hanging snap (grid of 9) 

328 if len(snaps_results) == 10 and snaps_results[0]["icon_url"] == "": 

329 snaps_results = snaps_results[:-1] 

330 

331 for index in range(len(snaps_results)): 

332 snaps_results[index] = logic.get_snap_banner_url( 

333 snaps_results[index] 

334 ) 

335 

336 context = { 

337 "category": category, 

338 "has_featured": True, 

339 "snaps": snaps_results, 

340 "error_info": error_info, 

341 } 

342 

343 return ( 

344 flask.render_template("store/_category-partial.html", **context), 

345 status_code, 

346 ) 

347 

348 @store.route("/store/featured-snaps/<category>") 

349 def featured_snaps_in_category(category): 

350 snaps_results = [] 

351 

352 snaps_results = device_gateway.get_category_items( 

353 category=category, size=3, page=1 

354 )["_embedded"]["clickindex:package"] 

355 

356 for snap in snaps_results: 

357 snap["icon_url"] = helpers.get_icon(snap["media"]) 

358 

359 return flask.jsonify(snaps_results) 

360 

361 @store.route("/store/sitemap.xml") 

362 def sitemap(): 

363 base_url = "https://snapcraft.io/store" 

364 

365 snaps = [] 

366 page = 0 

367 url = f"https://api.snapcraft.io/api/v1/snaps/search?page={page}" 

368 while url: 

369 response = session.get(url) 

370 try: 

371 snaps_response = response.json() 

372 except Exception: 

373 continue 

374 

375 for snap in snaps_response["_embedded"]["clickindex:package"]: 

376 try: 

377 last_udpated = ( 

378 parser.parse(snap["last_updated"]) 

379 .replace(tzinfo=None) 

380 .strftime("%Y-%m-%d") 

381 ) 

382 snaps.append( 

383 { 

384 "url": "https://snapcraft.io/" 

385 + snap["package_name"], 

386 "last_udpated": last_udpated, 

387 } 

388 ) 

389 except Exception: 

390 continue 

391 if "next" in snaps_response["_links"]: 

392 url = snaps_response["_links"]["next"]["href"] 

393 else: 

394 url = None 

395 

396 xml_sitemap = flask.render_template( 

397 "sitemap/sitemap.xml", 

398 base_url=base_url, 

399 links=snaps, 

400 ) 

401 

402 response = flask.make_response(xml_sitemap) 

403 response.headers["Content-Type"] = "application/xml" 

404 response.headers["Cache-Control"] = "public, max-age=43200" 

405 

406 return response 

407 

408 if store_query: 

409 store.add_url_rule("/", "homepage", brand_store_view) 

410 store.add_url_rule("/search", "search", brand_search_snap) 

411 else: 

412 store.add_url_rule("/store", "homepage", store_view) 

413 

414 @store.route("/<snap_name>/create-track", methods=["POST"]) 

415 @login_required 

416 @csrf.exempt 

417 @exchange_required 

418 def post_create_track(snap_name): 

419 track_name = flask.request.form["track-name"] 

420 version_pattern = flask.request.form.get("version-pattern") 

421 auto_phasing_percentage = flask.request.form.get( 

422 "automatic-phasing-percentage" 

423 ) 

424 

425 if auto_phasing_percentage is not None: 

426 auto_phasing_percentage = float(auto_phasing_percentage) 

427 

428 response = publisher_gateway.create_track( 

429 flask.session, 

430 snap_name, 

431 track_name, 

432 version_pattern, 

433 auto_phasing_percentage, 

434 ) 

435 if response.status_code == 201: 

436 return response.json(), response.status_code 

437 if response.status_code == 409: 

438 return ( 

439 jsonify({"error": "Track already exists."}), 

440 response.status_code, 

441 ) 

442 if "error-list" in response.json(): 

443 return ( 

444 jsonify( 

445 {"error": response.json()["error-list"][0]["message"]} 

446 ), 

447 response.status_code, 

448 ) 

449 return response.json(), response.status_code 

450 

451 return store