前回の続きとして、よく使うワークフローオペレーターの中でdigdagオペレーターについてご紹介させていただきます。
| オペレーター | 概要 | |
| ファイルを呼び出し | call>: | 別のdigファイルを呼び出す。(主に自PJ) |
| require>: | 別のdigファイルを呼び出す。(主に他PJ) | |
| !include: | 別のdigファイルを読み込む。(主に変数など) | |
| タスク制御 | if>: | 条件に対する真偽値で処理を分岐する。 |
| for_each>: | 変数のセットに対してタスクを繰り返す。 | |
| loop>: | 指定した数値の分だけタスクを繰り返す。 |
続きは会員登録およびログイン後にご覧いただけます。以下からログインしてください。
call>:
# main.dig +before_call: echo>: before +run_another_workflow_call: call>: workflow/another.dig +after_call: echo>: after # workflow/another.dig +another_workflow: echo>: another workflow
- call>
– このオペレーターは、別のワークフローをサブタスクとして埋め込みます。 - オプション
– call>: [ファイル名]
ワークフロー定義ファイルへのパスを指定します。ファイル名は.digで終わる必要があります。
require>:
# main.dig +before_call: echo>: before +run_another_workflow_call: require>: another project_name: another_project1 +after_call: echo>: after # another.dig / project_name: another_project1 +another_workflow: echo>: another workflow
- require>
– このオペレーターは、別のワークフローを実行します。 - オプション
– require>: [ファイル名]
ワークフロー名を指定します。
– session_time: ISO_TIME_WITH_ZONE
実行する際のセッションタイムを指定します。
例)session_time: 2017-01-01T00:00:00+00:00
– project_id:[プロジェクトID] or project_name:[プロジェクト名]
project_idまたはproject_nameを指定することで、別プロジェクトのワークフローを開始できます。プロジェクトが存在しない場合、タスクは失敗します。project_idとproject_nameの両方を設定すると、タスクは失敗します。
– rerun_on: none, failed, all(default: none)
noneは試行が既に存在する場合は、ワークフローを開始しません。
failedは試行が存在しその結果が失敗している場合、ワークフローを開始します。
allは試行の結果に関係なく、ワークフローを開始します。※おすすめ
callとrequireの違い
| call | require | |
| サブフォルダのdigファイル実行 | できる | できない |
| 別プロジェクトのワークフロー呼び出し | できない | できる |
| 新規セッションの作成 | 作成されない | 作成される |
| 実行されるタスクの扱い | 呼び出し元のワークフローにマージされる | 呼び出し元のワークフローにはマージされず、独立したセッションとして実行される |
| 呼び出し先ワークフローで失敗後のリトライ時、失敗地点からリトライできるか | できる | できない |
| 呼び出し方法 | digファイルのパスを指定 | ワークフロー名を指定 |
| 使い分け(参考) | 自プロジェクトのワークフローを呼び出す時に使う | 他プロジェクトのワークフローを呼び出す時に使う または、自プロジェクトでもタスク数が1,000を超える場合に使う |
!include:
# workflow1.dig _export: # include env_variables !include : common/config/env_variables.dig # common/config/env_variables.dig variables: - foo - bar
- !include
– 別のdigファイルを読み込むことが可能です。
– 挙動としてはcallオペレーターとほぼ同じですが、!includeでは変数の読み込みも可能です。
– 変数の読み込みは!include 、タスクの呼び出しはcallを使うことで可読性を確保することができます。
if>:
+run_if_param_is_true:
if>: ${param}
_do:
echo>: ${param} == true
+run_if_param_is_false:
if>: ${param}
_do:
echo>: ${param} == true
_else_do:
echo>: ${param} == false
- if>
– if>演算子は、条件がtrueの場合に_doのサブタスクを実行します。
– 条件がfalseの場合、_else_doのサブタスクを実行します。 - オプション
– if>: BOOLEAN
trueまたはfalseとなる条件を指定します。
– _do: タスク
条件がtrueの場合に実行するタスクを指定します。
– _else_do: タスク
条件がfalseの場合に実行するタスクを指定します。
for_each>:
# main.dig
_export:
td:
database: sample_db
tables:
- costomer
- order
+for_each:
for_each>:
table: ${tables}
_do:
+create_table:
td>: query/create_${table}.dig
create_table: ${table}
+for_each_parallel:
for_each>:
obj: [apple, orange]
_parallel: true
_do:
+task:
echo>: ${obj}
- for_each>
– for_each>演算子は、変数のセットを使用してサブタスクを複数回実行します。 - オプション
– for_each>: 変数
ループに使用される変数を指定します。オブジェクトまたはJSON文字列にすることができます。
– _parallel: BOOLEAN | {limit: N}
繰り返しタスクを並行して実行します。limit: Nを指定することで、並行実行数を限定することができます。
– _do: タスク
実行するタスクを指定します。
loop>:
# main.dig
+loop:
loop>: 2
_do:
+task1:
echo>: task1-${i}
+loop_parallel_boolean:
loop>: 3
_parallel: true
_do:
+task2:
echo>: task2-${i}
+loop_parallel_limit:
loop>: 4
_parallel:
limit: 2
_do:
+task3:
echo>: task3-${i}
- loop>
– loop>演算子は、サブタスクを複数回実行します。
–${i}は、サブタスクの変数をエクスポートします。0から始まり、指定したサブタスクが3の場合、i=0,i=1,i=2で実行されます。 - オプション
– loop>: タスクを実行する回数
ループを繰り返す回数を指定します。
– _parallel: BOOLEAN | {limit: N}
繰り返しタスクを並行して実行します。limit: Nを指定することで、並行実行数を限定することができます。
– _do: タスク
実行するタスクを指定します。
最後に
今回は、データマネジメントチームが良く使うオペレーターの中でもdigdagオペレーターについてご紹介させていただきました。
他にもオペレーターはたくさんありますが、まずは紹介したオペレーターを抑えていただければ基本的なワークフローの実装は行っていただけると思いますので、参考にしてみてください。
