EmbulkというOSSが非常に気になっていた&Salesforceのoutputプラグインが無かったので、勉強がてら作ってみました。↓
tzmfreedom/embulk-output-salesforce
今回は備忘としてoutputプラグインの作り方と、embulk-output-salesforceの使い方を書いていきます! Output Pluginの作成は以下のURLを参考にしました。
Java で Embulk Output Plugin を書く - Qiita
ちなみに、ソースコードを全て読んだわけではない&読んだところも合っているか怪しいので、多少間違いが有ると思いますがご了承願います。
outputプラグインの作り方
テンプレートの作成
プラグインはJavaかRubyで書くことが出来ます。今回はJavaを使いました。 プラグインのテンプレートは以下のコマンドで作成できます。
embulk new java-output {プラグイン名}
これでGradleベースのプロジェクトが自動生成されます。あとはsrc/main/java/org/embulk/outputにあるJavaファイルを書いていけばOK。
設定ファイルのキー値の設定
Taskクラスを継承したPluginTaskクラスにキー値と属性を指定し
ConfigSource#loadConfig(PluginTask.class)
で実際の設定を取得します。
PluginTaskにはannotationで属性を指定します。任意入力の項目の場合はOptional型で定義します。未指定時のデフォルト設定値はConfigDefaultアノテーションで定義します。文字列のデフォルトを定義する場合は@ConfigDefault(“\“hoge\”“)のようにエスケープが必要。
処理の流れ
OutputPluginの処理の流れだけに着目すると
OutputPlugin#transaction
→OutputPlugin#open(→TransactionalPageOutput#add …)
→OutputPlugin#cleanup
といったフローになります(自信なし)
TransactionalPageOutput
OutputPlugin#openではTransactionalPageOutputを継承したクラスを戻り値とします。 openはタスクの数だけ呼ばれるっぽいので、fileのinputプラグインを使って2つのファイルを処理する場合は、openが2回呼ばれて、TransactionalPageOutputのサブクラスのインスタンスが2つ作成されます。
TransactionalPageOutput#addでは引数のPageに実際のレコードが渡されるので、PageをPageReaderに読ませて、PageReaderではスキーマに応じた全カラムに対して処理を行うことになります。100桁くらいの3000行のデータを入れたら、一回のadd呼び出しで110件ちょっとのレコードが格納されていました。
pageReader.getSchema().visitColumns(…)の部分が全カラムに関する処理になり、引数にはColumnVisitorを継承したクラスを入れます(利用がワンタイムなので匿名クラスにしてます)。ColumnVisitorではカラムのデータ型に応じた処理を記載していきます。
Salesforceの場合はTimeStampだけ注意しておけばOKで、pageReaderで取得できるTimeStampの型はorg.joda.time.DateTime型であるため、wscを利用する場合は、データ型をjava.util.Calendar型やjava.util.Date型に変換する必要があります。wscはデータ型によってSOAPのxsi:typeを書き換えるので、SObject#addFieldで入れるObject型は注意が必要です。
テストの方法
pluginのテストの方法としては大きく2つあり、1つ目は実際のembulkコマンドに作ったプラグインを食わせる方法、もう1つはembulkのクラスを使ってJava内で実行するやり方です。
1つ目は
gradlew classpath
からの
embulk run -I ./embulk-output-salesforce/lib config.yml
を実行すればプラグインの挙動を確認できます。
2つ目はembulk runでやってることをJavaのコードを書いて実現するやり方です。
参考URLはこちら→Embulkプラグインのテストを楽にやりたい - 今日もプログラミング
embulk-output-salesforceの使い方
基本的には以下のようなoutputの定義を設定ファイルに書けばOKです。
out:
type: salesforce
username: hoge@example.com
password: fuga
sobject: Account
ログインからデータ入力まで全てSOAP APIを使ってます。 内部的にはカラム名=API参照名としてデータを格納しているので、inputプラグインのfileを使う場合、inputには以下のように記述します。
in:
type: file
path_prefix: /path/to/salesforce_
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: ''
skip_header_lines: 1
comment_line_marker: null
allow_extra_columns: true
allow_optional_columns: false
columns:
- {name: Name, type: string}
- {name: Date__c, type: timestamp, format: '%Y-%m-%d'}
- {name: AnnualRevenue, type: double}
- {name: ParentId, type: string}
Embulkの良い所はプラガブルなのでS3にCSVファイルを置いて読み込ませるのも、超簡単に出来ます。(embuk-input-s3を利用します)
in:
type: s3
bucket: hogebucket
path_prefix: salesforce_
endpoint: s3.amazonaws.com
access_key_id: AKI***********
secret_access_key: *************
parser:
...
resultdirを指定すればData Loaderのようにsuccess.csvとerror_.csvを出す仕様になっています。
プラグインを作ってみた感想
hubotのときもそうでしたが、やっぱり中身のコードをちゃんと読まないとプラグインは書けない感じですね…。読まないと、プラグインのインターフェースのメソッドがどういうときにどう呼ばれるのかがわからないですし、エラーハンドリングとかも書けないですし。 あと、中身のコードを読むことでそのOSSのイズムみたいなものを何となく感じ取って、そのイズムをプラグインに反映させる、というのも大事なような気がしています。今回は数時間読んでみて、勘でコーディングしてtry&errorみたいな感じだったので、ケースによっては全然動かないポンコツプラグインかもしれませんw
感想その2
「Data Connector for Amazon S3」の記事とか、Embulkの図を見てたら、色んなサービスからのデータをtreasuredataに集約して、そこから中間データマート経由でBIツール等でビジュアライゼーションしたり統計学を駆使して解析する、というようなフローにおいて、データを集約する部分を並列処理で高速に実行し、treasuredataへの出力も冪等性を担保して、かつ色んなサービスからのデータを取り込むためにプラガブルにする、というのがEmbulkの役割な気がしています。
なので、基幹システムとか他のDBのデータをSalesforceにデータを入れて、的なETLツールじゃなくて「fluentdのバッチ版」としてデータを集約しているような感じですかね。作ってから気づいたぜ…。