株のシステムトレードをしよう - 1から始める株自動取引システムの作り方

株式をコンピュータに売買させる仕組みを少しずつ作っていきます。できあがってから公開ではなく、書いたら途中でも記事として即掲載して、後から固定ページにして体裁を整える方式で進めていきます。

kabu STATION APIを使ったリアルタイムトレード用のクラスを作る その8

まず、kabu STATION API を保存しておくストアを作る。

class KabuSAPIStore(with_metaclass(MetaSingleton, object)):
    '''Singleton class wrapping to control the connections to Kabu STATION API.

    Params:

      - ``url`` (default:``localhost``): The url to access API

      - ``port`` (default: ``port``): 18081 is dev, 18080 is prod

      - ``password`` (default: ``None``): The password for API, not login
    '''

    BrokerCls = None  # broker class will autoregister
    DataCls = None  # data class will auto register

    params = (
        ('url', 'localhost'),
        ('port', '18081'),
        ('password', None),
    )

    @classmethod
    def getdata(cls, *args, **kwargs):
        '''Returns ``DataCls`` with args, kwargs'''
        return cls.DataCls(*args, **kwargs)

    @classmethod
    def getbroker(cls, *args, **kwargs):
        '''Returns broker with *args, **kwargs from registered ``BrokerCls``'''
        return cls.BrokerCls(*args, **kwargs)

そして、その中のイニシャライザで、APIを初期化する。

class KabuSAPIStore(with_metaclass(MetaSingleton, object)):
(略)
    def __init__(self):
        super(KabuSAPIStore, self).__init__()

        self.notifs = collections.deque()  # store notifications for cerebro

        self._env = None  # reference to cerebro for general notifications
        self.broker = None  # broker instance
        self.datas = list()  # datas that have registered over start

        self._orders = collections.OrderedDict()  # map order.ref to oid
        self._ordersrev = collections.OrderedDict()  # map oid to order.ref
        self._transpend = collections.defaultdict(collections.deque)

        self._oenv = self._ENVPRACTICE if self.p.practice else self._ENVLIVE
        token = kabusapi.Context(url, port, password).token
        self.kapi = kabusapi.Context(url, port, token=token)

        self._cash = 0.0
        self._value = 0.0
        self._evt_acct = threading.Event()

まだ動作確認できないが、おそらくこんな感じで進められると思う。引き続きやっていく。

(C) 2020 dogwood008 禁無断転載 不許複製 Reprinting, reproducing are prohibited.