Coverage for webapp/helpers.py: 64%
96 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-27 22:07 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-27 22:07 +0000
1import re
2import json
4from bs4 import BeautifulSoup
5from flask import request
6from ruamel.yaml import YAML
7from slugify import slugify
8from talisker import requests
9from datetime import datetime, timedelta
10import mistune
11from canonicalwebteam.discourse import DiscourseAPI
# HTTP session obtained from talisker, handed to the Discourse client below.
session = requests.get_session()
discourse_api = DiscourseAPI(
    base_url="https://discourse.charmhub.io/", session=session
)

# YAML instances are created once at import time and handed out by
# get_yaml_loader(): a "safe" one and an "rt" one.
_yaml_safe = YAML(typ="safe")
_yaml = YAML(typ="rt")
def get_yaml_loader(typ="safe"):
    """
    Return one of the module-level YAML instances.

    ``typ == "safe"`` selects ``_yaml_safe``; any other value selects
    ``_yaml``.
    """
    return _yaml_safe if typ == "safe" else _yaml
def is_safe_url(url):
    """
    Return True if the URL is inside the same app.

    A URL is considered safe when it is either a path relative to this
    host (starts with a single "/") or an absolute URL that starts with
    the current request's ``url_root``.
    """
    if url.startswith("/"):
        # "//host" and "/\host" are resolved by browsers as pointing at a
        # different host, so they must not pass as local paths
        # (open-redirect vector).
        return not url.startswith(("//", "/\\"))
    return url.startswith(request.url_root)
def get_soup(html_content):
    """Parse ``html_content`` with the "html.parser" backend and return it."""
    return BeautifulSoup(html_content, "html.parser")
# Change all the headers (if step=2: eg h1 => h3)
def decrease_header(header, step):
    """
    Shift a heading element by ``step`` levels, in place.

    The numeric part of ``header.name`` (e.g. the ``1`` in ``h1``) is
    increased by ``step`` and the result is written back to
    ``header.name``. The same ``header`` object is returned.
    """
    level = int(header.name[1:]) + step
    # Keep the result inside the valid HTML heading range h1..h6, so a
    # large step cannot produce "h7" and a negative step cannot produce
    # "h0" or "h-1".
    level = min(max(level, 1), 6)
    header.name = f"h{level}"

    return header
def add_header_id(h, levels):
    """
    Give heading ``h`` an ``id`` built from its own slug and its ancestors'.

    ``levels`` is a running list of ``(level, slug)`` pairs for the headings
    seen so far; the current heading is appended to it. Walking that list
    backwards, every entry whose level is lower than both the current
    heading's and any entry already picked counts as an ancestor. The id is
    the ancestors' slugs (outermost first) and the heading's own slug,
    joined with "--". An ``id`` already present on ``h`` is left alone.
    """
    slug = slugify(h.get_text())
    depth = int(h.name[1:])

    # Record this heading first; it never matches below because its own
    # level is not strictly lower than ``depth``.
    levels.append((depth, slug))

    ancestors = []
    lowest_seen = None
    for entry in reversed(levels):
        entry_level = entry[0]
        if entry_level < depth and (
            not lowest_seen or entry_level < lowest_seen
        ):
            ancestors.append(entry)
            lowest_seen = entry_level
    # Collected nearest-first; flip so the outermost ancestor leads.
    ancestors.reverse()

    if "id" not in h.attrs:
        if ancestors:
            prefix = "--".join([entry[1] for entry in ancestors]) + "--"
        else:
            prefix = ""
        h["id"] = prefix + slug

    return h
def modify_headers(soup, decrease_step=2):
    """
    Shift every h1-h6 element in ``soup`` and assign it a hierarchical id.

    Each matching element is passed to ``decrease_header`` with
    ``decrease_step`` and then to ``add_header_id``, sharing one list of
    seen heading levels across the whole document. Returns ``soup``.
    """
    seen_levels = []
    heading_tag = re.compile("^h[1-6]$")

    for heading in soup.find_all(heading_tag):
        decrease_header(heading, decrease_step)
        add_header_id(heading, seen_levels)

    return soup
def schedule_banner(start_date: str, end_date: str):
    """
    Tell whether the current time falls inside ``[start_date, end_date)``.

    Both dates must be ``YYYY-MM-DD`` strings; if either one fails to parse
    the function returns False.
    """
    date_format = "%Y-%m-%d"
    try:
        end = datetime.strptime(end_date, date_format)
        start = datetime.strptime(start_date, date_format)
    except ValueError:
        return False

    now = datetime.now()
    return start <= now and now < end
def markdown_to_html(markdown_text):
    """Render ``markdown_text`` to HTML using mistune's ``HTMLRenderer``."""
    html_renderer = mistune.HTMLRenderer()
    render = mistune.create_markdown(renderer=html_renderer)
    return render(markdown_text)
def param_redirect_capture(req, resp):
    """
    Function that captures params and sets a cookie based on a match
    with a predefined list.

    If ``req.path`` equals a known endpoint and the request carries exactly
    the parameter names listed for it, those values are stored as JSON in
    the ``param_redirect`` cookie on ``resp``. In every case ``resp`` is
    returned.
    """
    # Signatures to capture in a cookie
    param_signatures = [
        {"endpoint": "/accept-invite", "params": ["package", "token"]}
    ]
    path = req.path
    params = req.args

    for item in param_signatures:
        # Require the request's parameter names to match the signature
        # exactly. A subset check alone would also accept requests that are
        # missing a parameter (or have none at all), and the lookup below
        # would then fail on the missing key.
        if item["endpoint"] == path and set(params) == set(item["params"]):
            param_values = {param: params[param] for param in item["params"]}
            # Set the cookie
            resp.set_cookie(
                "param_redirect",
                json.dumps(
                    {"endpoint": item["endpoint"], "params": param_values}
                ),
                # Set expiration for 10 days in the future
                expires=datetime.now() + timedelta(days=10),
                secure=True,
                httponly=True,
            )

    return resp
def param_redirect_exec(req, make_response, redirect):
    """
    Function that returns a response, redirecting based on
    a matched cookie.

    Reads the ``param_redirect`` cookie from ``req``. If it holds a JSON
    object whose ``endpoint`` equals ``req.path``, returns
    ``make_response(redirect(...))`` pointing at that endpoint with the
    stored params as the query string, and clears the cookie on that
    response. Returns None when there is no cookie, the cookie cannot be
    used, or the path does not match.
    """
    from urllib.parse import urlencode

    # Get cookie data
    encoded_redirect_data = req.cookies.get("param_redirect")
    if not encoded_redirect_data:
        return None

    # The cookie comes from the client, so it may be malformed or have an
    # unexpected shape; treat that as "no redirect" instead of failing.
    try:
        redirect_data = json.loads(encoded_redirect_data)
    except ValueError:
        return None
    if not isinstance(redirect_data, dict):
        return None

    endpoint = redirect_data.get("endpoint")
    params = redirect_data.get("params")
    # Only redirect if the current path matches the redirect endpoint
    if req.path != endpoint or not isinstance(params, dict):
        return None

    # urlencode escapes keys and values so characters such as "&", "=" or
    # spaces in a stored value cannot corrupt the query string.
    response = make_response(redirect(f"{endpoint}?{urlencode(params)}"))
    response.set_cookie(
        "param_redirect", "", expires=0, secure=True, httponly=True
    )
    return response
def get_csp_as_str(csp=None):
    """
    Serialise a Content-Security-Policy mapping into a header string.

    ``csp`` maps each directive name to an iterable of string values. Every
    directive is rendered as ``"<name> <value> <value>; "`` and the pieces
    are concatenated, with surrounding whitespace stripped from the result.
    An omitted, ``None`` or empty ``csp`` yields an empty string.
    """
    # ``None`` replaces the former mutable ``{}`` default, which would be
    # shared between calls.
    if not csp:
        return ""
    return "".join(
        f"{key} {' '.join(values)}; " for key, values in csp.items()
    ).strip()