sidekiqのコードを読んだので備忘録。バージョンは5.0.5です
bin/sidekiqはSidekiq::CLI#parse, #runを叩きます。
cli = Sidekiq::CLI.instance
cli.parse
cli.run
parseメソッドではオプションをパースしたり、ロガー初期化したりバリデーションしたりデーモン化したりpidファイル書き込んだり…まぁ見ての通りです。
def parse(args=ARGV)
@code = nil
setup_options(args)
initialize_logger
validate!
daemonize
write_pid
end
runではシグナルトラップの設定をしています。シグナルを受け取ったらself_writeに書き込み、IO.selectを使ってself_readから読み込んでシグナルのハンドリングをします。最後にSidekiq::Launcher#runを呼び出します。
def run
boot_system
print_banner
self_read, self_write = IO.pipe
sigs = %w(INT TERM TTIN TSTP)
# USR1 and USR2 don't work on the JVM
if !jruby?
sigs << 'USR1'
sigs << 'USR2'
end
sigs.each do |sig|
begin
trap sig do
self_write.puts(sig)
end
rescue ArgumentError
puts "Signal #{sig} not supported"
end
end
...
require 'sidekiq/launcher'
@launcher = Sidekiq::Launcher.new(options)
begin
launcher.run
while readable_io = IO.select([self_read])
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
...
Sidekiq::Launcher#runは以下のようにSidekiq::Scheduled::Poller#startとSidekiq::Manager#startを実行します。Pollerはスケジュール実行やretry処理を行うスレッドで、Managerはworkerを管理するスレッドです。
module Sidekiq
class Launcher
include Util
attr_accessor :manager, :poller, :fetcher
def initialize(options)
@manager = Sidekiq::Manager.new(options)
@poller = Sidekiq::Scheduled::Poller.new
@done = false
@options = options
end
def run
@thread = safe_thread("heartbeat", &method(:start_heartbeat))
@poller.start
@manager.start
end
safe_threadはThreadを実行してエラー時にoptions[:error_handlers]に設定したエラーハンドリングを実行する処理です。
module Sidekiq
module Util
include ExceptionHandler
def watchdog(last_words)
yield
rescue Exception => ex
handle_exception(ex, { context: last_words })
raise ex
end
def safe_thread(name, &block)
Thread.new do
Thread.current['sidekiq_label'] = name
watchdog(name, &block)
end
end
Sidekiq::Manager#startはconcurrency数分のworker(Sidekiq::Processor)のインスタンスを作成し、それぞれのworkerに対してSidekiq::Processor#startを実行します。
module Sidekiq
class Manager
include Util
attr_reader :workers
attr_reader :options
def initialize(options={})
logger.debug { options.inspect }
@options = options
@count = options[:concurrency] || 25
raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
@done = false
@workers = Set.new
@count.times do
@workers << Processor.new(self)
end
@plock = Mutex.new
end
def start
@workers.each do |x|
x.start
end
end
Sidekiq::Processor#startはsafe_thread経由でrunをスレッド内で実行します。
module Sidekiq
class Processor
include Util
attr_reader :thread
attr_reader :job
def start
@thread ||= safe_thread("processor", &method(:run))
end
runではfetchメソッドでジョブを取ってきて、processでそのジョブの内容を実行します。
def run
begin
while !@done
process_one
end
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end
end
def process_one
@job = fetch
process(@job) if @job
@job = nil
end
def get_one
begin
work = @strategy.retrieve_work
(logger.info { "Redis is online, #{Time.now - @down} sec downtime" }; @down = nil) if @down
work
rescue Sidekiq::Shutdown
rescue => ex
handle_fetch_exception(ex)
end
end
def fetch
j = get_one
if j && @done
j.requeue
nil
else
j
end
end
fetchメソッドからはget_oneを呼び出しており、get_oneでは@strategy.retrieve_workを呼び出します。デフォルトではSidekiq::BasicFetch#retrieve_workが呼び出されます。このメソッドではredisに対してBRPOPのコマンドを送ります。BRPOPは指定のキーにデータが入っていない場合はタイムアウト付きでブロックします。タイムアウト時はwork はnilになります。
module Sidekiq
class BasicFetch
def retrieve_work
work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
UnitOfWork.new(*work) if work
end
queues_cmdはこんな感じで、queuesに設定したものをshuffleしてuniqを取っています。
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle.uniq
queues << TIMEOUT
queues
end
end
queuesはparseメソッドでセットされています
def parse_queues(opts, queues_and_weights)
queues_and_weights.each { |queue_and_weight| parse_queue(opts, *queue_and_weight) }
end
def parse_queue(opts, q, weight=nil)
[weight.to_i, 1].max.times do
(opts[:queues] ||= []) << q
end
opts[:strict] = false if weight.to_i > 0
end
例えば foo, 3 と bar, 1が設定されている場合はqueuesは [ ‘foo’, ‘foo’, ‘foo’, ‘bar’]
となります。この配列に対して#shuffle, #uniqが呼び出されると3/4の確率でfooが先頭で次点にbarが並びます。BRPOPでは先頭のキーから優先的にチェックするようになっているので、配列の比率で並び順をコントロールしているということになります。
processメソッドはdispatchを呼び出します。
def dispatch(job_hash, queue)
pristine = cloned(job_hash)
Sidekiq::Logging.with_job_hash_context(job_hash) do
@retrier.global(job_hash, queue) do
@logging.call(job_hash, queue) do
stats(pristine, queue) do
# Rails 5 requires a Reloader to wrap code execution. In order to
# constantize the worker and instantiate an instance, we have to call
# the Reloader. It handles code loading, db connection management, etc.
# Effectively this block denotes a "unit of work" to Rails.
@reloader.call do
klass = constantize(job_hash['class'.freeze])
worker = klass.new
worker.jid = job_hash['jid'.freeze]
@retrier.local(worker, job_hash, queue) do
yield worker
end
end
end
end
end
end
end
job_hashはこんな感じ
{
"class": "HogeJob",
"args": [
123
],
"retry": true,
"queue": "default",
"jid": "a70cc43ec6e490e2d3d31552",
"created_at": 1509374692.256918,
"enqueued_at": 1509374692.257654
}
ActiveJobだとこんな感じです。JobWrapper経由でActiveJobのクラスが実行されるので階層が深いです。
{
"class": "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
"wrapped": "HogeJob",
"queue": "default",
"args": [
{
"job_class": "HogeJob",
"job_id": "15db0f2a-0614-4435-bac4-40cf8cb166ef",
"provider_job_id": null,
"queue_name": "default",
"priority": null,
"arguments": [
123
],
"executions": 0,
"locale": "ja"
}
],
"retry": true,
"jid": "45489c0845e5868c5f837a66",
"created_at": 1509374502.2748291,
"enqueued_at": 1509374502.275716
}
最後にexecute_jobを呼び出します。classのキーの文字列がクラス化されてperformが実行されます。
def execute_job(worker, cloned_args)
worker.perform(*cloned_args)
end
Poller
Sidekiq::Scheduled::PollerはScheduled Jobsやリトライを実行するためのワーカーです。startメソッドを呼ぶとスレッドを作ってその中でenqueue => waitのループに入ります。module Sidekiq
module Scheduled
class Poller
include Util
INITIAL_WAIT = 10
def initialize
@enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
@sleeper = ConnectionPool::TimedStack.new
@done = false
@thread = nil
end
def start
@thread ||= safe_thread("scheduler") do
initial_wait
while !@done
enqueue
wait
end
Sidekiq.logger.info("Scheduler exiting...")
end
end
enqueueはSidekiq::Scheduled::Enq#enqueue_jobsを呼び出します。Enq#enqueue_jobsではzrangebyscoreでschedule, retryキューから実行時刻が過去で、現時点で実行されていないジョブをSidekiqのキューに入れています。zremで要素を削除しつつエンキューしています。
module Sidekiq
module Scheduled
SETS = %w(retry schedule)
class Enq
def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
end
end
waitの仕組みはsleepを直接使うのではなく、ConnectionPool::TimedStack#popを使っています。このメソッドを呼ぶと、引数に指定した秒数をタイムアウト値としてConnectionPool::TimedStack#pushされるまでwaitがかかります。終了時以外はpushされることはないのでrandom_poll_intervalの数値の秒数だけwaitがかかることになります。
def wait
@sleeper.pop(random_poll_interval)
rescue Timeout::Error
# expected
rescue => ex
# if poll_interval_average hasn't been calculated yet, we can
# raise an error trying to reach Redis.
logger.error ex.message
logger.error ex.backtrace.first
sleep 5
end
終了時はpushをすることによって、waitを終わらせてすみやかにスレッドが終了するようになっています。
# Shut down this instance, will pause until the thread is dead.
def terminate
@done = true
if @thread
t = @thread
@thread = nil
@sleeper << 0
t.value
end
end
Retryの仕組み
execute_job(ジョブの実行メソッド)はSidekiq::JobRetry#global, #localのブロック内で実行されます。execute_job内でエラーが発生した場合は、rescueで補足されます。def local(worker, msg, queue)
yield
rescue Skip => ex
raise ex
rescue Sidekiq::Shutdown => ey
# ignore, will be pushed back onto queue during hard_shutdown
raise ey
rescue Exception => e
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
if msg['retry'] == nil
msg['retry'] = worker.class.get_sidekiq_options['retry']
end
raise e unless msg['retry']
attempt_retry(worker, msg, queue, e)
# We've handled this error associated with this job, don't
# need to handle it at the global level
raise Skip
end
attempt_retryメソッドではretry_countやbacktraceの設定をしつつretryのキューにジョブをエンキューします。retryのキューはPollerによってデキューされ、通常のジョブキューにエンキューされます。max_retry_attemptsを超えている場合はリトライ処理はせずretries_exhaustedが呼び出されます。
if count < max_retry_attempts
delay = delay_for(worker, count, exception)
logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = Sidekiq.dump_json(msg)
Sidekiq.redis do |conn|
conn.zadd('retry', retry_at.to_s, payload)
end
else
# Goodbye dear message, you (re)tried your best I'm sure.
retries_exhausted(worker, msg, exception)
end