Coverage for webapp/metrics/helper.py: 54%
107 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 22:07 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 22:07 +0000
1import datetime
2from dateutil import relativedelta
3import math
def get_filter(metric_name, snap_id, start, end):
    """Build a single filter entry for the metrics API.

    :param metric_name: Name of the metric to request.
    :param snap_id: The snap id the metric applies to.
    :param start: Start date (any object exposing ``strftime``).
    :param end: End date (any object exposing ``strftime``).

    :returns: A dict with the metric name, snap id and ISO-formatted dates.
    """
    date_format = "%Y-%m-%d"
    return {
        "metric_name": metric_name,
        "snap_id": snap_id,
        "start": start.strftime(date_format),
        "end": end.strftime(date_format),
    }
def get_last_metrics_processed_date():
    """Return the most recent date with fully processed store metrics.

    The store processes metrics during the night, so we step back three
    hours before deciding which day's data to trust, then skip one day
    (two on Mondays and Sundays to prevent the reset issue).
    https://github.com/canonical-web-and-design/snapcraft.io/pull/616
    """
    reference = datetime.datetime.now() - relativedelta.relativedelta(hours=3)

    # Monday (0) and Sunday (6) need an extra day of slack.
    skip_days = 2 if reference.weekday() in (0, 6) else 1

    return reference.date() - relativedelta.relativedelta(days=skip_days)
def get_dates_for_metric(metric_period=30, metric_bucket="d"):
    """Compute the start/end dates for a metrics request.

    :param metric_period: How many buckets to go back, by default 30.
    :param metric_bucket: Bucket granularity: "d" (days), "m" (months)
        or "y" (years), by default "d".

    :returns: A dict with ``start`` and ``end`` date values.
    :raises ValueError: If ``metric_bucket`` is not "d", "m" or "y".
    """
    end = get_last_metrics_processed_date()

    if metric_bucket == "d":
        start = end + relativedelta.relativedelta(days=-metric_period)
    elif metric_bucket == "m":
        start = end + relativedelta.relativedelta(months=-metric_period)
    elif metric_bucket == "y":
        # Go back an extra day to ensure the granularity increases
        start = end + relativedelta.relativedelta(
            years=-metric_period, days=-1
        )
    else:
        # An unknown bucket previously raised UnboundLocalError on the
        # return line; fail with an explicit, descriptive error instead.
        raise ValueError(
            "Unknown metric bucket: {!r}".format(metric_bucket)
        )
    return {"end": end, "start": start}
def build_metric_query_installed_base(
    snap_id, installed_base, metric_period=30, metric_bucket="d"
):
    """Build the json that will be requested to the API.

    :param snap_id: The snap id.
    :param installed_base: The base metric requested.
    :param metric_period: The metric period requested, by default 30.
    :param metric_bucket: The metric bucket, by default 'd'.

    :returns: A dictionary with the filters for the metrics API.
    """
    dates = get_dates_for_metric(metric_period, metric_bucket)
    metric_filter = get_filter(
        metric_name=installed_base,
        snap_id=snap_id,
        start=dates["start"],
        end=dates["end"],
    )
    return {"filters": [metric_filter]}
def build_active_device_metric_query(snap_id, installed_base, end, start):
    """Build the filters payload for an active-device metrics request.

    :param snap_id: The snap id.
    :param installed_base: The base metric requested.
    :param end: End date of the requested range.
    :param start: Start date of the requested range.

    :returns: A dictionary with the filters for the metrics API.
    """
    metric_filter = get_filter(
        metric_name=installed_base,
        snap_id=snap_id,
        start=start,
        end=end,
    )
    return {"filters": [metric_filter]}
def build_metric_query_country(snap_id):
    """Build the json that will be requested to the API.

    Requests the 'weekly_installed_base_by_country' metric for a single
    day: the last date with processed metrics.

    :param snap_id: The snap id.

    :returns: A dictionary with the filters for the metrics API.
    """
    last_day = get_last_metrics_processed_date()

    country_filter = get_filter(
        metric_name="weekly_installed_base_by_country",
        snap_id=snap_id,
        start=last_day,
        end=last_day,
    )
    return {"filters": [country_filter]}
def find_metric(full_response, name):
    """Find a named metric in a metric response.

    :param full_response: The JSON response from the metrics API.
    :param name: Name of the metric to find.

    :returns: The matching metric dict, or None when no metric matches.
    """
    return next(
        (metric for metric in full_response if metric["metric_name"] == name),
        None,
    )
def build_snap_installs_metrics_query(snaps, get_filter=get_filter):
    """Build a weekly_device_change filter for each snap.

    :param snaps: Dict mapping snap names to snap ids.
    :param get_filter: Function that builds a single filter payload.

    :returns: A dict containing a filter for each snap in snaps,
        or empty if there are no snaps.
    """
    if not snaps:
        return {}

    end = get_last_metrics_processed_date()
    start = end + relativedelta.relativedelta(months=-1)

    filters = [
        get_filter(
            metric_name="weekly_device_change",
            snap_id=snap_id,
            start=start,
            end=end,
        )
        for snap_id in snaps.values()
    ]

    return {"filters": filters}
def get_days_without_data(metrics_response):
    """List the bucket labels that are missing data.

    A bucket is included when at least one series of any metric with
    status "OK" reports ``None`` at that position.

    :param metrics_response: The JSON response from the metrics API.

    :returns: A list of bucket labels (order unspecified, deduplicated).
    """
    missing = set()
    for metric in metrics_response["metrics"]:
        if metric["status"] != "OK":
            continue
        buckets = metric["buckets"]
        for series in metric["series"]:
            for idx, value in enumerate(series["values"]):
                if value is None:
                    missing.add(buckets[idx])
    return list(missing)
def transform_metrics(metrics, metrics_response, snaps):
    """Merge a publisher metrics API response into ``metrics``.

    :param metrics: Accumulator dict; its "snaps", "buckets" and
        "days_without_data" keys are updated in place.
    :param metrics_response: The JSON response from the metrics API.
    :param snaps: Dict mapping snap names to snap ids.

    :returns: The updated ``metrics`` dictionary.
    """
    for metric in metrics_response["metrics"]:
        if metric["status"] != "OK":
            continue

        snap_id = metric["snap_id"]

        # Reverse-lookup the snap name from its id (None if unknown).
        snap_name = None
        for candidate_name, candidate_id in snaps.items():
            if candidate_id == snap_id:
                snap_name = candidate_name

        metrics["snaps"].append(
            {"id": snap_id, "name": snap_name, "series": metric["series"]}
        )
        # Buckets from the last OK metric win.
        metrics["buckets"] = metric["buckets"]
    metrics["days_without_data"] = get_days_without_data(metrics_response)
    return metrics
def lttb_select_indices(values, target_size):
    """
    Selects indices using the LTTB algorithm for downsampling,
    treating None as 0.

    :param values: Sequence of numeric values; ``None`` entries are
        treated as 0 for area computations.
    :param target_size: Desired number of points after downsampling.

    :returns: A list of selected indices into ``values``. When
        ``len(values) <= target_size`` every index is returned.
        NOTE(review): otherwise the result has ``target_size - 1``
        entries (one per interior bucket plus the last index), and
        index 0 is only included when it happens to be the first
        bucket's max-area point — standard LTTB always keeps both
        endpoints and returns exactly ``target_size`` points; confirm
        this deviation is intended.
    """
    n = len(values)
    # Data already fits within the target size: keep everything.
    if n <= target_size:
        return list(range(n))

    # Initialize bucket size
    bucket_size = (n - 2) / (target_size - 2)
    indices = []

    current_bucket_start = 0
    for i in range(1, target_size - 1):
        # Clamp to the last index so the final bucket never overruns.
        next_bucket_start = min(math.ceil((i + 1) * bucket_size), n - 1)

        max_area = 0
        max_area_idx = current_bucket_start

        # Anchor points at the current and next bucket boundaries;
        # None values are substituted with 0.
        point1 = (
            current_bucket_start,
            (
                values[current_bucket_start]
                if values[current_bucket_start] is not None
                else 0
            ),
        )
        point2 = (
            next_bucket_start,
            (
                values[next_bucket_start]
                if values[next_bucket_start] is not None
                else 0
            ),
        )

        # Pick the interior point forming the largest triangle with the
        # two anchors.
        for j in range(current_bucket_start + 1, next_bucket_start):
            val_j = values[j] if values[j] is not None else 0

            # Area of triangle formed by point1, point2, and the current point
            area = abs(
                (point1[0] - point2[0]) * (val_j - point1[1])
                - (point1[0] - j) * (point2[1] - point1[1])
            )
            if area > max_area:
                max_area = area
                max_area_idx = j

        indices.append(max_area_idx)
        current_bucket_start = next_bucket_start

    # Always keep the final data point.
    indices.append(n - 1)
    return indices
def normalize_series(series, bucket_count):
    """
    Pad every value list in ``series`` with 0s so each one is exactly
    ``bucket_count`` entries long. Mutates ``series`` in place.
    """
    for entry in series:
        current = entry["values"]
        shortfall = bucket_count - len(current)
        if not current:
            # Empty series: replace with an all-zero list.
            entry["values"] = [0] * bucket_count
        elif shortfall > 0:
            # Short series: extend with zeros up to the bucket count.
            current.extend([0] * shortfall)
def downsample_series(buckets, series, target_size):
    """Downsample each series in the data, treating None as 0.

    :param buckets: Bucket labels aligned with the series values.
    :param series: List of {"name", "values"} dicts; padded in place so
        every value list matches the bucket count.
    :param target_size: Desired number of points per series.

    :returns: A (buckets, series) tuple of downsampled data. The
        returned buckets correspond to the indices selected for the
        last series processed.
    """
    # No series: nothing to select indices from, return a bucket prefix.
    if not series:
        return buckets[:target_size], []

    # Ensure every series spans all buckets before downsampling.
    normalize_series(series, len(buckets))

    result_buckets = []
    result_series = []
    for entry in series:
        values = entry["values"]
        chosen = lttb_select_indices(values, target_size)

        # Rebuild buckets/values from the selected indices, replacing
        # None values with 0.
        result_buckets = [buckets[i] for i in chosen]
        picked = [values[i] if values[i] is not None else 0 for i in chosen]

        result_series.append({"name": entry["name"], "values": picked})

    return result_buckets, result_series