Coverage for webapp/metrics/helper.py: 50%

98 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-28 22:05 +0000

1import datetime 

2from dateutil import relativedelta 

3import math 

4 

5 

def get_filter(metric_name, snap_id, start, end):
    """Build a single filter entry for the metrics API.

    :param metric_name: Name of the metric to request
    :param snap_id: The snap id
    :param start: Range start (date/datetime; serialised as YYYY-MM-DD)
    :param end: Range end (date/datetime; serialised as YYYY-MM-DD)

    :returns: A dict ready to be placed in a "filters" list
    """
    date_format = "%Y-%m-%d"
    return dict(
        metric_name=metric_name,
        snap_id=snap_id,
        start=start.strftime(date_format),
        end=end.strftime(date_format),
    )

13 

14 

def get_last_metrics_processed_date():
    """Return the most recent date whose metrics the store has
    fully processed.

    The store processes metrics during the night, so we allow a
    three-hour grace period before deciding which day we are on.
    https://github.com/canonical-web-and-design/snapcraft.io/pull/616

    :returns: A datetime.date one day back (two on Mondays and
        Sundays, to prevent the reset issue)
    """
    grace_period = relativedelta.relativedelta(hours=3)
    reference = datetime.datetime.now() - grace_period

    # Monday (0) and Sunday (6) need an extra day of slack.
    skip_days = 2 if reference.weekday() in (0, 6) else 1

    return reference.date() - relativedelta.relativedelta(days=skip_days)

32 

33 

def get_dates_for_metric(metric_period=30, metric_bucket="d"):
    """Compute the date range for a metrics query.

    :param metric_period: How many buckets back to go, by default 30
    :param metric_bucket: Bucket unit: "d" (days), "m" (months) or
        "y" (years), by default "d"

    :returns: A dict with "start" and "end" dates
    :raises ValueError: If metric_bucket is not "d", "m" or "y"
    """
    end = get_last_metrics_processed_date()

    if metric_bucket == "d":
        start = end - relativedelta.relativedelta(days=metric_period)
    elif metric_bucket == "m":
        start = end - relativedelta.relativedelta(months=metric_period)
    elif metric_bucket == "y":
        # Go back an extra day to ensure the granularity increases
        start = end - relativedelta.relativedelta(
            years=metric_period, days=1
        )
    else:
        # Previously an unknown bucket fell through and crashed with
        # UnboundLocalError on "start"; fail with a clear message.
        raise ValueError(f"Unknown metric_bucket: {metric_bucket!r}")

    return {"end": end, "start": start}

47 

48 

def build_metric_query_installed_base(
    snap_id, installed_base, metric_period=30, metric_bucket="d"
):
    """Build the json that will be requested to the API.

    :param snap_id: The snap id
    :param installed_base: The base metric requested
    :param metric_period: The metric period requested, by default 30
    :param metric_bucket: The metric bucket, by default 'd'

    :returns: A dictionary with the filters for the metrics API.
    """
    dates = get_dates_for_metric(metric_period, metric_bucket)
    installed_base_filter = get_filter(
        metric_name=installed_base,
        snap_id=snap_id,
        start=dates["start"],
        end=dates["end"],
    )
    return {"filters": [installed_base_filter]}

73 

74 

def build_active_device_metric_query(snap_id, installed_base, end, start):
    """Build the metrics-API payload for an active-device query over an
    explicit date range.

    :param snap_id: The snap id
    :param installed_base: The base metric requested
    :param end: Range end (forwarded to get_filter)
    :param start: Range start (forwarded to get_filter)

    :returns: A dictionary with the filters for the metrics API.
    """
    active_device_filter = get_filter(
        metric_name=installed_base,
        snap_id=snap_id,
        start=start,
        end=end,
    )
    return {"filters": [active_device_filter]}

86 

87 

def build_metric_query_country(snap_id):
    """Build the json that will be requested to the API.

    :param snap_id: The snap id

    :returns: A dictionary with a single
        'weekly_installed_base_by_country' filter whose start and end
        are both the last processed metrics date.
    """
    last_processed = get_last_metrics_processed_date()

    country_filter = get_filter(
        metric_name="weekly_installed_base_by_country",
        snap_id=snap_id,
        start=last_processed,
        end=last_processed,
    )
    return {"filters": [country_filter]}

111 

112 

def find_metric(full_response, name):
    """Find a named metric in a metric response.

    :param full_response: The JSON response from the metrics API
    :param name: Name of the metric to find

    :returns: The first metric dict whose "metric_name" equals *name*,
        or None if there is no match
    """
    matches = (
        metric for metric in full_response if metric["metric_name"] == name
    )
    return next(matches, None)

124 

125 

def build_snap_installs_metrics_query(snaps, get_filter=get_filter):
    """Build a 'weekly_device_change' query covering every given snap.

    :param snaps: dict containing snaps we want metrics for
    :param get_filter: function that builds a single filter payload

    :returns: A dict containing a filter for each snap in snaps
        or empty if there are no snaps
    """
    if not snaps:
        return {}

    end = get_last_metrics_processed_date()
    # One month of history ending at the last processed date.
    start = end - relativedelta.relativedelta(months=1)

    return {
        "filters": [
            get_filter(
                metric_name="weekly_device_change",
                snap_id=snap_id,
                start=start,
                end=end,
            )
            for snap_id in snaps.values()
        ]
    }

153 

154 

def transform_metrics(metrics, metrics_response, snaps):
    """Merge a publisher metrics API response into *metrics*.

    :param metrics: Accumulator dict with a "snaps" list; this call
        appends to it and sets/overwrites "buckets"
    :param metrics_response: The JSON response from the metrics API
    :param snaps: dict mapping snap names to snap ids

    :returns: The updated *metrics* dictionary
    """
    for metric in metrics_response["metrics"]:
        if metric["status"] != "OK":
            continue

        snap_id = metric["snap_id"]

        # Reverse-lookup the snap name from its id (last match wins).
        snap_name = None
        for candidate_name, candidate_id in snaps.items():
            if candidate_id == snap_id:
                snap_name = candidate_name

        metrics["snaps"].append(
            {"id": snap_id, "name": snap_name, "series": metric["series"]}
        )
        # Buckets from the most recent OK metric overwrite earlier ones.
        metrics["buckets"] = metric["buckets"]

    return metrics

177 

178 

def lttb_select_indices(values, target_size):
    """
    Selects indices using the LTTB algorithm for downsampling,
    treating None as 0.

    Largest-Triangle-Three-Buckets: the values are split into roughly
    ``target_size`` buckets, and from each interior bucket the point
    forming the largest triangle with the bucket's boundary points is
    kept.

    :param values: Sequence of numbers (None entries are treated as 0)
    :param target_size: Desired number of indices in the result

    :returns: A sorted list of indices into ``values``; all indices
        when ``len(values) <= target_size``
    """
    n = len(values)
    # Nothing to downsample: keep every index.
    if n <= target_size:
        return list(range(n))

    # Initialize bucket size
    # (n - 2) / (target_size - 2): interior points spread over the
    # interior buckets, reserving the two endpoints.
    bucket_size = (n - 2) / (target_size - 2)
    indices = []

    # NOTE(review): index 0 is only included if it wins the first
    # bucket's area comparison; canonical LTTB always keeps the first
    # point — confirm this is intended.
    current_bucket_start = 0
    for i in range(1, target_size - 1):
        # End of this bucket, clamped to the last valid index.
        next_bucket_start = min(math.ceil((i + 1) * bucket_size), n - 1)

        max_area = 0
        max_area_idx = current_bucket_start

        # Boundary points of the triangle; None values count as 0.
        point1 = (
            current_bucket_start,
            (
                values[current_bucket_start]
                if values[current_bucket_start] is not None
                else 0
            ),
        )
        point2 = (
            next_bucket_start,
            (
                values[next_bucket_start]
                if values[next_bucket_start] is not None
                else 0
            ),
        )

        # Scan the interior of the bucket for the largest-area point.
        for j in range(current_bucket_start + 1, next_bucket_start):
            val_j = values[j] if values[j] is not None else 0

            # Area of triangle formed by point1, point2, and the current point
            # (the usual 1/2 factor is omitted — it does not affect
            # which point has the maximum area).
            area = abs(
                (point1[0] - point2[0]) * (val_j - point1[1])
                - (point1[0] - j) * (point2[1] - point1[1])
            )
            if area > max_area:
                max_area = area
                max_area_idx = j

        indices.append(max_area_idx)
        current_bucket_start = next_bucket_start

    # Always keep the final point.
    indices.append(n - 1)
    return indices

233 

234 

def normalize_series(series, bucket_count):
    """
    Ensure all value arrays in the series have the same size
    by padding with 0s.

    Mutates *series* in place; entries already at or beyond
    ``bucket_count`` values are left untouched.
    """
    for entry in series:
        current = entry["values"]
        if not current:
            # Empty series: replace with an all-zero list.
            entry["values"] = [0] * bucket_count
            continue
        shortfall = bucket_count - len(current)
        if shortfall > 0:
            # Pad short series up to the bucket count.
            current.extend([0] * shortfall)

248 

249 

def downsample_series(buckets, series, target_size):
    """Downsample each series in the data, treating None as 0.

    :param buckets: List of bucket labels (one per value)
    :param series: List of {"name", "values"} dicts; normalized in place
    :param target_size: Desired number of points per series

    :returns: (buckets, series) tuple; the returned buckets correspond
        to the indices selected for the last series processed.
    """
    # Handle case where series is empty
    if not series:
        return buckets[:target_size], []

    # Normalize series first to make sure all series have the same length
    normalize_series(series, len(buckets))

    sampled_buckets = []
    sampled_series = []

    # Downsample each series independently
    for entry in series:
        values = entry["values"]
        chosen = lttb_select_indices(values, target_size)

        # Overwritten on every pass: the final value reflects the
        # indices chosen for the last series.
        sampled_buckets = [buckets[i] for i in chosen]
        sampled_series.append(
            {
                "name": entry["name"],
                "values": [
                    0 if values[i] is None else values[i] for i in chosen
                ],
            }
        )

    return sampled_buckets, sampled_series