Coverage for webapp/metrics/helper.py: 50% (98 statements)
coverage.py v7.8.0, created at 2025-04-28 22:05 +0000

import datetime
import math

from dateutil import relativedelta


def get_filter(metric_name, snap_id, start, end):
    """Build a single metrics API filter for one snap and date range."""
    return {
        "metric_name": metric_name,
        "snap_id": snap_id,
        "start": start.strftime("%Y-%m-%d"),
        "end": end.strftime("%Y-%m-%d"),
    }


def get_last_metrics_processed_date():
    # We want to give the store time to process all the metrics,
    # since they are processed during the night
    # https://github.com/canonical-web-and-design/snapcraft.io/pull/616
    three_hours = relativedelta.relativedelta(hours=3)
    last_metrics_processed = datetime.datetime.now() - three_hours

    # Add an extra day on Mondays and Sundays to prevent the reset issue.
    if last_metrics_processed.weekday() in (0, 6):
        days_to_skip = relativedelta.relativedelta(days=2)
    else:
        days_to_skip = relativedelta.relativedelta(days=1)

    return last_metrics_processed.date() - days_to_skip


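# Worked example of the weekday handling above (illustrative, not part of
# the original module). Run early on a Monday, now() - 3h still falls on
# Sunday (weekday 6), so two days are skipped:
#
#     >>> monday = datetime.datetime(2025, 4, 28, 1, 0)  # a Monday, 01:00
#     >>> (monday - relativedelta.relativedelta(hours=3)).weekday()
#     6

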
def get_dates_for_metric(metric_period=30, metric_bucket="d"):
    """Return the start and end dates for a metrics query, where
    metric_bucket is 'd' (days), 'm' (months) or 'y' (years).
    """
    end = get_last_metrics_processed_date()

    if metric_bucket == "d":
        start = end + relativedelta.relativedelta(days=-metric_period)
    elif metric_bucket == "m":
        start = end + relativedelta.relativedelta(months=-metric_period)
    elif metric_bucket == "y":
        # Go back an extra day to ensure the granularity increases
        start = end + relativedelta.relativedelta(
            years=-metric_period, days=-1
        )
    else:
        # Reject unknown buckets explicitly rather than falling through
        # with `start` unbound
        raise ValueError(f"Unknown metric_bucket: {metric_bucket!r}")

    return {"end": end, "start": start}


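# Quick sketch of the resulting range (illustrative; the actual dates
# depend on when this runs):
#
#     >>> dates = get_dates_for_metric(metric_period=7, metric_bucket="d")
#     >>> (dates["end"] - dates["start"]).days
#     7

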
def build_metric_query_installed_base(
    snap_id, installed_base, metric_period=30, metric_bucket="d"
):
    """Build the JSON body that will be sent to the metrics API.

    :param snap_id: The snap id
    :param installed_base: The base metric requested
    :param metric_period: The metric period requested, 30 by default
    :param metric_bucket: The metric bucket, 'd' by default

    :returns: A dictionary with the filters for the metrics API.
    """
    dates = get_dates_for_metric(metric_period, metric_bucket)

    return {
        "filters": [
            get_filter(
                metric_name=installed_base,
                snap_id=snap_id,
                start=dates["start"],
                end=dates["end"],
            ),
        ]
    }


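# Illustrative usage (the snap id and metric name here are made up):
#
#     >>> query = build_metric_query_installed_base(
#     ...     snap_id="fake-snap-id",
#     ...     installed_base="weekly_installed_base_by_version",
#     ... )
#     >>> sorted(query["filters"][0])
#     ['end', 'metric_name', 'snap_id', 'start']

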
def build_active_device_metric_query(snap_id, installed_base, end, start):
    """Build the metrics API query for an installed-base metric over an
    explicit start/end date range.
    """
    return {
        "filters": [
            get_filter(
                metric_name=installed_base,
                snap_id=snap_id,
                start=start,
                end=end,
            ),
        ]
    }


def build_metric_query_country(snap_id):
    """Build the JSON body that will be sent to the metrics API,
    requesting the 'weekly_installed_base_by_country' metric for a
    single day.

    :param snap_id: The snap id

    :returns: A dictionary with the filters for the metrics API.
    """
    end = get_last_metrics_processed_date()

    return {
        "filters": [
            get_filter(
                metric_name="weekly_installed_base_by_country",
                snap_id=snap_id,
                start=end,
                end=end,
            ),
        ]
    }


def find_metric(full_response, name):
    """Find a named metric in a metrics API response.

    :param full_response: The JSON response from the metrics API
    :param name: Name of the metric to find

    :returns: A dictionary with the metric information, or None if no
        metric matches
    """
    for metric in full_response:
        if metric["metric_name"] == name:
            return metric


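# Usage sketch with a hand-built response (illustrative data only):
#
#     >>> response = [
#     ...     {"metric_name": "weekly_device_change", "series": []},
#     ...     {"metric_name": "weekly_installed_base_by_country", "series": []},
#     ... ]
#     >>> find_metric(response, "weekly_device_change")["metric_name"]
#     'weekly_device_change'
#     >>> find_metric(response, "missing") is None
#     True

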
def build_snap_installs_metrics_query(snaps, get_filter=get_filter):
    """Build a metrics API query requesting a month of
    'weekly_device_change' data for each snap.

    :param snaps: dict mapping snap names to snap ids
    :param get_filter: function that builds a single filter payload

    :returns: A dict containing a filter for each snap in snaps,
        or an empty dict if there are no snaps
    """
    if not snaps:
        return {}

    end = get_last_metrics_processed_date()
    start = end + relativedelta.relativedelta(months=-1)

    metrics_query = {"filters": []}
    for snap_name in snaps:
        metrics_query["filters"].append(
            get_filter(
                metric_name="weekly_device_change",
                snap_id=snaps[snap_name],
                start=start,
                end=end,
            )
        )

    return metrics_query


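# Illustrative usage (made-up snap ids); one filter is built per snap:
#
#     >>> query = build_snap_installs_metrics_query(
#     ...     {"snap-a": "id-a", "snap-b": "id-b"}
#     ... )
#     >>> len(query["filters"])
#     2
#     >>> build_snap_installs_metrics_query({})
#     {}

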
def transform_metrics(metrics, metrics_response, snaps):
    """Fold an API response from the publisher metrics into `metrics`.

    :param metrics: The dictionary to accumulate results into; must
        contain a "snaps" list
    :param metrics_response: The JSON response from the metrics API
    :param snaps: dict mapping snap names to snap ids

    :returns: A dictionary with the metric information
    """
    for metric in metrics_response["metrics"]:
        if metric["status"] == "OK":
            snap_id = metric["snap_id"]

            # Map the snap id back to its name
            snap_name = None
            for snaps_name, snaps_id in snaps.items():
                if snaps_id == snap_id:
                    snap_name = snaps_name

            metrics["snaps"].append(
                {"id": snap_id, "name": snap_name, "series": metric["series"]}
            )
            metrics["buckets"] = metric["buckets"]

    return metrics


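# Sketch of the expected shapes (illustrative data, not an API capture;
# the response fields here are inferred from the code above):
#
#     >>> metrics = {"snaps": [], "buckets": []}
#     >>> response = {"metrics": [{
#     ...     "status": "OK", "snap_id": "id-a",
#     ...     "series": [], "buckets": ["2025-04-01"],
#     ... }]}
#     >>> result = transform_metrics(metrics, response, {"snap-a": "id-a"})
#     >>> result["snaps"][0]["name"]
#     'snap-a'

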
def lttb_select_indices(values, target_size):
    """Select indices using the LTTB (Largest-Triangle-Three-Buckets)
    algorithm for downsampling, treating None as 0.
    """
    n = len(values)
    if n <= target_size:
        return list(range(n))

    # Initialize bucket size
    bucket_size = (n - 2) / (target_size - 2)
    indices = []

    current_bucket_start = 0
    for i in range(1, target_size - 1):
        next_bucket_start = min(math.ceil((i + 1) * bucket_size), n - 1)

        max_area = 0
        max_area_idx = current_bucket_start

        point1 = (
            current_bucket_start,
            (
                values[current_bucket_start]
                if values[current_bucket_start] is not None
                else 0
            ),
        )
        point2 = (
            next_bucket_start,
            (
                values[next_bucket_start]
                if values[next_bucket_start] is not None
                else 0
            ),
        )

        for j in range(current_bucket_start + 1, next_bucket_start):
            val_j = values[j] if values[j] is not None else 0

            # Area of the triangle formed by point1, point2 and the
            # current point
            area = abs(
                (point1[0] - point2[0]) * (val_j - point1[1])
                - (point1[0] - j) * (point2[1] - point1[1])
            )
            if area > max_area:
                max_area = area
                max_area_idx = j

        indices.append(max_area_idx)
        current_bucket_start = next_bucket_start

    indices.append(n - 1)
    return indices


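# Small worked example (illustrative): downsampling 6 points to a target
# of 4 picks the largest-triangle point from each interior bucket (None
# counts as 0) and always keeps the final point:
#
#     >>> lttb_select_indices([0, None, 5, 1, 0, 3], 4)
#     [2, 4, 5]

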
def normalize_series(series, bucket_count):
    """Ensure all value arrays in the series have the same size
    by padding with 0s.
    """
    for item in series:
        values = item["values"]
        # If the series has no values, fill it with 0s
        if not values:
            item["values"] = [0] * bucket_count
        # Extend the values with 0s if they are shorter than the bucket count
        elif len(values) < bucket_count:
            item["values"].extend([0] * (bucket_count - len(values)))


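# Quick sketch of the padding behaviour (illustrative data); the series
# is modified in place:
#
#     >>> series = [{"name": "a", "values": []},
#     ...           {"name": "b", "values": [1, 2]}]
#     >>> normalize_series(series, 3)
#     >>> [item["values"] for item in series]
#     [[0, 0, 0], [1, 2, 0]]

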
def downsample_series(buckets, series, target_size):
    """Downsample each series in the data, treating None as 0."""
    downsampled_buckets = []
    downsampled_series = []

    # Handle the case where the series is empty
    if not series:
        return buckets[:target_size], []

    bucket_count = len(buckets)
    # Normalize the series first so they all have the same length
    normalize_series(series, bucket_count)

    # Downsample each series independently
    for item in series:
        name = item["name"]
        values = item["values"]

        selected_indices = lttb_select_indices(values, target_size)

        # Collect the buckets and values based on the selected indices
        downsampled_buckets = [buckets[i] for i in selected_indices]
        downsampled_values = [
            values[i] if values[i] is not None else 0 for i in selected_indices
        ]

        downsampled_series.append({"name": name, "values": downsampled_values})

    return downsampled_buckets, downsampled_series


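# End-to-end sketch (illustrative data): buckets are reduced alongside
# the values, using the indices selected for the last series:
#
#     >>> buckets = ["d1", "d2", "d3", "d4", "d5", "d6"]
#     >>> series = [{"name": "a", "values": [0, None, 5, 1, 0, 3]}]
#     >>> downsample_series(buckets, series, 4)
#     (['d3', 'd5', 'd6'], [{'name': 'a', 'values': [5, 0, 3]}])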