Class: OpenTelemetry::SDK::Logs::Export::BatchLogRecordProcessor

Inherits:
LogRecordProcessor show all
Defined in:
lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb

Overview

WARNING - The spec has some differences from the LogRecord version of this processor Implementation of the duck type LogRecordProcessor that batches log records exported by the SDK then pushes them to the exporter pipeline.

Typically, the BatchLogRecordProcessor will be more suitable for production environments than the SimpleLogRecordProcessor.

Instance Method Summary collapse

Constructor Details

#initialize(exporter, exporter_timeout: Float(ENV.fetch('OTEL_BLRP_EXPORT_TIMEOUT', 30_000)), schedule_delay: Float(ENV.fetch('OTEL_BLRP_SCHEDULE_DELAY', 1000)), max_queue_size: Integer(ENV.fetch('OTEL_BLRP_MAX_QUEUE_SIZE', 2048)), max_export_batch_size: Integer(ENV.fetch('OTEL_BLRP_MAX_EXPORT_BATCH_SIZE', 512)), start_thread_on_boot: String(ENV['OTEL_RUBY_BLRP_START_THREAD_ON_BOOT']) !~ /false/i) ⇒ Object

Parameters:

  • exporter (LogRecordExporter)

    The (duck type) LogRecordExporter to where the recorded LogRecords are pushed after batching.

  • exporter_timeout (Numeric) (defaults to: Float(ENV.fetch('OTEL_BLRP_EXPORT_TIMEOUT', 30_000)))

    The maximum allowed time to export data. Defaults to the value of the OTEL_BLRP_EXPORT_TIMEOUT environment variable, if set, or 30,000 (30 seconds).

  • schedule_delay (Numeric) (defaults to: Float(ENV.fetch('OTEL_BLRP_SCHEDULE_DELAY', 1000)))

    the delay interval between two consecutive exports. Defaults to the value of the OTEL_BLRP_SCHEDULE_DELAY environment variable, if set, or 1,000 (1 second).

  • max_queue_size (Integer) (defaults to: Integer(ENV.fetch('OTEL_BLRP_MAX_QUEUE_SIZE', 2048)))

    the maximum queue size in log records. Defaults to the value of the OTEL_BLRP_MAX_QUEUE_SIZE environment variable, if set, or 2048.

  • max_export_batch_size (Integer) (defaults to: Integer(ENV.fetch('OTEL_BLRP_MAX_EXPORT_BATCH_SIZE', 512)))

    the maximum batch size in log records. Defaults to the value of the OTEL_BLRP_MAX_EXPORT_BATCH_SIZE environment variable, if set, or 512.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb', line 37

def initialize(exporter,
               exporter_timeout: Float(ENV.fetch('OTEL_BLRP_EXPORT_TIMEOUT', 30_000)),
               schedule_delay: Float(ENV.fetch('OTEL_BLRP_SCHEDULE_DELAY', 1000)),
               max_queue_size: Integer(ENV.fetch('OTEL_BLRP_MAX_QUEUE_SIZE', 2048)),
               max_export_batch_size: Integer(ENV.fetch('OTEL_BLRP_MAX_EXPORT_BATCH_SIZE', 512)),
               start_thread_on_boot: String(ENV['OTEL_RUBY_BLRP_START_THREAD_ON_BOOT']) !~ /false/i)
  unless max_export_batch_size <= max_queue_size
    raise ArgumentError,
          'max_export_batch_size much be less than or equal to max_queue_size'
  end

  unless Common::Utilities.valid_exporter?(exporter)
    raise ArgumentError,
          "exporter #{exporter.inspect} does not appear to be a valid exporter"
  end

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout / 1000.0
  @mutex = Mutex.new
  @export_mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @stopped = false
  @delay_seconds = schedule_delay / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @log_records = []
  @pid = nil
  @thread = nil
  reset_on_fork(restart_thread: start_thread_on_boot)
end

Instance Method Details

#force_flush(timeout: nil) ⇒ Integer

Export all emitted log records that have not yet been exported to the configured Exporter.

This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor exports the completed log records.

Parameters:

  • timeout (optional Numeric) (defaults to: nil)

    An optional timeout in seconds.

Returns:

  • (Integer)

    SUCCESS if no error occurred, FAILURE if a non-specific failure occurred, TIMEOUT if a timeout occurred.



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb', line 96

def force_flush(timeout: nil)
  start_time = OpenTelemetry::Common::Utilities.timeout_timestamp

  snapshot = lock do
    reset_on_fork if @keep_running
    log_records.shift(log_records.size)
  end

  until snapshot.empty?
    remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
    return TIMEOUT if remaining_timeout&.zero?

    batch = snapshot.shift(batch_size).map!(&:to_log_record_data)
    result_code = export_batch(batch, timeout: remaining_timeout)
    return result_code unless result_code == SUCCESS
  end

  @exporter.force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
ensure
  # Unshift the remaining log records if we timed out. We drop excess
  # log records from the snapshot because they're older than any
  # records in the buffer.
  lock do
    n = log_records.size + snapshot.size - max_queue_size

    if n.positive?
      snapshot.shift(n)
      report_dropped_log_records(n, reason: 'buffer-full')
    end

    log_records.unshift(*snapshot) unless snapshot.empty?
    @condition.signal if log_records.size > max_queue_size / 2
  end
end

#on_emit(log_record, _context) ⇒ Object

Adds a log record to the batch. Thread-safe; may block on lock.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb', line 70

def on_emit(log_record, _context)
  return if @stopped

  lock do
    reset_on_fork
    n = log_records.size + 1 - max_queue_size
    if n.positive?
      log_records.shift(n)
      report_dropped_log_records(n, reason: 'buffer-full')
    end
    log_records << log_record
    @condition.signal if log_records.size > batch_size
  end
end

#shutdown(timeout: nil) ⇒ Integer

Shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished.

Parameters:

  • timeout (optional Numeric) (defaults to: nil)

    An optional timeout in seconds.

Returns:

  • (Integer)

    SUCCESS if no error occurred, FAILURE if a non-specific failure occurred, TIMEOUT if a timeout occurred.



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/opentelemetry/sdk/logs/export/batch_log_record_processor.rb', line 137

def shutdown(timeout: nil)
  return if @stopped

  start_time = OpenTelemetry::Common::Utilities.timeout_timestamp
  thread = lock do
    @keep_running = false
    @stopped = true
    @condition.signal
    @thread
  end

  thread&.join(timeout)
  force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
  dropped_log_records = lock { log_records.size }
  report_dropped_log_records(dropped_log_records, reason: 'terminating') if dropped_log_records.positive?

  @exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
end