Module: OpenTelemetry::Instrumentation::RubyKafka::Patches::Consumer
- Defined in:
- lib/opentelemetry/instrumentation/ruby_kafka/patches/consumer.rb
Overview
The Consumer module contains the instrumentation patch for the Consumer class
Instance Method Summary collapse
- 
  
    
      #each_batch(min_bytes: 1, max_bytes: 10_485_760, max_wait_time: 1, automatically_mark_as_processed: true)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    rubocop:disable Metrics/AbcSize. 
- #each_message(min_bytes: 1, max_bytes: 10_485_760, max_wait_time: 1, automatically_mark_as_processed: true) ⇒ Object
Instance Method Details
#each_batch(min_bytes: 1, max_bytes: 10_485_760, max_wait_time: 1, automatically_mark_as_processed: true) ⇒ Object
rubocop:disable Metrics/AbcSize
| 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | # File 'lib/opentelemetry/instrumentation/ruby_kafka/patches/consumer.rb', line 38 def each_batch(min_bytes: 1, max_bytes: 10_485_760, max_wait_time: 1, automatically_mark_as_processed: true) # rubocop:disable Metrics/AbcSize super do |batch| attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => batch.topic, 'messaging.destination_kind' => 'topic', 'messaging.kafka.partition' => batch.partition, 'messaging.kafka.offset_lag' => batch.offset_lag, 'messaging.kafka.highwater_mark_offset' => batch.highwater_mark_offset, 'messaging.kafka.message_count' => batch..count } links = batch..map do || span_context = OpenTelemetry::Trace.current_span(OpenTelemetry.propagation.extract(.headers)).context OpenTelemetry::Trace::Link.new(span_context) if span_context.valid? end links.compact! tracer.in_span("#{batch.topic} process", attributes: attributes, links: links, kind: :consumer) do yield batch end end end | 
#each_message(min_bytes: 1, max_bytes: 10_485_760, max_wait_time: 1, automatically_mark_as_processed: true) ⇒ Object
| 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | # File 'lib/opentelemetry/instrumentation/ruby_kafka/patches/consumer.rb', line 13 def (min_bytes: 1, max_bytes: 10_485_760, max_wait_time: 1, automatically_mark_as_processed: true) super do || attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => .topic, 'messaging.destination_kind' => 'topic', 'messaging.kafka.partition' => .partition, 'messaging.kafka.offset' => .offset } = Utils.(.key) attributes['messaging.kafka.message_key'] = if parent_context = OpenTelemetry.propagation.extract(.headers) span_context = OpenTelemetry::Trace.current_span(parent_context).context links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? OpenTelemetry::Context.with_current(parent_context) do tracer.in_span("#{.topic} process", links: links, attributes: attributes, kind: :consumer) do yield end end end end |