Class: OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram
- Inherits: Object
- Defined in: lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb
Overview
Contains the implementation of the ExponentialBucketHistogram aggregation.
Constant Summary
- DEFAULT_SIZE = 160
  Relates to the minimum and maximum scale: opentelemetry.io/docs/specs/otel/metrics/sdk/#support-a-minimum-and-maximum-scale
- DEFAULT_SCALE = 20
- MAX_SCALE = 20
- MIN_SCALE = -10
- MIN_MAX_SIZE = 2
- MAX_MAX_SIZE = 16_384
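To make the scale constants concrete, here is a minimal sketch of how a scale translates into bucket boundaries, following the base-2 exponential histogram definition in the OpenTelemetry data model (base = 2^(2^-scale), buckets upper-inclusive). The helper names bucket_base and bucket_index are illustrative assumptions, not methods of this class, and the logarithm form can be inexact for values that sit right on a boundary.

```ruby
# Sketch only: these helpers are illustrative and not part of the SDK.

# Growth factor between consecutive bucket boundaries at a given scale.
def bucket_base(scale)
  2.0**(2.0**(-scale))
end

# Index of the bucket (base**index, base**(index + 1)] that holds a
# positive value, using the logarithm form of the mapping.
def bucket_index(value, scale)
  (Math.log2(value) * (2.0**scale)).ceil - 1
end

puts bucket_base(0)        # each bucket doubles the previous boundary
puts bucket_base(-1)       # coarser: each bucket spans a factor of four
puts bucket_index(3.0, 0)  # 3.0 lies in (2, 4]
puts bucket_index(3.0, 1)  # 3.0 lies in (2**1.5, 4]
```

Higher scales give narrower buckets, which is why a fixed max_size forces the scale down as the observed value range widens.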
Instance Method Summary
- #aggregation_temporality ⇒ Object
  Returns the temporality held by the configured aggregation temporality.
- #collect(start_time, end_time, data_points) ⇒ Object
  When the aggregation temporality is cumulative, merges the current data points into the previous state and downscales as needed.
- #initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta), max_size: DEFAULT_SIZE, max_scale: DEFAULT_SCALE, record_min_max: true, zero_threshold: 0) ⇒ ExponentialBucketHistogram (constructor)
  The default boundaries are calculated based on the default max_size and max_scale values.
- #update(amount, attributes, data_points) ⇒ Object
  Counterpart of aggregate in the Python implementation: no merge happens here, but the buckets may be rescaled.
Constructor Details
#initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta), max_size: DEFAULT_SIZE, max_scale: DEFAULT_SCALE, record_min_max: true, zero_threshold: 0) ⇒ ExponentialBucketHistogram
The default boundaries are calculated based on the default max_size and max_scale values.
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 29
def initialize(
  aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta),
  max_size: DEFAULT_SIZE,
  max_scale: DEFAULT_SCALE,
  record_min_max: true,
  zero_threshold: 0
)
  @aggregation_temporality = AggregationTemporality.determine_temporality(aggregation_temporality: aggregation_temporality, default: :delta)
  @record_min_max = record_min_max
  @min = Float::INFINITY
  @max = -Float::INFINITY
  @sum = 0
  @count = 0
  @zero_threshold = zero_threshold
  @zero_count = 0
  @size = validate_size(max_size)
  @scale = validate_scale(max_scale)
  @mapping = new_mapping(@scale)

  # Previous state for cumulative aggregation
  @previous_positive = {} # nil
  @previous_negative = {} # nil
  @previous_min = {} # Float::INFINITY
  @previous_max = {} # -Float::INFINITY
  @previous_sum = {} # 0
  @previous_count = {} # 0
  @previous_zero_count = {} # 0
  @previous_scale = {} # nil
end
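The constructor passes max_size and max_scale through validate_size and validate_scale, whose bodies are not shown on this page. The sketch below assumes they are plain range checks against the constants listed above and that they raise ArgumentError on out-of-range input; both the behavior and the helper names check_size and check_scale are assumptions, not the SDK's code.

```ruby
# Hypothetical stand-ins for the private validate_size / validate_scale
# helpers. The constants mirror the values listed in the Constant Summary.
MIN_MAX_SIZE = 2
MAX_MAX_SIZE = 16_384
MIN_SCALE = -10
MAX_SCALE = 20

# Returns size unchanged when it is within the allowed bucket-count range.
def check_size(size)
  unless (MIN_MAX_SIZE..MAX_MAX_SIZE).cover?(size)
    raise ArgumentError, "max_size #{size} is outside #{MIN_MAX_SIZE}..#{MAX_MAX_SIZE}"
  end

  size
end

# Returns scale unchanged when it is within the allowed scale range.
def check_scale(scale)
  unless (MIN_SCALE..MAX_SCALE).cover?(scale)
    raise ArgumentError, "max_scale #{scale} is outside #{MIN_SCALE}..#{MAX_SCALE}"
  end

  scale
end

puts check_size(160)
puts check_scale(20)
```

Under these assumptions, the defaults (160 and 20) pass, while a max_size of 1 or a max_scale of 21 would be rejected before any mapping is built.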
Instance Method Details
#aggregation_temporality ⇒ Object
Returns the temporality held by the configured aggregation temporality.
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 299
def aggregation_temporality
  @aggregation_temporality.temporality
end
#collect(start_time, end_time, data_points) ⇒ Object
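With delta temporality, each data point is stamped with start_time and end_time, returned, and removed from data_points. With cumulative temporality, the current data points are merged into the previous state, downscaling as needed, and the merged data points are returned.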
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 62
def collect(start_time, end_time, data_points)
  if @aggregation_temporality.delta?
    # Set timestamps and 'move' data point values to result.
    hdps = data_points.values.map! do |hdp|
      hdp.start_time_unix_nano = start_time
      hdp.time_unix_nano = end_time
      hdp
    end
    data_points.clear
    hdps
  else
    # CUMULATIVE temporality - merge current data_points to previous data_points
    # and only keep the merged data_points in @previous_*

    merged_data_points = {}

    # this will slow down the operation especially if large amount of data_points present
    # but it should be fine since with cumulative, the data_points are merged into previous_* and not kept in data_points
    # rubocop:disable Metrics/BlockLength
    data_points.each do |attributes, hdp|
      # Store current values
      current_positive = hdp.positive
      current_negative = hdp.negative
      current_sum = hdp.sum
      current_min = hdp.min
      current_max = hdp.max
      current_count = hdp.count
      current_zero_count = hdp.zero_count
      current_scale = hdp.scale

      # Setup previous positive, negative bucket and scale based on three different cases
      @previous_positive[attributes] = current_positive.copy_empty if @previous_positive[attributes].nil?
      @previous_negative[attributes] = current_negative.copy_empty if @previous_negative[attributes].nil?
      @previous_scale[attributes] = current_scale if @previous_scale[attributes].nil?

      # Determine minimum scale for merging
      min_scale = [@previous_scale[attributes], current_scale].min

      # Calculate ranges for positive and negative buckets
      low_positive, high_positive = get_low_high_previous_current(
        @previous_positive[attributes],
        current_positive,
        @previous_scale[attributes],
        current_scale,
        min_scale
      )
      low_negative, high_negative = get_low_high_previous_current(
        @previous_negative[attributes],
        current_negative,
        @previous_scale[attributes],
        current_scale,
        min_scale
      )

      # Adjust min_scale based on bucket size constraints
      min_scale = [
        min_scale - get_scale_change(low_positive, high_positive),
        min_scale - get_scale_change(low_negative, high_negative)
      ].min

      # Downscale previous buckets if necessary
      downscale_change = @previous_scale[attributes] - min_scale
      downscale(downscale_change, @previous_positive[attributes], @previous_negative[attributes])

      # Merge current buckets into previous buckets (kind like update); it's always :cumulative
      merge_buckets(@previous_positive[attributes], current_positive, current_scale, min_scale, @aggregation_temporality)
      merge_buckets(@previous_negative[attributes], current_negative, current_scale, min_scale, @aggregation_temporality)

      # initialize min, max, sum, count, zero_count for first time
      @previous_min[attributes] = Float::INFINITY if @previous_min[attributes].nil?
      @previous_max[attributes] = -Float::INFINITY if @previous_max[attributes].nil?
      @previous_sum[attributes] = 0 if @previous_sum[attributes].nil?
      @previous_count[attributes] = 0 if @previous_count[attributes].nil?
      @previous_zero_count[attributes] = 0 if @previous_zero_count[attributes].nil?

      # Update aggregated values
      @previous_min[attributes] = [@previous_min[attributes], current_min].min
      @previous_max[attributes] = [@previous_max[attributes], current_max].max
      @previous_sum[attributes] += current_sum
      @previous_count[attributes] += current_count
      @previous_zero_count[attributes] += current_zero_count
      @previous_scale[attributes] = min_scale

      # Create merged data point
      merged_hdp = ExponentialHistogramDataPoint.new(
        attributes,
        start_time,
        end_time,
        @previous_count[attributes],
        @previous_sum[attributes],
        @previous_scale[attributes],
        @previous_zero_count[attributes],
        @previous_positive[attributes].dup,
        @previous_negative[attributes].dup,
        0, # flags
        nil, # exemplars
        @previous_min[attributes],
        @previous_max[attributes],
        @zero_threshold
      )

      merged_data_points[attributes] = merged_hdp
    end
    # rubocop:enable Metrics/BlockLength

    # when you have no local_data_points, the loop from cumulative aggregation will not run
    # so return last merged data points if exists
    if data_points.empty? && !@previous_positive.empty?
      @previous_positive.each_key do |attributes|
        merged_hdp = ExponentialHistogramDataPoint.new(
          attributes,
          start_time,
          end_time,
          @previous_count[attributes],
          @previous_sum[attributes],
          @previous_scale[attributes],
          @previous_zero_count[attributes],
          @previous_positive[attributes].dup,
          @previous_negative[attributes].dup,
          0, # flags
          nil, # exemplars
          @previous_min[attributes],
          @previous_max[attributes],
          @zero_threshold
        )
        merged_data_points[attributes] = merged_hdp
      end
    end

    # clear data_points since the data is merged into previous_* already;
    # otherwise we will have duplicated data_points in the next collect
    data_points.clear
    merged_data_points.values # return array
  end
end
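The cumulative branch above brings the previous and current buckets to a common (lower) scale before adding their counts. A minimal sketch of that idea follows, modeling buckets as plain Hashes of index => count rather than the SDK's ExponentialHistogram::Buckets. The helpers downscale_counts and merge_counts are hypothetical, and the sketch leaves out the extra scale reduction the real method applies to respect the maximum bucket count.

```ruby
# Sketch only: buckets are plain Hashes of index => count, and these
# helpers are illustrative stand-ins, not the SDK's downscale/merge_buckets.

# Re-index buckets after lowering the scale by `change`. Each step of one
# merges neighbouring pairs of buckets, so index i moves to i >> 1.
def downscale_counts(counts, change)
  counts.each_with_object(Hash.new(0)) do |(index, count), merged|
    merged[index >> change] += count
  end
end

# Merge `current` into `previous` at the smaller of the two scales.
# Returns the merged counts and the scale they are expressed in.
def merge_counts(previous, previous_scale, current, current_scale)
  target = [previous_scale, current_scale].min
  merged = downscale_counts(previous, previous_scale - target)
  downscale_counts(current, current_scale - target).each do |index, count|
    merged[index] += count
  end
  [merged, target]
end

previous = { 2 => 1, 3 => 4 } # recorded at scale 1
current  = { 0 => 2, 1 => 5 } # recorded at scale 0
merged, scale = merge_counts(previous, 1, current, 0)
p scale
p merged
```

Here the two scale-1 buckets at indexes 2 and 3 both fall into index 1 at scale 0, so their counts are combined before the current counts are added.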
#update(amount, attributes, data_points) ⇒ Object
Counterpart of aggregate in the Python implementation: no merge happens here, but the buckets may be rescaled.
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 201
def update(amount, attributes, data_points)
  # fetch or initialize the ExponentialHistogramDataPoint
  hdp = data_points.fetch(attributes) do
    if @record_min_max
      min = Float::INFINITY
      max = -Float::INFINITY
    end

    # this code block will only be executed if no data_points was found with the attributes
    data_points[attributes] = ExponentialHistogramDataPoint.new(
      attributes,
      nil,                               # :start_time_unix_nano
      0,                                 # :time_unix_nano
      0,                                 # :count
      0,                                 # :sum
      @scale,                            # :scale
      @zero_count,                       # :zero_count
      ExponentialHistogram::Buckets.new, # :positive
      ExponentialHistogram::Buckets.new, # :negative
      0,                                 # :flags
      nil,                               # :exemplars
      min,                               # :min
      max,                               # :max
      @zero_threshold                    # :zero_threshold)
    )
  end

  # Start to populate the data point (esp. the buckets)
  if @record_min_max
    hdp.max = amount if amount > hdp.max
    hdp.min = amount if amount < hdp.min
  end

  hdp.sum += amount
  hdp.count += 1

  if amount.abs <= @zero_threshold
    hdp.zero_count += 1
    hdp.scale = 0 if hdp.count == hdp.zero_count # if always getting zero, then there is no point to keep doing the update
    return
  end

  # rescale, map to index, update the buckets here
  buckets = amount.positive? ? hdp.positive : hdp.negative
  amount = -amount if amount.negative?

  bucket_index = @mapping.map_to_index(amount)

  rescaling_needed = false
  low = high = 0

  if buckets.counts == [0] # special case of empty
    buckets.index_start = bucket_index
    buckets.index_end = bucket_index
    buckets.index_base = bucket_index
  elsif bucket_index < buckets.index_start && (buckets.index_end - bucket_index) >= @size
    rescaling_needed = true
    low = bucket_index
    high = buckets.index_end
  elsif bucket_index > buckets.index_end && (bucket_index - buckets.index_start) >= @size
    rescaling_needed = true
    low = buckets.index_start
    high = bucket_index
  end

  if rescaling_needed
    scale_change = get_scale_change(low, high)
    downscale(scale_change, hdp.positive, hdp.negative)
    new_scale = @mapping.scale - scale_change
    @mapping = new_mapping(new_scale)
    bucket_index = @mapping.map_to_index(amount)

    OpenTelemetry.logger.debug "Rescaled with new scale #{new_scale} from #{low} and #{high}; bucket_index is updated to #{bucket_index}"
  end

  hdp.scale = @mapping.scale

  # adjust buckets based on the bucket_index
  if bucket_index < buckets.index_start
    span = buckets.index_end - bucket_index
    grow_buckets(span, buckets)
    buckets.index_start = bucket_index
  elsif bucket_index > buckets.index_end
    span = bucket_index - buckets.index_start
    grow_buckets(span, buckets)
    buckets.index_end = bucket_index
  end

  bucket_index -= buckets.index_base
  bucket_index += buckets.counts.size if bucket_index.negative?

  buckets.increment_bucket(bucket_index)
  nil
end
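When a new index would stretch the populated range to @size or more, update asks get_scale_change(low, high) how far to lower the scale. That helper's body is not shown on this page. The sketch below assumes the common approach of halving both ends of the index range until the span fits. The name scale_change_for and the explicit max_size parameter are illustrative assumptions.

```ruby
# Hypothetical stand-in for the private get_scale_change helper.
# Counts how many times the scale must drop by one so that the index
# range low..high spans fewer than max_size buckets.
def scale_change_for(low, high, max_size)
  change = 0
  while high - low >= max_size
    # Lowering the scale by one maps every index i to i >> 1.
    high >>= 1
    low >>= 1
    change += 1
  end
  change
end

puts scale_change_for(0, 100, 160)    # already fits, no change needed
puts scale_change_for(0, 200, 160)    # one halving brings the span to 100
puts scale_change_for(-500, 500, 160) # needs several halvings
```

The loop condition mirrors the rescaling checks in update, which trigger when the distance between the new index and the far end of the range is at least @size.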