Skip to content

Commit 338cce4

Browse files
committed
add base2 expo histo, test, benchmark
1 parent b62565f commit 338cce4

File tree

3 files changed

+474
-5
lines changed

3 files changed

+474
-5
lines changed
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#include "opentelemetry/sdk/metrics/aggregation/base2_exponential_histogram_aggregation.h"
5+
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
6+
#include "opentelemetry/sdk/metrics/data/circular_buffer.h"
7+
#include "opentelemetry/sdk/metrics/data/point_data.h"
8+
#include "opentelemetry/version.h"
9+
10+
#include <cmath>
11+
#include <cstddef>
12+
#include <exception>
13+
#include <limits>
14+
#include <memory>
15+
#include <mutex>
16+
17+
#include <iostream>
18+
19+
OPENTELEMETRY_BEGIN_NAMESPACE
20+
namespace sdk
21+
{
22+
namespace metrics
23+
{
24+
25+
namespace
26+
{
27+
28+
/**
 * Computes how many times the inclusive bucket index range
 * [start_index, end_index] has to be halved (both indexes shifted right by one)
 * until it spans at most max_buckets buckets.
 *
 * @param start_index lowest bucket index of the range.
 * @param end_index   highest bucket index of the range.
 * @param max_buckets maximum number of buckets the range is allowed to span.
 * @return the number of shifts applied. When no amount of shifting can make the
 *         range fit (for example indexes -1 and 0 with a single bucket, or any
 *         range with zero buckets), the number of shifts applied before the
 *         indexes stopped changing is returned instead of looping forever.
 */
uint32_t GetScaleReduction(int32_t start_index, int32_t end_index, size_t max_buckets) noexcept
{
  uint32_t scale_reduction = 0;
  for (;;)
  {
    // Widen before subtracting: the span of two int32_t indexes can exceed int32_t.
    const int64_t width =
        static_cast<int64_t>(end_index) - static_cast<int64_t>(start_index) + 1;
    if (static_cast<uint64_t>(width) <= max_buckets)
    {
      break;
    }

    const int32_t next_start = start_index >> 1;
    const int32_t next_end   = end_index >> 1;
    if (next_start == start_index && next_end == end_index)
    {
      // Fixed point reached (indexes are 0 and/or -1): further shifting cannot
      // shrink the range any more, so stop instead of spinning.
      break;
    }

    start_index = next_start;
    end_index   = next_end;
    scale_reduction++;
  }
  return scale_reduction;
}
39+
40+
uint32_t GetScaleReduction(const AdaptingCircularBufferCounter &first,
41+
const AdaptingCircularBufferCounter &second,
42+
size_t max_buckets)
43+
{
44+
if (first.Empty() || second.Empty())
45+
{
46+
return 0;
47+
}
48+
49+
const int32_t start_index = std::min(first.StartIndex(), second.StartIndex());
50+
const int32_t end_index = std::max(first.EndIndex(), second.EndIndex());
51+
return GetScaleReduction(start_index, end_index, max_buckets);
52+
}
53+
54+
// Re-buckets `buckets` after the histogram scale has been lowered by `by`:
// every non-zero count stored at index i is moved to index (i >> by).
// An empty counter is left untouched.
void DownscaleBuckets(AdaptingCircularBufferCounter *buckets, uint32_t by) noexcept
{
  if (!buckets->Empty())
  {
    // Start from a cleared copy rather than a fresh counter so that the other
    // optimisations of the existing counter (e.g. integer size) are preserved.
    // TODO(euroelessar): Do downscaling in-place.
    AdaptingCircularBufferCounter rescaled = *buckets;
    rescaled.Clear();

    for (int index = buckets->StartIndex(); index <= buckets->EndIndex(); index++)
    {
      const uint64_t occurrences = buckets->Get(index);
      if (occurrences == 0)
      {
        continue;
      }
      rescaled.Increment(index >> by, occurrences);
    }

    *buckets = std::move(rescaled);
  }
}
78+
79+
} // namespace
80+
81+
/**
 * Creates an empty aggregation from an aggregation configuration.
 *
 * @param aggregation_config configuration to use; it is cast to
 *        Base2ExponentialHistogramAggregationConfig without a runtime check.
 *        When null, a default-constructed configuration is used instead.
 */
Base2ExponentialHistogramAggregation::Base2ExponentialHistogramAggregation(
    const AggregationConfig *aggregation_config)
{
  // Only referenced while this constructor runs; every needed value is copied out.
  const Base2ExponentialHistogramAggregationConfig default_config;
  auto ac = static_cast<const Base2ExponentialHistogramAggregationConfig *>(aggregation_config);
  if (!ac)
  {
    ac = &default_config;
  }

  point_data_.max_buckets_    = ac->max_buckets_;
  point_data_.scale_          = ac->max_scale_;
  point_data_.record_min_max_ = ac->record_min_max_;
  point_data_.min_            = std::numeric_limits<double>::max();
  // lowest(), not min(): for double, min() is the smallest *positive* value, so
  // a series of negative measurements would never replace it as the maximum.
  point_data_.max_ = std::numeric_limits<double>::lowest();

  // Aggregate() consults this member rather than point_data_.record_min_max_,
  // so it has to follow the configuration as well (the point-data constructors
  // initialise it in the same way).
  record_min_max_ = ac->record_min_max_;

  indexer_ = Base2ExponentialHistogramIndexer(point_data_.scale_);
}
99+
100+
// Builds an aggregation holding a copy of an existing data point; the indexer
// and the min/max recording flag are taken from that point.
Base2ExponentialHistogramAggregation::Base2ExponentialHistogramAggregation(
    const Base2ExponentialHistogramPointData &point_data)
    : point_data_(point_data),
      indexer_(point_data.scale_),
      record_min_max_(point_data.record_min_max_)
{}
104+
105+
// Builds an aggregation that takes over an existing data point. The indexer and
// the min/max flag are read from the member (point_data_), not from the
// moved-from argument.
Base2ExponentialHistogramAggregation::Base2ExponentialHistogramAggregation(
    Base2ExponentialHistogramPointData &&point_data)
    : point_data_(std::move(point_data)),
      indexer_(point_data_.scale_),
      record_min_max_(point_data_.record_min_max_)
{}
108+
109+
void Base2ExponentialHistogramAggregation::Aggregate(
110+
int64_t value,
111+
const PointAttributes & /* attributes */) noexcept
112+
{
113+
Aggregate(double(value));
114+
}
115+
116+
void Base2ExponentialHistogramAggregation::Aggregate(
117+
double value,
118+
const PointAttributes & /* attributes */) noexcept
119+
{
120+
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
121+
point_data_.sum_ += value;
122+
point_data_.count_++;
123+
124+
if (record_min_max_)
125+
{
126+
point_data_.min_ = std::min(point_data_.min_, value);
127+
point_data_.max_ = std::max(point_data_.max_, value);
128+
}
129+
130+
if (value == 0)
131+
{
132+
point_data_.zero_count_++;
133+
return;
134+
}
135+
else if (value > 0)
136+
{
137+
AggregateIntoBuckets(&point_data_.positive_buckets_, value);
138+
}
139+
else
140+
{
141+
AggregateIntoBuckets(&point_data_.negative_buckets_, -value);
142+
}
143+
}
144+
145+
// Adds one occurrence of `value` to `buckets`, lowering the histogram scale
// first when the counter refuses the increment at the current scale.
void Base2ExponentialHistogramAggregation::AggregateIntoBuckets(
    AdaptingCircularBufferCounter *buckets,
    double value) noexcept
{
  // A counter reporting a maximum size of zero is replaced by one sized from
  // the configured bucket limit before its first use.
  if (buckets->MaxSize() == 0)
  {
    *buckets = AdaptingCircularBufferCounter{point_data_.max_buckets_};
  }

  const int32_t bucket_index = indexer_.ComputeIndex(value);
  if (buckets->Increment(bucket_index, 1))
  {
    return;
  }

  // The increment was rejected (presumably the index does not fit next to the
  // existing range): work out by how much the range extended with the new
  // index has to shrink, downscale the whole histogram, then retry at the
  // reduced index.
  const int32_t lowest  = std::min(buckets->StartIndex(), bucket_index);
  const int32_t highest = std::max(buckets->EndIndex(), bucket_index);
  const uint32_t shift  = GetScaleReduction(lowest, highest, point_data_.max_buckets_);
  Downscale(shift);

  buckets->Increment(bucket_index >> shift, 1);
}
166+
167+
void Base2ExponentialHistogramAggregation::Downscale(uint32_t by) noexcept
168+
{
169+
if (by == 0)
170+
{
171+
return;
172+
}
173+
174+
DownscaleBuckets(&point_data_.positive_buckets_, by);
175+
DownscaleBuckets(&point_data_.negative_buckets_, by);
176+
177+
point_data_.scale_ -= by;
178+
indexer_ = Base2ExponentialHistogramIndexer(point_data_.scale_);
179+
}
180+
181+
std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Merge(
182+
const Aggregation &delta) const noexcept
183+
{
184+
auto left = nostd::get<Base2ExponentialHistogramPointData>(ToPoint());
185+
auto right = nostd::get<Base2ExponentialHistogramPointData>(
186+
(static_cast<const Base2ExponentialHistogramAggregation &>(delta).ToPoint()));
187+
188+
auto low_res = left.scale_ < right.scale_ ? left : right;
189+
auto high_res = left.scale_ < right.scale_ ? right : left;
190+
auto scale_reduction = high_res.scale_ - low_res.scale_;
191+
192+
if (scale_reduction > 0)
193+
{
194+
DownscaleBuckets(&high_res.positive_buckets_, scale_reduction);
195+
DownscaleBuckets(&high_res.negative_buckets_, scale_reduction);
196+
high_res.scale_ -= scale_reduction;
197+
}
198+
199+
Base2ExponentialHistogramPointData result_value;
200+
result_value.count_ = low_res.count_ + high_res.count_;
201+
result_value.sum_ = low_res.sum_ + high_res.sum_;
202+
result_value.zero_count_ = low_res.zero_count_ + high_res.zero_count_;
203+
result_value.scale_ = std::min(low_res.scale_, high_res.scale_);
204+
result_value.max_buckets_ = low_res.max_buckets_;
205+
result_value.record_min_max_ = low_res.record_min_max_ && high_res.record_min_max_;
206+
if (result_value.record_min_max_)
207+
{
208+
result_value.min_ = std::min(low_res.min_, high_res.min_);
209+
result_value.max_ = std::max(low_res.max_, high_res.max_);
210+
}
211+
if (!high_res.positive_buckets_.Empty())
212+
{
213+
for (int i = high_res.positive_buckets_.StartIndex();
214+
i <= high_res.positive_buckets_.EndIndex(); i++)
215+
{
216+
low_res.positive_buckets_.Increment(i, high_res.positive_buckets_.Get(i));
217+
}
218+
}
219+
result_value.positive_buckets_ = std::move(low_res.positive_buckets_);
220+
221+
if (!high_res.negative_buckets_.Empty())
222+
{
223+
for (int i = high_res.negative_buckets_.StartIndex();
224+
i <= high_res.negative_buckets_.EndIndex(); i++)
225+
{
226+
low_res.negative_buckets_.Increment(i, high_res.negative_buckets_.Get(i));
227+
}
228+
}
229+
result_value.negative_buckets_ = std::move(low_res.negative_buckets_);
230+
231+
return std::unique_ptr<Base2ExponentialHistogramAggregation>{
232+
new Base2ExponentialHistogramAggregation(std::move(result_value))};
233+
}
234+
235+
std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Diff(
236+
const Aggregation &next) const noexcept
237+
{
238+
auto left = nostd::get<Base2ExponentialHistogramPointData>(ToPoint());
239+
auto right = nostd::get<Base2ExponentialHistogramPointData>(
240+
(static_cast<const Base2ExponentialHistogramAggregation &>(next).ToPoint()));
241+
242+
auto low_res = left.scale_ < right.scale_ ? left : right;
243+
auto high_res = left.scale_ < right.scale_ ? right : left;
244+
auto scale_reduction = high_res.scale_ - low_res.scale_;
245+
246+
if (scale_reduction > 0)
247+
{
248+
DownscaleBuckets(&high_res.positive_buckets_, scale_reduction);
249+
DownscaleBuckets(&high_res.negative_buckets_, scale_reduction);
250+
high_res.scale_ -= scale_reduction;
251+
}
252+
253+
Base2ExponentialHistogramPointData result_value;
254+
result_value.scale_ = low_res.scale_;
255+
result_value.max_buckets_ = low_res.max_buckets_;
256+
result_value.record_min_max_ = false;
257+
// caution for underflow
258+
result_value.count_ = (left.count_ >= right.count_) ? (left.count_ - right.count_) : 0;
259+
result_value.sum_ = (left.sum_ >= right.sum_) ? (left.sum_ - right.sum_) : 0.0;
260+
result_value.zero_count_ = (left.zero_count_ >= right.zero_count_) ? (left.zero_count_ - right.zero_count_) : 0;
261+
if (!high_res.positive_buckets_.Empty())
262+
{
263+
for (int i = high_res.positive_buckets_.StartIndex();
264+
i <= high_res.positive_buckets_.EndIndex(); i++)
265+
{
266+
low_res.positive_buckets_.Increment(i, 0-high_res.positive_buckets_.Get(i));
267+
}
268+
}
269+
result_value.positive_buckets_ = std::move(low_res.positive_buckets_);
270+
271+
if (!high_res.negative_buckets_.Empty())
272+
{
273+
for (int i = high_res.negative_buckets_.StartIndex();
274+
i <= high_res.negative_buckets_.EndIndex(); i++)
275+
{
276+
low_res.negative_buckets_.Increment(i, 0-high_res.negative_buckets_.Get(i));
277+
}
278+
}
279+
result_value.negative_buckets_ = std::move(low_res.negative_buckets_);
280+
281+
return std::unique_ptr<Base2ExponentialHistogramAggregation>{
282+
new Base2ExponentialHistogramAggregation(std::move(result_value))};
283+
}
284+
285+
// std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Diff(
286+
// const Aggregation &next) const noexcept
287+
// {
288+
// auto curr_value = nostd::get<Base2ExponentialHistogramPointData>(ToPoint());
289+
// auto next_value = nostd::get<Base2ExponentialHistogramPointData>(
290+
// (static_cast<const Base2ExponentialHistogramAggregation &>(next).ToPoint()));
291+
292+
// Base2ExponentialHistogramPointData result_value;
293+
// result_value.scale_ = curr_value.scale_;
294+
// result_value.max_buckets_ = curr_value.max_buckets_;
295+
// result_value.record_min_max_ = false;
296+
// result_value.count_ = next_value.count_ - curr_value.count_;
297+
// result_value.sum_ = next_value.sum_ - curr_value.sum_;
298+
// result_value.zero_count_ = next_value.zero_count_ - curr_value.zero_count_;
299+
300+
// return std::unique_ptr<Base2ExponentialHistogramAggregation>{
301+
// new Base2ExponentialHistogramAggregation(std::move(result_value))};
302+
// }
303+
304+
// Returns a copy of the current data point, taken while holding lock_.
PointType Base2ExponentialHistogramAggregation::ToPoint() const noexcept
{
  const std::lock_guard<opentelemetry::common::SpinLockMutex> guard(lock_);
  return point_data_;
}
309+
310+
} // namespace metrics
311+
} // namespace sdk
312+
OPENTELEMETRY_END_NAMESPACE

0 commit comments

Comments
 (0)