過去に触った記事の続き。
how-to-make-stock-trading-system.dogwood008.com
基本的なロジックは下記と同等だが、一部変更している部分がある。DataFrame結合の際に型が一致していなかった部分を直したり、変数のスコープを定めるためにクラス化したりした。
実行すると、下図のような表を得られる。データ自体は権利の問題がある可能性を考慮し、ここでは公開しない。
ソースコードは次の通り。`import` は未だ整理していないので、同じパッケージを複数回 `import` している部分があるが、本来重複するものは不要。
明日は、サンプルでも触れられていたグラフをプロットするところまでやってみる。
from datetime import datetime
from dateutil import tz
import jquantsapi
import pandas as pd
import os
import numpy as np


class Prices():
    '''
    Daily price loader backed by the J-Quants API with a pickle cache on disk.

    Arguments
    -------------------
    storage_dir_path: str
        Parent directory in which the pickled DataFrame is stored
    jqapi: jquantsapi.Client
        API client used to download prices when no cache file exists
    '''

    # Columns that _format_price_df casts to float64.
    _FLOAT_COLUMNS = (
        "Open",
        "High",
        "Low",
        "Close",
        "Volume",
        "TurnoverValue",
        "AdjustmentFactor",
        "AdjustmentOpen",
        "AdjustmentHigh",
        "AdjustmentLow",
        "AdjustmentClose",
        "AdjustmentVolume",
    )

    def __init__(self, storage_dir_path: str, jqapi: jquantsapi.Client):
        self.storage_dir_path = storage_dir_path
        self._jqapi = jqapi
        self._price_file = f"{self.storage_dir_path}/price.pkl"

    def fetch_prices_from_api_with_cache(self) -> pd.DataFrame:
        '''
        Return the price DataFrame, calling the API only when no cache exists.

        Original: J-Quants/jquants-api-client-python
        https://github.com/J-Quants/jquants-api-client-python/blob/da16a23d85c80a0106673f0a0deaec3437016418/examples/20220825-003-dividend.ipynb

        If the cache file exists it is loaded and returned as-is (it is not
        refreshed). Otherwise the last 95 days are fetched, formatted, saved
        to the cache file and returned.
        '''
        if os.path.isfile(self._price_file):
            # Load the cached data.
            return self.load_price_df()

        # Fetch the price data.
        now = pd.Timestamp.now(tz="Asia/Tokyo")
        # Roughly the last three months.
        start_dt = now - pd.Timedelta(95, unit="D")
        end_dt = now
        if end_dt.hour < 19:
            # Before the data update time, so shift the end date back one day.
            end_dt -= pd.Timedelta(1, unit="D")

        df_p = self._jqapi.get_price_range(start_dt=start_dt, end_dt=end_dt)
        df_p.reset_index(drop=True, inplace=True)
        df_p = self._format_price_df(df_p)
        self.save_price_df(df_p)
        return df_p

    def save_price_df(self, df_p: pd.DataFrame) -> None:
        '''Pickle df_p to the cache file and print the file path.'''
        df_p.to_pickle(self._price_file)
        print(f"save file: {self._price_file}")

    def load_price_df(self) -> pd.DataFrame:
        '''Load and return the DataFrame pickled in the cache file.'''
        print(f"file exists: {self._price_file}, loading")
        # NOTE: unpickling executes arbitrary code; only load files that this
        # class wrote itself.
        df_p = pd.read_pickle(self._price_file)
        return df_p

    def _format_price_df(self, df_p: pd.DataFrame) -> pd.DataFrame:
        '''
        Convert "Date" to datetime and the numeric columns to float64.

        df_p is modified in place and also returned.
        '''
        # Assign with df[col] rather than df.loc[:, col]: since pandas 2.0 the
        # .loc form writes into the existing column in place, so an object
        # column would keep its object dtype and the conversion would be lost.
        df_p["Date"] = pd.to_datetime(df_p["Date"], format="%Y-%m-%d")
        for column in self._FLOAT_COLUMNS:
            df_p[column] = df_p[column].astype(np.float64)
        return df_p
from datetime import datetime
from dateutil import tz
import jquantsapi
import pandas as pd
import os
import numpy as np
from typing import Union, List, Optional

DatetimeLike = Optional[Union[datetime, pd.Timestamp, str]]


class Statements:
    '''
    Financial statement loader that accumulates API results in a pickle file.

    Arguments
    -------------------
    storage_dir_path: str
        Parent directory for the pickled DataFrame and the raw API cache
    jqapi: jquantsapi.Client
        API client used to download the statements
    '''

    def __init__(self, storage_dir_path: str, jqapi: jquantsapi.Client):
        self.storage_dir_path: str = storage_dir_path
        self._jqapi: jquantsapi.Client = jqapi
        self.statements_file: str = f"{self.storage_dir_path}/statements.pkl"
        self._cache_dir = f"{self.storage_dir_path}/raw_statements"

    def fetch_from_api_and_append(self, start_dt: DatetimeLike = None, end_dt: DatetimeLike = None) -> pd.DataFrame:
        '''
        Fetch statements, merge them into the saved file and return the result.

        Rows whose DisclosureNumber is already present are dropped, keeping
        the first occurrence (the previously saved row).

        Arguments
        -------------------
        start_dt: DatetimeLike
            First date to fetch
        end_dt: DatetimeLike
            Last date to fetch
        '''
        # On the very first run there is no saved file yet; reading it
        # unconditionally would raise FileNotFoundError.
        previous_df = self.load_df() if os.path.isfile(self.statements_file) else None
        df = self.fetch_from_api(start_dt, end_dt)
        appended_df = df if previous_df is None else pd.concat([previous_df, df])
        unique_df = appended_df.loc[~appended_df.duplicated(subset='DisclosureNumber')]
        self.save_df(unique_df)
        return unique_df

    def fetch_from_api(self, start_dt: DatetimeLike = None, end_dt: DatetimeLike = None) -> pd.DataFrame:
        '''
        Download statements for the given period and return them formatted.

        Original: J-Quants/jquants-api-client-python
        https://github.com/J-Quants/jquants-api-client-python/blob/da16a23d85c80a0106673f0a0deaec3437016418/examples/20220825-003-dividend.ipynb

        If either bound is missing, BOTH are replaced by the default window
        (the last 90 days).

        Arguments
        -------------------
        start_dt: DatetimeLike
            First date to fetch
        end_dt: DatetimeLike
            Last date to fetch
        '''
        if not start_dt or not end_dt:
            # Fetch the statements disclosed during the last three months.
            now = pd.Timestamp.now(tz="Asia/Tokyo")
            start_dt = now - pd.Timedelta(90, unit="D")
            end_dt = now
            if end_dt.hour < 1:
                # Before the data update time, so shift the end date back one day.
                end_dt -= pd.Timedelta(1, unit="D")

        # Always make sure the raw cache directory exists before handing it
        # to the client (exist_ok makes this a no-op when it already does).
        os.makedirs(self._cache_dir, exist_ok=True)
        df_s = self._jqapi.get_statements_range(
            start_dt=start_dt,
            end_dt=end_dt,
            cache_dir=self._cache_dir,
        )
        return self._format_df(df_s)

    def _format_df(self, df_s: pd.DataFrame) -> pd.DataFrame:
        '''
        Convert numeric and date columns, then sort and re-index the frame.

        Original: J-Quants/jquants-api-client-python
        https://github.com/J-Quants/jquants-api-client-python/blob/da16a23d85c80a0106673f0a0deaec3437016418/examples/20220825-003-dividend.ipynb

        df_s is modified in place and also returned. The index becomes the
        DisclosureNumber values under the name 'DisclosureNumberIndex'.
        '''
        # Replace "-" with np.nan so that the columns can become float64.
        df_s.replace({"-": np.nan}, inplace=True)

        # Assign with df[col] rather than df.loc[:, col]: since pandas 2.0 the
        # .loc form writes into the existing column in place, so an object
        # column would keep its object dtype and the conversion would be lost.
        float_columns: List[str] = [
            'ResultDividendPerShareFiscalYearEnd',
            'EarningsPerShare',
            'ForecastDividendPerShareAnnual',
            'ForecastEarningsPerShare',
        ]
        for column in float_columns:
            df_s[column] = pd.to_numeric(df_s[column], errors='coerce')

        # Convert to datetime.
        date_columns: List[str] = [
            'DisclosedDate',
            'CurrentPeriodEndDate',
            'CurrentFiscalYearStartDate',
            'CurrentFiscalYearEndDate',
        ]
        for column in date_columns:
            df_s[column] = pd.to_datetime(df_s[column], format="%Y-%m-%d")

        df_s.sort_values("DisclosedUnixTime", inplace=True)
        df_s.index = df_s.DisclosureNumber
        df_s.index.name = 'DisclosureNumberIndex'
        return df_s

    def save_df(self, df: pd.DataFrame) -> None:
        '''Pickle df to the statements file and print the file path.'''
        df.to_pickle(self.statements_file)
        print(f"save file: {self.statements_file}")

    def load_df(self) -> pd.DataFrame:
        '''
        Load and return the DataFrame pickled in the statements file.

        Original: J-Quants/jquants-api-client-python
        https://github.com/J-Quants/jquants-api-client-python/blob/da16a23d85c80a0106673f0a0deaec3437016418/examples/20220825-003-dividend.ipynb
        '''
        print(f"file exists: {self.statements_file}, loading")
        # NOTE: unpickling executes arbitrary code; only load files that this
        # class wrote itself.
        df_s = pd.read_pickle(self.statements_file)
        return df_s
class Yields():
    '''
    Dividend yield / payout ratio table built from statements and prices.

    The resulting table is stored in `self.df`.
    '''

    def __init__(self, statements: pd.DataFrame, prices: pd.DataFrame, jqapi=None) -> None:
        '''
        Original: https://github.com/J-Quants/jquants-api-client-python/blob/da16a23d85c80a0106673f0a0deaec3437016418/examples/20220825-003-dividend.ipynb

        Arguments
        -------------------
        statements: pd.DataFrame
            Financial statements; not modified (a copy is used)
        prices: pd.DataFrame
            Daily prices; not modified (a copy is used)
        jqapi:
            Client whose get_list() supplies the listed-issue table.
            When omitted, the module-level `cli` is used, as before.
        '''
        if jqapi is None:
            # Backward compatibility: earlier callers relied on the global client.
            jqapi = cli

        df_work = statements.copy()
        df_p_work = prices.copy()

        # Keep only the most recent statement for each issue.
        df_work.sort_values("DisclosedUnixTime", inplace=True)
        df_work = df_work.drop_duplicates(["LocalCode"], keep="last")

        # When the close is 0, use the previous business day's close.
        df_p_work.sort_values(["Code", "Date"], inplace=True)
        # Assign the result back instead of calling replace(inplace=True) on
        # the selected column: that is chained assignment and is not
        # guaranteed to modify df_p_work.
        df_p_work["AdjustmentClose"] = df_p_work["AdjustmentClose"].replace({0.0: np.nan})
        df_p_work["AdjustmentClose"] = df_p_work.groupby("Code")["AdjustmentClose"].ffill()
        # When the close is still NaN (start of the data), use the next
        # business day's close.
        df_p_work["AdjustmentClose"] = df_p_work.groupby("Code")["AdjustmentClose"].bfill()

        # Compute the recent return of each issue and join it.
        df_returns_1months = self._calc_returns(df_p_work, 20, "1ヶ月リターン")
        df_returns_3months = self._calc_returns(df_p_work, 60, "3ヶ月リターン")
        df_work = pd.merge(df_work, df_returns_1months, left_on=["LocalCode"], right_index=True, how="left")
        df_work = pd.merge(df_work, df_returns_3months, left_on=["LocalCode"], right_index=True, how="left")

        # Take the close of the latest date to compute the dividend yield.
        latest_date = df_p_work["Date"].max()
        # Copy the slice so the Code conversion below does not write to a view.
        df_close = df_p_work.loc[
            df_p_work["Date"] == latest_date,
            ["Code", "Date", "AdjustmentClose"],
        ].copy()
        df_close["Code"] = pd.to_numeric(df_close["Code"], errors='coerce')

        # Join the latest close.
        df_work = pd.merge(df_work, df_close, left_on=["LocalCode"], right_on=["Code"], how="left")

        # Dividend yield.
        df_work["配当利回り"] = df_work["ResultDividendPerShareFiscalYearEnd"] / df_work["AdjustmentClose"]
        # Forecast dividend yield.
        df_work["予想配当利回り"] = df_work["ForecastDividendPerShareAnnual"] / df_work["AdjustmentClose"]
        # Payout ratio.
        df_work["配当性向"] = df_work["ResultDividendPerShareFiscalYearEnd"] / df_work["EarningsPerShare"]
        # Forecast payout ratio.
        df_work["予想配当性向"] = df_work["ForecastDividendPerShareAnnual"] / df_work["ForecastEarningsPerShare"]

        # Join the issue names.
        df_list = jqapi.get_list()
        df_list["Code"] = df_list["Code"].astype(np.int64)
        df_work = pd.merge(df_work, df_list, left_on=["LocalCode"], right_on=["Code"])

        # Add the disclosure date as a string for display.
        df_work["開示日"] = df_work["DisclosedDate"].dt.strftime("%Y-%m-%d")

        self.df = df_work

    @staticmethod
    def _calc_returns(df_p: pd.DataFrame, bdays: int, name: str) -> pd.Series:
        '''
        Return a per-Code Series of (last close / first close of the last
        `bdays` rows) - 1, named `name` and indexed by the numeric Code.

        df_p must already be sorted by Code and Date.
        '''
        def _calc_return(closes: pd.Series) -> float:
            return (closes.iat[-1] / closes.iloc[-bdays:].iat[0]) - 1

        returns = df_p.groupby("Code")["AdjustmentClose"].apply(_calc_return).rename(name)
        # Codes that cannot be parsed as numbers become NaN.
        returns.index = pd.to_numeric(returns.index, errors='coerce')
        return returns

    def top20(self, column_name: str):
        '''
        Return a styled table of the 20 rows with the largest `column_name`.

        The ratio columns are formatted as percentages with two decimals.
        '''
        # Columns to show.
        output_cols = [
            "LocalCode",
            "CompanyName",
            "開示日",
            "配当性向",
            "予想配当性向",
            "配当利回り",
            "予想配当利回り",
            "1ヶ月リターン",
            "3ヶ月リターン",
        ]
        # Display format per column.
        output_format = {
            "配当性向": "{:.2%}",
            "予想配当性向": "{:.2%}",
            "配当利回り": "{:.2%}",
            "予想配当利回り": "{:.2%}",
            "1ヶ月リターン": "{:.2%}",
            "3ヶ月リターン": "{:.2%}",
        }
        # Top 20 issues in descending order of column_name.
        ranked = self.df.sort_values([column_name], ascending=False)
        return ranked[output_cols].head(20).style.format(output_format)


# Placeholder: replace with your own refresh token (do not commit a real one).
my_refresh_token: str = '(token)'
cli = jquantsapi.Client(refresh_token=my_refresh_token)
parent_path = '/content/drive/MyDrive/drive_ws/marketdata'

prices = Prices(parent_path, cli)
df_prices = prices.fetch_prices_from_api_with_cache()

statements = Statements(parent_path, cli)
df_statements = statements.fetch_from_api_and_append(
    start_dt='2017-01-01',
    end_dt='2022-12-31'
)

yields = Yields(statements=df_statements, prices=df_prices, jqapi=cli)
for column_name in ['配当利回り', '予想配当利回り']:
    print('---------------')
    print(column_name)
    # display is not defined in this file; presumably the notebook builtin.
    display(yields.top20(column_name))
    print('---------------')