2022-08-14

LiteFSコードリーディング:その1

LiteFS の仕組みについて読み解いてみました。 今回は起動〜リードレプリカの部分をざっくり紹介します。 FUSE周りの仕組みに関しては別記事で紹介予定。

ちなみにLiteFSを試したときに使ったDockerfileなどはこちらに置いているので手軽に動かしたいときにご活用ください🙏

コードリーディング

全体としてはこういうシーケンスになっている(間違っていたらご指摘ください

LiteFSのシーケンス


Main.Run() がメインの処理なのでここから見ていきます。

  1. initStore()
  2. initHttpServer()
  3. initConsul()
  4. openStore()
  5. initFileSystem()

の順に実行されます。

まず、 Main.initStore() でStoreのstructを初期化をします。

func (m *Main) initStore(ctx context.Context) error {
	mountDir, err := filepath.Abs(m.Config.MountDir)
	if err != nil {
		return fmt.Errorf("abs: %w", err)
	}
	dir, file := filepath.Split(mountDir)

	m.Store = litefs.NewStore(filepath.Join(dir, "."+file))
	m.Store.Client = http.NewClient()
	return nil
}

NewStoreの引数を見てもらうとわかるように、${マウントディレクトリの親ディレクトリ}/.${マウントのファイル名} にストアのパスが設定されます。 例えば /tmp/hoge をマウントした場合は /tmp/.hoge にストアのパスが設定されます。 これはFUSEのマウント先としては /tmp/hoge として利用し、SQLite3やLTXファイルなどの実体は /tmp/.hoge に置いて管理しているためです。

Main.initHttpServer()はノード間での通信を行うためのHTTPサーバーを初期化します。

func (m *Main) initHTTPServer(ctx context.Context) error {
	server := http.NewServer(m.Store, m.Config.HTTP.Addr)
	if err := server.Listen(); err != nil {
		return fmt.Errorf("cannot open http server: %w", err)
	}
	m.HTTPServer = server
	return nil
}

Main.initConsul() ではLeaserの構造体を初期化しています。LiteFSではPrimaryのノードを決定するためにConsulの client-side leader elections を利用しています。 PrimaryのノードはLeaserを使ってキーを一定期間リースし、定期的にTTLを更新(Renew)します。何らかでPrimaryのノードが死んでしまった場合はリース切れし、別のノードがPrimaryになります。

func (m *Main) initConsul(ctx context.Context) error {
	// TEMP: Allow non-localhost addresses.

	// Find advertise URL from function if this is a test.
	advertiseURL := m.Config.Consul.AdvertiseURL
	if m.AdvertiseURLFn != nil {
		advertiseURL = m.AdvertiseURLFn()
	}

	leaser := consul.NewLeaser(m.Config.Consul.URL, advertiseURL)

	leaser.Key = m.Config.Consul.Key
	if err := leaser.Open(); err != nil {
		return fmt.Errorf("cannot connect to consul: %w", err)
	}
	log.Printf("initializing consul: key=%s url=%s advertise-url=%s", m.Config.Consul.URL, m.Config.Consul.Key, advertiseURL)

	m.Leaser = leaser
	return nil
}

Main.openStore() はデータベースをオープンし、goroutineを使ってレプリケーションのモニタリングを行います。

func (m *Main) openStore(ctx context.Context) error {
	m.Store.Leaser = m.Leaser
	return m.Store.Open()
}

Main.openStore() は Store.Open() を呼び出します。

// Open initializes the store based on files in the data directory.
func (s *Store) Open() error {
	if err := os.MkdirAll(s.path, 0777); err != nil {
		return err
	}

	if err := s.openDatabases(); err != nil {
		return fmt.Errorf("open databases: %w", err)
	}

	// Begin background replication monitor.
	if s.Leaser != nil {
		s.g.Go(func() error { return s.monitor(s.ctx) })
	} else {
		log.Printf("WARNING: no leaser assigned, running as defacto primary (for testing only)")
		s.isPrimary = true
	}

	return nil
}

Store.monitor() ではConsul KVでPrimaryURLを取得して、Primary/Replicaの各ノードで処理を行います。

Store.acquireLeaseOrPrimaryURL() はConsul KVやSessionによる client-side leader elections を行い、Primary/Replicaの判定を行います。

func (s *Store) acquireLeaseOrPrimaryURL(ctx context.Context) (Lease, string, error) {
	// Attempt to find an existing primary first.
	primaryURL, err := s.Leaser.PrimaryURL(ctx)
	if err != nil && err != ErrNoPrimary {
		return nil, "", fmt.Errorf("fetch primary url: %w", err)
	} else if primaryURL != "" {
		return nil, primaryURL, nil
	}

	// If no primary, attempt to become primary.
	lease, err := s.Leaser.Acquire(ctx)
	if err != nil && err != ErrPrimaryExists {
		return nil, "", fmt.Errorf("acquire lease: %w", err)
	} else if lease != nil {
		return lease, "", nil
	}

	// If we raced to become primary and another node beat us, retry the fetch.
	primaryURL, err = s.Leaser.PrimaryURL(ctx)
	if err != nil {
		return nil, "", err
	}
	return nil, primaryURL, nil
}

すでにPrimaryが存在していれば Leaser.PrimaryURL() でキー(PrimaryURL)の取得を試み、 Primaryが存在していなければ Leaser.Acquire() でキーを設定して自身がPrimaryになります。

Primaryの場合 Store.monitorAsPrimary() で定期的にキーのTTLを更新します。Primaryのノードに異常がありキーのTTLが更新できない場合は、キーの有効期限が切れて別のノードがPrimaryになります。 Store.monitor() のforループではこのようにしてPrimary/Replicaの役割の変更や変更の検知を行います。

Replicaの場合 Store.monitorAsReplica() を呼び出し、PrimaryにHTTP経由でデータベースの変更を問い合わせています。

posMap := s.PosMap()
st, err := s.Client.Stream(ctx, primaryURL, posMap)
if err != nil {
	return fmt.Errorf("connect to primary: %s", err)
}

for {
	frame, err := st.NextFrame()
	if err == io.EOF {
		return nil // clean disconnect
	} else if err != nil {
		return fmt.Errorf("next frame: %w", err)
	}

	switch frame := frame.(type) {
	case *DBStreamFrame:
		if err := s.processDBStreamFrame(ctx, frame); err != nil {
			return fmt.Errorf("process db stream frame: %w", err)
		}
	case *LTXStreamFrame:
		if err := s.processLTXStreamFrame(ctx, frame, st); err != nil {
			return fmt.Errorf("process ltx stream frame: %w", err)
		}
	default:
		return fmt.Errorf("invalid stream frame type: 0x%02x", frame.Type())
	}
}

最後に Main.initFileSystem() を使ってファイルシステムのマウントを行います。

func (m *Main) initFileSystem(ctx context.Context) error {
	mountDir, err := filepath.Abs(m.Config.MountDir)
	if err != nil {
		return fmt.Errorf("abs: %w", err)
	}

	// Build the file system to interact with the store.
	fsys := fuse.NewFileSystem(mountDir, m.Store)
	fsys.Debug = m.Config.Debug
	if err := fsys.Mount(); err != nil {
		return fmt.Errorf("cannot open file system: %s", err)
	}

	// Attach file system to store so it can invalidate the page cache.
	m.Store.Invalidator = fsys

	m.FileSystem = fsys
	return nil
}

最後にノード間通信用のHTTPのサーバーを起動します。

m.HTTPServer.Serve()

HTTPのハンドラーは Server.serverHttp() で定義されています。

case "/stream":
	switch r.Method {
	case http.MethodPost:
		s.handlePostStream(w, r)

とりあえず今回はここまで。Primary/ReplicaのHTTPリクエスト・レスポンスの詳細やFUSEの挙動に関しては別記事で紹介します。

このエントリーをはてなブックマークに追加