This commit is contained in:
sebseb7
2025-12-25 03:58:07 +01:00
parent 489e07d4e1
commit 4610d80c4f
13 changed files with 276 additions and 1237 deletions

View File

@@ -4,6 +4,7 @@ const Database = require('better-sqlite3');
const { config } = require('dotenv');
const bcrypt = require('bcryptjs');
const jwt = require('jsonwebtoken');
const { WebSocketServer } = require('ws');
// Load env vars
config();
@@ -11,6 +12,7 @@ config();
// Database connection for Dev Server API
const dbPath = process.env.DB_PATH || path.resolve(__dirname, '../server/data/sensors.db');
const JWT_SECRET = process.env.JWT_SECRET || 'dev-secret-key-change-me';
const WS_PORT = process.env.WS_PORT || 3962;
let db;
try {
@@ -20,6 +22,204 @@ try {
console.error(`[UI Server] Failed to connect to database at ${dbPath}:`, err.message);
}
// =============================================
// WebSocket Server for Agents (port 3962)
// =============================================
// Track authenticated clients by devicePrefix
const agentClients = new Map(); // devicePrefix -> Set<ws>
function validateApiKey(apiKey) {
if (!db) return null;
try {
const stmt = db.prepare('SELECT * FROM api_keys WHERE key = ? AND enabled = 1');
return stmt.get(apiKey);
} catch (err) {
console.error('[WS] Error validating API key:', err.message);
return null;
}
}
function insertReadingsSmart(devicePrefix, readings) {
if (!db) throw new Error('Database not connected');
let inserted = 0;
let updated = 0;
const insertStmt = db.prepare(`
INSERT INTO sensor_events (timestamp, channel, device, value, data, data_type)
VALUES (?, ?, ?, ?, ?, ?)
`);
const updateUntilStmt = db.prepare(`
UPDATE sensor_events SET until = ? WHERE id = ?
`);
const getLastStmt = db.prepare(`
SELECT id, value, data FROM sensor_events
WHERE device = ? AND channel = ?
ORDER BY timestamp DESC LIMIT 1
`);
const now = new Date().toISOString();
for (const reading of readings) {
const device = `${devicePrefix}${reading.device}`;
const channel = reading.channel;
const value = reading.value ?? null;
const data = reading.data ? JSON.stringify(reading.data) : null;
const dataType = value !== null ? 'number' : 'json';
// Check last value for RLE
const last = getLastStmt.get(device, channel);
if (last) {
const lastValue = last.value;
const lastData = last.data;
// If same value, just update 'until'
if (value !== null && lastValue === value) {
updateUntilStmt.run(now, last.id);
updated++;
continue;
}
if (data !== null && lastData === data) {
updateUntilStmt.run(now, last.id);
updated++;
continue;
}
}
// Insert new reading
insertStmt.run(now, channel, device, value, data, dataType);
inserted++;
}
return { inserted, updated };
}
function createAgentWebSocketServer() {
const wss = new WebSocketServer({ port: WS_PORT });
wss.on('connection', (ws, req) => {
const clientId = `${req.socket.remoteAddress}:${req.socket.remotePort}`;
console.log(`[WS] Client connected: ${clientId}`);
const clientState = {
authenticated: false,
devicePrefix: null,
name: null
};
ws.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
handleAgentMessage(ws, message, clientState, clientId);
} catch (err) {
console.error(`[WS] Error parsing message from ${clientId}:`, err.message);
ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
}
});
ws.on('close', () => {
console.log(`[WS] Client disconnected: ${clientId} (${clientState.name || 'unauthenticated'})`);
if (clientState.devicePrefix && agentClients.has(clientState.devicePrefix)) {
agentClients.get(clientState.devicePrefix).delete(ws);
}
});
ws.on('error', (err) => {
console.error(`[WS] Error for ${clientId}:`, err.message);
});
});
console.log(`[WS] WebSocket server listening on port ${WS_PORT}`);
return wss;
}
function handleAgentMessage(ws, message, clientState, clientId) {
const { type } = message;
switch (type) {
case 'auth':
const { apiKey } = message;
if (!apiKey) {
ws.send(JSON.stringify({ type: 'auth', success: false, error: 'Missing apiKey' }));
return;
}
const keyInfo = validateApiKey(apiKey);
if (!keyInfo) {
ws.send(JSON.stringify({ type: 'auth', success: false, error: 'Invalid API key' }));
return;
}
clientState.authenticated = true;
clientState.devicePrefix = keyInfo.device_prefix;
clientState.name = keyInfo.name;
// Track this connection
if (!agentClients.has(keyInfo.device_prefix)) {
agentClients.set(keyInfo.device_prefix, new Set());
}
agentClients.get(keyInfo.device_prefix).add(ws);
console.log(`[WS] Client authenticated: ${keyInfo.name} (prefix: ${keyInfo.device_prefix})`);
ws.send(JSON.stringify({ type: 'auth', success: true, devicePrefix: keyInfo.device_prefix, name: keyInfo.name }));
break;
case 'data':
if (!clientState.authenticated) {
ws.send(JSON.stringify({ type: 'error', error: 'Not authenticated' }));
return;
}
const { readings } = message;
if (!Array.isArray(readings) || readings.length === 0) {
ws.send(JSON.stringify({ type: 'error', error: 'Invalid readings' }));
return;
}
try {
const validReadings = readings.filter(r => r.device && r.channel && (r.value !== undefined || r.data !== undefined));
const result = insertReadingsSmart(clientState.devicePrefix, validReadings);
ws.send(JSON.stringify({ type: 'ack', count: result.inserted + result.updated }));
} catch (err) {
console.error('[WS] Error inserting readings:', err.message);
ws.send(JSON.stringify({ type: 'error', error: 'Failed to insert readings' }));
}
break;
default:
ws.send(JSON.stringify({ type: 'error', error: `Unknown message type: ${type}` }));
}
}
// Send command to all agents with the given device prefix
function sendCommandToDevicePrefix(devicePrefix, command) {
const clients = agentClients.get(devicePrefix);
if (!clients || clients.size === 0) {
console.log(`[WS] No connected agents for prefix: ${devicePrefix}`);
return false;
}
const message = JSON.stringify({ type: 'command', ...command });
let sent = 0;
for (const ws of clients) {
if (ws.readyState === 1) { // OPEN
ws.send(message);
sent++;
}
}
console.log(`[WS] Sent command to ${sent} agent(s) with prefix ${devicePrefix}:`, command);
return sent > 0;
}
// Start the WebSocket server
const agentWss = createAgentWebSocketServer();
module.exports = {
entry: './src/index.js',
output: {
@@ -456,7 +656,9 @@ module.exports = {
`);
const last = lastStmt.get(channel);
if (last && Math.abs(last.value - value) < Number.EPSILON) {
const valueChanged = !last || Math.abs(last.value - value) >= Number.EPSILON;
if (!valueChanged) {
// Same value - update the until timestamp (RLE)
const updateStmt = db.prepare('UPDATE output_events SET until = ? WHERE id = ?');
updateStmt.run(now, last.id);
@@ -468,6 +670,16 @@ module.exports = {
`);
insertStmt.run(now, channel, value);
console.log(`[RuleRunner] Output changed: ${channel} = ${value}`);
// Send command to bound physical device
const binding = OUTPUT_BINDINGS[channel];
if (binding) {
sendCommandToDevicePrefix(`${binding.device}:`, {
device: binding.channel,
action: 'set_state',
value: value > 0 ? 1 : 0
});
}
}
}