u
This commit is contained in:
@@ -32,8 +32,15 @@ const agentClients = new Map(); // devicePrefix -> Set<ws>
|
|||||||
function validateApiKey(apiKey) {
|
function validateApiKey(apiKey) {
|
||||||
if (!db) return null;
|
if (!db) return null;
|
||||||
try {
|
try {
|
||||||
const stmt = db.prepare('SELECT * FROM api_keys WHERE key = ? AND enabled = 1');
|
const stmt = db.prepare('SELECT id, name, device_prefix FROM api_keys WHERE key = ?');
|
||||||
return stmt.get(apiKey);
|
const result = stmt.get(apiKey);
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
// Update last_used_at timestamp
|
||||||
|
db.prepare("UPDATE api_keys SET last_used_at = datetime('now') WHERE id = ?").run(result.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result || null;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error('[WS] Error validating API key:', err.message);
|
console.error('[WS] Error validating API key:', err.message);
|
||||||
return null;
|
return null;
|
||||||
@@ -43,59 +50,77 @@ function validateApiKey(apiKey) {
|
|||||||
function insertReadingsSmart(devicePrefix, readings) {
|
function insertReadingsSmart(devicePrefix, readings) {
|
||||||
if (!db) throw new Error('Database not connected');
|
if (!db) throw new Error('Database not connected');
|
||||||
|
|
||||||
let inserted = 0;
|
const isoTimestamp = new Date().toISOString();
|
||||||
let updated = 0;
|
|
||||||
|
|
||||||
const insertStmt = db.prepare(`
|
const stmtLast = db.prepare(`
|
||||||
INSERT INTO sensor_events (timestamp, channel, device, value, data, data_type)
|
SELECT id, value, data, data_type
|
||||||
VALUES (?, ?, ?, ?, ?, ?)
|
FROM sensor_events
|
||||||
|
WHERE device = ? AND channel = ?
|
||||||
|
ORDER BY timestamp DESC
|
||||||
|
LIMIT 1
|
||||||
`);
|
`);
|
||||||
|
|
||||||
const updateUntilStmt = db.prepare(`
|
const stmtUpdate = db.prepare(`
|
||||||
UPDATE sensor_events SET until = ? WHERE id = ?
|
UPDATE sensor_events SET until = ? WHERE id = ?
|
||||||
`);
|
`);
|
||||||
|
|
||||||
const getLastStmt = db.prepare(`
|
const stmtInsert = db.prepare(`
|
||||||
SELECT id, value, data FROM sensor_events
|
INSERT INTO sensor_events (timestamp, until, device, channel, value, data, data_type)
|
||||||
WHERE device = ? AND channel = ?
|
VALUES (?, NULL, ?, ?, ?, ?, ?)
|
||||||
ORDER BY timestamp DESC LIMIT 1
|
|
||||||
`);
|
`);
|
||||||
|
|
||||||
const now = new Date().toISOString();
|
const transaction = db.transaction((items) => {
|
||||||
|
let inserted = 0;
|
||||||
|
let updated = 0;
|
||||||
|
|
||||||
for (const reading of readings) {
|
for (const reading of items) {
|
||||||
const device = `${devicePrefix}${reading.device}`;
|
const fullDevice = `${devicePrefix}${reading.device}`;
|
||||||
const channel = reading.channel;
|
const channel = reading.channel;
|
||||||
const value = reading.value ?? null;
|
|
||||||
const data = reading.data ? JSON.stringify(reading.data) : null;
|
|
||||||
const dataType = value !== null ? 'number' : 'json';
|
|
||||||
|
|
||||||
// Check last value for RLE
|
// Determine type and values
|
||||||
const last = getLastStmt.get(device, channel);
|
let dataType = 'number';
|
||||||
|
let value = null;
|
||||||
|
let data = null;
|
||||||
|
|
||||||
if (last) {
|
if (reading.value !== undefined && reading.value !== null) {
|
||||||
const lastValue = last.value;
|
dataType = 'number';
|
||||||
const lastData = last.data;
|
value = reading.value;
|
||||||
|
} else if (reading.data !== undefined) {
|
||||||
|
dataType = 'json';
|
||||||
|
data = typeof reading.data === 'string' ? reading.data : JSON.stringify(reading.data);
|
||||||
|
} else {
|
||||||
|
continue; // Skip invalid
|
||||||
|
}
|
||||||
|
|
||||||
// If same value, just update 'until'
|
// Check last reading for RLE
|
||||||
if (value !== null && lastValue === value) {
|
const last = stmtLast.get(fullDevice, channel);
|
||||||
updateUntilStmt.run(now, last.id);
|
let isDuplicate = false;
|
||||||
|
|
||||||
|
if (last && last.data_type === dataType) {
|
||||||
|
if (dataType === 'number') {
|
||||||
|
if (Math.abs(last.value - value) < Number.EPSILON) {
|
||||||
|
isDuplicate = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Compare JSON strings
|
||||||
|
if (last.data === data) {
|
||||||
|
isDuplicate = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isDuplicate) {
|
||||||
|
stmtUpdate.run(isoTimestamp, last.id);
|
||||||
updated++;
|
updated++;
|
||||||
continue;
|
} else {
|
||||||
}
|
stmtInsert.run(isoTimestamp, fullDevice, channel, value, data, dataType);
|
||||||
if (data !== null && lastData === data) {
|
|
||||||
updateUntilStmt.run(now, last.id);
|
|
||||||
updated++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert new reading
|
|
||||||
insertStmt.run(now, channel, device, value, data, dataType);
|
|
||||||
inserted++;
|
inserted++;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return { inserted, updated };
|
return { inserted, updated };
|
||||||
|
});
|
||||||
|
|
||||||
|
return transaction(readings);
|
||||||
}
|
}
|
||||||
|
|
||||||
function createAgentWebSocketServer() {
|
function createAgentWebSocketServer() {
|
||||||
@@ -108,9 +133,17 @@ function createAgentWebSocketServer() {
|
|||||||
const clientState = {
|
const clientState = {
|
||||||
authenticated: false,
|
authenticated: false,
|
||||||
devicePrefix: null,
|
devicePrefix: null,
|
||||||
name: null
|
name: null,
|
||||||
|
lastPong: Date.now()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Set up ping/pong for keepalive
|
||||||
|
ws.isAlive = true;
|
||||||
|
ws.on('pong', () => {
|
||||||
|
ws.isAlive = true;
|
||||||
|
clientState.lastPong = Date.now();
|
||||||
|
});
|
||||||
|
|
||||||
ws.on('message', (data) => {
|
ws.on('message', (data) => {
|
||||||
try {
|
try {
|
||||||
const message = JSON.parse(data.toString());
|
const message = JSON.parse(data.toString());
|
||||||
@@ -133,6 +166,22 @@ function createAgentWebSocketServer() {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Ping interval to detect dead connections
|
||||||
|
const pingInterval = setInterval(() => {
|
||||||
|
wss.clients.forEach((ws) => {
|
||||||
|
if (ws.isAlive === false) {
|
||||||
|
console.log('[WS] Terminating unresponsive client');
|
||||||
|
return ws.terminate();
|
||||||
|
}
|
||||||
|
ws.isAlive = false;
|
||||||
|
ws.ping();
|
||||||
|
});
|
||||||
|
}, 30000);
|
||||||
|
|
||||||
|
wss.on('close', () => {
|
||||||
|
clearInterval(pingInterval);
|
||||||
|
});
|
||||||
|
|
||||||
console.log(`[WS] WebSocket server listening on port ${WS_PORT}`);
|
console.log(`[WS] WebSocket server listening on port ${WS_PORT}`);
|
||||||
return wss;
|
return wss;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user