Coverage for webapp/publisher/cve/cve_helper.py: 97%

122 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-28 22:05 +0000

1import json 

2from os import getenv 

3import requests 

4import re 

5 

6from werkzeug.exceptions import NotFound 

7 

8from webapp.publisher.snaps import ( 

9 logic, 

10) 

11 

12REST_API_URL = "https://api.github.com" 

13GITHUB_SNAPCRAFT_BOT_USER_TOKEN = getenv("GITHUB_SNAPCRAFT_BOT_USER_TOKEN") 

14GLOBAL_STORE = "Global" 

15CANONICAL_PUBLISHER_ID = "canonical" 

16 

17 

18class CveHelper: 

19 """ 

20 Provides CVE data through GitHub by using snapcraft-web@canonical.com. 

21 """ 

22 

23 @staticmethod 

24 def _get_cve_file_metadata(file_path): 

25 url = ( 

26 f"{REST_API_URL}/repos/canonical/canonicalwebteam.snap-cves/" 

27 f"contents/{file_path}?ref=main" 

28 ) 

29 headers = {"Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}"} 

30 response = requests.get(url, headers=headers) 

31 

32 if response.status_code == 200: 

33 return response.json() 

34 else: 

35 raise NotFound 

36 

37 @staticmethod 

38 def _format_cve_response(cve_list, cve_details, usn_details): 

39 cves = [] 

40 

41 for cve_id, cve in cve_list.items(): 

42 cve_detail = cve_details[cve_id] 

43 

44 cve_usns = [] 

45 cve_usn_list = cve["usns"] 

46 if cve_usn_list: 

47 for usn_id in cve_usn_list: 

48 usn_detail = usn_details[usn_id] 

49 if usn_detail: 

50 cve_usns.append( 

51 { 

52 "id": usn_id, 

53 "description": usn_detail["description"], 

54 "published_at": usn_detail["published_at"], 

55 "related_launchpad_bugs": usn_detail[ 

56 "related_launchpad_bugs" 

57 ], 

58 } 

59 ) 

60 

61 cves.append( 

62 { 

63 "id": cve_id, 

64 "cvss_score": cve_detail["cvss_score"], 

65 "cvss_severity": cve_detail["cvss_severity"], 

66 "description": cve_detail["description"], 

67 "ubuntu_priority": cve_detail["ubuntu_priority"], 

68 "affected_binaries": cve["affected_binaries"], 

69 "channels_with_fix": cve["channels_with_fix"], 

70 "usns": cve_usns, 

71 } 

72 ) 

73 

74 return cves 

75 

76 @staticmethod 

77 def _fetch_file_content(snap_name, revision, file_metadata): 

78 if "download_url" in file_metadata: 

79 download_url = file_metadata["download_url"] 

80 headers = { 

81 "Authorization": f"token {GITHUB_SNAPCRAFT_BOT_USER_TOKEN}" 

82 } 

83 response = requests.get(download_url, headers=headers) 

84 

85 if response.status_code == 200: 

86 content = json.loads(response.text) 

87 revisions = content["snaps"][snap_name]["revisions"] 

88 

89 if revision not in revisions: 

90 raise NotFound 

91 

92 cve_list = revisions[revision] 

93 

94 fixed_cves = cve_list["fixed-cves"] 

95 unfixed_cves = cve_list["unfixed-cves"] 

96 

97 cve_details = content["security_issues"]["cves"] 

98 usn_details = content["security_issues"]["usns"] 

99 

100 unfixed = CveHelper._format_cve_response( 

101 unfixed_cves, cve_details, usn_details 

102 ) 

103 fixed = CveHelper._format_cve_response( 

104 fixed_cves, cve_details, usn_details 

105 ) 

106 

107 return fixed + unfixed 

108 else: 

109 raise NotFound 

110 else: 

111 raise NotFound 

112 

113 @staticmethod 

114 def get_revisions_with_cves(snap_name): 

115 try: 

116 contents = CveHelper._get_cve_file_metadata( 

117 f"snap-cves/{snap_name}" 

118 ) 

119 

120 # find all revision YAML files in the folder 

121 # e.g., 123.yaml, 456.yaml, 789.yaml 

122 # and extract the revision numbers 

123 revision_files = [ 

124 int(match.group(1)) 

125 for item in contents 

126 if (match := re.match(r"(\d+)\.yaml$", item["name"])) 

127 ] 

128 

129 return revision_files 

130 except NotFound: 

131 return [] 

132 

133 @staticmethod 

134 def get_cve_with_revision(snap_name, revision): 

135 file_metadata = CveHelper._get_cve_file_metadata( 

136 "snap-cves/{}.json".format(snap_name) 

137 ) 

138 

139 if file_metadata: 

140 return CveHelper._fetch_file_content( 

141 snap_name, revision, file_metadata 

142 ) 

143 return [] 

144 

145 @staticmethod 

146 def can_user_access_cve_data( 

147 snap_name, snap_details, account_info, is_user_canonical 

148 ): 

149 snap_store = snap_details["store"] 

150 snap_publisher = snap_details["publisher"] 

151 

152 admin_user_stores = logic.get_stores( 

153 account_info["stores"], roles=["admin"] 

154 ) 

155 is_user_admin = [ 

156 item for item in admin_user_stores if item["name"] == snap_store 

157 ] 

158 

159 is_snap_in_global_store = snap_store == GLOBAL_STORE 

160 

161 is_snap_publisher_canonical = ( 

162 snap_publisher["id"] == CANONICAL_PUBLISHER_ID 

163 ) 

164 

165 is_user_snap_publisher = ( 

166 snap_publisher["username"] == account_info["username"] 

167 ) 

168 

169 is_user_collaborator = snap_name in account_info["snaps"]["16"] 

170 

171 is_privileged_user = is_user_snap_publisher or is_user_admin 

172 is_user_canonical_publisher = ( 

173 is_snap_publisher_canonical and is_user_canonical 

174 ) 

175 has_store_access = is_snap_in_global_store and ( 

176 is_user_collaborator or is_user_canonical_publisher 

177 ) 

178 

179 # To access the CVE data of a snap, a user must meet 

180 # the following criteria: 

181 # - For all stores, the user must be 

182 # the publisher of the snap or have admin privileges. 

183 # - For non-Canonical snaps published 

184 # in the global store, the user must be a collaborator. 

185 # - For Canonical snaps published 

186 # in the global store, the user must be a Canonical publisher. 

187 can_view_cves = is_privileged_user or has_store_access 

188 

189 return can_view_cves 

190 

191 @staticmethod 

192 def _match_filters( 

193 cve, 

194 usn_ids, 

195 binary_statuses, 

196 binary_versions, 

197 binary_fixed_versions, 

198 binary_names, 

199 cvss_severities, 

200 ubuntu_priorities, 

201 ): 

202 if usn_ids: 

203 if not cve.get("usns") or not any( 

204 usn["id"] in usn_ids for usn in cve["usns"] 

205 ): 

206 return False 

207 

208 if cvss_severities and cve["cvss_severity"] not in cvss_severities: 

209 return False 

210 

211 if ( 

212 ubuntu_priorities 

213 and cve["ubuntu_priority"] not in ubuntu_priorities 

214 ): 

215 return False 

216 

217 if any( 

218 [ 

219 binary_statuses, 

220 binary_fixed_versions, 

221 binary_versions, 

222 binary_names, 

223 ] 

224 ): 

225 if not cve.get("affected_binaries"): 

226 return False 

227 

228 # Check if at least one affected binary matches the filters 

229 for binary in cve["affected_binaries"]: 

230 matches_binary = ( 

231 ( 

232 not binary_statuses 

233 or binary["status"] in binary_statuses 

234 ) 

235 and ( 

236 not binary_versions 

237 or binary["version"] in binary_versions 

238 ) 

239 and ( 

240 not binary_fixed_versions 

241 or binary["fixed_version"] in binary_fixed_versions 

242 ) 

243 and (not binary_names or binary["name"] in binary_names) 

244 ) 

245 if matches_binary: 

246 return True 

247 return False 

248 return True 

249 

250 @staticmethod 

251 def filter_cve_data( 

252 cves, 

253 usn_ids, 

254 binary_statuses, 

255 binary_versions, 

256 binary_fixed_versions, 

257 binary_names, 

258 cvss_severities, 

259 ubuntu_priorities, 

260 ): 

261 return [ 

262 cve 

263 for cve in cves 

264 if CveHelper._match_filters( 

265 cve, 

266 usn_ids=usn_ids, 

267 binary_fixed_versions=binary_fixed_versions, 

268 binary_names=binary_names, 

269 binary_statuses=binary_statuses, 

270 binary_versions=binary_versions, 

271 cvss_severities=cvss_severities, 

272 ubuntu_priorities=ubuntu_priorities, 

273 ) 

274 ] 

275 

276 @staticmethod 

277 def sort_cve_data(cves, sort_by, order): 

278 priority_order = { 

279 "negligible": 0, 

280 "low": 1, 

281 "medium": 2, 

282 "high": 3, 

283 "critical": 4, 

284 } 

285 

286 is_reverse_order = order.lower() == "desc" 

287 

288 if sort_by == "cvss_severity": 

289 cves.sort( 

290 key=lambda cve: priority_order.get( 

291 cve.get("cvss_severity"), -1 

292 ), 

293 reverse=is_reverse_order, 

294 ) 

295 

296 elif sort_by == "ubuntu_priority": 

297 cves.sort( 

298 key=lambda cve: priority_order.get( 

299 cve.get("ubuntu_priority"), -1 

300 ), 

301 reverse=is_reverse_order, 

302 ) 

303 elif sort_by == "cvss_score": 

304 cves.sort( 

305 key=lambda cve: cve.get("cvss_score", 0), 

306 reverse=is_reverse_order, 

307 ) 

308 else: 

309 cves.sort( 

310 key=lambda cve: cve.get(sort_by, ""), reverse=is_reverse_order 

311 ) 

312 return cves 

313 

314 @staticmethod 

315 def paginate_cve_list(cves, page, page_size): 

316 total_items = len(cves) 

317 start = (page - 1) * page_size 

318 end = start + page_size 

319 

320 return { 

321 "page": page, 

322 "page_size": page_size, 

323 "total_items": total_items, 

324 "total_pages": (total_items + page_size - 1) // page_size, 

325 "data": cves[start:end], 

326 }