Coverage for webapp/integrations/logic.py: 23%

137 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-08 22:07 +0000

1import re 

2from github import Github 

3from os import getenv 

4from functools import lru_cache 

5 

6from webapp.helpers import get_yaml_loader 

7from webapp.observability.utils import trace_function 

8from webapp.packages.logic import ( 

9 get_package, 

10) 

11 

12GITHUB_TOKEN = getenv("GITHUB_TOKEN") 

13 

14github_client = Github(GITHUB_TOKEN) 

15yaml = get_yaml_loader() 

16 

17 

18class Interfaces: 

19 def __init__(self): 

20 self._repo = None 

21 

22 @property 

23 def repo(self): 

24 if self._repo is None: 

25 self._repo = github_client.get_repo( 

26 "canonical/charm-relation-interfaces" 

27 ) 

28 return self._repo 

29 

30 @lru_cache(maxsize=None) 

31 @trace_function 

32 def get_interfaces(self): 

33 try: 

34 index = self.repo.get_contents("index.json") 

35 if isinstance(index, list): 

36 index = index[0] 

37 index_content = index.decoded_content.decode("utf-8") 

38 interfaces = yaml.load(index_content) 

39 return interfaces 

40 except Exception: 

41 return [] 

42 

43 @lru_cache(maxsize=None) 

44 @trace_function 

45 def get_interface_from_path(self, interface_name): 

46 try: 

47 interface_versions = self.repo.get_contents( 

48 "interfaces/{}".format(interface_name) 

49 ) 

50 except Exception: 

51 return None 

52 versions = [] 

53 if not isinstance(interface_versions, list): 

54 interface_versions = [interface_versions] 

55 for _, version in enumerate(interface_versions): 

56 if version.type == "dir" and version.name.startswith("v"): 

57 versions.append(version.name) 

58 

59 versions.sort() 

60 

61 latest_version = versions.pop() 

62 

63 latest_version_interface = self.repo.get_contents( 

64 "interfaces/{}/{}/interface.yaml".format( 

65 interface_name, latest_version 

66 ) 

67 ).decoded_content.decode("utf-8") 

68 interface = yaml.load(latest_version_interface) 

69 

70 active_providers = [] 

71 active_requirers = [] 

72 

73 if "providers" in interface and interface["providers"]: 

74 active_providers = [] 

75 for provider in interface["providers"]: 

76 try: 

77 get_package(provider["name"], [], False) 

78 active_providers.append(provider) 

79 except Exception: 

80 continue 

81 interface["providers"] = active_providers 

82 

83 if "requirers" in interface and interface["requirers"]: 

84 active_requirers = [] 

85 for requirer in interface["requirers"]: 

86 try: 

87 get_package(requirer["name"], [], False) 

88 active_requirers.append(requirer) 

89 except Exception: 

90 continue 

91 interface["requirers"] = active_requirers 

92 

93 return interface 

94 

95 @trace_function 

96 def get_h_content(self, text, pattern): 

97 start_index = text.index(pattern) 

98 return [start_index, start_index + len(pattern)] 

99 

100 @trace_function 

101 def extract_headings_and_content(self, text, level): 

102 headings = re.findall( 

103 r"^#{" + str(level) + r"}\s.*", text, flags=re.MULTILINE 

104 ) 

105 

106 start_end = { 

107 heading: self.get_h_content(text, heading) for heading in headings 

108 } 

109 result = [] 

110 for i in range(len(headings)): 

111 current_heading = headings[i] 

112 start_index = start_end[current_heading][1] 

113 has_next = i < len(headings) - 1 

114 if has_next: 

115 next_heading = headings[i + 1] 

116 end_index = start_end[next_heading][0] 

117 body = text[start_index:end_index] 

118 else: 

119 body = text[start_index:] 

120 

121 result.append([current_heading.strip(), body.strip()]) 

122 return result 

123 

124 @trace_function 

125 def parse_text(self, interface, version, text): 

126 base_link = ( 

127 "https://github.com/canonical/" 

128 "charm-relation-interfaces/blob/main/interfaces/{}/v{}" 

129 ).format(interface, version) 

130 pattern = r"\[.*?\]\(.*?\)" 

131 matches = re.findall(pattern, text) 

132 

133 for match in matches: 

134 element_pattern = r"\[(.*?)\]\((.*?)\)" 

135 element_match = re.search(element_pattern, match) 

136 if element_match: 

137 title = element_match.group(1) 

138 url = element_match.group(2) 

139 absolute_url = url 

140 if absolute_url.startswith("./"): 

141 absolute_url = absolute_url.replace("./", base_link + "/") 

142 

143 text = text.replace(match, f"[{title}]({absolute_url})") 

144 

145 return text 

146 

147 @trace_function 

148 def convert_readme(self, interface, version, text, level=2): 

149 headings_and_contents = self.extract_headings_and_content(text, level) 

150 

151 if len(headings_and_contents) == 0: 

152 return [s.strip("\n") for s in text.split("\n") if s.strip("\n")] 

153 

154 resulting_list = [] 

155 

156 for heading, content in headings_and_contents: 

157 strip_char = "{}{}".format("#" * level, " ") 

158 heading = heading.strip(strip_char) 

159 _temp = {"heading": heading, "level": level, "children": []} 

160 

161 children = [] 

162 

163 result = self.convert_readme( 

164 interface, version, content, level + 1 

165 ) 

166 

167 if len(content) > 0: 

168 body = content.split("#")[0].strip() 

169 body_list = body.split("\n\n") 

170 if len(list(filter(None, body_list))) > 0: 

171 children += body_list 

172 

173 if isinstance(result, list) and isinstance(result[0], dict): 

174 children += result 

175 

176 for child in children: 

177 if isinstance(child, list): 

178 _temp["children"] += child 

179 else: 

180 _temp["children"].append(child) 

181 

182 for index, child in list(enumerate(_temp["children"])): 

183 if isinstance(child, str): 

184 _temp["children"][index] = self.parse_text( 

185 interface, version, child 

186 ) 

187 

188 resulting_list.append(_temp) 

189 

190 return resulting_list 

191 

192 @trace_function 

193 def get_interface_name_from_readme(self, text): 

194 name = re.sub(r"[#` \n]", "", text.split("\n##", 1)[0]).split("/")[0] 

195 return name 

196 

197 

198interface_logic = Interfaces()