前回はREST APIのSignature生成とAPIコールの基本的なところを説明したので、今回は実際にSWFをApexから触ってみます!
SWFの概要説明のところで書いたとおり、SWFの全体的なフローは以下のようになります。 ※エラーパターンを除いたシンプルな正常系フローです。
- Starterがワークフローをスタート&Decision Task(Decider用のタスク)のスケジュール
- DeciderがTaskListにポーリングしてタスクを取得
- 2のDeciderが過去のタスクの実行履歴を参照して、Activity Task(Activity Worker用のタスク)をスケジュール
- Activity WorkerがTaskListにポーリングしてタスクを取得
- 4のActivity Workerがタスク情報を参照して、タスクを実行し、Decision Taskをスケジュール
- 2~5を繰り返して、ワークフローを進める
- Deciderがワークフロー終了条件を満たしたと判断した場合、ワークフローを終了する
REST APIの詳しい説明はこちらから。
※ドメインとワークフロー・アクティビティタイプの登録は割愛します。詳細はこちら!
1. ワークフローの開始(StartWorkflowExecution)
実行ActorはStarterになります。ワークフローを開始地点で、ここで設定した情報を元にワークフローが走ります。Apexサンプルは以下のとおりで、前回作成したcallSWFAPIメソッドを使用しています。
public static String StartWorkflowExecution(
String domain,
String workflowId,
String input,
List<String> tagList,
List<String> taskList,
Map<String, String> workflowType
) {
String jsonBody = JSON.serialize(new Map<String, Object>{
//'childPolicy' => ,
'domain' => domain,
//'executionStartToCloseTimeout' => '',
'input' => input,
//'tagList' => tagList,
//'taskList' => taskList,
//'taskStartToCloseTimeout' => '',
'workflowId' => workflowId,
'workflowType' => workflowType
});
return callSWFAPI('StartWorkflowExecution', jsonBody);
}
■主要パラメータの説明
- domain:AWSコンソールなどで登録したドメイン名を指定
- input:ワークフローへの入力値。この値は後続のDecision TaskやActivity Taskに渡される。
- taskList:Decision TaskのTask Listを指定。Decision TaskのTask Listはworkflow毎に1つのみ存在し、ここで指定した値(指定しなければデフォルト値)がworkflow実行中のDecision Task Listとなる。
- workflowId:実行中のワークフロー内で一意に設定するworkflow毎のID値
{
"runId": "12ysohWoRpjbW2jQ4GwEv5CD8otWn3z5/jVTXeXE7KF9M="
}
runIdはSWFが自動生成する一意な識別子になります。
2. Decision Taskの取得(PollForDecisionTask)
実行ActorはDeciderです。Decision Task Listに投げられたDecision Taskを取得します。
public static String PollForDecisionTask(
String domain,
String identity,
Map<String, String> taskList
) {
String jsonBody = JSON.serialize(new Map<String, Object>{
'domain' => domain,
'identity' => identity,
//'maximumPageSize' => maximumPageSize,
//'nextPageToken' => nextPageToken,
//'reverseOrder' => reverseOrder,
'taskList' => taskList
});
return callSWFAPI('PollForDecisionTask', jsonBody);
}
■主要パラメータの説明
- identity:どのDeciderが取得しようとしているかを明示する。ワークフロー履歴でTaskを取得したDeciderを特定するためのパラメータ。
- taskList:取得先のDecision Task Listを指定。1と同じTask Listを設定すれば、1で投げたワークフローのDecision Taskを取得可能。
{
"events": [
{
"eventId": 1,
"eventTimestamp": 1392904914.964,
"eventType": "WorkflowExecutionStarted",
"workflowExecutionStartedEventAttributes": {
"childPolicy": "TERMINATE",
"executionStartToCloseTimeout": "300",
"input": "Test Message",
"parentInitiatedEventId": 0,
"taskList": {
"name": "mainTaskList"
},
"taskStartToCloseTimeout": "90",
"workflowType": {
"name": "TestWorkFlow",
"version": "1.1"
}
}
},
{
"decisionTaskScheduledEventAttributes": {
"startToCloseTimeout": "90",
"taskList": {
"name": "mainTaskList"
}
},
"eventId": 2,
"eventTimestamp": 1392904914.964,
"eventType": "DecisionTaskScheduled"
},
{
"decisionTaskStartedEventAttributes": {
"identity": "test-identity",
"scheduledEventId": 2
},
"eventId": 3,
"eventTimestamp": 1392904928.626,
"eventType": "DecisionTaskStarted"
}
],
"previousStartedEventId": 0,
"startedEventId": 3,
"taskToken": "AAAAKgAAAAEAAAAAAAAAAi7pAgzTfeadKjv3fFNpHq3FDjuDpXLZrsRUvOoUKHDXnAnv3Bt14FTxaAINAb9x6ljCF0LwLKxQcNDbsvjDB9n/Y+9GtnncZYjbFGbg17CzE3ZPb+wj4ldnvaRDHQjKeM3WPXbtefa4tUOoClUawJr/3EtV1V2E0HMHaTy7OkOylqLg1EpQ1AurE/HM3kIpWhy4y9itNU2IA41fSTNDRg7X8QAug/RCSUtK2G+XghJn+Zih+/0lPejveZSC7xj7LaPiAVPgFWM+jp4u3mftr/6mJIqJJ2m9M7fmlMxCyOVJ",
"workflowExecution": {
"runId": "12ysohWoRpjbW2jQ4GwEv5CD8otWn3z5/jVTXeXE7KF9M=",
"workflowId": "20140215001"
},
"workflowType": {
"name": "TestWorkFlow",
"version": "1.1"
}
}
■レスポンスの主要パラメータの説明
- events:Workflowのこれまでの履歴(event)
- workflowType:1で指定したworkflowType。
- taskToken:タスクのやり取りに使用するトークン。これを使ってSWF⇔Decider、Worker間のタスクのやり取りを行う。
3. Activity Taskのスケジュール(RespondDecisionTaskCompleted)
実行ActorはDeciderです。Decision Taskを受け取ったら、SWFに対して受け取りました!という通知とビジネスロジックに応じて、次に実行するActivity Taskをスケジュールします。
public static String ScheduleActivityTaskDecision(
String activityId,
Map<String, String> activityType,
String input,
Map<String, String> taskList,
String executionContext,
String taskToken
) {
return RespondDecisionTaskCompleted(
new List<Map<String, Object>>{
new Map<String, Object> {
'decisionType' => 'ScheduleActivityTask',
'scheduleActivityTaskDecisionAttributes' => new Map<String, Object> {
'activityId' => activityId,
'activityType' => activityType,
//'control' => '',
//'heartbeatTimeout' => '',
'input' => input,
//'scheduleToStartTimeout' => '',
//'scheduleToCloseTimeout' => '',
'taskList' => taskList
//'startToCloseTimeout' => '
}
}
},
executionContext,
taskToken
);
}
public static String RespondDecisionTaskCompleted (
List<Map<String, Object>> decisions,
String executionContext,
String taskToken
) {
String jsonBody = JSON.serialize(new Map<String, Object>{
'decisions' => decisions,
'executionContext' => executionContext,
'taskToken' => taskToken
});
return callSWFAPI('RespondDecisionTaskCompleted', jsonBody);
}
■主要パラメータの説明
- decisions:Deciderが決定する次のタスクを指定。今回は次の処理の実行を意味する、”ActivityTaskの登録(ScheduleActivityTask)“を指定。
- taskToken:2で取得したトークンを入力
- decisionType:Decisionのタイプ。
- activityId:タスクの一意な識別子。
- activityType:事前に登録したActivityTypeを指定。Workerが次に実行するアクションを意味する。
- taskList:Activity TaskのTask Listを指定。Activity Task Listは1つのワークフローに対して複数存在する。
成功時のレスポンスは空になります。
今回は、3の工程で次のActivity Taskを登録しましたが、WorkerにアサインされていないActivityTaskをキャンセルしたり(RequestCancelActivityTask)、ワークフローを終了させたり(FailWorkflowExecution, CompleteWorkflowExecution)、子ワークフローを開始させたり(StartChildWorkflowExecution)、Deciderは様々な方法でワークフローを制御することができます。
また、随所に出てきているtaskListパラメータは事前に登録するものではなく、実行時に名前を自由に指定して利用できるので、動的なTaskListの指定ができます。
ということで、次回はActivityTaskのポーリングからワークフロー終了までやります!