sidekiqのコードを読んだので備忘録。バージョンは5.0.5です

bin/sidekiqはSidekiq::CLI#parse, #runを叩きます。

parseメソッドではオプションをパースしたり、ロガー初期化したりバリデーションしたりデーモン化したりpidファイル書き込んだり…まぁ見ての通りです。

runではシグナルトラップの設定をしています。シグナルを受け取ったらself_writeに書き込み、IO.selectを使ってself_readから読み込んでシグナルのハンドリングをします。最後にSidekiq::Launcher#runを呼び出します。

Sidekiq::Launcher#runは以下のようにSidekiq::Scheduled::Poller#startとSidekiq::Manager#startを実行します。Pollerはスケジュール実行やretry処理を行うスレッドで、Managerはworkerを管理するスレッドです。

safe_threadはThreadを実行してエラー時にoptions[:error_handlers]に設定したエラーハンドリングを実行する処理です。

Sidekiq::Manager#startはconcurrency数分のworker(Sidekiq::Processor)のインスタンスを作成し、それぞれのworkerに対してSidekiq::Processor#startを実行します。

Sidekiq::Processor#startはsafe_thread経由でrunをスレッド内で実行します。

runではfetchメソッドでジョブを取ってきて、processでそのジョブの内容を実行します。

fetchメソッドからはget_oneを呼び出しており、get_oneでは@strategy.retrieve_workを呼び出します。デフォルトではSidekiq::BasicFetch#retrieve_workが呼び出されます。このメソッドではredisに対してBRPOPのコマンドを送ります。BRPOPは指定のキーにデータが入っていない場合はタイムアウト付きでブロックします。タイムアウト時はwork はnilになります。

queues_cmdはこんな感じで、queuesに設定したものをshuffleしてuniqを取っています。

queuesはparseメソッドでセットされています

例えば foo, 3 と bar, 1が設定されている場合はqueuesは [ 'foo', 'foo', 'foo', 'bar']となります。この配列に対して#shuffle, #uniqが呼び出されると3/4の確率でfooが先頭で次点にbarが並びます。BRPOPでは先頭のキーから優先的にチェックするようになっているので、配列の比率で並び順をコントロールしているということになります。

processメソッドはdispatchを呼び出します。

job_hashはこんな感じ

ActiveJobだとこんな感じです。JobWrapper経由でActiveJobのクラスが実行されるので階層が深いです。

最後にexecute_jobを呼び出します。classのキーの文字列がクラス化されてperformが実行されます。

Poller

Sidekiq::Scheduled::PollerはScheduled Jobsやリトライを実行するためのワーカーです。startメソッドを呼ぶとスレッドを作ってその中でenqueue => waitのループに入ります。

enqueueはSidekiq::Scheduled::Enq#enqueue_jobsを呼び出します。Enq#enqueue_jobsではzrangebyscoreでschedule, retryキューから実行時刻が過去で、現時点で実行されていないジョブをSidekiqのキューに入れています。zremで要素を削除しつつエンキューしています。

waitの仕組みはsleepを直接使うのではなく、ConnectionPool::TimedStack#popを使っています。このメソッドを呼ぶと、引数に指定した秒数をタイムアウト値としてConnectionPool::TimedStack#pushされるまでwaitがかかります。終了時以外はpushされることはないのでrandom_poll_intervalの数値の秒数だけwaitがかかることになります。

終了時はpushをすることによって、waitを終わらせてすみやかにスレッドが終了するようになっています。

Retryの仕組み

execute_job(ジョブの実行メソッド)はSidekiq::JobRetry#global, #localのブロック内で実行されます。execute_job内でエラーが発生した場合は、rescueで補足されます。

attempt_retryメソッドではretry_countやbacktraceの設定をしつつretryのキューにジョブをエンキューします。retryのキューはPollerによってデキューされ、通常のジョブキューにエンキューされます。max_retry_attemptsを超えている場合はリトライ処理はせずretries_exhaustedが呼び出されます。