Class: OpenTelemetry::Instrumentation::Que::Middlewares::ServerMiddleware

Inherits:
Object
  • Object
show all
Defined in:
lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb

Overview

Server middleware to trace Que jobs

Class Method Summary collapse

Class Method Details

.attributes_before_job_completion(job, job_class) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb', line 51

# Builds the span attributes that can be determined before the job has run.
#
# @param job [#que_attrs] job whose +que_attrs+ hash is read for :queue, :priority and :id
# @param job_class [Object] value recorded under 'messaging.que.job_class'
# @return [Hash] span attributes; 'messaging.message_id' is present only when
#   the job attributes carry a truthy :id
def self.attributes_before_job_completion(job, job_class)
  que_attrs = job.que_attrs
  message_id = que_attrs[:id]

  span_attributes = {
    'messaging.system' => 'que',
    # Fall back to 'default' when the job attributes name no queue
    'messaging.destination' => que_attrs[:queue] || 'default',
    'messaging.destination_kind' => 'queue',
    'messaging.operation' => 'process',
    'messaging.que.job_class' => job_class,
    # Fall back to 100 when the job attributes carry no priority
    'messaging.que.priority' => que_attrs[:priority] || 100
  }
  return span_attributes unless message_id

  span_attributes.merge('messaging.message_id' => message_id)
end

.call(job, &block) ⇒ Object

rubocop:disable Metrics/AbcSize,Metrics/MethodLength



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb', line 13

# Runs the given block (the job) inside a consumer span named "<job class> process".
#
# The context extracted from the job's tags is made current for the duration of
# the call. Depending on the configured :propagation_style the span is either:
# * :child - started with +in_span+ while the extracted context is current;
# * anything else - started as a root span, carrying links built from the
#   extracted context only when the style is :link.
#
# @param job [#que_attrs] job being processed; its +que_attrs[:data][:tags]+ is read
# @param block [Proc] the wrapped job execution
# @return [nil] the return value is not important
def self.call(job, &block) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength
  klass_name = job_class(job)
  name = "#{klass_name} process"
  span_attributes = attributes_before_job_completion(job, klass_name)

  # Jobs without tags contribute an empty tag list
  parent_context = extract_context_from_tags(job.que_attrs[:data][:tags] || [])

  OpenTelemetry::Context.with_current(parent_context) do
    style = otel_config[:propagation_style]

    if style == :child
      tracer.in_span(name, attributes: span_attributes, kind: :consumer) do |span|
        block.call
        enhance_span_after_job_completion(span, job)
      end
    else
      links = style == :link ? prepare_span_links(parent_context) : []
      root_span = tracer.start_root_span(name, attributes: span_attributes, links: links, kind: :consumer)

      OpenTelemetry::Trace.with_span(root_span) do |span|
        block.call
        enhance_span_after_job_completion(span, job)
      ensure
        # The root span is started manually, so it must be finished manually too
        root_span.finish
      end
    end
  end

  # return value is not important
  nil
end

.enhance_span_after_job_completion(span, job) ⇒ Object



64
65
66
67
68
69
70
71
72
# File 'lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb', line 64

# Adds information to the span that is only known once the job has run:
# the job's error count and, when the job carries an error, the exception
# and an error status.
#
# @param span [#set_attribute, #record_exception, #status=] span wrapping the job run
# @param job [#que_attrs, #que_error] the job that was just executed
# @return [Object, nil] nil when the job has no error
def self.enhance_span_after_job_completion(span, job)
  # nil is not a valid span attribute value, so the attribute is only recorded
  # when the job attributes actually carry an error count.
  attempts = job.que_attrs[:error_count]
  span.set_attribute('messaging.que.attempts', attempts) unless attempts.nil?

  error = job.que_error
  return unless error

  span.record_exception(error)
  span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{error.class}")
end

.extract_context_from_tags(tags) ⇒ Object

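tags is an array of strings, for example [“tag1”, “traceparent:…”]. Entries of the form “key:value” are split at the first colon and passed to the propagator as a carrier hash; entries without a colon are ignored.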



75
76
77
78
79
80
81
# File 'lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb', line 75

# Extracts a context from the job's tags.
#
# Each tag is split at its first colon; tags that yield a key and a value
# become entries of the carrier hash handed to the propagator, while tags
# without a colon are dropped.
#
# @param tags [Array<String>] e.g. ["tag1", "traceparent:..."]
# @return [Object] whatever +OpenTelemetry.propagation.extract+ returns for the carrier
def self.extract_context_from_tags(tags)
  carrier = tags.each_with_object({}) do |tag, pairs|
    key, value = tag.split(':', 2)
    # A missing second part means the tag had no colon and cannot be converted
    pairs[key] = value unless value.nil?
  end

  OpenTelemetry.propagation.extract(carrier)
end

.job_class(job) ⇒ Object



83
84
85
# File 'lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb', line 83

# Determines the class name to report for a job.
#
# @param job [#que_attrs] job being processed
# @return [Object] the :job_class entry of the job attributes when truthy,
#   otherwise the name of the job object's own class
def self.job_class(job)
  declared = job.que_attrs[:job_class]
  return declared if declared

  job.class.name
end

.otel_config ⇒ Object



47
48
49
# File 'lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb', line 47

# Returns the configuration of the Que instrumentation.
#
# The instrumentation class is referenced by its fully qualified name so the
# constant lookup does not depend on the lexical nesting this method is
# defined in.
#
# @return [Object] the instrumentation's +config+
def self.otel_config
  OpenTelemetry::Instrumentation::Que::Instrumentation.instance.config
end


87
88
89
90
# File 'lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb', line 87

# Builds the links for a root span from the extracted context.
#
# @param extracted_context [Object] context obtained from the job's tags
# @return [Array] a single link to the span context found in +extracted_context+
#   when that span context is valid, otherwise an empty array
def self.prepare_span_links(extracted_context)
  remote_span_context = OpenTelemetry::Trace.current_span(extracted_context).context
  return [] unless remote_span_context.valid?

  [OpenTelemetry::Trace::Link.new(remote_span_context)]
end

.tracer ⇒ Object



43
44
45
# File 'lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb', line 43

# Returns the tracer of the Que instrumentation.
#
# @return [Object] the instrumentation's +tracer+
def self.tracer
  instrumentation = OpenTelemetry::Instrumentation::Que::Instrumentation.instance
  instrumentation.tracer
end