Module: OpenTelemetry::Instrumentation::RubyKafka::Patches::Client

Defined in:
lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb

Overview

The Client module contains the instrumentation patch the Client#deliver_message and Client#each_message methods.

Instance Method Summary collapse

Instance Method Details

#deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, retries: 1) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb', line 15

def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, retries: 1)
  attributes = {
    'messaging.system' => 'kafka',
    'messaging.destination' => topic,
    'messaging.destination_kind' => 'topic'
  }

  message_key = Utils.extract_message_key(key)
  attributes['messaging.kafka.message_key'] = message_key if message_key

  attributes['messaging.kafka.partition'] = partition if partition

  tracer.in_span("#{topic} send", attributes: attributes, kind: :producer) do
    OpenTelemetry.propagation.inject(headers)
    super
  end
end

#each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb', line 33

def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block)
  super do |message|
    attributes = {
      'messaging.system' => 'kafka',
      'messaging.destination' => message.topic,
      'messaging.destination_kind' => 'topic',
      'messaging.kafka.partition' => message.partition
    }

    message_key = Utils.extract_message_key(message.key)
    attributes['messaging.kafka.message_key'] = message_key if message_key

    parent_context = OpenTelemetry.propagation.extract(message.headers)
    OpenTelemetry::Context.with_current(parent_context) do
      tracer.in_span("#{topic} process", attributes: attributes, kind: :consumer) do
        yield message
      end
    end
  end
end