2017-08-01

ActiveJobの仕組みを読み解く

ActiveJobの仕組みをざっくり読み解いてみました。Railsのバージョンは 5.1.2です。

ActiveJobの使い方をざっくり

configに使うバックエンドキューシステムのアダプタを指定して

config.active_job.queue_adapter = :sidekiq

ActiveJob::Baseを継承したクラスを書いて(Rails5系だとApplicationJobを継承)

class HogeJob < ActiveJob::Base
  queue_as :default
  def perform(*args)
    puts args
  end
end

perform_laterすると良い感じにキューイングしてくれます。

HogeJob.perform_later "hoge", 1, true

あとは各アダプタのワーカーをデーモンなりで常駐させて処理させればOKです。

コードリーディング

ActiveJob::BaseはActiveJob::Enqueuingをincludeしていて、ActiveJob::Enqueuingにクラスメソッドとしてperform_laterメソッドが定義されています。

module ActiveJob
  module Enqueuing
    extend ActiveSupport::Concern
...
    module ClassMethods
      def perform_later(*args)
        job_or_instantiate(*args).enqueue
      end

      private
        def job_or_instantiate(*args) # :doc:
          args.first.is_a?(self) ? args.first : new(*args)
        end
    end
  end
end

perform_laterの引数に何も指定しなければ定義したActiveJobがインスタンス化され、enqueueメソッドが呼び出されます。

...
    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      run_callbacks :enqueue do
        if scheduled_at
          self.class.queue_adapter.enqueue_at self, scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end
      end
      self
    end
  end
end

enqueueメソッドではqueue_adapterのenqueueメソッドが呼び出されます。queue_adapterはconfig.active_job.queue_adapterで指定した名前から生成されるアダプタクラスになります。:sidekiqを指定するとActiveJob::QueueAdapters::SidekiqAdapterが生成されます。

module ActiveJob
...
      def queue_adapter=(name_or_adapter_or_class)
        self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
      end

      private

        def interpret_adapter(name_or_adapter_or_class)
          case name_or_adapter_or_class
          when Symbol, String
            ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new
          else
            if queue_adapter?(name_or_adapter_or_class)
              name_or_adapter_or_class
            else
              raise ArgumentError
            end
          end
        end
...

ActiveJob::QueueAdapters#lookupではconst_getを使ってXXXAdapterという名前の定数(=クラス)を取得しています。

module ActiveJob
  module QueueAdapters
...
    ADAPTER = "Adapter".freeze
    private_constant :ADAPTER

    class << self
      def lookup(name)
        const_get(name.to_s.camelize << ADAPTER)
      end
    end
  end
end

Sidekiqのアダプタは以下のような実装になっており、enqueueメソッドは内部的にはSidekiq::Client.pushするだけです

require "sidekiq"

module ActiveJob
  module QueueAdapters
    class SidekiqAdapter
      def enqueue(job) #:nodoc:
        #Sidekiq::Client does not support symbols as keys
        job.provider_job_id = Sidekiq::Client.push \
          "class"   => JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end
...
      class JobWrapper #:nodoc:
        include Sidekiq::Worker

        def perform(job_data)
          Base.execute job_data.merge("provider_job_id" => jid)
        end
      end
    end
  end
end

JobWrapper#performはActiveJob#execute(ActiveJob::Executionのメソッド)を呼び出します。job_dataを引数にdeserializeすると、ActiveJob::Baseを継承したJobのクラスがインスタンス化されます。その後、各Jobのperform_nowメソッドを呼び出され、最終的に独自に定義するperformメソッドが呼び出されます。その名の通りJobWrapperは、引数としてカスタムJobクラスの情報を渡すことで、カスタムJobの処理を実行するクラスになります。

module ActiveJob
  module Execution
    extend ActiveSupport::Concern
    include ActiveSupport::Rescuable

    # Includes methods for executing and performing jobs instantly.
    module ClassMethods
      def perform_now(*args)
        job_or_instantiate(*args).perform_now
      end

      def execute(job_data) #:nodoc:
        ActiveJob::Callbacks.run_callbacks(:execute) do
          job = deserialize(job_data)
          job.perform_now
        end
      end
    end

    def perform_now
      deserialize_arguments_if_needed
      run_callbacks :perform do
        # Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
        self.executions = (executions || 0) + 1

        perform(*arguments)
      end
    rescue => exception
      rescue_with_handler(exception) || raise
    end

    def perform(*)
      fail NotImplementedError
    end
  end
end

ちなみにjob_dataには以下のようなハッシュが入ります( “args” => [ job.serialize ] のところ)

def serialize
  {
    "job_class"  => self.class.name,
    "job_id"     => job_id,
    "queue_name" => queue_name,
    "priority"   => priority,
    "arguments"  => serialize_arguments(arguments),
    "executions" => executions,
    "locale"     => I18n.locale.to_s
  }
end

Sidekiq/Redisのデータを見てみる

Sidekiqの場合、格納先のRedisのデータを見てみるとこんな感じで、JSONでシリアライズされているのがわかります。

$ redis-cli 
127.0.0.1:6379> lrange queue:default 0 0

"{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"HogeJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"HogeJob\",\"job_id\":\"f801560f-cd28-497e-8dc8-422d5cb610e5\",\"queue_name\":\"default\",\"priority\":null,\"arguments\":[\"hoge\",1,2],\"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"3bb26e03512662b6fd7fe709\",\"created_at\":1501514510.464975,\"enqueued_at\":1501514510.465015}"
このエントリーをはてなブックマークに追加