2018-08-28

RailsEventStoreコードリーディング

RailsEventStoreのコードリーディングをしました。バージョンは0.31.1です。

使い方

rails-event-store をbundle installして

config/application.rbとかconfig/environments/*.rbに以下のようなコードを入れておいて

config.to_prepare do
  Rails.configuration.event_store = RailsEventStore::Client.new
  Rails.configuration.event_store.subscribe(->(c){ puts c }, to: [OrderPlaced])
end

models配下などにイベント用のクラスを作成して

class OrderPlaced < RailsEventStore::Event
end

任意のタイミングでpublishすればOK

event_store = Rails.configuration.event_store
event = OrderPlaced.new(data: { foo: :bar })
event_store.publish(event)

publishしたタイミングでRailsEventStore::Client#subscribeで設定したイベントに対応するsubscriberが発火(#call)されます。

Subscribe

まずはRailsEventStore::Client#subscribeを追っていきます。

RailsEventStore::ClientはRubyEventStore::Clientを継承しています。RubyEventStore::Cilentは以下のように定義されています。

module RubyEventStore
  class Client
    def subscribe(subscriber = nil, to:, &proc)
      raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
      subscriber ||= proc
      broker.add_subscription(subscriber, to)
    end

brokerはPubSub::BrokerのインスタンスでPubSub::Broker#add_subscriptionが呼ばれます。

module RubyEventStore
  module PubSub
    class Broker
      def add_subscription(subscriber, event_types)
        verify_subscription(subscriber)
        subscriptions.add_subscription(subscriber, event_types)
      end
# ...
      private
      attr_reader :subscriptions, :dispatcher

      def verify_subscription(subscriber)
        raise SubscriberNotExist, "subscriber must be first argument or block" unless subscriber
        dispatcher.verify(subscriber)
      end

dispatcherはRailsEventStore::ActiveJobDispatcherのインスタンス、subscriptionsはRubyEventStore::PubSub::Subscriptionsのインスタンスです。

まず、RailsEventStore::ActiveJobDispatcher#verifyでsubscriberを検証します。

ActiveJobDispatcherはRubyEventStore::AsyncDispatcherを継承しています。AsyncDispatcher#verifyは以下のように定義されています。

module RubyEventStore
  class AsyncDispatcher < PubSub::Dispatcher
    def initialize(proxy_strategy: AsyncProxyStrategy::Inline.new, scheduler:)
      @async_proxy_strategy = proxy_strategy
      @scheduler = scheduler
    end
# ...
    def verify(subscriber)
      super unless @scheduler.async_handler?(subscriber)
    end
  end
end

@schedulerはRailsEventStore::ActiveJobDispatcher::ActiveJobSchedulerのインスタンスです。#async_handler?はsubscriberがActiveJob::Baseを継承しているかどうかをチェックしています。

module RailsEventStore
  class ActiveJobDispatcher < RubyEventStore::AsyncDispatcher
    class ActiveJobScheduler
      def call(klass, serialized_event)
        klass.perform_later(serialized_event.to_h)
      end

      def async_handler?(klass)
        Class === klass && klass < ActiveJob::Base
      end
    end

ActiveJob::Baseを継承していない場合は、AsyncDispatcherのsuperが呼ばれるため親クラスのPubSub::Dispatcher#verifyが呼ばれます

module RubyEventStore
  module PubSub

    class Dispatcher
      def call(subscriber, event, _)
        subscriber = subscriber.new if Class === subscriber
        subscriber.call(event)
      end

      def verify(subscriber)
        subscriber = klassify(subscriber)
        subscriber.respond_to?(:call) or raise InvalidHandler.new(subscriber)
      end

      private

      def klassify(subscriber)
        Class === subscriber ? subscriber.new : subscriber
      rescue ArgumentError
        raise InvalidHandler.new(subscriber)
      end
    end

  end
end

subscriberにクラスがセットされていた場合はインスタンス化してから#callメソッドがあるかどうかを確認してあればtrue、なければInvalidHandlerをraiseします。

検証が終わったら、RubyEventStore::PubSub::Subscriptions#add_subscriptionを呼び出します。

module RubyEventStore
  module PubSub
    class Subscriptions
      def initialize
        @local  = LocalSubscriptions.new
        @global = GlobalSubscriptions.new
        @thread  = ThreadSubscriptions.new
      end

      def add_subscription(subscriber, event_types)
        local.add(subscriber, event_types)
      end

event_typesには[OrderPlaced]のようなイベントの配列が入ります。LocalSubscriptions#addで@subscriptionsのハッシュにイベント名をキーとしてsubscriberの配列が入ります。

      class LocalSubscriptions
        def initialize
          @subscriptions = Hash.new {|hsh, key| hsh[key] = [] }
        end

        def add(subscription, event_types)
          event_types.each{ |type| @subscriptions[type.to_s] << subscription }
          ->() {event_types.each{ |type| @subscriptions.fetch(type.to_s).delete(subscription) } }
        end

Publish

RailsEventStore::Client#publishから追っていきます。

module RubyEventStore
  class Client
    def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
      enriched_events = enrich_events_metadata(events)
      serialized_events = serialize_events(enriched_events)
      append_to_stream_serialized_events(serialized_events, stream_name: stream_name, expected_version: expected_version)
      enriched_events.zip(serialized_events) do |event, serialized_event|
        with_metadata(
          correlation_id: event.metadata[:correlation_id] || event.event_id,
          causation_id:   event.event_id,
        ) do
          broker.(event, serialized_event)
        end
      end
      :ok
    end

enrich_events_metadataは各eventデータにメタデータ(リモートIPやタイムスタンプ)を付与します。

serialized_eventはイベントデータをシリアライズできる形式(to_hとか==が生えたSerializedRecordオブジェクト)に変換します。これはActiveJobでto_hでシリアライズしたデータを引数にperform_laterを実行したり、streamにデータを送るときの形式として使います(SubscriberがActiveJobでなければ、eventのオブジェクトがそのまま引数として渡ります)

enriched_eventsとserialized_eventsはRubyEventStore::PubSub::Broker#callの引数としてそれぞれ渡されます。

module RubyEventStore
  module PubSub
    class Broker
      def call(event, serialized_event)
        subscribers = subscriptions.all_for(event.type)
        subscribers.each do |subscriber|
          dispatcher.call(subscriber, event, serialized_event)
        end
      end

LocationSubscriptions#all_forは@subscriptions[event_type]によってイベントに対してセットされたsubscriberの配列を返します。そのsubscriberの配列の要素に対してActiveJobDispatcher#callを呼び出していきます。

ActiveJobDispatcher自身にはcallが定義されていないのでスーパークラスのRubyEventStore::AsyncDispatcher#callが呼ばれます。

module RubyEventStore
  class AsyncDispatcher < PubSub::Dispatcher
    def call(subscriber, _, serialized_event)
      if @scheduler.async_handler?(subscriber)
        @async_proxy_strategy.call(->{ @scheduler.call(subscriber, serialized_event) })
      else
        super
      end
    end

@scheduler.async_handler?がtrueの場合、つまりActiveJobがsubscriberの場合はAsyncProxyStrategy::Inline#callが呼ばれます。Inline#callは引数のオブジェクトのcallメソッドを呼ぶのでActiveJobScheduler#callが呼ばれます。ActiveJobScheduler#callはserialized_eventをハッシュ化したものを引数にActiveJob.perform_laterを呼び出します。

module RailsEventStore
  class ActiveJobDispatcher < RubyEventStore::AsyncDispatcher
    class ActiveJobScheduler
      def call(klass, serialized_event)
        klass.perform_later(serialized_event.to_h)
      end

subscriberがActiveJobではない場合はRubyEventStore::PubSub::Dispatcher#callを呼び出します。

module RubyEventStore
  module PubSub
    class Dispatcher
      def call(subscriber, event, _)
        subscriber = subscriber.new if Class === subscriber
        subscriber.call(event)
      end

subscriberがクラスの場合はインスタンス化した後、callメソッドを呼び出します。このケースではserialized_eventは利用されずメタデータが入っている元のeventが引数として渡ることになります。

RailsEventStore::Middleware

RailsEventStoreはRailtieでRailsEventStore::Middlewareが自動でセットされます。event_storeを設定している場合は、@app.callがRailsEventStore::Client#with_request_metadataメソッドのブロックに囲まれて実行されます。

module RailsEventStore
  class Middleware
    def initialize(app)
      @app = app
    end

    def call(env)
      if config.respond_to?(:event_store)
        config.event_store.with_request_metadata(env) do
          @app.call(env)
        end
      else
        @app.call(env)
      end
    end

    private

    def config
      Rails.application.config
    end
  end
end

with_request_metadataは以下のように定義されています。

module RailsEventStore
  class Client < RubyEventStore::Client
    attr_reader :request_metadata

    def with_request_metadata(env, &block)
      with_metadata(request_metadata.call(env)) do
        block.call
      end
    end

    private
    def default_request_metadata
      ->(env) do
        request = ActionDispatch::Request.new(env)
        {
          remote_ip:  request.remote_ip,
          request_id: request.uuid
        }
      end
    end

request_metadataは#default_request_metadataの値でremote_ipやrequest_idがセットされたハッシュを返します。

#with_metadataではmetadataを設定=>ブロック実行=>元のmetadataに復元、というのをやっています。

module RubyEventStore
  class Client
    def with_metadata(metadata, &block)
      previous_metadata = metadata()
      self.metadata = previous_metadata.merge(metadata)
      block.call if block_given?
    ensure
      self.metadata = previous_metadata
    end

    def metadata
      @metadata.value || EMPTY_HASH
    end
このエントリーをはてなブックマークに追加