Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Inherits: Object
- Defined in: lib/opentelemetry/sdk/trace/export/batch_span_processor.rb
Overview
Implementation of the SpanProcessor duck type that batches finished spans reported by the SDK and pushes them to the exporter pipeline.
All sampled spans reported by the SDK are first added to a synchronized queue with a maximum size of max_queue_size. Once the queue is full, the oldest queued spans are dropped to make room for new ones. Queued spans are exported to the exporter pipeline every schedule_delay_millis milliseconds in batches of at most max_export_batch_size.
When the queue becomes more than half full, the worker thread that exports the spans is signalled preemptively so that it wakes up and starts a new export cycle.
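The queueing rules above can be traced with a small stand-alone model. This is only a sketch: BoundedSpanQueue and its wakeups counter are hypothetical names invented for illustration, not part of the SDK, and the counter stands in for signalling the worker thread.

```ruby
# Toy stand-in for the processor's queue; NOT the SDK class.
class BoundedSpanQueue
  attr_reader :spans, :wakeups

  def initialize(max_queue_size:)
    @max_queue_size = max_queue_size
    @spans = []
    @wakeups = 0 # counts how often the worker would be signalled
  end

  def push(span)
    # Make room first: drop the oldest entries so the new span fits.
    overflow = @spans.size + 1 - @max_queue_size
    @spans.shift(overflow) if overflow.positive?
    @spans << span
    # Model the early wake-up once the queue is more than half full.
    @wakeups += 1 if @spans.size > @max_queue_size / 2
  end
end

queue = BoundedSpanQueue.new(max_queue_size: 4)
(1..6).each { |i| queue.push("span-#{i}") }

p queue.spans   # the two oldest spans have been dropped
p queue.wakeups # every push from the third one onwards counts as a wake-up
```

With a queue size of 4, six pushes leave span-3 through span-6 in the queue, and the wake-up condition holds on four of the six pushes.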
Instance Method Summary
- #force_flush ⇒ Integer
  TODO: test this explicitly.
- #initialize(exporter:, exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)), schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512))) ⇒ Object
  constructor
  Returns a new instance of the BatchSpanProcessor.
- #on_finish(span) ⇒ Object
  Adds a span to the batcher. Thread-safe; may block on the lock.
- #on_start(span, parent_context) ⇒ Object
  Does nothing for this processor.
- #shutdown ⇒ Integer
  Shuts the consumer thread down and flushes the currently accumulated buffer. Blocks until the thread has finished.
Constructor Details
#initialize(exporter:, exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)), schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512))) ⇒ Object
Returns a new instance of the OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 43

def initialize(exporter:,
               exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)),
               schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)),
               max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
               max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)))
  raise ArgumentError if max_export_batch_size > max_queue_size

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout_millis / 1000.0
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay_millis / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @spans = []
  @pid = nil
  @thread = nil
  reset_on_fork
end
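The way the constructor resolves its defaults can be tried in isolation. The sketch below assumes a hypothetical helper, bsp_settings, that is not part of the SDK; it takes a plain Hash in place of ENV so the example does not touch the real environment, and the ArgumentError message is an illustrative addition.

```ruby
# Hypothetical helper mirroring the constructor's defaults; NOT part of the SDK.
# `env` is any object that responds to #fetch(key, default), such as ENV or a Hash.
def bsp_settings(env = ENV)
  settings = {
    exporter_timeout_millis: Float(env.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)),
    schedule_delay_millis: Float(env.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)),
    max_queue_size: Integer(env.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
    max_export_batch_size: Integer(env.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512))
  }
  # Same guard as the constructor: a batch may not be larger than the queue.
  if settings[:max_export_batch_size] > settings[:max_queue_size]
    raise ArgumentError, 'max_export_batch_size must not exceed max_queue_size'
  end

  settings
end

defaults = bsp_settings({})
custom = bsp_settings({ 'OTEL_BSP_MAX_QUEUE_SIZE' => '1024',
                        'OTEL_BSP_SCHEDULE_DELAY_MILLIS' => '250' })

p defaults[:max_queue_size]       # default applies when the variable is unset
p custom[:schedule_delay_millis]  # string values are converted by Float()

begin
  bsp_settings({ 'OTEL_BSP_MAX_QUEUE_SIZE' => '100' }) # default batch size of 512 exceeds 100
rescue ArgumentError => e
  rejected = e.message
end
p rejected
```

Because Integer() and Float() are strict, a non-numeric value in one of these variables raises ArgumentError when the defaults are evaluated, instead of silently falling back.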
Instance Method Details
#force_flush ⇒ Integer
TODO: test this explicitly. Exports all ended spans that have not yet been exported to the configured Exporter.
This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor exports the completed spans.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 93

def force_flush
  snapshot = lock do
    reset_on_fork(restart_thread: false) if @keep_running
    spans.shift(spans.size)
  end
  until snapshot.empty?
    batch = snapshot.shift(@batch_size).map!(&:to_span_data)
    result_code = @exporter.export(batch)
    report_result(result_code, batch)
  end
  SUCCESS
end
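The draining loop in force_flush splits the snapshot into batches of at most the configured batch size. The following sketch isolates that loop; RecordingExporter and drain are hypothetical stand-ins, integers take the place of spans, and the return value 0 is only a placeholder for a success code.

```ruby
# Stub exporter that records the size of each batch it receives; hypothetical.
class RecordingExporter
  attr_reader :batch_sizes

  def initialize
    @batch_sizes = []
  end

  def export(batch)
    @batch_sizes << batch.size
    0 # placeholder for a success code
  end
end

# Hand the snapshot to the exporter in chunks of at most batch_size items.
def drain(snapshot, exporter, batch_size)
  exporter.export(snapshot.shift(batch_size)) until snapshot.empty?
end

exporter = RecordingExporter.new
drain((1..1200).to_a, exporter, 512)

p exporter.batch_sizes # two full batches, then the remainder
```

With 1200 queued items and a batch size of 512, the exporter receives batches of 512, 512 and 176 items.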
#on_finish(span) ⇒ Object
Adds a span to the batcher. Thread-safe; may block on the lock.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 70

def on_finish(span) # rubocop:disable Metrics/AbcSize
  return unless span.context.trace_flags.sampled?

  lock do
    reset_on_fork
    n = spans.size + 1 - max_queue_size
    spans.shift(n) if n.positive?
    spans << span
    @condition.signal if spans.size > max_queue_size / 2
  end
end
#on_start(span, parent_context) ⇒ Object
Does nothing for this processor.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 65

def on_start(span, parent_context)
  # noop
end
#shutdown ⇒ Integer
Shuts the consumer thread down and flushes the currently accumulated buffer. Blocks until the thread has finished.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 111

def shutdown
  lock do
    @keep_running = false
    @condition.signal
  end

  @thread.join
  force_flush
  @exporter.shutdown
end
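The ordering in shutdown (clear the flag and signal under the lock, join the worker, then flush) can be reproduced with plain Ruby threading primitives. This is a minimal sketch, not the SDK's worker loop: the local variables, the 5 second wait and the log array are assumptions made for illustration.

```ruby
mutex = Mutex.new
condition = ConditionVariable.new
keep_running = true
log = []

# Stand-in worker: sleeps until signalled or until the illustrative delay elapses.
worker = Thread.new do
  mutex.synchronize do
    condition.wait(mutex, 5) while keep_running
  end
  log << :worker_exited
end

# Step 1: clear the flag and wake the worker while holding the lock,
# so the worker cannot miss the change between its check and its wait.
mutex.synchronize do
  keep_running = false
  condition.signal
end

# Step 2: block until the worker has left its loop.
worker.join

# Step 3: only now flush what is left and shut the exporter down (modelled as a log entry).
log << :flushed

p log
```

Joining before the final flush means the worker is no longer running when the remaining spans are drained.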