unicornを読んだので備忘録。非公式ですがミラーのリポジトリは defunkt/unicornです。
bin/unicornでは引数をパースしつつ、Unicorn.builderでRackアプリを返すLambdaを作成し、そのLambdaを引数にUnicorn::HTTPServer#start, #joinを実行します。
app = Unicorn.builder(ARGV[0] || 'config.ru', op)
op = nil
# ...
Unicorn::Launcher.daemonize!(options) if rackup_opts[:daemonize]
Unicorn::HttpServer.new(app, options).start.join
config.ruが指定されている場合は、inner_appにはconfig.ruの内容でビルドしたRackアプリがセットされます。
module Unicorn
def self.builder(ru, op)
#...
lambda do ||
inner_app = case ru
when /\.ru$/
raw = File.read(ru)
raw.sub!(/^__END__\n.*/, '')
eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru)
else
require ru
Object.const_get(File.basename(ru, '.rb').capitalize)
end
pp({ :inner_app => inner_app }) if $DEBUG
return inner_app if no_default_middleware
middleware = { # order matters
ContentLength: nil,
Chunked: nil,
CommonLogger: [ $stderr ],
ShowExceptions: nil,
Lint: nil,
TempfileReaper: nil,
}
case ENV["RACK_ENV"]
when "development"
when "deployment"
middleware.delete(:ShowExceptions)
middleware.delete(:Lint)
else
return inner_app
end
Rack::Builder.new do
middleware.each do |m, args|
use(Rack.const_get(m), *args) if Rack.const_defined?(m)
end
run inner_app
end.to_app
end
end
最終的にmiddleware => inner_appという順番で実行されるRackアプリが作成されるLambdaを返すことになります。RackアプリではなくLambdaを返している意図としては遅延評価をするためで、Lambda作成時点でRailsアプリを読み込むのではなくLambdaをcallした時点で初めてRailsアプリが読み込まれます。
Unicorn::HTTPServer#startのself.pidではpidファイルにプロセスIDを書き込んでいます。
class Unicorn::HttpServer
# ...
attr_reader :pid, :logger
include Unicorn::SocketHelper
include Unicorn::HttpResponse
LISTENERS = []
NEW_LISTENERS = []
def start
inherit_listeners!
@self_pipe.replace(Unicorn.pipe)
@master_pid = @worker_data ? Process.ppid : $$
@queue_sigs.each { |sig| trap(sig) { @sig_queue << sig; awaken_master } }
trap(:CHLD) { awaken_master }
self.pid = config[:pid]
build_app! if preload_app
bind_new_listeners!
spawn_missing_workers
self
end
preload_appがtrueの場合はbuild_app!が実行されます。build_app!はLambda#callを呼んでおり、Railsアプリを読み込みます。この時点ではマスタープロセスが実行しているので、子プロセス生成時には子プロセスでRailsアプリをロードすることなく即座に子プロセスで処理を開始することができます。
def build_app!
if app.respond_to?(:arity) && app.arity == 0
if defined?(Gem) && Gem.respond_to?(:refresh)
logger.info "Refreshing Gem list"
Gem.refresh
end
self.app = app.call
end
end
bind_new_listeners!メソッドでは指定したアドレス(UNIXドメインソケット or TCPソケット)で待ち受けます。
# call only after calling inherit_listeners!
# This binds any listeners we did NOT inherit from the parent
def bind_new_listeners!
NEW_LISTENERS.each { |addr| listen(addr) }.clear
raise ArgumentError, "no listeners" if LISTENERS.empty?
end
def listen(address, opt = {}.merge(listener_opts[address] || {}))
address = config.expand_addr(address)
return if String === address && listener_names.include?(address)
delay = opt[:delay] || 0.5
tries = opt[:tries] || 5
begin
io = bind_listen(address, opt)
unless Kgio::TCPServer === io || Kgio::UNIXServer === io
io.autoclose = false
io = server_cast(io)
end
logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
LISTENERS << io
io
# ...
spawn_missing_workersでWorkerを作成します。このプロセスがHTTPリクエストをハンドリングすることになります。
def spawn_missing_workers
#...
worker_nr = -1
until (worker_nr += 1) == @worker_processes
@workers.value?(worker_nr) and next
worker = Unicorn::Worker.new(worker_nr)
before_fork.call(self, worker)
pid = @worker_exec ? worker_spawn(worker) : fork
unless pid
after_fork_internal
worker_loop(worker)
exit
end
@workers[pid] = worker
worker.atfork_parent
end
rescue => e
@logger.error(e) rescue nil
exit!
end
まず、指定の数だけWorkerを増やします。worker_spawnやforkで子プロセスが生成される前にbefore_forkが呼び出されます。unless pidの部分は子プロセスはpidがnilになるので、子プロセスのみがunlessの中を実行することになり、after_forkとworker_loopが実行されます。親プロセスではpidをキーにUnicorn::Workerのインスタンスを保持します。
worker_loopでは初期化をしつつHTTPリクエストのハンドリングを行っています。
def worker_loop(worker)
ppid = @master_pid
readers = init_worker_process(worker)
nr = 0 # this becomes negative if we need to reopen logs
#...
ready = readers.dup
@after_worker_ready.call(self, worker)
begin
nr < 0 and reopen_worker_logs(worker.nr)
nr = 0
worker.tick = time_now.to_i
tmp = ready.dup
while sock = tmp.shift
# Unicorn::Worker#kgio_tryaccept is not like accept(2) at all,
# but that will return false
if client = sock.kgio_tryaccept
process_client(client)
nr += 1
worker.tick = time_now.to_i
end
break if nr < 0
end
#...
worker.tick = time_now.to_i
ret = IO.select(readers, nil, nil, @timeout) and ready = ret[0]
rescue => e
redo if nr < 0 && readers[0]
Unicorn.log_error(@logger, "listen loop error", e) if readers[0]
end while readers[0]
end
readersにはUnicornのソケットとWorkerのインスタンスが入ります。Workerのインスタンスはto_ioメソッドを実装しており、IO.selectの引数に入れることができます。これによってワーカがマスタープロセスからシグナルを受信できるようにしています。
IO.selectを使っているためThundering Herdが生じます。Unicornのリファレンスではこれに関して以下のように記述しています。
Since non-blocking accept() is used, there can be a thundering herd when an occasional client
connects when application *is not busy*. The thundering herd problem should not affect
applications that are running all the time since worker processes will only select()/accept()
outside of the application dispatch.
Additionally, thundering herds are much smaller than with configurations using existing
prefork servers. Process counts should only be scaled to backend resources,
never to the number of expected clients like is typical with blocking prefork servers.
So while we've seen instances of popular prefork servers configured to run
many hundreds of worker processes, Unicorn deployments are typically only 2-4 processes per-core.
要約するとこんな感じです
- Thundering Herdになる可能性があるけど、ActionDispatch以外はselect/acceptしかしていないのでアプリケーションには影響を与えないはず
- Thundering Herd自体の影響もそこまで無い。プロセス数はサーバリソースに依存するが、1コアあたり2〜4プロセスぐらいになる。
process_clientではHTTPのハンドリングをしています。HTTPリクエストのパースはRagelを使ってC拡張を書いていて、Unicorn::HttpRequest#readのところで使っています。それ以外の部分はRackの仕様に従ってハンドリングをしています。
def process_client(client)
status, headers, body = @app.call(env = @request.read(client))
begin
return if @request.hijacked?
if 100 == status.to_i
e100_response_write(client, env)
status, headers, body = @app.call(env)
return if @request.hijacked?
end
@request.headers? or headers = nil
http_response_write(client, status, headers, body,
@request.response_start_sent)
ensure
body.respond_to?(:close) and body.close
end
unless client.closed? # rack.hijack may've close this for us
client.shutdown # in case of fork() in Rack app
client.close # flush and uncork socket immediately, no keepalive
end
rescue => e
handle_error(client, e)
end
デーモン化はUnicorn::Launcher.daemonize!でダブルフォークの方法でデーモン化しています。
module Unicorn::Launcher
def self.daemonize!(options)
cfg = Unicorn::Configurator
$stdin.reopen("/dev/null")
# We only start a new process group if we're not being reexecuted
# and inheriting file descriptors from our parent
unless ENV['UNICORN_FD']
# grandparent - reads pipe, exits when master is ready
# \_ parent - exits immediately ASAP
# \_ unicorn master - writes to pipe when ready
rd, wr = IO.pipe
grandparent = $$
if fork
wr.close # grandparent does not write
else
rd.close # unicorn master does not read
Process.setsid
exit if fork # parent dies now
end
IO.pipeを使っているのはgrandparentプロセスのライフサイクルコントロールのためで、joinメソッド内でパイプを通じてgrandparentプロセスにデータが送られたのを契機にプロセスがexitするようになっています。