2017-10-30

unicornコードリーディング

unicornを読んだので備忘録。非公式ですがミラーのリポジトリは defunkt/unicornです。

bin/unicornでは引数をパースしつつ、Unicorn.builderでRackアプリを返すLambdaを作成し、そのLambdaを引数にUnicorn::HTTPServer#start, #joinを実行します。

app = Unicorn.builder(ARGV[0] || 'config.ru', op)
op = nil
# ...
Unicorn::Launcher.daemonize!(options) if rackup_opts[:daemonize]
Unicorn::HttpServer.new(app, options).start.join

config.ruが指定されている場合は、inner_appにはconfig.ruの内容でビルドしたRackアプリがセットされます。

module Unicorn
  def self.builder(ru, op)
#...
    lambda do ||
      inner_app = case ru
      when /\.ru$/
        raw = File.read(ru)
        raw.sub!(/^__END__\n.*/, '')
        eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru)
      else
        require ru
        Object.const_get(File.basename(ru, '.rb').capitalize)
      end

      pp({ :inner_app => inner_app }) if $DEBUG

      return inner_app if no_default_middleware

      middleware = { # order matters
        ContentLength: nil,
        Chunked: nil,
        CommonLogger: [ $stderr ],
        ShowExceptions: nil,
        Lint: nil,
        TempfileReaper: nil,
      }

      case ENV["RACK_ENV"]
      when "development"
      when "deployment"
        middleware.delete(:ShowExceptions)
        middleware.delete(:Lint)
      else
        return inner_app
      end
      Rack::Builder.new do
        middleware.each do |m, args|
          use(Rack.const_get(m), *args) if Rack.const_defined?(m)
        end
        run inner_app
      end.to_app
    end
  end

最終的にmiddleware => inner_appという順番で実行されるRackアプリが作成されるLambdaを返すことになります。RackアプリではなくLambdaを返している意図としては遅延評価をするためで、Lambda作成時点でRailsアプリを読み込むのではなくLambdaをcallした時点で初めてRailsアプリが読み込まれます。

Unicorn::HTTPServer#startのself.pidではpidファイルにプロセスIDを書き込んでいます。

class Unicorn::HttpServer
# ...
  attr_reader :pid, :logger
  include Unicorn::SocketHelper
  include Unicorn::HttpResponse

  LISTENERS = []

  NEW_LISTENERS = []

  def start
    inherit_listeners!

    @self_pipe.replace(Unicorn.pipe)
    @master_pid = @worker_data ? Process.ppid : $$

    @queue_sigs.each { |sig| trap(sig) { @sig_queue << sig; awaken_master } }
    trap(:CHLD) { awaken_master }

    self.pid = config[:pid]

    build_app! if preload_app
    bind_new_listeners!

    spawn_missing_workers
    self
  end

preload_appがtrueの場合はbuild_app!が実行されます。build_app!はLambda#callを呼んでおり、Railsアプリを読み込みます。この時点ではマスタープロセスが実行しているので、子プロセス生成時には子プロセスでRailsアプリをロードすることなく即座に子プロセスで処理を開始することができます。

def build_app!
  if app.respond_to?(:arity) && app.arity == 0
    if defined?(Gem) && Gem.respond_to?(:refresh)
      logger.info "Refreshing Gem list"
      Gem.refresh
    end
    self.app = app.call
  end
end

bind_new_listeners!メソッドでは指定したアドレス(UNIXドメインソケット or TCPソケット)で待ち受けます。

# call only after calling inherit_listeners!
# This binds any listeners we did NOT inherit from the parent
def bind_new_listeners!
  NEW_LISTENERS.each { |addr| listen(addr) }.clear
  raise ArgumentError, "no listeners" if LISTENERS.empty?
end

def listen(address, opt = {}.merge(listener_opts[address] || {}))
  address = config.expand_addr(address)
  return if String === address && listener_names.include?(address)

  delay = opt[:delay] || 0.5
  tries = opt[:tries] || 5
  begin
    io = bind_listen(address, opt)
    unless Kgio::TCPServer === io || Kgio::UNIXServer === io
      io.autoclose = false
      io = server_cast(io)
    end
    logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
    LISTENERS << io
    io
# ...

spawn_missing_workersでWorkerを作成します。このプロセスがHTTPリクエストをハンドリングすることになります。

def spawn_missing_workers
#...

  worker_nr = -1
  until (worker_nr += 1) == @worker_processes
    @workers.value?(worker_nr) and next
    worker = Unicorn::Worker.new(worker_nr)
    before_fork.call(self, worker)

    pid = @worker_exec ? worker_spawn(worker) : fork

    unless pid
      after_fork_internal
      worker_loop(worker)
      exit
    end

    @workers[pid] = worker
    worker.atfork_parent
  end
  rescue => e
    @logger.error(e) rescue nil
    exit!
end

まず、指定の数だけWorkerを増やします。worker_spawnやforkで子プロセスが生成される前にbefore_forkが呼び出されます。unless pidの部分は子プロセスはpidがnilになるので、子プロセスのみがunlessの中を実行することになり、after_forkとworker_loopが実行されます。親プロセスではpidをキーにUnicorn::Workerのインスタンスを保持します。

worker_loopでは初期化をしつつHTTPリクエストのハンドリングを行っています。

def worker_loop(worker)
  ppid = @master_pid
  readers = init_worker_process(worker)
  nr = 0 # this becomes negative if we need to reopen logs

#...

  ready = readers.dup
  @after_worker_ready.call(self, worker)

  begin
    nr < 0 and reopen_worker_logs(worker.nr)
    nr = 0
    worker.tick = time_now.to_i
    tmp = ready.dup
    while sock = tmp.shift
      # Unicorn::Worker#kgio_tryaccept is not like accept(2) at all,
      # but that will return false
      if client = sock.kgio_tryaccept
        process_client(client)
        nr += 1
        worker.tick = time_now.to_i
      end
      break if nr < 0
    end

#...

    worker.tick = time_now.to_i
    ret = IO.select(readers, nil, nil, @timeout) and ready = ret[0]
  rescue => e
    redo if nr < 0 && readers[0]
    Unicorn.log_error(@logger, "listen loop error", e) if readers[0]
  end while readers[0]
end

readersにはUnicornのソケットとWorkerのインスタンスが入ります。Workerのインスタンスはto_ioメソッドを実装しており、IO.selectの引数に入れることができます。これによってワーカがマスタープロセスからシグナルを受信できるようにしています。

IO.selectを使っているためThundering Herdが生じます。Unicornのリファレンスではこれに関して以下のように記述しています。

Since non-blocking accept() is used, there can be a thundering herd when an occasional client 
connects when application *is not busy*. The thundering herd problem should not affect 
applications that are running all the time since worker processes will only select()/accept() 
outside of the application dispatch.

Additionally, thundering herds are much smaller than with configurations using existing 
prefork servers. Process counts should only be scaled to backend resources, 
never to the number of expected clients like is typical with blocking prefork servers. 
So while we've seen instances of popular prefork servers configured to run 
many hundreds of worker processes, Unicorn deployments are typically only 2-4 processes per-core.

要約するとこんな感じです

ちなみに、Chris's Wiki :: blog/unix/AcceptDoesNotThunderの記事だと256プロセスでも20〜60ms程度のオーバーヘッドと書いてあったりします。

process_clientではHTTPのハンドリングをしています。HTTPリクエストのパースはRagelを使ってC拡張を書いていて、Unicorn::HttpRequest#readのところで使っています。それ以外の部分はRackの仕様に従ってハンドリングをしています。

def process_client(client)
  status, headers, body = @app.call(env = @request.read(client))

  begin
    return if @request.hijacked?

    if 100 == status.to_i
      e100_response_write(client, env)
      status, headers, body = @app.call(env)
      return if @request.hijacked?
    end
    @request.headers? or headers = nil
    http_response_write(client, status, headers, body,
                        @request.response_start_sent)
  ensure
    body.respond_to?(:close) and body.close
  end

  unless client.closed? # rack.hijack may've close this for us
    client.shutdown # in case of fork() in Rack app
    client.close # flush and uncork socket immediately, no keepalive
  end
rescue => e
  handle_error(client, e)
end

デーモン化はUnicorn::Launcher.daemonize!でダブルフォークの方法でデーモン化しています。

module Unicorn::Launcher
  def self.daemonize!(options)
    cfg = Unicorn::Configurator
    $stdin.reopen("/dev/null")

    # We only start a new process group if we're not being reexecuted
    # and inheriting file descriptors from our parent
    unless ENV['UNICORN_FD']
      # grandparent - reads pipe, exits when master is ready
      #  \_ parent  - exits immediately ASAP
      #      \_ unicorn master - writes to pipe when ready

      rd, wr = IO.pipe
      grandparent = $$
      if fork
        wr.close # grandparent does not write
      else
        rd.close # unicorn master does not read
        Process.setsid
        exit if fork # parent dies now
      end

IO.pipeを使っているのはgrandparentプロセスのライフサイクルコントロールのためで、joinメソッド内でパイプを通じてgrandparentプロセスにデータが送られたのを契機にプロセスがexitするようになっています。

このエントリーをはてなブックマークに追加