Class: OpenTelemetry::SDK::Logs::Export::BatchLogRecordProcessor
- Inherits:
-
LogRecordProcessor
- Object
- LogRecordProcessor
- OpenTelemetry::SDK::Logs::Export::BatchLogRecordProcessor
- Defined in:
- lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb
Overview
WARNING - The spec has some differences from the LogRecord version of this processor Implementation of the duck type LogRecordProcessor that batches log records exported by the SDK then pushes them to the exporter pipeline.
Typically, the BatchLogRecordProcessor will be more suitable for production environments than the SimpleLogRecordProcessor.
Instance Method Summary collapse
-
#force_flush(timeout: nil) ⇒ Integer
Export all emitted log records that have not yet been exported to the configured
Exporter
. -
#initialize(exporter, exporter_timeout: Float(ENV.fetch('OTEL_BLRP_EXPORT_TIMEOUT', 30_000)), schedule_delay: Float(ENV.fetch('OTEL_BLRP_SCHEDULE_DELAY', 1000)), max_queue_size: Integer(ENV.fetch('OTEL_BLRP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BLRP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BLRP_START_THREAD_ON_BOOT']) !~ /false/i) ⇒ Object
constructor
Returns a new instance of the BatchLogRecordProcessor.
-
#on_emit(log_record, _context) ⇒ Object
Adds a log record to the batch.
-
#shutdown(timeout: nil) ⇒ Integer
Shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished.
Constructor Details
#initialize(exporter, exporter_timeout: Float(ENV.fetch('OTEL_BLRP_EXPORT_TIMEOUT', 30_000)), schedule_delay: Float(ENV.fetch('OTEL_BLRP_SCHEDULE_DELAY', 1000)), max_queue_size: Integer(ENV.fetch('OTEL_BLRP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BLRP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BLRP_START_THREAD_ON_BOOT']) !~ /false/i) ⇒ Object
Returns a new instance of the OpenTelemetry::SDK::Logs::Export::BatchLogRecordProcessor.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb', line 37 def initialize(exporter, exporter_timeout: Float(ENV.fetch('OTEL_BLRP_EXPORT_TIMEOUT', 30_000)), schedule_delay: Float(ENV.fetch('OTEL_BLRP_SCHEDULE_DELAY', 1000)), max_queue_size: Integer(ENV.fetch('OTEL_BLRP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BLRP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BLRP_START_THREAD_ON_BOOT']) !~ /false/i) unless max_export_batch_size <= max_queue_size raise ArgumentError, 'max_export_batch_size much be less than or equal to max_queue_size' end unless Common::Utilities.valid_exporter?(exporter) raise ArgumentError, "exporter #{exporter.inspect} does not appear to be a valid exporter" end @exporter = exporter @exporter_timeout_seconds = exporter_timeout / 1000.0 @mutex = Mutex.new @export_mutex = Mutex.new @condition = ConditionVariable.new @keep_running = true @stopped = false @delay_seconds = schedule_delay / 1000.0 @max_queue_size = max_queue_size @batch_size = max_export_batch_size @log_records = [] @pid = nil @thread = nil reset_on_fork(restart_thread: start_thread_on_boot) end |
Instance Method Details
#force_flush(timeout: nil) ⇒ Integer
Export all emitted log records that have not yet been exported to the configured Exporter
.
This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor
exports the completed log records.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb', line 96 def force_flush(timeout: nil) start_time = OpenTelemetry::Common::Utilities. snapshot = lock do reset_on_fork if @keep_running log_records.shift(log_records.size) end until snapshot.empty? remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time) return TIMEOUT if remaining_timeout&.zero? batch = snapshot.shift(batch_size).map!(&:to_log_record_data) result_code = export_batch(batch, timeout: remaining_timeout) return result_code unless result_code == SUCCESS end @exporter.force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)) ensure # Unshift the remaining log records if we timed out. We drop excess # log records from the snapshot because they're older than any # records in the buffer. lock do n = log_records.size + snapshot.size - max_queue_size if n.positive? snapshot.shift(n) report_dropped_log_records(n, reason: 'buffer-full') end log_records.unshift(*snapshot) unless snapshot.empty? @condition.signal if log_records.size > max_queue_size / 2 end end |
#on_emit(log_record, _context) ⇒ Object
Adds a log record to the batch. Thread-safe; may block on lock.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb', line 70 def on_emit(log_record, _context) return if @stopped lock do reset_on_fork n = log_records.size + 1 - max_queue_size if n.positive? log_records.shift(n) report_dropped_log_records(n, reason: 'buffer-full') end log_records << log_record @condition.signal if log_records.size > batch_size end end |
#shutdown(timeout: nil) ⇒ Integer
Shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb', line 137 def shutdown(timeout: nil) return if @stopped start_time = OpenTelemetry::Common::Utilities. thread = lock do @keep_running = false @stopped = true @condition.signal @thread end thread&.join(timeout) force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)) dropped_log_records = lock { log_records.size } report_dropped_log_records(dropped_log_records, reason: 'terminating') if dropped_log_records.positive? @exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)) end |