テクニカルサポートエンジニアリングチームの橘 樹男です。
今回はpytdを使っている際に遭遇する意図しないデータ変換の事象について解説いたします。
TL;DR
load_table_from_dataframeを実行してDataFrameをTreasure Data CDPにUploadすると意図しないデータの変換が発生します。(例: 001 -> 1)これはデフォルトのfmt引数のデフォルト値がcsvとなっていることに起因する。 load_table_from_dataframeの引数にfmt=’msgpack’を渡すことで、この事象は防げます。
pytdとは?
Treasure Data CDPが提供しているPython用のライブラリです。TD Client PythonというSDKも提供しているのですが、PytdはPandasなどのデータサイエンスでよく使われるライブラリとの親和性を高め、より使い勝手を向上したSDKです。
詳細はこちらのpytdのドキュメントをご確認ください。
https://pytd-doc.readthedocs.io/en/latest/
事象
pytdを使ってデータをTreasure Data CDPに取り込もうとする時に以下のようなコードを書くかと思います。
import pandas as pd
df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
client.load_table_from_dataframe(df, 'tachibana.foo', writer='bulk_import', if_exists='overwrite')
この際に以下のように0埋めのようなデータだった場合に問題が生じることがあります。
| id | name | adderss |
| 001 | Taro Treasure | Tokyo |
| 002 | Hanako Treasure | Tokyo |
どんな問題が発生するかについては実際のコードともに確認していきましょう。 以下のようなコードでUploadをしてみます。
import pandas as pd
import pytd
df = pd.DataFrame([{'id':'001', 'name': 'Taro Treasure'}, [{'id':'002', 'name': 'Hanako Treasure'}, ])
client.load_table_from_dataframe(df, 'database_name.table_name', writer='bulk_import', if_exists='overwrite')
では実際のデータをみてます。
client.query('select * from database.table', engine='presto')
結果は以下のとおりです。
{'data': [[1, 'Taro Treasure', 1622613195],
[2, 'Hanako Treasure', 1622613195]],
'columns': ['id', 'name', 'time']}
少しわかりにくいかもしれないのですが、001というデータの0が勝手に排除されてしまい、001 -> 1, 002 -> 2となってしまっているのです。意図せずデータが勝手に変換されてしまいました。
原因
原因について、pytdのコードをみながら確認していきましょう。まず、load_table_from_dataframeが何をしているのかをみていきます。
def load_table_from_dataframe(
self, dataframe, destination, writer="bulk_import", if_exists="error", **kwargs
):
"""Write a given DataFrame to a Treasure Data table.
This function may initialize a Writer instance. Note that, as a part of
the initialization process for SparkWriter, the latest version of
td-spark will be downloaded.
Parameters
----------
dataframe : :class:`pandas.DataFrame`
Data loaded to a target table.
destination : str, or :class:`pytd.table.Table`
Target table.
writer : str, {'bulk_import', 'insert_into', 'spark'}, or
:class:`pytd.writer.Writer`, default: 'bulk_import'
A Writer to choose writing method to Treasure Data. If not given or
string value, a temporal Writer instance will be created.
if_exists : str, {'error', 'overwrite', 'append', 'ignore'}, default: 'error'
What happens when a target table already exists.
- error: raise an exception.
- overwrite: drop it, recreate it, and insert data.
- append: insert data. Create if does not exist.
- ignore: do nothing.
"""
if isinstance(destination, str):
if "." in destination:
database, table = destination.split(".")
else:
database, table = self.database, destination
destination = self.get_table(database, table)
destination.import_dataframe(dataframe, writer, if_exists, **kwargs)
データの変換に関わるような事はしておらず、import_dataframeを実行しているだけのようです。 次にimport_dataframeをみましょう。
writer.write_dataframe(dataframe, self, if_exists, **kwargs)
これもwrite_dataframeを実行しているだけのようです。 ではwrite_dataframeをみていきましょう。
あれ、コメントに怪しいことがかいてあります。
fmt : {'csv', 'msgpack'}, default: 'csv'
Format for bulk_import.
- csv
Convert dataframe to temporary CSV file. Stable option but slower
than msgpack option because pytd saves dataframe as temporary CSV file,
then td-client converts it to msgpack.
Types of columns are guessed by ``pandas.read_csv`` and it causes
unintended type conversion e.g., 0-padded string ``"00012"`` into
integer ``12``.
- msgpack
Convert to temporary msgpack.gz file. Fast option but there is a
slight difference on type conversion compared to csv.
どうやらfmtという引数でcsvを指定した場合、一時的にデータをCSVファイルとして書き出しその後msgpackに変換しTreasure Data CDPにUploadするが、その際にはpandas.read_csvを使っているとのことです。 これによって0埋めしていた00012が12に変換されることある、とのこと。
まさにこれの事です。ようやくたどり着くことができました。つまり、load_table_from_dataframeは最終的にwrite_dataframeを実行しているがこれのfmt引数のデフォルトがcsvとなっていることにより、意図しないデータの変換が起きてしまったようです。ここまでくれば対策も簡単です。
対策
load_table_from_dataframeにfmt=’msgpack’を渡す。これだけで0埋めしていた数字が勝手に変換されるといったことが防ぐことができます。
最後に
対策としては非常に簡単なものなのとなりますが、 知らないとなかなか解決策にたどりつけず苦労するところか思います。
本記事がどなたかのお役にたてば幸いです。
