前回の続きとして、よく使うワークフローオペレーターの中でdigdagオペレーターについてご紹介させていただきます。
オペレーター | 概要 | |
ファイルを呼び出し | call>: | 別のdigファイルを呼び出す。(主に自PJ) |
require>: | 別のdigファイルを呼び出す。(主に他PJ) | |
!include: | 別のdigファイルを読み込む。(主に変数など) | |
タスク制御 | if>: | 条件に対する真偽値で処理を分岐する。 |
for_each>: | 変数のセットに対してタスクを繰り返す。 | |
loop>: | 指定した数値の分だけタスクを繰り返す。 |
続きは会員登録およびログイン後にご覧いただけます。以下からログインしてください。
call>:
# main.dig +before_call: echo>: before +run_another_workflow_call: call>: workflow/another.dig +after_call: echo>: after # workflow/another.dig +another_workflow: echo>: another workflow
- call>
– このオペレーターは、別のワークフローをサブタスクとして埋め込みます。 - オプション
– call>: [ファイル名]
ワークフロー定義ファイルへのパスを指定します。ファイル名は.digで終わる必要があります。
require>:
# main.dig +before_call: echo>: before +run_another_workflow_call: require>: another project_name: another_project1 +after_call: echo>: after # another.dig / project_name: another_project1 +another_workflow: echo>: another workflow
- require>
– このオペレーターは、別のワークフローを実行します。 - オプション
– require>: [ファイル名]
ワークフロー名を指定します。
– session_time: ISO_TIME_WITH_ZONE
実行する際のセッションタイムを指定します。
例)session_time: 2017-01-01T00:00:00+00:00
– project_id:[プロジェクトID] or project_name:[プロジェクト名]
project_idまたはproject_nameを指定することで、別プロジェクトのワークフローを開始できます。プロジェクトが存在しない場合、タスクは失敗します。project_idとproject_nameの両方を設定すると、タスクは失敗します。
– rerun_on: none, failed, all(default: none)
noneは試行が既に存在する場合は、ワークフローを開始しません。
failedは試行が存在しその結果が失敗している場合、ワークフローを開始します。
allは試行の結果に関係なく、ワークフローを開始します。※おすすめ
callとrequireの違い
call | require | |
サブフォルダのdigファイル実行 | できる | できない |
別プロジェクトのワークフロー呼び出し | できない | できる |
新規セッションの作成 | 作成されない | 作成される |
実行されるタスクの扱い | 呼び出し元のワークフローにマージされる | 呼び出し元のワークフローにはマージされず、独立したセッションとして実行される |
呼び出し先ワークフローで失敗後のリトライ時、失敗地点からリトライできるか | できる | できない |
呼び出し方法 | digファイルのパスを指定 | ワークフロー名を指定 |
使い分け(参考) | 自プロジェクトのワークフローを呼び出す時に使う | 他プロジェクトのワークフローを呼び出す時に使う または、自プロジェクトでもタスク数が1,000を超える場合に使う |
!include:
# workflow1.dig _export: # include env_variables !include : common/config/env_variables.dig # common/config/env_variables.dig variables: - foo - bar
- !include
– 別のdigファイルを読み込むことが可能です。
– 挙動としてはcallオペレーターとほぼ同じですが、!includeでは変数の読み込みも可能です。
– 変数の読み込みは!include 、タスクの呼び出しはcallを使うことで可読性を確保することができます。
if>:
+run_if_param_is_true: if>: ${param} _do: echo>: ${param} == true +run_if_param_is_false: if>: ${param} _do: echo>: ${param} == true _else_do: echo>: ${param} == false
- if>
– if>演算子は、条件がtrue
の場合に_do
のサブタスクを実行します。
– 条件がfalse
の場合、_else_do
のサブタスクを実行します。 - オプション
– if>: BOOLEAN
trueまたはfalseとなる条件を指定します。
– _do: タスク
条件がtrueの場合に実行するタスクを指定します。
– _else_do: タスク
条件がfalseの場合に実行するタスクを指定します。
for_each>:
# main.dig _export: td: database: sample_db tables: - costomer - order +for_each: for_each>: table: ${tables} _do: +create_table: td>: query/create_${table}.dig create_table: ${table} +for_each_parallel: for_each>: obj: [apple, orange] _parallel: true _do: +task: echo>: ${obj}
- for_each>
– for_each>演算子は、変数のセットを使用してサブタスクを複数回実行します。 - オプション
– for_each>: 変数
ループに使用される変数を指定します。オブジェクトまたはJSON文字列にすることができます。
– _parallel: BOOLEAN | {limit: N}
繰り返しタスクを並行して実行します。limit: Nを指定することで、並行実行数を限定することができます。
– _do: タスク
実行するタスクを指定します。
loop>:
# main.dig +loop: loop>: 2 _do: +task1: echo>: task1-${i} +loop_parallel_boolean: loop>: 3 _parallel: true _do: +task2: echo>: task2-${i} +loop_parallel_limit: loop>: 4 _parallel: limit: 2 _do: +task3: echo>: task3-${i}
- loop>
– loop>演算子は、サブタスクを複数回実行します。
–${i}
は、サブタスクの変数をエクスポートします。0から始まり、指定したサブタスクが3の場合、i=0,i=1,i=2で実行されます。 - オプション
– loop>: タスクを実行する回数
ループを繰り返す回数を指定します。
– _parallel: BOOLEAN | {limit: N}
繰り返しタスクを並行して実行します。limit: Nを指定することで、並行実行数を限定することができます。
– _do: タスク
実行するタスクを指定します。
最後に
今回は、データマネジメントチームが良く使うオペレーターの中でもdigdagオペレーターについてご紹介させていただきました。
他にもオペレーターはたくさんありますが、まずは紹介したオペレーターを抑えていただければ基本的なワークフローの実装は行っていただけると思いますので、参考にしてみてください。