2017-10-24

rails/springコードリーディング

rails/springのコードリーディングをしましたー。ということで備忘録。

概要

Springはサーバ、アプリケーション、クライアントに分かれて処理を行います。サーバはクライアントからの接続を受け取り、アプリケーションにコマンド処理を委譲します。アプリケーションはRailsアプリケーションを先に読み込んでおいて、クライアントのリクエストが来たときにコマンドを実行する、ということをやっています。

サーバ側

まずはサーバ側の spring server を叩いたときの動きをコードから追ってみます。bin/springではSpring::Client.runを実行しています。

require 'spring/client'
Spring::Client.run(ARGV)

Spring::Client.runはサブコマンドに応じたクラスをインスタンス化し、callメソッドを呼び出します。serverサブコマンドの場合はSpring::Client::Server#callを呼び出します。

module Spring
  module Client
    class Server < Command

      def call
        require "spring/server"
        Spring::Server.boot(foreground: foreground?)
      end

Spring::Server.bootはSpring::Serverをインスタンス化してbootメソッドを呼び出します

module Spring
  class Server

    def initialize(options = {})
      @foreground   = options.fetch(:foreground, false)
      @env          = options[:env] || default_env
      @applications = Hash.new { |h, k| h[k] = ApplicationManager.new(k, env) }
      @pidfile      = env.pidfile_path.open('a')
      @mutex        = Mutex.new
    end
#...
    def boot
      Spring.verify_environment

      write_pidfile
      set_pgid unless foreground?
      ignore_signals unless foreground?
      set_exit_hook
      set_process_title
      start_server
    end

サーバ起動のためにpidファイルを書き込んだり諸々の初期設定を行った後、start_serverメソッドを呼び出します。

def start_server
  server = UNIXServer.open(env.socket_name)
  log "started on #{env.socket_name}"
  loop { serve server.accept }
rescue Interrupt
end

UNIXServerを起動します。ソケット名(env.socket_name)は環境変数SPRING_SOCKETの値か、テンポラリディレクトリ内のファイルになります。その後、acceptのループに入り、クライアントからの接続を待ち受けつつ、接続要求が来たら接続済みソケットがserveメソッドに渡されます。

def serve(client)
  log "accepted client"
  client.puts env.version

  app_client = client.recv_io
  command    = JSON.load(client.read(client.gets.to_i))

  args, default_rails_env = command.values_at('args', 'default_rails_env')

  if Spring.command?(args.first)
    log "running command #{args.first}"
    client.puts
    client.puts @applications[rails_env_for(args, default_rails_env)].run(app_client)
  else
    log "command not found #{args.first}"
    client.close
  end
rescue SocketError => e
  raise e unless client.eof?
ensure
  redirect_output
end

serverメソッドではspringのバージョンをクライアントに送ります。クライアントはサーバのSpringとバージョンが合っているかどうかを検証し、合っていなかったらエラーになります。

次にクライアント側がファイルディスクリプタを送ってくるので、UNIXSocket#recv_ioで受け取ります。

クライアント側はさらに実行したいコマンドをJSON形式で送ってきます。最初にJSONのサイズを送り、その後にJSONを送ってきます。JSONの実体としてはargsとdefault_rails_envの属性値を持っています。ここで実行したいコマンドを送っている意味合いとしてはコマンドの実行そのものではなく、rails_env_forによるenv(development, production, test)を決定するためです。

クライアントが送ってきたコマンドがspringで対応しているコマンドであれば、ApplicationManager#runを実行します。

module Spring
  class ApplicationManager

    # Returns the pid of the process running the command, or nil if the application process died.
    def run(client)
      with_child do
        child.send_io client
        child.gets or raise Errno::EPIPE
      end

      pid = child.gets.to_i

      unless pid.zero?
        log "got worker pid #{pid}"
        pid
      end
    rescue Errno::ECONNRESET, Errno::EPIPE => e
      log "#{e} while reading from child; returning no pid"
      nil
    ensure
      client.close
    end

with_childメソッドが実行されます。with_childメソッド内ではsynchronizeで排他制御をしつつ、アプリケーション(コマンドを実際に実行するプロセス)を子プロセスとして起動して、ブロック内の処理を実行します。ブロック内の処理はアプリケーションプロセスにクライアント側が送ってきたファイルディスクリプタを渡します。受け取ると子プロセスはputsしてくるのでブロック内のchild.getsの処理を抜けます。

def with_child
  synchronize do
    if alive?
      begin
        yield
      rescue Errno::ECONNRESET, Errno::EPIPE
        # The child has died but has not been collected by the wait thread yet,
        # so start a new child and try again.
        log "child dead; starting"
        start
        yield
      end
    else
      log "child not running; starting"
      start
      yield
    end
  end
end

with_childの処理は、初回はalive?=@pidはnilなのでstartメソッド経由でstart_childメソッドを呼び出します。UNIXSocket.pairで無名のソケットペアを作成し、新しく子プロセス(アプリケーションプロセス)を立ち上げ、そのプロセスの3と4番のファイルディスクリプタにソケットペアの片方とログファイル用のファイルディスクリプタをセットします。これによってサーバプロセスとアプリケーションプロセスがソケットペア経由で通信できるようになります。

def start_child(preload = false)
  @child, child_socket = UNIXSocket.pair

  Bundler.with_clean_env do
    @pid = Process.spawn(
      {
        "RAILS_ENV"           => app_env,
        "RACK_ENV"            => app_env,
        "SPRING_ORIGINAL_ENV" => JSON.dump(Spring::ORIGINAL_ENV),
        "SPRING_PRELOAD"      => preload ? "1" : "0"
      },
      "ruby",
      "-I", File.expand_path("../..", $LOADED_FEATURES.grep(/bundler\/setup\.rb$/).first),
      "-I", File.expand_path("../..", __FILE__),
      "-e", "require 'spring/application/boot'",
      3 => child_socket,
      4 => spring_env.log_file,
    )
  end

  start_wait_thread(pid, child) if child.gets
  child_socket.close
end

子プロセスとして立ち上げるコマンドは require ‘spring/application/boot’です。子プロセス用のソケットは親プロセスでは閉じておきます。

アプリケーション側

子プロセスのspring/application/bootが実際にコマンドを実行する常駐プロセスになります。

spring/application/bootはSpring::Applicationをインスタンス化してrunメソッドを実行します。runメソッドは親プロセスから受け取ったクライアントのファイルディスクリプタを受け取って、serveメソッドを実行します。serveメソッドは長いのでかなり省略していますが全体的にこんな感じです↓

def serve(client)
  log "got client"
  manager.puts

  stdout, stderr, stdin = streams = 3.times.map { client.recv_io }
  [STDOUT, STDERR, STDIN].zip(streams).each { |a, b| a.reopen(b) }

  preload unless preloaded?

  args, env = JSON.load(client.read(client.gets.to_i)).values_at("args", "env")
  command   = Spring.command(args.shift)

  connect_database
  setup command

#...

  pid = fork {

#...
    command.call
  }

  disconnect_database

  log "forked #{pid}"
  manager.puts pid

  wait pid, streams, client

#...
ensure
  # Redirect STDOUT and STDERR to prevent from keeping the original FDs
  # (i.e. to prevent `spring rake -T | grep db` from hanging forever),
  # even when exception is raised before forking (i.e. preloading).
  reset_streams
end

クライアントから標準出力、標準エラー出力、標準入力の3つのファイルディスクリプタを受け取り、アプリケーションの標準出力、エラー出力、標準入力として再オープンします。

あとはpreloadでRailsアプリケーションをロードしつつ、クライアントから送られてくるコマンドを実行し、waitメソッドでコマンドが終わるまで待ちます。

クライアント側のコード

次に spring rails console を実行したときの挙動を追ってみます。

springコマンドのサブコマンドがrailsだとSpring::Client::Rails#callを呼び出します。

module Spring
  module Client
    class Rails < Command
      COMMANDS = Set.new %w(console runner generate destroy test)

      ALIASES = {
        "c" => "console",
        "r" => "runner",
        "g" => "generate",
        "d" => "destroy",
        "t" => "test"
      }
# ...
      def call
        command_name = ALIASES[args[1]] || args[1]

        if COMMANDS.include?(command_name)
          Run.call(["rails_#{command_name}", *args.drop(2)])
        else
          require "spring/configuration"
          ARGV.shift
          load Dir.glob(Spring.application_root_path.join("{bin,script}/rails")).first
          exit
        end
      end
    end
  end
end

Spring::Client::Run.callを[“rails_console”]の引数をつけて呼び出します。connectでSpringサーバと接続できればwarm_run、そうでなければcold_runを実行します。Springサーバを明示的に起動していない状態で spring binstubでspringがラップされた状態のbin/railsやbin/rakeを叩くとcold_runが実行されます。cold_runはSpringサーバを立ち上げてからコマンドを実行します。

module Spring
  module Client
    class Run < Command
# ...
      def connect
        @server = UNIXSocket.open(env.socket_name)
      end

      def call
        begin
          connect
        rescue Errno::ENOENT, Errno::ECONNRESET, Errno::ECONNREFUSED
          cold_run
        else
          warm_run
        end
      ensure
        server.close if server
      end

# ...

      def run
        verify_server_version

        application, client = UNIXSocket.pair

        queue_signals
        connect_to_application(client)
        run_command(client, application)
      rescue Errno::ECONNRESET
        exit 1
      end

warm_runもcold_runも最終的にはrunメソッドを叩きます。connect_to_applicationメソッドではサーバにUNIXSocketを渡しており、アプリケーション側にそのソケットが渡されます。この時点ではクライアントとアプリケーションが接続されたことになります。run_commandではクライアントの標準入出力と実行すべきコマンドをアプリケーションに送信します。

def run_command(client, application)
  log "sending command"

  application.send_io STDOUT
  application.send_io STDERR
  application.send_io STDIN

  send_json application, "args" => args, "env" => ENV.to_hash

  pid = server.gets
  pid = pid.chomp if pid

  # We must not close the client socket until we are sure that the application has
  # received the FD. Otherwise the FD can end up getting closed while it's in the server
  # socket buffer on OS X. This doesn't happen on Linux.
  client.close

  if pid && !pid.empty?
    log "got pid: #{pid}"

    forward_signals(application)
    status = application.read.to_i

    log "got exit status #{status}"

    exit status
  else
    log "got no pid"
    exit 1
  end
ensure
  application.close
end

application.read.to_iはアプリケーション側で実行したコマンドのステータスコードが返ってくるので、コマンドが実行するまではクライアントは待ちの状態になります。ただし、クライアント側の標準入出力はアプリケーション側と同じになっているので、rails consoleのコマンドではインタラクティブにコマンドを実行することができます。また、forward_signalsによってクライアントに対するシグナルはアプリケーション側に転送されます。

まとめ

  1. Springサーバが立ち上がり、UNIXドメインソケットで待ち受ける
  2. クライアントからのアクセスがあると、サーバはアプリケーション(コマンドを実行する環境)の子プロセスを立ち上げる。既に立ち上がっていればそのアプリケーションを利用する。サーバとアプリケーション間はUnixSocketペアで相互通信を行う。
  3. クライアントからサーバ経由でアプリケーションにUnixSocketペアを渡す
  4. クライアントの標準入出力を3のUnixSocket経由でアプリケーションに渡し、アプリケーションの標準入出力としてセットする。
  5. クライアントがUnixSocket経由でアプリケーションにコマンドを送信し、アプリケーションはforkした上で初期設定を行い、コマンドを実行する。アプリケーションには既にRailsのコードがロードされているので直ぐにコマンドが実行される。クライアントで受け取ったシグナルはアプリケーションに転送される。

このエントリーをはてなブックマークに追加