Class: OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream Private

Inherits:
MetricStream
  • Object
show all
Defined in:
lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the public API. It extends MetricStream to support asynchronous instruments.

Constant Summary collapse

DEFAULT_TIMEOUT =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

30

Instance Attribute Summary

Attributes inherited from MetricStream

#data_points, #description, #instrument_kind, #instrumentation_scope, #name, #unit

Instance Method Summary collapse

Methods inherited from MetricStream

#aggregate_metric_data, #find_registered_view, #to_s, #update

Constructor Details

#initialize(name, description, unit, instrument_kind, meter_provider, instrumentation_scope, aggregation, callback, timeout, attributes) ⇒ AsynchronousMetricStream

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of AsynchronousMetricStream.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb', line 18

def initialize(
  name,
  description,
  unit,
  instrument_kind,
  meter_provider,
  instrumentation_scope,
  aggregation,
  callback,
  timeout,
  attributes
)
  # Call parent constructor with common parameters
  super(name, description, unit, instrument_kind, meter_provider, instrumentation_scope, aggregation)

  # Initialize asynchronous-specific attributes
  @callback = callback
  @start_time = OpenTelemetry::Common::Utilities.time_in_nanoseconds
  @timeout = timeout
  @attributes = attributes
end

Instance Method Details

#collect(start_time, end_time) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When collect, if there are asynchronous SDK Instruments involved, their callback functions will be triggered. Related spec: github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#collect invoke_callback will update the data_points in aggregation



43
44
45
46
47
48
# File 'lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb', line 43

def collect(start_time, end_time)
  invoke_callback(@timeout, @attributes)

  # Call parent collect method for the core collection logic
  super(start_time, end_time)
end

#invoke_callback(timeout, attributes) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb', line 50

def invoke_callback(timeout, attributes)
  if @registered_views.empty?
    @mutex.synchronize do
      @callback.each do |cb|
        value = safe_guard_callback(cb, timeout: timeout)
        @default_aggregation.update(value, attributes, @data_points) if value.is_a?(Numeric)
      end
    end
  else
    @registered_views.each do |view|
      @mutex.synchronize do
        @callback.each do |cb|
          value = safe_guard_callback(cb, timeout: timeout)
          next unless value.is_a?(Numeric) # ignore if value is not valid number

          merged_attributes = attributes || {}
          merged_attributes.merge!(view.attribute_keys)
          view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
        end
      end
    end
  end
end