Coverage for webapp/publisher/cve/cve_helper.py: 97%
122 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
1import json
2from os import getenv
3import requests
4import re
6from werkzeug.exceptions import NotFound
8from webapp.publisher.snaps import (
9 logic,
10)
# Base URL of the GitHub REST API used to read the CVE data repository.
REST_API_URL = "https://api.github.com"
# Token sent in the Authorization header of every GitHub request; it is
# read from the environment and is None when the variable is not set.
GITHUB_SNAPCRAFT_BOT_USER_TOKEN = getenv("GITHUB_SNAPCRAFT_BOT_USER_TOKEN")
# Value of a snap's "store" field that is compared against when deciding
# whether the snap lives in the global store.
GLOBAL_STORE = "Global"
# Publisher id that is compared against to detect Canonical-published snaps.
CANONICAL_PUBLISHER_ID = "canonical"
class CveHelper:
    """
    Provides CVE data through GitHub by using snapcraft-web@canonical.com.

    All methods are static. The network-facing helpers read files from the
    ``canonical/canonicalwebteam.snap-cves`` repository; the remaining
    helpers filter, sort and paginate the already formatted CVE list.
    """

    # Upper bound (in seconds) for every GitHub request, so that a stalled
    # connection cannot block the calling worker indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 10

    # Rank of each severity / priority label, lowest first. Labels that are
    # not listed here sort before all of them (rank -1).
    _PRIORITY_ORDER = {
        "negligible": 0,
        "low": 1,
        "medium": 2,
        "high": 3,
        "critical": 4,
    }

    @staticmethod
    def _auth_headers():
        """Return the Authorization header sent with every GitHub request."""
        return {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"}

    @staticmethod
    def _get_cve_file_metadata(file_path):
        """
        Fetch the GitHub "contents" metadata for a path in the CVE repo.

        :param file_path: Path inside the repository, on the ``main`` ref.
        :returns: The decoded JSON body of the response.
        :raises NotFound: If GitHub answers with anything other than 200.
        """
        url = (
            f"{REST_API_URL}/repos/canonical/canonicalwebteam.snap-cves/"
            f"contents/{file_path}?ref=main"
        )
        response = requests.get(
            url,
            headers=CveHelper._auth_headers(),
            timeout=CveHelper._REQUEST_TIMEOUT_SECONDS,
        )

        if response.status_code != 200:
            raise NotFound
        return response.json()

    @staticmethod
    def _format_cve_response(cve_list, cve_details, usn_details):
        """
        Merge per-revision CVE entries with the shared CVE and USN details.

        :param cve_list: Mapping of CVE id to its per-revision entry
            (``usns``, ``affected_binaries``, ``channels_with_fix``).
        :param cve_details: Mapping of CVE id to its score, severity,
            description and Ubuntu priority.
        :param usn_details: Mapping of USN id to its details.
        :returns: A list with one flattened dict per CVE.
        """
        cves = []

        for cve_id, cve in cve_list.items():
            cve_detail = cve_details[cve_id]

            cve_usns = []
            for usn_id in cve.get("usns") or []:
                # A USN referenced by the CVE but absent from the details
                # mapping is skipped instead of failing the whole request.
                usn_detail = usn_details.get(usn_id)
                if not usn_detail:
                    continue
                cve_usns.append(
                    {
                        "id": usn_id,
                        "description": usn_detail["description"],
                        "published_at": usn_detail["published_at"],
                        "related_launchpad_bugs": usn_detail[
                            "related_launchpad_bugs"
                        ],
                    }
                )

            cves.append(
                {
                    "id": cve_id,
                    "cvss_score": cve_detail["cvss_score"],
                    "cvss_severity": cve_detail["cvss_severity"],
                    "description": cve_detail["description"],
                    "ubuntu_priority": cve_detail["ubuntu_priority"],
                    "affected_binaries": cve["affected_binaries"],
                    "channels_with_fix": cve["channels_with_fix"],
                    "usns": cve_usns,
                }
            )

        return cves

    @staticmethod
    def _fetch_file_content(snap_name, revision, file_metadata):
        """
        Download a snap's CVE file and return the CVEs of one revision.

        :param snap_name: Name of the snap, used as key under ``snaps``.
        :param revision: Revision to look up; it is compared as a string
            because JSON object keys are always strings.
        :param file_metadata: GitHub contents metadata holding a
            ``download_url``.
        :returns: Formatted fixed CVEs followed by the unfixed ones.
        :raises NotFound: If there is no usable download URL, the download
            does not return 200, or the snap / revision is not in the file.
        """
        if "download_url" not in file_metadata:
            raise NotFound

        download_url = file_metadata["download_url"]
        if not download_url:
            # Nothing to download; avoid handing an empty URL to requests.
            raise NotFound

        response = requests.get(
            download_url,
            headers=CveHelper._auth_headers(),
            timeout=CveHelper._REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            raise NotFound

        content = json.loads(response.text)
        snap_entry = content.get("snaps", {}).get(snap_name) or {}
        revisions = snap_entry.get("revisions") or {}

        revision_key = str(revision)
        if revision_key not in revisions:
            raise NotFound

        cve_list = revisions[revision_key]
        fixed_cves = cve_list["fixed-cves"]
        unfixed_cves = cve_list["unfixed-cves"]

        cve_details = content["security_issues"]["cves"]
        usn_details = content["security_issues"]["usns"]

        # Unfixed CVEs are formatted first, as before, but the returned
        # list puts the fixed ones in front.
        unfixed = CveHelper._format_cve_response(
            unfixed_cves, cve_details, usn_details
        )
        fixed = CveHelper._format_cve_response(
            fixed_cves, cve_details, usn_details
        )

        return fixed + unfixed

    @staticmethod
    def get_revisions_with_cves(snap_name):
        """
        List the revisions that have a CVE file in the snap's folder.

        :param snap_name: Name of the snap whose folder is listed.
        :returns: Revision numbers as ints, or an empty list when the
            folder cannot be fetched.
        """
        try:
            contents = CveHelper._get_cve_file_metadata(
                f"snap-cves/{snap_name}"
            )
        except NotFound:
            return []

        # find all revision YAML files in the folder
        # e.g., 123.yaml, 456.yaml, 789.yaml
        # and extract the revision numbers
        return [
            int(match.group(1))
            for item in contents
            if (match := re.match(r"(\d+)\.yaml$", item["name"]))
        ]

    @staticmethod
    def get_cve_with_revision(snap_name, revision):
        """
        Return the formatted CVEs of one revision of a snap.

        :param snap_name: Name of the snap.
        :param revision: Revision to look up.
        :returns: The formatted CVE list, or an empty list when the
            metadata lookup yields an empty result.
        :raises NotFound: Propagated from the metadata or content fetch.
        """
        file_metadata = CveHelper._get_cve_file_metadata(
            f"snap-cves/{snap_name}.json"
        )

        if not file_metadata:
            return []
        return CveHelper._fetch_file_content(
            snap_name, revision, file_metadata
        )

    @staticmethod
    def can_user_access_cve_data(
        snap_name, snap_details, account_info, is_user_canonical
    ):
        """
        Decide whether the current user may see a snap's CVE data.

        :param snap_name: Name of the snap.
        :param snap_details: Dict with the snap's ``store`` and
            ``publisher`` (``id`` and ``username``).
        :param account_info: Dict with the user's ``username``, ``stores``
            and ``snaps``.
        :param is_user_canonical: Whether the user is a Canonical publisher.
        :returns: ``True`` when access is allowed, ``False`` otherwise.
        """
        snap_store = snap_details["store"]
        snap_publisher = snap_details["publisher"]

        admin_user_stores = logic.get_stores(
            account_info["stores"], roles=["admin"]
        )
        # Reduced to a real boolean so that the method never returns the
        # list of matching stores.
        is_user_admin = any(
            store["name"] == snap_store for store in admin_user_stores
        )

        is_snap_in_global_store = snap_store == GLOBAL_STORE
        is_snap_publisher_canonical = (
            snap_publisher["id"] == CANONICAL_PUBLISHER_ID
        )
        is_user_snap_publisher = (
            snap_publisher["username"] == account_info["username"]
        )
        is_user_collaborator = snap_name in account_info["snaps"]["16"]

        is_privileged_user = is_user_snap_publisher or is_user_admin
        is_user_canonical_publisher = (
            is_snap_publisher_canonical and is_user_canonical
        )
        has_store_access = is_snap_in_global_store and (
            is_user_collaborator or is_user_canonical_publisher
        )

        # To access the CVE data of a snap, a user must meet
        # one of the following criteria:
        # - In any store, the user is the publisher of the snap
        #   or has admin privileges on the snap's store.
        # - For snaps in the global store, the user is a collaborator
        #   (the snap is listed under the user's snaps).
        # - For Canonical snaps in the global store,
        #   the user is a Canonical publisher.
        return bool(is_privileged_user or has_store_access)

    @staticmethod
    def _match_filters(
        cve,
        usn_ids,
        binary_statuses,
        binary_versions,
        binary_fixed_versions,
        binary_names,
        cvss_severities,
        ubuntu_priorities,
    ):
        """
        Tell whether a formatted CVE satisfies every active filter.

        A filter is active when its argument is truthy. The binary filters
        must all be satisfied by the same affected binary.

        :returns: ``True`` if the CVE passes all active filters.
        """
        if usn_ids and not any(
            usn["id"] in usn_ids for usn in cve.get("usns") or []
        ):
            return False

        if cvss_severities and cve["cvss_severity"] not in cvss_severities:
            return False

        if (
            ubuntu_priorities
            and cve["ubuntu_priority"] not in ubuntu_priorities
        ):
            return False

        # Only the binary filters that were actually supplied are checked,
        # in the same order as before: status, version, fixed version, name.
        active_binary_filters = [
            (field, allowed)
            for field, allowed in (
                ("status", binary_statuses),
                ("version", binary_versions),
                ("fixed_version", binary_fixed_versions),
                ("name", binary_names),
            )
            if allowed
        ]
        if not active_binary_filters:
            return True

        # Check if at least one affected binary matches the filters
        return any(
            all(
                binary[field] in allowed
                for field, allowed in active_binary_filters
            )
            for binary in cve.get("affected_binaries") or []
        )

    @staticmethod
    def filter_cve_data(
        cves,
        usn_ids,
        binary_statuses,
        binary_versions,
        binary_fixed_versions,
        binary_names,
        cvss_severities,
        ubuntu_priorities,
    ):
        """
        Return the CVEs that satisfy every active filter.

        :param cves: Formatted CVE dicts.
        :returns: A new list; the input list is not modified.
        """
        return [
            cve
            for cve in cves
            if CveHelper._match_filters(
                cve,
                usn_ids=usn_ids,
                binary_fixed_versions=binary_fixed_versions,
                binary_names=binary_names,
                binary_statuses=binary_statuses,
                binary_versions=binary_versions,
                cvss_severities=cvss_severities,
                ubuntu_priorities=ubuntu_priorities,
            )
        ]

    @staticmethod
    def sort_cve_data(cves, sort_by, order):
        """
        Sort the CVE list in place and return it.

        :param cves: Formatted CVE dicts; the list is mutated.
        :param sort_by: Field to sort on. ``cvss_severity`` and
            ``ubuntu_priority`` use the severity ranking, ``cvss_score``
            sorts numerically, any other field sorts on its raw value.
        :param order: ``"desc"`` (case-insensitive) for descending order;
            anything else, including ``None``, sorts ascending.
        :returns: The same list object, sorted.
        """
        is_reverse_order = (order or "").lower() == "desc"

        if sort_by in ("cvss_severity", "ubuntu_priority"):
            rank = CveHelper._PRIORITY_ORDER

            def sort_key(cve):
                return rank.get(cve.get(sort_by), -1)

        elif sort_by == "cvss_score":

            def sort_key(cve):
                # A missing or null score sorts as 0 instead of raising
                # TypeError when compared with a number.
                return cve.get("cvss_score") or 0

        else:

            def sort_key(cve):
                # A missing or null value sorts as the empty string.
                value = cve.get(sort_by)
                return "" if value is None else value

        cves.sort(key=sort_key, reverse=is_reverse_order)
        return cves

    @staticmethod
    def paginate_cve_list(cves, page, page_size):
        """
        Return one page of the CVE list together with paging metadata.

        :param cves: The full list to paginate.
        :param page: 1-based page number; values below 1 are treated as 1.
        :param page_size: Number of items per page; must be at least 1.
        :returns: Dict with ``page``, ``page_size``, ``total_items``,
            ``total_pages`` and the page's ``data``.
        :raises ValueError: If ``page_size`` is smaller than 1.
        """
        if page_size < 1:
            raise ValueError("page_size must be a positive integer")
        page = max(page, 1)

        total_items = len(cves)
        start = (page - 1) * page_size
        end = start + page_size

        return {
            "page": page,
            "page_size": page_size,
            "total_items": total_items,
            # Ceiling division without going through floats.
            "total_pages": (total_items + page_size - 1) // page_size,
            "data": cves[start:end],
        }