ActiveJobの仕組みをざっくり読み解いてみました。Railsのバージョンは 5.1.2です。
ActiveJobの使い方をざっくり
configに使うバックエンドキューシステムのアダプタを指定してconfig.active_job.queue_adapter = :sidekiq
ActiveJob::Baseを継承したクラスを書いて(Rails5系だとApplicationJobを継承)
class HogeJob < ActiveJob::Base
queue_as :default
def perform(*args)
puts args
end
end
perform_laterすると良い感じにキューイングしてくれます。
HogeJob.perform_later "hoge", 1, true
あとは各アダプタのワーカーをデーモンなりで常駐させて処理させればOKです。
コードリーディング
ActiveJob::BaseはActiveJob::Enqueuingをincludeしていて、ActiveJob::Enqueuingにクラスメソッドとしてperform_laterメソッドが定義されています。module ActiveJob
module Enqueuing
extend ActiveSupport::Concern
...
module ClassMethods
def perform_later(*args)
job_or_instantiate(*args).enqueue
end
private
def job_or_instantiate(*args) # :doc:
args.first.is_a?(self) ? args.first : new(*args)
end
end
end
end
perform_laterの引数に何も指定しなければ定義したActiveJobがインスタンス化され、enqueueメソッドが呼び出されます。
...
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
run_callbacks :enqueue do
if scheduled_at
self.class.queue_adapter.enqueue_at self, scheduled_at
else
self.class.queue_adapter.enqueue self
end
end
self
end
end
end
enqueueメソッドではqueue_adapterのenqueueメソッドが呼び出されます。queue_adapterはconfig.active_job.queue_adapterで指定した名前から生成されるアダプタクラスになります。:sidekiqを指定するとActiveJob::QueueAdapters::SidekiqAdapterが生成されます。
module ActiveJob
...
def queue_adapter=(name_or_adapter_or_class)
self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
end
private
def interpret_adapter(name_or_adapter_or_class)
case name_or_adapter_or_class
when Symbol, String
ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new
else
if queue_adapter?(name_or_adapter_or_class)
name_or_adapter_or_class
else
raise ArgumentError
end
end
end
...
ActiveJob::QueueAdapters#lookupではconst_getを使ってXXXAdapterという名前の定数(=クラス)を取得しています。
module ActiveJob
module QueueAdapters
...
ADAPTER = "Adapter".freeze
private_constant :ADAPTER
class << self
def lookup(name)
const_get(name.to_s.camelize << ADAPTER)
end
end
end
end
Sidekiqのアダプタは以下のような実装になっており、enqueueメソッドは内部的にはSidekiq::Client.pushするだけです
require "sidekiq"
module ActiveJob
module QueueAdapters
class SidekiqAdapter
def enqueue(job) #:nodoc:
#Sidekiq::Client does not support symbols as keys
job.provider_job_id = Sidekiq::Client.push \
"class" => JobWrapper,
"wrapped" => job.class.to_s,
"queue" => job.queue_name,
"args" => [ job.serialize ]
end
...
class JobWrapper #:nodoc:
include Sidekiq::Worker
def perform(job_data)
Base.execute job_data.merge("provider_job_id" => jid)
end
end
end
end
end
JobWrapper#performはActiveJob#execute(ActiveJob::Executionのメソッド)を呼び出します。job_dataを引数にdeserializeすると、ActiveJob::Baseを継承したJobのクラスがインスタンス化されます。その後、各Jobのperform_nowメソッドを呼び出され、最終的に独自に定義するperformメソッドが呼び出されます。その名の通りJobWrapperは、引数としてカスタムJobクラスの情報を渡すことで、カスタムJobの処理を実行するクラスになります。
module ActiveJob
module Execution
extend ActiveSupport::Concern
include ActiveSupport::Rescuable
# Includes methods for executing and performing jobs instantly.
module ClassMethods
def perform_now(*args)
job_or_instantiate(*args).perform_now
end
def execute(job_data) #:nodoc:
ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
end
end
end
def perform_now
deserialize_arguments_if_needed
run_callbacks :perform do
# Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
self.executions = (executions || 0) + 1
perform(*arguments)
end
rescue => exception
rescue_with_handler(exception) || raise
end
def perform(*)
fail NotImplementedError
end
end
end
ちなみにjob_dataには以下のようなハッシュが入ります( “args” => [ job.serialize ] のところ)
def serialize
{
"job_class" => self.class.name,
"job_id" => job_id,
"queue_name" => queue_name,
"priority" => priority,
"arguments" => serialize_arguments(arguments),
"executions" => executions,
"locale" => I18n.locale.to_s
}
end
Sidekiq/Redisのデータを見てみる
Sidekiqの場合、格納先のRedisのデータを見てみるとこんな感じで、JSONでシリアライズされているのがわかります。$ redis-cli
127.0.0.1:6379> lrange queue:default 0 0
"{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"HogeJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"HogeJob\",\"job_id\":\"f801560f-cd28-497e-8dc8-422d5cb610e5\",\"queue_name\":\"default\",\"priority\":null,\"arguments\":[\"hoge\",1,2],\"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"3bb26e03512662b6fd7fe709\",\"created_at\":1501514510.464975,\"enqueued_at\":1501514510.465015}"