今回は時系列データのクラスタリングについての記事です。
クラスタリングは多くのデータから特定の条件で分類しラベルが貼られる手法です。もう少し詳しく言うと、教師なし分類学習で特定の基準でデータを分類する手法であり、以下の特徴があります。
- データを分類したいが、明確な分類基準がない時に利用できる
- 類似しているデータを指定した数のクラスターで分類できる
- 分類されたクラスターは特徴内容などのラベルが付くのではなく、直接定義が必要
- 代表的にK-Means アルゴリズムがある
そして、時系列データのクラスタリングは、商品購買や回覧コンテンツの特徴が類似するユーザーの分類や売上推移が類似している店舗の分類、電力負荷の類似によるパターン分類などのユースケースがあります。
時系列クラスタリングの中で、主に二つの手法があります。一つ目はEuclidean Distanceでユークリッド距離を使って同一時系列上の類似度を測る手法です。同じ時系列上での類似度を測る、計算速度が速い、時系列の長さが同じである必要があるという特徴があります。そして、Dynamic Time Warpingは同一時系列上と周辺の要素まで類似度を測る手法であり、同一時系列上以外でも分類ができますが(時系列の長さが異なっても類似度を測ることが可能)、構築の難易度が高く、計算量が多いという特徴があります。
そのため、利用目的によって何を使うかの検討が必要となります。そして、今回はEuclidean Distanceを使った時系列クラスタリングについてご紹介します。「NY Stockの2014~2016 3年間のDaily 終値データ」を使って、時系列推移が類似している銘柄のうち、3つのクラスターを作成し特徴を把握してみましょう。
Time Series KMeansアルゴリズムを使ってmetricは先ほどのEuclidean Distanceを使います。必要な前処理としては、データの標準化となります。株価のスケールが異なるため、標準化することを推奨します。なぜなら、分類されるクラスターが株価のスケールの大きさによる特徴になってしまう可能性があるためです。標準化は以前の記事にも記載しましたが、改めて今回使うデータの例で説明します。
まずは、データサンプルの例です。
以下の例は同じデータをそのまま・標準化で比較した場合となります。
数値間のスケールの差がある場合、その差によって距離が遠くなり、類似してないことになる可能性が高くなります。
スケールを除いた距離を測ることができ、特徴による分類ができるようになります。
以下のコードでクラスタリング処理ができますので、ご参考にしてください。
import pandas as pd from sklearn.preprocessing import StandardScaler scaler_std = StandardScaler() from tslearn.clustering import TimeSeriesKMeans # sample データのload df = pd.read_csv('archive/prices-split-adjusted.csv') # 必要な項目選択、index設定 df = df.set_index('date') # indexをdatetime形に変換 df.index = pd.DatetimeIndex(df.index) df.sort_index(inplace=True) # 銘柄ごとの終値を標準化 df.loc[:,:] = scaler_std.fit_transform(df) # clustering # euclidean metric = 'euclidean' # cluster数 n_clusters = 3 tskm_base = TimeSeriesKMeans(n_clusters=n_clusters, metric=metric, max_iter=100, random_state=27) # model tskm_base.fit(df.T.values) # クラスタに属するデータ数を計算(base) cnt = collections.Counter(tskm_base.labels_) cluster_labels = {} for k in cnt: cluster_labels['cluster-{}'.format(k)] = cnt[k] # クラスターごとの数 print(sorted(cluster_labels.items())) # クラスターごとの平均をplot plt.rcParams["figure.figsize"] = (20, 10) plot_temp_all = pd.DataFrame() cluster_kv_base = pd.DataFrame(tskm_base.labels_, columns=['cluster']) for i in range(n_clusters): cluster_kv_values_base = cluster_kv_base[cluster_kv_base['cluster'] == i].index.values.tolist() plot_temp = pd.DataFrame(df.iloc[:,cluster_kv_values_base].mean(axis=1)) plot_temp.columns = ['clsuter : ' + str(i)] plot_temp_all = pd.concat([plot_temp_all, plot_temp], axis=1) plot_temp_all.plot()
そして、クラスタリング結果を確認してみましょう。
全体の銘柄数が490のうち、以下のような分類ができました。
- Cluster 0 : 131銘柄、2016年まで落ちていて、回復していく特徴に類似している銘柄
- Cluster 1 : 240銘柄、継続に成長している特徴に類似している銘柄
- Clsuter 2 : 119銘柄、2015年まで上昇して、その後下落し回復している特徴に類似している銘柄
490銘柄という多くの数の時系列データから、上記のような分類、そして、特徴の把握ができました。最後にここで注意する点は、クラスターの平均のため、その要素の中には強く類似・弱く類似しているデータが存在することです。
以下にTDでサンプルコードを記載しますので、ご参考にしてください。
結果をTDに戻すための、workflow
timezone: Asia/Tokyo # main process +train_predict: docker: image: "digdag/digdag-python:3.9" _env: TD_API_KEY: TD apikey入力 ENDPOINT: TD環境に合わせてhttps://api.treasuredata.co.jp or https://api.treasuredata.comを入力 DB: DB名入力 py>: scripts.test_py.main
- 上記のdigに次の以下のcustom scriptsを組み込むことで、Workflowとして結果をTDに戻すことができます
- Pythonのみでpytdを使うだけで、TDに戻すことができます(以下のcustom scripts参照)
結果をTDに戻すための、custom scripts
import pandas as pd from pytd import pandas_td as td from sklearn.preprocessing import StandardScaler scaler_std = StandardScaler() from tslearn.clustering import TimeSeriesKMeans # pytdの必要な情報をWorkflow側の変数からcall con = td.Client(apikey=os.environ.get('TD_API_KEY'), endpoint=os.environ.get('ENDPOINT')) presto = td.create_engine('presto:{}'.format(os.environ.get('DB')), con=con) database = os.environ.get('DB') def main(): # 対象の時系列データをload load_td = td.read_td_query(''' SELECT * FROM sample_dataset ''', engine=presto) # index設定 sample_df = load_td.set_index('date') # IndexをDatetime format指定 & sort sample_df.index = pd.DatetimeIndex(sample_df.index) sample_df.sort_index(inplace=True) # 銘柄ごとの終値を標準化 sample_df.loc[:,:] = scaler_std.fit_transform(sample_df) # clustering # euclidean metric = 'euclidean' # cluster数 n_clusters = 3 tskm_base = TimeSeriesKMeans(n_clusters=n_clusters, metric=metric, max_iter=100, random_state=27) # model tskm_base.fit(sample_df.T.values) # 出力用の銘柄とcluster番号のデータセット作成 export_result = pd.concat([pd.DataFrame(sample_df.columns), pd.DataFrame(tskm_base.labels_)], axis=1) export_result.columns = ['stock', 'cluster_no'] # 対象DBにてtest_exportテーブルに結果(stock, cluster_no)を格納 td.to_td(export_result, '{}.test_export'.format(database), con=con, if_exists='append')
時系列データの中である要素を分類したいけど特に基準がない場合にぜひ活用してみてください。
いままでわからなかった気付きがあるかもしれません!