import { WebSocketServer } from 'ws';
import fs from 'fs';
import path from 'path';
import sqlite3 from 'sqlite3';
import { fileURLToPath } from 'url';

// Upstream Client Integration (currently disabled, see forwardToUpstream below).
import { TischlerClient } from './tischler_client.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const LOG_DIR = path.join(__dirname, 'logs');

// Ensure logs directory exists
if (!fs.existsSync(LOG_DIR)) {
  fs.mkdirSync(LOG_DIR);
}

// Initialize SQLite database
const db = new sqlite3.Database('devices.db');

db.serialize(() => {
  db.run("CREATE TABLE IF NOT EXISTS devices (mac TEXT PRIMARY KEY, model TEXT, connected INTEGER, last_seen TEXT)");
  // Reset all devices to disconnected on server start
  db.run("UPDATE devices SET connected = 0");
  // Drop old events table to enforce new schema.
  // NOTE(review): this discards the complete event history on every restart.
  db.run("DROP TABLE IF EXISTS events");
  // Create new events table with 'field' and 'type' columns
  db.run("CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY AUTOINCREMENT, mac TEXT, component TEXT, field TEXT, type TEXT, event TEXT, timestamp TEXT)");
});

const UPSTREAM_URL = 'wss://dash.bosewolf.de/agentapi/';
// NOTE(review): a credential is committed to source. The environment variable
// takes precedence; the literal is kept only as a backward-compatible fallback
// and should be rotated and removed.
const UPSTREAM_KEY = process.env.UPSTREAM_KEY ?? 'd2fba4ff8cd18b87735bef34088a5fe78e52049145bc336f30adf2da371ff431';
//const upstreamClient = new TischlerClient(UPSTREAM_URL, UPSTREAM_KEY);
//upstreamClient.connect().catch(err => console.error('Failed to connect to upstream:', err));

// Map to track connection ID to device MAC
const connectionDeviceMap = new Map();
// Map to track all active connection IDs for a given MAC
const macConnectionsMap = new Map();

/**
 * Create the log directory if it is missing (it may be deleted at runtime).
 * Failures are logged rather than thrown so message handling can continue.
 */
function ensureLogDir() {
  if (fs.existsSync(LOG_DIR)) {
    return;
  }
  try {
    fs.mkdirSync(LOG_DIR, { recursive: true });
  } catch (err) {
    console.error(`Error recreating log directory ${LOG_DIR}:`, err);
  }
}

/**
 * Turn a device identifier received from the network into a safe file-name stem.
 * Path separators and any other unexpected characters are replaced, and leading
 * dots are neutralised, so the resulting file always stays inside LOG_DIR.
 *
 * @param {*} deviceId - Identifier taken from the incoming message (untrusted).
 * @returns {string} A non-empty stem made of [A-Za-z0-9._-], at most 128 characters.
 */
function toLogFileStem(deviceId) {
  const stem = String(deviceId)
    .replace(/[^A-Za-z0-9._-]/g, '_')
    .replace(/^\.+/, '_')
    .slice(0, 128);
  return stem.length > 0 ? stem : 'unknown_device';
}

/**
 * Append an entry to a file inside LOG_DIR, logging (not throwing) on failure.
 *
 * @param {string} fileName - File name relative to LOG_DIR; must already be safe.
 * @param {string} entry - Text to append.
 */
function appendLog(fileName, entry) {
  ensureLogDir();
  const logFile = path.join(LOG_DIR, fileName);
  fs.appendFile(logFile, entry, (err) => {
    if (err) {
      console.error(`Error writing to log file ${logFile}:`, err);
    }
  });
}

/**
 * Insert one row into the events table, stamped with the current time.
 * The callback keeps a failed insert from surfacing as an unhandled 'error' event.
 */
function insertEvent(mac, component, field, type, eventStr) {
  db.run(
    "INSERT INTO events (mac, component, field, type, event, timestamp) VALUES (?, ?, ?, ?, ?, ?)",
    [mac, component, field, type, eventStr, new Date().toISOString()],
    (err) => {
      if (err) {
        console.error('Error inserting event:', err);
      }
    },
  );
}

/**
 * Mark a device as disconnected in the devices table.
 *
 * @param {string} mac - Device MAC used as the primary key.
 */
function markDisconnected(mac) {
  db.run("UPDATE devices SET connected = 0 WHERE mac = ?", [mac], (err) => {
    if (err) {
      console.error(`Error marking ${mac} as disconnected:`, err);
    }
  });
}

/**
 * Record an event, suppressing repeats of stateful values.
 *
 * Button events (field === 'button') are always stored. For every other field
 * the most recent stored value for the same mac + component + field is looked
 * up, and a new row is written only when the stringified value differs.
 *
 * @param {string} mac - Device MAC.
 * @param {string} component - Component key, e.g. "switch:0" or "system".
 * @param {string} field - Field name, e.g. "on", "level", "input", "online", "button".
 * @param {string} type - 'enum' (button events), 'range' (level 0-100), 'boolean' (on/off, online).
 * @param {*} event - Event value; stored as String(event).
 * @param {?number} [connectionId=null] - When set, a console line is printed for stored events.
 */
function checkAndLogEvent(mac, component, field, type, event, connectionId = null) {
  // Function to forward event to upstream. Forwarding is currently disabled;
  // the intended call is kept here for reference.
  const forwardToUpstream = () => {
    // const channel = `${component}_${field}`; // e.g. light:0_on, system_online
    //upstreamClient.sendReadings([{
    //    device: mac,
    //    channel: channel,
    //    value: event
    //}]);
  };

  if (field === 'button') {
    if (connectionId) {
      console.log(`[ID: ${connectionId}] Event logged: ${event} on ${component} (${field})`);
    }
    insertEvent(mac, component, field, type, String(event));
    forwardToUpstream();
    return;
  }

  // Deduplication key: mac + component + field
  db.get(
    "SELECT event FROM events WHERE mac = ? AND component = ? AND field = ? ORDER BY id DESC LIMIT 1",
    [mac, component, field],
    (err, row) => {
      if (err) {
        console.error("Error querying events for deduplication:", err);
        return;
      }

      // Compare stringified versions to be safe (e.g. "true" vs "true")
      const currentEventStr = String(event);
      if (row && row.event === currentEventStr) {
        // Duplicate of the last stored value: nothing to record.
        return;
      }

      if (connectionId) {
        console.log(`[ID: ${connectionId}] Status change logged: ${event} on ${component} (${field})`);
      }
      insertEvent(mac, component, field, type, currentEventStr);
      forwardToUpstream();
    },
  );
}

const wss = new WebSocketServer({ port: 8080 });

console.log('Shelly Agent Server listening on port 8080');

// Global counter for connection IDs
let connectionIdCounter = 0;

// Heartbeat: every 30 s terminate sockets that did not answer the previous
// ping, then ping the remaining ones.
const interval = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (ws.isAlive === false) {
      console.log(`[ID: ${ws.connectionId}] Heartbeat missed. Terminating connection...`);
      return ws.terminate();
    }

    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

wss.on('close', () => {
  clearInterval(interval);
});

/**
 * True when a status entry is an object whose properties can be read safely.
 * Guards against `null` entries, which would otherwise throw on property access.
 */
function isStatusObject(value) {
  return value !== null && typeof value === 'object';
}

/**
 * Handle a NotifyFullStatus message: log the initial switch / light / input
 * states (only when params.sys.mac is present) and request device info.
 */
function handleFullStatus(ws, data, deviceId, connectionId) {
  console.log(`[ID: ${connectionId}] Device identified: ${deviceId}`);

  if (data.params) {
    // Initial states are only logged when the status itself carries a MAC.
    const possibleMac = data.params.sys ? data.params.sys.mac : null;

    if (possibleMac) {
      for (const [key, value] of Object.entries(data.params)) {
        if (!isStatusObject(value)) {
          continue;
        }

        // Log initial output states
        if (key.startsWith('switch') && typeof value.output !== 'undefined') {
          checkAndLogEvent(possibleMac, key, 'on', 'boolean', value.output, connectionId);
        }

        // Log initial level for analog light outputs (0 = off, 1-100 = brightness)
        if (key.startsWith('light') && typeof value.output !== 'undefined') {
          const level = value.output === false ? 0 : (value.brightness || 0);
          checkAndLogEvent(possibleMac, key, 'level', 'range', level, connectionId);
        }

        // Log initial input states (if state is boolean)
        if (key.startsWith('input') && typeof value.state === 'boolean') {
          checkAndLogEvent(possibleMac, key, 'input', 'boolean', value.state, connectionId);
        }
      }
    }
  }

  // Request device info to populate database
  const rpcRequest = {
    id: Math.floor(Math.random() * 10000),
    src: "server",
    method: "Shelly.GetDeviceInfo"
  };
  ws.send(JSON.stringify(rpcRequest));
}

/**
 * Handle an RPC result that carries device info: bind the connection to the
 * MAC, terminate older connections of the same MAC, upsert the devices row and
 * log the 'online' event.
 */
function handleDeviceInfo(result, connectionId) {
  const mac = result.mac || result.id; // Fallback
  const model = result.model || result.app; // Fallback
  if (!mac || !model) {
    return;
  }

  console.log(`[ID: ${connectionId}] Updating device info: MAC=${mac}, Model=${model}`);
  connectionDeviceMap.set(connectionId, mac);

  // Check for and drop old connections for this MAC
  const knownConnections = macConnectionsMap.get(mac);
  if (knownConnections) {
    knownConnections.forEach((oldConnId) => {
      if (oldConnId === connectionId) {
        return;
      }
      console.log(`[ID: ${connectionId}] New connection for MAC=${mac}. Dropping old connection ID: ${oldConnId}`);
      // Find the socket to terminate
      wss.clients.forEach((client) => {
        if (client.connectionId === oldConnId) {
          client.terminate();
        }
      });
      // Cleanup maps will happen in 'close' handler of that socket
    });
  }

  if (!macConnectionsMap.has(mac)) {
    macConnectionsMap.set(mac, new Set());
  }
  macConnectionsMap.get(mac).add(connectionId);

  db.run(
    "INSERT OR REPLACE INTO devices (mac, model, connected, last_seen) VALUES (?, ?, ?, ?)",
    [mac, model, 1, new Date().toISOString()],
    (err) => {
      if (err) {
        console.error(`Error updating device ${mac}:`, err);
      }
    },
  );

  // Log online event
  checkAndLogEvent(mac, 'system', 'online', 'boolean', true, connectionId);
}

/**
 * Handle a NotifyStatus message: log switch output and light level changes for
 * the MAC bound to this connection (ignored until the MAC is known).
 */
function handleStatusUpdate(params, connectionId) {
  const mac = connectionDeviceMap.get(connectionId);
  if (!mac) {
    return;
  }

  for (const [key, value] of Object.entries(params)) {
    if (!isStatusObject(value)) {
      continue;
    }

    // Check for switch components
    if (key.startsWith('switch') && typeof value.output !== 'undefined') {
      checkAndLogEvent(mac, key, 'on', 'boolean', value.output, connectionId);
    }

    // Check for light components (analog) - log level 0-100
    if (key.startsWith('light')
      && (typeof value.output !== 'undefined' || typeof value.brightness !== 'undefined')) {
      // Level is 0 if explicitly off, otherwise the reported brightness;
      // without a brightness there is nothing to log.
      const level = value.output === false ? 0 : (value.brightness ?? null);
      if (level !== null) {
        checkAndLogEvent(mac, key, 'level', 'range', level, connectionId);
      }
    }
  }
}

/**
 * Handle a NotifyEvent message: log each button event for the MAC bound to
 * this connection (ignored until the MAC is known).
 */
function handleEventNotification(events, connectionId) {
  const mac = connectionDeviceMap.get(connectionId);
  if (!mac || !Array.isArray(events)) {
    return;
  }

  events.forEach((evt) => {
    if (!isStatusObject(evt)) {
      return;
    }
    // Pass the button event (btn_down/up/etc) as values
    checkAndLogEvent(mac, evt.component, 'button', 'enum', evt.event, connectionId);
  });
}

wss.on('connection', (ws, req) => {
  ws.isAlive = true;
  ws.on('pong', () => {
    ws.isAlive = true;
  });

  const ip = req.socket.remoteAddress;
  const connectionId = ++connectionIdCounter;
  ws.connectionId = connectionId;
  console.log(`New connection from ${ip} (ID: ${connectionId})`);

  ws.on('message', (message) => {
    const msgString = message.toString();

    try {
      const data = JSON.parse(msgString);

      // Try to identify the device from 'src'
      const deviceId = data.src ? data.src : 'unknown_device';

      if (data.method === 'NotifyFullStatus') {
        handleFullStatus(ws, data, deviceId, connectionId);
      }

      // Handle properties for device info update.
      // Shelly.GetDeviceInfo usually returns { mac, model, gen, ... } or
      // { name, id, mac, model, gen, ... }; a flat structure is assumed.
      if (data.result && (data.result.mac || (data.result.device_id && data.result.model))) {
        handleDeviceInfo(data.result, connectionId);
      }

      if (data.params && data.params.sys && data.params.sys.available_updates) {
        // Only notify if a stable firmware update is available
        const updates = data.params.sys.available_updates;
        if (updates.stable && updates.stable.version) {
          console.log(`[ID: ${connectionId}] Firmware update available for ${deviceId}:`, JSON.stringify(updates));
        }
      }

      if (data.method === 'NotifyStatus' && data.params) {
        handleStatusUpdate(data.params, connectionId);
      }

      if (data.method === 'NotifyEvent' && data.params && data.params.events) {
        handleEventNotification(data.params.events, connectionId);
      }

      // Per-device raw message log. The device id comes from the network, so it
      // is sanitised before being used as a file name.
      const timestamp = new Date().toISOString();
      const prettyJson = JSON.stringify(data, null, 2);
      appendLog(`${toLogFileStem(deviceId)}.log`, `[${timestamp}]\n${prettyJson}\n\n`);
    } catch (e) {
      console.error('Error parsing message:', e);
      // Log raw message to a generic error log
      appendLog('parsing_errors.log', `[${new Date().toISOString()}] ${msgString}\n`);
    }
  });

  ws.on('close', (code, reason) => {
    console.log(`Connection closed (ID: ${connectionId}). Code: ${code}, Reason: ${reason}`);

    const mac = connectionDeviceMap.get(connectionId);
    if (!mac) {
      // The device never identified itself on this connection.
      return;
    }
    connectionDeviceMap.delete(connectionId);

    const connections = macConnectionsMap.get(mac);
    if (!connections) {
      // Fallback for safety, though technically shouldn't happen if logic is correct
      markDisconnected(mac);
      return;
    }

    connections.delete(connectionId);
    if (connections.size === 0) {
      console.log(`All connections closed for MAC=${mac}. Marking as disconnected.`);
      markDisconnected(mac);
      // Log offline event
      checkAndLogEvent(mac, 'system', 'online', 'boolean', false, connectionId);
      macConnectionsMap.delete(mac);
    } else {
      console.log(`Connection closed for MAC=${mac}, but ${connections.size} other connection(s) remain active.`);
    }
  });

  ws.on('error', (err) => {
    console.error(`WebSocket error: ${err}`);
  });
});

// Set once shutdown has started so a second signal does not close the DB twice.
let isShuttingDown = false;

/**
 * Graceful shutdown: stop the heartbeat and the listener, terminate all
 * clients, mark every device as disconnected, close the database and exit.
 * The process exits only after the database close has completed.
 */
function shutdown() {
  if (isShuttingDown) {
    return;
  }
  isShuttingDown = true;

  console.log('Shutting down server...');
  clearInterval(interval);
  wss.clients.forEach((client) => client.terminate());
  wss.close();

  db.serialize(() => {
    db.run("UPDATE devices SET connected = 0", (err) => {
      if (err) {
        console.error('Error updating DB on shutdown:', err);
      } else {
        console.log('All devices marked as disconnected.');
      }
      db.close((closeErr) => {
        if (closeErr) {
          console.error('Error closing DB on shutdown:', closeErr);
        }
        process.exit(0);
      });
    });
  });
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);