/**
 * Shelly Agent Server.
 *
 * Accepts WebSocket connections on port 8080, reacts to the JSON-RPC style
 * notifications it receives (`NotifyFullStatus`, `NotifyStatus`,
 * `NotifyEvent`) and to `Shelly.GetDeviceInfo` responses, and records the
 * resulting state changes in a SQLite database (`devices.db`).
 *
 * Every recorded event is also pushed to the status dashboard
 * (`broadcastEvent`) and to the rule engine (`runRules`). Each parsed message
 * is additionally appended to a per-device file below `logs/`.
 */
import { WebSocketServer } from 'ws';
import fs from 'fs';
import path from 'path';
import sqlite3 from 'sqlite3';
import { fileURLToPath } from 'url';
import { initRuleEngine, loadRules, runRules, watchRules } from './rule_engine.js';
import { broadcastEvent, startStatusServer } from './status_server.js';
import { TischlerClient } from './tischler_client.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const LOG_DIR = path.join(__dirname, 'logs');

// Ensure logs directory exists
if (!fs.existsSync(LOG_DIR)) {
    fs.mkdirSync(LOG_DIR);
}

// Initialize SQLite database
const db = new sqlite3.Database('devices.db');

db.serialize(() => {
    // Devices table
    db.run("CREATE TABLE IF NOT EXISTS devices (mac TEXT PRIMARY KEY, model TEXT, connected INTEGER, last_seen TEXT)");

    // Reset all devices to disconnected on server start
    db.run("UPDATE devices SET connected = 0");

    // Channels table - stores component/field/type per device
    db.run("CREATE TABLE IF NOT EXISTS channels (id INTEGER PRIMARY KEY AUTOINCREMENT, mac TEXT, component TEXT, field TEXT, type TEXT, FOREIGN KEY(mac) REFERENCES devices(mac), UNIQUE(mac, component, field))");

    // Events table - references channels
    db.run("CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id INTEGER, event TEXT, timestamp TEXT, FOREIGN KEY(channel_id) REFERENCES channels(id))");

    // Insert offline events for all online channels on startup
    // (to prevent deduplication issues after restart)
    db.run(`INSERT INTO events (channel_id, event, timestamp)
            SELECT c.id, 'false', datetime('now')
            FROM channels c
            WHERE c.field = 'online'
              AND EXISTS (SELECT 1 FROM events e
                          WHERE e.channel_id = c.id
                            AND e.event = 'true'
                            AND e.id = (SELECT MAX(e2.id) FROM events e2 WHERE e2.channel_id = c.id))`);
});

// Upstream Client Integration (currently disabled: the client is never constructed)
const UPSTREAM_URL = 'wss://dash.bosewolf.de/agentapi/';
// NOTE(review): credential committed to source control - consider loading it
// from the environment before the upstream client is re-enabled.
const UPSTREAM_KEY = 'd2fba4ff8cd18b87735bef34088a5fe78e52049145bc336f30adf2da371ff431';
//const upstreamClient = new TischlerClient(UPSTREAM_URL, UPSTREAM_KEY);
//upstreamClient.connect().catch(err => console.error('Failed to connect to upstream:', err));

// Map to track connection ID to device MAC
const connectionDeviceMap = new Map();

// Map to track all active connection IDs for a given MAC
const macConnectionsMap = new Map();

// Cache for channel IDs to avoid repeated DB lookups
const channelCache = new Map(); // key: "mac:component:field" -> channel_id

// Map MAC to WebSocket for sending RPC commands
// NOTE(review): never read or written in this file - candidate for removal.
const macToSocket = new Map();

// Query shared by the lookup and the insert-conflict recovery path below.
const SELECT_CHANNEL_SQL = "SELECT id FROM channels WHERE mac = ? AND component = ? AND field = ?";

/**
 * Send an RPC command to a device by MAC.
 *
 * Uses the first connection ID registered for the MAC and looks up the
 * matching socket among the server's current clients.
 *
 * @param {string} mac - MAC of the target device.
 * @param {string} method - RPC method name.
 * @param {object} [params={}] - RPC parameters.
 * @returns {Promise<number|null>} The generated request id, or null when no
 *   connection/socket is available for the MAC.
 */
async function sendRPCToDevice(mac, method, params = {}) {
    const connections = macConnectionsMap.get(mac);
    if (!connections || connections.size === 0) {
        console.log(`[RPC] No connection for MAC=${mac}`);
        return null;
    }

    // Get the first connection ID for this MAC
    const connectionId = connections.values().next().value;

    // Find the socket
    for (const client of wss.clients) {
        if (client.connectionId === connectionId) {
            const rpcId = Math.floor(Math.random() * 10000);
            const rpcRequest = { id: rpcId, src: 'server', method, params };
            console.log(`[RPC] Sending to ${mac}: ${method}`, params);
            client.send(JSON.stringify(rpcRequest));
            return rpcId;
        }
    }

    console.log(`[RPC] Socket not found for MAC=${mac}`);
    return null;
}

/**
 * Get or create a channel row and hand its id to `callback`.
 *
 * Resolution order: in-memory cache, existing row, newly inserted row. If the
 * insert fails on a constraint (another in-flight call created the same
 * channel between our SELECT and INSERT), the row is read again instead of
 * reporting an error, so the event that triggered the call is not lost.
 *
 * @param {string} mac
 * @param {string} component
 * @param {string} field
 * @param {string} type
 * @param {(err: Error|null, channelId: number|null) => void} callback
 */
function getOrCreateChannel(mac, component, field, type, callback) {
    const cacheKey = `${mac}:${component}:${field}`;

    // Check cache first
    if (channelCache.has(cacheKey)) {
        callback(null, channelCache.get(cacheKey));
        return;
    }

    const resolveWith = (channelId) => {
        channelCache.set(cacheKey, channelId);
        callback(null, channelId);
    };

    // Try to find existing channel
    db.get(SELECT_CHANNEL_SQL, [mac, component, field], (err, row) => {
        if (err) {
            callback(err, null);
            return;
        }
        if (row) {
            resolveWith(row.id);
            return;
        }

        // Insert new channel
        db.run("INSERT INTO channels (mac, component, field, type) VALUES (?, ?, ?, ?)", [mac, component, field, type], function (insertErr) {
            if (!insertErr) {
                resolveWith(this.lastID);
                return;
            }
            if (insertErr.code !== 'SQLITE_CONSTRAINT') {
                callback(insertErr, null);
                return;
            }
            // Lost the race against a concurrent insert: the row should exist now.
            db.get(SELECT_CHANNEL_SQL, [mac, component, field], (retryErr, existing) => {
                if (retryErr || !existing) {
                    callback(retryErr || insertErr, null);
                    return;
                }
                resolveWith(existing.id);
            });
        });
    });
}

/**
 * Record an event for a channel and publish it to the dashboard and the rule
 * engine.
 *
 * Button events (`field === 'button'`) are always stored. Every other field is
 * deduplicated: the event is stored and published only when its string form
 * differs from the most recent stored event of the same channel.
 *
 * @param {string} mac
 * @param {string} component - e.g. "switch:0", "system".
 * @param {string} field - e.g. "on", "level", "input", "button", "online".
 * @param {string} type - 'enum' (button events), 'range' (level 0-100),
 *   'boolean' (on/off, online).
 * @param {*} event - Event value; stored as `String(event)`.
 * @param {number|null} [connectionId=null] - Only used for log output.
 */
function checkAndLogEvent(mac, component, field, type, event, connectionId = null) {
    // Hook for forwarding to upstream; disabled together with the upstream client.
    const forwardToUpstream = () => {
        // Channel name would be `${component}_${field}`, e.g. light:0_on, system_online
        //upstreamClient.sendReadings([{
        //    device: mac,
        //    channel: `${component}_${field}`,
        //    value: event
        //}]);
    };

    // Store the event, then notify upstream hook, dashboard and rule engine.
    const storeAndPublish = (channelId, storedValue, publishedValue) => {
        db.run("INSERT INTO events (channel_id, event, timestamp) VALUES (?, ?, ?)", [channelId, storedValue, new Date().toISOString()]);
        forwardToUpstream();
        broadcastEvent(mac, component, field, type, publishedValue);
        runRules(mac, component, field, type, publishedValue);
    };

    getOrCreateChannel(mac, component, field, type, (err, channelId) => {
        if (err) {
            console.error("Error getting/creating channel:", err);
            return;
        }

        if (field === 'button') {
            // Button events are always logged (no deduplication)
            if (connectionId) console.log(`[ID: ${connectionId}] Event logged: ${event} on ${component} (${field})`);
            // Stored as string, but published with the original value.
            storeAndPublish(channelId, String(event), event);
            return;
        }

        // Deduplication: check last event for this channel
        db.get("SELECT event FROM events WHERE channel_id = ? ORDER BY id DESC LIMIT 1", [channelId], (queryErr, row) => {
            if (queryErr) {
                console.error("Error querying events for deduplication:", queryErr);
                return;
            }

            const currentEventStr = String(event);
            if (row && row.event === currentEventStr) {
                return; // unchanged state
            }

            if (connectionId) console.log(`[ID: ${connectionId}] Status change logged: ${event} on ${component} (${field})`);
            storeAndPublish(channelId, currentEventStr, currentEventStr);
        });
    });
}

/**
 * Create the log directory if it is missing (it may be deleted at runtime).
 * Failures are reported and otherwise ignored so logging stays best-effort.
 */
function ensureLogDir() {
    if (fs.existsSync(LOG_DIR)) {
        return;
    }
    try {
        fs.mkdirSync(LOG_DIR, { recursive: true });
    } catch (err) {
        console.error(`Error recreating log directory ${LOG_DIR}:`, err);
    }
}

/**
 * Build the per-device log file name from a device id.
 *
 * The id comes straight from the `src` field of an incoming message, so it
 * must not be able to escape LOG_DIR: every character outside
 * `[A-Za-z0-9._-]` (path separators included) is replaced with `_` and the
 * result is capped in length. Ids made only of those characters keep the file
 * name they had before.
 *
 * @param {*} deviceId
 * @returns {string} File name (no directory part) ending in ".log".
 */
function logFileNameFor(deviceId) {
    const safeId = String(deviceId).replace(/[^A-Za-z0-9._-]/g, '_').slice(0, 128);
    return `${safeId || 'unknown_device'}.log`;
}

/** Append the pretty-printed message to the device's log file. */
function appendDeviceLog(deviceId, data) {
    const logFile = path.join(LOG_DIR, logFileNameFor(deviceId));
    const timestamp = new Date().toISOString();
    const prettyJson = JSON.stringify(data, null, 2);
    const logEntry = `[${timestamp}]\n${prettyJson}\n\n`;

    ensureLogDir();
    fs.appendFile(logFile, logEntry, (err) => {
        if (err) {
            console.error(`Error writing to log file ${logFile}:`, err);
        }
    });
}

/** True for values whose properties can be inspected as component status. */
function isComponentStatus(value) {
    return value !== null && typeof value === 'object';
}

/**
 * Handle `NotifyFullStatus`: log the initial switch/light/input states (only
 * when the payload carries `sys.mac`) and request the device info that
 * registers the connection.
 */
function handleFullStatus(ws, data, deviceId) {
    const connectionId = ws.connectionId;
    console.log(`[ID: ${connectionId}] Device identified: ${deviceId}`);

    // The connection is not yet mapped to a MAC at this point, so the MAC
    // reported inside the status payload is the only one available.
    const possibleMac = data.params && data.params.sys ? data.params.sys.mac : null;

    if (data.params && possibleMac) {
        for (const [key, value] of Object.entries(data.params)) {
            if (!isComponentStatus(value)) {
                continue;
            }

            // Log initial output states
            if (key.startsWith('switch') && typeof value.output !== 'undefined') {
                checkAndLogEvent(possibleMac, key, 'on', 'boolean', value.output, connectionId);
            }

            // Log initial level for analog light outputs (0 = off, 1-100 = brightness)
            if (key.startsWith('light') && typeof value.output !== 'undefined') {
                const level = value.output === false ? 0 : (value.brightness || 0);
                checkAndLogEvent(possibleMac, key, 'level', 'range', level, connectionId);
            }

            // Log initial input states (if state is boolean)
            if (key.startsWith('input') && typeof value.state === 'boolean') {
                checkAndLogEvent(possibleMac, key, 'input', 'boolean', value.state, connectionId);
            }
        }
    }

    // Request device info to populate database
    const rpcRequest = {
        id: Math.floor(Math.random() * 10000),
        src: "server",
        method: "Shelly.GetDeviceInfo"
    };
    ws.send(JSON.stringify(rpcRequest));
}

/**
 * Handle a response that looks like device info: bind the connection to the
 * MAC, drop older connections of the same MAC, upsert the device row and log
 * the `online` event.
 */
function handleDeviceInfo(data, connectionId) {
    const result = data.result;
    if (!(result && (result.mac || (result.device_id && result.model)))) {
        return;
    }

    // NOTE(review): the guard above accepts `device_id`, but the fallback below
    // reads `id` - confirm which field the devices actually send.
    const mac = result.mac || result.id; // Fallback
    const model = result.model || result.app; // Fallback
    if (!(mac && model)) {
        return;
    }

    console.log(`[ID: ${connectionId}] Updating device info: MAC=${mac}, Model=${model}`);
    connectionDeviceMap.set(connectionId, mac);

    // Check for and drop old connections for this MAC
    if (macConnectionsMap.has(mac)) {
        macConnectionsMap.get(mac).forEach((oldConnId) => {
            if (oldConnId === connectionId) {
                return;
            }
            console.log(`[ID: ${connectionId}] New connection for MAC=${mac}. Dropping old connection ID: ${oldConnId}`);
            // Find the socket to terminate
            wss.clients.forEach((client) => {
                if (client.connectionId === oldConnId) {
                    client.terminate();
                }
            });
            // Cleanup maps will happen in 'close' handler of that socket
        });
    } else {
        macConnectionsMap.set(mac, new Set());
    }
    macConnectionsMap.get(mac).add(connectionId);

    const stmtDevice = db.prepare("INSERT OR REPLACE INTO devices (mac, model, connected, last_seen) VALUES (?, ?, ?, ?)");
    stmtDevice.run(mac, model, 1, new Date().toISOString());
    stmtDevice.finalize();

    // Log online event
    checkAndLogEvent(mac, 'system', 'online', 'boolean', true, connectionId);
}

/** Log a notice when the message advertises a stable firmware update. */
function logFirmwareUpdate(data, deviceId, connectionId) {
    if (!(data.params && data.params.sys && data.params.sys.available_updates)) {
        return;
    }
    // Only notify if a stable firmware update is available
    const updates = data.params.sys.available_updates;
    const hasStableUpdate = updates.stable && updates.stable.version;
    if (hasStableUpdate) {
        console.log(`[ID: ${connectionId}] Firmware update available for ${deviceId}:`, JSON.stringify(data.params.sys.available_updates));
    }
}

/**
 * Handle `NotifyStatus`: log switch output and light level changes for the
 * MAC bound to this connection. Ignored until the connection is bound.
 */
function handleStatus(data, connectionId) {
    if (!data.params) {
        return;
    }
    const mac = connectionDeviceMap.get(connectionId);
    if (!mac) {
        return;
    }

    for (const [key, value] of Object.entries(data.params)) {
        if (!isComponentStatus(value)) {
            continue;
        }

        // Check for switch components
        if (key.startsWith('switch') && typeof value.output !== 'undefined') {
            checkAndLogEvent(mac, key, 'on', 'boolean', value.output, connectionId);
        }

        // Check for light components (analog) - log level 0-100
        if (key.startsWith('light')) {
            // Level: 0 if explicitly off, otherwise the reported brightness;
            // nothing is logged when neither is known.
            const level = value.output === false ? 0 : (value.brightness ?? null);
            if (level !== null) {
                checkAndLogEvent(mac, key, 'level', 'range', level, connectionId);
            }
        }
    }
}

// Known button event types to store
const KNOWN_BUTTON_EVENTS = ['single_push', 'double_push', 'triple_push', 'long_push', 'btn_down', 'btn_up'];

/**
 * Handle `NotifyEvent`: store known button events of input components for the
 * MAC bound to this connection; everything else is reported as skipped.
 */
function handleEvents(data, connectionId) {
    if (!(data.params && Array.isArray(data.params.events))) {
        return;
    }
    const mac = connectionDeviceMap.get(connectionId);
    if (!mac) {
        return;
    }

    data.params.events.forEach((evt) => {
        const component = evt ? evt.component : undefined;
        const eventName = evt ? evt.event : undefined;

        // Only store known input button events
        if (typeof component === 'string' && component.startsWith('input') && KNOWN_BUTTON_EVENTS.includes(eventName)) {
            checkAndLogEvent(mac, component, 'button', 'enum', eventName, connectionId);
        } else {
            console.log(`[ID: ${connectionId}] Skipped unknown event: ${component} -> ${eventName}`);
        }
    });
}

/**
 * Release the bookkeeping of a closed connection and, when it was the last
 * connection of its MAC, mark the device disconnected and log `online=false`.
 */
function handleConnectionClosed(connectionId) {
    const mac = connectionDeviceMap.get(connectionId);
    if (!mac) {
        return;
    }
    connectionDeviceMap.delete(connectionId);

    const connections = macConnectionsMap.get(mac);
    if (!connections) {
        // Fallback for safety, though technically shouldn't happen if logic is correct
        db.run("UPDATE devices SET connected = 0 WHERE mac = ?", [mac]);
        return;
    }

    connections.delete(connectionId);
    if (connections.size > 0) {
        console.log(`Connection closed for MAC=${mac}, but ${connections.size} other connection(s) remain active.`);
        return;
    }

    console.log(`All connections closed for MAC=${mac}. Marking as disconnected.`);
    db.run("UPDATE devices SET connected = 0 WHERE mac = ?", [mac]);
    // Log offline event
    checkAndLogEvent(mac, 'system', 'online', 'boolean', false, connectionId);
    macConnectionsMap.delete(mac);
}

const wss = new WebSocketServer({ port: 8080 });

console.log('Shelly Agent Server listening on port 8080');

// Initialize and load rules
initRuleEngine(db, sendRPCToDevice);
loadRules().then(() => {
    console.log('Rule engine ready');
    watchRules(); // Auto-reload rules when files change
}).catch(err => {
    console.error('Error loading rules:', err);
});

// Start status dashboard server
startStatusServer();

// Global counter for connection IDs
let connectionIdCounter = 0;

// Heartbeat: a socket that has not answered the previous ping with a pong by
// the next tick is terminated.
const interval = setInterval(() => {
    wss.clients.forEach((ws) => {
        if (ws.isAlive === false) {
            console.log(`[ID: ${ws.connectionId}] Heartbeat missed. Terminating connection...`);
            return ws.terminate();
        }

        ws.isAlive = false;
        ws.ping();
    });
}, 30000);

wss.on('close', () => {
    clearInterval(interval);
});

wss.on('connection', (ws, req) => {
    ws.isAlive = true;
    ws.on('pong', () => {
        ws.isAlive = true;
    });

    const ip = req.socket.remoteAddress;
    const connectionId = ++connectionIdCounter;
    ws.connectionId = connectionId;
    console.log(`New connection from ${ip} (ID: ${connectionId})`);

    ws.on('message', (message) => {
        const msgString = message.toString();

        try {
            const data = JSON.parse(msgString);

            // Try to identify the device from 'src'
            const deviceId = data.src ? data.src : 'unknown_device';

            if (data.method === 'NotifyFullStatus') {
                handleFullStatus(ws, data, deviceId);
            }

            // Handle properties for device info update
            handleDeviceInfo(data, connectionId);

            logFirmwareUpdate(data, deviceId, connectionId);

            if (data.method === 'NotifyStatus') {
                handleStatus(data, connectionId);
            }

            if (data.method === 'NotifyEvent') {
                handleEvents(data, connectionId);
            }

            appendDeviceLog(deviceId, data);
        } catch (e) {
            console.error('Error parsing message:', e);
            // Ensure log directory exists before error logging
            ensureLogDir();
            // Log raw message to a generic error log
            const errorLog = path.join(LOG_DIR, 'parsing_errors.log');
            fs.appendFile(errorLog, `[${new Date().toISOString()}] ${msgString}\n`, (err) => {
                if (err) {
                    console.error(`Error writing to log file ${errorLog}:`, err);
                }
            });
        }
    });

    ws.on('close', (code, reason) => {
        console.log(`Connection closed (ID: ${connectionId}). Code: ${code}, Reason: ${reason}`);
        handleConnectionClosed(connectionId);
    });

    ws.on('error', (err) => {
        console.error(`WebSocket error: ${err}`);
    });
});

/**
 * Graceful shutdown: terminate all sockets, mark every device disconnected,
 * close the database and exit with status 0.
 */
function shutdown() {
    console.log('Shutting down server...');
    wss.clients.forEach(ws => ws.terminate());

    db.serialize(() => {
        db.run("UPDATE devices SET connected = 0", (err) => {
            if (err) {
                console.error('Error updating DB on shutdown:', err);
            } else {
                console.log('All devices marked as disconnected.');
            }
            db.close();
            process.exit(0);
        });
    });
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);