Module: OpenTelemetry::Instrumentation::ActiveJob::Patches::ActiveJobCallbacks

Defined in:
lib/opentelemetry/instrumentation/active_job/patches/active_job_callbacks.rb

Overview

Module to prepend to ActiveJob::Base for instrumentation.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.prepended(base) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/opentelemetry/instrumentation/active_job/patches/active_job_callbacks.rb', line 13

def self.prepended(base)
  base.class_eval do
    around_enqueue do |job, block|
      span_kind = job.class.queue_adapter_name == 'inline' ? :client : :producer
      span_name = "#{otel_config[:span_naming] == :job_class ? job.class : job.queue_name} send"
      span_attributes = job_attributes(job)
      otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do
        OpenTelemetry.propagation.inject(job.)
        block.call
      end
    end
  end
end

Instance Method Details

#perform_nowObject

rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/opentelemetry/instrumentation/active_job/patches/active_job_callbacks.rb', line 27

def perform_now # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  span_kind = self.class.queue_adapter_name == 'inline' ? :server : :consumer
  span_name = "#{otel_config[:span_naming] == :job_class ? self.class : queue_name} process"
  span_attributes = job_attributes(self).merge('messaging.operation' => 'process')
  executions_count = (executions || 0) + 1 # because we run before the count is incremented in ActiveJob::Execution

  extracted_context = OpenTelemetry.propagation.extract()
  OpenTelemetry::Context.with_current(extracted_context) do
    if otel_config[:propagation_style] == :child
      otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do |span|
        span.set_attribute('messaging.active_job.executions', executions_count)
        super
      end
    else
      span_links = []
      if otel_config[:propagation_style] == :link
        span_context = OpenTelemetry::Trace.current_span(extracted_context).context
        span_links << OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
      end

      root_span = otel_tracer.start_root_span(span_name, attributes: span_attributes, links: span_links, kind: span_kind)
      OpenTelemetry::Trace.with_span(root_span) do |span|
        span.set_attribute('messaging.active_job.executions', executions_count)
        super
      rescue Exception => e # rubocop:disable Lint/RescueException
        span.record_exception(e)
        span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
        raise e
      ensure
        root_span.finish
      end
    end
  end
ensure
  # We may be in a job system (eg: resque) that forks and kills worker processes often.
  # We don't want to lose spans by not flushing any span processors, so we optionally force it here.
  OpenTelemetry.tracer_provider.force_flush if otel_config[:force_flush]
end