Coverage for webapp/integrations/logic.py: 24%
152 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-27 22:07 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-27 22:07 +0000
1import re
2import time
3from github import Github
4from os import getenv
6import requests
8from webapp.helpers import get_yaml_loader
9from webapp.observability.utils import trace_function
# GitHub API token read from the environment; None when GITHUB_TOKEN is unset.
GITHUB_TOKEN = getenv("GITHUB_TOKEN")
# Module-level GitHub client, created once at import time with that token.
github_client = Github(GITHUB_TOKEN)
# YAML loader object obtained from the project's helpers module.
yaml = get_yaml_loader()
class Interfaces:
    """Access charm relation interface definitions stored on GitHub.

    Reads the ``canonical/charm-relation-interfaces`` repository to list
    interfaces, load a single interface definition, and turn README
    markdown into a nested heading/children structure.
    """

    # Maximum age of the cached interface list, in seconds (two days).
    CACHE_TTL_SECONDS = 172800
    # Charm pages live directly under this URL. No trailing slash, so that
    # joining with "/<name>" does not produce a double slash.
    CHARMHUB_URL = "https://charmhub.io"
    # Seconds to wait for Charmhub before giving up on a charm lookup.
    REQUEST_TIMEOUT = 10

    # Inline markdown link, capturing title and target: [title](target).
    _LINK_RE = re.compile(r"\[(.*?)\]\((.*?)\)")
    # Start of a markdown heading line of any level.
    _ANY_HEADING_RE = re.compile(r"^#+\s", flags=re.MULTILINE)
    # Optional closing run of "#" at the end of a heading ("## Title ##").
    _CLOSING_HASHES_RE = re.compile(r"\s+#+\s*$")

    # Cached interface list. get_interfaces rebinds this attribute on the
    # instance instead of mutating it, so the class-level default list is
    # never modified.
    interfaces = []
    # time.time() value of the last successful refresh, or None.
    last_fetch = None
    repo = None

    def __init__(self):
        """Bind the instance to the charm-relation-interfaces repository."""
        self.repo = github_client.get_repo(
            "canonical/charm-relation-interfaces"
        )

    @trace_function
    def get_interfaces(self):
        """Return the list of live and draft interfaces.

        The list is rebuilt from the repository when the cache is empty,
        has never been filled, or is older than ``CACHE_TTL_SECONDS``;
        otherwise the cached list is returned unchanged.

        Returns:
            A list of dicts, one per interface version whose
            ``interface.yaml`` status is ``published`` (reported as
            ``"live"``) or ``draft``. Each dict has ``name``, ``status``
            and ``version``, plus ``readme_path`` when a ``README.md``
            sits next to the yaml file.
        """
        cache_is_fresh = (
            self.interfaces
            and self.last_fetch
            and time.time() - self.last_fetch <= self.CACHE_TTL_SECONDS
        )
        if cache_is_fresh:
            return self.interfaces

        interfaces_table = []
        for interface in self.repo.get_contents("interfaces"):
            # Only directories hold interfaces; listing a plain file
            # returns a single object that cannot be iterated.
            if interface.name.startswith("__") or interface.type != "dir":
                continue
            versions = self.repo.get_contents(f"interfaces/{interface.name}")
            for version in versions:
                if version.type != "dir":
                    continue
                details = self._load_version_details(
                    interface.name, version.name
                )
                if details.get("status", "") in ("live", "draft"):
                    interfaces_table.append(details)

        self.interfaces = interfaces_table
        self.last_fetch = time.time()
        return self.interfaces

    def _load_version_details(self, interface_name, version_name):
        """Collect name, status, version and README link for one version."""
        details = {}
        version_content = self.repo.get_contents(
            f"interfaces/{interface_name}/{version_name}"
        )
        for entry in version_content:
            if entry.name == "interface.yaml":
                interface_yaml = yaml.load(
                    entry.decoded_content.decode("utf-8")
                )
                details["name"] = interface_yaml.get("name", interface_name)
                status = interface_yaml.get("status")
                if status == "published":
                    details["status"] = "live"
                elif status == "draft":
                    details["status"] = "draft"
                # Fall back to the directory name without its first
                # character (the "v" of "v1").
                details["version"] = interface_yaml.get(
                    "version", version_name[1:]
                )
            elif entry.name == "README.md":
                details["readme_path"] = entry.html_url
        return details

    @trace_function
    def repo_has_interface(self, interface):
        """Return True if ``interfaces/<interface>`` can be fetched.

        Any exception raised while fetching the path is reported as
        ``False`` rather than propagated.
        """
        try:
            self.repo.get_contents("interfaces/{}".format(interface))
        except Exception:
            return False
        return True

    @staticmethod
    def _version_sort_key(version_name):
        """Return a key that orders version names numerically.

        Digit runs are compared as integers, so ``v10`` sorts after
        ``v2``. The split alternates text and digit chunks, which keeps
        comparisons between like types.
        """
        return [
            int(chunk) if chunk.isdecimal() else chunk
            for chunk in re.split(r"(\d+)", version_name)
        ]

    def _filter_active_charms(self, charms):
        """Return the entries of *charms* whose Charmhub page is not a 404.

        Lookups are best-effort: an entry whose request fails, or that
        has no usable ``name``, is skipped.
        """
        active = []
        for charm in charms or []:
            try:
                response = requests.get(
                    f"{self.CHARMHUB_URL}/{charm['name']}",
                    timeout=self.REQUEST_TIMEOUT,
                )
            except (requests.RequestException, KeyError, TypeError):
                continue
            if response.status_code != 404:
                active.append(charm)
        return active

    @trace_function
    def get_interface_from_path(self, interface_name):
        """Load the latest version of an interface definition.

        Args:
            interface_name: Directory name under ``interfaces/``.

        Returns:
            The parsed ``interface.yaml`` of the highest ``v<N>``
            directory. ``providers`` and ``requirers`` are always set to
            lists, filtered down to the charms whose Charmhub page does
            not answer 404.

        Raises:
            IndexError: If the interface has no ``v*`` directory.
        """
        versions = [
            entry.name
            for entry in self.repo.get_contents(
                "interfaces/{}".format(interface_name)
            )
            if entry.type == "dir" and entry.name.startswith("v")
        ]
        if not versions:
            raise IndexError(
                f"no versions found for interface {interface_name!r}"
            )
        latest_version = max(versions, key=self._version_sort_key)

        latest_version_interface = self.repo.get_contents(
            "interfaces/{}/{}/interface.yaml".format(
                interface_name, latest_version
            )
        ).decoded_content.decode("utf-8")
        interface = yaml.load(latest_version_interface)

        interface["providers"] = self._filter_active_charms(
            interface.get("providers")
        )
        interface["requirers"] = self._filter_active_charms(
            interface.get("requirers")
        )
        return interface

    @trace_function
    def get_h_content(self, text, pattern):
        """Return ``[start, end]`` of the first occurrence of *pattern*.

        Raises:
            ValueError: If *pattern* does not occur in *text*.
        """
        start_index = text.index(pattern)
        return [start_index, start_index + len(pattern)]

    @trace_function
    def extract_headings_and_content(self, text, level):
        """Split *text* into ``[heading, body]`` pairs at one heading level.

        A heading is a line starting with exactly *level* ``#`` characters
        followed by whitespace. Each body runs from the end of its heading
        to the start of the next heading of the same level, or to the end
        of *text*. Both parts are stripped.
        """
        heading_re = re.compile(
            r"^#{" + str(level) + r"}\s.*", flags=re.MULTILINE
        )
        # Use the match positions themselves: searching for the heading
        # text again would find the wrong spot for repeated headings.
        matches = list(heading_re.finditer(text))
        result = []
        for index, match in enumerate(matches):
            if index + 1 < len(matches):
                body_end = matches[index + 1].start()
            else:
                body_end = len(text)
            body = text[match.end():body_end]
            result.append([match.group().strip(), body.strip()])
        return result

    @trace_function
    def parse_text(self, interface, version, text):
        """Rewrite relative markdown links in *text* as absolute URLs.

        Links whose target starts with ``./`` are pointed at the
        interface's version directory on GitHub; other links are left
        as they are.
        """
        base_link = (
            "https://github.com/canonical/"
            "charm-relation-interfaces/blob/main/interfaces/{}/v{}"
        ).format(interface, version)

        def absolutize(match):
            title, url = match.group(1), match.group(2)
            if url.startswith("./"):
                # Replace only the leading "./"; later "./" sequences
                # (for example inside "../") belong to the path.
                url = base_link + "/" + url[2:]
            return f"[{title}]({url})"

        return self._LINK_RE.sub(absolutize, text)

    @trace_function
    def convert_readme(self, interface, version, text, level=2):
        """Convert README markdown into a nested list of sections.

        Args:
            interface: Interface name, used to resolve relative links.
            version: Interface version, used to resolve relative links.
            text: Markdown to convert.
            level: Heading level to split on; recursion goes one deeper.

        Returns:
            When *text* has headings of *level*: a list of dicts with
            ``heading``, ``level`` and ``children``. Children are the
            paragraphs preceding the first sub-heading, followed by the
            converted sub-sections. When it has none: the non-empty lines
            of *text*.
        """
        sections = self.extract_headings_and_content(text, level)
        if not sections:
            return [line for line in text.split("\n") if line]

        resulting_list = []
        for heading, content in sections:
            # Drop the leading "#" marker and an optional closing run of
            # "#", without eating a "#" that is part of the title.
            title = self._CLOSING_HASHES_RE.sub("", heading[level:]).strip()

            children = []
            if content:
                # The section's own text ends at the first heading line,
                # not at the first "#" character.
                first_heading = self._ANY_HEADING_RE.search(content)
                if first_heading:
                    body = content[: first_heading.start()]
                else:
                    body = content
                paragraphs = body.strip().split("\n\n")
                if any(paragraphs):
                    children.extend(
                        self.parse_text(interface, version, paragraph)
                        for paragraph in paragraphs
                    )

            nested = self.convert_readme(
                interface, version, content, level + 1
            )
            # Only converted sub-sections (dicts) are nested; a plain list
            # of lines means there were no deeper headings.
            if nested and isinstance(nested[0], dict):
                children.extend(nested)

            resulting_list.append(
                {"heading": title, "level": level, "children": children}
            )

        return resulting_list

    @trace_function
    def get_interface_name_from_readme(self, text):
        """Derive the interface name from the top of a README.

        Takes the text before the first line that starts with ``##``,
        removes ``#``, backticks, spaces and newlines, and returns the
        part before the first ``/``.
        """
        title_part = text.split("\n##", 1)[0]
        name = re.sub(r"[#` \n]", "", title_part).split("/")[0]
        return name