だいたいできてきた。細かい部分の調整は未だ必要。
'use strict';

// ref: https://shizenkarasuzon.hatenablog.com/entry/2021/04/21/004132
import WebSocket from 'ws';
// ref: https://stackoverflow.com/q/70306590/15983717
import pg from 'pg';

const { Pool } = pg;

/** True when the DEBUG environment variable is non-empty; enables raw message logging. */
const debug = !!process.env.DEBUG;

/** Stock code whose ticks are recorded; also used as the suffix of the table name. */
const STOCK_CODE = '7974';

/** Memoized shutdown promise; non-null once exit() has been called. */
let exitPromise = null;

/**
 * Creates a WebSocket client for the server described by the PROTOCOL, HOST,
 * PORT and WEBSOCKET_PATH environment variables. The path falls back to '/'
 * when WEBSOCKET_PATH is unset or empty.
 *
 * @returns {WebSocket} The newly constructed client socket.
 */
function initWebSocket() {
  const { PROTOCOL: protocol, HOST: host, PORT: port } = process.env;
  // `||` (not `??`) on purpose: an empty WEBSOCKET_PATH must also fall back to '/'.
  const path = process.env.WEBSOCKET_PATH || '/';
  const server = `${protocol}://${host}:${port}${path}`;
  console.log(server);
  return new WebSocket(server);
}

/**
 * Builds the "create database if missing" statement.
 *
 * NOTE(review): this statement only SELECTs the text 'CREATE DATABASE ...' when
 * the database is absent; nothing here executes that text, so running it through
 * a plain query does not create the database. Confirm whether the database is
 * meant to be created and connected to.
 *
 * @param {string} dbName - Database name; interpolated as-is, so it must be trusted.
 * @returns {string} SQL text.
 */
function createDbSql(dbName) {
  return `SELECT 'CREATE DATABASE ${dbName}' WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '${dbName}');`;
}

/**
 * Validates a stock code before it is spliced into a table name. Identifiers
 * cannot be sent as query parameters, so only alphanumeric codes are accepted.
 *
 * @param {string} stockCode - Candidate stock code.
 * @returns {string} The same stock code.
 * @throws {Error} When the code contains anything other than ASCII letters/digits.
 */
function assertStockCode(stockCode) {
  if (!/^[0-9A-Za-z]+$/.test(String(stockCode))) {
    throw new Error(`Invalid stock code: ${stockCode}`);
  }
  return stockCode;
}

/**
 * Builds the CREATE TABLE statement for one stock's tick table.
 *
 * @param {string} stockCode - Stock code used as the table-name suffix.
 * @returns {string} SQL text.
 * @throws {Error} When the stock code is not alphanumeric.
 */
function createTableSql(stockCode) {
  return `CREATE TABLE IF NOT EXISTS stock_${assertStockCode(stockCode)} (
    datetime timestamp NOT NULL,
    volume int NOT NULL,
    price float NOT NULL
  );`;
}

/**
 * Builds a parameterized INSERT for one tick. The values come from WebSocket
 * messages, so they are passed as query parameters instead of being
 * interpolated into the SQL text.
 *
 * @param {object} tick
 * @param {string} tick.stock_code - Stock code used as the table-name suffix.
 * @param {string} tick.dateTime - Value for the datetime column.
 * @param {number} tick.volume - Value for the volume column.
 * @param {number} tick.price - Value for the price column.
 * @returns {{ text: string, values: Array }} Query config accepted by `client.query`.
 * @throws {Error} When the stock code is not alphanumeric.
 */
function insertSql({ stock_code, dateTime, volume, price }) {
  return {
    text: `INSERT INTO stock_${assertStockCode(stock_code)} VALUES($1, $2, $3);`,
    values: [dateTime, volume, price],
  };
}

/**
 * Creates the connection pool from the POSTGRES_* environment variables,
 * checks out one client, and makes sure the tick table exists.
 *
 * @returns {Promise<{ pool: object, connect: object }>} The pool and the checked-out client.
 */
async function initPg() {
  const pool = new Pool({
    host: process.env.POSTGRES_HOST,
    user: process.env.POSTGRES_USER,
    password: process.env.POSTGRES_PASSWORD,
    port: process.env.POSTGRES_PORT,
  });
  const connect = await pool.connect();

  const createDbSqlStatement = createDbSql('websocket_messages');
  console.log(createDbSqlStatement);
  await connect.query(createDbSqlStatement);

  for (const stockCode of [STOCK_CODE]) {
    const sql = createTableSql(stockCode);
    console.log({ sql });
    await connect.query(sql);
  }
  return { pool, connect };
}

/**
 * Releases the checked-out client and ends the pool. Safe to call repeatedly:
 * the socket's 'error' and 'close' handlers and the SIGINT handler may all
 * request shutdown, and releasing or ending twice would throw.
 *
 * @param {object} connect - Client obtained from `pool.connect()`.
 * @param {object} pool - The pool to end.
 * @returns {Promise<void>} Settles when the pool has ended.
 */
function exit(connect, pool) {
  if (exitPromise === null) {
    exitPromise = (async () => {
      try {
        connect.release();
      } finally {
        await pool.end();
      }
    })();
  }
  return exitPromise;
}

/**
 * Wires the module-level WebSocket `ws` to the database: every message whose
 * TradingVolumeTime differs from the previously stored one is inserted as a
 * tick, and the database resources are released on error, close, or SIGINT.
 *
 * @param {object} resources
 * @param {object} resources.pool - Pool to end on shutdown.
 * @param {object} resources.connect - Client used for the inserts.
 * @returns {Promise<void>}
 */
async function main({ pool, connect }) {
  let currentTradingVolumeTime = null;
  let currentTradingVolume = 0;

  // Shutdown runs inside event handlers, so failures are logged rather than
  // surfacing as unhandled rejections.
  const shutdown = async () => {
    try {
      await exit(connect, pool);
    } catch (err) {
      console.error(err);
    }
  };

  ws.on('open', () => {
    console.log('open');
  });

  ws.on('error', async (event) => {
    console.log(event);
    await shutdown();
  });

  ws.on('close', async () => {
    await shutdown();
  });

  ws.on('message', async (data) => {
    // The client is released once shutdown starts; drop late messages.
    if (exitPromise !== null) {
      return;
    }

    const text = data.toString();
    if (debug) {
      console.log('%s', text);
    }

    let json;
    try {
      json = JSON.parse(text);
    } catch (err) {
      console.error(err);
      return;
    }
    if (json === null || typeof json !== 'object') {
      return;
    }

    const tradingVolumeTime = json.TradingVolumeTime;
    if (currentTradingVolumeTime === tradingVolumeTime) {
      return;
    }

    const values = {
      stock_code: STOCK_CODE,
      dateTime: tradingVolumeTime,
      // The subtraction treats TradingVolume as a running total, so the stored
      // baseline must be the last total rather than the last delta.
      // NOTE(review): confirm TradingVolume is cumulative in the feed.
      volume: json.TradingVolume - currentTradingVolume,
      price: json.CalcPrice,
    };

    // Updated before the insert is awaited so that messages arriving in the
    // meantime are compared against this tick.
    currentTradingVolume = json.TradingVolume;
    currentTradingVolumeTime = tradingVolumeTime;

    try {
      await connect.query(insertSql(values));
      console.log(`[${values.stock_code}] ${values.dateTime} ${values.price} ${values.volume}`);
    } catch (err) {
      console.error(err);
    }
  });

  // Registered here, after the resources exist, so the handler can never run
  // against an uninitialized pool or client.
  process.on('SIGINT', async () => {
    console.log('SIGINT');
    ws.close();
    await shutdown();
  });
}

// The database is prepared first and the socket is created immediately before
// main() attaches its listeners, so no socket event can fire while nothing is
// listening for it.
const { pool, connect } = await initPg();
const ws = initWebSocket();
await main({ pool, connect });