unicornを読んだので備忘録。非公式ですがミラーのリポジトリは defunkt/unicornです。
bin/unicornでは引数をパースしつつ、Unicorn.builderでRackアプリを返すLambdaを作成し、そのLambdaを引数にUnicorn::HTTPServer#start, #joinを実行します。
1 2 3 4 5 |
app = Unicorn.builder(ARGV[0] || 'config.ru', op) op = nil # ... Unicorn::Launcher.daemonize!(options) if rackup_opts[:daemonize] Unicorn::HttpServer.new(app, options).start.join |
config.ruが指定されている場合は、inner_appにはconfig.ruの内容でビルドしたRackアプリがセットされます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
module Unicorn def self.builder(ru, op) #... lambda do || inner_app = case ru when /\.ru$/ raw = File.read(ru) raw.sub!(/^__END__\n.*/, '') eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru) else require ru Object.const_get(File.basename(ru, '.rb').capitalize) end pp({ :inner_app => inner_app }) if $DEBUG return inner_app if no_default_middleware middleware = { # order matters ContentLength: nil, Chunked: nil, CommonLogger: [ $stderr ], ShowExceptions: nil, Lint: nil, TempfileReaper: nil, } case ENV["RACK_ENV"] when "development" when "deployment" middleware.delete(:ShowExceptions) middleware.delete(:Lint) else return inner_app end Rack::Builder.new do middleware.each do |m, args| use(Rack.const_get(m), *args) if Rack.const_defined?(m) end run inner_app end.to_app end end |
最終的にmiddleware => inner_appという順番で実行されるRackアプリが作成されるLambdaを返すことになります。RackアプリではなくLambdaを返している意図としては遅延評価をするためで、Lambda作成時点でRailsアプリを読み込むのではなくLambdaをcallした時点で初めてRailsアプリが読み込まれます。
Unicorn::HTTPServer#startのself.pidではpidファイルにプロセスIDを書き込んでいます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
class Unicorn::HttpServer # ... attr_reader :pid, :logger include Unicorn::SocketHelper include Unicorn::HttpResponse LISTENERS = [] NEW_LISTENERS = [] def start inherit_listeners! @self_pipe.replace(Unicorn.pipe) @master_pid = @worker_data ? Process.ppid : $$ @queue_sigs.each { |sig| trap(sig) { @sig_queue << sig; awaken_master } } trap(:CHLD) { awaken_master } self.pid = config[:pid] build_app! if preload_app bind_new_listeners! spawn_missing_workers self end |
preload_appがtrueの場合はbuild_app!が実行されます。build_app!はLambda#callを呼んでおり、Railsアプリを読み込みます。この時点ではマスタープロセスが実行しているので、子プロセス生成時には子プロセスでRailsアプリをロードすることなく即座に子プロセスで処理を開始することができます。
1 2 3 4 5 6 7 8 9 |
def build_app! if app.respond_to?(:arity) && app.arity == 0 if defined?(Gem) && Gem.respond_to?(:refresh) logger.info "Refreshing Gem list" Gem.refresh end self.app = app.call end end |
bind_new_listeners!メソッドでは指定したアドレス(UNIXドメインソケット or TCPソケット)で待ち受けます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# call only after calling inherit_listeners! # This binds any listeners we did NOT inherit from the parent def bind_new_listeners! NEW_LISTENERS.each { |addr| listen(addr) }.clear raise ArgumentError, "no listeners" if LISTENERS.empty? end def listen(address, opt = {}.merge(listener_opts[address] || {})) address = config.expand_addr(address) return if String === address && listener_names.include?(address) delay = opt[:delay] || 0.5 tries = opt[:tries] || 5 begin io = bind_listen(address, opt) unless Kgio::TCPServer === io || Kgio::UNIXServer === io io.autoclose = false io = server_cast(io) end logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}" LISTENERS << io io # ... |
spawn_missing_workersでWorkerを作成します。このプロセスがHTTPリクエストをハンドリングすることになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
def spawn_missing_workers #... worker_nr = -1 until (worker_nr += 1) == @worker_processes @workers.value?(worker_nr) and next worker = Unicorn::Worker.new(worker_nr) before_fork.call(self, worker) pid = @worker_exec ? worker_spawn(worker) : fork unless pid after_fork_internal worker_loop(worker) exit end @workers[pid] = worker worker.atfork_parent end rescue => e @logger.error(e) rescue nil exit! end |
まず、指定の数だけWorkerを増やします。worker_spawnやforkで子プロセスが生成される前にbefore_forkが呼び出されます。unless pidの部分は子プロセスはpidがnilになるので、子プロセスのみがunlessの中を実行することになり、after_forkとworker_loopが実行されます。親プロセスではpidをキーにUnicorn::Workerのインスタンスを保持します。
worker_loopでは初期化をしつつHTTPリクエストのハンドリングを行っています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
def worker_loop(worker) ppid = @master_pid readers = init_worker_process(worker) nr = 0 # this becomes negative if we need to reopen logs #... ready = readers.dup @after_worker_ready.call(self, worker) begin nr < 0 and reopen_worker_logs(worker.nr) nr = 0 worker.tick = time_now.to_i tmp = ready.dup while sock = tmp.shift # Unicorn::Worker#kgio_tryaccept is not like accept(2) at all, # but that will return false if client = sock.kgio_tryaccept process_client(client) nr += 1 worker.tick = time_now.to_i end break if nr < 0 end #... worker.tick = time_now.to_i ret = IO.select(readers, nil, nil, @timeout) and ready = ret[0] rescue => e redo if nr < 0 && readers[0] Unicorn.log_error(@logger, "listen loop error", e) if readers[0] end while readers[0] end |
readersにはUnicornのソケットとWorkerのインスタンスが入ります。Workerのインスタンスはto_ioメソッドを実装しており、IO.selectの引数に入れることができます。これによってワーカがマスタープロセスからシグナルを受信できるようにしています。
IO.selectを使っているためThundering Herdが生じます。Unicornのリファレンスではこれに関して以下のように記述しています。
1 2 3 4 5 6 7 8 9 10 |
Since non-blocking accept() is used, there can be a thundering herd when an occasional client connects when application *is not busy*. The thundering herd problem should not affect applications that are running all the time since worker processes will only select()/accept() outside of the application dispatch. Additionally, thundering herds are much smaller than with configurations using existing prefork servers. Process counts should only be scaled to backend resources, never to the number of expected clients like is typical with blocking prefork servers. So while we've seen instances of popular prefork servers configured to run many hundreds of worker processes, Unicorn deployments are typically only 2-4 processes per-core. |
要約するとこんな感じです
- Thundering Herdになる可能性があるけど、ActionDispatch以外はselect/acceptしかしていないのでアプリケーションには影響を与えないはず
- Thundering Herd自体の影響もそこまで無い。プロセス数はサーバリソースに依存するが、1コアあたり2〜4プロセスぐらいになる。
ちなみに、Chris’s Wiki :: blog/unix/AcceptDoesNotThunderの記事だと256プロセスでも20〜60ms程度のオーバーヘッドと書いてあったりします。
process_clientではHTTPのハンドリングをしています。HTTPリクエストのパースはRagelを使ってC拡張を書いていて、Unicorn::HttpRequest#readのところで使っています。それ以外の部分はRackの仕様に従ってハンドリングをしています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
def process_client(client) status, headers, body = @app.call(env = @request.read(client)) begin return if @request.hijacked? if 100 == status.to_i e100_response_write(client, env) status, headers, body = @app.call(env) return if @request.hijacked? end @request.headers? or headers = nil http_response_write(client, status, headers, body, @request.response_start_sent) ensure body.respond_to?(:close) and body.close end unless client.closed? # rack.hijack may've close this for us client.shutdown # in case of fork() in Rack app client.close # flush and uncork socket immediately, no keepalive end rescue => e handle_error(client, e) end |
デーモン化はUnicorn::Launcher.daemonize!でダブルフォークの方法でデーモン化しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
module Unicorn::Launcher def self.daemonize!(options) cfg = Unicorn::Configurator $stdin.reopen("/dev/null") # We only start a new process group if we're not being reexecuted # and inheriting file descriptors from our parent unless ENV['UNICORN_FD'] # grandparent - reads pipe, exits when master is ready # \_ parent - exits immediately ASAP # \_ unicorn master - writes to pipe when ready rd, wr = IO.pipe grandparent = $$ if fork wr.close # grandparent does not write else rd.close # unicorn master does not read Process.setsid exit if fork # parent dies now end |
IO.pipeを使っているのはgrandparentプロセスのライフサイクルコントロールのためで、joinメソッド内でパイプを通じてgrandparentプロセスにデータが送られたのを契機にプロセスがexitするようになっています。
コメントを残す