データマネジメントチームの藤井 温子です。
この記事では、日次処理のワークフローを使って、過去分のデータをまとめて取り込み・処理する際の「日次ループ処理」の考え方・記述方法について解説しています。日次の処理をワークフローで行っている場合に、過去分やメンテナンス等でワークフローを止めていた期間分だけ、後日まとめて行いたいということがあると思います。このような場合には、もともと使っている日次ワークフローを活用して、ループ処理を行うことでまとめて処理できます。
この記事では、以下の2点について説明しています。
- 日次ループ処理を含むプロジェクトのファイル構成
- 日次のループ処理の作り方
日次ループ処理を含むプロジェクトのファイル構成
ワークフローは通常は日次など定期実行するもののことが多いため、同じプロジェクトに、過去分やワークフロー実行を止めていた期間のデータをまとめて処理する日次ループ処理の.digファイルをプロジェクトの中に入れておくと便利です。例えば、以下のようなファイル構成が代表的です。
日次のループ処理の作り方
ループ処理のオペレーター
次に、具体的にどのように日次ループ処理を作成するかをご説明します。ループ処理のためのオペレーターには以下のようなものがあります。
- loop>
指定した回数、下位のタスクを繰り返す - for_each>
変数のリストを使って、下位のタスクを繰り返す - for_range>
from, to, stepを指定することで、ある範囲で下位のタスクを繰り返す - td_for_each>
クエリ結果を変数として使って、下位のタスクを繰り返す
その中でも、日次ループ処理を行うときはfor_range>
オペレーターが便利です。
for_range>を使った日次ループ処理
for_range>は変数を使って下位のタスクを複数回行います。from, to, indexの3つの変数をエクスポートするので、それらの変数に応じて下位のタスクが実行されます。
for_range>オペレーターにはfrom, to, stepの3つの変数を指定します。
+repeat: for_range>: from: 10 to: 50 step: 10 _do: echo>: processing from ${range.from} to ${range.to}.
上記の例では、10から50まで、10刻みで下位のタスクが実行されます。つまり、4つのタスクが実行されます。各タスクの変数は以下のようになります。
<タスク1> range.from: 10, range.to: 20, range.index: 0
<タスク2> range.from: 20, range.to: 30, range.index: 1
<タスク3> range.from: 30, range.to: 40, range.index: 2
<タスク4> range.from: 40, range.to: 50, range.index: 3
※タスク5はrange.from: 50となり、to: 50を超えてしまうので実行されない
これを利用して、日次の処理を1日刻みで一定期間行います。
_export: period: first: "2019-08-01" last: "2019-12-01" +for_range_from_to: for_range>: from: ${moment(period.first).unix()} to: ${moment(period.last).unix()} step: 86400 _do: # 実行したい下位タスクを記述
上記の例では、変数 ${range.from} に2019年8月1日から2019年11月30日の日付 (unixtime) が1日刻みで入ります。(step:の86400は、60秒×60分×24時間=86400秒で一日分の秒数を意味します。)過去分のデータのファイル名や値に日付(例:filename_YYYYMMDD)を付けておくことで、${range.from}の値をキーにしてデータの取り込み・処理を行うことができます。
実行されるのは指定したto:未満の範囲までという点はご注意ください。つまり、上記の例ではlast:で指定した日の「前日まで」となります。もし、last:で指定した日までの範囲で実行したい場合は、to:${moment(period.last).unix())} に1日足しておく必要があります。
処理したデータのセッションタイムを変更する
日次用のワークフローを利用して、ループ処理でデータの取り込み・処理などを行った際には、本来いつ処理されるべきデータなのかについて情報がなければ、その後の処理に不都合をきたします。session_timeを実際のワークフロー実行日ではなく${range.from}の日付に指定し、処理したデータのtimeカラムにsession_timeが入るようにしておくことで、処理されたデータがいつのデータなのか(本来はいつ処理されるべきデータか)を残しておくことができます。
実装例としては以下のようになります。
+for_range_from_to: for_range>: from: ${moment(period.first).unix()} to: ${moment(period.last).unix()} step: 86400 _do: +task: require>: workflow_010 session_time: ${moment.unix(range.from).format()} # セッションタイムを変更 rerun_on: all
以上となります。この記事が少しでもご参考になりましたら幸いです。
(参考資料)
for_range>オペレーターの詳細については、dig dagの公式ドキュメントをご参考ください。
http://docs.digdag.io/operators/for_range.html