Coverage for webapp/helpers.py: 39%

100 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-17 22:07 +0000

1import re 

2import json 

3 

4from bs4 import BeautifulSoup 

5from flask import request 

6from ruamel.yaml import YAML 

7from slugify import slugify 

8from talisker import requests 

9from datetime import datetime, timedelta 

10import mistune 

11 

12session = requests.get_session() 

13 

14_yaml = YAML(typ="rt") 

15_yaml_safe = YAML(typ="safe") 

16 

17 

18def get_yaml_loader(typ="safe"): 

19 if typ == "safe": 

20 return _yaml_safe 

21 return _yaml 

22 

23 

24def is_safe_url(url): 

25 """ 

26 Return True if the URL is inside the same app 

27 """ 

28 return url.startswith(request.url_root) or url.startswith("/") 

29 

30 

31def get_soup(html_content): 

32 soup = BeautifulSoup(html_content, "html.parser") 

33 return soup 

34 

35 

36# Change all the headers (if step=2: eg h1 => h3) 

37def decrease_header(header, step): 

38 level = int(header.name[1:]) + step 

39 if level > 6: 

40 level = 6 

41 header.name = f"h{str(level)}" 

42 

43 return header 

44 

45 

46def add_header_id(h, levels): 

47 id = slugify(h.get_text()) 

48 level = int(h.name[1:]) 

49 

50 # Go through previous headings and find any that are lower 

51 levels.append((level, id)) 

52 reversed_levels = list(reversed(levels)) 

53 parents = [] 

54 level_cache = None 

55 for i in reversed_levels: 

56 if i[0] < level and not level_cache: 

57 parents.append(i) 

58 level_cache = i[0] 

59 elif i[0] < level and i[0] < level_cache: 

60 parents.append(i) 

61 level_cache = i[0] 

62 parents.reverse() 

63 if "id" not in h.attrs: 

64 parent_path_id = "" 

65 if len(parents) > 0: 

66 parent_path_id = "--".join([i[1] for i in parents]) + "--" 

67 h["id"] = parent_path_id + id 

68 

69 return h 

70 

71 

72def modify_headers(soup, decrease_step=2): 

73 levels = [] 

74 

75 for header in soup.find_all(re.compile("^h[1-6]$")): 

76 decrease_header(header, decrease_step) 

77 add_header_id(header, levels) 

78 

79 return soup 

80 

81 

82def schedule_banner(start_date: str, end_date: str): 

83 try: 

84 end = datetime.strptime(end_date, "%Y-%m-%d") 

85 start = datetime.strptime(start_date, "%Y-%m-%d") 

86 present = datetime.now() 

87 return start <= present < end 

88 except ValueError: 

89 return False 

90 

91 

92def markdown_to_html(markdown_text): 

93 markdown = mistune.create_markdown(renderer=mistune.HTMLRenderer()) 

94 return markdown(markdown_text) 

95 

96 

97def param_redirect_capture(req, resp): 

98 """ 

99 Functions that captures params and sets a cookie based on a match 

100 with a predefined list. 

101 """ 

102 # Signatures to capture in a cookie 

103 param_signatures = [ 

104 {"endpoint": "/accept-invite", "params": ["package", "token"]} 

105 ] 

106 path = req.path 

107 params = req.args 

108 

109 for item in param_signatures: 

110 # If the endpoint and params match a param_signature 

111 if item["endpoint"] == path and set(params).issubset(item["params"]): 

112 param_values = {} 

113 for param in item["params"]: 

114 param_values[param] = params[param] 

115 # Set the cookie 

116 resp.set_cookie( 

117 "param_redirect", 

118 json.dumps( 

119 {"endpoint": item["endpoint"], "params": param_values} 

120 ), 

121 # Set expiration for 10 days in the future 

122 expires=datetime.now() + timedelta(days=10), 

123 ) 

124 

125 return resp 

126 

127 

128def param_redirect_exec(req, make_response, redirect): 

129 """ 

130 Function that returns a response, redirecting based on 

131 a matched cookie 

132 """ 

133 # Get cookie data 

134 encoded_redirect_data = req.cookies.get("param_redirect") 

135 

136 if encoded_redirect_data: 

137 redirect_data = json.loads(encoded_redirect_data) 

138 # Only redirect if the current path matches the redirect endpoint 

139 if req.path == redirect_data["endpoint"]: 

140 params = [] 

141 for key, value in redirect_data["params"].items(): 

142 params.append(f"{key}={value}") 

143 response = make_response( 

144 redirect(f'{redirect_data["endpoint"]}?{"&".join(params)}') 

145 ) 

146 response.set_cookie("param_redirect", "", expires=0) 

147 return response 

148 return None 

149 

150 

151def get_csp_as_str(csp={}): 

152 csp_str = "" 

153 for key, values in csp.items(): 

154 csp_value = " ".join(values) 

155 csp_str += f"{key} {csp_value}; " 

156 return csp_str.strip() 

157 

158 

159def is_date_format(string): 

160 try: 

161 datetime.strptime(string, "%d %b %Y") 

162 return True 

163 except ValueError: 

164 return False