Coverage for webapp/helpers.py: 64%

96 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-27 22:07 +0000

1import re 

2import json 

3 

4from bs4 import BeautifulSoup 

5from flask import request 

6from ruamel.yaml import YAML 

7from slugify import slugify 

8from talisker import requests 

9from datetime import datetime, timedelta 

10import mistune 

11from canonicalwebteam.discourse import DiscourseAPI 

12 

# HTTP session obtained from talisker's ``requests`` module; it is created
# once at import time and handed to the Discourse client below.
session = requests.get_session()

# Client for the Charmhub Discourse instance.
discourse_api = DiscourseAPI(
    session=session,
    base_url="https://discourse.charmhub.io/",
)

# YAML parsers are built once and shared; get_yaml_loader() hands them out.
_yaml_safe = YAML(typ="safe")
_yaml = YAML(typ="rt")

21 

22 

def get_yaml_loader(typ="safe"):
    """
    Return one of the module-level YAML instances.

    ``typ="safe"`` (the default) selects the instance built with
    ``YAML(typ="safe")``; any other value selects the one built with
    ``YAML(typ="rt")``. The same shared objects are returned on every call.
    """
    return _yaml_safe if typ == "safe" else _yaml

27 

28 

def is_safe_url(url):
    """
    Return True if the URL is inside the same app

    A URL is accepted when it is either:
    - a path on this host, i.e. it starts with a single "/", or
    - an absolute URL that starts with the current request's ``url_root``.

    Empty or missing values are rejected.
    """
    if not url:
        return False

    if url.startswith("/"):
        # "//host/..." and "/\host/..." look like local paths but are
        # resolved by browsers as scheme-relative URLs to another host,
        # so accepting them would allow an open redirect.
        return not url.startswith(("//", "/\\"))

    return url.startswith(request.url_root)

34 

35 

def get_soup(html_content):
    """
    Parse ``html_content`` with BeautifulSoup's "html.parser" backend and
    return the resulting soup object.
    """
    return BeautifulSoup(html_content, "html.parser")

39 

40 

def decrease_header(header, step):
    """
    Shift a heading tag down by ``step`` levels (if step=2: eg h1 => h3).

    ``header`` is modified in place by rewriting its ``name`` ("h1".."h6")
    and is also returned. The resulting level is clamped to the valid
    HTML range 1-6, so large steps stop at h6 and negative steps stop at
    h1 instead of producing invalid tag names such as "h0" or "h-1".
    """
    level = int(header.name[1:]) + step
    level = max(1, min(level, 6))
    header.name = f"h{level}"

    return header

49 

50 

def add_header_id(h, levels):
    """
    Give heading ``h`` an ``id`` built from its own slug and its ancestors.

    ``levels`` is a running list of ``(level, slug)`` tuples for the
    headings processed so far; this heading is appended to it on every
    call. If ``h`` has no ``id`` attribute yet, one is set to the slugs of
    its enclosing headings followed by its own slug, joined with "--"
    (e.g. "install--requirements"). An existing ``id`` is left untouched.

    Returns the same heading object.
    """
    slug = slugify(h.get_text())
    level = int(h.name[1:])

    levels.append((level, slug))

    # Walk backwards through the headings seen so far and keep each one
    # that is shallower than this heading and shallower than the last one
    # kept: that yields the chain of enclosing headings, nearest first.
    ancestors = []
    shallowest = None
    for entry in reversed(levels):
        entry_level = entry[0]
        if entry_level >= level:
            continue
        if not shallowest or entry_level < shallowest:
            ancestors.append(entry)
            shallowest = entry_level

    # Put the outermost ancestor first
    ancestors.reverse()

    if "id" not in h.attrs:
        prefix = ""
        if ancestors:
            prefix = "--".join(entry[1] for entry in ancestors) + "--"
        h["id"] = prefix + slug

    return h

75 

76 

def modify_headers(soup, decrease_step=2):
    """
    Shift every h1-h6 tag in ``soup`` down by ``decrease_step`` levels and
    give each one an ``id`` via ``add_header_id``.

    The soup is modified in place and returned.
    """
    heading_tag = re.compile("^h[1-6]$")
    # (level, slug) history shared across headings so ids can include
    # the slugs of enclosing headings
    seen_levels = []

    for heading in soup.find_all(heading_tag):
        decrease_header(heading, decrease_step)
        add_header_id(heading, seen_levels)

    return soup

85 

86 

def schedule_banner(start_date: str, end_date: str) -> bool:
    """
    Return True if the current local time falls within a banner's window.

    Both dates are parsed as "YYYY-MM-DD". The window starts at midnight
    on ``start_date`` (inclusive) and ends at midnight on ``end_date``
    (exclusive), so the banner is no longer shown on ``end_date`` itself.

    Returns False if either date is missing, not a string, or not in the
    expected format.
    """
    try:
        end = datetime.strptime(end_date, "%Y-%m-%d")
        start = datetime.strptime(start_date, "%Y-%m-%d")
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: non-string input (eg None)
        return False

    return start <= datetime.now() < end

95 

96 

def markdown_to_html(markdown_text):
    """
    Render ``markdown_text`` with a mistune markdown instance that uses
    ``mistune.HTMLRenderer`` and return the result.
    """
    renderer = mistune.HTMLRenderer()
    convert = mistune.create_markdown(renderer=renderer)
    return convert(markdown_text)

100 

101 

def param_redirect_capture(req, resp):
    """
    Function that captures params and sets a cookie based on a match
    with a predefined list.

    ``req`` must expose ``path`` and ``args``; ``resp`` must expose
    ``set_cookie``. When the request path equals a signature's endpoint
    and the request carries exactly that signature's params, the endpoint
    and param values are stored as JSON in the "param_redirect" cookie.

    Requests that carry only some of the params (or none) are ignored;
    previously they passed the match and then failed on the missing key.

    Returns ``resp`` in every case.
    """
    # Signatures to capture in a cookie
    param_signatures = [
        {"endpoint": "/accept-invite", "params": ["package", "token"]}
    ]
    path = req.path
    params = req.args

    for item in param_signatures:
        if item["endpoint"] != path:
            continue
        # Require the exact set of params so every lookup below succeeds
        if set(params) != set(item["params"]):
            continue

        param_values = {name: params[name] for name in item["params"]}
        # Set the cookie
        resp.set_cookie(
            "param_redirect",
            json.dumps(
                {"endpoint": item["endpoint"], "params": param_values}
            ),
            # Set expiration for 10 days in the future
            expires=datetime.now() + timedelta(days=10),
            secure=True,
            httponly=True,
        )

    return resp

133 

134 

def param_redirect_exec(req, make_response, redirect):
    """
    Function that returns a response, redirecting based on
    a matched cookie

    Reads the "param_redirect" cookie from ``req``. If it holds a JSON
    object with "endpoint" and "params", and the current path equals that
    endpoint, returns ``make_response(redirect(...))`` pointing at the
    endpoint with the stored params as a URL-encoded query string, and
    clears the cookie on that response.

    Returns None when there is no cookie, when the cookie cannot be
    decoded into the expected shape, or when the path does not match.
    """
    from urllib.parse import urlencode

    # Get cookie data
    encoded_redirect_data = req.cookies.get("param_redirect")
    if not encoded_redirect_data:
        return None

    # The cookie comes from the client, so treat anything malformed as
    # "no redirect" rather than letting it raise.
    try:
        redirect_data = json.loads(encoded_redirect_data)
        endpoint = redirect_data["endpoint"]
        # urlencode escapes "&", "=", spaces, etc. in keys and values
        query = urlencode(redirect_data["params"])
    except (ValueError, KeyError, TypeError):
        return None

    # Only redirect if the current path matches the redirect endpoint
    if req.path != endpoint:
        return None

    response = make_response(redirect(f"{endpoint}?{query}"))
    # Expire the cookie so the redirect happens only once
    response.set_cookie(
        "param_redirect", "", expires=0, secure=True, httponly=True
    )
    return response

158 

159 

def get_csp_as_str(csp=None):
    """
    Serialise a Content-Security-Policy mapping into a header string.

    ``csp`` maps each directive name to an iterable of string values.
    Every directive is rendered as "<name> <value> <value>;" and the
    directives are separated by a single space, e.g.
    ``{"img-src": ["*", "data:"]}`` becomes ``"img-src * data:;"``.

    Returns an empty string when ``csp`` is None or empty.
    """
    if not csp:
        return ""

    directives = [
        f"{key} {' '.join(values)};" for key, values in csp.items()
    ]
    return " ".join(directives).strip()