Coverage for webapp/api/github.py: 65%

204 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 22:07 +0000

1import hmac 

2from hashlib import sha1 

3from os import getenv 

4 

5from webapp import api 

6from webapp.helpers import get_yaml_loader 

7from werkzeug.exceptions import Unauthorized, Forbidden 

8from requests.exceptions import HTTPError 

9 

10import gzip 

11from io import BytesIO 

12import json 

13 

# Secret read once at import time from the environment; it is used below to
# sign and verify webhook payloads. None when the variable is not set.
GITHUB_WEBHOOK_SECRET = getenv("GITHUB_WEBHOOK_SECRET", None)

15 

16 

class InvalidYAML(Exception):
    """
    Raised when the snapcraft.yaml fetched from a repo cannot be loaded.
    """

19 

20 

class GitHub:
    """
    Provides authentication for GitHub users. Helper methods are also provided
    for checking organization access and getting user data from the Github API.
    """

    REST_API_URL = "https://api.github.com"
    GRAPHQL_API_URL = "https://api.github.com/graphql"
    RAW_CONTENT_URL = "https://raw.githubusercontent.com"

    # Candidate paths, relative to the repo root, probed in this order
    YAML_LOCATIONS = [
        "snapcraft.yaml",
        ".snapcraft.yaml",
        "snap/snapcraft.yaml",
        "build-aux/snap/snapcraft.yaml",
    ]

    # Session shared by every instance built without an explicit session.
    # Created on first use instead of at import time.
    _default_session = None

    def __init__(self, access_token=None, session=None):
        """
        Store the access token and the HTTP session used for every call.

        When no session is given, a single default session is created on
        first use and shared by all instances. The session's "Accept"
        header is set to JSON.
        """
        if session is None:
            if GitHub._default_session is None:
                GitHub._default_session = api.requests.Session()
            session = GitHub._default_session

        self.access_token = access_token
        self.session = session
        self.session.headers["Accept"] = "application/json"

    def _auth_headers(self):
        """
        Return the Authorization header dict, empty when there is no token.
        """
        if self.access_token:
            return {"Authorization": f"token {self.access_token}"}
        return {}

    @staticmethod
    def _after_argument(end_cursor):
        """
        GraphQL: return the `after: "<cursor>"` argument for pagination,
        or an empty string when there is no cursor (first page).
        """
        if not end_cursor:
            return ""
        # json.dumps produces a correctly quoted and escaped string literal
        return f"after: {json.dumps(end_cursor)}"

    @staticmethod
    def _signature_matches(expected, signature):
        """
        Constant-time comparison of the expected digest with the received
        signature. A missing or non-string signature never matches.
        """
        if not isinstance(signature, str):
            return False
        # Compare bytes: compare_digest rejects non-ASCII str arguments
        return hmac.compare_digest(
            expected.encode("utf-8"), signature.encode("utf-8")
        )

    def _request(
        self, method="GET", url="", params={}, data={}, raise_exceptions=True
    ):
        """
        Makes a raw HTTP request to the REST API and returns the response.

        `url` is relative to REST_API_URL and `data` is sent as the JSON
        body. The default dicts are never mutated here. When
        `raise_exceptions` is true, 401 raises Unauthorized, 403 raises
        Forbidden and any other error status raises through
        `response.raise_for_status()`.
        """
        response = self.session.request(
            method,
            f"{self.REST_API_URL}/{url}",
            headers=self._auth_headers(),
            params=params,
            json=data,
        )

        if raise_exceptions:
            if response.status_code == 401:
                raise Unauthorized(response=response)
            if response.status_code == 403:
                raise Forbidden(response=response)

            response.raise_for_status()

        return response

    def decompress_data(self, data, encoding):
        """
        Return `data` (bytes) as a UTF-8 string, gunzipping it first when
        `encoding` is "gzip".
        """
        if encoding == "gzip":
            with gzip.GzipFile(fileobj=BytesIO(data)) as gz_file:
                return gz_file.read().decode("utf-8")
        return data.decode("utf-8")

    def get_data_from_response(self, response):
        """
        Return the parsed JSON body of the response.

        For a gzip "Content-Encoding" the raw content is decompressed and
        parsed manually; if that fails for any reason we fall back to
        `response.json()`.
        """
        content_encoding = response.headers.get("Content-Encoding", "")
        if content_encoding != "gzip":
            return response.json()

        try:
            text = self.decompress_data(response.content, content_encoding)
            return json.loads(text)
        except Exception:
            # Best effort: presumably the body was already decoded by the
            # HTTP client, so let it parse the JSON itself.
            return response.json()

    def _gql_request(self, query={}):
        """
        Makes a GraphQL request and returns the "data" member of the
        JSON response.

        Raises Unauthorized on 401, Forbidden on 403 and whatever
        `response.raise_for_status()` raises for other error statuses.
        """
        response = self.session.request(
            "POST",
            self.GRAPHQL_API_URL,
            json={"query": query},
            headers=self._auth_headers(),
        )

        if response.status_code == 401:
            raise Unauthorized(response=response)
        if response.status_code == 403:
            raise Forbidden

        response.raise_for_status()

        return self.get_data_from_response(response)["data"]

    def _get_nodes(self, edges):
        """
        GraphQL: Return the list of nodes from the edges
        """
        return [edge["node"] for edge in edges]

    def get_user(self):
        """
        Return some user properties of the current user
        (login, name and avatarUrl)
        """
        gql = """
        {
          viewer {
            login
            name
            avatarUrl(size: 100)
          }
        }
        """

        return self._gql_request(gql)["viewer"]

    def get_orgs(self, end_cursor=None):
        """
        Lists of organizations that the authenticated user has explicit
        permission to access.

        Pages of 100 are followed recursively, `end_cursor` being the
        cursor of the previous page.
        """
        gql = (
            """
        {
          viewer {
            organizations(first: 100,"""
            + self._after_argument(end_cursor)
            + """) {
              edges {
                node {
                  login
                  name
                }
              }
              pageInfo {
                hasNextPage
                endCursor
              }
            }
          }
        }
        """
        )

        organizations = self._gql_request(gql)["viewer"]["organizations"]
        page_info = organizations["pageInfo"]
        orgs = self._get_nodes(organizations["edges"])

        if page_info["hasNextPage"]:
            orgs.extend(self.get_orgs(page_info["endCursor"]))

        return orgs

    def get_user_repositories(self, end_cursor=None):
        """
        Lists of public repositories from the authenticated user

        Each returned dict has the node fields (name, nameWithOwner) plus
        an "owner" key: the part of nameWithOwner before the first "/",
        or None when nameWithOwner is missing or empty.
        """
        gql = (
            """{
          viewer {
            repositories(
              first: 100,
              privacy: PUBLIC,
              """
            + self._after_argument(end_cursor)
            + """
            ) {
              edges {
                node {
                  name
                  nameWithOwner
                }
              }
              pageInfo {
                hasNextPage
                endCursor
              }
            }
          }
        }"""
        )

        page = self._gql_request(gql)["viewer"]["repositories"]
        page_info = page["pageInfo"]
        repositories = self._get_nodes(page["edges"])

        if page_info["hasNextPage"]:
            repositories.extend(
                self.get_user_repositories(page_info["endCursor"])
            )

        repos = []
        for repo in repositories:
            name_with_owner = repo.get("nameWithOwner")
            owner = name_with_owner.split("/")[0] if name_with_owner else None
            repos.append({**repo, "owner": owner})

        return repos

    def get_org_repositories(self, org_login, end_cursor=None):
        """
        Lists of public repositories from the organization `org_login`
        (a string), as seen by the authenticated user.

        Pages of 100 are followed recursively, `end_cursor` being the
        cursor of the previous page.
        """
        gql = (
            """{
          viewer {
            organization(login: """
            # Quote and escape the login instead of pasting it verbatim,
            # so that a quote in it cannot alter the query
            + json.dumps(org_login)
            + """) {
              repositories(
                first: 100,
                privacy: PUBLIC,
                """
            + self._after_argument(end_cursor)
            + """
              ) {
                edges {
                  node {
                    name
                  }
                }
                pageInfo {
                  hasNextPage
                  endCursor
                }
              }
            }
          }
        }"""
        )

        page = self._gql_request(gql)["viewer"]["organization"][
            "repositories"
        ]

        page_info = page["pageInfo"]
        repositories = self._get_nodes(page["edges"])

        if page_info["hasNextPage"]:
            repositories.extend(
                self.get_org_repositories(org_login, page_info["endCursor"])
            )

        return repositories

    def check_permissions_over_repo(self, owner, repo, permission="push"):
        """
        Return True when the current user has the requested permissions
        Possible values: "admin", "push" or "pull"

        Returns False on 401, 403 or 404, and when the response carries
        no permissions. Any other HTTP error is re-raised.
        """
        try:
            response = self._request(
                "GET",
                f"repos/{owner}/{repo}",
                raise_exceptions=True,
            )
        except (Unauthorized, Forbidden):
            return False
        except HTTPError as error:
            not_found = (
                error.response is not None
                and error.response.status_code == 404
            )
            if not_found:
                return False
            # Anything else is unexpected: do not hide it
            raise

        data = self.get_data_from_response(response)
        permissions = data.get("permissions") or {}

        return bool(permissions.get(permission))

    def check_if_repo_exists(self, owner, repo):
        """
        Return True if GitHub repo exists

        Returns False on 404, raises Unauthorized on 401 and Forbidden on
        403. Other error statuses raise through `raise_for_status()`; any
        remaining status returns None.
        """
        response = self._request(
            "GET",
            f"repos/{owner}/{repo}",
            raise_exceptions=False,
        )
        status = response.status_code

        if status == 404:
            return False
        if status == 200:
            return True
        if status == 401:
            raise Unauthorized
        if status == 403:
            raise Forbidden

        response.raise_for_status()

    def get_snapcraft_yaml_location(self, owner, repo):
        """
        Return the snapcraft.yaml file location in the GitHub repo,
        or False when none of YAML_LOCATIONS exists.
        """

        # It is not possible to use GraphQL without authentication
        # for that reason we are doing a call for each location to the REST API
        for loc in self.YAML_LOCATIONS:
            response = self._request(
                "GET",
                f"repos/{owner}/{repo}/contents/{loc}",
                raise_exceptions=False,
            )
            status = response.status_code

            if status == 404:
                continue
            if status == 200:
                return loc
            if status == 401:
                raise Unauthorized
            if status == 403:
                raise Forbidden

            response.raise_for_status()

        return False

    def get_default_branch(self, owner, repo):
        """
        Return the name of the default branch of the repo
        """
        response = self._request("GET", f"repos/{owner}/{repo}")
        return self.get_data_from_response(response)["default_branch"]

    def get_last_commit(self, owner, repo, branch=None):
        """
        Return the sha of the last commit in the branch
        (the default branch when no branch is given)
        """
        if not branch:
            branch = self.get_default_branch(owner, repo)

        response = self._request(
            "GET", f"repos/{owner}/{repo}/commits/{branch}"
        )
        return self.get_data_from_response(response)["sha"]

    def get_snapcraft_yaml_data(self, owner, repo, location=None):
        """
        Parse the snapcraft.yaml from the repo and return a dict

        Returns an empty dict when no snapcraft.yaml location is found.
        Raises InvalidYAML when the content cannot be loaded.
        """
        if not location:
            location = self.get_snapcraft_yaml_location(owner, repo)

        if not location:
            return {}

        # Get last commit to avoid cache issues with raw.github.com
        last_commit = self.get_last_commit(owner, repo)

        # NOTE(review): the status code of this response is not checked
        response = self.session.request(
            "GET",
            f"{self.RAW_CONTENT_URL}/{owner}/{repo}"
            f"/{last_commit}/{location}",
        )

        yaml = get_yaml_loader()
        try:
            content_encoding = response.headers.get("Content-Encoding", "")
            data = response.content
            if content_encoding == "gzip":
                try:
                    data = self.decompress_data(data, content_encoding)
                except Exception:
                    # Best effort: keep the content as received
                    data = response.content
            return yaml.load(data)
        except Exception as err:
            raise InvalidYAML from err

    def generate_webhook_secret_for_repo(self, owner, name):
        """
        Return the hex HMAC-SHA1 of owner followed by name, keyed with
        GITHUB_WEBHOOK_SECRET
        """
        key = bytes(GITHUB_WEBHOOK_SECRET, "UTF-8")
        hmac_gen = hmac.new(key, None, sha1)
        hmac_gen.update(bytes(owner, "UTF-8"))
        hmac_gen.update(bytes(name, "UTF-8"))
        return hmac_gen.hexdigest()

    def validate_webhook_signature(self, payload, signature):
        """
        Generate the payload signature and compare with the given one

        `payload` is the raw request body (bytes). Returns False when the
        signature is missing or is not a string.
        """
        if not isinstance(signature, str):
            return False

        key = bytes(GITHUB_WEBHOOK_SECRET, "UTF-8")
        hmac_gen = hmac.new(key, payload, sha1)

        # Add append prefix to match the GitHub request format
        digest = f"sha1={hmac_gen.hexdigest()}"

        return self._signature_matches(digest, signature)

    def validate_bsi_webhook_secret(self, owner, name, payload, signature):
        """
        Return True if the webhook contain a valid secret in BSI

        The key is the per repo secret from
        `generate_webhook_secret_for_repo`. Returns False when the
        signature is missing or is not a string.
        """
        if not isinstance(signature, str):
            return False

        secret = self.generate_webhook_secret_for_repo(owner, name)
        final_key = bytes(secret, "UTF-8")
        final_hmac = hmac.new(final_key, payload, sha1)

        # Add append prefix to match the GitHub request format
        digest = f"sha1={final_hmac.hexdigest()}"

        return self._signature_matches(digest, signature)

    def get_hooks(self, owner, repo, page=1):
        """
        Return all the webhooks in the repo

        Starts at `page` and keeps requesting pages of 100 for as long as
        the response advertises a "next" link.
        """
        hooks = []

        while True:
            response = self._request(
                "GET",
                f"repos/{owner}/{repo}/hooks",
                params={"per_page": 100, "page": page},
            )
            hooks.extend(response.json())

            if "next" not in response.links:
                return hooks

            page += 1

    def get_hook_by_url(self, owner, repo, url):
        """
        Return a webhook from the repo with the url,
        or None when no webhook matches
        """
        for hook in self.get_hooks(owner, repo):
            if hook["config"]["url"] == url:
                return hook

        return None

    def update_hook_url(self, owner, repo, hook_id, new_url):
        """
        Update a webhook to activate it and update the URL

        The hook secret is set to GITHUB_WEBHOOK_SECRET.
        """
        data = {
            "active": True,
            "config": {
                "url": new_url,
                "content_type": "json",
                "secret": GITHUB_WEBHOOK_SECRET,
            },
        }

        self._request(
            "PATCH", f"repos/{owner}/{repo}/hooks/{hook_id}", data=data
        )

        return True

    def create_hook(self, owner, repo, hook_url):
        """
        Create the webhook in the repo

        The hook secret is the per repo secret from
        `generate_webhook_secret_for_repo`.
        """
        secret = self.generate_webhook_secret_for_repo(owner, repo)
        data = {
            "config": {
                "url": hook_url,
                "content_type": "json",
                "secret": secret,
            },
        }

        self._request("POST", f"repos/{owner}/{repo}/hooks", data=data)

        return True

    def remove_hook(self, owner, repo, hook_id):
        """
        Remove GitHub webhook in a repo
        """
        self._request("DELETE", f"repos/{owner}/{repo}/hooks/{hook_id}")

        return True