まず、kabu STATION API への接続を保持・管理するストア（シングルトン）を作る。
class KabuSAPIStore(with_metaclass(MetaSingleton, object)):
    '''Singleton store wrapping the connection to the kabu STATION API.

    Params:

      - ``url`` (default: ``localhost``): host used to reach the API
      - ``port`` (default: ``18081``): ``18081`` is dev, ``18080`` is prod
      - ``password`` (default: ``None``): the API password (this is not the
        login password)
    '''

    # Both slots start empty; the broker class and the data class are
    # expected to auto-register themselves here.
    BrokerCls = None
    DataCls = None

    params = (
        ('url', 'localhost'),
        ('port', '18081'),
        ('password', None),
    )

    @classmethod
    def getdata(cls, *args, **kwargs):
        '''Build an instance of the registered ``DataCls``.

        All positional and keyword arguments are forwarded unchanged.
        '''
        data_cls = cls.DataCls
        return data_cls(*args, **kwargs)

    @classmethod
    def getbroker(cls, *args, **kwargs):
        '''Build an instance of the registered ``BrokerCls``.

        All positional and keyword arguments are forwarded unchanged.
        '''
        broker_cls = cls.BrokerCls
        return broker_cls(*args, **kwargs)
そして、その中のイニシャライザで、APIを初期化する。
class KabuSAPIStore(with_metaclass(MetaSingleton, object)):
    # (omitted: docstring, class attributes, ``params`` and the
    # ``getdata`` / ``getbroker`` classmethods)

    def __init__(self):
        '''Initialise the store's bookkeeping state and the API context.

        The connection settings are read from the store parameters
        (``self.p.url``, ``self.p.port``, ``self.p.password``). A first
        ``kabusapi.Context`` is created with the password only to obtain a
        token; the context kept in ``self.kapi`` is then created from that
        token.
        '''
        super(KabuSAPIStore, self).__init__()

        self.notifs = collections.deque()  # store notifications for cerebro
        self._env = None  # reference to cerebro for general notifications
        self.broker = None  # broker instance
        self.datas = list()  # datas that have registered over start

        self._orders = collections.OrderedDict()  # map order.ref to oid
        self._ordersrev = collections.OrderedDict()  # map oid to order.ref
        # One deque per key, created on first access.
        # presumably keyed by order id for pending transactions -- TODO confirm
        self._transpend = collections.defaultdict(collections.deque)

        # NOTE(review): ``practice``, ``_ENVPRACTICE`` and ``_ENVLIVE`` are not
        # declared in the visible part of this class (``params`` only lists
        # url/port/password). Confirm they exist, or drop this line --
        # otherwise it raises AttributeError.
        self._oenv = self._ENVPRACTICE if self.p.practice else self._ENVLIVE

        # Read the connection settings from the store params; the bare names
        # ``url`` / ``port`` / ``password`` are not defined in this scope.
        url = self.p.url
        port = self.p.port
        password = self.p.password

        # Authenticate with the password to get a token, then build the
        # context that is actually kept from that token.
        token = kabusapi.Context(url, port, password).token
        self.kapi = kabusapi.Context(url, port, token=token)

        self._cash = 0.0
        self._value = 0.0
        # presumably signalled when account data has arrived -- TODO confirm
        self._evt_acct = threading.Event()
まだ動作確認できないが、おそらくこんな感じで進められると思う。引き続きやっていく。