EmbulkのAnalytics Cloudのプラグインを作成、リリースしましたー。
前回はSalesforceへのoutputプラグインだったのですが、Salesforceにはそこまで大量のデータを保持できないのと、embulkの用途(ログコレクタのfluentdのバッチ版)的にあまりマッチしていなかったのですが、今回のプラグインの出力先であるAnalytics Cloudは大量データ格納+BIの機能を持っているため、用途としては比較的マッチしているのではないかと思ってます。
使い方
$ embulk gem install embulk-output-analytics_cloud
していただいて、以下の様にconfig.ymlを設定すればOKです。
...
out:
type: analytics_cloud
username: {username}
password: {password}
edgemart_alias: {edgemart_alias}
工夫したところ
Taskの中でInsightsExternalData作成→InsightsExternalDataPart作成×Nをすると、タスク分だけファイルアップロードが走ってしまうため、1回のバルク処理に対して1ファイルアップロードにするようにプラグインのクラス側でInsightsExternalDataを作ったり、staticなPartNumberのインクリメントをしたりしています。(これがベストな方法かどうかは謎)また、MetadataJsonの作成は面倒なので自動的に作成してくれる機能を入れています。これはembulkのinput/parserプラグインが各データからカラムのデータ型を決定するため実現できました。
その他
外部アップロード出来る数は24時間当たり50ファイルまでなので注意が必要。最初はタスク分だけファイルアップロードしていていたのですぐに制限に抵触してしまいました。スレッド数、タスク数の制御にembulkを -X max_threads=1 -X min_output_tasks=1のオプションを付けて実行する方法もあるので、最初はそれで試した方が良かったですね。
課題/機能
以下は対応予定の課題/機能になります。- I/O部分も非同期にしてもう少し高速にロードできるようにする
- 指定用のMetadataJsonが文字列なのはイケていないのでファイル指定にする
- 自動生成のMetadataJsonのNumeric型のscale, precisionやTimestampのフォーマットを指定できるようにする
- カラムがNULLだと落ちるっぽいので、修正する
- もうちょっとコードを綺麗に…(staticな変数どうしようとか)
- テストを書こう