Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from math import floor 

2import talisker.requests 

3import flask 

4from dateutil import parser 

5from webapp.decorators import exchange_required, login_required 

6import webapp.helpers as helpers 

7import webapp.store.logic as logic 

8from webapp.api import requests 

9from canonicalwebteam.store_api.stores.snapstore import ( 

10 SnapStore, 

11 SnapPublisher, 

12) 

13from canonicalwebteam.store_api.exceptions import StoreApiError 

14from webapp.api.exceptions import ApiError 

15from webapp.store.snap_details_views import snap_details_views 

16from webapp.helpers import api_publisher_session 

17from flask.json import jsonify 

18import os 

19from webapp.extensions import csrf 

20 

# HTTP session shared by the views in this module. It is obtained from
# talisker using the project's own webapp.api.requests.Session class
# (``requests`` here is webapp.api.requests, not the third-party package).
session = talisker.requests.get_session(requests.Session)

# Key sent to the YouTube videos endpoint by get_video_thumbnail_data.
# os.getenv returns None when the variable is not set.
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
# Publisher API client constructed from the shared api_publisher_session.
publisher_api = SnapPublisher(api_publisher_session)

25 

26 

def store_blueprint(store_query=None):
    """Build and return the ``store`` Flask blueprint.

    All views are defined as closures inside this factory so they can share
    the ``api`` client and the ``store`` blueprint created here.

    :param store_query: Passed straight to ``SnapStore``. When truthy, the
        brand-store homepage and search views are registered at ``/`` and
        ``/search``; otherwise ``/store`` is registered as the homepage.
    :returns: The configured ``flask.Blueprint``.
    """
    # Store API client bound to the module-level session.
    api = SnapStore(session, store_query)

    store = flask.Blueprint(
        "store",
        __name__,
        template_folder="/templates",
        static_folder="/static",
    )
    # Presumably attaches the snap details routes to the blueprint; the
    # helper is defined in webapp.store.snap_details_views.
    snap_details_views(store, api)

37 

def format_validation_set(validation_set):
    """Return the ``headers`` entry of a single validation-set assertion."""
    headers = validation_set["headers"]
    return headers

40 

@store.route("/api/validation-sets")
@login_required
def get_validation_sets():
    """Return the validation sets for the current session as JSON.

    On success the body is ``{"success": True, "data": [...]}``, where each
    entry is the ``headers`` of one assertion, and the response gets a
    ``max-age`` cache directive. If the publisher API raises
    ``StoreApiError`` the body holds ``message`` (the error messages joined
    by spaces) and ``success: False``, with status 500.
    """
    payload = {}

    try:
        fetched = publisher_api.get_validation_sets(flask.session)
    except StoreApiError as api_error:
        messages = []
        for error in api_error.errors:
            messages.append(f"{error.get('message', 'An error occurred')}")

        payload["message"] = " ".join(messages)
        payload["success"] = False
        return flask.make_response(payload, 500)

    payload["success"] = True

    assertions = fetched["assertions"]
    payload["data"] = (
        [format_validation_set(entry) for entry in assertions]
        if len(assertions) > 0
        else []
    )

    reply = flask.make_response(payload, 200)
    reply.cache_control.max_age = "3600"
    return reply

71 

@store.route("/api/validation-sets/<validation_set_id>")
@login_required
def get_validation_set(validation_set_id):
    """Return one validation set, looked up by id, as JSON.

    On success the body is ``{"success": True, "data": [...]}``, where each
    entry is the ``headers`` of one assertion, and the response gets a
    ``max-age`` cache directive. If the publisher API raises
    ``StoreApiError`` the body holds ``message`` (the error messages joined
    by spaces) and ``success: False``, with status 500.
    """
    payload = {}

    try:
        fetched = publisher_api.get_validation_set(
            flask.session, validation_set_id
        )
    except StoreApiError as api_error:
        messages = []
        for error in api_error.errors:
            messages.append(f"{error.get('message', 'An error occurred')}")

        payload["message"] = " ".join(messages)
        payload["success"] = False
        return flask.make_response(payload, 500)

    payload["success"] = True

    assertions = fetched["assertions"]
    payload["data"] = (
        [format_validation_set(entry) for entry in assertions]
        if len(assertions) > 0
        else []
    )

    reply = flask.make_response(payload, 200)
    reply.cache_control.max_age = "3600"
    return reply

104 

@store.route("/validation-sets", defaults={"path": ""})
@store.route("/validation-sets/<path:path>")
@login_required
def validation_sets(path):
    """Serve the publisher page for every ``/validation-sets`` URL.

    ``path`` is accepted so that any sub-path matches, but it is not used
    here: the same template is rendered regardless.
    """
    page = flask.render_template("store/publisher.html")
    return page

110 

@store.route("/discover")
def discover():
    """Redirect ``/discover`` to this blueprint's ``homepage`` endpoint."""
    homepage_url = flask.url_for(".homepage")
    return flask.redirect(homepage_url)

114 

def brand_store_view():
    """Render the brand-store homepage with up to 16 snaps.

    If the store API call raises ``StoreApiError`` or ``ApiError`` the page
    is still rendered, with an empty snap list. The status is always 200.
    """
    try:
        snaps = api.get_all_items(size=16)["results"]
    except (StoreApiError, ApiError):
        snaps = []

    for snap in snaps:
        # Only snaps that carry media get an icon URL attached.
        if "media" not in snap:
            continue
        snap["icon_url"] = helpers.get_icon(snap["media"])

    page = flask.render_template(
        "brand-store/store.html", snaps=snaps, error_info={}
    )
    return page, 200

134 

def brand_search_snap():
    """Render brand-store search results for the ``q`` query parameter.

    An empty or missing ``q`` redirects to the homepage. ``limit`` (default
    25) and ``offset`` (default 0) select the page requested from the store
    API; a ``limit`` of zero is replaced by 10.
    """
    query = flask.request.args.get("q", default="", type=str)
    if not query:
        return flask.redirect(flask.url_for(".homepage"))

    page_size = flask.request.args.get("limit", default=25, type=int)
    start = flask.request.args.get("offset", default=0, type=int)

    # A zero limit would divide by zero below, so fall back to 10 per page.
    if page_size == 0:
        page_size = 10
    page_number = floor(start / page_size) + 1

    search_response = api.search(query, size=page_size, page=page_number)

    found_snaps = search_response["results"]
    for snap in found_snaps:
        snap["icon_url"] = helpers.get_icon(snap["media"])

    if "_links" in search_response:
        raw_links = search_response["_links"]
    else:
        raw_links = []
    links = logic.get_pages_details(flask.request.base_url, raw_links)

    page = flask.render_template(
        "brand-store/search.html",
        query=query,
        snaps=found_snaps,
        links=links,
        error_info={},
    )
    return page, 200

181 

@store.route("/store")
def store_view():
    """Render the main store page template."""
    page = flask.render_template("store/store.html")
    return page

185 

@store.route("/youtube", methods=["POST"])
def get_video_thumbnail_data():
    """Fetch YouTube snippet data for the ``videoId`` posted in the form.

    The video id comes from the client, so it is passed through ``params``
    and URL-encoded by the session rather than being interpolated into the
    URL. This stops a crafted value from adding or overriding query
    parameters such as ``key``.

    :returns: The decoded JSON body of the YouTube response when the
        response object is truthy, otherwise an empty dict.
    """
    video_id = flask.request.form["videoId"]

    thumbnail_data = session.get(
        "https://www.googleapis.com/youtube/v3/videos",
        params={
            "id": video_id,
            "part": "snippet",
            "key": YOUTUBE_API_KEY,
        },
    )

    # Presumably a requests-style Response, whose truthiness reflects
    # whether the status code indicates success.
    if thumbnail_data:
        return thumbnail_data.json()

    return {}

201 

@store.route("/publisher/<regex('[a-z0-9-]*[a-z][a-z0-9-]*'):publisher>")
def publisher_details(publisher):
    """
    Render the details page for one publisher.

    ``kde``, ``snapcrafters`` and ``jetbrains`` get a curated page built
    from YAML content files (404 if the main file yields nothing). Every
    other publisher gets the community page, listing the snaps returned by
    the store API for that publisher name. ``snap-quarantine`` is a 404.
    """
    # The snap-quarantine publisher never gets a page.
    if publisher == "snap-quarantine":
        flask.abort(404)

    content_dirs = flask.current_app.config["CONTENT_DIRECTORY"]
    pages_path = content_dirs["PUBLISHER_PAGES"]

    if publisher in ["kde", "snapcrafters", "jetbrains"]:
        page = helpers.get_yaml(pages_path + publisher + ".yaml", typ="safe")
        if not page:
            flask.abort(404)

        popular = helpers.get_yaml(
            pages_path + publisher + "-snaps.yaml",
            typ="safe",
        )
        if popular:
            page["popular_snaps"] = popular["snaps"]
        else:
            page["popular_snaps"] = []

        if "publishers" in page:
            collected = []
            for account in page["publishers"]:
                # A failed lookup for one account contributes no snaps.
                try:
                    account_snaps = api.get_publisher_items(
                        account, size=500, page=1
                    )["results"]
                except StoreApiError:
                    account_snaps = []

                for snap in account_snaps:
                    snap["icon_url"] = helpers.get_icon(snap["media"])

                # Only snaps with a non-empty "apps" entry are listed.
                collected.extend(
                    snap for snap in account_snaps if snap["apps"]
                )

            featured_names = [
                entry["package_name"] for entry in page["featured_snaps"]
            ]

            # Featured snaps are left out of the general list but still
            # count towards the total.
            remaining = [
                snap
                for snap in collected
                if snap["package_name"] not in featured_names
            ]
            page["snaps"] = remaining
            page["snaps_count"] = len(remaining) + len(featured_names)

        return flask.render_template("store/publisher-details.html", **page)

    found = api.find(
        publisher=publisher,
        fields=[
            "title",
            "summary",
            "media",
            "publisher",
        ],
    )["results"]

    snaps = []
    for result in found:
        entry = result["snap"]
        entry["package_name"] = result["name"]
        entry["icon_url"] = helpers.get_icon(entry["media"])
        snaps.append(entry)

    total = len(snaps)

    # Use the publisher record from the first snap when there is one;
    # otherwise fall back to the name taken from the URL.
    if total > 0:
        publisher_info = snaps[0]["publisher"]
    else:
        publisher_info = {"display-name": publisher, "username": publisher}

    rendered = flask.render_template(
        "store/community-publisher-details.html",
        snaps=snaps,
        snaps_count=total,
        publisher=publisher_info,
        error_info={},
    )
    return rendered, 200

310 

@store.route("/store/categories/<category>")
def store_category(category):
    """Render the category partial with the first page of a category.

    Requests 10 snaps for ``category``, attaches an icon URL to each one
    and passes each through ``logic.get_snap_banner_url`` before rendering.
    """
    listing = api.get_category_items(category=category, size=10, page=1)
    category_snaps = listing["results"]

    for snap in category_snaps:
        snap["icon_url"] = helpers.get_icon(snap["media"])

    # When the first (banner) snap has no icon, drop the last snap of a full
    # page so the remaining grid of 9 has no hanging item.
    full_page = len(category_snaps) == 10
    if full_page and category_snaps[0]["icon_url"] == "":
        category_snaps = category_snaps[:-1]

    for position, snap in enumerate(category_snaps):
        category_snaps[position] = logic.get_snap_banner_url(snap)

    page = flask.render_template(
        "store/_category-partial.html",
        category=category,
        has_featured=True,
        snaps=category_snaps,
        error_info={},
    )
    return page, 200

344 

@store.route("/store/featured-snaps/<category>")
def featured_snaps_in_category(category):
    """Return the first three snaps of ``category`` as a JSON list.

    Each snap gets an ``icon_url`` derived from its ``media`` entry.
    """
    listing = api.get_category_items(category=category, size=3, page=1)
    top_snaps = listing["results"]

    for snap in top_snaps:
        snap["icon_url"] = helpers.get_icon(snap["media"])

    return flask.jsonify(top_snaps)

357 

@store.route("/store/sitemap.xml")
def sitemap():
    """Render the XML sitemap listing every snap returned by the search API.

    Walks the paginated search endpoint by following each page's ``next``
    link. A page that cannot be decoded as JSON is requested again, but only
    a limited number of times in a row: retrying without a limit would spin
    forever on a persistently bad response. If the walk is abandoned, the
    snaps collected so far are still rendered, but the response is not
    marked as publicly cacheable, so a truncated sitemap is not kept for
    twelve hours.

    :returns: A response with ``Content-Type: application/xml``.
    """
    base_url = "https://snapcraft.io/store"
    max_consecutive_failures = 3

    snaps = []
    url = "https://api.snapcraft.io/api/v1/snaps/search?page=0"
    consecutive_failures = 0
    complete = True

    while url:
        response = session.get(url)
        try:
            snaps_response = response.json()
        except ValueError:
            # Body was not valid JSON: retry the same URL, within limits.
            consecutive_failures += 1
            if consecutive_failures >= max_consecutive_failures:
                complete = False
                break
            continue
        consecutive_failures = 0

        for snap in snaps_response["_embedded"]["clickindex:package"]:
            # Best effort: an entry with a missing or unparsable field is
            # skipped instead of failing the whole sitemap.
            try:
                last_udpated = (
                    parser.parse(snap["last_updated"])
                    .replace(tzinfo=None)
                    .strftime("%Y-%m-%d")
                )
                snaps.append(
                    {
                        "url": "https://snapcraft.io/"
                        + snap["package_name"],
                        # Key spelling is kept as-is; the template
                        # presumably reads it under this name.
                        "last_udpated": last_udpated,
                    }
                )
            except Exception:
                continue

        if "next" in snaps_response["_links"]:
            url = snaps_response["_links"]["next"]["href"]
        else:
            url = None

    xml_sitemap = flask.render_template(
        "sitemap/sitemap.xml",
        base_url=base_url,
        links=snaps,
    )

    response = flask.make_response(xml_sitemap)
    response.headers["Content-Type"] = "application/xml"
    if complete:
        response.headers["Cache-Control"] = "public, max-age=43200"

    return response

404 

# Choose which view answers the "homepage" endpoint: the main store page by
# default, or the brand-store views (homepage plus search) when a store
# query was supplied.
if not store_query:
    store.add_url_rule("/store", "homepage", store_view)
else:
    store.add_url_rule("/", "homepage", brand_store_view)
    store.add_url_rule("/search", "search", brand_search_snap)

410 

@store.route("/<snap_name>/create-track", methods=["POST"])
@login_required
@csrf.exempt
@exchange_required
def post_create_track(snap_name):
    """Create a track for ``snap_name`` from the posted form fields.

    Form fields read: ``track-name`` (required), ``version-pattern`` and
    ``automatic-phasing-percentage`` (both optional). The percentage is
    converted to ``float`` before being passed on.

    :returns: A ``(body, status)`` pair.

        * 400 with ``{"error": ...}`` if the percentage is not a number.
        * The upstream JSON body with status 201 on creation.
        * ``{"error": "Track already exists."}`` with status 409.
        * ``{"error": <first upstream message>}`` if the upstream body
          contains ``error-list``.
        * Otherwise the upstream JSON body and status unchanged.
    """
    track_name = flask.request.form["track-name"]
    version_pattern = flask.request.form.get("version-pattern")
    auto_phasing_percentage = flask.request.form.get(
        "automatic-phasing-percentage"
    )

    if auto_phasing_percentage is not None:
        # Client-supplied text: reject bad input with a 400 rather than
        # letting float() raise and surface as a server error.
        try:
            auto_phasing_percentage = float(auto_phasing_percentage)
        except ValueError:
            return (
                jsonify(
                    {"error": "Automatic phasing percentage must be a number."}
                ),
                400,
            )

    response = publisher_api.create_track(
        flask.session,
        snap_name,
        track_name,
        version_pattern,
        auto_phasing_percentage,
    )
    status_code = response.status_code

    # 409 is answered without reading the upstream body.
    if status_code == 409:
        return jsonify({"error": "Track already exists."}), status_code

    # Decode the upstream body once and reuse it below.
    body = response.json()

    if status_code != 201 and "error-list" in body:
        return (
            jsonify({"error": body["error-list"][0]["message"]}),
            status_code,
        )

    return body, status_code

447 

448 return store