EmbulkのAnalytics Cloudのプラグインを作成、リリースしましたー。

前回はSalesforceへのoutputプラグインだったのですが、Salesforceにはそこまで大量のデータを保持できないのと、embulkの用途(ログコレクタのfluentdのバッチ版)的にあまりマッチしていなかったのですが、今回のプラグインの出力先であるAnalytics Cloudは大量データ格納+BIの機能を持っているため、用途としては比較的マッチしているのではないかと思ってます。

使い方

していただいて、以下の様にconfig.ymlを設定すればOKです。

工夫したところ

Taskの中でInsightsExternalData作成→InsightsExternalDataPart作成×Nをすると、タスク分だけファイルアップロードが走ってしまうため、1回のバルク処理に対して1ファイルアップロードにするようにプラグインのクラス側でInsightsExternalDataを作ったり、staticなPartNumberのインクリメントをしたりしています。(これがベストな方法かどうかは謎)

また、MetadataJsonの作成は面倒なので自動的に作成してくれる機能を入れています。これはembulkのinput/parserプラグインが各データからカラムのデータ型を決定するため実現できました。

その他

外部アップロード出来る数は24時間当たり50ファイルまでなので注意が必要。最初はタスク分だけファイルアップロードしていていたのですぐに制限に抵触してしまいました。

スレッド数、タスク数の制御にembulkを -X max_threads=1 -X min_output_tasks=1のオプションを付けて実行する方法もあるので、最初はそれで試した方が良かったですね。

課題/機能

以下は対応予定の課題/機能になります。

  • I/O部分も非同期にしてもう少し高速にロードできるようにする
  • 指定用のMetadataJsonが文字列なのはイケていないのでファイル指定にする
  • 自動生成のMetadataJsonのNumeric型のscale, precisionやTimestampのフォーマットを指定できるようにする
  • カラムがNULLだと落ちるっぽいので、修正する
  • もうちょっとコードを綺麗に…(staticな変数どうしようとか)
  • テストを書こう

不具合等あればissue上げていただくか、プルリクしていただけるとめちゃくちゃ嬉しいです!