Coverage for webapp/publisher/cve/cve_helper.py: 97%

125 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-26 22:06 +0000

1import json 

2from os import getenv 

3import requests 

4import re 

5 

6from werkzeug.exceptions import NotFound 

7 

8from webapp.publisher.snaps import ( 

9 logic, 

10) 

11 

# Base URL of the GitHub REST API used to read the CVE data repository.
REST_API_URL = "https://api.github.com"

# Token sent in the "Authorization" header of every GitHub request.
# It is None when the environment variable is not set.
GITHUB_SNAPCRAFT_BOT_USER_TOKEN = getenv("GITHUB_SNAPCRAFT_BOT_USER_TOKEN")

# Store name compared against snap_details["store"] in access checks.
GLOBAL_STORE = "Global"

# Publisher id compared against snap_details["publisher"]["id"].
CANONICAL_PUBLISHER_ID = "canonical"

16 

17 

class CveHelper:
    """
    Provides CVE data through GitHub by using snapcraft-web@canonical.com.

    All methods are static. The network-facing helpers read from the
    canonical/canonicalwebteam.snap-cves repository; the remaining helpers
    (filter, sort, paginate, access check) work on plain dicts and lists.
    """

    # Seconds to wait for GitHub before giving up. Without a timeout
    # requests.get can block indefinitely on a stalled connection.
    _REQUEST_TIMEOUT = 10

    @staticmethod
    def _auth_headers():
        """Return the Authorization header carrying the bot user token."""
        return {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"}

    @staticmethod
    def _get_cve_file_metadata(file_path):
        """
        Fetch the GitHub "contents" payload for file_path on the main branch.

        :param file_path: path inside the snap-cves repository
        :returns: the decoded JSON body of the response
        :raises NotFound: when GitHub answers with anything other than 200
        """
        url = (
            f"{REST_API_URL}/repos/canonical/canonicalwebteam.snap-cves/"
            f"contents/{file_path}?ref=main"
        )
        response = requests.get(
            url,
            headers=CveHelper._auth_headers(),
            timeout=CveHelper._REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise NotFound

        return response.json()

    @staticmethod
    def _format_cve_response(cve_list, cve_details, usn_details, status):
        """
        Merge per-revision CVE entries with their details and USNs.

        :param cve_list: mapping of CVE id to the revision-specific entry
            (reads "usns", "affected_binaries", "channels_with_fix")
        :param cve_details: mapping of CVE id to its detail record
        :param usn_details: mapping of USN id to its detail record
        :param status: value stored under "status" for every returned CVE
        :returns: list of flattened CVE dicts
        :raises KeyError: when a CVE id has no entry in cve_details
        """
        cves = []

        for cve_id, cve in (cve_list or {}).items():
            cve_detail = cve_details[cve_id]

            cve_usns = []
            # "usns" may be absent or null; both mean "no USNs".
            for usn_id in cve.get("usns") or []:
                # Use .get so a USN id without a detail record is skipped;
                # plain indexing would raise before the check below.
                usn_detail = usn_details.get(usn_id)
                if not usn_detail:
                    continue
                cve_usns.append(
                    {
                        "id": usn_id,
                        "description": usn_detail["description"],
                        "published_at": usn_detail["published_at"],
                        "related_launchpad_bugs": usn_detail[
                            "related_launchpad_bugs"
                        ],
                    }
                )

            cves.append(
                {
                    "id": cve_id,
                    "status": status,
                    "cvss_score": cve_detail["cvss_score"],
                    "cvss_severity": cve_detail["cvss_severity"],
                    "description": cve_detail["description"],
                    "ubuntu_priority": cve_detail["ubuntu_priority"],
                    "affected_binaries": cve["affected_binaries"],
                    "channels_with_fix": cve.get("channels_with_fix"),
                    "usns": cve_usns,
                }
            )

        return cves

    @staticmethod
    def _fetch_file_content(snap_name, revision, file_metadata):
        """
        Download the snap's CVE file and return the CVEs of one revision.

        :param snap_name: key looked up under content["snaps"]
        :param revision: key looked up under the snap's "revisions"
        :param file_metadata: payload that must contain "download_url"
        :returns: fixed CVEs followed by unfixed CVEs
        :raises NotFound: when there is no download URL, the download does
            not return 200, or the revision is not in the file
        """
        if "download_url" not in file_metadata:
            raise NotFound

        response = requests.get(
            file_metadata["download_url"],
            headers=CveHelper._auth_headers(),
            timeout=CveHelper._REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise NotFound

        content = json.loads(response.text)
        revisions = content["snaps"][snap_name]["revisions"]
        if revision not in revisions:
            raise NotFound

        cve_list = revisions[revision]
        cve_details = content["security_issues"]["cves"]
        usn_details = content["security_issues"]["usns"]

        unfixed = CveHelper._format_cve_response(
            cve_list["unfixed-cves"], cve_details, usn_details, "unfixed"
        )
        fixed = CveHelper._format_cve_response(
            cve_list["fixed-cves"], cve_details, usn_details, "fixed"
        )

        return fixed + unfixed

    @staticmethod
    def get_revisions_with_cves(snap_name):
        """
        List the revision numbers that have a CVE file for snap_name.

        :returns: list of ints taken from entries named "<digits>.yaml" in
            the snap's folder; an empty list when the folder is not found
        """
        try:
            contents = CveHelper._get_cve_file_metadata(
                f"snap-cves/{snap_name}"
            )
        except NotFound:
            return []

        # A folder listing is a list of entries. Anything else (for
        # example a single-file payload) holds no revision files.
        if not isinstance(contents, list):
            return []

        # find all revision YAML files in the folder
        # e.g., 123.yaml, 456.yaml, 789.yaml
        # and extract the revision numbers
        return [
            int(match.group(1))
            for item in contents
            if (match := re.match(r"(\d+)\.yaml$", item["name"]))
        ]

    @staticmethod
    def get_cve_with_revision(snap_name, revision):
        """
        Return the CVEs of one revision of snap_name.

        :returns: list of CVE dicts, or an empty list when the metadata
            payload is empty
        :raises NotFound: when the file or the revision cannot be found
        """
        file_metadata = CveHelper._get_cve_file_metadata(
            f"snap-cves/{snap_name}.json"
        )

        if not file_metadata:
            return []

        return CveHelper._fetch_file_content(
            snap_name, revision, file_metadata
        )

    @staticmethod
    def can_user_access_cve_data(
        snap_name, snap_details, account_info, is_user_canonical
    ):
        """
        Decide whether the user may see the CVE data of a snap.

        :param snap_name: name looked up in account_info["snaps"]["16"]
        :param snap_details: dict providing "store" and "publisher"
        :param account_info: dict providing "stores", "username", "snaps"
        :param is_user_canonical: whether the user is a Canonical publisher
        :returns: True when access is allowed, False otherwise
        """
        snap_store = snap_details["store"]
        snap_publisher = snap_details["publisher"]

        admin_user_stores = logic.get_stores(
            account_info["stores"], roles=["admin"]
        )
        # Reduce to a bool here; carrying the matching list through the
        # "or" chain below would make the method return a list.
        is_user_admin = any(
            store["name"] == snap_store for store in admin_user_stores
        )

        is_snap_in_global_store = snap_store == GLOBAL_STORE
        is_snap_publisher_canonical = (
            snap_publisher["id"] == CANONICAL_PUBLISHER_ID
        )
        is_user_snap_publisher = (
            snap_publisher["username"] == account_info["username"]
        )
        is_user_collaborator = snap_name in account_info["snaps"]["16"]

        is_privileged_user = is_user_snap_publisher or is_user_admin
        is_user_canonical_publisher = (
            is_snap_publisher_canonical and is_user_canonical
        )
        has_store_access = is_snap_in_global_store and (
            is_user_collaborator or is_user_canonical_publisher
        )

        # Access is granted when either of these holds:
        # - In any store, the user is the publisher of the snap or an
        #   admin of the snap's store.
        # - In the global store, the user is a collaborator of the snap,
        #   or the snap is published by Canonical and the user is a
        #   Canonical publisher.
        return bool(is_privileged_user or has_store_access)

    @staticmethod
    def _match_filters(
        cve,
        usn_ids,
        binary_statuses,
        binary_versions,
        binary_fixed_versions,
        binary_names,
        cvss_severities,
        ubuntu_priorities,
    ):
        """
        Tell whether a single CVE passes every active filter.

        A filter is active when its argument is non-empty. The binary
        filters are combined per binary: one affected binary must satisfy
        all active binary filters at once.

        :returns: True when the CVE matches, False otherwise
        """
        if usn_ids and not any(
            usn["id"] in usn_ids for usn in cve.get("usns") or []
        ):
            return False

        if cvss_severities and cve["cvss_severity"] not in cvss_severities:
            return False

        if (
            ubuntu_priorities
            and cve["ubuntu_priority"] not in ubuntu_priorities
        ):
            return False

        has_binary_filter = any(
            (
                binary_statuses,
                binary_fixed_versions,
                binary_versions,
                binary_names,
            )
        )
        if not has_binary_filter:
            return True

        def binary_matches(binary):
            # .get keeps a binary without the filtered field (such as a
            # missing "fixed_version") a non-match instead of a KeyError.
            return (
                (
                    not binary_statuses
                    or binary.get("status") in binary_statuses
                )
                and (
                    not binary_versions
                    or binary.get("version") in binary_versions
                )
                and (
                    not binary_fixed_versions
                    or binary.get("fixed_version") in binary_fixed_versions
                )
                and (not binary_names or binary.get("name") in binary_names)
            )

        # Check if at least one affected binary matches the filters
        return any(
            binary_matches(binary)
            for binary in cve.get("affected_binaries") or []
        )

    @staticmethod
    def filter_cve_data(
        cves,
        usn_ids,
        binary_statuses,
        binary_versions,
        binary_fixed_versions,
        binary_names,
        cvss_severities,
        ubuntu_priorities,
    ):
        """
        Return the CVEs from cves that pass every active filter.

        Empty or None filter arguments are ignored. The input list is not
        modified.
        """
        return [
            cve
            for cve in cves
            if CveHelper._match_filters(
                cve,
                usn_ids=usn_ids,
                binary_fixed_versions=binary_fixed_versions,
                binary_names=binary_names,
                binary_statuses=binary_statuses,
                binary_versions=binary_versions,
                cvss_severities=cvss_severities,
                ubuntu_priorities=ubuntu_priorities,
            )
        ]

    @staticmethod
    def sort_cve_data(cves, sort_by, order):
        """
        Sort cves in place by sort_by and return the same list.

        :param cves: list of CVE dicts; it is mutated
        :param sort_by: "cvss_severity" and "ubuntu_priority" sort by rank
            (unknown values first), "cvss_score" sorts numerically, any
            other value sorts by that field directly
        :param order: "desc" (any case) for descending; anything else,
            including None, sorts ascending
        :returns: the sorted cves list
        """
        priority_order = {
            "negligible": 0,
            "low": 1,
            "medium": 2,
            "high": 3,
            "critical": 4,
        }

        is_reverse_order = (order or "").lower() == "desc"

        if sort_by in ("cvss_severity", "ubuntu_priority"):

            def sort_key(cve):
                return priority_order.get(cve.get(sort_by), -1)

        elif sort_by == "cvss_score":

            def sort_key(cve):
                # A key that is present but None must sort like a missing
                # one; comparing None with a number raises TypeError.
                score = cve.get("cvss_score")
                return 0 if score is None else score

        else:

            def sort_key(cve):
                value = cve.get(sort_by)
                return "" if value is None else value

        cves.sort(key=sort_key, reverse=is_reverse_order)
        return cves

    @staticmethod
    def paginate_cve_list(cves, page, page_size):
        """
        Return one page of cves together with paging metadata.

        :param page: 1-based page number; values below 1 are treated as 1
        :param page_size: items per page; values below 1 are treated as 1
        :returns: dict with "page", "page_size", "total_items",
            "total_pages" and "data"
        """
        # Clamp both values: a page below 1 would produce a negative slice
        # start (returning items from the end of the list) and a page size
        # of 0 would divide by zero.
        page = max(page, 1)
        page_size = max(page_size, 1)

        total_items = len(cves)
        start = (page - 1) * page_size
        end = start + page_size

        return {
            "page": page,
            "page_size": page_size,
            "total_items": total_items,
            "total_pages": (total_items + page_size - 1) // page_size,
            "data": cves[start:end],
        }

330 "total_pages": (total_items + page_size - 1) // page_size, 

331 "data": cves[start:end], 

332 }