Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/opentelemetry/sdk/trace/export/batch_span_processor.rb

Overview

Implementation of the duck type SpanProcessor that batches spans exported by the SDK then pushes them to the exporter pipeline.

Typically, the BatchSpanProcessor will be more suitable for production environments than the SimpleSpanProcessor.

All spans reported by the SDK implementation are first added to a synchronized queue (with a #max_queue_size maximum size, after the size is reached spans are dropped) and exported every schedule_delay to the exporter pipeline in batches of max_export_batch_size.

If the queue gets half full a preemptive notification is sent to the worker thread that exports the spans to wake up and start a new export cycle.

Instance Method Summary collapse

Constructor Details

#initialize(exporter, exporter_timeout: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000)), schedule_delay: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i, metrics_reporter: nil) ⇒ Object

Parameters:

  • exporter (SpanExporter)

    the (duck type) SpanExporter to where the recorded Spans are pushed after batching.

  • exporter_timeout (Numeric) (defaults to: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000)))

    the maximum allowed time to export data. Defaults to the value of the OTEL_BSP_EXPORT_TIMEOUT environment variable, if set, or 30,000 (30 seconds).

  • schedule_delay (Numeric) (defaults to: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000)))

    the delay interval between two consecutive exports. Defaults to the value of the OTEL_BSP_SCHEDULE_DELAY environment variable, if set, or 5,000 (5 seconds).

  • max_queue_size (Integer) (defaults to: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)))

    the maximum queue size in spans. Defaults to the value of the OTEL_BSP_MAX_QUEUE_SIZE environment variable, if set, or 2048.

  • max_export_batch_size (Integer) (defaults to: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)))

    the maximum batch size in spans. Defaults to the value of the OTEL_BSP_MAX_EXPORT_BATCH_SIZE environment variable, if set, or 512.

Raises:

  • (ArgumentError)


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 47

def initialize(exporter,
               exporter_timeout: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000)),
               schedule_delay: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000)),
               max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
               max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)),
               start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i,
               metrics_reporter: nil)
  raise ArgumentError if max_export_batch_size > max_queue_size
  raise ArgumentError, "exporter #{exporter.inspect} does not appear to be a valid exporter" unless Common::Utilities.valid_exporter?(exporter)

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout / 1000.0
  @mutex = Mutex.new
  @export_mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @metrics_reporter = metrics_reporter || OpenTelemetry::SDK::Trace::Export::MetricsReporter
  @spans = []
  @pid = nil
  @thread = nil
  reset_on_fork(restart_thread: start_thread_on_boot)
end

Instance Method Details

#force_flush(timeout: nil) ⇒ Integer

Export all ended spans to the configured Exporter that have not yet been exported.

This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor exports the completed spans.

Parameters:

  • timeout (optional Numeric) (defaults to: nil)

    An optional timeout in seconds.

Returns:

  • (Integer)

    SUCCESS if no error occurred, FAILURE if a non-specific failure occurred, TIMEOUT if a timeout occurred.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 103

def force_flush(timeout: nil) # rubocop:disable Metrics/MethodLength
  start_time = OpenTelemetry::Common::Utilities.timeout_timestamp
  snapshot = lock do
    reset_on_fork(restart_thread: @keep_running)
    spans.shift(spans.size)
  end
  until snapshot.empty?
    remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
    return TIMEOUT if remaining_timeout&.zero?

    batch = snapshot.shift(@batch_size)
    result_code = export_batch(batch, timeout: remaining_timeout)
    return result_code unless result_code == SUCCESS
  end

  @exporter.force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
ensure
  # Unshift the remaining spans if we timed out. We drop excess spans from
  # the snapshot because they're older than any spans in the spans buffer.
  lock do
    n = spans.size + snapshot.size - max_queue_size
    if n.positive?
      dropped_spans = snapshot.shift(n)
      report_dropped_spans(dropped_spans, reason: 'buffer-full', function: __method__.to_s)
    end
    spans.unshift(*snapshot) unless snapshot.empty?
    @condition.signal if spans.size > max_queue_size / 2
  end
end

#on_finish(span) ⇒ Object

Adds a span to the batch. Thread-safe; may block on lock.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 77

def on_finish(span)
  return unless span.context.trace_flags.sampled?

  lock do
    reset_on_fork
    n = spans.size + 1 - max_queue_size
    if n > 0
      dropped_spans = spans.shift(n)
      report_dropped_spans(dropped_spans, reason: 'buffer-full', function: __method__.to_s)
    end
    spans << span
    @condition.signal if spans.size > batch_size
  end
end

#on_start(_span, _parent_context) ⇒ Object

Does nothing for this processor



74
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 74

def on_start(_span, _parent_context); end

#shutdown(timeout: nil) ⇒ Integer

Shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished.

Parameters:

  • timeout (optional Numeric) (defaults to: nil)

    An optional timeout in seconds.

Returns:

  • (Integer)

    SUCCESS if no error occurred, FAILURE if a non-specific failure occurred, TIMEOUT if a timeout occurred.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 139

def shutdown(timeout: nil)
  start_time = OpenTelemetry::Common::Utilities.timeout_timestamp
  thread = lock do
    @keep_running = false
    @condition.signal
    @thread
  end

  thread&.join(timeout)
  force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
  dropped_spans = lock { spans.shift(spans.length) }
  report_dropped_spans(dropped_spans, reason: 'terminating') if dropped_spans.any?
  @exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
end