Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Inherits:
-
Object
- Object
- OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Defined in:
- lib/opentelemetry/sdk/trace/export/batch_span_processor.rb
Overview
Implementation of the duck type SpanProcessor that batches spans exported by the SDK then pushes them to the exporter pipeline.
Typically, the BatchSpanProcessor will be more suitable for production environments than the SimpleSpanProcessor.
All spans reported by the SDK implementation are first added to a synchronized queue (with a #max_queue_size maximum size, after the size is reached spans are dropped) and exported every schedule_delay_millis to the exporter pipeline in batches of max_export_batch_size.
If the queue gets half full a preemptive notification is sent to the worker thread that exports the spans to wake up and start a new export cycle.
Instance Method Summary collapse
-
#force_flush(timeout: nil) ⇒ Integer
Export all ended spans to the configured
Exporter
that have not yet been exported. -
#initialize(exporter:, exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)), schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i) ⇒ Object
constructor
Returns a new instance of the BatchSpanProcessor.
-
#on_finish(span) ⇒ Object
Adds a span to the batch.
-
#on_start(_span, _parent_context) ⇒ Object
Does nothing for this processor.
-
#shutdown(timeout: nil) ⇒ Integer
Shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished.
Constructor Details
#initialize(exporter:, exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)), schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i) ⇒ Object
Returns a new instance of the OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 46 def initialize(exporter:, exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)), schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i) raise ArgumentError if max_export_batch_size > max_queue_size @exporter = exporter @exporter_timeout_seconds = exporter_timeout_millis / 1000.0 @mutex = Mutex.new @export_mutex = Mutex.new @condition = ConditionVariable.new @keep_running = true @delay_seconds = schedule_delay_millis / 1000.0 @max_queue_size = max_queue_size @batch_size = max_export_batch_size @spans = [] @pid = nil @thread = nil reset_on_fork(restart_thread: start_thread_on_boot) end |
Instance Method Details
#force_flush(timeout: nil) ⇒ Integer
Export all ended spans to the configured Exporter
that have not yet been exported.
This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor
exports the completed spans.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 96 def force_flush(timeout: nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity start_time = Time.now snapshot = lock do reset_on_fork(restart_thread: false) if @keep_running spans.shift(spans.size) end until snapshot.empty? remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time) return TIMEOUT if remaining_timeout&.zero? batch = snapshot.shift(@batch_size).map!(&:to_span_data) result_code = export_batch(batch, timeout: remaining_timeout) return result_code unless result_code == SUCCESS end SUCCESS ensure # Unshift the remaining spans if we timed out. We drop excess spans from # the snapshot because they're older than any spans in the spans buffer. lock do n = spans.size + snapshot.size - max_queue_size snapshot.shift(n) if n.positive? spans.unshift(snapshot) unless snapshot.empty? @condition.signal if spans.size > max_queue_size / 2 end end |
#on_finish(span) ⇒ Object
Adds a span to the batch. Thread-safe; may block on lock.
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 73 def on_finish(span) # rubocop:disable Metrics/AbcSize return unless span.context.trace_flags.sampled? lock do reset_on_fork n = spans.size + 1 - max_queue_size spans.shift(n) if n.positive? spans << span @condition.signal if spans.size > batch_size end end |
#on_start(_span, _parent_context) ⇒ Object
Does nothing for this processor
70 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 70 def on_start(_span, _parent_context); end |
#shutdown(timeout: nil) ⇒ Integer
Shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished.
129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 129 def shutdown(timeout: nil) start_time = Time.now lock do @keep_running = false @condition.signal end @thread.join(timeout) force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)) @exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)) end |