2014-02-20

ApexからSWFを触ってみる【ActivityTaskのスケジュールまで】

前回はREST APIのSignature生成とAPIコールの基本的なところを説明したので、今回は実際にSWFをApexから触ってみます!

SWFの概要説明のところで書いたとおり、SWFの全体的なフローは以下のようになります。 ※エラーパターンを除いたシンプルな正常系フローです。

  1. Starterがワークフローをスタート&Decision Task(Decider用のタスク)のスケジュール
  2. DeciderがTaskListにポーリングしてタスクを取得
  3. 2のDeciderが過去のタスクの実行履歴を参照して、Activity Task(Activity Worker用のタスク)をスケジュール
  4. Activity WorkerがTaskListにポーリングしてタスクを取得
  5. 4のActivity Workerがタスク情報を参照して、タスクを実行し、Decision Taskをスケジュール
  6. 2~5を繰り返して、ワークフローを進める
  7. Deciderがワークフロー終了条件を満たしたと判断した場合、ワークフローを終了する
今回は1~3までをApexから操作してみます!

REST APIの詳しい説明はこちらから。

※ドメインとワークフロー・アクティビティタイプの登録は割愛します。詳細はこちら

1. ワークフローの開始(StartWorkflowExecution)

実行ActorはStarterになります。ワークフローを開始地点で、ここで設定した情報を元にワークフローが走ります。

Apexサンプルは以下のとおりで、前回作成したcallSWFAPIメソッドを使用しています。

public static String StartWorkflowExecution(
    String domain,
    String workflowId,
    String input,
    List<String> tagList,
    List<String> taskList,
    Map<String, String> workflowType
) {
    String jsonBody = JSON.serialize(new Map<String, Object>{
        //'childPolicy' => ,
        'domain' => domain,
        //'executionStartToCloseTimeout' => '',
        'input' => input,
        //'tagList' => tagList,
        //'taskList' => taskList,
        //'taskStartToCloseTimeout' => '',
        'workflowId' => workflowId,
        'workflowType' => workflowType
    });
    return callSWFAPI('StartWorkflowExecution', jsonBody);
}

 

■主要パラメータの説明

レスポンスは以下のとおり

{
    "runId": "12ysohWoRpjbW2jQ4GwEv5CD8otWn3z5/jVTXeXE7KF9M="
}

runIdはSWFが自動生成する一意な識別子になります。

2. Decision Taskの取得(PollForDecisionTask)

実行ActorはDeciderです。

Decision Task Listに投げられたDecision Taskを取得します。

public static String PollForDecisionTask(
    String domain,
    String identity,
    Map<String, String> taskList
) {
    String jsonBody = JSON.serialize(new Map<String, Object>{
        'domain' => domain,
        'identity' => identity,
        //'maximumPageSize' => maximumPageSize,
        //'nextPageToken' => nextPageToken,
        //'reverseOrder' => reverseOrder,
        'taskList' => taskList
    });
    return callSWFAPI('PollForDecisionTask', jsonBody);
}

■主要パラメータの説明

レスポンスはこんな感じ

{
    "events": [
        {
            "eventId": 1,
            "eventTimestamp": 1392904914.964,
            "eventType": "WorkflowExecutionStarted",
            "workflowExecutionStartedEventAttributes": {
                "childPolicy": "TERMINATE",
                "executionStartToCloseTimeout": "300",
                "input": "Test Message",
                "parentInitiatedEventId": 0,
                "taskList": {
                    "name": "mainTaskList"
                },
                "taskStartToCloseTimeout": "90",
                "workflowType": {
                    "name": "TestWorkFlow",
                    "version": "1.1"
                }
            }
        },
        {
            "decisionTaskScheduledEventAttributes": {
                "startToCloseTimeout": "90",
                "taskList": {
                    "name": "mainTaskList"
                }
            },
            "eventId": 2,
            "eventTimestamp": 1392904914.964,
            "eventType": "DecisionTaskScheduled"
        },
        {
            "decisionTaskStartedEventAttributes": {
                "identity": "test-identity",
                "scheduledEventId": 2
            },
            "eventId": 3,
            "eventTimestamp": 1392904928.626,
            "eventType": "DecisionTaskStarted"
        }
    ],
    "previousStartedEventId": 0,
    "startedEventId": 3,
    "taskToken": "AAAAKgAAAAEAAAAAAAAAAi7pAgzTfeadKjv3fFNpHq3FDjuDpXLZrsRUvOoUKHDXnAnv3Bt14FTxaAINAb9x6ljCF0LwLKxQcNDbsvjDB9n/Y+9GtnncZYjbFGbg17CzE3ZPb+wj4ldnvaRDHQjKeM3WPXbtefa4tUOoClUawJr/3EtV1V2E0HMHaTy7OkOylqLg1EpQ1AurE/HM3kIpWhy4y9itNU2IA41fSTNDRg7X8QAug/RCSUtK2G+XghJn+Zih+/0lPejveZSC7xj7LaPiAVPgFWM+jp4u3mftr/6mJIqJJ2m9M7fmlMxCyOVJ",
    "workflowExecution": {
        "runId": "12ysohWoRpjbW2jQ4GwEv5CD8otWn3z5/jVTXeXE7KF9M=",
        "workflowId": "20140215001"
    },
    "workflowType": {
        "name": "TestWorkFlow",
        "version": "1.1"
    }
}

■レスポンスの主要パラメータの説明

他にも色々とパラメータがありますが、基本的には履歴とワークフロータイプを見て、次のアクションを決めるというのがDeciderの仕事になります。

3. Activity Taskのスケジュール(RespondDecisionTaskCompleted)

実行ActorはDeciderです。

Decision Taskを受け取ったら、SWFに対して受け取りました!という通知とビジネスロジックに応じて、次に実行するActivity Taskをスケジュールします。

public static String ScheduleActivityTaskDecision(
    String activityId,
    Map<String, String> activityType,
    String input,
    Map<String, String> taskList,
    String executionContext,
    String taskToken
) {
    return RespondDecisionTaskCompleted(
        new List<Map<String, Object>>{
            new Map<String, Object> {
                'decisionType' => 'ScheduleActivityTask',
                'scheduleActivityTaskDecisionAttributes' => new Map<String, Object> {
                    'activityId' => activityId,
                    'activityType' => activityType,
                    //'control' => '',
                    //'heartbeatTimeout' => '',
                    'input' => input,
                    //'scheduleToStartTimeout' => '',
                    //'scheduleToCloseTimeout' => '',
                    'taskList' => taskList
                    //'startToCloseTimeout' => '
                }
            }
        }, 
        executionContext, 
        taskToken
    );
}

public static String RespondDecisionTaskCompleted (
    List<Map<String, Object>> decisions,
    String executionContext,
    String taskToken
) {
    String jsonBody = JSON.serialize(new Map<String, Object>{
        'decisions' => decisions,
        'executionContext' => executionContext,
        'taskToken' => taskToken
    });
    return callSWFAPI('RespondDecisionTaskCompleted', jsonBody);
}

■主要パラメータの説明

これにより、ある特定のActivity Taskをある特定のWorkerに実行させることが出来て、局所的なスケールアウト・スケールアップが可能になります。1のDecision Task Listとは異なるので、同名を指定しても問題ありません。

成功時のレスポンスは空になります。

今回は、3の工程で次のActivity Taskを登録しましたが、WorkerにアサインされていないActivityTaskをキャンセルしたり(RequestCancelActivityTask)、ワークフローを終了させたり(FailWorkflowExecution, CompleteWorkflowExecution)、子ワークフローを開始させたり(StartChildWorkflowExecution)、Deciderは様々な方法でワークフローを制御することができます。

また、随所に出てきているtaskListパラメータは事前に登録するものではなく、実行時に名前を自由に指定して利用できるので、動的なTaskListの指定ができます。

ということで、次回はActivityTaskのポーリングからワークフロー終了までやります!

このエントリーをはてなブックマークに追加