株のシステムトレードをしよう - 1から始める株自動取引システムの作り方

株式をコンピュータに売買させる仕組みを少しずつ作っていきます。できあがってから公開ではなく、書いたら途中でも記事として即掲載して、後から固定ページにして体裁を整える方式で進めていきます。

Pandasを卒業しよう -- Polarsによる高速データ処理

第3回目の今日は、データ処理ライブラリ Polars の実践編だ。 「Pandasで十分」という場面もあるかもしれない。しかし、数百MB~数GBを超えるCSVを開こうとしてメモリ不足でPCがフリーズした経験はないだろうか。私は過去に何度も経験した。 Polarsを使えば、そのようなメモリのリソース問題から解放される手法を選択できる。

なお、本記事の最後に簡易なベンチマークを実施した結果を掲載しておくので、興味があれば参照されたい。

10年分の1分足を一瞬で読み込む

まず、Polarsの真骨頂である Lazy Evaluation(遅延評価) を見てみよう。 手元に数GBほどの株価データ(Parquet形式)があると仮定する。

Polarsの基本文法

Pandasでは read_parquet を使うが、Polarsでは scan_parquet を使う。

import polars as pl

# scan_ なので、まだファイルは読み込まれない
q = pl.scan_parquet("data/stock_history_large.parquet")

# クエリを構築(メソッドチェーンできる点が良い)
q = (
    q.filter(pl.col("code") == "7203")      # トヨタ自動車
     .filter(pl.col("date") >= "2020-01-01")
     .select(["date", "open", "high", "low", "close", "volume"])
)

# ここで初めて実行!必要なデータだけがメモリに展開される
df = q.collect()
print(df.head())

Pandasだと「全データをメモリに読んでからフィルタ」していたのが、Polarsなら「フィルタしてから必要な部分だけ読む」ことができる。この違いは大きい。

テクニカル指標計算の書き直し

システムトレードといえばテクニカル指標である。Pandasで rolling を使って書いていた移動平均線計算は、Polarsでは次のように記述する。

rolling 処理とウィンドウ関数

# Pandasの場合(遅い)
# df['ma25'] = df.groupby('code')['close'].transform(lambda x: x.rolling(25).mean())

# Polarsの場合(高速)
df = df.with_columns([
    pl.col("close")
      .rolling_mean(window_size=25)
      .over("code")  # 銘柄ごとに計算
      .alias("ma25")
])

この .over("code") が素晴らしい。groupby して apply する必要はない。SQLのwindow関数のように、特定のグループ内で計算を完結させることができる。

TA-Libとの連携

とはいえ、RSIやボリンジャーバンドなど、TA-Lib の資産を使いたい場合もあるだろう。PolarsはNumPyと親和性が高いので、簡単に連携できる。

import talib
import numpy as np

def calculate_rsi(close_prices: pl.Series, timeperiod=14) -> pl.Series:
    # Polars Series -> NumPy Array
    rsi_np = talib.RSI(close_prices.to_numpy(), timeperiod=timeperiod)
    return pl.Series(rsi_np)

# map_batches を使って適用
df = df.with_columns(
    pl.col("close")
      .map_batches(lambda x: calculate_rsi(x, 14))
      .over("code")
      .alias("rsi14")
)

少しコード量は増えるが、処理速度は依然として高速である。

データ型の厳格さと欠損値処理

PolarsはRustらしさを受け継いでいるのか、データ型(dtype)に厳格である。

例えば「数値カラムのはずの列に文字列 "-" が混じっていて、計算時に落ちる」という事態はPandasではありがちだ。しかし、Polarsは読み込み時点(Schema解決時)でエラーを吐くか、Null に変換してくれる。

Null処理のモダンなアプローチ

欠損値(Null)の扱いも直感的でわかりやすい(と個人的に思う)。

# 直前の値で埋める (Forward Fill)
df.with_columns(pl.col("close").fill_null(strategy="forward"))

# 0で埋める
df.with_columns(pl.col("volume").fill_null(0))

また、システムトレードでハマりやすいのが タイムゾーン である。 Polarsの Datetime 型はタイムゾーンを明確に管理できる。

# UTCからJSTへ変換
df.with_columns(
    pl.col("timestamp").dt.convert_time_zone("Asia/Tokyo")
)

これを怠ると、バックテストで「朝9時のエントリーのはずが、深夜0時にエントリーしていた」というミスが発生する。Polarsの厳格さは、こうしたミスを未然に防いでくれる。

今日の成果

  • Polars の LazyFrame を使い、メモリ効率の良いデータ読み込みを実装した。
  • over("code") を使い、銘柄ごとの移動平均計算を高速化した。
  • TA-Lib との連携方法を確認した。

これで、大量のデータを扱う準備は整った。 次回は、このデータをどこに保存するかについて考える。CSVもParquetも管理しきれなくなったときの避難先として、DuckDB を紹介する。

ベンチマーク

ベンチマークに使用したスクリプト

ダミーのデータを生成して、PandasとPolarsのベンチマークを比較する。

ベンチマークスクリプト

import time
import numpy as np
import pandas as pd
import polars as pl
import os
import shutil
from datetime import datetime, timedelta

# 設定
DATA_FILE = "data/stock_history_large.parquet"
NUM_STOCKS = 50
YEARS = 10
# 単純な仮定: 250営業日 * 5時間 * 60分 = 年間75,000分
MINUTES_PER_YEAR = 250 * 300 
TOTAL_MINUTES = YEARS * MINUTES_PER_YEAR

try:
    import talib
    HAS_TALIB = True
except ImportError:
    HAS_TALIB = False
    print("TA-Libが見つからない。RSIの計算にはNumPyのフォールバックを使用する。")

def generate_synthetic_data(filename):
    print(f"合成データを生成中: {NUM_STOCKS} 銘柄, {YEARS} 年分 (各 {TOTAL_MINUTES} 行)...")
    start_time = time.time()
    
    # ディレクトリが存在しない場合は作成
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    
    dfs = []
    
    # 各銘柄のデータを生成
    # 生成中のメモリを節約するため1銘柄ずつ生成するが、50項目程度ならリストへの追加で十分効率的。
    
    start_date = datetime(2015, 1, 1)
    end_date = start_date + timedelta(minutes=TOTAL_MINUTES - 1)
    
    # Polarsを使用して効率的に日付範囲を生成
    # これは適切なDatetime型のPolars Seriesを返します
    base_dates = pl.datetime_range(start_date, end_date, "1m", eager=True)
    
    for i in range(NUM_STOCKS):
        code = f"{7000 + i}"
        
        # 価格のランダムウォーク
        # 開始価格は1000から5000の間
        start_price = np.random.uniform(1000, 5000)
        returns = np.random.normal(0, 0.0002, TOTAL_MINUTES) # 微小な分足ボラティリティ
        price_path = start_price * np.exp(np.cumsum(returns))
        
        # OHLVCの生成(簡単な近似)
        close = price_path
        open_ = close * (1 + np.random.normal(0, 0.0001, TOTAL_MINUTES))
        high = np.maximum(open_, close) * (1 + np.abs(np.random.normal(0, 0.0001, TOTAL_MINUTES)))
        low = np.minimum(open_, close) * (1 - np.abs(np.random.normal(0, 0.0001, TOTAL_MINUTES)))
        volume = np.random.randint(100, 10000, TOTAL_MINUTES).astype(float)
        
        # この銘柄のDataFrameを作成
        # 辞書を使用する方が作成が速い。
        data = {
            "code": [code] * TOTAL_MINUTES,
            "date": base_dates,
            "open": open_,
            "high": high,
            "low": low,
            "close": close,
            "volume": volume
        }
        
        # 後でconcatするためにPandasを使用するが、保存にはPolarsの方が速い可能性。
        # 保存するために直接Polars DataFrameを作成。
        df = pl.DataFrame(data)
        
        # 必要に応じて適切な型にキャストしてスペース/メモリを節約できるが、ベンチマークにはデフォルト(f64, i64)で問題なし。
        dfs.append(df)
        
        if (i + 1) % 10 == 0:
            print(f"{i + 1}/{NUM_STOCKS} 銘柄の生成が完了。")

    print("結合してParquetに保存中...")
    # 全てを結合(Polarsはこれが非常に高速)。
    full_df = pl.concat(dfs)
    
    # Parquetに保存
    full_df.write_parquet(filename)
    
    end_time = time.time()
    print(f"データ生成完了。サイズ: {full_df.estimated_size() / 1024**3:.2f} GB (メモリ上)。")
    print(f"ファイルを {filename} に保存した。所要時間: {end_time - start_time:.2f}秒")
    
    # メモリを解放
    del dfs
    del full_df

# --- ベンチマーク関数 ---

def numpy_rsi(prices, period=14):
    # NumPyを使用した簡略版RSI実装
    deltas = np.diff(prices)
    seed = deltas[:period+1]
    up = seed[seed >= 0].sum()/period
    down = -seed[seed < 0].sum()/period
    rs = up/down
    rsi = np.zeros_like(prices)
    rsi[:period] = np.nan
    rsi[period] = 100. - 100./(1. + rs)

    for i in range(period+1, len(prices)):
        delta = deltas[i-1] # 階差をとるため長さが1短くなることに注意

        if delta > 0:
            upval = delta
            downval = 0.
        else:
            upval = 0.
            downval = -delta

        up = (up*(period-1) + upval)/period
        down = (down*(period-1) + downval)/period

        rs = up/down
        rsi[i] = 100. - 100./(1. + rs)
    return rsi

# ブロードキャスト(可能な限りのベクトル化)用に最適化されたNumPy RSIは、ループや特定の関数なしでは困難です
# 公平性のために、TALibがない場合はPandas用に少しベクトル化されたPandasフレンドリーなバージョンを使用します
def pandas_rsi_manual(series, period=14):
    delta = series.diff()
    gain = (delta.where(delta > 0, 0)).fillna(0)
    loss = (-delta.where(delta < 0, 0)).fillna(0)
    avg_gain = gain.rolling(window=period, min_periods=period).mean()
    avg_loss = loss.rolling(window=period, min_periods=period).mean()
    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))

# TALibがない場合のPolarsマニュアルRSI
def polars_rsi_manual(series, period=14):
    # これはmap_batches内で直接使用されるわけではありませんが、Polarsの式やPython関数内でロジックを再現するためのものです
    # map_batchesを使用する場合は、NumPy配列を渡すことができます
    # map_batchesを使用する場合は、単に同じNumPy関数を使用します
    pass


def run_pandas_benchmark():
    print("\n--- Pandas ベンチマーク ---")
    results = {}
    
    # 1. データの読み込み
    t0 = time.time()
    df = pd.read_parquet(DATA_FILE)
    t1 = time.time()
    results['Load'] = t1 - t0
    print(f"Load: {results['Load']:.4f}s")
    
    # 2. フィルタリング
    t0 = time.time()
    # 記事に合わせたロジック: 銘柄コードと日付でフィルタ
    # Pandasでは、記事の「Pandasの使い方」にあるように、全てをロードしてからフィルタリングする。
    # 注: 記事では「Pandasの全ロード + フィルタ」と「Polarsのスキャン + フィルタ」を比較している。
    # 指示への公平性のために、記事の内容に従う。
    
    # rolling/groupbyの速度差を示すために、データセット全体で指標を計算する。
    # コピーを使用するか、単にdfを修正するか?標準的なpandasは修正するか新規を返す。
    # 負荷テストのために全データで移動平均を実行。
    
    # フィルタリングの例(トヨタ自動車相当)
    target_code = "7024" # 任意の銘柄を選択
    subset = df[(df['code'] == target_code) & (df['date'] >= "2020-01-01")]
    t1 = time.time()
    results['Filter'] = t1 - t0
    print(f"フィルタリング (サブセット): {results['Filter']:.4f}秒")
    
    # 3. 移動平均 (グループ化)
    t0 = time.time()
    # df['ma25'] = df.groupby('code')['close'].transform(lambda x: x.rolling(25).mean()) 
    # lambdaバージョンは低速。最適化されたDataFrameGroupBy.rollingの方が良いが、transformは扱いにくい。
    # 記事では「df.groupby('code')['close'].transform(lambda x: x.rolling(25).mean())」となっている。
    # 可能であれば最適化された方法を使用するか、それとも記事に示された方法を使用して論点を示すか。
    # 記事では、lambda/transformを使用した方法を明示的に「Pandasの場合(遅い)」と呼んでいる。
    # 意図的に最も遅いわけではないかもしれないが、適度に標準的なpandasのアプローチを採用。
    # df.groupby('code').rolling(25)['close'].mean() はマルチインデックスのSeriesを返す。
    # マッピングし直すのは低速な可能性。
    # 組み込み関数でtransformを使用する方がlambdaより速いが、rollingはtransformの引数として直接渡せない。
    # 著者の主張を実証するため、記事に記載されている方法を使用。
    df['ma25'] = df.groupby('code')['close'].transform(lambda x: x.rolling(25).mean())
    t1 = time.time()
    results['Rolling Mean'] = t1 - t0
    print(f"Rolling Mean (Grouped): {results['Rolling Mean']:.4f}s")
    
    # 4. RSI (グループ化)
    t0 = time.time()
    if HAS_TALIB:
        # Pandasのapplyをtalibと共に使用するのは、行単位またはグループ単位のapplyになるため、一般的に低速。
        df['rsi14'] = df.groupby('code')['close'].transform(
            lambda x: pd.Series(talib.RSI(x.values, timeperiod=14), index=x.index)
        )
    else:
        # 手動によるPandas RSI
        df['rsi14'] = df.groupby('code')['close'].transform(
             lambda x: pandas_rsi_manual(x, 14)
        )
            
    t1 = time.time()
    results['RSI'] = t1 - t0
    print(f"RSI (Grouped): {results['RSI']:.4f}s")
    
    return results

def run_polars_benchmark():
    print("\n--- Polars ベンチマーク ---")
    results = {}
    
    # 1. 読み込み (スキャン) + フィルタリング
    # Polarsではこれらを組み合わせることが一般的。サブセットに対して「スキャン + コレクト」を測定し、指標に対して「スキャン + 全件計算」を実行。
    # Pandasの「ロード」時間と公平に比較するために、全てをロードする場合もあるが、記事ではLazy(遅延評価)が強調されている。
    
    t0 = time.time()
    q = pl.scan_parquet(DATA_FILE)
    
    # フィルタリング
    target_code = "7024"
    q_filter = (
        q.filter(pl.col("code") == target_code)
         .filter(pl.col("date") >= datetime(2020, 1, 1))
    )
    # サブセットをコレクト
    df_subset = q_filter.collect() 
    t1 = time.time()
    # Pandasでは「ファイル全体のロード」の後に「フィルタリング」を測定した。
    # ここでは「スキャン + フィルタ + コレクト」を測定。これはIDとIOを組み合わせたもの。
    # これは「欲しい特定のデータを取得する速度」の指標。
    results['Load (Lazy Filter)'] = t1 - t0
    print(f"Load (Lazy Filter): {results['Load (Lazy Filter)']:.4f}s")
    
    # さあ、本番です: 全データセットに対するロジック
    # GroupBy Rollingを比較するために、メモリ上に完全なDataFrameが必要です
    t0 = time.time()
    df_pl = pl.read_parquet(DATA_FILE)
    t_load_full = time.time() - t0
    print(f"(参考: Polars 全件読み込み: {t_load_full:.4f}秒)")
    
    # 2. 移動平均
    t0 = time.time()
    df_pl = df_pl.with_columns([
        pl.col("close")
          .rolling_mean(window_size=25)
          .over("code")
          .alias("ma25")
    ])
    t1 = time.time()
    results['Rolling Mean'] = t1 - t0
    print(f"Rolling Mean (Grouped): {results['Rolling Mean']:.4f}s")
    
    # 3. RSI
    t0 = time.time()
    if HAS_TALIB:
        # 記事のようにmap_batchesを使用
        def calculate_rsi(close_prices: pl.Series, timeperiod=14) -> pl.Series:
            # NULLや空のSeriesがある場合に対処
            if len(close_prices) == 0: return pl.Series([], dtype=pl.Float64)
            rsi_np = talib.RSI(close_prices.to_numpy(), timeperiod=timeperiod)
            return pl.Series(rsi_np)

        df_pl = df_pl.with_columns(
            pl.col("close")
            .map_batches(lambda x: calculate_rsi(x, 14))
            .over("code")
            .alias("rsi14")
        )
    else:
        # 式を使用したPolarsの手動RSI
        # RSI = 100 - 100 / (1 + RS)
        # RS = AvgGain / AvgLoss
        
        # 標準的なRSIはWilderの平滑化(概ねEMA)を使用します
        # このベンチマークでは、単純化のためにPandasの手動バージョンに近い式(単純移動平均または簡単な計算)を使用します
        # または、pandas_rsi_manual相当(numpy)を使用してmap_batchesを使用します
        
        # 速度のためにNumPyでmap_batchesを使用し、「Polarsスタイル」を維持します(Rustベースのロジックが最適ですが、map_batches経由のPython計算は標準的なフォールバックです)
        df_pl = df_pl.with_columns(
            pl.col("close")
            .map_batches(lambda x: pd.Series(pandas_rsi_manual(pd.Series(x), 14)).values) # Pandas/NumPyのロジックを再利用
            .over("code")
            .alias("rsi14")
        )
        
    t1 = time.time()
    results['RSI'] = t1 - t0
    print(f"RSI (Grouped): {results['RSI']:.4f}s")

    return results

def main():
    if not os.path.exists(DATA_FILE):
        generate_synthetic_data(DATA_FILE)
    else:
        print(f"データファイル {DATA_FILE} が存在する。生成をスキップする。")
        
    p_res = run_pandas_benchmark()
    pl_res = run_polars_benchmark()
    
    print("\n" + "="*40)
    print("最終比較結果")
    print("="*40)
    print(f"{'タスク':<25} | {'Pandas (秒)':<10} | {'Polars (秒)':<10} | {'倍速'}")
    print("-" * 65)
    
    # 移動平均の比較
    pd_roll = p_res['Rolling Mean']
    pl_roll = pl_res['Rolling Mean']
    print(f"{'Rolling Mean (Grouped)':<25} | {pd_roll:<10.4f} | {pl_roll:<10.4f} | {pd_roll/pl_roll:<10.1f}x")
    
    # RSIの比較
    pd_rsi = p_res['RSI']
    pl_rsi = pl_res['RSI']
    print(f"{'RSI (Grouped)':<25} | {pd_rsi:<10.4f} | {pl_rsi:<10.4f} | {pd_rsi/pl_rsi:<10.1f}x")
    
    print("-" * 65)
    print("注: 「読み込み」の比較は手法が異なる (全件読み込み vs 遅延フィルタリング)。")
    print("これはワークフローの効率の差を示している。")

if __name__ == "__main__":
    main()

出力

出力結果

$ uv run python scripts/benchmark_polars_pandas.py 
データファイル data/stock_history_large.parquet が存在する。生成をスキップする。

--- Pandas ベンチマーク ---
Load: 1.7503s
フィルタリング (サブセット): 0.5683秒
Rolling Mean (Grouped): 2.4783s
RSI (Grouped): 1.8133s

--- Polars ベンチマーク ---
Load (Lazy Filter): 0.0078s
(参考: Polars 全件読み込み: 0.5328秒)
Rolling Mean (Grouped): 1.1277s
RSI (Grouped): 1.2417s

========================================
最終比較結果
========================================
タスク                       | Pandas (秒) | Polars (秒) | 倍速
-----------------------------------------------------------------
Rolling Mean (Grouped)    | 2.4783     | 1.1277     | 2.2       x
RSI (Grouped)             | 1.8133     | 1.2417     | 1.5       x
-----------------------------------------------------------------
注: 「読み込み」の比較は手法が異なる (全件読み込み vs 遅延フィルタリング)。
これはワークフローの効率の差を示している。

なお、下記はあまり本文には関係ないアイキャッチ画像である。生成AIで作ったが、これくらいのものなら一瞬でできてしまうので、すごい時代になったなと思う。

Pandas vs Polars

(C) 2020 dogwood008 禁無断転載 不許複製 Reprinting, reproducing are prohibited.