Coverage for webapp/publisher/cve/cve_helper.py: 97%
125 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 22:06 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 22:06 +0000
1import json
2from os import getenv
3import requests
4import re
6from werkzeug.exceptions import NotFound
8from webapp.publisher.snaps import (
9 logic,
10)
12REST_API_URL = "https://api.github.com"
13GITHUB_SNAPCRAFT_BOT_USER_TOKEN = getenv("GITHUB_SNAPCRAFT_BOT_USER_TOKEN")
14GLOBAL_STORE = "Global"
15CANONICAL_PUBLISHER_ID = "canonical"
18class CveHelper:
19 """
20 Provides CVE data through GitHub by using snapcraft-web@canonical.com.
21 """
23 @staticmethod
24 def _get_cve_file_metadata(file_path):
25 url = (
26 f"{REST_API_URL}/repos/canonical/canonicalwebteam.snap-cves/"
27 f"contents/{file_path}?ref=main"
28 )
29 headers = {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"}
30 response = requests.get(url, headers=headers)
32 if response.status_code == 200:
33 return response.json()
34 else:
35 raise NotFound
37 @staticmethod
38 def _format_cve_response(cve_list, cve_details, usn_details, status):
39 cves = []
41 for cve_id, cve in cve_list.items():
42 cve_detail = cve_details[cve_id]
44 cve_usns = []
45 cve_usn_list = cve["usns"]
46 if cve_usn_list:
47 for usn_id in cve_usn_list:
48 usn_detail = usn_details[usn_id]
49 if usn_detail:
50 cve_usns.append(
51 {
52 "id": usn_id,
53 "description": usn_detail["description"],
54 "published_at": usn_detail["published_at"],
55 "related_launchpad_bugs": usn_detail[
56 "related_launchpad_bugs"
57 ],
58 }
59 )
61 channels_with_fix = None
63 if "channels_with_fix" in cve:
64 channels_with_fix = cve["channels_with_fix"]
66 cves.append(
67 {
68 "id": cve_id,
69 "status": status,
70 "cvss_score": cve_detail["cvss_score"],
71 "cvss_severity": cve_detail["cvss_severity"],
72 "description": cve_detail["description"],
73 "ubuntu_priority": cve_detail["ubuntu_priority"],
74 "affected_binaries": cve["affected_binaries"],
75 "channels_with_fix": channels_with_fix,
76 "usns": cve_usns,
77 }
78 )
80 return cves
82 @staticmethod
83 def _fetch_file_content(snap_name, revision, file_metadata):
84 if "download_url" in file_metadata:
85 download_url = file_metadata["download_url"]
86 headers = {
87 "Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"
88 }
89 response = requests.get(download_url, headers=headers)
91 if response.status_code == 200:
92 content = json.loads(response.text)
93 revisions = content["snaps"][snap_name]["revisions"]
95 if revision not in revisions:
96 raise NotFound
98 cve_list = revisions[revision]
100 fixed_cves = cve_list["fixed-cves"]
101 unfixed_cves = cve_list["unfixed-cves"]
103 cve_details = content["security_issues"]["cves"]
104 usn_details = content["security_issues"]["usns"]
106 unfixed = CveHelper._format_cve_response(
107 unfixed_cves, cve_details, usn_details, "unfixed"
108 )
109 fixed = CveHelper._format_cve_response(
110 fixed_cves, cve_details, usn_details, "fixed"
111 )
113 return fixed + unfixed
114 else:
115 raise NotFound
116 else:
117 raise NotFound
119 @staticmethod
120 def get_revisions_with_cves(snap_name):
121 try:
122 contents = CveHelper._get_cve_file_metadata(
123 f"snap-cves/{snap_name}"
124 )
126 # find all revision YAML files in the folder
127 # e.g., 123.yaml, 456.yaml, 789.yaml
128 # and extract the revision numbers
129 revision_files = [
130 int(match.group(1))
131 for item in contents
132 if (match := re.match(r"(\d+)\.yaml$", item["name"]))
133 ]
135 return revision_files
136 except NotFound:
137 return []
139 @staticmethod
140 def get_cve_with_revision(snap_name, revision):
141 file_metadata = CveHelper._get_cve_file_metadata(
142 "snap-cves/{}.json".format(snap_name)
143 )
145 if file_metadata:
146 return CveHelper._fetch_file_content(
147 snap_name, revision, file_metadata
148 )
149 return []
151 @staticmethod
152 def can_user_access_cve_data(
153 snap_name, snap_details, account_info, is_user_canonical
154 ):
155 snap_store = snap_details["store"]
156 snap_publisher = snap_details["publisher"]
158 admin_user_stores = logic.get_stores(
159 account_info["stores"], roles=["admin"]
160 )
161 is_user_admin = [
162 item for item in admin_user_stores if item["name"] == snap_store
163 ]
165 is_snap_in_global_store = snap_store == GLOBAL_STORE
167 is_snap_publisher_canonical = (
168 snap_publisher["id"] == CANONICAL_PUBLISHER_ID
169 )
171 is_user_snap_publisher = (
172 snap_publisher["username"] == account_info["username"]
173 )
175 is_user_collaborator = snap_name in account_info["snaps"]["16"]
177 is_privileged_user = is_user_snap_publisher or is_user_admin
178 is_user_canonical_publisher = (
179 is_snap_publisher_canonical and is_user_canonical
180 )
181 has_store_access = is_snap_in_global_store and (
182 is_user_collaborator or is_user_canonical_publisher
183 )
185 # To access the CVE data of a snap, a user must meet
186 # the following criteria:
187 # - For all stores, the user must be
188 # the publisher of the snap or have admin privileges.
189 # - For non-Canonical snaps published
190 # in the global store, the user must be a collaborator.
191 # - For Canonical snaps published
192 # in the global store, the user must be a Canonical publisher.
193 can_view_cves = is_privileged_user or has_store_access
195 return can_view_cves
197 @staticmethod
198 def _match_filters(
199 cve,
200 usn_ids,
201 binary_statuses,
202 binary_versions,
203 binary_fixed_versions,
204 binary_names,
205 cvss_severities,
206 ubuntu_priorities,
207 ):
208 if usn_ids:
209 if not cve.get("usns") or not any(
210 usn["id"] in usn_ids for usn in cve["usns"]
211 ):
212 return False
214 if cvss_severities and cve["cvss_severity"] not in cvss_severities:
215 return False
217 if (
218 ubuntu_priorities
219 and cve["ubuntu_priority"] not in ubuntu_priorities
220 ):
221 return False
223 if any(
224 [
225 binary_statuses,
226 binary_fixed_versions,
227 binary_versions,
228 binary_names,
229 ]
230 ):
231 if not cve.get("affected_binaries"):
232 return False
234 # Check if at least one affected binary matches the filters
235 for binary in cve["affected_binaries"]:
236 matches_binary = (
237 (
238 not binary_statuses
239 or binary["status"] in binary_statuses
240 )
241 and (
242 not binary_versions
243 or binary["version"] in binary_versions
244 )
245 and (
246 not binary_fixed_versions
247 or binary["fixed_version"] in binary_fixed_versions
248 )
249 and (not binary_names or binary["name"] in binary_names)
250 )
251 if matches_binary:
252 return True
253 return False
254 return True
256 @staticmethod
257 def filter_cve_data(
258 cves,
259 usn_ids,
260 binary_statuses,
261 binary_versions,
262 binary_fixed_versions,
263 binary_names,
264 cvss_severities,
265 ubuntu_priorities,
266 ):
267 return [
268 cve
269 for cve in cves
270 if CveHelper._match_filters(
271 cve,
272 usn_ids=usn_ids,
273 binary_fixed_versions=binary_fixed_versions,
274 binary_names=binary_names,
275 binary_statuses=binary_statuses,
276 binary_versions=binary_versions,
277 cvss_severities=cvss_severities,
278 ubuntu_priorities=ubuntu_priorities,
279 )
280 ]
282 @staticmethod
283 def sort_cve_data(cves, sort_by, order):
284 priority_order = {
285 "negligible": 0,
286 "low": 1,
287 "medium": 2,
288 "high": 3,
289 "critical": 4,
290 }
292 is_reverse_order = order.lower() == "desc"
294 if sort_by == "cvss_severity":
295 cves.sort(
296 key=lambda cve: priority_order.get(
297 cve.get("cvss_severity"), -1
298 ),
299 reverse=is_reverse_order,
300 )
302 elif sort_by == "ubuntu_priority":
303 cves.sort(
304 key=lambda cve: priority_order.get(
305 cve.get("ubuntu_priority"), -1
306 ),
307 reverse=is_reverse_order,
308 )
309 elif sort_by == "cvss_score":
310 cves.sort(
311 key=lambda cve: cve.get("cvss_score", 0),
312 reverse=is_reverse_order,
313 )
314 else:
315 cves.sort(
316 key=lambda cve: cve.get(sort_by, ""), reverse=is_reverse_order
317 )
318 return cves
320 @staticmethod
321 def paginate_cve_list(cves, page, page_size):
322 total_items = len(cves)
323 start = (page - 1) * page_size
324 end = start + page_size
326 return {
327 "page": page,
328 "page_size": page_size,
329 "total_items": total_items,
330 "total_pages": (total_items + page_size - 1) // page_size,
331 "data": cves[start:end],
332 }