Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Inherits:
- Object
- OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Defined in:
- lib/opentelemetry/sdk/trace/export/batch_span_processor.rb
Overview
Implementation of the duck type SpanProcessor that batches spans exported by the SDK then pushes them to the exporter pipeline.
Typically, the BatchSpanProcessor will be more suitable for production environments than the SimpleSpanProcessor.
All spans reported by the SDK implementation are first added to a synchronized queue that holds at most max_queue_size spans; once the queue is full, the oldest queued spans are dropped to make room. The queue is exported to the exporter pipeline every schedule_delay milliseconds, in batches of at most max_export_batch_size spans.
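When the queue fills past a threshold (more than max_export_batch_size spans in on_finish, or more than half of max_queue_size after an interrupted force_flush), a preemptive notification is sent to the worker thread that exports the spans, waking it to start a new export cycle.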
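The queue-and-worker pattern described above can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not the SDK's implementation: TinyBatcher, push, stop, and exported are hypothetical names, and the early wake-up threshold used here (more than one batch waiting) is a choice made for the sketch.

```ruby
# Minimal sketch of a bounded queue drained by a worker thread.
# TinyBatcher and its methods are hypothetical, not part of the SDK.
class TinyBatcher
  attr_reader :exported

  def initialize(max_queue_size: 4, batch_size: 2, delay_seconds: 0.05)
    @max_queue_size = max_queue_size
    @batch_size = batch_size
    @delay_seconds = delay_seconds
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @queue = []
    @exported = []
    @keep_running = true
    @thread = Thread.new { work }
  end

  # Producer side: enqueue an item, dropping the oldest one when full.
  def push(item)
    @mutex.synchronize do
      @queue.shift if @queue.size >= @max_queue_size
      @queue << item
      # Wake the worker early once more than one batch is waiting.
      @condition.signal if @queue.size > @batch_size
    end
  end

  # Ask the worker to finish, then wait for it to drain the queue.
  def stop
    @mutex.synchronize do
      @keep_running = false
      @condition.signal
    end
    @thread.join
  end

  private

  # Consumer side: wait up to delay_seconds, then take one batch.
  def work
    loop do
      batch, running = @mutex.synchronize do
        @condition.wait(@mutex, @delay_seconds) if @queue.empty? && @keep_running
        [@queue.shift(@batch_size), @keep_running]
      end
      @exported << batch unless batch.empty?
      break if !running && batch.empty?
    end
  end
end
```

Calling push from any thread and stop at the end leaves every accepted item in exported, grouped into batches of at most batch_size.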
Instance Method Summary
-
#force_flush(timeout: nil) ⇒ Integer
Export all ended spans to the configured Exporter that have not yet been exported.
-
#initialize(exporter, exporter_timeout: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000)), schedule_delay: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i, metrics_reporter: nil) ⇒ Object
constructor
Returns a new instance of the BatchSpanProcessor.
-
#on_finish(span) ⇒ Object
Adds a span to the batch.
-
#on_start(_span, _parent_context) ⇒ Object
Does nothing for this processor.
-
#shutdown(timeout: nil) ⇒ Integer
Shuts down the consumer thread and flushes the accumulated buffer. Blocks until the thread has finished.
Constructor Details
#initialize(exporter, exporter_timeout: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000)), schedule_delay: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000)), max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i, metrics_reporter: nil) ⇒ Object
Returns a new instance of the OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 47

def initialize(exporter,
               exporter_timeout: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000)),
               schedule_delay: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000)),
               max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
               max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)),
               start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i,
               metrics_reporter: nil)
  raise ArgumentError if max_export_batch_size > max_queue_size
  raise ArgumentError, "exporter #{exporter.inspect} does not appear to be a valid exporter" unless Common::Utilities.valid_exporter?(exporter)

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout / 1000.0
  @mutex = Mutex.new
  @export_mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @metrics_reporter = metrics_reporter || OpenTelemetry::SDK::Trace::Export::MetricsReporter
  @spans = []
  @pid = nil
  @thread = nil
  reset_on_fork(restart_thread: start_thread_on_boot)
end
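To make the defaults concrete, the following sketch mirrors how the keyword defaults above are read from the environment, converted from milliseconds to seconds, and validated. BspConfig and resolve_bsp_config are hypothetical helpers written for illustration; they are not part of the SDK, and the env argument accepts a plain Hash only so that the example can run without touching the real ENV.

```ruby
# Hypothetical helper mirroring the constructor's default resolution.
BspConfig = Struct.new(:exporter_timeout_seconds, :delay_seconds,
                       :max_queue_size, :batch_size, :start_thread_on_boot)

def resolve_bsp_config(env = ENV)
  # Same parsing expressions as the constructor's keyword defaults.
  exporter_timeout = Float(env.fetch('OTEL_BSP_EXPORT_TIMEOUT', 30_000))
  schedule_delay = Float(env.fetch('OTEL_BSP_SCHEDULE_DELAY', 5_000))
  max_queue_size = Integer(env.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048))
  max_export_batch_size = Integer(env.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512))
  # Unset or any value not matching /false/i means the thread starts on boot.
  start_thread_on_boot = String(env['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i

  # A batch larger than the queue could never be filled, so it is rejected.
  raise ArgumentError if max_export_batch_size > max_queue_size

  # Millisecond settings are stored internally as seconds.
  BspConfig.new(exporter_timeout / 1000.0, schedule_delay / 1000.0,
                max_queue_size, max_export_batch_size, start_thread_on_boot)
end
```

For example, resolve_bsp_config('OTEL_BSP_SCHEDULE_DELAY' => '2500') yields a delay of 2.5 seconds, while setting OTEL_BSP_MAX_QUEUE_SIZE below the batch size raises ArgumentError.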
Instance Method Details
#force_flush(timeout: nil) ⇒ Integer
Export all ended spans to the configured Exporter that have not yet been exported.
This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor exports the completed spans.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 103

def force_flush(timeout: nil) # rubocop:disable Metrics/MethodLength
  start_time = OpenTelemetry::Common::Utilities.
  snapshot = lock do
    reset_on_fork(restart_thread: @keep_running)
    spans.shift(spans.size)
  end
  until snapshot.empty?
    remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
    return TIMEOUT if remaining_timeout&.zero?

    batch = snapshot.shift(@batch_size)
    result_code = export_batch(batch, timeout: remaining_timeout)
    return result_code unless result_code == SUCCESS
  end

  @exporter.force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
ensure
  # Unshift the remaining spans if we timed out. We drop excess spans from
  # the snapshot because they're older than any spans in the spans buffer.
  lock do
    n = spans.size + snapshot.size - max_queue_size
    if n.positive?
      dropped_spans = snapshot.shift(n)
      report_dropped_spans(dropped_spans, reason: 'buffer-full', function: __method__.to_s)
    end
    spans.unshift(*snapshot) unless snapshot.empty?
    @condition.signal if spans.size > max_queue_size / 2
  end
end
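The draining loop in force_flush can be illustrated in isolation: take a snapshot, export it in batches, and stop early when the time budget runs out or an export fails. The sketch below is an approximation under stated assumptions. SUCCESS, TIMEOUT, remaining_budget, and drain are stand-ins defined here for the example, and the block passed to drain plays the role of the exporter.

```ruby
# Stand-in result codes for this sketch (assumed values, defined locally).
SUCCESS = 0
TIMEOUT = 2

# Hypothetical helper: seconds left in the budget, nil when unlimited,
# and 0 once the budget is used up.
def remaining_budget(timeout, start_time)
  return nil if timeout.nil?

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
  remaining = timeout - elapsed
  remaining.positive? ? remaining : 0
end

# Drains snapshot in slices of batch_size, yielding each slice together
# with the remaining budget. Returns the first non-SUCCESS result.
def drain(snapshot, batch_size, timeout: nil)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  until snapshot.empty?
    remaining = remaining_budget(timeout, start_time)
    return TIMEOUT if remaining&.zero?

    batch = snapshot.shift(batch_size)
    result = yield(batch, remaining)
    return result unless result == SUCCESS
  end
  SUCCESS
end
```

Because drain removes items from snapshot only as they are handed to the block, anything still in snapshot after an early return is exactly the unexported remainder, which is what the ensure block above puts back into the buffer.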
#on_finish(span) ⇒ Object
Adds a span to the batch. Thread-safe; may block on lock.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 77

def on_finish(span)
  return unless span.context.trace_flags.sampled?

  lock do
    reset_on_fork
    n = spans.size + 1 - max_queue_size
    if n > 0
      dropped_spans = spans.shift(n)
      report_dropped_spans(dropped_spans, reason: 'buffer-full', function: __method__.to_s)
    end
    spans << span
    @condition.signal if spans.size > batch_size
  end
end
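The drop arithmetic in on_finish is easy to check by hand: n is the number of spans that must leave the front of the buffer so that the new span fits. A small sketch, using a hypothetical enqueue_bounded helper and symbols in place of real spans:

```ruby
# Hypothetical helper reproducing the buffer-full arithmetic of on_finish.
# Returns the items that were dropped to make room for the new one.
def enqueue_bounded(spans, span, max_queue_size)
  n = spans.size + 1 - max_queue_size
  dropped = n.positive? ? spans.shift(n) : []
  spans << span
  dropped
end

buffer = %i[a b c]
dropped = enqueue_bounded(buffer, :d, 3)
# The oldest entry (:a) is dropped; buffer now holds :b, :c, :d.
```

The newest span is always kept, so under sustained overload the buffer holds the most recent max_queue_size spans.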
#on_start(_span, _parent_context) ⇒ Object
Does nothing for this processor.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 74

def on_start(_span, _parent_context); end
#shutdown(timeout: nil) ⇒ Integer
Shuts down the consumer thread and flushes the accumulated buffer. Blocks until the thread has finished.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 139

def shutdown(timeout: nil)
  start_time = OpenTelemetry::Common::Utilities.
  thread = lock do
    @keep_running = false
    @condition.signal
    @thread
  end

  thread&.join(timeout)
  force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
  dropped_spans = lock { spans.shift(spans.length) }
  report_dropped_spans(dropped_spans, reason: 'terminating') if dropped_spans.any?
  @exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
end