Coverage for webapp/publisher/cve/cve_helper.py: 97%
122 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-14 22:06 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-14 22:06 +0000
1import json
2from os import getenv
3import requests
4import re
6from werkzeug.exceptions import NotFound
8from webapp.publisher.snaps import (
9 logic,
10)
12REST_API_URL = "https://api.github.com"
13GITHUB_SNAPCRAFT_BOT_USER_TOKEN = getenv("GITHUB_SNAPCRAFT_BOT_USER_TOKEN")
14GLOBAL_STORE = "Global"
15CANONICAL_PUBLISHER_ID = "canonical"
18class CveHelper:
19 """
20 Provides CVE data through GitHub by using snapcraft-web@canonical.com.
21 """
23 @staticmethod
24 def _get_cve_file_metadata(file_path):
25 url = (
26 f"{REST_API_URL}/repos/canonical/canonicalwebteam.snap-cves/"
27 f"contents/{file_path}?ref=main"
28 )
29 headers = {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"}
30 response = requests.get(url, headers=headers)
32 if response.status_code == 200:
33 return response.json()
34 else:
35 raise NotFound
37 @staticmethod
38 def _format_cve_response(cve_list, cve_details, usn_details, status):
39 cves = []
41 for cve_id, cve in cve_list.items():
42 cve_detail = cve_details[cve_id]
44 cve_usns = []
45 cve_usn_list = cve["usns"]
46 if cve_usn_list:
47 for usn_id in cve_usn_list:
48 usn_detail = usn_details[usn_id]
49 if usn_detail:
50 cve_usns.append(
51 {
52 "id": usn_id,
53 "description": usn_detail["description"],
54 "published_at": usn_detail["published_at"],
55 "related_launchpad_bugs": usn_detail[
56 "related_launchpad_bugs"
57 ],
58 }
59 )
61 cves.append(
62 {
63 "id": cve_id,
64 "status": status,
65 "cvss_score": cve_detail["cvss_score"],
66 "cvss_severity": cve_detail["cvss_severity"],
67 "description": cve_detail["description"],
68 "ubuntu_priority": cve_detail["ubuntu_priority"],
69 "affected_binaries": cve["affected_binaries"],
70 "channels_with_fix": cve["channels_with_fix"],
71 "usns": cve_usns,
72 }
73 )
75 return cves
77 @staticmethod
78 def _fetch_file_content(snap_name, revision, file_metadata):
79 if "download_url" in file_metadata:
80 download_url = file_metadata["download_url"]
81 headers = {
82 "Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"
83 }
84 response = requests.get(download_url, headers=headers)
86 if response.status_code == 200:
87 content = json.loads(response.text)
88 revisions = content["snaps"][snap_name]["revisions"]
90 if revision not in revisions:
91 raise NotFound
93 cve_list = revisions[revision]
95 fixed_cves = cve_list["fixed-cves"]
96 unfixed_cves = cve_list["unfixed-cves"]
98 cve_details = content["security_issues"]["cves"]
99 usn_details = content["security_issues"]["usns"]
101 unfixed = CveHelper._format_cve_response(
102 unfixed_cves, cve_details, usn_details, "unfixed"
103 )
104 fixed = CveHelper._format_cve_response(
105 fixed_cves, cve_details, usn_details, "fixed"
106 )
108 return fixed + unfixed
109 else:
110 raise NotFound
111 else:
112 raise NotFound
114 @staticmethod
115 def get_revisions_with_cves(snap_name):
116 try:
117 contents = CveHelper._get_cve_file_metadata(
118 f"snap-cves/{snap_name}"
119 )
121 # find all revision YAML files in the folder
122 # e.g., 123.yaml, 456.yaml, 789.yaml
123 # and extract the revision numbers
124 revision_files = [
125 int(match.group(1))
126 for item in contents
127 if (match := re.match(r"(\d+)\.yaml$", item["name"]))
128 ]
130 return revision_files
131 except NotFound:
132 return []
134 @staticmethod
135 def get_cve_with_revision(snap_name, revision):
136 file_metadata = CveHelper._get_cve_file_metadata(
137 "snap-cves/{}.json".format(snap_name)
138 )
140 if file_metadata:
141 return CveHelper._fetch_file_content(
142 snap_name, revision, file_metadata
143 )
144 return []
146 @staticmethod
147 def can_user_access_cve_data(
148 snap_name, snap_details, account_info, is_user_canonical
149 ):
150 snap_store = snap_details["store"]
151 snap_publisher = snap_details["publisher"]
153 admin_user_stores = logic.get_stores(
154 account_info["stores"], roles=["admin"]
155 )
156 is_user_admin = [
157 item for item in admin_user_stores if item["name"] == snap_store
158 ]
160 is_snap_in_global_store = snap_store == GLOBAL_STORE
162 is_snap_publisher_canonical = (
163 snap_publisher["id"] == CANONICAL_PUBLISHER_ID
164 )
166 is_user_snap_publisher = (
167 snap_publisher["username"] == account_info["username"]
168 )
170 is_user_collaborator = snap_name in account_info["snaps"]["16"]
172 is_privileged_user = is_user_snap_publisher or is_user_admin
173 is_user_canonical_publisher = (
174 is_snap_publisher_canonical and is_user_canonical
175 )
176 has_store_access = is_snap_in_global_store and (
177 is_user_collaborator or is_user_canonical_publisher
178 )
180 # To access the CVE data of a snap, a user must meet
181 # the following criteria:
182 # - For all stores, the user must be
183 # the publisher of the snap or have admin privileges.
184 # - For non-Canonical snaps published
185 # in the global store, the user must be a collaborator.
186 # - For Canonical snaps published
187 # in the global store, the user must be a Canonical publisher.
188 can_view_cves = is_privileged_user or has_store_access
190 return can_view_cves
192 @staticmethod
193 def _match_filters(
194 cve,
195 usn_ids,
196 binary_statuses,
197 binary_versions,
198 binary_fixed_versions,
199 binary_names,
200 cvss_severities,
201 ubuntu_priorities,
202 ):
203 if usn_ids:
204 if not cve.get("usns") or not any(
205 usn["id"] in usn_ids for usn in cve["usns"]
206 ):
207 return False
209 if cvss_severities and cve["cvss_severity"] not in cvss_severities:
210 return False
212 if (
213 ubuntu_priorities
214 and cve["ubuntu_priority"] not in ubuntu_priorities
215 ):
216 return False
218 if any(
219 [
220 binary_statuses,
221 binary_fixed_versions,
222 binary_versions,
223 binary_names,
224 ]
225 ):
226 if not cve.get("affected_binaries"):
227 return False
229 # Check if at least one affected binary matches the filters
230 for binary in cve["affected_binaries"]:
231 matches_binary = (
232 (
233 not binary_statuses
234 or binary["status"] in binary_statuses
235 )
236 and (
237 not binary_versions
238 or binary["version"] in binary_versions
239 )
240 and (
241 not binary_fixed_versions
242 or binary["fixed_version"] in binary_fixed_versions
243 )
244 and (not binary_names or binary["name"] in binary_names)
245 )
246 if matches_binary:
247 return True
248 return False
249 return True
251 @staticmethod
252 def filter_cve_data(
253 cves,
254 usn_ids,
255 binary_statuses,
256 binary_versions,
257 binary_fixed_versions,
258 binary_names,
259 cvss_severities,
260 ubuntu_priorities,
261 ):
262 return [
263 cve
264 for cve in cves
265 if CveHelper._match_filters(
266 cve,
267 usn_ids=usn_ids,
268 binary_fixed_versions=binary_fixed_versions,
269 binary_names=binary_names,
270 binary_statuses=binary_statuses,
271 binary_versions=binary_versions,
272 cvss_severities=cvss_severities,
273 ubuntu_priorities=ubuntu_priorities,
274 )
275 ]
277 @staticmethod
278 def sort_cve_data(cves, sort_by, order):
279 priority_order = {
280 "negligible": 0,
281 "low": 1,
282 "medium": 2,
283 "high": 3,
284 "critical": 4,
285 }
287 is_reverse_order = order.lower() == "desc"
289 if sort_by == "cvss_severity":
290 cves.sort(
291 key=lambda cve: priority_order.get(
292 cve.get("cvss_severity"), -1
293 ),
294 reverse=is_reverse_order,
295 )
297 elif sort_by == "ubuntu_priority":
298 cves.sort(
299 key=lambda cve: priority_order.get(
300 cve.get("ubuntu_priority"), -1
301 ),
302 reverse=is_reverse_order,
303 )
304 elif sort_by == "cvss_score":
305 cves.sort(
306 key=lambda cve: cve.get("cvss_score", 0),
307 reverse=is_reverse_order,
308 )
309 else:
310 cves.sort(
311 key=lambda cve: cve.get(sort_by, ""), reverse=is_reverse_order
312 )
313 return cves
315 @staticmethod
316 def paginate_cve_list(cves, page, page_size):
317 total_items = len(cves)
318 start = (page - 1) * page_size
319 end = start + page_size
321 return {
322 "page": page,
323 "page_size": page_size,
324 "total_items": total_items,
325 "total_pages": (total_items + page_size - 1) // page_size,
326 "data": cves[start:end],
327 }