cookpadさんのExpeditorのコードリーディングをしてみました。
Expeditorの概要
非同期に処理を実行したり、依存処理や失敗時のfallbackを簡潔に書けるライブラリです。一番ベーシックな使い方はこんな感じです。command = Expeditor::Command.new do
# some code
end
command.start # start execute asynchronously
command.get # wait for value
コードリーディングの前の前提知識
ライブラリのベースはconcurrent-rubyなので、まずはconcurrent-rubyの説明から。Concurrent::IVar
ImmutableなVarで以下の処理をすることができます- 複数回、値をセットしようとするとエラーにする
- オブザーバを追加して変数セット時に処理を実行する
- 値がセットされるまでwaitする
a = Concurrent::IVar.new
a.add_observer do
puts "hoge"
end
a.fulfilled? # false
a.set 1 # set and call observer
a.fulfilled? # true
a.set 2 # error
Concurrent::Future
非同期に処理を実行することができるようになるクラス。FutureはConcurrent::IVarを継承しています。使い方は色々ありますが、Expeditorの場合は以下のようにイニシャライズ時にブロックを渡して、後でexecuteするスタイルです。
require 'concurrent/future'
future = Concurrent::Future.new do
# some code
end
futute.complete? # false
future.execute # execute asynchronously
future.value
future.complete? # true
最も基本的な実行ケースを追ってみる
Expeditor::Commandをイニシャライズ。@service = Expeditor::Services.defaultとblock以外はセットされない- startメソッド
- started?で二重起動しない仕組み
- @normal_future には initial_normalした値が入る
- intial_normalはConcurrent::Futureを継承したRichFutureクラス
- Expeditor::Commandコンストラクタの引数のブロックはtimeout_block > retryable_block > breakable_blockの3段構成でラップされてRichFutureの実行ブロックとなる
- RichFutureのadd_observerとしてmetricsメソッドが呼ばれる。このメソッドは@serviceに処理を委譲しているため@serviceの定義によって、実行完了後に何らかの処理をすることが可能
- もう一つのadd_observerではConcurrent::IVarな@ivar変数を操作している
- 成功時は@ivar.setで値がセットされる。on_xxxメソッドでコールバックのブロックを指定している場合は、@ivar.setの時点で呼び出される(@ivar.setはon_completeとon_success、@ivar.failはon_completeとon_failureが呼び出される)
- getメソッド
- 処理完了までwaitする => OKなら値返却、ダメならraise error
Dependenciesがある場合
ベーシックな使い方との差分は- イニシャライズ時に引数のdependenciesがセットされる(Concurrent::Commandインスタンス)
- prepareでdependencies内でstartが実行される
- safe_execute =>initial_normalでブロック内にセットしたwait_dependenciesを呼び出し
- wait_dependenciesではdependenciesの各インスタンスのgetメソッドで値取得を待つ。 これらはConcurrent::ThreadPoolExecutorによって実行される。shutdownはスレッドプールを停止するメソッドで、スレッドプールの新規キューの追加を禁止しつつ、既存のキューは処理される。後続のwait_for_terminationを合わせることで、値取得を待つ処理を実現している。
fallback_blockがある場合
- エラー発生時に@fallback_blockが定義されている場合は、Concurrent::SafeTaskExecutorに@fallback_blockを入れて実行する。多分ブロックをそのまま実行するとキャッチされないErrorが発生したときにfallbackされないので、エラーが発生したら引数で返すスタイル(なんかgoっぽい感じ)で安全に実行している。
- fallbackの実行結果は@ivarのcompleteメソッドを実行して返される(=@ivar.set か @ivar.failされる)
- fallback_blockの実行自体も非同期となっておりRichFutureクラスを使っている。
- fallback_blockの実行自体が非同期なので、getメソッド内ではfallbackが有効なときに@ivar.waitをかけて値の取得を待っている。
サーキットブレーカー
内外のAPIを叩くときに対象サービスが落ちていたり、ネットワーク障害などでHTTPリクエストが失敗したときなど、立て続けにエラーになるケースでは、そのサービスが復旧するまではリクエストを送らずにクライアント側でエラーにするのが効率的です。このように、エラーがある閾値を越えたときに一定時間対象の処理を行わないようにする機構をサーキットブレーカーと呼びます。Expeditorにはこのサーキットブレーカーの機構も備えています。service = Expeditor::Service.new(
period: 10, # retention period of the service metrics (success, failure, timeout, ...)
sleep: 1, # if once the circuit is opened, the circuit is still open until sleep time is passed even though failure rate is less than threshold
threshold: 0.5, # if the failure rate is more than or equal to threshold, the circuit is opened
non_break_count: 100 # if the total count of metrics is not more than non_break_count, the circuit is not opened even though failure rate is more than threshold
)
command = Expeditor::Command.new(service: service) do
...
end
- 引数にExpeditor::Serviceのインスタンスを渡す。このインスタンスはDefaultのExpeditor::Services::Defaultとは異なり、サーキットブレーカの機能を有している。
- サーキットブレークするかどうかはExpeditor::Command#breakable_blockのExpeditor::Service#open?メソッドで判定している。
- open?メソッド内ではさらにcalc_openメソッドを呼び出している。calc_openでは今までに発生したエラー数/トータル数の割合と、指定した閾値(割合)を比較している。non_break_countを設定していると、設定値以上のサンプル値じゃないとサーキットブレーカーが動かないように制御される。また、一旦openになるとsleepで指定した秒数の間はopenの状態になる。
- 成功/エラー/タイムアウト等はExpeditor::Serviceのインスタンス変数@bucket(Expeditor::Bucket)に格納される(Expeditor::Command#metricsの処理)。
- bucketのカウントアップは非同期処理に行われるため、Mutexを使って排他制御している。
その他
- timeout指定時はtimeout_blockでTimeout::timeoutのブロックに入れてタイムアウトさせている。
- start_with_retryを実行するとConcurrent::IVarな@retryable_optionsにオプションをセットして処理を実行する。
- Retryable.retryableのブロックに入れて処理を実行することでリトライ処理を実現している
- current_thread: trueの指定でConcurrent::ImmediateExecutorによって処理が実行される=同期的に処理が実行される