第3回目の今日は、データ処理ライブラリ Polars の実践編だ。 「Pandasで十分」という場面もあるかもしれない。しかし、数百MB~数GBを超えるCSVを開こうとしてメモリ不足でPCがフリーズした経験はないだろうか。私は過去に何度も経験した。 Polarsを使えば、そのようなメモリのリソース問題から解放される手法を選択できる。
なお、本記事の最後に簡易なベンチマークを実施した結果を掲載しておくので、興味があれば参照されたい。
10年分の1分足を一瞬で読み込む
まず、Polarsの真骨頂である Lazy Evaluation(遅延評価) を見てみよう。 手元に数GBほどの株価データ(Parquet形式)があると仮定する。
Polarsの基本文法
Pandasでは read_parquet を使うが、Polarsでは scan_parquet を使う。
import polars as pl # scan_ なので、まだファイルは読み込まれない q = pl.scan_parquet("data/stock_history_large.parquet") # クエリを構築(メソッドチェーンできる点が良い) q = ( q.filter(pl.col("code") == "7203") # トヨタ自動車 .filter(pl.col("date") >= "2020-01-01") .select(["date", "open", "high", "low", "close", "volume"]) ) # ここで初めて実行!必要なデータだけがメモリに展開される df = q.collect() print(df.head())
Pandasだと「全データをメモリに読んでからフィルタ」していたのが、Polarsなら「フィルタしてから必要な部分だけ読む」ことができる。この違いは大きい。
テクニカル指標計算の書き直し
システムトレードといえばテクニカル指標である。Pandasで rolling を使って書いていた移動平均線計算は、Polarsでは次のように記述する。
rolling 処理とウィンドウ関数
# Pandasの場合(遅い) # df['ma25'] = df.groupby('code')['close'].transform(lambda x: x.rolling(25).mean()) # Polarsの場合(高速) df = df.with_columns([ pl.col("close") .rolling_mean(window_size=25) .over("code") # 銘柄ごとに計算 .alias("ma25") ])
この .over("code") が素晴らしい。groupby して apply する必要はない。SQLのwindow関数のように、特定のグループ内で計算を完結させることができる。
TA-Libとの連携
とはいえ、RSIやボリンジャーバンドなど、TA-Lib の資産を使いたい場合もあるだろう。PolarsはNumPyと親和性が高いので、簡単に連携できる。
import talib import numpy as np def calculate_rsi(close_prices: pl.Series, timeperiod=14) -> pl.Series: # Polars Series -> NumPy Array rsi_np = talib.RSI(close_prices.to_numpy(), timeperiod=timeperiod) return pl.Series(rsi_np) # map_batches を使って適用 df = df.with_columns( pl.col("close") .map_batches(lambda x: calculate_rsi(x, 14)) .over("code") .alias("rsi14") )
少しコード量は増えるが、処理速度は依然として高速である。
データ型の厳格さと欠損値処理
PolarsはRustらしさを受け継いでいるのか、データ型(dtype)に厳格である。
例えば「数値カラムのはずの列に文字列 "-" が混じっていて、計算時に落ちる」という事態はPandasではありがちだ。しかし、Polarsは読み込み時点(Schema解決時)でエラーを吐くか、Null に変換してくれる。
Null処理のモダンなアプローチ
欠損値(Null)の扱いも直感的でわかりやすい(と個人的に思う)。
# 直前の値で埋める (Forward Fill) df.with_columns(pl.col("close").fill_null(strategy="forward")) # 0で埋める df.with_columns(pl.col("volume").fill_null(0))
また、システムトレードでハマりやすいのが タイムゾーン である。
Polarsの Datetime 型はタイムゾーンを明確に管理できる。
# UTCからJSTへ変換 df.with_columns( pl.col("timestamp").dt.convert_time_zone("Asia/Tokyo") )
これを怠ると、バックテストで「朝9時のエントリーのはずが、深夜0時にエントリーしていた」というミスが発生する。Polarsの厳格さは、こうしたミスを未然に防いでくれる。
今日の成果
- Polars の
LazyFrameを使い、メモリ効率の良いデータ読み込みを実装した。 over("code")を使い、銘柄ごとの移動平均計算を高速化した。- TA-Lib との連携方法を確認した。
これで、大量のデータを扱う準備は整った。 次回は、このデータをどこに保存するかについて考える。CSVもParquetも管理しきれなくなったときの避難先として、DuckDB を紹介する。
ベンチマーク
ベンチマークに使用したスクリプト
ダミーのデータを生成して、PandasとPolarsのベンチマークを比較する。
ベンチマークスクリプト
import time import numpy as np import pandas as pd import polars as pl import os import shutil from datetime import datetime, timedelta # 設定 DATA_FILE = "data/stock_history_large.parquet" NUM_STOCKS = 50 YEARS = 10 # 単純な仮定: 250営業日 * 5時間 * 60分 = 年間75,000分 MINUTES_PER_YEAR = 250 * 300 TOTAL_MINUTES = YEARS * MINUTES_PER_YEAR try: import talib HAS_TALIB = True except ImportError: HAS_TALIB = False print("TA-Libが見つからない。RSIの計算にはNumPyのフォールバックを使用する。") def generate_synthetic_data(filename): print(f"合成データを生成中: {NUM_STOCKS} 銘柄, {YEARS} 年分 (各 {TOTAL_MINUTES} 行)...") start_time = time.time() # ディレクトリが存在しない場合は作成 os.makedirs(os.path.dirname(filename), exist_ok=True) dfs = [] # 各銘柄のデータを生成 # 生成中のメモリを節約するため1銘柄ずつ生成するが、50項目程度ならリストへの追加で十分効率的。 start_date = datetime(2015, 1, 1) end_date = start_date + timedelta(minutes=TOTAL_MINUTES - 1) # Polarsを使用して効率的に日付範囲を生成 # これは適切なDatetime型のPolars Seriesを返します base_dates = pl.datetime_range(start_date, end_date, "1m", eager=True) for i in range(NUM_STOCKS): code = f"{7000 + i}" # 価格のランダムウォーク # 開始価格は1000から5000の間 start_price = np.random.uniform(1000, 5000) returns = np.random.normal(0, 0.0002, TOTAL_MINUTES) # 微小な分足ボラティリティ price_path = start_price * np.exp(np.cumsum(returns)) # OHLVCの生成(簡単な近似) close = price_path open_ = close * (1 + np.random.normal(0, 0.0001, TOTAL_MINUTES)) high = np.maximum(open_, close) * (1 + np.abs(np.random.normal(0, 0.0001, TOTAL_MINUTES))) low = np.minimum(open_, close) * (1 - np.abs(np.random.normal(0, 0.0001, TOTAL_MINUTES))) volume = np.random.randint(100, 10000, TOTAL_MINUTES).astype(float) # この銘柄のDataFrameを作成 # 辞書を使用する方が作成が速い。 data = { "code": [code] * TOTAL_MINUTES, "date": base_dates, "open": open_, "high": high, "low": low, "close": close, "volume": volume } # 後でconcatするためにPandasを使用するが、保存にはPolarsの方が速い可能性。 # 保存するために直接Polars DataFrameを作成。 df = pl.DataFrame(data) # 必要に応じて適切な型にキャストしてスペース/メモリを節約できるが、ベンチマークにはデフォルト(f64, i64)で問題なし。 dfs.append(df) if (i + 1) % 10 == 0: print(f"{i + 1}/{NUM_STOCKS} 銘柄の生成が完了。") print("結合してParquetに保存中...") # 全てを結合(Polarsはこれが非常に高速)。 full_df = pl.concat(dfs) # Parquetに保存 full_df.write_parquet(filename) end_time = time.time() print(f"データ生成完了。サイズ: {full_df.estimated_size() / 1024**3:.2f} GB (メモリ上)。") print(f"ファイルを {filename} に保存した。所要時間: {end_time - start_time:.2f}秒") # メモリを解放 del dfs del full_df # --- ベンチマーク関数 --- def numpy_rsi(prices, period=14): # NumPyを使用した簡略版RSI実装 deltas = np.diff(prices) seed = deltas[:period+1] up = seed[seed >= 0].sum()/period down = -seed[seed < 0].sum()/period rs = up/down rsi = np.zeros_like(prices) rsi[:period] = np.nan rsi[period] = 100. - 100./(1. + rs) for i in range(period+1, len(prices)): delta = deltas[i-1] # 階差をとるため長さが1短くなることに注意 if delta > 0: upval = delta downval = 0. else: upval = 0. downval = -delta up = (up*(period-1) + upval)/period down = (down*(period-1) + downval)/period rs = up/down rsi[i] = 100. - 100./(1. + rs) return rsi # ブロードキャスト(可能な限りのベクトル化)用に最適化されたNumPy RSIは、ループや特定の関数なしでは困難です # 公平性のために、TALibがない場合はPandas用に少しベクトル化されたPandasフレンドリーなバージョンを使用します def pandas_rsi_manual(series, period=14): delta = series.diff() gain = (delta.where(delta > 0, 0)).fillna(0) loss = (-delta.where(delta < 0, 0)).fillna(0) avg_gain = gain.rolling(window=period, min_periods=period).mean() avg_loss = loss.rolling(window=period, min_periods=period).mean() rs = avg_gain / avg_loss return 100 - (100 / (1 + rs)) # TALibがない場合のPolarsマニュアルRSI def polars_rsi_manual(series, period=14): # これはmap_batches内で直接使用されるわけではありませんが、Polarsの式やPython関数内でロジックを再現するためのものです # map_batchesを使用する場合は、NumPy配列を渡すことができます # map_batchesを使用する場合は、単に同じNumPy関数を使用します pass def run_pandas_benchmark(): print("\n--- Pandas ベンチマーク ---") results = {} # 1. データの読み込み t0 = time.time() df = pd.read_parquet(DATA_FILE) t1 = time.time() results['Load'] = t1 - t0 print(f"Load: {results['Load']:.4f}s") # 2. フィルタリング t0 = time.time() # 記事に合わせたロジック: 銘柄コードと日付でフィルタ # Pandasでは、記事の「Pandasの使い方」にあるように、全てをロードしてからフィルタリングする。 # 注: 記事では「Pandasの全ロード + フィルタ」と「Polarsのスキャン + フィルタ」を比較している。 # 指示への公平性のために、記事の内容に従う。 # rolling/groupbyの速度差を示すために、データセット全体で指標を計算する。 # コピーを使用するか、単にdfを修正するか?標準的なpandasは修正するか新規を返す。 # 負荷テストのために全データで移動平均を実行。 # フィルタリングの例(トヨタ自動車相当) target_code = "7024" # 任意の銘柄を選択 subset = df[(df['code'] == target_code) & (df['date'] >= "2020-01-01")] t1 = time.time() results['Filter'] = t1 - t0 print(f"フィルタリング (サブセット): {results['Filter']:.4f}秒") # 3. 移動平均 (グループ化) t0 = time.time() # df['ma25'] = df.groupby('code')['close'].transform(lambda x: x.rolling(25).mean()) # lambdaバージョンは低速。最適化されたDataFrameGroupBy.rollingの方が良いが、transformは扱いにくい。 # 記事では「df.groupby('code')['close'].transform(lambda x: x.rolling(25).mean())」となっている。 # 可能であれば最適化された方法を使用するか、それとも記事に示された方法を使用して論点を示すか。 # 記事では、lambda/transformを使用した方法を明示的に「Pandasの場合(遅い)」と呼んでいる。 # 意図的に最も遅いわけではないかもしれないが、適度に標準的なpandasのアプローチを採用。 # df.groupby('code').rolling(25)['close'].mean() はマルチインデックスのSeriesを返す。 # マッピングし直すのは低速な可能性。 # 組み込み関数でtransformを使用する方がlambdaより速いが、rollingはtransformの引数として直接渡せない。 # 著者の主張を実証するため、記事に記載されている方法を使用。 df['ma25'] = df.groupby('code')['close'].transform(lambda x: x.rolling(25).mean()) t1 = time.time() results['Rolling Mean'] = t1 - t0 print(f"Rolling Mean (Grouped): {results['Rolling Mean']:.4f}s") # 4. RSI (グループ化) t0 = time.time() if HAS_TALIB: # Pandasのapplyをtalibと共に使用するのは、行単位またはグループ単位のapplyになるため、一般的に低速。 df['rsi14'] = df.groupby('code')['close'].transform( lambda x: pd.Series(talib.RSI(x.values, timeperiod=14), index=x.index) ) else: # 手動によるPandas RSI df['rsi14'] = df.groupby('code')['close'].transform( lambda x: pandas_rsi_manual(x, 14) ) t1 = time.time() results['RSI'] = t1 - t0 print(f"RSI (Grouped): {results['RSI']:.4f}s") return results def run_polars_benchmark(): print("\n--- Polars ベンチマーク ---") results = {} # 1. 読み込み (スキャン) + フィルタリング # Polarsではこれらを組み合わせることが一般的。サブセットに対して「スキャン + コレクト」を測定し、指標に対して「スキャン + 全件計算」を実行。 # Pandasの「ロード」時間と公平に比較するために、全てをロードする場合もあるが、記事ではLazy(遅延評価)が強調されている。 t0 = time.time() q = pl.scan_parquet(DATA_FILE) # フィルタリング target_code = "7024" q_filter = ( q.filter(pl.col("code") == target_code) .filter(pl.col("date") >= datetime(2020, 1, 1)) ) # サブセットをコレクト df_subset = q_filter.collect() t1 = time.time() # Pandasでは「ファイル全体のロード」の後に「フィルタリング」を測定した。 # ここでは「スキャン + フィルタ + コレクト」を測定。これはIDとIOを組み合わせたもの。 # これは「欲しい特定のデータを取得する速度」の指標。 results['Load (Lazy Filter)'] = t1 - t0 print(f"Load (Lazy Filter): {results['Load (Lazy Filter)']:.4f}s") # さあ、本番です: 全データセットに対するロジック # GroupBy Rollingを比較するために、メモリ上に完全なDataFrameが必要です t0 = time.time() df_pl = pl.read_parquet(DATA_FILE) t_load_full = time.time() - t0 print(f"(参考: Polars 全件読み込み: {t_load_full:.4f}秒)") # 2. 移動平均 t0 = time.time() df_pl = df_pl.with_columns([ pl.col("close") .rolling_mean(window_size=25) .over("code") .alias("ma25") ]) t1 = time.time() results['Rolling Mean'] = t1 - t0 print(f"Rolling Mean (Grouped): {results['Rolling Mean']:.4f}s") # 3. RSI t0 = time.time() if HAS_TALIB: # 記事のようにmap_batchesを使用 def calculate_rsi(close_prices: pl.Series, timeperiod=14) -> pl.Series: # NULLや空のSeriesがある場合に対処 if len(close_prices) == 0: return pl.Series([], dtype=pl.Float64) rsi_np = talib.RSI(close_prices.to_numpy(), timeperiod=timeperiod) return pl.Series(rsi_np) df_pl = df_pl.with_columns( pl.col("close") .map_batches(lambda x: calculate_rsi(x, 14)) .over("code") .alias("rsi14") ) else: # 式を使用したPolarsの手動RSI # RSI = 100 - 100 / (1 + RS) # RS = AvgGain / AvgLoss # 標準的なRSIはWilderの平滑化(概ねEMA)を使用します # このベンチマークでは、単純化のためにPandasの手動バージョンに近い式(単純移動平均または簡単な計算)を使用します # または、pandas_rsi_manual相当(numpy)を使用してmap_batchesを使用します # 速度のためにNumPyでmap_batchesを使用し、「Polarsスタイル」を維持します(Rustベースのロジックが最適ですが、map_batches経由のPython計算は標準的なフォールバックです) df_pl = df_pl.with_columns( pl.col("close") .map_batches(lambda x: pd.Series(pandas_rsi_manual(pd.Series(x), 14)).values) # Pandas/NumPyのロジックを再利用 .over("code") .alias("rsi14") ) t1 = time.time() results['RSI'] = t1 - t0 print(f"RSI (Grouped): {results['RSI']:.4f}s") return results def main(): if not os.path.exists(DATA_FILE): generate_synthetic_data(DATA_FILE) else: print(f"データファイル {DATA_FILE} が存在する。生成をスキップする。") p_res = run_pandas_benchmark() pl_res = run_polars_benchmark() print("\n" + "="*40) print("最終比較結果") print("="*40) print(f"{'タスク':<25} | {'Pandas (秒)':<10} | {'Polars (秒)':<10} | {'倍速'}") print("-" * 65) # 移動平均の比較 pd_roll = p_res['Rolling Mean'] pl_roll = pl_res['Rolling Mean'] print(f"{'Rolling Mean (Grouped)':<25} | {pd_roll:<10.4f} | {pl_roll:<10.4f} | {pd_roll/pl_roll:<10.1f}x") # RSIの比較 pd_rsi = p_res['RSI'] pl_rsi = pl_res['RSI'] print(f"{'RSI (Grouped)':<25} | {pd_rsi:<10.4f} | {pl_rsi:<10.4f} | {pd_rsi/pl_rsi:<10.1f}x") print("-" * 65) print("注: 「読み込み」の比較は手法が異なる (全件読み込み vs 遅延フィルタリング)。") print("これはワークフローの効率の差を示している。") if __name__ == "__main__": main()
出力
出力結果
$ uv run python scripts/benchmark_polars_pandas.py データファイル data/stock_history_large.parquet が存在する。生成をスキップする。 --- Pandas ベンチマーク --- Load: 1.7503s フィルタリング (サブセット): 0.5683秒 Rolling Mean (Grouped): 2.4783s RSI (Grouped): 1.8133s --- Polars ベンチマーク --- Load (Lazy Filter): 0.0078s (参考: Polars 全件読み込み: 0.5328秒) Rolling Mean (Grouped): 1.1277s RSI (Grouped): 1.2417s ======================================== 最終比較結果 ======================================== タスク | Pandas (秒) | Polars (秒) | 倍速 ----------------------------------------------------------------- Rolling Mean (Grouped) | 2.4783 | 1.1277 | 2.2 x RSI (Grouped) | 1.8133 | 1.2417 | 1.5 x ----------------------------------------------------------------- 注: 「読み込み」の比較は手法が異なる (全件読み込み vs 遅延フィルタリング)。 これはワークフローの効率の差を示している。
なお、下記はあまり本文には関係ないアイキャッチ画像である。生成AIで作ったが、これくらいのものなら一瞬でできてしまうので、すごい時代になったなと思う。
