Coverage for webapp/metrics/helper.py: 54%

107 statements  

coverage.py v7.10.7, created at 2025-10-14 22:07 +0000

import datetime
import math

from dateutil import relativedelta


def get_filter(metric_name, snap_id, start, end):
    return {
        "metric_name": metric_name,
        "snap_id": snap_id,
        "start": start.strftime("%Y-%m-%d"),
        "end": end.strftime("%Y-%m-%d"),
    }
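
# Illustrative payload shape (hypothetical snap_id and dates):
#
#   get_filter(
#       "weekly_installed_base_by_country",
#       "hypothetical-snap-id",
#       datetime.date(2025, 10, 1),
#       datetime.date(2025, 10, 13),
#   )
#   # -> {"metric_name": "weekly_installed_base_by_country",
#   #     "snap_id": "hypothetical-snap-id",
#   #     "start": "2025-10-01",
#   #     "end": "2025-10-13"}
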

def get_last_metrics_processed_date():
    # Give the store time to process all the metrics, since they are
    # processed during the night:
    # https://github.com/canonical-web-and-design/snapcraft.io/pull/616
    three_hours = relativedelta.relativedelta(hours=3)
    last_metrics_processed = datetime.datetime.now() - three_hours

    # Add an extra day on Mondays and Sundays to prevent the reset issue.
    if last_metrics_processed.weekday() in (0, 6):
        days_to_skip = relativedelta.relativedelta(days=2)
    else:
        days_to_skip = relativedelta.relativedelta(days=1)

    return last_metrics_processed.date() - days_to_skip
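
# Worked example (dates chosen for illustration): if now is Monday
# 2025-10-13 12:00, three hours earlier is still Monday (weekday 0), so
# two days are skipped and the function returns date(2025, 10, 11); on
# Tuesday 2025-10-14 it skips one day and returns date(2025, 10, 13).
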

def get_dates_for_metric(metric_period=30, metric_bucket="d"):
    end = get_last_metrics_processed_date()

    if metric_bucket == "d":
        start = end + relativedelta.relativedelta(days=-metric_period)
    elif metric_bucket == "m":
        start = end + relativedelta.relativedelta(months=-metric_period)
    elif metric_bucket == "y":
        # Go back an extra day to ensure the granularity increases
        start = end + relativedelta.relativedelta(
            years=-metric_period, days=-1
        )
    return {"end": end, "start": start}
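
# For example, with end = date(2025, 10, 13) and the defaults
# (metric_period=30, metric_bucket="d"), start is date(2025, 9, 13).
# Note that an unrecognised metric_bucket leaves `start` unassigned and
# raises UnboundLocalError on the return.
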

def build_metric_query_installed_base(
    snap_id, installed_base, metric_period=30, metric_bucket="d"
):
    """Build the JSON that will be requested from the API

    :param snap_id: The snap id
    :param installed_base: The base metric requested
    :param metric_period: The metric period requested, by default 30
    :param metric_bucket: The metric bucket, by default 'd'

    :returns: A dictionary with the filters for the metrics API.
    """
    dates = get_dates_for_metric(metric_period, metric_bucket)
    return {
        "filters": [
            get_filter(
                metric_name=installed_base,
                snap_id=snap_id,
                start=dates["start"],
                end=dates["end"],
            ),
        ]
    }
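
# Resulting payload shape (hypothetical snap_id; the dates depend on the
# current day):
#
#   build_metric_query_installed_base(
#       "hypothetical-snap-id", "weekly_installed_base_by_country"
#   )
#   # -> {"filters": [{"metric_name": "weekly_installed_base_by_country",
#   #                  "snap_id": "hypothetical-snap-id",
#   #                  "start": "...", "end": "..."}]}
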

def build_active_device_metric_query(snap_id, installed_base, end, start):
    return {
        "filters": [
            get_filter(
                metric_name=installed_base,
                snap_id=snap_id,
                start=start,
                end=end,
            ),
        ]
    }


def build_metric_query_country(snap_id):
    """Build the JSON that will be requested from the API

    :param snap_id: The snap id

    :returns: A dictionary with a single filter for the
        'weekly_installed_base_by_country' metric.
    """
    end = get_last_metrics_processed_date()

    return {
        "filters": [
            get_filter(
                metric_name="weekly_installed_base_by_country",
                snap_id=snap_id,
                start=end,
                end=end,
            ),
        ]
    }
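
# Note that start == end here: the query asks for a single day, i.e.
# presumably the most recent snapshot of the weekly-by-country metric.
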

def find_metric(full_response, name):
    """Find a named metric in a metric response

    :param full_response: The JSON response from the metrics API
    :param name: Name of the metric to find

    :returns: A dictionary with the metric information,
        or None if no metric matches
    """
    for metric in full_response:
        if metric["metric_name"] == name:
            return metric
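
# For example (minimal dicts for illustration):
#
#   find_metric([{"metric_name": "a"}, {"metric_name": "b"}], "b")
#   # -> {"metric_name": "b"}
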

def build_snap_installs_metrics_query(snaps, get_filter=get_filter):
    """Build a query for the 'weekly_device_change' metric over the last
    month, with one filter per snap

    :param snaps: dict mapping snap names to snap ids
    :param get_filter: function that builds a single filter payload

    :returns: A dict containing a filter for each snap in snaps,
        or empty if there are no snaps
    """
    if not snaps:
        return {}

    end = get_last_metrics_processed_date()
    start = end + relativedelta.relativedelta(months=-1)

    metrics_query = {"filters": []}
    for snap_name in snaps:
        metrics_query["filters"].append(
            get_filter(
                metric_name="weekly_device_change",
                snap_id=snaps[snap_name],
                start=start,
                end=end,
            )
        )

    return metrics_query
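
# With snaps = {"snap-a": "id-a", "snap-b": "id-b"} (hypothetical), the
# result contains two "weekly_device_change" filters, each spanning the
# month up to the last processed date.
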

def get_days_without_data(metrics_response):
    """Collect the buckets (days) on which any series has a None value

    :param metrics_response: The JSON response from the metrics API

    :returns: A list of the buckets that are missing data
    """
    days_without_data = set()
    for metric in metrics_response["metrics"]:
        if metric["status"] == "OK":
            for series in metric["series"]:
                none_indexes = [
                    i for i, val in enumerate(series["values"]) if val is None
                ]
                days_without_data.update(
                    metric["buckets"][i] for i in none_indexes
                )
    return list(days_without_data)
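
# For example, a metric with buckets ["2025-10-01", "2025-10-02"] and a
# single series with values [None, 3] yields ["2025-10-01"].
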

def transform_metrics(metrics, metrics_response, snaps):
    """Transforms an API response from the publisher metrics

    :param metrics: The dictionary to fill in (with 'snaps', 'buckets'
        and 'days_without_data' keys)
    :param metrics_response: The JSON response from the metrics API
    :param snaps: dict mapping snap names to snap ids

    :returns: A dictionary with the metric information
    """
    for metric in metrics_response["metrics"]:
        if metric["status"] == "OK":
            snap_id = metric["snap_id"]

            snap_name = None
            for snaps_name, snaps_id in snaps.items():
                if snaps_id == snap_id:
                    snap_name = snaps_name

            metrics["snaps"].append(
                {"id": snap_id, "name": snap_name, "series": metric["series"]}
            )
            metrics["buckets"] = metric["buckets"]
    metrics["days_without_data"] = get_days_without_data(metrics_response)
    return metrics
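
# The returned dict has the shape (hypothetical data):
#
#   {"snaps": [{"id": "id-a", "name": "snap-a", "series": [...]}],
#    "buckets": ["2025-10-01", ...],
#    "days_without_data": [...]}
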

def lttb_select_indices(values, target_size):
    """
    Selects indices using the LTTB (Largest-Triangle-Three-Buckets)
    algorithm for downsampling, treating None as 0.
    """
    n = len(values)
    if n <= target_size:
        return list(range(n))

    # Initialize bucket size
    bucket_size = (n - 2) / (target_size - 2)
    # Always keep the first point
    indices = [0]

    current_bucket_start = 0
    for i in range(1, target_size - 1):
        next_bucket_start = min(math.ceil((i + 1) * bucket_size), n - 1)

        max_area = 0
        max_area_idx = current_bucket_start

        point1 = (
            current_bucket_start,
            (
                values[current_bucket_start]
                if values[current_bucket_start] is not None
                else 0
            ),
        )
        point2 = (
            next_bucket_start,
            (
                values[next_bucket_start]
                if values[next_bucket_start] is not None
                else 0
            ),
        )

        for j in range(current_bucket_start + 1, next_bucket_start):
            val_j = values[j] if values[j] is not None else 0

            # Area of the triangle formed by point1, point2 and the
            # current point
            area = abs(
                (point1[0] - point2[0]) * (val_j - point1[1])
                - (point1[0] - j) * (point2[1] - point1[1])
            )
            if area > max_area:
                max_area = area
                max_area_idx = j

        indices.append(max_area_idx)
        current_bucket_start = next_bucket_start

    # Always keep the last point
    indices.append(n - 1)
    return indices
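
# Illustrative trace: for values = [0, 5, 1, 8, 2, 9, 0] and
# target_size = 4, bucket_size is (7 - 2) / (4 - 2) = 2.5 and the
# selected indices are [0, 4, 5, 6]: the first point, the largest-area
# points from the middle buckets, and the last point.
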

def normalize_series(series, bucket_count):
    """
    Ensure all value arrays in the series have the same size
    by padding with 0s.
    """
    for item in series:
        values = item["values"]
        # If the series has no values, fill it with 0s
        if not values:
            item["values"] = [0] * bucket_count
        # Extend the values with 0s if they are shorter than the bucket count
        elif len(values) < bucket_count:
            item["values"].extend([0] * (bucket_count - len(values)))
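
# For example, normalize_series([{"name": "s", "values": [1, 2]}], 4)
# pads the values in place to [1, 2, 0, 0].
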

def downsample_series(buckets, series, target_size):
    """Downsample each series in the data, treating None as 0."""
    downsampled_buckets = []
    downsampled_series = []

    # Handle the case where series is empty
    if not series:
        return buckets[:target_size], []

    bucket_count = len(buckets)
    # Normalize the series first to make sure they all have the same length
    normalize_series(series, bucket_count)

    # Downsample each series independently
    for item in series:
        name = item["name"]
        values = item["values"]

        selected_indices = lttb_select_indices(values, target_size)

        # Collect the buckets and values based on the selected indices
        downsampled_buckets = [buckets[i] for i in selected_indices]
        downsampled_values = [
            values[i] if values[i] is not None else 0 for i in selected_indices
        ]

        downsampled_series.append({"name": name, "values": downsampled_values})

    return downsampled_buckets, downsampled_series
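
# Usage sketch (hypothetical channel name and data):
#
#   buckets = ["d1", "d2", "d3", "d4", "d5", "d6", "d7"]
#   series = [{"name": "latest", "values": [0, 5, 1, 8, 2, 9, 0]}]
#   downsample_series(buckets, series, 4)
#   # -> (["d1", "d5", "d6", "d7"],
#   #     [{"name": "latest", "values": [0, 2, 9, 0]}])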