株のシステムトレードをしよう - 1から始める株自動取引システムの作り方

株式をコンピュータに売買させる仕組みを少しずつ作っていきます。できあがってから公開ではなく、書いたら途中でも記事として即掲載して、後から固定ページにして体裁を整える方式で進めていきます。

kabu STATION API の PUSH API(時価PUSH配信)を記録し、あとから再生したい 08

終了時に、DBの接続を正常にクローズする処理がうまく動いていないが、WebSocketのメッセージを標準出力に書き出すところまではできている。

'use strict';

// ref: https://shizenkarasuzon.hatenablog.com/entry/2021/04/21/004132

import WebSocket from 'ws';

// ref: https://stackoverflow.com/q/70306590/15983717
import pg from 'pg';
const { Pool } = pg;

function initWebSocket () {
  const protocol = process.env.PROTOCOL;
  const host = process.env.HOST;
  const port = process.env.PORT;
  const path = process.env.WEBSOCKET_PATH || '/';
  const server = `${protocol}://${host}:${port}${path}`;

  console.log(server)
  return new WebSocket(server);
}

function createDbSql(dbName) {
  return `SELECT 'CREATE DATABASE ${dbName}'
    WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '${dbName}');`
}

function createTableSql(stockCode) {
  return `CREATE TABLE IF NOT EXISTS stock_${stockCode} (
    datetime timestamp NOT NULL,
    volume int NOT NULL,
    price float NOT NULL
    );`
}

function insertSql({ stock_code, dateTime, volume, price }) {
  return `INSERT INTO stock_${stock_code} VALUES(
    ${dateTime},
    ${volume},
    ${price}
  );`
}

async function initPg() {
  const pool = new Pool({
    host: process.env.POSTGRES_HOST,
    user: process.env.POSTGRES_USER,
    password: process.env.POSTGRES_PASSWORD,
    port: process.env.POSTGRES_PORT,
  });
  const connect = await pool.connect();
  const createDbSqlStatement = createDbSql('websocket_messages');
  console.log(createDbSqlStatement)
  await connect.query(createDbSqlStatement);
  for(const stockCode of ['7974']) {
    const sql = createTableSql(stockCode);
    console.log({sql})
    await connect.query(sql);
  }

  return { pool, connect };
}

async function exit(connect, pool) {
  // await client.end();
  debugger
  await connect.release();
  await pool.end();
}

async function main({ pool, connect }) {
  ws.on('open', function open() {
    console.log('open');
  });
  
  ws.on('error', async function (event) {
    console.log(event);
    await exit({ connect, pool });
  });
  
  ws.on('close', async function close() {
    await exit({ connect, pool });
  });

  ws.on('message', function message(data) {
    console.log('%s', data.toString());
  });
}

process.on('SIGINT', function() {
  console.log('SIGINT');
  exit();
})

const ws = initWebSocket();
const { pool, connect } = await initPg();
(async ({ pool, connect }) => {
  await main({pool, connect});
})({ pool, connect });

(C) 2020 dogwood008 禁無断転載 不許複製 Reprinting, reproducing are prohibited.