Class: OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram

Inherits:
Object
Defined in:
lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb

Overview

Contains the implementation of the ExponentialBucketHistogram aggregation

Constant Summary

DEFAULT_SIZE = 160
DEFAULT_SCALE = 20
MAX_SCALE = 20
MIN_SCALE = -10
MIN_MAX_SIZE = 2
MAX_MAX_SIZE = 16_384
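
The scale constants control resolution: at scale s, bucket boundaries are consecutive powers of base = 2**(2**-s), so higher scales mean finer buckets. A minimal sketch of that relationship (base_for is illustrative only, not part of this class):

# Illustrative helper: the boundary math implied by the scale constants above.
# At scale s the histogram base is 2**(2**-s); bucket i covers (base**i, base**(i + 1)].
def base_for(scale)
  2.0**(2.0**-scale)
end

base_for(20)  # => ~1.00000066, the finest allowed resolution (MAX_SCALE)
base_for(0)   # => 2.0
base_for(-1)  # => 4.0; each step down squares the base, down to MIN_SCALE (-10)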

Instance Method Summary

#aggregation_temporality ⇒ Object
#collect(start_time, end_time, data_points) ⇒ Object
#update(amount, attributes, data_points) ⇒ Object

Constructor Details

#initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta), max_size: DEFAULT_SIZE, max_scale: DEFAULT_SCALE, record_min_max: true, zero_threshold: 0) ⇒ ExponentialBucketHistogram

The default bucket boundaries are derived from the default max_size and max_scale values.



# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 29

def initialize(
  aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta),
  max_size: DEFAULT_SIZE,
  max_scale: DEFAULT_SCALE,
  record_min_max: true,
  zero_threshold: 0
)
  @aggregation_temporality = AggregationTemporality.determine_temporality(aggregation_temporality: aggregation_temporality, default: :delta)
  @record_min_max = record_min_max
  @min            = Float::INFINITY
  @max            = -Float::INFINITY
  @sum            = 0
  @count          = 0
  @zero_threshold = zero_threshold
  @zero_count     = 0
  @size           = validate_size(max_size)
  @scale          = validate_scale(max_scale)

  @mapping = new_mapping(@scale)

  # Previous state for cumulative aggregation, keyed by attributes;
  # the trailing comments show each entry's logical initial value.
  @previous_positive = {} # nil
  @previous_negative = {} # nil
  @previous_min = {} # Float::INFINITY
  @previous_max = {} # -Float::INFINITY
  @previous_sum = {} # 0
  @previous_count = {} # 0
  @previous_zero_count = {} # 0
  @previous_scale = {} # nil
end
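
A construction sketch using the keyword arguments documented above. The values shown are the defaults; max_size and max_scale are presumably checked against the MIN/MAX constants by the private validate_size and validate_scale helpers, which are not shown on this page:

histogram = OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(
  aggregation_temporality: :delta, # or :cumulative
  max_size: 160,                   # DEFAULT_SIZE; see MIN_MAX_SIZE..MAX_MAX_SIZE
  max_scale: 20,                   # DEFAULT_SCALE; see MIN_SCALE..MAX_SCALE
  record_min_max: true,
  zero_threshold: 0
)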

Instance Method Details

#aggregation_temporality ⇒ Object

Returns the aggregation temporality this instance was configured with.



# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 299

def aggregation_temporality
  @aggregation_temporality.temporality
end
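
Assuming the temporality resolved to :delta at construction, this presumably returns the underlying symbol:

histogram = ExponentialBucketHistogram.new(aggregation_temporality: :delta)
histogram.aggregation_temporality # => :delta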

#collect(start_time, end_time, data_points) ⇒ Object

When the aggregation temporality is cumulative, the current data points are merged into the previous state and downscaled as needed.



# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 62

def collect(start_time, end_time, data_points)
  if @aggregation_temporality.delta?
    # Set timestamps and 'move' data point values to result.
    hdps = data_points.values.map! do |hdp|
      hdp.start_time_unix_nano = start_time
      hdp.time_unix_nano = end_time
      hdp
    end
    data_points.clear
    hdps
  else
    # CUMULATIVE temporality - merge current data_points to previous data_points
    # and only keep the merged data_points in @previous_*

    merged_data_points = {}

    # This can slow collection when many data points are present, but it should
    # be fine: with cumulative temporality the points are merged into
    # @previous_* rather than kept in data_points.
    # rubocop:disable Metrics/BlockLength
    data_points.each do |attributes, hdp|
      # Store current values
      current_positive = hdp.positive
      current_negative = hdp.negative
      current_sum = hdp.sum
      current_min = hdp.min
      current_max = hdp.max
      current_count = hdp.count
      current_zero_count = hdp.zero_count
      current_scale = hdp.scale

      # Setup previous positive, negative bucket and scale based on three different cases
      @previous_positive[attributes] = current_positive.copy_empty if @previous_positive[attributes].nil?
      @previous_negative[attributes] = current_negative.copy_empty if @previous_negative[attributes].nil?
      @previous_scale[attributes] = current_scale if @previous_scale[attributes].nil?

      # Determine minimum scale for merging
      min_scale = [@previous_scale[attributes], current_scale].min

      # Calculate ranges for positive and negative buckets
      low_positive, high_positive = get_low_high_previous_current(
        @previous_positive[attributes],
        current_positive,
        @previous_scale[attributes],
        current_scale,
        min_scale
      )
      low_negative, high_negative = get_low_high_previous_current(
        @previous_negative[attributes],
        current_negative,
        @previous_scale[attributes],
        current_scale,
        min_scale
      )

      # Adjust min_scale based on bucket size constraints
      min_scale = [
        min_scale - get_scale_change(low_positive, high_positive),
        min_scale - get_scale_change(low_negative, high_negative)
      ].min

      # Downscale previous buckets if necessary
      downscale_change = @previous_scale[attributes] - min_scale
      downscale(downscale_change, @previous_positive[attributes], @previous_negative[attributes])

      # Merge current buckets into previous buckets (similar to update);
      # the temporality here is always :cumulative
      merge_buckets(@previous_positive[attributes], current_positive, current_scale, min_scale, @aggregation_temporality)
      merge_buckets(@previous_negative[attributes], current_negative, current_scale, min_scale, @aggregation_temporality)

      # initialize min, max, sum, count, zero_count for first time
      @previous_min[attributes] = Float::INFINITY if @previous_min[attributes].nil?
      @previous_max[attributes] = -Float::INFINITY if @previous_max[attributes].nil?
      @previous_sum[attributes] = 0 if @previous_sum[attributes].nil?
      @previous_count[attributes] = 0 if @previous_count[attributes].nil?
      @previous_zero_count[attributes] = 0 if @previous_zero_count[attributes].nil?

      # Update aggregated values
      @previous_min[attributes] = [@previous_min[attributes], current_min].min
      @previous_max[attributes] = [@previous_max[attributes], current_max].max
      @previous_sum[attributes] += current_sum
      @previous_count[attributes] += current_count
      @previous_zero_count[attributes] += current_zero_count
      @previous_scale[attributes] = min_scale

      # Create merged data point
      merged_hdp = ExponentialHistogramDataPoint.new(
        attributes,
        start_time,
        end_time,
        @previous_count[attributes],
        @previous_sum[attributes],
        @previous_scale[attributes],
        @previous_zero_count[attributes],
        @previous_positive[attributes].dup,
        @previous_negative[attributes].dup,
        0, # flags
        nil, # exemplars
        @previous_min[attributes],
        @previous_max[attributes],
        @zero_threshold
      )

      merged_data_points[attributes] = merged_hdp
    end
    # rubocop:enable Metrics/BlockLength

    # When there are no current data_points, the merge loop above does not
    # run, so emit the last merged data points if any exist.
    if data_points.empty? && !@previous_positive.empty?
      @previous_positive.each_key do |attributes|
        merged_hdp = ExponentialHistogramDataPoint.new(
          attributes,
          start_time,
          end_time,
          @previous_count[attributes],
          @previous_sum[attributes],
          @previous_scale[attributes],
          @previous_zero_count[attributes],
          @previous_positive[attributes].dup,
          @previous_negative[attributes].dup,
          0, # flags
          nil, # exemplars
          @previous_min[attributes],
          @previous_max[attributes],
          @zero_threshold
        )
        merged_data_points[attributes] = merged_hdp
      end
    end

    # Clear data_points since the data is already merged into @previous_*;
    # otherwise the next collect would see duplicated data points.
    data_points.clear
    merged_data_points.values # return array
  end
end
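
A sketch of the delta path described above: update records into a caller-owned data_points hash, and collect moves the accumulated points out, clearing the hash for the next interval (the timestamps here are arbitrary nanosecond values):

histogram = ExponentialBucketHistogram.new(aggregation_temporality: :delta)
data_points = {}

histogram.update(1.5, { 'route' => '/api' }, data_points)
histogram.update(3.0, { 'route' => '/api' }, data_points)

hdps = histogram.collect(1_000, 2_000, data_points)
hdps.first.count   # => 2
data_points.empty? # => true; the delta branch clears the hash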

#update(amount, attributes, data_points) ⇒ Object

This corresponds to aggregate in the Python SDK; no merging happens during update, but a rescale may occur.



# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 201

def update(amount, attributes, data_points)
  # fetch or initialize the ExponentialHistogramDataPoint
  hdp = data_points.fetch(attributes) do
    if @record_min_max
      min = Float::INFINITY
      max = -Float::INFINITY
    end

    # This block only executes when no data point exists for these attributes.
    data_points[attributes] = ExponentialHistogramDataPoint.new(
      attributes,
      nil,                                # :start_time_unix_nano
      0,                                  # :time_unix_nano
      0,                                  # :count
      0,                                  # :sum
      @scale,                             # :scale
      @zero_count,                        # :zero_count
      ExponentialHistogram::Buckets.new,  # :positive
      ExponentialHistogram::Buckets.new,  # :negative
      0,                                  # :flags
      nil,                                # :exemplars
      min,                                # :min
      max,                                # :max
      @zero_threshold                     # :zero_threshold
    )
  end

  # Start to populate the data point (esp. the buckets)
  if @record_min_max
    hdp.max = amount if amount > hdp.max
    hdp.min = amount if amount < hdp.min
  end

  hdp.sum += amount
  hdp.count += 1

  if amount.abs <= @zero_threshold
    hdp.zero_count += 1
    hdp.scale = 0 if hdp.count == hdp.zero_count # if every recorded value is zero, there is no point in keeping a non-zero scale
    return
  end

  # rescale, map to index, update the buckets here
  buckets = amount.positive? ? hdp.positive : hdp.negative
  amount = -amount if amount.negative?

  bucket_index = @mapping.map_to_index(amount)

  rescaling_needed = false
  low = high = 0

  if buckets.counts == [0] # special case of empty
    buckets.index_start = bucket_index
    buckets.index_end   = bucket_index
    buckets.index_base  = bucket_index

  elsif bucket_index < buckets.index_start && (buckets.index_end - bucket_index) >= @size
    rescaling_needed = true
    low = bucket_index
    high = buckets.index_end

  elsif bucket_index > buckets.index_end && (bucket_index - buckets.index_start) >= @size
    rescaling_needed = true
    low = buckets.index_start
    high = bucket_index
  end

  if rescaling_needed
    scale_change = get_scale_change(low, high)
    downscale(scale_change, hdp.positive, hdp.negative)
    new_scale = @mapping.scale - scale_change
    @mapping = new_mapping(new_scale)
    bucket_index = @mapping.map_to_index(amount)

    OpenTelemetry.logger.debug "Rescaled with new scale #{new_scale} from #{low} and #{high}; bucket_index is updated to #{bucket_index}"
  end

  hdp.scale = @mapping.scale

  # adjust buckets based on the bucket_index
  if bucket_index < buckets.index_start
    span = buckets.index_end - bucket_index
    grow_buckets(span, buckets)
    buckets.index_start = bucket_index
  elsif bucket_index > buckets.index_end
    span = bucket_index - buckets.index_start
    grow_buckets(span, buckets)
    buckets.index_end = bucket_index
  end

  bucket_index -= buckets.index_base
  bucket_index += buckets.counts.size if bucket_index.negative?

  buckets.increment_bucket(bucket_index)
  nil
end
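
Bucket selection above goes through @mapping.map_to_index, and when a new index would stretch the bucket range past @size, get_scale_change computes how many halvings of the scale are needed and downscale merges adjacent bucket pairs accordingly. A hypothetical sketch of a logarithm-based mapping, following the exponential histogram data model (the SDK's actual mapping object may treat exact powers of two and subnormal values differently):

# Hypothetical: returns i such that base**i < value <= base**(i + 1),
# where base = 2**(2**-scale). Not the SDK's actual implementation.
def map_to_index(value, scale)
  scale_factor = 2**scale / Math.log(2) # equals 1 / Math.log(base)
  (Math.log(value) * scale_factor).ceil - 1
end

map_to_index(10.0, 0) # => 3, since 2**3 < 10.0 <= 2**4
map_to_index(10.0, 1) # => 6; the base at scale 1 is sqrt(2)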