Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/opentelemetry/sdk/trace/export/batch_span_processor.rb

Overview

Implementation of the duck type SpanProcessor that batches spans exported by the SDK then pushes them to the exporter pipeline.

All spans reported by the SDK implementation are first added to a synchronized queue (with a #max_queue_size maximum size, after the size is reached spans are dropped) and exported every schedule_delay_millis to the exporter pipeline in batches of max_export_batch_size.

If the queue gets half full a preemptive notification is sent to the worker thread that exports the spans to wake up and start a new export cycle.

Instance Method Summary collapse

Constructor Details

#initialize(exporter:, exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)), schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512))) ⇒ Object

Parameters:

  • exporter (SpanExporter)
  • exporter_timeout_millis (Numeric) (defaults to: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)))

    the delay interval between two consecutive exports. Defaults to the value of the OTEL_BSP_EXPORT_TIMEOUT_MILLIS environment variable, if set, or 30,000 (30 seconds).

  • schedule_delay_millis (Numeric) (defaults to: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)))

    the maximum allowed time to export data. Defaults to the value of the OTEL_BSP_SCHEDULE_DELAY_MILLIS environment variable, if set, or 5,000 (5 seconds).

  • max_queue_size (Integer) (defaults to: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)))

    the maximum queue size in spans. Defaults to the value of the OTEL_BSP_MAX_QUEUE_SIZE environment variable, if set, or 2048.

  • max_export_batch_size (Integer) (defaults to: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)))

    the maximum batch size in spans. Defaults to the value of the OTEL_BSP_MAX_EXPORT_BATCH_SIZE environment variable, if set, or 512.

Raises:

  • (ArgumentError)


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 43

def initialize(exporter:,
               exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)),
               schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)),
               max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
               max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)))
  raise ArgumentError if max_export_batch_size > max_queue_size

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout_millis / 1000.0
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay_millis / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @spans = []
  @pid = nil
  @thread = nil
  reset_on_fork
end

Instance Method Details

#force_flushInteger

TODO: test this explicitly. Export all ended spans to the configured Exporter that have not yet been exported.

This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor exports the completed spans.

Returns:

  • (Integer)

    SUCCESS if no error occurred, FAILURE if a non-specific failure occurred, TIMEOUT if a timeout occurred.



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 93

def force_flush
  snapshot = lock do
    reset_on_fork(restart_thread: false) if @keep_running
    spans.shift(spans.size)
  end
  until snapshot.empty?
    batch = snapshot.shift(@batch_size).map!(&:to_span_data)
    result_code = @exporter.export(batch)
    report_result(result_code, batch)
  end
  SUCCESS
end

#on_finish(span) ⇒ Object

adds a span to the batcher, threadsafe may block on lock



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 70

def on_finish(span) # rubocop:disable Metrics/AbcSize
  return unless span.context.trace_flags.sampled?

  lock do
    reset_on_fork
    n = spans.size + 1 - max_queue_size
    spans.shift(n) if n.positive?
    spans << span
    @condition.signal if spans.size > max_queue_size / 2
  end
end

#on_start(span, parent_context) ⇒ Object

does nothing for this processor



65
66
67
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 65

def on_start(span, parent_context)
  # noop
end

#shutdownInteger

shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished

Returns:

  • (Integer)

    SUCCESS if no error occurred, FAILURE if a non-specific failure occurred, TIMEOUT if a timeout occurred.



111
112
113
114
115
116
117
118
119
120
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 111

def shutdown
  lock do
    @keep_running = false
    @condition.signal
  end

  @thread.join
  force_flush
  @exporter.shutdown
end