|
| 1 | +// Copyright The OpenTelemetry Authors |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +#include "opentelemetry/sdk/metrics/aggregation/base2_exponential_histogram_aggregation.h" |
| 5 | +#include "opentelemetry/sdk/metrics/aggregation/aggregation.h" |
| 6 | +#include "opentelemetry/sdk/metrics/data/circular_buffer.h" |
| 7 | +#include "opentelemetry/sdk/metrics/data/point_data.h" |
| 8 | +#include "opentelemetry/version.h" |
| 9 | + |
| 10 | +#include <cmath> |
| 11 | +#include <cstddef> |
| 12 | +#include <exception> |
| 13 | +#include <limits> |
| 14 | +#include <memory> |
| 15 | +#include <mutex> |
| 16 | + |
| 17 | +#include <iostream> |
| 18 | + |
| 19 | +OPENTELEMETRY_BEGIN_NAMESPACE |
| 20 | +namespace sdk |
| 21 | +{ |
| 22 | +namespace metrics |
| 23 | +{ |
| 24 | + |
| 25 | +namespace |
| 26 | +{ |
| 27 | + |
| 28 | +uint32_t GetScaleReduction(int32_t start_index, int32_t end_index, size_t max_buckets) noexcept |
| 29 | +{ |
| 30 | + uint32_t scale_reduction = 0; |
| 31 | + while (static_cast<size_t>(end_index - start_index + 1) > max_buckets) |
| 32 | + { |
| 33 | + start_index >>= 1; |
| 34 | + end_index >>= 1; |
| 35 | + scale_reduction++; |
| 36 | + } |
| 37 | + return scale_reduction; |
| 38 | +} |
| 39 | + |
| 40 | +uint32_t GetScaleReduction(const AdaptingCircularBufferCounter &first, |
| 41 | + const AdaptingCircularBufferCounter &second, |
| 42 | + size_t max_buckets) |
| 43 | +{ |
| 44 | + if (first.Empty() || second.Empty()) |
| 45 | + { |
| 46 | + return 0; |
| 47 | + } |
| 48 | + |
| 49 | + const int32_t start_index = std::min(first.StartIndex(), second.StartIndex()); |
| 50 | + const int32_t end_index = std::max(first.EndIndex(), second.EndIndex()); |
| 51 | + return GetScaleReduction(start_index, end_index, max_buckets); |
| 52 | +} |
| 53 | + |
| 54 | +void DownscaleBuckets(AdaptingCircularBufferCounter *buckets, uint32_t by) noexcept |
| 55 | +{ |
| 56 | + if (buckets->Empty()) |
| 57 | + { |
| 58 | + return; |
| 59 | + } |
| 60 | + |
| 61 | + // We want to preserve other optimisations here as well, e.g. integer size. |
| 62 | + // Instead of creating a new counter, we copy the existing one (for bucket size |
| 63 | + // optimisations), and clear the values before writing the new ones. |
| 64 | + // TODO(euroelessar): Do downscaling in-place. |
| 65 | + AdaptingCircularBufferCounter new_buckets = *buckets; |
| 66 | + new_buckets.Clear(); |
| 67 | + |
| 68 | + for (int i = buckets->StartIndex(); i <= buckets->EndIndex(); i++) |
| 69 | + { |
| 70 | + const uint64_t count = buckets->Get(i); |
| 71 | + if (count > 0) |
| 72 | + { |
| 73 | + new_buckets.Increment(i >> by, count); |
| 74 | + } |
| 75 | + } |
| 76 | + *buckets = std::move(new_buckets); |
| 77 | +} |
| 78 | + |
| 79 | +} // namespace |
| 80 | + |
| 81 | +Base2ExponentialHistogramAggregation::Base2ExponentialHistogramAggregation( |
| 82 | + const AggregationConfig *aggregation_config) |
| 83 | +{ |
| 84 | + const Base2ExponentialHistogramAggregationConfig default_config; |
| 85 | + auto ac = static_cast<const Base2ExponentialHistogramAggregationConfig *>(aggregation_config); |
| 86 | + if (!ac) |
| 87 | + { |
| 88 | + ac = &default_config; |
| 89 | + } |
| 90 | + |
| 91 | + point_data_.max_buckets_ = ac->max_buckets_; |
| 92 | + point_data_.scale_ = ac->max_scale_; |
| 93 | + point_data_.record_min_max_ = ac->record_min_max_; |
| 94 | + point_data_.min_ = std::numeric_limits<double>::max(); |
| 95 | + point_data_.max_ = std::numeric_limits<double>::min(); |
| 96 | + |
| 97 | + indexer_ = Base2ExponentialHistogramIndexer(point_data_.scale_); |
| 98 | +} |
| 99 | + |
| 100 | +Base2ExponentialHistogramAggregation::Base2ExponentialHistogramAggregation( |
| 101 | + const Base2ExponentialHistogramPointData &point_data) |
| 102 | + : point_data_{point_data}, indexer_(point_data.scale_), record_min_max_{point_data.record_min_max_} |
| 103 | +{} |
| 104 | + |
| 105 | +Base2ExponentialHistogramAggregation::Base2ExponentialHistogramAggregation(Base2ExponentialHistogramPointData &&point_data) |
| 106 | + : point_data_{std::move(point_data)}, indexer_(point_data_.scale_), record_min_max_{point_data_.record_min_max_} |
| 107 | +{} |
| 108 | + |
| 109 | +void Base2ExponentialHistogramAggregation::Aggregate( |
| 110 | + int64_t value, |
| 111 | + const PointAttributes & /* attributes */) noexcept |
| 112 | +{ |
| 113 | + Aggregate(double(value)); |
| 114 | +} |
| 115 | + |
| 116 | +void Base2ExponentialHistogramAggregation::Aggregate( |
| 117 | + double value, |
| 118 | + const PointAttributes & /* attributes */) noexcept |
| 119 | +{ |
| 120 | + const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_); |
| 121 | + point_data_.sum_ += value; |
| 122 | + point_data_.count_++; |
| 123 | + |
| 124 | + if (record_min_max_) |
| 125 | + { |
| 126 | + point_data_.min_ = std::min(point_data_.min_, value); |
| 127 | + point_data_.max_ = std::max(point_data_.max_, value); |
| 128 | + } |
| 129 | + |
| 130 | + if (value == 0) |
| 131 | + { |
| 132 | + point_data_.zero_count_++; |
| 133 | + return; |
| 134 | + } |
| 135 | + else if (value > 0) |
| 136 | + { |
| 137 | + AggregateIntoBuckets(&point_data_.positive_buckets_, value); |
| 138 | + } |
| 139 | + else |
| 140 | + { |
| 141 | + AggregateIntoBuckets(&point_data_.negative_buckets_, -value); |
| 142 | + } |
| 143 | +} |
| 144 | + |
| 145 | +void Base2ExponentialHistogramAggregation::AggregateIntoBuckets( |
| 146 | + AdaptingCircularBufferCounter *buckets, |
| 147 | + double value) noexcept |
| 148 | +{ |
| 149 | + if (buckets->MaxSize() == 0) |
| 150 | + { |
| 151 | + *buckets = AdaptingCircularBufferCounter{point_data_.max_buckets_}; |
| 152 | + } |
| 153 | + |
| 154 | + const int32_t index = indexer_.ComputeIndex(value); |
| 155 | + if (!buckets->Increment(index, 1)) |
| 156 | + { |
| 157 | + const int32_t start_index = std::min(buckets->StartIndex(), index); |
| 158 | + const int32_t end_index = std::max(buckets->EndIndex(), index); |
| 159 | + const uint32_t scale_reduction = |
| 160 | + GetScaleReduction(start_index, end_index, point_data_.max_buckets_); |
| 161 | + Downscale(scale_reduction); |
| 162 | + |
| 163 | + buckets->Increment(index >> scale_reduction, 1); |
| 164 | + } |
| 165 | +} |
| 166 | + |
| 167 | +void Base2ExponentialHistogramAggregation::Downscale(uint32_t by) noexcept |
| 168 | +{ |
| 169 | + if (by == 0) |
| 170 | + { |
| 171 | + return; |
| 172 | + } |
| 173 | + |
| 174 | + DownscaleBuckets(&point_data_.positive_buckets_, by); |
| 175 | + DownscaleBuckets(&point_data_.negative_buckets_, by); |
| 176 | + |
| 177 | + point_data_.scale_ -= by; |
| 178 | + indexer_ = Base2ExponentialHistogramIndexer(point_data_.scale_); |
| 179 | +} |
| 180 | + |
| 181 | +std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Merge( |
| 182 | + const Aggregation &delta) const noexcept |
| 183 | +{ |
| 184 | + auto left = nostd::get<Base2ExponentialHistogramPointData>(ToPoint()); |
| 185 | + auto right = nostd::get<Base2ExponentialHistogramPointData>( |
| 186 | + (static_cast<const Base2ExponentialHistogramAggregation &>(delta).ToPoint())); |
| 187 | + |
| 188 | + auto low_res = left.scale_ < right.scale_ ? left : right; |
| 189 | + auto high_res = left.scale_ < right.scale_ ? right : left; |
| 190 | + auto scale_reduction = high_res.scale_ - low_res.scale_; |
| 191 | + |
| 192 | + if (scale_reduction > 0) |
| 193 | + { |
| 194 | + DownscaleBuckets(&high_res.positive_buckets_, scale_reduction); |
| 195 | + DownscaleBuckets(&high_res.negative_buckets_, scale_reduction); |
| 196 | + high_res.scale_ -= scale_reduction; |
| 197 | + } |
| 198 | + |
| 199 | + Base2ExponentialHistogramPointData result_value; |
| 200 | + result_value.count_ = low_res.count_ + high_res.count_; |
| 201 | + result_value.sum_ = low_res.sum_ + high_res.sum_; |
| 202 | + result_value.zero_count_ = low_res.zero_count_ + high_res.zero_count_; |
| 203 | + result_value.scale_ = std::min(low_res.scale_, high_res.scale_); |
| 204 | + result_value.max_buckets_ = low_res.max_buckets_; |
| 205 | + result_value.record_min_max_ = low_res.record_min_max_ && high_res.record_min_max_; |
| 206 | + if (result_value.record_min_max_) |
| 207 | + { |
| 208 | + result_value.min_ = std::min(low_res.min_, high_res.min_); |
| 209 | + result_value.max_ = std::max(low_res.max_, high_res.max_); |
| 210 | + } |
| 211 | + if (!high_res.positive_buckets_.Empty()) |
| 212 | + { |
| 213 | + for (int i = high_res.positive_buckets_.StartIndex(); |
| 214 | + i <= high_res.positive_buckets_.EndIndex(); i++) |
| 215 | + { |
| 216 | + low_res.positive_buckets_.Increment(i, high_res.positive_buckets_.Get(i)); |
| 217 | + } |
| 218 | + } |
| 219 | + result_value.positive_buckets_ = std::move(low_res.positive_buckets_); |
| 220 | + |
| 221 | + if (!high_res.negative_buckets_.Empty()) |
| 222 | + { |
| 223 | + for (int i = high_res.negative_buckets_.StartIndex(); |
| 224 | + i <= high_res.negative_buckets_.EndIndex(); i++) |
| 225 | + { |
| 226 | + low_res.negative_buckets_.Increment(i, high_res.negative_buckets_.Get(i)); |
| 227 | + } |
| 228 | + } |
| 229 | + result_value.negative_buckets_ = std::move(low_res.negative_buckets_); |
| 230 | + |
| 231 | + return std::unique_ptr<Base2ExponentialHistogramAggregation>{ |
| 232 | + new Base2ExponentialHistogramAggregation(std::move(result_value))}; |
| 233 | +} |
| 234 | + |
| 235 | +std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Diff( |
| 236 | + const Aggregation &next) const noexcept |
| 237 | +{ |
| 238 | + auto left = nostd::get<Base2ExponentialHistogramPointData>(ToPoint()); |
| 239 | + auto right = nostd::get<Base2ExponentialHistogramPointData>( |
| 240 | + (static_cast<const Base2ExponentialHistogramAggregation &>(next).ToPoint())); |
| 241 | + |
| 242 | + auto low_res = left.scale_ < right.scale_ ? left : right; |
| 243 | + auto high_res = left.scale_ < right.scale_ ? right : left; |
| 244 | + auto scale_reduction = high_res.scale_ - low_res.scale_; |
| 245 | + |
| 246 | + if (scale_reduction > 0) |
| 247 | + { |
| 248 | + DownscaleBuckets(&high_res.positive_buckets_, scale_reduction); |
| 249 | + DownscaleBuckets(&high_res.negative_buckets_, scale_reduction); |
| 250 | + high_res.scale_ -= scale_reduction; |
| 251 | + } |
| 252 | + |
| 253 | + Base2ExponentialHistogramPointData result_value; |
| 254 | + result_value.scale_ = low_res.scale_; |
| 255 | + result_value.max_buckets_ = low_res.max_buckets_; |
| 256 | + result_value.record_min_max_ = false; |
| 257 | + // caution for underflow |
| 258 | + result_value.count_ = (left.count_ >= right.count_) ? (left.count_ - right.count_) : 0; |
| 259 | + result_value.sum_ = (left.sum_ >= right.sum_) ? (left.sum_ - right.sum_) : 0.0; |
| 260 | + result_value.zero_count_ = (left.zero_count_ >= right.zero_count_) ? (left.zero_count_ - right.zero_count_) : 0; |
| 261 | + if (!high_res.positive_buckets_.Empty()) |
| 262 | + { |
| 263 | + for (int i = high_res.positive_buckets_.StartIndex(); |
| 264 | + i <= high_res.positive_buckets_.EndIndex(); i++) |
| 265 | + { |
| 266 | + low_res.positive_buckets_.Increment(i, 0-high_res.positive_buckets_.Get(i)); |
| 267 | + } |
| 268 | + } |
| 269 | + result_value.positive_buckets_ = std::move(low_res.positive_buckets_); |
| 270 | + |
| 271 | + if (!high_res.negative_buckets_.Empty()) |
| 272 | + { |
| 273 | + for (int i = high_res.negative_buckets_.StartIndex(); |
| 274 | + i <= high_res.negative_buckets_.EndIndex(); i++) |
| 275 | + { |
| 276 | + low_res.negative_buckets_.Increment(i, 0-high_res.negative_buckets_.Get(i)); |
| 277 | + } |
| 278 | + } |
| 279 | + result_value.negative_buckets_ = std::move(low_res.negative_buckets_); |
| 280 | + |
| 281 | + return std::unique_ptr<Base2ExponentialHistogramAggregation>{ |
| 282 | + new Base2ExponentialHistogramAggregation(std::move(result_value))}; |
| 283 | +} |
| 284 | + |
| 285 | +// std::unique_ptr<Aggregation> Base2ExponentialHistogramAggregation::Diff( |
| 286 | +// const Aggregation &next) const noexcept |
| 287 | +// { |
| 288 | +// auto curr_value = nostd::get<Base2ExponentialHistogramPointData>(ToPoint()); |
| 289 | +// auto next_value = nostd::get<Base2ExponentialHistogramPointData>( |
| 290 | +// (static_cast<const Base2ExponentialHistogramAggregation &>(next).ToPoint())); |
| 291 | + |
| 292 | +// Base2ExponentialHistogramPointData result_value; |
| 293 | +// result_value.scale_ = curr_value.scale_; |
| 294 | +// result_value.max_buckets_ = curr_value.max_buckets_; |
| 295 | +// result_value.record_min_max_ = false; |
| 296 | +// result_value.count_ = next_value.count_ - curr_value.count_; |
| 297 | +// result_value.sum_ = next_value.sum_ - curr_value.sum_; |
| 298 | +// result_value.zero_count_ = next_value.zero_count_ - curr_value.zero_count_; |
| 299 | + |
| 300 | +// return std::unique_ptr<Base2ExponentialHistogramAggregation>{ |
| 301 | +// new Base2ExponentialHistogramAggregation(std::move(result_value))}; |
| 302 | +// } |
| 303 | + |
| 304 | +PointType Base2ExponentialHistogramAggregation::ToPoint() const noexcept |
| 305 | +{ |
| 306 | + const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_); |
| 307 | + return point_data_; |
| 308 | +} |
| 309 | + |
| 310 | +} // namespace metrics |
| 311 | +} // namespace sdk |
| 312 | +OPENTELEMETRY_END_NAMESPACE |
0 commit comments