Coverage for webapp/store/views.py: 44%
228 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
1from math import floor
2import talisker.requests
3import flask
4from dateutil import parser
5from webapp.decorators import exchange_required, login_required
6import webapp.helpers as helpers
7import webapp.store.logic as logic
8from webapp.api import requests
10from canonicalwebteam.exceptions import StoreApiError
11from canonicalwebteam.store_api.dashboard import Dashboard
12from canonicalwebteam.store_api.publishergw import PublisherGW
13from canonicalwebteam.store_api.devicegw import DeviceGW
15from webapp.api.exceptions import ApiError
16from webapp.store.snap_details_views import snap_details_views
17from webapp.helpers import api_publisher_session, api_session
18from flask.json import jsonify
19import os
20from webapp.extensions import csrf
22session = talisker.requests.get_session(requests.Session)
24YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
25dashboard = Dashboard(api_session)
26publisher_gateway = PublisherGW("snap", api_publisher_session)
27device_gateway = DeviceGW("snap", api_session)
30def store_blueprint(store_query=None):
31 store = flask.Blueprint(
32 "store",
33 __name__,
34 template_folder="/templates",
35 static_folder="/static",
36 )
37 snap_details_views(store)
39 def format_validation_set(validation_set):
40 return validation_set["headers"]
42 @store.route("/api/validation-sets")
43 @login_required
44 def get_validation_sets():
45 res = {}
47 try:
48 validation_sets = dashboard.get_validation_sets(flask.session)
49 res["success"] = True
51 if len(validation_sets["assertions"]) > 0:
52 res["data"] = [
53 format_validation_set(item)
54 for item in validation_sets["assertions"]
55 ]
56 else:
57 res["data"] = []
59 response = flask.make_response(res, 200)
60 response.cache_control.max_age = "3600"
61 except StoreApiError as error_list:
62 error_messages = [
63 f"{error.get('message', 'An error occurred')}"
64 for error in error_list.errors
65 ]
67 res["message"] = " ".join(error_messages)
68 res["success"] = False
69 response = flask.make_response(res, 500)
71 return response
73 @store.route("/api/validation-sets/<validation_set_id>")
74 @login_required
75 def get_validation_set(validation_set_id):
76 res = {}
78 try:
79 validation_set = dashboard.get_validation_set(
80 flask.session, validation_set_id
81 )
82 res["success"] = True
84 if len(validation_set["assertions"]) > 0:
85 res["data"] = [
86 format_validation_set(item)
87 for item in validation_set["assertions"]
88 ]
89 else:
90 res["data"] = []
92 response = flask.make_response(res, 200)
93 response.cache_control.max_age = "3600"
94 except StoreApiError as error_list:
95 error_messages = [
96 f"{error.get('message', 'An error occurred')}"
97 for error in error_list.errors
98 ]
100 res["message"] = " ".join(error_messages)
101 res["success"] = False
102 response = flask.make_response(res, 500)
104 return response
106 @store.route("/validation-sets", defaults={"path": ""})
107 @store.route("/validation-sets/<path:path>")
108 @login_required
109 def validation_sets(path):
110 return flask.render_template("store/publisher.html")
112 @store.route("/discover")
113 def discover():
114 return flask.redirect(flask.url_for(".homepage"))
116 def brand_store_view():
117 error_info = {}
118 status_code = 200
120 try:
121 snaps = device_gateway.get_all_items(size=16)["results"]
122 except (StoreApiError, ApiError):
123 snaps = []
125 for snap in snaps:
126 if "media" in snap:
127 snap["icon_url"] = helpers.get_icon(snap["media"])
129 return (
130 flask.render_template(
131 "brand-store/store.html", snaps=snaps, error_info=error_info
132 ),
133 status_code,
134 )
136 def brand_search_snap():
137 status_code = 200
138 snap_searched = flask.request.args.get("q", default="", type=str)
140 if not snap_searched:
141 return flask.redirect(flask.url_for(".homepage"))
143 size = flask.request.args.get("limit", default=25, type=int)
144 offset = flask.request.args.get("offset", default=0, type=int)
146 try:
147 page = floor(offset / size) + 1
148 except ZeroDivisionError:
149 size = 10
150 page = floor(offset / size) + 1
152 error_info = {}
153 searched_results = []
155 searched_results = device_gateway.search(
156 snap_searched, size=size, page=page
157 )
159 snaps_results = searched_results["results"]
161 for snap in snaps_results:
162 snap["icon_url"] = helpers.get_icon(snap["media"])
164 links = logic.get_pages_details(
165 flask.request.base_url,
166 (
167 searched_results["_links"]
168 if "_links" in searched_results
169 else []
170 ),
171 )
173 context = {
174 "query": snap_searched,
175 "snaps": snaps_results,
176 "links": links,
177 "error_info": error_info,
178 }
180 return (
181 flask.render_template("brand-store/search.html", **context),
182 status_code,
183 )
185 @store.route("/store")
186 def store_view():
187 return flask.render_template("store/store.html")
189 @store.route("/youtube", methods=["POST"])
190 def get_video_thumbnail_data():
191 body = flask.request.form
192 thumbnail_url = "https://www.googleapis.com/youtube/v3/videos"
193 thumbnail_data = session.get(
194 (
195 f"{thumbnail_url}?id={body['videoId']}"
196 f"&part=snippet&key={YOUTUBE_API_KEY}"
197 )
198 )
200 if thumbnail_data:
201 return thumbnail_data.json()
203 return {}
205 @store.route("/publisher/<regex('[a-z0-9-]*[a-z][a-z0-9-]*'):publisher>")
206 def publisher_details(publisher):
207 """
208 A view to display the publisher details page for specific publisher.
209 """
211 # 404 for the snap-quarantine publisher
212 if publisher == "snap-quarantine":
213 flask.abort(404)
215 publisher_content_path = flask.current_app.config["CONTENT_DIRECTORY"][
216 "PUBLISHER_PAGES"
217 ]
219 if publisher in ["kde", "snapcrafters", "jetbrains"]:
220 context = helpers.get_yaml(
221 publisher_content_path + publisher + ".yaml", typ="safe"
222 )
224 if not context:
225 flask.abort(404)
227 popular_snaps = helpers.get_yaml(
228 publisher_content_path + publisher + "-snaps.yaml",
229 typ="safe",
230 )
232 context["popular_snaps"] = (
233 popular_snaps["snaps"] if popular_snaps else []
234 )
236 if "publishers" in context:
237 context["snaps"] = []
238 for publisher in context["publishers"]:
239 snaps_results = []
240 try:
241 snaps_results = device_gateway.get_publisher_items(
242 publisher, size=500, page=1
243 )["_embedded"]["clickindex:package"]
244 except StoreApiError:
245 pass
247 for snap in snaps_results:
248 snap["icon_url"] = helpers.get_icon(snap["media"])
250 context["snaps"].extend(
251 [snap for snap in snaps_results if snap["apps"]]
252 )
254 featured_snaps = [
255 snap["package_name"] for snap in context["featured_snaps"]
256 ]
258 context["snaps"] = [
259 snap
260 for snap in context["snaps"]
261 if snap["package_name"] not in featured_snaps
262 ]
264 context["snaps_count"] = len(context["snaps"]) + len(
265 featured_snaps
266 )
268 return flask.render_template(
269 "store/publisher-details.html", **context
270 )
272 status_code = 200
273 error_info = {}
274 snaps_results = []
275 snaps = []
276 snaps_count = 0
277 publisher_details = {"display-name": publisher, "username": publisher}
279 snaps_results = device_gateway.find(
280 publisher=publisher,
281 fields=[
282 "title",
283 "summary",
284 "media",
285 "publisher",
286 ],
287 )["results"]
289 for snap in snaps_results:
290 item = snap["snap"]
291 item["package_name"] = snap["name"]
292 item["icon_url"] = helpers.get_icon(item["media"])
293 snaps.append(item)
295 snaps_count = len(snaps)
297 if snaps_count > 0:
298 publisher_details = snaps[0]["publisher"]
300 context = {
301 "snaps": snaps,
302 "snaps_count": snaps_count,
303 "publisher": publisher_details,
304 "error_info": error_info,
305 }
307 return (
308 flask.render_template(
309 "store/community-publisher-details.html", **context
310 ),
311 status_code,
312 )
314 @store.route("/store/categories/<category>")
315 def store_category(category):
316 status_code = 200
317 error_info = {}
318 snaps_results = []
320 snaps_results = device_gateway.get_category_items(
321 category=category, size=10, page=1
322 )["results"]
323 for snap in snaps_results:
324 snap["icon_url"] = helpers.get_icon(snap["media"])
326 # if the first snap (banner snap) doesn't have an icon, remove the last
327 # snap from the list to avoid a hanging snap (grid of 9)
328 if len(snaps_results) == 10 and snaps_results[0]["icon_url"] == "":
329 snaps_results = snaps_results[:-1]
331 for index in range(len(snaps_results)):
332 snaps_results[index] = logic.get_snap_banner_url(
333 snaps_results[index]
334 )
336 context = {
337 "category": category,
338 "has_featured": True,
339 "snaps": snaps_results,
340 "error_info": error_info,
341 }
343 return (
344 flask.render_template("store/_category-partial.html", **context),
345 status_code,
346 )
348 @store.route("/store/featured-snaps/<category>")
349 def featured_snaps_in_category(category):
350 snaps_results = []
352 snaps_results = device_gateway.get_category_items(
353 category=category, size=3, page=1
354 )["_embedded"]["clickindex:package"]
356 for snap in snaps_results:
357 snap["icon_url"] = helpers.get_icon(snap["media"])
359 return flask.jsonify(snaps_results)
361 @store.route("/store/sitemap.xml")
362 def sitemap():
363 base_url = "https://snapcraft.io/store"
365 snaps = []
366 page = 0
367 url = f"https://api.snapcraft.io/api/v1/snaps/search?page={page}"
368 while url:
369 response = session.get(url)
370 try:
371 snaps_response = response.json()
372 except Exception:
373 continue
375 for snap in snaps_response["_embedded"]["clickindex:package"]:
376 try:
377 last_udpated = (
378 parser.parse(snap["last_updated"])
379 .replace(tzinfo=None)
380 .strftime("%Y-%m-%d")
381 )
382 snaps.append(
383 {
384 "url": "https://snapcraft.io/"
385 + snap["package_name"],
386 "last_udpated": last_udpated,
387 }
388 )
389 except Exception:
390 continue
391 if "next" in snaps_response["_links"]:
392 url = snaps_response["_links"]["next"]["href"]
393 else:
394 url = None
396 xml_sitemap = flask.render_template(
397 "sitemap/sitemap.xml",
398 base_url=base_url,
399 links=snaps,
400 )
402 response = flask.make_response(xml_sitemap)
403 response.headers["Content-Type"] = "application/xml"
404 response.headers["Cache-Control"] = "public, max-age=43200"
406 return response
408 if store_query:
409 store.add_url_rule("/", "homepage", brand_store_view)
410 store.add_url_rule("/search", "search", brand_search_snap)
411 else:
412 store.add_url_rule("/store", "homepage", store_view)
414 @store.route("/<snap_name>/create-track", methods=["POST"])
415 @login_required
416 @csrf.exempt
417 @exchange_required
418 def post_create_track(snap_name):
419 track_name = flask.request.form["track-name"]
420 version_pattern = flask.request.form.get("version-pattern")
421 auto_phasing_percentage = flask.request.form.get(
422 "automatic-phasing-percentage"
423 )
425 if auto_phasing_percentage is not None:
426 auto_phasing_percentage = float(auto_phasing_percentage)
428 response = publisher_gateway.create_track(
429 flask.session,
430 snap_name,
431 track_name,
432 version_pattern,
433 auto_phasing_percentage,
434 )
435 if response.status_code == 201:
436 return response.json(), response.status_code
437 if response.status_code == 409:
438 return (
439 jsonify({"error": "Track already exists."}),
440 response.status_code,
441 )
442 if "error-list" in response.json():
443 return (
444 jsonify(
445 {"error": response.json()["error-list"][0]["message"]}
446 ),
447 response.status_code,
448 )
449 return response.json(), response.status_code
451 return store