Module: OpenTelemetry::Instrumentation::Rdkafka::Patches::Consumer

Defined in:
lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb

Overview

The Consumer module contains the instrumentation patch for the Consumer class

Instance Method Summary collapse

Instance Method Details

#eachObject



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb', line 13

def each
  super do |message|
    attributes = {
      'messaging.system' => 'kafka',
      'messaging.destination' => message.topic,
      'messaging.destination_kind' => 'topic',
      'messaging.kafka.partition' => message.partition,
      'messaging.kafka.offset' => message.offset
    }

    attributes['messaging.kafka.message_key'] = message.key if message.key
    parent_context = OpenTelemetry.propagation.extract(message.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter)
    span_context = OpenTelemetry::Trace.current_span(parent_context).context
    links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid?

    OpenTelemetry::Context.with_current(parent_context) do
      tracer.in_span("#{message.topic} process", links: links, attributes: attributes, kind: :consumer) do
        yield message
      end
    end
  end
end

#each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb', line 36

def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
  super do |messages, error|
    if messages.empty?
      yield messages, error
    else
      attributes = {
        'messaging.system' => 'kafka',
        'messaging.destination_kind' => 'topic',
        'messaging.kafka.message_count' => messages.size
      }

      links = messages.map do |message|
        span_context = OpenTelemetry::Trace.current_span(OpenTelemetry.propagation.extract(message.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter)).context
        OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
      end
      links.compact!

      tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do
        yield messages, error
      end
    end
  end
end