Files
shellySrv/server.js
sebseb7 9e94edab90 u
2026-01-16 15:28:31 -05:00

400 lines
17 KiB
JavaScript

import { WebSocketServer } from 'ws';
import fs from 'fs';
import path from 'path';
import sqlite3 from 'sqlite3';
import { fileURLToPath } from 'url';
// ES modules have no __filename/__dirname globals, so derive them from import.meta.url.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// Per-device message logs live in ./logs next to this file, independent of the process cwd.
const LOG_DIR = path.join(__dirname, 'logs');
// Ensure logs directory exists.
// `recursive: true` makes the call a no-op when the directory is already present, so no
// separate existsSync() check is needed and there is no window between "check" and "create".
// It also matches how the directory is re-created at runtime further down in this file.
fs.mkdirSync(LOG_DIR, { recursive: true });
// Initialize SQLite database
// NOTE(review): 'devices.db' is a relative path, so it is presumably resolved against the
// process working directory rather than this file's directory — confirm this is intended.
const db = new sqlite3.Database('devices.db');
// Bootstrap the schema. The statements are issued inside serialize() in the order listed.
db.serialize(() => {
  const bootstrapStatements = [
    // Devices table
    "CREATE TABLE IF NOT EXISTS devices (mac TEXT PRIMARY KEY, model TEXT, connected INTEGER, last_seen TEXT)",
    // Reset all devices to disconnected on server start
    "UPDATE devices SET connected = 0",
    // Channels table - stores component/field/type per device
    "CREATE TABLE IF NOT EXISTS channels (id INTEGER PRIMARY KEY AUTOINCREMENT, mac TEXT, component TEXT, field TEXT, type TEXT, FOREIGN KEY(mac) REFERENCES devices(mac), UNIQUE(mac, component, field))",
    // Events table - references channels
    "CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id INTEGER, event TEXT, timestamp TEXT, FOREIGN KEY(channel_id) REFERENCES channels(id))",
  ];
  bootstrapStatements.forEach((sql) => {
    db.run(sql);
  });
});
// Upstream Client Integration
import { TischlerClient } from './tischler_client.js';
// Upstream endpoint and API key. Either can be overridden through the environment
// (UPSTREAM_URL / UPSTREAM_KEY); `||` is used on purpose so that an empty variable
// falls back to the built-in value as well.
// FIXME(security): the fallback key below is committed to source control. Rotate it,
// supply the replacement via UPSTREAM_KEY, and then delete the literal.
const UPSTREAM_URL = process.env.UPSTREAM_URL || 'wss://dash.bosewolf.de/agentapi/';
const UPSTREAM_KEY = process.env.UPSTREAM_KEY || 'd2fba4ff8cd18b87735bef34088a5fe78e52049145bc336f30adf2da371ff431';
// Upstream forwarding is currently switched off; uncomment both lines to re-enable it.
//const upstreamClient = new TischlerClient(UPSTREAM_URL, UPSTREAM_KEY);
//upstreamClient.connect().catch(err => console.error('Failed to connect to upstream:', err));
// connectionId -> device MAC.
// An entry is added once a device-info response identifies the device behind a socket and
// is removed again in that socket's 'close' handler.
const connectionDeviceMap = new Map();
// MAC -> Set of connection IDs currently registered for that device.
// Used to terminate superseded sockets when a device reconnects and to detect when the
// last socket for a device has closed (the device is then marked as disconnected).
const macConnectionsMap = new Map();
// Cache for channel IDs to avoid repeated DB lookups.
// Nothing in this file evicts entries, so the cache lives as long as the process does.
const channelCache = new Map(); // key: "mac:component:field" -> channel_id
/**
 * Get or create a channel and hand its id to `callback`.
 *
 * A channel is the (mac, component, field) triple; `type` is only stored when the
 * row is created. Ids are cached in `channelCache`, so the database is consulted at
 * most once per channel for the lifetime of the process.
 *
 * Creation uses INSERT OR IGNORE followed by a SELECT instead of a plain INSERT:
 * when two events for a brand-new channel arrive back to back, both lookups can miss
 * before either insert has run. With a plain INSERT the slower caller would fail on
 * the UNIQUE(mac, component, field) constraint and its event would be dropped; here
 * the slower caller simply reads the row the faster one created.
 *
 * @param {string} mac - Device MAC address.
 * @param {string} component - Component key, e.g. "switch:0" or "system".
 * @param {string} field - Field name within the component, e.g. "on", "level".
 * @param {string} type - Value type label stored with a newly created channel.
 * @param {(err: Error|null, channelId: number|null) => void} callback - Receives
 *   either an error (with a null id) or the channel id.
 */
function getOrCreateChannel(mac, component, field, type, callback) {
  const cacheKey = `${mac}:${component}:${field}`;

  // Check cache first
  if (channelCache.has(cacheKey)) {
    callback(null, channelCache.get(cacheKey));
    return;
  }

  const selectSql = "SELECT id FROM channels WHERE mac = ? AND component = ? AND field = ?";
  const keyParams = [mac, component, field];

  // Remember the id for later calls and report it to the caller.
  const resolveWith = (channelId) => {
    channelCache.set(cacheKey, channelId);
    callback(null, channelId);
  };

  // Try to find existing channel
  db.get(selectSql, keyParams, (selectErr, row) => {
    if (selectErr) {
      callback(selectErr, null);
      return;
    }
    if (row) {
      resolveWith(row.id);
      return;
    }

    // Not there yet: create it, tolerating a concurrent creator.
    db.run("INSERT OR IGNORE INTO channels (mac, component, field, type) VALUES (?, ?, ?, ?)", [mac, component, field, type], (insertErr) => {
      if (insertErr) {
        callback(insertErr, null);
        return;
      }

      // Read the id back rather than trusting lastID: if the insert was ignored
      // because another caller won the race, lastID does not describe this row.
      db.get(selectSql, keyParams, (rereadErr, created) => {
        if (rereadErr) {
          callback(rereadErr, null);
          return;
        }
        if (!created) {
          callback(new Error(`Channel ${cacheKey} is missing after insert`), null);
          return;
        }
        resolveWith(created.id);
      });
    });
  });
}
/**
 * Helper to deduplicate stateful events and persist them in the events table.
 *
 * Button events (field === 'button') are always stored. For every other field the
 * value is stored only when it differs from the most recent stored value of the same
 * channel, compared as strings.
 *
 * Failures are logged and otherwise ignored; nothing is reported back to the caller.
 * Every INSERT gets an error callback so a failed write is logged instead of
 * surfacing as an unhandled 'error' event on the database object.
 *
 * @param {string} mac - Device MAC address.
 * @param {string} component - Component key, e.g. "switch:0", "light:0", "system".
 * @param {string} field - Field within the component, e.g. "on", "level", "button".
 * @param {string} type - 'enum' (button events), 'range' (level 0-100), 'boolean' (on/off, online).
 * @param {*} event - Value to record; stored as String(event).
 * @param {number|null} [connectionId=null] - When set, stored events are also written
 *   to the console, tagged with this connection id.
 */
function checkAndLogEvent(mac, component, field, type, event, connectionId = null) {
  // Function to forward event to upstream. Upstream forwarding is currently disabled,
  // so this is a placeholder that keeps the call sites in place.
  const forwardToUpstream = () => {
    //upstreamClient.sendReadings([{
    //    device: mac,
    //    channel: `${component}_${field}`, // e.g. light:0_on, system_online
    //    value: event
    //}]);
  };

  // Store one row, announce it on the console when the connection is known, then forward.
  const recordEvent = (channelId, eventStr, label) => {
    if (connectionId) console.log(`[ID: ${connectionId}] ${label}: ${event} on ${component} (${field})`);
    db.run("INSERT INTO events (channel_id, event, timestamp) VALUES (?, ?, ?)", [channelId, eventStr, new Date().toISOString()], (insertErr) => {
      if (insertErr) {
        console.error("Error inserting event:", insertErr);
      }
    });
    forwardToUpstream();
  };

  getOrCreateChannel(mac, component, field, type, (err, channelId) => {
    if (err) {
      console.error("Error getting/creating channel:", err);
      return;
    }

    if (field === 'button') {
      // Button events are always logged (no deduplication)
      recordEvent(channelId, String(event), 'Event logged');
      return;
    }

    // Deduplication: check last event for this channel
    db.get("SELECT event FROM events WHERE channel_id = ? ORDER BY id DESC LIMIT 1", [channelId], (queryErr, row) => {
      if (queryErr) {
        console.error("Error querying events for deduplication:", queryErr);
        return;
      }
      const currentEventStr = String(event);
      if (!row || row.event !== currentEventStr) {
        recordEvent(channelId, currentEventStr, 'Status change logged');
      }
    });
  });
}
// WebSocket endpoint the devices connect to
const wss = new WebSocketServer({ port: 8080 });
console.log('Shelly Agent Server listening on port 8080');

// Global counter for connection IDs (each new socket takes the next value)
let connectionIdCounter = 0;

// Heartbeat sweep, every 30 seconds: a socket whose isAlive flag is still false from
// the previous sweep is terminated; every other socket has the flag cleared and is
// pinged (the flag is set back to true when its pong arrives).
const interval = setInterval(() => {
  for (const client of wss.clients) {
    if (client.isAlive === false) {
      console.log(`[ID: ${client.connectionId}] Heartbeat missed. Terminating connection...`);
      client.terminate();
      continue;
    }
    client.isAlive = false;
    client.ping();
  }
}, 30000);

// Stop sweeping once the server itself has closed
wss.on('close', () => {
  clearInterval(interval);
});
/**
 * Convert a device-supplied identifier into a file-name stem that cannot escape LOG_DIR.
 *
 * The identifier comes straight from the `src` field of an incoming message, i.e. it is
 * untrusted. Everything except letters, digits, '.', '_' and '-' is replaced by '_'
 * (which removes path separators), leading dots are replaced so the result can never be
 * "." or "..", and the length is capped.
 *
 * @param {*} deviceId - Raw identifier taken from the message.
 * @returns {string} A non-empty name consisting only of [A-Za-z0-9._-].
 */
function toSafeLogName(deviceId) {
  const cleaned = String(deviceId)
    .replace(/[^A-Za-z0-9._-]/g, '_')
    .replace(/^\.+/, '_')
    .slice(0, 128);
  return cleaned || 'unknown_device';
}

/**
 * Per-connection setup: assigns a connection id, wires the heartbeat flag, and installs
 * the message / close / error handlers for one device socket.
 *
 * Message handling, in order:
 *  - NotifyFullStatus: log initial switch/light/input states (when the payload carries
 *    sys.mac) and request Shelly.GetDeviceInfo.
 *  - RPC result carrying device info: register the connection for that MAC, terminate
 *    older sockets of the same MAC, upsert the devices row and log "online".
 *  - Any message with sys.available_updates.stable.version: print a firmware notice.
 *  - NotifyStatus / NotifyEvent: log state changes and button events for the MAC
 *    registered for this connection (ignored until the MAC is known).
 *  - Finally every parsed message is appended to logs/<device>.log.
 */
wss.on('connection', (ws, req) => {
  // Heartbeat flag: the periodic sweep clears it before pinging, a pong sets it again.
  ws.isAlive = true;
  ws.on('pong', () => { ws.isAlive = true; });

  const ip = req.socket.remoteAddress;
  const connectionId = ++connectionIdCounter;
  ws.connectionId = connectionId;
  console.log(`New connection from ${ip} (ID: ${connectionId})`);

  // Component status entries are objects; anything else (numbers, strings, null) carries
  // no output/state fields and is skipped. The null check matters: reading a property
  // of null would throw and send the whole message down the parse-error path.
  const isStatusObject = (value) => value !== null && typeof value === 'object';

  // Re-create the log directory if it was deleted at runtime. Failures are logged;
  // the subsequent append will then report its own error.
  const ensureLogDir = () => {
    try {
      fs.mkdirSync(LOG_DIR, { recursive: true });
    } catch (err) {
      console.error(`Error recreating log directory ${LOG_DIR}:`, err);
    }
  };

  // Flag a device as disconnected in the devices table.
  const markDisconnected = (mac) => {
    db.run("UPDATE devices SET connected = 0 WHERE mac = ?", [mac], (err) => {
      if (err) {
        console.error(`Error marking MAC=${mac} as disconnected:`, err);
      }
    });
  };

  ws.on('message', (message) => {
    const msgString = message.toString();
    // console.log(`Received: ${msgString}`);
    try {
      const data = JSON.parse(msgString);

      // Try to identify the device from 'src' (used for console output and the log file name)
      const deviceId = data.src ? data.src : 'unknown_device';

      if (data.method === 'NotifyFullStatus') {
        console.log(`[ID: ${connectionId}] Device identified: ${deviceId}`);

        // Initial states can only be attributed when the payload itself carries the MAC;
        // the connection -> MAC mapping is not established until the device-info reply.
        const reportedMac = data.params && data.params.sys ? data.params.sys.mac : null;
        if (data.params && reportedMac) {
          for (const [key, value] of Object.entries(data.params)) {
            if (!isStatusObject(value)) {
              continue;
            }
            // Log initial output states
            if (key.startsWith('switch') && typeof value.output !== 'undefined') {
              checkAndLogEvent(reportedMac, key, 'on', 'boolean', value.output, connectionId);
            }
            // Log initial level for analog light outputs (0 = off, 1-100 = brightness)
            if (key.startsWith('light') && typeof value.output !== 'undefined') {
              const level = value.output === false ? 0 : (value.brightness || 0);
              checkAndLogEvent(reportedMac, key, 'level', 'range', level, connectionId);
            }
            // Log initial input states (if state is boolean)
            if (key.startsWith('input') && typeof value.state === 'boolean') {
              checkAndLogEvent(reportedMac, key, 'input', 'boolean', value.state, connectionId);
            }
          }
        }

        // Request device info to populate database
        const rpcRequest = {
          id: Math.floor(Math.random() * 10000),
          src: "server",
          method: "Shelly.GetDeviceInfo"
        };
        ws.send(JSON.stringify(rpcRequest));
      }

      // Handle properties for device info update
      // NOTE(review): the guard tests result.device_id but the fallback below reads
      // result.id — confirm which field the devices actually send.
      if (data.result && (data.result.mac || (data.result.device_id && data.result.model))) {
        const mac = data.result.mac || data.result.id; // Fallback
        const model = data.result.model || data.result.app; // Fallback

        if (mac && model) {
          console.log(`[ID: ${connectionId}] Updating device info: MAC=${mac}, Model=${model}`);
          connectionDeviceMap.set(connectionId, mac);

          // Check for and drop old connections for this MAC
          if (macConnectionsMap.has(mac)) {
            macConnectionsMap.get(mac).forEach((oldConnId) => {
              if (oldConnId === connectionId) {
                return;
              }
              console.log(`[ID: ${connectionId}] New connection for MAC=${mac}. Dropping old connection ID: ${oldConnId}`);
              // Find the socket to terminate
              wss.clients.forEach((client) => {
                if (client.connectionId === oldConnId) {
                  client.terminate();
                }
              });
              // Cleanup maps will happen in 'close' handler of that socket
            });
          } else {
            macConnectionsMap.set(mac, new Set());
          }
          macConnectionsMap.get(mac).add(connectionId);

          db.run(
            "INSERT OR REPLACE INTO devices (mac, model, connected, last_seen) VALUES (?, ?, ?, ?)",
            [mac, model, 1, new Date().toISOString()],
            (err) => {
              if (err) {
                console.error(`[ID: ${connectionId}] Error updating device ${mac}:`, err);
              }
            }
          );

          // Log online event
          checkAndLogEvent(mac, 'system', 'online', 'boolean', true, connectionId);
        }
      }

      if (data.params && data.params.sys && data.params.sys.available_updates) {
        // Only notify if a stable firmware update is available
        const updates = data.params.sys.available_updates;
        const hasStableUpdate = updates.stable && updates.stable.version;
        if (hasStableUpdate) {
          console.log(`[ID: ${connectionId}] Firmware update available for ${deviceId}:`, JSON.stringify(data.params.sys.available_updates));
        }
      }

      if (data.method === 'NotifyStatus' && data.params) {
        const mac = connectionDeviceMap.get(connectionId);
        if (mac) {
          for (const [key, value] of Object.entries(data.params)) {
            if (!isStatusObject(value)) {
              continue;
            }
            // Check for switch components
            if (key.startsWith('switch') && typeof value.output !== 'undefined') {
              checkAndLogEvent(mac, key, 'on', 'boolean', value.output, connectionId);
            }
            // Check for light components (analog) - log level 0-100
            if (key.startsWith('light') && (typeof value.output !== 'undefined' || typeof value.brightness !== 'undefined')) {
              // Level is 0 if explicitly off, otherwise the reported brightness; an update
              // without brightness (and not "off") carries no level and is skipped.
              const isOff = value.output === false;
              const level = isOff ? 0 : (value.brightness !== undefined ? value.brightness : null);
              if (level !== null) {
                checkAndLogEvent(mac, key, 'level', 'range', level, connectionId);
              }
            }
          }
        }
      }

      if (data.method === 'NotifyEvent' && data.params && Array.isArray(data.params.events)) {
        const mac = connectionDeviceMap.get(connectionId);
        // Events arriving before the MAC is known for this connection are not stored
        if (mac) {
          data.params.events.forEach((evt) => {
            if (!isStatusObject(evt)) {
              return;
            }
            // Pass the button event (btn_down/up/etc) as values
            checkAndLogEvent(mac, evt.component, 'button', 'enum', evt.event, connectionId);
          });
        }
      }

      // Append the full message to the per-device log. The file name is derived from the
      // untrusted 'src' value, so it is sanitised to keep the path inside LOG_DIR.
      const logFile = path.join(LOG_DIR, `${toSafeLogName(deviceId)}.log`);
      const timestamp = new Date().toISOString();
      const prettyJson = JSON.stringify(data, null, 2);
      const logEntry = `[${timestamp}]\n${prettyJson}\n\n`;

      // Ensure log directory exists (in case it was deleted at runtime)
      ensureLogDir();
      fs.appendFile(logFile, logEntry, (err) => {
        if (err) {
          console.error(`Error writing to log file ${logFile}:`, err);
        }
      });
    } catch (e) {
      // Reached for invalid JSON and for any error thrown while handling the message
      console.error('Error parsing message:', e);
      // Ensure log directory exists before error logging
      ensureLogDir();
      // Log raw message to a generic error log
      const errorLog = path.join(LOG_DIR, 'parsing_errors.log');
      fs.appendFile(errorLog, `[${new Date().toISOString()}] ${msgString}\n`, (appendErr) => {
        if (appendErr) {
          console.error(`Error writing to log file ${errorLog}:`, appendErr);
        }
      });
    }
  });

  ws.on('close', (code, reason) => {
    console.log(`Connection closed (ID: ${connectionId}). Code: ${code}, Reason: ${reason}`);
    const mac = connectionDeviceMap.get(connectionId);
    if (!mac) {
      // The socket never identified itself; there is nothing to clean up.
      return;
    }
    connectionDeviceMap.delete(connectionId);

    const connections = macConnectionsMap.get(mac);
    if (!connections) {
      // Fallback for safety, though technically shouldn't happen if logic is correct
      markDisconnected(mac);
      return;
    }

    connections.delete(connectionId);
    if (connections.size === 0) {
      console.log(`All connections closed for MAC=${mac}. Marking as disconnected.`);
      markDisconnected(mac);
      // Log offline event
      checkAndLogEvent(mac, 'system', 'online', 'boolean', false, connectionId);
      macConnectionsMap.delete(mac);
    } else {
      console.log(`Connection closed for MAC=${mac}, but ${connections.size} other connection(s) remain active.`);
    }
  });

  ws.on('error', (err) => {
    console.error(`WebSocket error: ${err}`);
  });
});
// Graceful shutdown
// Set once the first termination signal has been handled.
let shutdownStarted = false;

/**
 * Signal handler: terminate all device sockets, mark every device as disconnected,
 * close the database and exit with status 0.
 *
 * The process exits from the db.close() callback, i.e. only after the close has
 * completed, rather than immediately after requesting it.
 *
 * A second signal while shutdown is in progress does not repeat the sequence against
 * a closing database; it exits immediately with status 1 so the process can always
 * be stopped.
 */
function shutdown() {
  if (shutdownStarted) {
    console.log('Shutdown already in progress. Forcing exit.');
    process.exit(1);
    return;
  }
  shutdownStarted = true;

  console.log('Shutting down server...');
  wss.clients.forEach(ws => ws.terminate());

  db.serialize(() => {
    db.run("UPDATE devices SET connected = 0", (err) => {
      if (err) {
        console.error('Error updating DB on shutdown:', err);
      } else {
        console.log('All devices marked as disconnected.');
      }
      db.close((closeErr) => {
        if (closeErr) {
          console.error('Error closing DB on shutdown:', closeErr);
        }
        process.exit(0);
      });
    });
  });
}
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);