Treasure Workflowでは処理のワークフローを設定するために、.digという拡張子をつけたファイルを作成します。このdigファイルには大きく分けて、タスクとオペレーターの2種類とパラメーターをつかって記述します。
オペレーターとは、
- 実際の処理内容を定義したもの
- オペレーターはTreasure Workflowで定義されている中から選択する
今回は、よく使うオペレーターの中でTreasuredataオペレーターについてご紹介させていただきます。
オペレーター | 概要 | |
クエリ実行 | td>: | クエリを実行する |
クエリ実行 | td_run>: | 保存クエリを実行する |
クエリ実行 | td_run>: | クエリを実行し、結果行ごとにサブタスクをループする |
データコネクター(取得) | td_load>: | データの一括読み込み |
Database/Table操作 | td_ddl>: | 操作をする |
td>:
test.dig _export: td: database: test_db +task1: td>: queries/mst_customer.sql create_table: mst_customer engine: hive +task2: td>: queries/trs_sales.sql insert_into: trs_sales database: test_db2 engine: presto +task3: td>: query: | SELECT ARRAY_JOIN(ARRAY_AGG(column_name), ‘, ‘) AS cols FROM information_schema.columns WHERE table_schema = ‘schema_name’ AND table_name = ‘table_name’ store_last_results: true queries/mst_customer.sql SELECT * FROM customer queries/trs_sales.sql SELECT * FROM sales WHERE time = ${moment(session_date).unix()}
td> オペレーターは、Treasure Data CDPに対してHiveまたはPrestoクエリを実行します。
実行する方法としては、以下2パターンあります。
- クエリファイルを呼び出す。
- 「query: |」を使ってdigファイルに直接クエリを記述する。
また、以下のオプションを使ってクエリやクエリ結果の制御を行います。
- create_table: [テーブル名]
- insert_into: [テーブル名]
- database: [データベース名]
- engine: [presto or hive]
- store_last_results: BOOLEAN
結果から作成するテーブルの名前。テーブルが既に存在する場合は削除します。
結果を追加するテーブルの名前。テーブルが存在しない場合は作成されます。
_exportで宣言されていないデータベースを指定することができます。
クエリエンジンにprestoまたはhiveを指定できます。
engineを使わない場合は、デフォルトでprestoになります。
クエリ結果の最初の1行を${td.last_results}変数に格納します。
store_last_resultsを使わない場合は、デフォルトでfalseになります。
td_run>:
test.dig _export: td: database: test_db +task1: td_run>:2344127 download_file: dl_file.csv +task2: td>: test_query store_last_results: true クエリID:2344127 クエリ名:test_query SELECT * FROM customer
td_run> オペレーターは、Treasure Data CDPに保存されているクエリを実行します。
実行する方法としては2パターンあります。
- クエリIDを指定して実行する。※番号が指定された場合
- クエリ名を指定して実行する。※番号以外が指定された場合
また、以下のオプションを使ってクエリやクエリ結果の制御を行います。
- download_file: [ファイル名]
- store_last_results: BOOLEAN
クエリ結果をローカルCSVファイルとして保存します。
クエリ結果の最初の1行を${td.last_results}変数に格納します。
store_last_resultsを使わない場合は、デフォルトでfalseになります。
td_for_each>:
test.dig _export: td: database: test_db +task1: td_for_each>: queries/mst_customer.sql _do: +show: echo>: customer_name ${td.each.name} email ${td_each.email}
td_for_each> オペレーターは、Treasure Data CDPに対するHiveまたはPrestoクエリの結果行ごとにサブタスクをループします。
- td_for_each>: file.sql
クエリファイルのパスを指定します。クエリ結果を${td.each.xxxx}変数に
格納しサブタスク内で利用することができます。
また、以下のオプションを使ってクエリやクエリ結果の制御を行います。
- _do: タスク
- database: [データベース名]
- engine: [presto or hive]
実行するタスクを記述します。
_exportで宣言されていないデータベースを指定することができます。
クエリエンジンにprestoまたはhiveを指定できます。
engineを使わない場合は、デフォルトでprestoになります。
td_load>:
test.dig _export: td: database: test_db +task1: td_load>: config/load_data.yml database: test_db2 table: table_name
td_load> オペレーターは、ストレージ、データベース、またはサービスからデータをロードします。
実行できるのは自分に属するソースのみのため、他のユーザーが所有するソースを実行するには、管理者権限が必要になります。
- td_load>: FILE.yml
- database: [データベース名]
- table: [テーブル名]
YAMLファイルへのパスを指定します。
以下のオプションを使ってロードしたデータの格納先を指定します。
_exportで宣言されていないデータベースを指定することができます。
データを格納するターゲットテーブルを指定します。
td_dll>:
_export: td: database: test_db +task1: td_ddl>: create_tables: table_name +task2: td_ddl>: drop_tables: table_name +task3: td_ddl>: rename_tables: [{from: “table_name1, to: table_name2”}] database: test_db2
td_ddl> オペレーターは、Treasure Data CDPに対して操作タスクを実行します。
以下のオプションを使ってテーブル操作を実行します。
- create_tables: [テーブル名]
- drop_tables: [テーブル名]
- rename_tables: [{from: 変更前テーブル名, to: 変更後テーブル名}]
- database: [データベース名]
テーブルが存在しない場合、新しいテーブルを作成します。
テーブルがすでに存在する場合、処理を行いません。
例)WFの初回実行時に、selectやdelete等を行う場合、テーブルが存在しな
いとエラーとなるため、事前にcreate_teblesで利用するテーブル名を
指定しておくことで手動でテーブルを事前に作成する必要がなくなります。
テーブルが存在する場合はテーブルを削除します。
例)テンポラリーテーブルを作成した際に、WFの終了前にdrop_tablesを
行うことで不要なテーブルが削除されDB内の整理がやりやすくなります。
テーブルの名前を別の名前に変更します。
変更後テーブル名がすでに存在する場合は上書きします。
例)前日差分と比較する場合、前日結果をrename_tablesを行い退避させて
おくことで当日結果と前日結果を比較して差分を出すこともできます。
_exportで宣言されていないデータベースを指定することができます。
最後に
今回は、データマネジメントチームが良く使うオペレーターの中でもTreasuredataオペレーターについてご紹介させていただきました。
次回は、digdagオペレーターをご紹介させていただきます。