Treasure Data CDPにはオンライン、オフラインそしてIoTデバイス機器など様々なデータを集約することができます。そのインテグレーション機能としてGUIベースの機能である「Integrations Hub」があります。Treasure Data CDPのWebコンソールでグラフィカルに操作できるので便利な機能ではありますがワークフロー(digdag)で記載することもできます。今回はその方法をサンプルコードも含めて紹介させていただきます。こちらのサンプルでは事前にデータベースおよびテーブルをTDコンソールから作成頂く必要がございます。
GUIでの設定方法例
今回はS3に格納されたオブジェクト(CSVファイル)を格納します。
GUIでの設定について詳細ドキュメントはこちらを参照ください。

GUIでの入力項目
ワークフローでの設定方法例
ワークフローで行う場合は、yamlファイルで取り込み元のS3情報設定をします。yamlは配列や連想配列を簡易的に示すことができます。
サンプルファイル
# 「in」は取り込み元のデータソースを指定します
in:
# 取り込み元は「s3」
type: s3
# 「s3(AWS)」のアクセスID
access_key_id: XXX
# 「s3(AWS)」のアクセスキー
secret_access_key: XXX
# 「s3(AWS)」のバケット
bucket: XXX
# 「s3(AWS)」のパス
path_prefix: XXX
# 新規登録として指定
mode: insert
# 取り込み対象ファイルのパースを設定
parser:
# 文字コード
charset: UTF-8
# CSVの改行コード
newline: LF
# ファイル・タイプ
type: csv
# CSVの区切り文字
delimiter: ','
# CSVの文字列区切り文字
quote: '"'
# CSVのエスケープ文字
escape: '"'
# ‘’をnullに変換
null_string: ''
# クォートされていないと文字列前後の空白を削除、初期値false
trim_if_not_quoted: false
# CSVの1行目を取り込まない
skip_header_lines: 1
# 「true」の場合は過多なカラムを取り込まない
allow_extra_columns: false
# 「true」の場合、足りていないカラムを無視
allow_optional_columns: false
# 「true」の場合でパースエラーが発生した場合停止させる
stop_on_invalid_record: true
# デフォルトのタイムゾーン
default_timezone: 'Asia/Tokyo'
# カラムを定義
columns:
- {name: id, type: string}
- {name: text, type: string}
filters:
- type: add_time
to_column:
name: time
type: timestamp
from_value:
value: ${session_unixtime}
ワークフローでyamlを読み込み実行させます。このサンプルでは理解しやすいようにaccess_key_idやsecret_access_keyをファイル内に記載しておりますが、実際の環境ではシークレットをご使用ください。
https://docs.treasuredata.com/display/public/PD/About+Workflow+Secret+Management
サンプルファイル
_export:
# S3取り込み先データベース
to_database: xxxx
# S3取り込み先テーブル
to_table: xxxx
+import:
# 上記のS3取り込み設定ファイルを指定
td_load>: s3.yml
database: ${to_database}
table: ${to_table}
Treasure Data CDPではyamlとワークフローを利用した柔軟な方法があることがご理解いただけたかと思われます。今回は取り込み元データソースはS3を例にご紹介いたしましたが、他にも「(S)FTP」や「gcs」、サードパーティーのシステムなど多数ございます。それらのyamlのサンプルは「treasure-boxes」にて多数のサンプルを紹介しております。
是非今後の運用にご活用ください。
