RailsEventStoreのコードリーディングをしました。バージョンは0.31.1です。

使い方

rails-event-store をbundle installして

config/application.rbとかconfig/environments/*.rbに以下のようなコードを入れておいて

models配下などにイベント用のクラスを作成して

任意のタイミングでpublishすればOK

publishしたタイミングでRailsEventStore::Client#subscribeで設定したイベントに対応するsubscriberが発火(#call)されます。

Subscribe

まずはRailsEventStore::Client#subscribeを追っていきます。

RailsEventStore::ClientはRubyEventStore::Clientを継承しています。RubyEventStore::Cilentは以下のように定義されています。

brokerはPubSub::BrokerのインスタンスでPubSub::Broker#add_subscriptionが呼ばれます。

dispatcherはRailsEventStore::ActiveJobDispatcherのインスタンス、subscriptionsはRubyEventStore::PubSub::Subscriptionsのインスタンスです。

まず、RailsEventStore::ActiveJobDispatcher#verifyでsubscriberを検証します。

ActiveJobDispatcherはRubyEventStore::AsyncDispatcherを継承しています。AsyncDispatcher#verifyは以下のように定義されています。

@schedulerはRailsEventStore::ActiveJobDispatcher::ActiveJobSchedulerのインスタンスです。#async_handler?はsubscriberがActiveJob::Baseを継承しているかどうかをチェックしています。

ActiveJob::Baseを継承していない場合は、AsyncDispatcherのsuperが呼ばれるため親クラスのPubSub::Dispatcher#verifyが呼ばれます

subscriberにクラスがセットされていた場合はインスタンス化してから#callメソッドがあるかどうかを確認してあればtrue、なければInvalidHandlerをraiseします。

検証が終わったら、RubyEventStore::PubSub::Subscriptions#add_subscriptionを呼び出します。

event_typesには[OrderPlaced]のようなイベントの配列が入ります。LocalSubscriptions#addで@subscriptionsのハッシュにイベント名をキーとしてsubscriberの配列が入ります。

Publish

RailsEventStore::Client#publishから追っていきます。

enrich_events_metadataは各eventデータにメタデータ(リモートIPやタイムスタンプ)を付与します。

serialized_eventはイベントデータをシリアライズできる形式(to_hとか==が生えたSerializedRecordオブジェクト)に変換します。これはActiveJobでto_hでシリアライズしたデータを引数にperform_laterを実行したり、streamにデータを送るときの形式として使います(SubscriberがActiveJobでなければ、eventのオブジェクトがそのまま引数として渡ります)

enriched_eventsとserialized_eventsはRubyEventStore::PubSub::Broker#callの引数としてそれぞれ渡されます。

LocationSubscriptions#all_forは@subscriptions[event_type]によってイベントに対してセットされたsubscriberの配列を返します。そのsubscriberの配列の要素に対してActiveJobDispatcher#callを呼び出していきます。

ActiveJobDispatcher自身にはcallが定義されていないのでスーパークラスのRubyEventStore::AsyncDispatcher#callが呼ばれます。

@scheduler.async_handler?がtrueの場合、つまりActiveJobがsubscriberの場合はAsyncProxyStrategy::Inline#callが呼ばれます。Inline#callは引数のオブジェクトのcallメソッドを呼ぶのでActiveJobScheduler#callが呼ばれます。ActiveJobScheduler#callはserialized_eventをハッシュ化したものを引数にActiveJob.perform_laterを呼び出します。

subscriberがActiveJobではない場合はRubyEventStore::PubSub::Dispatcher#callを呼び出します。

subscriberがクラスの場合はインスタンス化した後、callメソッドを呼び出します。このケースではserialized_eventは利用されずメタデータが入っている元のeventが引数として渡ることになります。

RailsEventStore::Middleware

RailsEventStoreはRailtieでRailsEventStore::Middlewareが自動でセットされます。event_storeを設定している場合は、@app.callがRailsEventStore::Client#with_request_metadataメソッドのブロックに囲まれて実行されます。

with_request_metadataは以下のように定義されています。

request_metadataは#default_request_metadataの値でremote_ipやrequest_idがセットされたハッシュを返します。

#with_metadataではmetadataを設定=>ブロック実行=>元のmetadataに復元、というのをやっています。