Coverage for webapp/api/github.py: 64%

203 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-28 22:05 +0000

1import hmac 

2from hashlib import sha1 

3from os import getenv 

4 

5from webapp import api 

6from webapp.helpers import get_yaml_loader 

7from werkzeug.exceptions import Unauthorized, Forbidden 

8from requests.exceptions import HTTPError 

9 

10import gzip 

11from io import BytesIO 

12import json 

13 

# Secret used as the HMAC key for webhook signatures and as the "secret" of
# updated hooks. Read from the environment once, at import time; getenv
# returns None when the variable is not set.
GITHUB_WEBHOOK_SECRET = getenv("GITHUB_WEBHOOK_SECRET")

15 

16 

class InvalidYAML(Exception):
    """
    Raised when the snapcraft.yaml fetched from a repository cannot be
    read or parsed.
    """

19 

20 

class GitHub:
    """
    Provides authentication for GitHub users. Helper methods are also provided
    for checking organization access and getting user data from the GitHub API.
    """

    # Base URL that REST paths are appended to
    REST_API_URL = "https://api.github.com"
    # Endpoint that GraphQL queries are POSTed to
    GRAPHQL_API_URL = "https://api.github.com/graphql"
    # Host used to download raw file contents at a given commit
    RAW_CONTENT_URL = "https://raw.githubusercontent.com"

    # Paths, relative to the repository root, that are probed in this order
    # when looking for a snapcraft.yaml
    YAML_LOCATIONS = [
        "snapcraft.yaml",
        ".snapcraft.yaml",
        "snap/snapcraft.yaml",
        "build-aux/snap/snapcraft.yaml",
    ]

37 

def __init__(self, access_token=None, session=None):
    """
    Store the credentials and HTTP session used for every GitHub call.

    access_token: optional token sent as "Authorization: token <value>".
    session: HTTP session to use. When omitted a new api.requests.Session
    is created for this instance.
    """
    # A session built in the signature would be created once at definition
    # time and shared (together with the header mutation below) by every
    # instance, so the default is created lazily here instead.
    if session is None:
        session = api.requests.Session()

    self.access_token = access_token
    self.session = session
    self.session.headers["Accept"] = "application/json"

42 

def _request(
    self, method="GET", url="", params={}, data={}, raise_exceptions=True
):
    """
    Send a request to the REST API and hand back the response object.

    url is appended to REST_API_URL; params become the query string and
    data is sent as the JSON body. The Authorization header is only added
    when an access token is set.

    With raise_exceptions enabled, a 401 raises Unauthorized, a 403 raises
    Forbidden and any other error status is raised by raise_for_status().
    With it disabled the response is returned unchecked.
    """
    auth_headers = (
        {"Authorization": f"token {self.access_token}"}
        if self.access_token
        else {}
    )

    resp = self.session.request(
        method,
        f"{self.REST_API_URL}/{url}",
        headers=auth_headers,
        params=params,
        json=data,
    )

    # Callers that opted out inspect the status code themselves
    if not raise_exceptions:
        return resp

    if resp.status_code == 401:
        raise Unauthorized(response=resp)
    if resp.status_code == 403:
        raise Forbidden(response=resp)
    resp.raise_for_status()

    return resp

71 

def decompress_data(self, data, encoding):
    """
    Turn a bytes payload into text.

    When encoding is "gzip" the payload is gunzipped first; any other
    value means the bytes are decoded as they are. The result is always
    decoded as UTF-8.
    """
    if encoding != "gzip":
        return data.decode("utf-8")

    with gzip.GzipFile(fileobj=BytesIO(data)) as stream:
        return stream.read().decode("utf-8")

79 

def get_data_from_response(self, response):
    """
    Return the JSON payload of a response.

    If the response declares "Content-Encoding: gzip", the raw content is
    decompressed and parsed by hand; should that fail for any reason the
    method falls back to response.json(), which is also what is used for
    every other encoding.
    """
    encoding = response.headers.get("Content-Encoding", "")
    if encoding != "gzip":
        return response.json()

    try:
        text = self.decompress_data(response.content, encoding)
        return json.loads(text)
    except Exception:
        # Best effort: the content may not actually be gzipped any more
        return response.json()

94 

def _gql_request(self, query={}):
    """
    POST a query to the GraphQL endpoint and return its "data" member.

    query: the GraphQL document, sent as {"query": query} in the JSON body.

    Raises Unauthorized on a 401 and Forbidden on a 403 (both carrying the
    response); any other error status is raised by raise_for_status().
    """
    if self.access_token:
        headers = {"Authorization": f"token {self.access_token}"}
    else:
        headers = {}

    response = self.session.request(
        "POST",
        self.GRAPHQL_API_URL,
        json={"query": query},
        headers=headers,
    )

    if response.status_code == 401:
        raise Unauthorized(response=response)
    if response.status_code == 403:
        # Attach the response, matching _request and the 401 branch above
        raise Forbidden(response=response)

    response.raise_for_status()

    data = self.get_data_from_response(response)
    return data["data"]

120 

def _get_nodes(self, edges):
    """
    GraphQL: unwrap a list of edges into the list of their "node" values
    """
    nodes = []
    for edge in edges:
        nodes.append(edge["node"])
    return nodes

126 

def get_user(self):
    """
    Fetch login, name and a 100px avatar URL of the current user
    (the GraphQL "viewer")
    """
    query = """
    {
      viewer {
        login
        name
        avatarUrl(size: 100)
      }
    }
    """

    payload = self._gql_request(query)
    return payload["viewer"]

142 

def get_orgs(self, end_cursor=None):
    """
    List the organizations (login and name) of the authenticated user.

    Pages of 100 are requested, starting after end_cursor when one is
    given, and followed until the API reports no further page.
    """
    query_head = """
    {
      viewer {
        organizations(first: 100,"""
    query_tail = """) {
          edges {
            node {
              login
              name
            }
          }
          pageInfo {
            hasNextPage
            endCursor
          }
        }
      }
    }
    """

    collected = []
    cursor = end_cursor
    while True:
        after_clause = f'after: "{cursor}"' if cursor else ""
        query = "".join((query_head, after_clause, query_tail))

        page = self._gql_request(query)["viewer"]["organizations"]
        page_info = page["pageInfo"]
        collected.extend(self._get_nodes(page["edges"]))

        if not page_info["hasNextPage"]:
            return collected
        cursor = page_info["endCursor"]

180 

def get_user_repositories(self, end_cursor=None):
    """
    List the public repositories (name and nameWithOwner) of the
    authenticated user.

    Pages of 100 are requested, starting after end_cursor when one is
    given, and followed until the API reports no further page.
    """
    query_head = """{
      viewer {
        repositories(
          first: 100,
          privacy: PUBLIC,
          """
    query_tail = """
        ) {
          edges {
            node {
              name
              nameWithOwner
            }
          }
          pageInfo {
            hasNextPage
            endCursor
          }
        }
      }
    }"""

    collected = []
    cursor = end_cursor
    while True:
        after_clause = f'after: "{cursor}"' if cursor else ""
        query = "".join((query_head, after_clause, query_tail))

        page = self._gql_request(query)["viewer"]["repositories"]
        page_info = page["pageInfo"]
        collected.extend(self._get_nodes(page["edges"]))

        if not page_info["hasNextPage"]:
            return collected
        cursor = page_info["endCursor"]

219 

def get_org_repositories(self, org_login, end_cursor=None):
    """
    List the names of the public repositories of an organization, as seen
    by the authenticated user.

    org_login: login of the organization to look up.
    end_cursor: optional GraphQL cursor to start after.

    Pages of 100 are followed until the API reports no further page.
    Returns a list of {"name": ...} nodes; the list is empty when the API
    returns no organization for org_login.
    """
    # json.dumps yields a double-quoted, escaped string literal, so a login
    # containing quotes or backslashes cannot break out of the query.
    query_head = (
        """{
      viewer {
        organization(login: """
        + json.dumps(org_login)
        + """) {
          repositories(
            first: 100,
            privacy: PUBLIC
            """
    )
    query_tail = """
          ) {
            edges {
              node {
                name
              }
            }
            pageInfo {
              hasNextPage
              endCursor
            }
          }
        }
      }
    }"""

    repositories = []
    cursor = end_cursor
    while True:
        after_clause = f"after: {json.dumps(cursor)}" if cursor else ""
        query = "".join((query_head, after_clause, query_tail))

        organization = self._gql_request(query)["viewer"]["organization"]
        if organization is None:
            # Nothing to list rather than a TypeError on the lookup below
            return repositories

        page = organization["repositories"]
        page_info = page["pageInfo"]
        repositories.extend(self._get_nodes(page["edges"]))

        if not page_info["hasNextPage"]:
            return repositories
        cursor = page_info["endCursor"]

266 

def check_permissions_over_repo(self, owner, repo, permission="push"):
    """
    Return True when the current user has the requested permission
    Possible values: "admin", "push" or "pull"

    Returns False when the repository request is answered with 401, 403
    or 404, or when the response carries no truthy entry for permission.
    Any other HTTP error is re-raised.
    """
    try:
        response = self._request(
            "GET",
            f"repos/{owner}/{repo}",
            raise_exceptions=True,
        )
    except (Unauthorized, Forbidden):
        return False
    except HTTPError as error:
        if error.response is not None and error.response.status_code == 404:
            return False
        # Swallowing other errors would fall through to an unbound
        # `response` below, so let them propagate.
        raise

    data = self.get_data_from_response(response)
    # "permissions" may be absent; treat that as no permission at all
    granted = data.get("permissions") or {}

    return bool(granted.get(permission))

293 

def check_if_repo_exists(self, owner, repo):
    """
    Tell whether the GitHub repo exists: True on a 200, False on a 404.

    Raises Unauthorized on a 401 and Forbidden on a 403; other statuses go
    through raise_for_status() and, if that does not raise, nothing is
    returned.
    """
    response = self._request(
        "GET",
        f"repos/{owner}/{repo}",
        raise_exceptions=False,
    )

    if response.status_code == 200:
        return True
    if response.status_code == 404:
        return False
    if response.status_code == 401:
        raise Unauthorized
    if response.status_code == 403:
        raise Forbidden

    response.raise_for_status()

313 

def get_snapcraft_yaml_location(self, owner, repo):
    """
    Find where the snapcraft.yaml lives in the GitHub repo.

    Returns the first entry of YAML_LOCATIONS answered with a 200, or
    False when none is found. Raises Unauthorized on a 401 and Forbidden
    on a 403.
    """

    # It is not possible to use GraphQL without authentication
    # for that reason we are doing a call for each location to the REST API
    for candidate in self.YAML_LOCATIONS:
        response = self._request(
            "GET",
            f"repos/{owner}/{repo}/contents/{candidate}",
            raise_exceptions=False,
        )
        status = response.status_code

        if status == 200:
            return candidate
        if status == 404:
            continue
        if status == 401:
            raise Unauthorized
        if status == 403:
            raise Forbidden

        response.raise_for_status()

    return False

339 

def get_default_branch(self, owner, repo):
    """
    Return the "default_branch" field of the repository
    """
    repo_info = self.get_data_from_response(
        self._request("GET", f"repos/{owner}/{repo}")
    )
    return repo_info["default_branch"]

344 

def get_last_commit(self, owner, repo, branch=None):
    """
    Return the sha of the commit the branch points at. When no branch is
    given the repository's default branch is looked up first.
    """
    target = branch or self.get_default_branch(owner, repo)

    commit = self.get_data_from_response(
        self._request("GET", f"repos/{owner}/{repo}/commits/{target}")
    )
    return commit["sha"]

354 

def get_snapcraft_yaml_data(self, owner, repo, location=None):
    """
    Download the snapcraft.yaml of the repo and return it parsed.

    When location is not given it is looked up with
    get_snapcraft_yaml_location; if no file is found an empty dict is
    returned. Raises InvalidYAML when reading or parsing the file fails.
    """
    location = location or self.get_snapcraft_yaml_location(owner, repo)
    if not location:
        return {}

    # Get last commit to avoid cache issues with raw.github.com
    commit_sha = self.get_last_commit(owner, repo)
    raw_url = f"{self.RAW_CONTENT_URL}/{owner}/{repo}/{commit_sha}/{location}"
    response = self.session.request("GET", raw_url)

    yaml = get_yaml_loader()
    try:
        encoding = response.headers.get("Content-Encoding", "")
        if encoding == "gzip":
            try:
                document = self.decompress_data(response.content, encoding)
            except Exception:
                # Fall back to the content exactly as received
                document = response.content
        else:
            document = response.content
        return yaml.load(document)
    except Exception:
        raise InvalidYAML

388 

def generate_webhook_secret_for_repo(self, owner, name):
    """
    Derive a per-repository secret: the hex HMAC-SHA1 of owner followed
    by name (no separator), keyed with GITHUB_WEBHOOK_SECRET
    """
    signer = hmac.new(bytes(GITHUB_WEBHOOK_SECRET, "UTF-8"), None, sha1)
    for part in (owner, name):
        signer.update(bytes(part, "UTF-8"))
    return signer.hexdigest()

395 

def validate_webhook_signature(self, payload, signature):
    """
    Sign the payload with GITHUB_WEBHOOK_SECRET (HMAC-SHA1) and report
    whether the result equals the given signature
    """
    secret_key = bytes(GITHUB_WEBHOOK_SECRET, "UTF-8")

    # GitHub sends the hex digest prefixed with the algorithm name
    expected = "sha1=" + hmac.new(secret_key, payload, sha1).hexdigest()

    return hmac.compare_digest(expected, signature)

407 

def validate_bsi_webhook_secret(self, owner, name, payload, signature):
    """
    Return True if the signature matches the payload signed (HMAC-SHA1)
    with the secret generated for this owner/name pair
    """
    repo_secret = self.generate_webhook_secret_for_repo(owner, name)
    signer = hmac.new(bytes(repo_secret, "UTF-8"), payload, sha1)

    # GitHub sends the hex digest prefixed with the algorithm name
    expected = "sha1=" + signer.hexdigest()

    return hmac.compare_digest(expected, signature)

420 

def get_hooks(self, owner, repo, page=1):
    """
    Return all the webhooks in the repo

    Hooks are requested 100 per page, starting at page, and the following
    pages are fetched for as long as the response advertises a "next" link.
    """
    hooks = []

    # Iterating keeps owner and repo in scope for every page; the previous
    # recursive call dropped them and failed on the second page.
    while True:
        response = self._request(
            "GET",
            f"repos/{owner}/{repo}/hooks",
            params={"per_page": 100, "page": page},
        )
        hooks.extend(response.json())

        if "next" not in response.links:
            return hooks
        page += 1

436 

def get_hook_by_url(self, owner, repo, url):
    """
    Return the first webhook of the repo whose config url equals url,
    or None when there is no such hook
    """
    matching = (
        hook
        for hook in self.get_hooks(owner, repo)
        if hook["config"]["url"] == url
    )
    return next(matching, None)

448 

def update_hook_url(self, owner, repo, hook_id, new_url):
    """
    Activate a webhook and point it at new_url, with a JSON content type
    and GITHUB_WEBHOOK_SECRET as its secret. Returns True.
    """
    # NOTE(review): create_hook uses the per-repo generated secret while
    # this uses the global one - confirm that the difference is intended
    hook_config = {
        "url": new_url,
        "content_type": "json",
        "secret": GITHUB_WEBHOOK_SECRET,
    }

    self._request(
        "PATCH",
        f"repos/{owner}/{repo}/hooks/{hook_id}",
        data={"active": True, "config": hook_config},
    )

    return True

467 

def create_hook(self, owner, repo, hook_url):
    """
    Register a webhook on the repo that posts JSON to hook_url, using the
    secret generated for this owner/repo pair. Returns True.
    """
    hook_config = {
        "url": hook_url,
        "content_type": "json",
        "secret": self.generate_webhook_secret_for_repo(owner, repo),
    }

    self._request(
        "POST", f"repos/{owner}/{repo}/hooks", data={"config": hook_config}
    )

    return True

484 

def remove_hook(self, owner, repo, hook_id):
    """
    Delete the webhook with the given id from the repo. Returns True.
    """
    hook_path = f"repos/{owner}/{repo}/hooks/{hook_id}"
    self._request("DELETE", hook_path)

    return True