Coverage for webapp/publisher/cve/cve_helper.py: 97%

122 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-14 22:06 +0000

1import json 

2from os import getenv 

3import requests 

4import re 

5 

6from werkzeug.exceptions import NotFound 

7 

8from webapp.publisher.snaps import ( 

9 logic, 

10) 

11 

# Base URL of the GitHub REST API; CveHelper builds its repository
# "contents" request URLs on top of it.
REST_API_URL = "https://api.github.com"
# Token sent in the "Authorization" header of the GitHub requests.
# getenv() yields None when the environment variable is not set.
GITHUB_SNAPCRAFT_BOT_USER_TOKEN = getenv("GITHUB_SNAPCRAFT_BOT_USER_TOKEN")
# Store name compared against a snap's store to detect the global store.
GLOBAL_STORE = "Global"
# Publisher id compared against a snap's publisher id to detect snaps
# published by Canonical.
CANONICAL_PUBLISHER_ID = "canonical"

16 

17 

class CveHelper:
    """
    Provides CVE data through GitHub by using snapcraft-web@canonical.com.

    Every method is static; the class is only used as a namespace. The
    data is read from the "canonical/canonicalwebteam.snap-cves"
    repository through the GitHub REST API.
    """

    # Seconds to wait for GitHub before giving up, so that a stalled
    # connection cannot block the calling worker indefinitely.
    _REQUEST_TIMEOUT = 10

    @staticmethod
    def _get_cve_file_metadata(file_path):
        """
        Fetch the GitHub "contents" metadata for a path on the main branch.

        :param file_path: path inside the snap-cves repository
        :returns: the decoded JSON body of the GitHub response
        :raises NotFound: when GitHub answers with anything but 200
        """
        url = (
            f"{REST_API_URL}/repos/canonical/canonicalwebteam.snap-cves/"
            f"contents/{file_path}?ref=main"
        )
        headers = {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"}
        response = requests.get(
            url, headers=headers, timeout=CveHelper._REQUEST_TIMEOUT
        )

        if response.status_code != 200:
            raise NotFound

        return response.json()

    @staticmethod
    def _format_cve_response(cve_list, cve_details, usn_details, status):
        """
        Merge per-revision CVE entries with the shared CVE/USN details.

        :param cve_list: mapping of CVE id to its per-revision entry; each
            entry is read for "usns", "affected_binaries" and
            "channels_with_fix"
        :param cve_details: mapping of CVE id to its detail record
        :param usn_details: mapping of USN id to its detail record
        :param status: value stored under "status" on every returned CVE
        :returns: list of flat CVE dicts, in the order of ``cve_list``
        :raises KeyError: when a CVE id has no entry in ``cve_details``
        """
        cves = []

        for cve_id, cve in cve_list.items():
            cve_detail = cve_details[cve_id]

            cve_usns = []
            for usn_id in cve["usns"] or []:
                # A USN that is referenced but has no detail record is
                # skipped instead of failing the whole response.
                usn_detail = usn_details.get(usn_id)
                if not usn_detail:
                    continue

                cve_usns.append(
                    {
                        "id": usn_id,
                        "description": usn_detail["description"],
                        "published_at": usn_detail["published_at"],
                        "related_launchpad_bugs": usn_detail[
                            "related_launchpad_bugs"
                        ],
                    }
                )

            cves.append(
                {
                    "id": cve_id,
                    "status": status,
                    "cvss_score": cve_detail["cvss_score"],
                    "cvss_severity": cve_detail["cvss_severity"],
                    "description": cve_detail["description"],
                    "ubuntu_priority": cve_detail["ubuntu_priority"],
                    "affected_binaries": cve["affected_binaries"],
                    "channels_with_fix": cve["channels_with_fix"],
                    "usns": cve_usns,
                }
            )

        return cves

    @staticmethod
    def _fetch_file_content(snap_name, revision, file_metadata):
        """
        Download a snap's CVE file and return the CVEs of one revision.

        :param snap_name: key looked up under "snaps" in the file
        :param revision: key looked up under the snap's "revisions"
            (NOTE(review): JSON object keys are strings, so callers
            presumably pass the revision as a string - confirm)
        :param file_metadata: GitHub metadata holding "download_url"
        :returns: fixed CVEs followed by unfixed CVEs
        :raises NotFound: when there is no usable download URL, the
            download does not answer 200, or the snap or revision is
            absent from the file
        """
        if (
            "download_url" not in file_metadata
            or not file_metadata["download_url"]
        ):
            raise NotFound

        headers = {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"}
        response = requests.get(
            file_metadata["download_url"],
            headers=headers,
            timeout=CveHelper._REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise NotFound

        content = json.loads(response.text)

        # An unknown snap is reported like an unknown revision rather
        # than surfacing as a KeyError.
        snap = content["snaps"].get(snap_name)
        if snap is None:
            raise NotFound

        revisions = snap["revisions"]
        if revision not in revisions:
            raise NotFound

        cve_list = revisions[revision]
        cve_details = content["security_issues"]["cves"]
        usn_details = content["security_issues"]["usns"]

        unfixed = CveHelper._format_cve_response(
            cve_list["unfixed-cves"], cve_details, usn_details, "unfixed"
        )
        fixed = CveHelper._format_cve_response(
            cve_list["fixed-cves"], cve_details, usn_details, "fixed"
        )

        return fixed + unfixed

    @staticmethod
    def get_revisions_with_cves(snap_name):
        """
        List the revision numbers that have a CVE file for a snap.

        :param snap_name: name of the folder under "snap-cves/"
        :returns: list of ints parsed from "<digits>.yaml" entry names,
            or an empty list when the folder cannot be fetched
        """
        try:
            contents = CveHelper._get_cve_file_metadata(
                f"snap-cves/{snap_name}"
            )
        except NotFound:
            return []

        # find all revision YAML files in the folder
        # e.g., 123.yaml, 456.yaml, 789.yaml
        # and extract the revision numbers
        return [
            int(match.group(1))
            for item in contents
            if (match := re.match(r"(\d+)\.yaml$", item["name"]))
        ]

    @staticmethod
    def get_cve_with_revision(snap_name, revision):
        """
        Return the formatted CVEs of one revision of a snap.

        :param snap_name: snap whose "snap-cves/<snap_name>.json" is read
        :param revision: revision key to look up in that file
        :returns: list of CVE dicts, or an empty list when the metadata
            response is empty
        :raises NotFound: when the file, snap or revision is missing
        """
        file_metadata = CveHelper._get_cve_file_metadata(
            f"snap-cves/{snap_name}.json"
        )

        if not file_metadata:
            return []

        return CveHelper._fetch_file_content(
            snap_name, revision, file_metadata
        )

    @staticmethod
    def can_user_access_cve_data(
        snap_name, snap_details, account_info, is_user_canonical
    ):
        """
        Decide whether a user may see the CVE data of a snap.

        :param snap_name: name checked against the user's snaps
        :param snap_details: read for "store" and "publisher" (the
            latter for "id" and "username")
        :param account_info: read for "stores", "username" and
            ``["snaps"]["16"]``
        :param is_user_canonical: truthy when the user is a Canonical
            user
        :returns: True when access is allowed, False otherwise
        """
        snap_store = snap_details["store"]
        snap_publisher = snap_details["publisher"]

        admin_user_stores = logic.get_stores(
            account_info["stores"], roles=["admin"]
        )
        # any() yields a real boolean; a filtered list here would leak
        # out of this function as its return value.
        is_user_admin = any(
            store["name"] == snap_store for store in admin_user_stores
        )

        is_snap_in_global_store = snap_store == GLOBAL_STORE

        is_snap_publisher_canonical = (
            snap_publisher["id"] == CANONICAL_PUBLISHER_ID
        )

        is_user_snap_publisher = (
            snap_publisher["username"] == account_info["username"]
        )

        is_user_collaborator = snap_name in account_info["snaps"]["16"]

        is_privileged_user = is_user_snap_publisher or is_user_admin
        is_user_canonical_publisher = (
            is_snap_publisher_canonical and is_user_canonical
        )
        has_store_access = is_snap_in_global_store and (
            is_user_collaborator or is_user_canonical_publisher
        )

        # To access the CVE data of a snap, a user must meet
        # the following criteria:
        # - For all stores, the user must be
        # the publisher of the snap or have admin privileges.
        # - For non-Canonical snaps published
        # in the global store, the user must be a collaborator.
        # - For Canonical snaps published
        # in the global store, the user must be a Canonical publisher.
        return bool(is_privileged_user or has_store_access)

    @staticmethod
    def _match_filters(
        cve,
        usn_ids,
        binary_statuses,
        binary_versions,
        binary_fixed_versions,
        binary_names,
        cvss_severities,
        ubuntu_priorities,
    ):
        """
        Tell whether one CVE satisfies every active filter.

        A filter that is empty or None is inactive. The binary filters
        must all be satisfied by the same affected binary.

        :param cve: CVE dict as built by ``_format_cve_response``
        :returns: True when the CVE passes, False otherwise
        """
        if usn_ids:
            if not cve.get("usns") or not any(
                usn["id"] in usn_ids for usn in cve["usns"]
            ):
                return False

        if cvss_severities and cve["cvss_severity"] not in cvss_severities:
            return False

        if (
            ubuntu_priorities
            and cve["ubuntu_priority"] not in ubuntu_priorities
        ):
            return False

        has_binary_filter = any(
            [
                binary_statuses,
                binary_fixed_versions,
                binary_versions,
                binary_names,
            ]
        )
        if not has_binary_filter:
            return True

        if not cve.get("affected_binaries"):
            return False

        # Check if at least one affected binary matches the filters
        for binary in cve["affected_binaries"]:
            matches_binary = (
                (not binary_statuses or binary["status"] in binary_statuses)
                and (
                    not binary_versions
                    or binary["version"] in binary_versions
                )
                and (
                    not binary_fixed_versions
                    or binary["fixed_version"] in binary_fixed_versions
                )
                and (not binary_names or binary["name"] in binary_names)
            )
            if matches_binary:
                return True
        return False

    @staticmethod
    def filter_cve_data(
        cves,
        usn_ids,
        binary_statuses,
        binary_versions,
        binary_fixed_versions,
        binary_names,
        cvss_severities,
        ubuntu_priorities,
    ):
        """
        Keep only the CVEs that pass every active filter.

        :param cves: iterable of CVE dicts
        :returns: new list with the matching CVEs, input order preserved
        """
        return [
            cve
            for cve in cves
            if CveHelper._match_filters(
                cve,
                usn_ids=usn_ids,
                binary_fixed_versions=binary_fixed_versions,
                binary_names=binary_names,
                binary_statuses=binary_statuses,
                binary_versions=binary_versions,
                cvss_severities=cvss_severities,
                ubuntu_priorities=ubuntu_priorities,
            )
        ]

    @staticmethod
    def sort_cve_data(cves, sort_by, order):
        """
        Sort a list of CVE dicts in place and return it.

        :param cves: list of CVE dicts; it is mutated
        :param sort_by: "cvss_severity" and "ubuntu_priority" sort by
            rank (unknown values first in ascending order), "cvss_score"
            sorts numerically, any other value sorts by that key
        :param order: "desc" (case-insensitive) for descending, any
            other string for ascending
        :returns: the same list object, sorted
        """
        priority_order = {
            "negligible": 0,
            "low": 1,
            "medium": 2,
            "high": 3,
            "critical": 4,
        }

        def value_or(field, default):
            # dict.get only falls back for a missing key; a key stored
            # as None must get the same default or the sort would
            # compare None with real values and raise TypeError.
            def key(cve):
                value = cve.get(field)
                return default if value is None else value

            return key

        def rank_of(field):
            return lambda cve: priority_order.get(cve.get(field), -1)

        if sort_by in ("cvss_severity", "ubuntu_priority"):
            sort_key = rank_of(sort_by)
        elif sort_by == "cvss_score":
            sort_key = value_or("cvss_score", 0)
        else:
            sort_key = value_or(sort_by, "")

        cves.sort(key=sort_key, reverse=order.lower() == "desc")
        return cves

    @staticmethod
    def paginate_cve_list(cves, page, page_size):
        """
        Slice a list of CVEs into one page plus paging metadata.

        ``page`` and ``page_size`` below 1 are raised to 1: a zero page
        size would divide by zero and a page below 1 would slice from
        the end of the list. The returned "page" and "page_size" are
        the values actually used.

        :param cves: sequence of CVEs
        :param page: 1-based page number
        :param page_size: number of items per page
        :returns: dict with "page", "page_size", "total_items",
            "total_pages" and "data"
        """
        page = max(page, 1)
        page_size = max(page_size, 1)

        total_items = len(cves)
        start = (page - 1) * page_size

        return {
            "page": page,
            "page_size": page_size,
            "total_items": total_items,
            # ceiling division without floats
            "total_pages": (total_items + page_size - 1) // page_size,
            "data": cves[start : start + page_size],
        }

326 "data": cves[start:end], 

327 }