Coverage for webapp/api/github.py: 64%
203 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-28 22:05 +0000
1import hmac
2from hashlib import sha1
3from os import getenv
5from webapp import api
6from webapp.helpers import get_yaml_loader
7from werkzeug.exceptions import Unauthorized, Forbidden
8from requests.exceptions import HTTPError
10import gzip
11from io import BytesIO
12import json
14GITHUB_WEBHOOK_SECRET = getenv("GITHUB_WEBHOOK_SECRET")
17class InvalidYAML(Exception):
18 pass
21class GitHub:
22 """
23 Provides authentication for GitHub users. Helper methods are also provided
24 for checking organization access and getting user data from the Github API.
25 """
27 REST_API_URL = "https://api.github.com"
28 GRAPHQL_API_URL = "https://api.github.com/graphql"
29 RAW_CONTENT_URL = "https://raw.githubusercontent.com"
31 YAML_LOCATIONS = [
32 "snapcraft.yaml",
33 ".snapcraft.yaml",
34 "snap/snapcraft.yaml",
35 "build-aux/snap/snapcraft.yaml",
36 ]
38 def __init__(self, access_token=None, session=api.requests.Session()):
39 self.access_token = access_token
40 self.session = session
41 self.session.headers["Accept"] = "application/json"
43 def _request(
44 self, method="GET", url="", params={}, data={}, raise_exceptions=True
45 ):
46 """
47 Makes a raw HTTP request and returns the response.
48 """
49 if self.access_token:
50 headers = {"Authorization": f"token {self.access_token}"}
51 else:
52 headers = {}
54 response = self.session.request(
55 method,
56 f"{self.REST_API_URL}/{url}",
57 headers=headers,
58 params=params,
59 json=data,
60 )
62 if raise_exceptions:
63 if response.status_code == 401:
64 raise Unauthorized(response=response)
65 if response.status_code == 403:
66 raise Forbidden(response=response)
68 response.raise_for_status()
70 return response
72 def decompress_data(self, data, encoding):
73 if encoding == "gzip":
74 with gzip.GzipFile(fileobj=BytesIO(data)) as f:
75 return f.read().decode(
76 "utf-8"
77 ) # Decompress and decode as UTF-8
78 return data.decode("utf-8")
80 def get_data_from_response(self, response):
81 content_encoding = response.headers.get("Content-Encoding", "")
82 if content_encoding == "gzip":
83 try:
84 content = response.content
85 decompressed_data = self.decompress_data(
86 content, content_encoding
87 )
88 data = json.loads(decompressed_data)
89 except Exception:
90 data = response.json()
91 else:
92 data = response.json()
93 return data
95 def _gql_request(self, query={}):
96 """
97 Makes a raw HTTP request and returns the response.
98 """
99 if self.access_token:
100 headers = {"Authorization": f"token {self.access_token}"}
101 else:
102 headers = {}
104 response = self.session.request(
105 "POST",
106 self.GRAPHQL_API_URL,
107 json={"query": query},
108 headers=headers,
109 )
111 if response.status_code == 401:
112 raise Unauthorized(response=response)
113 if response.status_code == 403:
114 raise Forbidden
116 response.raise_for_status()
118 data = self.get_data_from_response(response)
119 return data["data"]
121 def _get_nodes(self, edges):
122 """
123 GraphQL: Return the list of nodes from the edges
124 """
125 return [i["node"] for i in edges]
127 def get_user(self):
128 """
129 Return some user properties of the current user
130 """
131 gql = """
132 {
133 viewer {
134 login
135 name
136 avatarUrl(size: 100)
137 }
138 }
139 """
141 return self._gql_request(gql)["viewer"]
143 def get_orgs(self, end_cursor=None):
144 """
145 Lists of organizations that the authenticated user has explicit
146 permission to access.
147 """
148 gql = (
149 """
150 {
151 viewer {
152 organizations(first: 100,"""
153 + (f'after: "{end_cursor}"' if end_cursor else "")
154 + """) {
155 edges {
156 node {
157 login
158 name
159 }
160 }
161 pageInfo {
162 hasNextPage
163 endCursor
164 }
165 }
166 }
167 }
168 """
169 )
171 gql_response = self._gql_request(gql)["viewer"]["organizations"]
172 page_info = gql_response["pageInfo"]
173 orgs = self._get_nodes(gql_response["edges"])
175 if page_info["hasNextPage"]:
176 next_page = self.get_orgs(page_info["endCursor"])
177 orgs.extend(next_page)
179 return orgs
181 def get_user_repositories(self, end_cursor=None):
182 """
183 Lists of public repositories from the authenticated user
184 """
185 gql = (
186 """{
187 viewer {
188 repositories(
189 first: 100,
190 privacy: PUBLIC,
191 """
192 + (f'after: "{end_cursor}"' if end_cursor else "")
193 + """
194 ) {
195 edges {
196 node {
197 name
198 nameWithOwner
199 }
200 }
201 pageInfo {
202 hasNextPage
203 endCursor
204 }
205 }
206 }
207 }"""
208 )
210 gql_response = self._gql_request(gql)["viewer"]["repositories"]
211 page_info = gql_response["pageInfo"]
212 repositories = self._get_nodes(gql_response["edges"])
214 if page_info["hasNextPage"]:
215 next_page = self.get_user_repositories(page_info["endCursor"])
216 repositories.extend(next_page)
218 return repositories
220 def get_org_repositories(self, org_login, end_cursor=None):
221 """
222 Lists of public repositories from the authenticated user
223 """
224 gql = (
225 """{
226 viewer {
227 organization(login: \""""
228 + org_login
229 + """") {
230 repositories(
231 first: 100,
232 privacy: PUBLIC
233 """
234 + (f'after: "{end_cursor}"' if end_cursor else "")
235 + """
236 ) {
237 edges {
238 node {
239 name
240 }
241 }
242 pageInfo {
243 hasNextPage
244 endCursor
245 }
246 }
247 }
248 }
249 }"""
250 )
252 response = self._gql_request(gql)["viewer"]["organization"][
253 "repositories"
254 ]
256 page_info = response["pageInfo"]
257 repositories = self._get_nodes(response["edges"])
259 if page_info["hasNextPage"]:
260 next_page = self.get_org_repositories(
261 org_login, page_info["endCursor"]
262 )
263 repositories.extend(next_page)
265 return repositories
267 def check_permissions_over_repo(self, owner, repo, permission="push"):
268 """
269 Return True when the current user has the requested permissions
270 Possible values: "admin", "push" or "pull"
271 """
272 try:
273 response = self._request(
274 "GET",
275 f"repos/{owner}/{repo}",
276 raise_exceptions=True,
277 )
278 except Unauthorized:
279 return False
280 except Forbidden:
281 return False
282 except HTTPError as e:
283 if e.response.status_code == 404:
284 return False
286 data = self.get_data_from_response(response)
287 response_permissions = data["permissions"]
288 user_permissions = [
289 p for p in response_permissions if response_permissions[p]
290 ]
292 return permission in user_permissions
294 def check_if_repo_exists(self, owner, repo):
295 """
296 Return True if GitHub repo exists
297 """
298 response = self._request(
299 "GET",
300 f"repos/{owner}/{repo}",
301 raise_exceptions=False,
302 )
303 if response.status_code == 404:
304 return False
305 elif response.status_code == 200:
306 return True
307 elif response.status_code == 401:
308 raise Unauthorized
309 elif response.status_code == 403:
310 raise Forbidden
312 response.raise_for_status()
314 def get_snapcraft_yaml_location(self, owner, repo):
315 """
316 Return the snapcraft.yaml file location in the GitHub repo
317 """
319 # It is not possible to use GraphQL without authentication
320 # for that reason we are doing a call for each location to the REST API
321 for loc in self.YAML_LOCATIONS:
322 response = self._request(
323 "GET",
324 f"repos/{owner}/{repo}/contents/{loc}",
325 raise_exceptions=False,
326 )
327 if response.status_code == 404:
328 continue
329 elif response.status_code == 200:
330 return loc
331 elif response.status_code == 401:
332 raise Unauthorized
333 elif response.status_code == 403:
334 raise Forbidden
336 response.raise_for_status()
338 return False
340 def get_default_branch(self, owner, repo):
341 response = self._request("GET", f"repos/{owner}/{repo}")
342 data = self.get_data_from_response(response)
343 return data["default_branch"]
345 def get_last_commit(self, owner, repo, branch=None):
346 if not branch:
347 branch = self.get_default_branch(owner, repo)
349 response = self._request(
350 "GET", f"repos/{owner}/{repo}/commits/{branch}"
351 )
352 data = self.get_data_from_response(response)
353 return data["sha"]
355 def get_snapcraft_yaml_data(self, owner, repo, location=None):
356 """
357 Parse the snapcraft.yaml from the repo and return a dict
358 """
359 if not location:
360 location = self.get_snapcraft_yaml_location(owner, repo)
362 if location:
363 # Get last commit to avoid cache issues with raw.github.com
364 last_commit = self.get_last_commit(owner, repo)
366 response = self.session.request(
367 "GET",
368 f"{self.RAW_CONTENT_URL}/{owner}/{repo}"
369 f"/{last_commit}/{location}",
370 )
372 yaml = get_yaml_loader()
373 try:
374 content_encoding = response.headers.get("Content-Encoding", "")
375 if content_encoding == "gzip":
376 try:
377 content = response.content
378 data = self.decompress_data(content, content_encoding)
379 except Exception:
380 data = response.content
381 else:
382 data = response.content
383 return yaml.load(data)
384 except Exception:
385 raise InvalidYAML
387 return {}
389 def generate_webhook_secret_for_repo(self, owner, name):
390 key = bytes(GITHUB_WEBHOOK_SECRET, "UTF-8")
391 hmac_gen = hmac.new(key, None, sha1)
392 hmac_gen.update(bytes(owner, "UTF-8"))
393 hmac_gen.update(bytes(name, "UTF-8"))
394 return hmac_gen.hexdigest()
396 def validate_webhook_signature(self, payload, signature):
397 """
398 Generate the payload signature and compare with the given one
399 """
400 key = bytes(GITHUB_WEBHOOK_SECRET, "UTF-8")
401 hmac_gen = hmac.new(key, payload, sha1)
403 # Add append prefix to match the GitHub request format
404 digest = f"sha1={hmac_gen.hexdigest()}"
406 return hmac.compare_digest(digest, signature)
408 def validate_bsi_webhook_secret(self, owner, name, payload, signature):
409 """
410 Return True if the webhook contain a valid secret in BSI
411 """
412 secret = self.generate_webhook_secret_for_repo(owner, name)
413 final_key = bytes(secret, "UTF-8")
414 final_hmac = hmac.new(final_key, payload, sha1)
416 # Add append prefix to match the GitHub request format
417 digest = f"sha1={final_hmac.hexdigest()}"
419 return hmac.compare_digest(digest, signature)
421 def get_hooks(self, owner, repo, page=1):
422 """
423 Return all the webhooks in the repo
424 """
425 response = self._request(
426 "GET",
427 f"repos/{owner}/{repo}/hooks",
428 params={"per_page": 100, "page": page},
429 )
430 hooks = response.json()
432 if "next" in response.links:
433 hooks.extend(self.get_hooks(page=page + 1))
435 return hooks
437 def get_hook_by_url(self, owner, repo, url):
438 """
439 Return a webhook from the repo with the url
440 """
441 hooks = self.get_hooks(owner, repo)
443 for hook in hooks:
444 if hook["config"]["url"] == url:
445 return hook
447 return None
449 def update_hook_url(self, owner, repo, hook_id, new_url):
450 """
451 Update a webhook to activate it and update the URL
452 """
453 data = {
454 "active": True,
455 "config": {
456 "url": new_url,
457 "content_type": "json",
458 "secret": GITHUB_WEBHOOK_SECRET,
459 },
460 }
462 self._request(
463 "PATCH", f"repos/{owner}/{repo}/hooks/{hook_id}", data=data
464 )
466 return True
468 def create_hook(self, owner, repo, hook_url):
469 """
470 Create the webhook in the repo
471 """
472 secret = self.generate_webhook_secret_for_repo(owner, repo)
473 data = {
474 "config": {
475 "url": hook_url,
476 "content_type": "json",
477 "secret": secret,
478 },
479 }
481 self._request("POST", f"repos/{owner}/{repo}/hooks", data=data)
483 return True
485 def remove_hook(self, owner, repo, hook_id):
486 """
487 Remove GitHub webhook in a repo
488 """
489 self._request("DELETE", f"repos/{owner}/{repo}/hooks/{hook_id}")
491 return True