2017-11-07

sidekiqコードリーディング

sidekiqのコードを読んだので備忘録。バージョンは5.0.5です

bin/sidekiqはSidekiq::CLI#parse, #runを叩きます。

cli = Sidekiq::CLI.instance
cli.parse
cli.run

parseメソッドではオプションをパースしたり、ロガー初期化したりバリデーションしたりデーモン化したりpidファイル書き込んだり…まぁ見ての通りです。

def parse(args=ARGV)
  @code = nil

  setup_options(args)
  initialize_logger
  validate!
  daemonize
  write_pid
end

runではシグナルトラップの設定をしています。シグナルを受け取ったらself_writeに書き込み、IO.selectを使ってself_readから読み込んでシグナルのハンドリングをします。最後にSidekiq::Launcher#runを呼び出します。

def run
  boot_system
  print_banner

  self_read, self_write = IO.pipe
  sigs = %w(INT TERM TTIN TSTP)
  # USR1 and USR2 don't work on the JVM
  if !jruby?
    sigs << 'USR1'
    sigs << 'USR2'
  end

  sigs.each do |sig|
    begin
      trap sig do
        self_write.puts(sig)
      end
    rescue ArgumentError
      puts "Signal #{sig} not supported"
    end
  end

...
  require 'sidekiq/launcher'
  @launcher = Sidekiq::Launcher.new(options)

  begin
    launcher.run

    while readable_io = IO.select([self_read])
      signal = readable_io.first[0].gets.strip
      handle_signal(signal)
    end
...

Sidekiq::Launcher#runは以下のようにSidekiq::Scheduled::Poller#startとSidekiq::Manager#startを実行します。Pollerはスケジュール実行やretry処理を行うスレッドで、Managerはworkerを管理するスレッドです。

module Sidekiq
  class Launcher
    include Util

    attr_accessor :manager, :poller, :fetcher

    def initialize(options)
      @manager = Sidekiq::Manager.new(options)
      @poller = Sidekiq::Scheduled::Poller.new
      @done = false
      @options = options
    end

    def run
      @thread = safe_thread("heartbeat", &method(:start_heartbeat))
      @poller.start
      @manager.start
    end

safe_threadはThreadを実行してエラー時にoptions[:error_handlers]に設定したエラーハンドリングを実行する処理です。

module Sidekiq
  module Util
    include ExceptionHandler

    def watchdog(last_words)
      yield
    rescue Exception => ex
      handle_exception(ex, { context: last_words })
      raise ex
    end

    def safe_thread(name, &block)
      Thread.new do
        Thread.current['sidekiq_label'] = name
        watchdog(name, &block)
      end
    end

Sidekiq::Manager#startはconcurrency数分のworker(Sidekiq::Processor)のインスタンスを作成し、それぞれのworkerに対してSidekiq::Processor#startを実行します。

module Sidekiq
  class Manager
    include Util

    attr_reader :workers
    attr_reader :options

    def initialize(options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 25
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

      @done = false
      @workers = Set.new
      @count.times do
        @workers << Processor.new(self)
      end
      @plock = Mutex.new
    end

    def start
      @workers.each do |x|
        x.start
      end
    end

Sidekiq::Processor#startはsafe_thread経由でrunをスレッド内で実行します。

module Sidekiq
  class Processor

    include Util

    attr_reader :thread
    attr_reader :job

    def start
      @thread ||= safe_thread("processor", &method(:run))
    end

runではfetchメソッドでジョブを取ってきて、processでそのジョブの内容を実行します。

def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Sidekiq::Shutdown
    @mgr.processor_stopped(self)
  rescue Exception => ex
    @mgr.processor_died(self, ex)
  end
end

def process_one
  @job = fetch
  process(@job) if @job
  @job = nil
end

def get_one
  begin
    work = @strategy.retrieve_work
    (logger.info { "Redis is online, #{Time.now - @down} sec downtime" }; @down = nil) if @down
    work
  rescue Sidekiq::Shutdown
  rescue => ex
    handle_fetch_exception(ex)
  end
end

def fetch
  j = get_one
  if j && @done
    j.requeue
    nil
  else
    j
  end
end

fetchメソッドからはget_oneを呼び出しており、get_oneでは@strategy.retrieve_workを呼び出します。デフォルトではSidekiq::BasicFetch#retrieve_workが呼び出されます。このメソッドではredisに対してBRPOPのコマンドを送ります。BRPOPは指定のキーにデータが入っていない場合はタイムアウト付きでブロックします。タイムアウト時はwork はnilになります。

module Sidekiq
  class BasicFetch
    def retrieve_work
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
      UnitOfWork.new(*work) if work
    end

queues_cmdはこんな感じで、queuesに設定したものをshuffleしてuniqを取っています。

def queues_cmd
  if @strictly_ordered_queues
    @queues
  else
    queues = @queues.shuffle.uniq
    queues << TIMEOUT
    queues
  end
end

queuesはparseメソッドでセットされています

def parse_queues(opts, queues_and_weights)
  queues_and_weights.each { |queue_and_weight| parse_queue(opts, *queue_and_weight) }
end

def parse_queue(opts, q, weight=nil)
  [weight.to_i, 1].max.times do
   (opts[:queues] ||= []) << q
  end
  opts[:strict] = false if weight.to_i > 0
end

例えば foo, 3 と bar, 1が設定されている場合はqueuesは [ 'foo', 'foo', 'foo', 'bar']となります。この配列に対して#shuffle, #uniqが呼び出されると3/4の確率でfooが先頭で次点にbarが並びます。BRPOPでは先頭のキーから優先的にチェックするようになっているので、配列の比率で並び順をコントロールしているということになります。

processメソッドはdispatchを呼び出します。

def dispatch(job_hash, queue)
  pristine = cloned(job_hash)

  Sidekiq::Logging.with_job_hash_context(job_hash) do
    @retrier.global(job_hash, queue) do
      @logging.call(job_hash, queue) do
        stats(pristine, queue) do
          # Rails 5 requires a Reloader to wrap code execution.  In order to
          # constantize the worker and instantiate an instance, we have to call
          # the Reloader.  It handles code loading, db connection management, etc.
          # Effectively this block denotes a "unit of work" to Rails.
          @reloader.call do
            klass  = constantize(job_hash['class'.freeze])
            worker = klass.new
            worker.jid = job_hash['jid'.freeze]
            @retrier.local(worker, job_hash, queue) do
              yield worker
            end
          end
        end
      end
    end
  end
end

job_hashはこんな感じ

{
    "class": "HogeJob",
    "args": [
        123
    ],
    "retry": true,
    "queue": "default",
    "jid": "a70cc43ec6e490e2d3d31552",
    "created_at": 1509374692.256918,
    "enqueued_at": 1509374692.257654
}

ActiveJobだとこんな感じです。JobWrapper経由でActiveJobのクラスが実行されるので階層が深いです。

{
    "class": "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
    "wrapped": "HogeJob",
    "queue": "default",
    "args": [
        {
            "job_class": "HogeJob",
            "job_id": "15db0f2a-0614-4435-bac4-40cf8cb166ef",
            "provider_job_id": null,
            "queue_name": "default",
            "priority": null,
            "arguments": [
                123
            ],
            "executions": 0,
            "locale": "ja"
        }
    ],
    "retry": true,
    "jid": "45489c0845e5868c5f837a66",
    "created_at": 1509374502.2748291,
    "enqueued_at": 1509374502.275716
}

最後にexecute_jobを呼び出します。classのキーの文字列がクラス化されてperformが実行されます。

def execute_job(worker, cloned_args)
  worker.perform(*cloned_args)
end

Poller

Sidekiq::Scheduled::PollerはScheduled Jobsやリトライを実行するためのワーカーです。startメソッドを呼ぶとスレッドを作ってその中でenqueue => waitのループに入ります。
module Sidekiq
  module Scheduled
    class Poller
      include Util

      INITIAL_WAIT = 10

      def initialize
        @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
        @sleeper = ConnectionPool::TimedStack.new
        @done = false
        @thread = nil
      end

      def start
        @thread ||= safe_thread("scheduler") do
          initial_wait

          while !@done
            enqueue
            wait
          end
          Sidekiq.logger.info("Scheduler exiting...")
        end
      end

enqueueはSidekiq::Scheduled::Enq#enqueue_jobsを呼び出します。Enq#enqueue_jobsではzrangebyscoreでschedule, retryキューから実行時刻が過去で、現時点で実行されていないジョブをSidekiqのキューに入れています。zremで要素を削除しつつエンキューしています。

module Sidekiq
  module Scheduled
    SETS = %w(retry schedule)

    class Enq
      def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
        Sidekiq.redis do |conn|
          sorted_sets.each do |sorted_set|
            while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do

              if conn.zrem(sorted_set, job)
                Sidekiq::Client.push(Sidekiq.load_json(job))
                Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
              end
            end
          end
        end
      end
    end

waitの仕組みはsleepを直接使うのではなく、ConnectionPool::TimedStack#popを使っています。このメソッドを呼ぶと、引数に指定した秒数をタイムアウト値としてConnectionPool::TimedStack#pushされるまでwaitがかかります。終了時以外はpushされることはないのでrandom_poll_intervalの数値の秒数だけwaitがかかることになります。

def wait
  @sleeper.pop(random_poll_interval)
rescue Timeout::Error
  # expected
rescue => ex
  # if poll_interval_average hasn't been calculated yet, we can
  # raise an error trying to reach Redis.
  logger.error ex.message
  logger.error ex.backtrace.first
  sleep 5
end

終了時はpushをすることによって、waitを終わらせてすみやかにスレッドが終了するようになっています。

# Shut down this instance, will pause until the thread is dead.
def terminate
  @done = true
  if @thread
    t = @thread
    @thread = nil
    @sleeper << 0
    t.value
  end
end

Retryの仕組み

execute_job(ジョブの実行メソッド)はSidekiq::JobRetry#global, #localのブロック内で実行されます。execute_job内でエラーが発生した場合は、rescueで補足されます。
def local(worker, msg, queue)
  yield
rescue Skip => ex
  raise ex
rescue Sidekiq::Shutdown => ey
  # ignore, will be pushed back onto queue during hard_shutdown
  raise ey
rescue Exception => e
  # ignore, will be pushed back onto queue during hard_shutdown
  raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

  if msg['retry'] == nil
    msg['retry'] = worker.class.get_sidekiq_options['retry']
  end

  raise e unless msg['retry']
  attempt_retry(worker, msg, queue, e)
  # We've handled this error associated with this job, don't
  # need to handle it at the global level
  raise Skip
end

attempt_retryメソッドではretry_countやbacktraceの設定をしつつretryのキューにジョブをエンキューします。retryのキューはPollerによってデキューされ、通常のジョブキューにエンキューされます。max_retry_attemptsを超えている場合はリトライ処理はせずretries_exhaustedが呼び出されます。

if count < max_retry_attempts
  delay = delay_for(worker, count, exception)
  logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
  retry_at = Time.now.to_f + delay
  payload = Sidekiq.dump_json(msg)
  Sidekiq.redis do |conn|
    conn.zadd('retry', retry_at.to_s, payload)
  end
else
  # Goodbye dear message, you (re)tried your best I'm sure.
  retries_exhausted(worker, msg, exception)
end
このエントリーをはてなブックマークに追加