データマネジメントチームの金野 浩之です。
前回、前々回とWorkflowのエラー通知を行う方法をご紹介させていただきましたが、今回はCustom Scripts内の処理結果に応じてWorkflowの処理について制御する方法についてご紹介致します。
Custom ScriptsとはWorkflow上の仮想化環境でPythonのスクリプトを実行することができるTreasure Data CDPの機能です。Custom Scriptsを使いこなすと、SQLやWorkflowのオペレータでは難しいデータ処理や、コネクターが存在しない外部ツールとのAPI連携などの実装ができるため、Treasure Data CDPの利活用の幅を広げることが可能です。
通常、Pythonでエラー制御をする場合はtry – except文で例外を補足し、sys.exit()を利用してプログラムの処理を終了することが多いかと思います。sys.exit()は引数に終了ステータスを入れて処理を終わらせることができますので、例えば 0 は “正常終了”、0 以外の整数を “異常終了” とするような使い方をします。
一方で、Custom Scriptsでsys.exit()を利用するとどのような引数を渡そうがエラー判定されてしてしまい、Workflowもその時点でエラーになってしまいます。そのため、sys.exit(0)のときはWorkflowの後続処理を実行し、sys.exit(1)のときは_error>:オペレータを実行する、といったような制御を行うことはできません。そこで、Custom Scriptsの処理結果をWorkflowに戻す際はsys.exit()以外の方法でフラグを渡してあげる必要があります。
Custom Scriptsの処理結果でWorkflowの処理を制御するサンプル
digdag.env.store()という関数を利用することで、Custom ScriptsからWorkflowに値を戻すことができます。例えば以下はdigdag.env.store()を利用したCustom Scriptsのサンプルになります。このサンプルスクリプトでへあSQLの結果が100行以上であればis_success、100行未満ならis_errorというフラグをWorkflowに戻します。もちろん、どちらもCustom Scriptsとしては正常に終了させることができます。
Workflowのサンプルは下記の通りです。上記のCustom Scriptsからis_errorが戻ってきたときだけ、fail>:オペレータでエラーメッセージを飛ばすことができます。
おわりに
このように、digdag.env.store()を使用しCustom Scriptsの処理結果をWorkflowに戻してあげることで、より細かいロジックの制御ができるようになりますので、ぜひ一度お試しください。