Coverage for webapp / publisher / cve / cve_helper.py: 97%
126 statements
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-29 22:06 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-29 22:06 +0000
1import json
2from os import getenv
3import requests
4import re
6from werkzeug.exceptions import NotFound
8from webapp.publisher.snaps import (
9 logic,
10)
12REST_API_URL = "https://api.github.com"
13GITHUB_SNAPCRAFT_BOT_USER_TOKEN = getenv("GITHUB_SNAPCRAFT_BOT_USER_TOKEN")
14GLOBAL_STORE = "Global"
15CANONICAL_PUBLISHER_ID = "canonical"
18class CveHelper:
19 """
20 Provides CVE data through GitHub by using snapcraft-web@canonical.com.
21 """
23 @staticmethod
24 def _get_cve_file_metadata(file_path):
25 url = (
26 f"{REST_API_URL}/repos/canonical/canonicalwebteam.snap-cves/"
27 f"contents/{file_path}?ref=main"
28 )
29 headers = {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"}
30 response = requests.get(url, headers=headers)
32 if response.status_code == 200:
33 return response.json()
34 else:
35 raise NotFound
37 @staticmethod
38 def _format_cve_response(cve_list, cve_details, usn_details, status):
39 cves = []
41 for cve_id, cve in cve_list.items():
42 cve_detail = cve_details[cve_id]
44 cve_usns = []
45 cve_usn_list = cve["usns"]
46 if cve_usn_list:
47 for usn_id in cve_usn_list:
48 usn_detail = usn_details[usn_id]
49 if usn_detail:
50 cve_usns.append(
51 {
52 "id": usn_id,
53 "description": usn_detail["description"],
54 "published_at": usn_detail["published_at"],
55 "related_launchpad_bugs": usn_detail[
56 "related_launchpad_bugs"
57 ],
58 }
59 )
61 channels_with_fix = None
63 if "channels_with_fix" in cve:
64 channels_with_fix = cve["channels_with_fix"]
66 cves.append(
67 {
68 "id": cve_id,
69 "status": status,
70 "cvss_score": cve_detail["cvss_score"],
71 "cvss_severity": cve_detail["cvss_severity"],
72 "description": cve_detail["description"],
73 "ubuntu_priority": cve_detail["ubuntu_priority"],
74 "affected_binaries": cve["affected_binaries"],
75 "channels_with_fix": channels_with_fix,
76 "usns": cve_usns,
77 }
78 )
80 return cves
82 @staticmethod
83 def _fetch_file_content(snap_name, revision, file_metadata):
84 if "download_url" in file_metadata:
85 download_url = file_metadata["download_url"]
86 headers = {
87 "Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"
88 }
89 response = requests.get(download_url, headers=headers)
91 if response.status_code == 200:
92 content = json.loads(response.text)
93 revisions = content["snaps"][snap_name]["revisions"]
95 if revision not in revisions:
96 raise NotFound
98 cve_list = revisions[revision]
100 fixed_cves = cve_list["fixed-cves"]
101 unfixed_cves = cve_list["unfixed-cves"]
103 cve_details = content["security_issues"]["cves"]
104 usn_details = content["security_issues"]["usns"]
106 unfixed = CveHelper._format_cve_response(
107 unfixed_cves, cve_details, usn_details, "unfixed"
108 )
109 fixed = CveHelper._format_cve_response(
110 fixed_cves, cve_details, usn_details, "fixed"
111 )
113 return fixed + unfixed
114 else:
115 raise NotFound
116 else:
117 raise NotFound
119 @staticmethod
120 def get_revisions_with_cves(snap_name):
121 try:
122 contents = CveHelper._get_cve_file_metadata(
123 f"snap-cves/{snap_name}"
124 )
126 # find all revision YAML files in the folder
127 # e.g., 123.yaml, 456.yaml, 789.yaml
128 # and extract the revision numbers
129 revision_files = [
130 int(match.group(1))
131 for item in contents
132 if (match := re.match(r"(\d+)\.yaml$", item["name"]))
133 ]
135 return revision_files
136 except NotFound:
137 return []
139 @staticmethod
140 def get_cve_with_revision(snap_name, revision):
141 file_metadata = CveHelper._get_cve_file_metadata(
142 "snap-cves/{}.json".format(snap_name)
143 )
145 if file_metadata:
146 file_content = CveHelper._fetch_file_content(
147 snap_name, revision, file_metadata
148 )
149 return file_content
150 return []
152 @staticmethod
153 def can_user_access_cve_data(
154 snap_name, snap_details, account_info, is_user_canonical
155 ):
156 snap_store = snap_details["store"]
157 snap_publisher = snap_details["publisher"]
159 admin_user_stores = logic.get_stores(
160 account_info["stores"], roles=["admin"]
161 )
162 is_user_admin = [
163 item for item in admin_user_stores if item["name"] == snap_store
164 ]
166 is_snap_in_global_store = snap_store == GLOBAL_STORE
168 is_snap_publisher_canonical = (
169 snap_publisher["id"] == CANONICAL_PUBLISHER_ID
170 )
172 is_user_snap_publisher = (
173 snap_publisher["username"] == account_info["username"]
174 )
176 is_user_collaborator = snap_name in account_info["snaps"]["16"]
178 is_privileged_user = is_user_snap_publisher or is_user_admin
179 is_user_canonical_publisher = (
180 is_snap_publisher_canonical and is_user_canonical
181 )
182 has_store_access = is_snap_in_global_store and (
183 is_user_collaborator or is_user_canonical_publisher
184 )
186 # To access the CVE data of a snap, a user must meet
187 # the following criteria:
188 # - For all stores, the user must be
189 # the publisher of the snap or have admin privileges.
190 # - For non-Canonical snaps published
191 # in the global store, the user must be a collaborator.
192 # - For Canonical snaps published
193 # in the global store, the user must be a Canonical publisher.
194 can_view_cves = is_privileged_user or has_store_access
196 return can_view_cves
198 @staticmethod
199 def _match_filters(
200 cve,
201 usn_ids,
202 binary_statuses,
203 binary_versions,
204 binary_fixed_versions,
205 binary_names,
206 cvss_severities,
207 ubuntu_priorities,
208 ):
209 if usn_ids:
210 if not cve.get("usns") or not any(
211 usn["id"] in usn_ids for usn in cve["usns"]
212 ):
213 return False
215 if cvss_severities and cve["cvss_severity"] not in cvss_severities:
216 return False
218 if (
219 ubuntu_priorities
220 and cve["ubuntu_priority"] not in ubuntu_priorities
221 ):
222 return False
224 if any(
225 [
226 binary_statuses,
227 binary_fixed_versions,
228 binary_versions,
229 binary_names,
230 ]
231 ):
232 if not cve.get("affected_binaries"):
233 return False
235 # Check if at least one affected binary matches the filters
236 for binary in cve["affected_binaries"]:
237 matches_binary = (
238 (
239 not binary_statuses
240 or binary["status"] in binary_statuses
241 )
242 and (
243 not binary_versions
244 or binary["version"] in binary_versions
245 )
246 and (
247 not binary_fixed_versions
248 or binary["fixed_version"] in binary_fixed_versions
249 )
250 and (not binary_names or binary["name"] in binary_names)
251 )
252 if matches_binary:
253 return True
254 return False
255 return True
257 @staticmethod
258 def filter_cve_data(
259 cves,
260 usn_ids,
261 binary_statuses,
262 binary_versions,
263 binary_fixed_versions,
264 binary_names,
265 cvss_severities,
266 ubuntu_priorities,
267 ):
268 return [
269 cve
270 for cve in cves
271 if CveHelper._match_filters(
272 cve,
273 usn_ids=usn_ids,
274 binary_fixed_versions=binary_fixed_versions,
275 binary_names=binary_names,
276 binary_statuses=binary_statuses,
277 binary_versions=binary_versions,
278 cvss_severities=cvss_severities,
279 ubuntu_priorities=ubuntu_priorities,
280 )
281 ]
283 @staticmethod
284 def sort_cve_data(cves, sort_by, order):
285 priority_order = {
286 "negligible": 0,
287 "low": 1,
288 "medium": 2,
289 "high": 3,
290 "critical": 4,
291 }
293 is_reverse_order = order.lower() == "desc"
295 if sort_by == "cvss_severity":
296 cves.sort(
297 key=lambda cve: priority_order.get(
298 cve.get("cvss_severity"), -1
299 ),
300 reverse=is_reverse_order,
301 )
303 elif sort_by == "ubuntu_priority":
304 cves.sort(
305 key=lambda cve: priority_order.get(
306 cve.get("ubuntu_priority"), -1
307 ),
308 reverse=is_reverse_order,
309 )
310 elif sort_by == "cvss_score":
311 cves.sort(
312 key=lambda cve: cve.get("cvss_score", 0),
313 reverse=is_reverse_order,
314 )
315 else:
316 cves.sort(
317 key=lambda cve: cve.get(sort_by, ""), reverse=is_reverse_order
318 )
319 return cves
321 @staticmethod
322 def paginate_cve_list(cves, page, page_size):
323 total_items = len(cves)
324 start = (page - 1) * page_size
325 end = start + page_size
327 return {
328 "page": page,
329 "page_size": page_size,
330 "total_items": total_items,
331 "total_pages": (total_items + page_size - 1) // page_size,
332 "data": cves[start:end],
333 }