RailsEventStoreのコードリーディングをしました。バージョンは0.31.1です。
使い方
rails-event-store
をbundle installして
config/application.rbとかconfig/environments/*.rbに以下のようなコードを入れておいて
config.to_prepare do
Rails.configuration.event_store = RailsEventStore::Client.new
Rails.configuration.event_store.subscribe(->(c){ puts c }, to: [OrderPlaced])
end
models配下などにイベント用のクラスを作成して
class OrderPlaced < RailsEventStore::Event
end
任意のタイミングでpublishすればOK
event_store = Rails.configuration.event_store
event = OrderPlaced.new(data: { foo: :bar })
event_store.publish(event)
publishしたタイミングでRailsEventStore::Client#subscribeで設定したイベントに対応するsubscriberが発火(#call)されます。
Subscribe
まずはRailsEventStore::Client#subscribeを追っていきます。RailsEventStore::ClientはRubyEventStore::Clientを継承しています。RubyEventStore::Cilentは以下のように定義されています。
module RubyEventStore
class Client
def subscribe(subscriber = nil, to:, &proc)
raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
subscriber ||= proc
broker.add_subscription(subscriber, to)
end
brokerはPubSub::BrokerのインスタンスでPubSub::Broker#add_subscriptionが呼ばれます。
module RubyEventStore
module PubSub
class Broker
def add_subscription(subscriber, event_types)
verify_subscription(subscriber)
subscriptions.add_subscription(subscriber, event_types)
end
# ...
private
attr_reader :subscriptions, :dispatcher
def verify_subscription(subscriber)
raise SubscriberNotExist, "subscriber must be first argument or block" unless subscriber
dispatcher.verify(subscriber)
end
dispatcherはRailsEventStore::ActiveJobDispatcherのインスタンス、subscriptionsはRubyEventStore::PubSub::Subscriptionsのインスタンスです。
まず、RailsEventStore::ActiveJobDispatcher#verifyでsubscriberを検証します。
ActiveJobDispatcherはRubyEventStore::AsyncDispatcherを継承しています。AsyncDispatcher#verifyは以下のように定義されています。
module RubyEventStore
class AsyncDispatcher < PubSub::Dispatcher
def initialize(proxy_strategy: AsyncProxyStrategy::Inline.new, scheduler:)
@async_proxy_strategy = proxy_strategy
@scheduler = scheduler
end
# ...
def verify(subscriber)
super unless @scheduler.async_handler?(subscriber)
end
end
end
@schedulerはRailsEventStore::ActiveJobDispatcher::ActiveJobSchedulerのインスタンスです。#async_handler?はsubscriberがActiveJob::Baseを継承しているかどうかをチェックしています。
module RailsEventStore
class ActiveJobDispatcher < RubyEventStore::AsyncDispatcher
class ActiveJobScheduler
def call(klass, serialized_event)
klass.perform_later(serialized_event.to_h)
end
def async_handler?(klass)
Class === klass && klass < ActiveJob::Base
end
end
ActiveJob::Baseを継承していない場合は、AsyncDispatcherのsuperが呼ばれるため親クラスのPubSub::Dispatcher#verifyが呼ばれます
module RubyEventStore
module PubSub
class Dispatcher
def call(subscriber, event, _)
subscriber = subscriber.new if Class === subscriber
subscriber.call(event)
end
def verify(subscriber)
subscriber = klassify(subscriber)
subscriber.respond_to?(:call) or raise InvalidHandler.new(subscriber)
end
private
def klassify(subscriber)
Class === subscriber ? subscriber.new : subscriber
rescue ArgumentError
raise InvalidHandler.new(subscriber)
end
end
end
end
subscriberにクラスがセットされていた場合はインスタンス化してから#callメソッドがあるかどうかを確認してあればtrue、なければInvalidHandlerをraiseします。
検証が終わったら、RubyEventStore::PubSub::Subscriptions#add_subscriptionを呼び出します。
module RubyEventStore
module PubSub
class Subscriptions
def initialize
@local = LocalSubscriptions.new
@global = GlobalSubscriptions.new
@thread = ThreadSubscriptions.new
end
def add_subscription(subscriber, event_types)
local.add(subscriber, event_types)
end
event_typesには[OrderPlaced]のようなイベントの配列が入ります。LocalSubscriptions#addで@subscriptionsのハッシュにイベント名をキーとしてsubscriberの配列が入ります。
class LocalSubscriptions
def initialize
@subscriptions = Hash.new {|hsh, key| hsh[key] = [] }
end
def add(subscription, event_types)
event_types.each{ |type| @subscriptions[type.to_s] << subscription }
->() {event_types.each{ |type| @subscriptions.fetch(type.to_s).delete(subscription) } }
end
Publish
RailsEventStore::Client#publishから追っていきます。module RubyEventStore
class Client
def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
enriched_events = enrich_events_metadata(events)
serialized_events = serialize_events(enriched_events)
append_to_stream_serialized_events(serialized_events, stream_name: stream_name, expected_version: expected_version)
enriched_events.zip(serialized_events) do |event, serialized_event|
with_metadata(
correlation_id: event.metadata[:correlation_id] || event.event_id,
causation_id: event.event_id,
) do
broker.(event, serialized_event)
end
end
:ok
end
enrich_events_metadataは各eventデータにメタデータ(リモートIPやタイムスタンプ)を付与します。
serialized_eventはイベントデータをシリアライズできる形式(to_hとか==が生えたSerializedRecordオブジェクト)に変換します。これはActiveJobでto_hでシリアライズしたデータを引数にperform_laterを実行したり、streamにデータを送るときの形式として使います(SubscriberがActiveJobでなければ、eventのオブジェクトがそのまま引数として渡ります)
enriched_eventsとserialized_eventsはRubyEventStore::PubSub::Broker#callの引数としてそれぞれ渡されます。
module RubyEventStore
module PubSub
class Broker
def call(event, serialized_event)
subscribers = subscriptions.all_for(event.type)
subscribers.each do |subscriber|
dispatcher.call(subscriber, event, serialized_event)
end
end
LocationSubscriptions#all_forは@subscriptions[event_type]
によってイベントに対してセットされたsubscriberの配列を返します。そのsubscriberの配列の要素に対してActiveJobDispatcher#callを呼び出していきます。
ActiveJobDispatcher自身にはcallが定義されていないのでスーパークラスのRubyEventStore::AsyncDispatcher#callが呼ばれます。
module RubyEventStore
class AsyncDispatcher < PubSub::Dispatcher
def call(subscriber, _, serialized_event)
if @scheduler.async_handler?(subscriber)
@async_proxy_strategy.call(->{ @scheduler.call(subscriber, serialized_event) })
else
super
end
end
@scheduler.async_handler?がtrueの場合、つまりActiveJobがsubscriberの場合はAsyncProxyStrategy::Inline#callが呼ばれます。Inline#callは引数のオブジェクトのcallメソッドを呼ぶのでActiveJobScheduler#callが呼ばれます。ActiveJobScheduler#callはserialized_eventをハッシュ化したものを引数にActiveJob.perform_laterを呼び出します。
module RailsEventStore
class ActiveJobDispatcher < RubyEventStore::AsyncDispatcher
class ActiveJobScheduler
def call(klass, serialized_event)
klass.perform_later(serialized_event.to_h)
end
subscriberがActiveJobではない場合はRubyEventStore::PubSub::Dispatcher#callを呼び出します。
module RubyEventStore
module PubSub
class Dispatcher
def call(subscriber, event, _)
subscriber = subscriber.new if Class === subscriber
subscriber.call(event)
end
subscriberがクラスの場合はインスタンス化した後、callメソッドを呼び出します。このケースではserialized_eventは利用されずメタデータが入っている元のeventが引数として渡ることになります。
RailsEventStore::Middleware
RailsEventStoreはRailtieでRailsEventStore::Middlewareが自動でセットされます。event_storeを設定している場合は、@app.callがRailsEventStore::Client#with_request_metadataメソッドのブロックに囲まれて実行されます。module RailsEventStore
class Middleware
def initialize(app)
@app = app
end
def call(env)
if config.respond_to?(:event_store)
config.event_store.with_request_metadata(env) do
@app.call(env)
end
else
@app.call(env)
end
end
private
def config
Rails.application.config
end
end
end
with_request_metadataは以下のように定義されています。
module RailsEventStore
class Client < RubyEventStore::Client
attr_reader :request_metadata
def with_request_metadata(env, &block)
with_metadata(request_metadata.call(env)) do
block.call
end
end
private
def default_request_metadata
->(env) do
request = ActionDispatch::Request.new(env)
{
remote_ip: request.remote_ip,
request_id: request.uuid
}
end
end
request_metadataは#default_request_metadataの値でremote_ipやrequest_idがセットされたハッシュを返します。
#with_metadataではmetadataを設定=>ブロック実行=>元のmetadataに復元、というのをやっています。
module RubyEventStore
class Client
def with_metadata(metadata, &block)
previous_metadata = metadata()
self.metadata = previous_metadata.merge(metadata)
block.call if block_given?
ensure
self.metadata = previous_metadata
end
def metadata
@metadata.value || EMPTY_HASH
end