Coverage for webapp / publisher / cve / cve_helper.py: 97%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-29 22:06 +0000

1import json 

2from os import getenv 

3import requests 

4import re 

5 

6from werkzeug.exceptions import NotFound 

7 

8from webapp.publisher.snaps import ( 

9 logic, 

10) 

11 

12REST_API_URL = "https://api.github.com" 

13GITHUB_SNAPCRAFT_BOT_USER_TOKEN = getenv("GITHUB_SNAPCRAFT_BOT_USER_TOKEN") 

14GLOBAL_STORE = "Global" 

15CANONICAL_PUBLISHER_ID = "canonical" 

16 

17 

18class CveHelper: 

19 """ 

20 Provides CVE data through GitHub by using snapcraft-web@canonical.com. 

21 """ 

22 

23 @staticmethod 

24 def _get_cve_file_metadata(file_path): 

25 url = ( 

26 f"{REST_API_URL}/repos/canonical/canonicalwebteam.snap-cves/" 

27 f"contents/{file_path}?ref=main" 

28 ) 

29 headers = {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"} 

30 response = requests.get(url, headers=headers) 

31 

32 if response.status_code == 200: 

33 return response.json() 

34 else: 

35 raise NotFound 

36 

37 @staticmethod 

38 def _format_cve_response(cve_list, cve_details, usn_details, status): 

39 cves = [] 

40 

41 for cve_id, cve in cve_list.items(): 

42 cve_detail = cve_details[cve_id] 

43 

44 cve_usns = [] 

45 cve_usn_list = cve["usns"] 

46 if cve_usn_list: 

47 for usn_id in cve_usn_list: 

48 usn_detail = usn_details[usn_id] 

49 if usn_detail: 

50 cve_usns.append( 

51 { 

52 "id": usn_id, 

53 "description": usn_detail["description"], 

54 "published_at": usn_detail["published_at"], 

55 "related_launchpad_bugs": usn_detail[ 

56 "related_launchpad_bugs" 

57 ], 

58 } 

59 ) 

60 

61 channels_with_fix = None 

62 

63 if "channels_with_fix" in cve: 

64 channels_with_fix = cve["channels_with_fix"] 

65 

66 cves.append( 

67 { 

68 "id": cve_id, 

69 "status": status, 

70 "cvss_score": cve_detail["cvss_score"], 

71 "cvss_severity": cve_detail["cvss_severity"], 

72 "description": cve_detail["description"], 

73 "ubuntu_priority": cve_detail["ubuntu_priority"], 

74 "affected_binaries": cve["affected_binaries"], 

75 "channels_with_fix": channels_with_fix, 

76 "usns": cve_usns, 

77 } 

78 ) 

79 

80 return cves 

81 

82 @staticmethod 

83 def _fetch_file_content(snap_name, revision, file_metadata): 

84 if "download_url" in file_metadata: 

85 download_url = file_metadata["download_url"] 

86 headers = { 

87 "Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}" 

88 } 

89 response = requests.get(download_url, headers=headers) 

90 

91 if response.status_code == 200: 

92 content = json.loads(response.text) 

93 revisions = content["snaps"][snap_name]["revisions"] 

94 

95 if revision not in revisions: 

96 raise NotFound 

97 

98 cve_list = revisions[revision] 

99 

100 fixed_cves = cve_list["fixed-cves"] 

101 unfixed_cves = cve_list["unfixed-cves"] 

102 

103 cve_details = content["security_issues"]["cves"] 

104 usn_details = content["security_issues"]["usns"] 

105 

106 unfixed = CveHelper._format_cve_response( 

107 unfixed_cves, cve_details, usn_details, "unfixed" 

108 ) 

109 fixed = CveHelper._format_cve_response( 

110 fixed_cves, cve_details, usn_details, "fixed" 

111 ) 

112 

113 return fixed + unfixed 

114 else: 

115 raise NotFound 

116 else: 

117 raise NotFound 

118 

119 @staticmethod 

120 def get_revisions_with_cves(snap_name): 

121 try: 

122 contents = CveHelper._get_cve_file_metadata( 

123 f"snap-cves/{snap_name}" 

124 ) 

125 

126 # find all revision YAML files in the folder 

127 # e.g., 123.yaml, 456.yaml, 789.yaml 

128 # and extract the revision numbers 

129 revision_files = [ 

130 int(match.group(1)) 

131 for item in contents 

132 if (match := re.match(r"(\d+)\.yaml$", item["name"])) 

133 ] 

134 

135 return revision_files 

136 except NotFound: 

137 return [] 

138 

139 @staticmethod 

140 def get_cve_with_revision(snap_name, revision): 

141 file_metadata = CveHelper._get_cve_file_metadata( 

142 "snap-cves/{}.json".format(snap_name) 

143 ) 

144 

145 if file_metadata: 

146 file_content = CveHelper._fetch_file_content( 

147 snap_name, revision, file_metadata 

148 ) 

149 return file_content 

150 return [] 

151 

152 @staticmethod 

153 def can_user_access_cve_data( 

154 snap_name, snap_details, account_info, is_user_canonical 

155 ): 

156 snap_store = snap_details["store"] 

157 snap_publisher = snap_details["publisher"] 

158 

159 admin_user_stores = logic.get_stores( 

160 account_info["stores"], roles=["admin"] 

161 ) 

162 is_user_admin = [ 

163 item for item in admin_user_stores if item["name"] == snap_store 

164 ] 

165 

166 is_snap_in_global_store = snap_store == GLOBAL_STORE 

167 

168 is_snap_publisher_canonical = ( 

169 snap_publisher["id"] == CANONICAL_PUBLISHER_ID 

170 ) 

171 

172 is_user_snap_publisher = ( 

173 snap_publisher["username"] == account_info["username"] 

174 ) 

175 

176 is_user_collaborator = snap_name in account_info["snaps"]["16"] 

177 

178 is_privileged_user = is_user_snap_publisher or is_user_admin 

179 is_user_canonical_publisher = ( 

180 is_snap_publisher_canonical and is_user_canonical 

181 ) 

182 has_store_access = is_snap_in_global_store and ( 

183 is_user_collaborator or is_user_canonical_publisher 

184 ) 

185 

186 # To access the CVE data of a snap, a user must meet 

187 # the following criteria: 

188 # - For all stores, the user must be 

189 # the publisher of the snap or have admin privileges. 

190 # - For non-Canonical snaps published 

191 # in the global store, the user must be a collaborator. 

192 # - For Canonical snaps published 

193 # in the global store, the user must be a Canonical publisher. 

194 can_view_cves = is_privileged_user or has_store_access 

195 

196 return can_view_cves 

197 

198 @staticmethod 

199 def _match_filters( 

200 cve, 

201 usn_ids, 

202 binary_statuses, 

203 binary_versions, 

204 binary_fixed_versions, 

205 binary_names, 

206 cvss_severities, 

207 ubuntu_priorities, 

208 ): 

209 if usn_ids: 

210 if not cve.get("usns") or not any( 

211 usn["id"] in usn_ids for usn in cve["usns"] 

212 ): 

213 return False 

214 

215 if cvss_severities and cve["cvss_severity"] not in cvss_severities: 

216 return False 

217 

218 if ( 

219 ubuntu_priorities 

220 and cve["ubuntu_priority"] not in ubuntu_priorities 

221 ): 

222 return False 

223 

224 if any( 

225 [ 

226 binary_statuses, 

227 binary_fixed_versions, 

228 binary_versions, 

229 binary_names, 

230 ] 

231 ): 

232 if not cve.get("affected_binaries"): 

233 return False 

234 

235 # Check if at least one affected binary matches the filters 

236 for binary in cve["affected_binaries"]: 

237 matches_binary = ( 

238 ( 

239 not binary_statuses 

240 or binary["status"] in binary_statuses 

241 ) 

242 and ( 

243 not binary_versions 

244 or binary["version"] in binary_versions 

245 ) 

246 and ( 

247 not binary_fixed_versions 

248 or binary["fixed_version"] in binary_fixed_versions 

249 ) 

250 and (not binary_names or binary["name"] in binary_names) 

251 ) 

252 if matches_binary: 

253 return True 

254 return False 

255 return True 

256 

257 @staticmethod 

258 def filter_cve_data( 

259 cves, 

260 usn_ids, 

261 binary_statuses, 

262 binary_versions, 

263 binary_fixed_versions, 

264 binary_names, 

265 cvss_severities, 

266 ubuntu_priorities, 

267 ): 

268 return [ 

269 cve 

270 for cve in cves 

271 if CveHelper._match_filters( 

272 cve, 

273 usn_ids=usn_ids, 

274 binary_fixed_versions=binary_fixed_versions, 

275 binary_names=binary_names, 

276 binary_statuses=binary_statuses, 

277 binary_versions=binary_versions, 

278 cvss_severities=cvss_severities, 

279 ubuntu_priorities=ubuntu_priorities, 

280 ) 

281 ] 

282 

283 @staticmethod 

284 def sort_cve_data(cves, sort_by, order): 

285 priority_order = { 

286 "negligible": 0, 

287 "low": 1, 

288 "medium": 2, 

289 "high": 3, 

290 "critical": 4, 

291 } 

292 

293 is_reverse_order = order.lower() == "desc" 

294 

295 if sort_by == "cvss_severity": 

296 cves.sort( 

297 key=lambda cve: priority_order.get( 

298 cve.get("cvss_severity"), -1 

299 ), 

300 reverse=is_reverse_order, 

301 ) 

302 

303 elif sort_by == "ubuntu_priority": 

304 cves.sort( 

305 key=lambda cve: priority_order.get( 

306 cve.get("ubuntu_priority"), -1 

307 ), 

308 reverse=is_reverse_order, 

309 ) 

310 elif sort_by == "cvss_score": 

311 cves.sort( 

312 key=lambda cve: cve.get("cvss_score", 0), 

313 reverse=is_reverse_order, 

314 ) 

315 else: 

316 cves.sort( 

317 key=lambda cve: cve.get(sort_by, ""), reverse=is_reverse_order 

318 ) 

319 return cves 

320 

321 @staticmethod 

322 def paginate_cve_list(cves, page, page_size): 

323 total_items = len(cves) 

324 start = (page - 1) * page_size 

325 end = start + page_size 

326 

327 return { 

328 "page": page, 

329 "page_size": page_size, 

330 "total_items": total_items, 

331 "total_pages": (total_items + page_size - 1) // page_size, 

332 "data": cves[start:end], 

333 }