Coverage for webapp/api/github.py: 65%
204 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 22:07 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 22:07 +0000
1import hmac
2from hashlib import sha1
3from os import getenv
5from webapp import api
6from webapp.helpers import get_yaml_loader
7from werkzeug.exceptions import Unauthorized, Forbidden
8from requests.exceptions import HTTPError
10import gzip
11from io import BytesIO
12import json
# Webhook secret read once from the environment at import time (None when the
# variable is unset). The GitHub class uses it as the HMAC key for webhook
# signatures and sends it as the hook secret when updating a hook.
14GITHUB_WEBHOOK_SECRET = getenv("GITHUB_WEBHOOK_SECRET")
class InvalidYAML(Exception):
    """Raised when a repository's snapcraft.yaml cannot be loaded."""
class GitHub:
    """
    Provides authentication for GitHub users. Helper methods are also provided
    for checking organization access and getting user data from the GitHub
    API (REST and GraphQL), and for managing repository webhooks.
    """

    REST_API_URL = "https://api.github.com"
    GRAPHQL_API_URL = "https://api.github.com/graphql"
    RAW_CONTENT_URL = "https://raw.githubusercontent.com"

    # Paths probed, in order, when looking for a repo's snapcraft.yaml
    YAML_LOCATIONS = [
        "snapcraft.yaml",
        ".snapcraft.yaml",
        "snap/snapcraft.yaml",
        "build-aux/snap/snapcraft.yaml",
    ]

    # Session shared by every instance that is not given its own. Created on
    # first use instead of in the __init__ signature so that importing this
    # module does not build an HTTP session as a side effect.
    _default_session = None

    def __init__(self, access_token=None, session=None):
        """
        access_token: optional GitHub token sent as "Authorization: token ..."
        session: object exposing `headers` and `request(...)`; when omitted a
            single lazily-created `api.requests.Session()` is shared between
            instances (same sharing as the previous default argument).
        """
        if session is None:
            if GitHub._default_session is None:
                GitHub._default_session = api.requests.Session()
            session = GitHub._default_session

        self.access_token = access_token
        self.session = session
        self.session.headers["Accept"] = "application/json"

    def _auth_headers(self):
        """
        Return the per-request authorization headers (empty without a token)
        """
        if self.access_token:
            return {"Authorization": f"token {self.access_token}"}
        return {}

    @staticmethod
    def _gql_string(value):
        """
        Return `value` as a quoted GraphQL string literal.

        JSON string escaping is used so quotes or backslashes in the value
        cannot terminate the literal and inject into the query. For plain
        alphanumeric values the result is simply the value in double quotes.
        """
        return json.dumps(str(value))

    def _after_clause(self, end_cursor):
        """
        Return the `after: "<cursor>"` pagination argument, or "" when there
        is no cursor (first page)
        """
        if not end_cursor:
            return ""
        return f"after: {self._gql_string(end_cursor)}"

    def _request(
        self,
        method="GET",
        url="",
        params=None,
        data=None,
        raise_exceptions=True,
    ):
        """
        Makes a raw HTTP request to the REST API and returns the response.

        `url` is relative to REST_API_URL. `data` is sent as the JSON body.
        With `raise_exceptions` a 401 raises Unauthorized, a 403 raises
        Forbidden and any other error status raises through
        `response.raise_for_status()`; without it the response is returned
        untouched so the caller can inspect the status code.
        """
        # None sentinels instead of shared mutable defaults; the payload sent
        # is the same empty dict as before when nothing is supplied.
        params = {} if params is None else params
        data = {} if data is None else data

        response = self.session.request(
            method,
            f"{self.REST_API_URL}/{url}",
            headers=self._auth_headers(),
            params=params,
            json=data,
        )

        if raise_exceptions:
            if response.status_code == 401:
                raise Unauthorized(response=response)
            if response.status_code == 403:
                raise Forbidden(response=response)

            response.raise_for_status()

        return response

    def decompress_data(self, data, encoding):
        """
        Return `data` (bytes) as a UTF-8 string, gunzipping it first when
        `encoding` is "gzip"
        """
        if encoding == "gzip":
            with gzip.GzipFile(fileobj=BytesIO(data)) as f:
                return f.read().decode("utf-8")
        return data.decode("utf-8")

    def get_data_from_response(self, response):
        """
        Return the parsed JSON body of `response`.

        When the response is flagged as gzip the raw content is decompressed
        by hand first; if that fails for any reason the method falls back to
        `response.json()`.
        """
        content_encoding = response.headers.get("Content-Encoding", "")
        if content_encoding == "gzip":
            try:
                decompressed_data = self.decompress_data(
                    response.content, content_encoding
                )
                return json.loads(decompressed_data)
            except Exception:
                # Deliberate best-effort: presumably the HTTP client has
                # already decoded the body, so let it parse the JSON itself.
                return response.json()
        return response.json()

    def _gql_request(self, query=None):
        """
        Sends `query` to the GraphQL API and returns the "data" member of the
        JSON answer.

        Raises Unauthorized on 401, Forbidden on 403 and whatever
        `response.raise_for_status()` raises for other error statuses.
        """
        if query is None:
            query = {}

        response = self.session.request(
            "POST",
            self.GRAPHQL_API_URL,
            json={"query": query},
            headers=self._auth_headers(),
        )

        if response.status_code == 401:
            raise Unauthorized(response=response)
        if response.status_code == 403:
            # NOTE(review): unlike the 401 case (and _request) no response is
            # attached here; left as-is because attaching one changes how the
            # error is rendered. Confirm which form is intended.
            raise Forbidden

        response.raise_for_status()

        data = self.get_data_from_response(response)
        return data["data"]

    def _get_nodes(self, edges):
        """
        GraphQL: Return the list of nodes from the edges
        """
        return [edge["node"] for edge in edges]

    def get_user(self):
        """
        Return some user properties of the current user
        (login, name and avatar URL)
        """
        gql = """
        {
          viewer {
            login
            name
            avatarUrl(size: 100)
          }
        }
        """

        return self._gql_request(gql)["viewer"]

    def get_orgs(self, end_cursor=None):
        """
        Lists of organizations that the authenticated user has explicit
        permission to access.

        Follows pagination recursively, 100 organizations per request;
        `end_cursor` is the cursor of the page to start after.
        """
        gql = (
            """
        {
          viewer {
            organizations(first: 100,"""
            + self._after_clause(end_cursor)
            + """) {
              edges {
                node {
                  login
                  name
                }
              }
              pageInfo {
                hasNextPage
                endCursor
              }
            }
          }
        }
        """
        )

        gql_response = self._gql_request(gql)["viewer"]["organizations"]
        page_info = gql_response["pageInfo"]
        orgs = self._get_nodes(gql_response["edges"])

        if page_info["hasNextPage"]:
            orgs.extend(self.get_orgs(page_info["endCursor"]))

        return orgs

    @staticmethod
    def _repo_owner(repo):
        """
        Return the owner part of the repo's "nameWithOwner", or None when
        that field is missing or empty
        """
        name_with_owner = repo.get("nameWithOwner")
        if not name_with_owner:
            return None
        return name_with_owner.split("/")[0]

    def get_user_repositories(self, end_cursor=None):
        """
        Lists of public repositories from the authenticated user.

        Each entry carries the GraphQL node fields plus an "owner" key.
        Follows pagination recursively, 100 repositories per request.
        """
        gql = (
            """{
          viewer {
            repositories(
              first: 100,
              privacy: PUBLIC,
              """
            + self._after_clause(end_cursor)
            + """
            ) {
              edges {
                node {
                  name
                  nameWithOwner
                }
              }
              pageInfo {
                hasNextPage
                endCursor
              }
            }
          }
        }"""
        )

        gql_response = self._gql_request(gql)["viewer"]["repositories"]
        page_info = gql_response["pageInfo"]
        repositories = self._get_nodes(gql_response["edges"])

        if page_info["hasNextPage"]:
            repositories.extend(
                self.get_user_repositories(page_info["endCursor"])
            )

        return [
            {**repo, "owner": self._repo_owner(repo)} for repo in repositories
        ]

    def get_org_repositories(self, org_login, end_cursor=None):
        """
        Lists of public repositories from the organization `org_login`.

        Follows pagination recursively, 100 repositories per request.
        """
        gql = (
            """{
          viewer {
            organization(login: """
            # Quoted and escaped: the login must not be able to break out of
            # the string literal and alter the query.
            + self._gql_string(org_login)
            + """) {
              repositories(
                first: 100,
                privacy: PUBLIC
                """
            + self._after_clause(end_cursor)
            + """
              ) {
                edges {
                  node {
                    name
                  }
                }
                pageInfo {
                  hasNextPage
                  endCursor
                }
              }
            }
          }
        }"""
        )

        response = self._gql_request(gql)["viewer"]["organization"][
            "repositories"
        ]

        page_info = response["pageInfo"]
        repositories = self._get_nodes(response["edges"])

        if page_info["hasNextPage"]:
            repositories.extend(
                self.get_org_repositories(org_login, page_info["endCursor"])
            )

        return repositories

    def check_permissions_over_repo(self, owner, repo, permission="push"):
        """
        Return True when the current user has the requested permissions
        Possible values: "admin", "push" or "pull"

        Returns False when the API answers 401, 403 or 404, or when the
        answer carries no permissions. Any other HTTP error is re-raised.
        """
        try:
            response = self._request(
                "GET",
                f"repos/{owner}/{repo}",
                raise_exceptions=True,
            )
        except (Unauthorized, Forbidden):
            return False
        except HTTPError as e:
            if e.response is not None and e.response.status_code == 404:
                return False
            # Previously fell through to an unbound `response`; surface the
            # real HTTP error instead.
            raise

        data = self.get_data_from_response(response)
        # Deny by default when the payload has no permissions member
        response_permissions = data.get("permissions") or {}

        return bool(response_permissions.get(permission))

    def check_if_repo_exists(self, owner, repo):
        """
        Return True if GitHub repo exists, False on a 404.

        Raises Unauthorized on 401, Forbidden on 403 and through
        `raise_for_status()` for other error statuses; any remaining status
        yields None.
        """
        response = self._request(
            "GET",
            f"repos/{owner}/{repo}",
            raise_exceptions=False,
        )
        if response.status_code == 404:
            return False
        elif response.status_code == 200:
            return True
        elif response.status_code == 401:
            raise Unauthorized
        elif response.status_code == 403:
            raise Forbidden

        response.raise_for_status()

    def get_snapcraft_yaml_location(self, owner, repo):
        """
        Return the snapcraft.yaml file location in the GitHub repo, or False
        when none of YAML_LOCATIONS exists
        """

        # It is not possible to use GraphQL without authentication
        # for that reason we are doing a call for each location to the REST API
        for loc in self.YAML_LOCATIONS:
            response = self._request(
                "GET",
                f"repos/{owner}/{repo}/contents/{loc}",
                raise_exceptions=False,
            )
            if response.status_code == 404:
                continue
            elif response.status_code == 200:
                return loc
            elif response.status_code == 401:
                raise Unauthorized
            elif response.status_code == 403:
                raise Forbidden

            response.raise_for_status()

        return False

    def get_default_branch(self, owner, repo):
        """
        Return the "default_branch" reported by the REST API for the repo
        """
        response = self._request("GET", f"repos/{owner}/{repo}")
        data = self.get_data_from_response(response)
        return data["default_branch"]

    def get_last_commit(self, owner, repo, branch=None):
        """
        Return the sha of the latest commit on `branch` (the default branch
        when not given)
        """
        if not branch:
            branch = self.get_default_branch(owner, repo)

        response = self._request(
            "GET", f"repos/{owner}/{repo}/commits/{branch}"
        )
        data = self.get_data_from_response(response)
        return data["sha"]

    def get_snapcraft_yaml_data(self, owner, repo, location=None):
        """
        Parse the snapcraft.yaml from the repo and return a dict

        `location` is looked up through get_snapcraft_yaml_location when not
        given; an empty dict is returned when no file is found. Raises
        InvalidYAML when the content cannot be loaded.
        """
        if not location:
            location = self.get_snapcraft_yaml_location(owner, repo)

        if not location:
            return {}

        # Get last commit to avoid cache issues with raw.github.com
        last_commit = self.get_last_commit(owner, repo)

        response = self.session.request(
            "GET",
            f"{self.RAW_CONTENT_URL}/{owner}/{repo}"
            f"/{last_commit}/{location}",
        )

        yaml = get_yaml_loader()
        try:
            content_encoding = response.headers.get("Content-Encoding", "")
            data = response.content
            if content_encoding == "gzip":
                try:
                    data = self.decompress_data(data, content_encoding)
                except Exception:
                    # Best-effort: keep the raw content when it cannot be
                    # gunzipped by hand.
                    data = response.content
            # NOTE(review): relies on get_yaml_loader() returning a loader
            # that is safe for untrusted repository content - confirm.
            return yaml.load(data)
        except Exception as err:
            raise InvalidYAML from err

    def generate_webhook_secret_for_repo(self, owner, name):
        """
        Return the per-repo webhook secret: the hex HMAC-SHA1 of owner + name
        keyed with GITHUB_WEBHOOK_SECRET
        """
        key = bytes(GITHUB_WEBHOOK_SECRET, "UTF-8")
        hmac_gen = hmac.new(key, None, sha1)
        hmac_gen.update(bytes(owner, "UTF-8"))
        hmac_gen.update(bytes(name, "UTF-8"))
        return hmac_gen.hexdigest()

    @staticmethod
    def _signature_matches(key, payload, signature):
        """
        Return True when `signature` equals "sha1=<HMAC-SHA1 of payload>".

        A missing, non-string or non-ASCII signature is a mismatch rather
        than an error, since the signature comes from the incoming request.
        """
        # Add prefix to match the GitHub request format
        digest = f"sha1={hmac.new(key, payload, sha1).hexdigest()}"

        if not isinstance(signature, str):
            return False
        try:
            return hmac.compare_digest(digest, signature)
        except TypeError:
            # compare_digest rejects str values containing non-ASCII chars
            return False

    def validate_webhook_signature(self, payload, signature):
        """
        Generate the payload signature (keyed with GITHUB_WEBHOOK_SECRET)
        and compare with the given one
        """
        key = bytes(GITHUB_WEBHOOK_SECRET, "UTF-8")
        return self._signature_matches(key, payload, signature)

    def validate_bsi_webhook_secret(self, owner, name, payload, signature):
        """
        Return True if the webhook contain a valid secret in BSI, i.e. the
        payload is signed with the per-repo secret of owner/name
        """
        secret = self.generate_webhook_secret_for_repo(owner, name)
        final_key = bytes(secret, "UTF-8")
        return self._signature_matches(final_key, payload, signature)

    def get_hooks(self, owner, repo, page=1):
        """
        Return all the webhooks in the repo, following "next" links
        100 hooks at a time starting at `page`
        """
        response = self._request(
            "GET",
            f"repos/{owner}/{repo}/hooks",
            params={"per_page": 100, "page": page},
        )
        hooks = response.json()

        if "next" in response.links:
            # owner and repo must be forwarded; they were dropped before,
            # which made every multi-page listing fail with TypeError.
            hooks.extend(self.get_hooks(owner, repo, page=page + 1))

        return hooks

    def get_hook_by_url(self, owner, repo, url):
        """
        Return a webhook from the repo with the url, or None when no hook
        is configured with it
        """
        for hook in self.get_hooks(owner, repo):
            if hook["config"]["url"] == url:
                return hook

        return None

    def update_hook_url(self, owner, repo, hook_id, new_url):
        """
        Update a webhook to activate it and update the URL

        NOTE(review): the hook secret is set to GITHUB_WEBHOOK_SECRET here,
        whereas create_hook uses the per-repo secret - confirm intended.
        """
        data = {
            "active": True,
            "config": {
                "url": new_url,
                "content_type": "json",
                "secret": GITHUB_WEBHOOK_SECRET,
            },
        }

        self._request(
            "PATCH", f"repos/{owner}/{repo}/hooks/{hook_id}", data=data
        )

        return True

    def create_hook(self, owner, repo, hook_url):
        """
        Create the webhook in the repo, signed with the per-repo secret
        """
        secret = self.generate_webhook_secret_for_repo(owner, repo)
        data = {
            "config": {
                "url": hook_url,
                "content_type": "json",
                "secret": secret,
            },
        }

        self._request("POST", f"repos/{owner}/{repo}/hooks", data=data)

        return True

    def remove_hook(self, owner, repo, hook_id):
        """
        Remove GitHub webhook in a repo
        """
        self._request("DELETE", f"repos/{owner}/{repo}/hooks/{hook_id}")

        return True