Initial commit: tischlerctrl home automation project
This commit is contained in:
62
server/src/cli/generate-key.js
Normal file
62
server/src/cli/generate-key.js
Normal file
@@ -0,0 +1,62 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
/**
|
||||
* CLI tool to generate API keys for agents
|
||||
* Usage: node generate-key.js <name> <device_prefix>
|
||||
* Example: node generate-key.js "ac-infinity-agent" "ac:"
|
||||
*/
|
||||
|
||||
import { fileURLToPath } from 'url';
|
||||
import { dirname, join } from 'path';
|
||||
import { initDatabase } from '../db/schema.js';
|
||||
import { generateApiKey, listApiKeys } from '../db/queries.js';
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = dirname(__filename);
|
||||
|
||||
const dbPath = process.env.DB_PATH || join(__dirname, '..', '..', 'data', 'sensors.db');
|
||||
|
||||
const args = process.argv.slice(2);
|
||||
|
||||
if (args.length === 0 || args[0] === '--list') {
|
||||
// List existing keys
|
||||
const db = initDatabase(dbPath);
|
||||
const keys = listApiKeys(db);
|
||||
|
||||
if (keys.length === 0) {
|
||||
console.log('No API keys found.');
|
||||
} else {
|
||||
console.log('\nExisting API keys:\n');
|
||||
console.log('ID | Name | Prefix | Preview | Last Used');
|
||||
console.log('-'.repeat(75));
|
||||
for (const key of keys) {
|
||||
const lastUsed = key.last_used_at || 'never';
|
||||
console.log(`${key.id.toString().padEnd(3)} | ${key.name.padEnd(20)} | ${key.device_prefix.padEnd(7)} | ${key.key_preview.padEnd(12)} | ${lastUsed}`);
|
||||
}
|
||||
}
|
||||
|
||||
console.log('\nUsage: node generate-key.js <name> <device_prefix>');
|
||||
console.log('Example: node generate-key.js "ac-infinity-agent" "ac:"');
|
||||
|
||||
db.close();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (args.length < 2) {
|
||||
console.error('Error: Both name and device_prefix are required');
|
||||
console.error('Usage: node generate-key.js <name> <device_prefix>');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const [name, devicePrefix] = args;
|
||||
|
||||
const db = initDatabase(dbPath);
|
||||
const key = generateApiKey(db, name, devicePrefix);
|
||||
|
||||
console.log('\n✓ API key generated successfully!\n');
|
||||
console.log(`Name: ${name}`);
|
||||
console.log(`Device Prefix: ${devicePrefix}`);
|
||||
console.log(`API Key: ${key}`);
|
||||
console.log('\n⚠ Save this key securely - it cannot be recovered!\n');
|
||||
|
||||
db.close();
|
||||
18
server/src/config.js
Normal file
18
server/src/config.js
Normal file
@@ -0,0 +1,18 @@
|
||||
import { config } from 'dotenv';
|
||||
import { fileURLToPath } from 'url';
|
||||
import { dirname, join } from 'path';
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = dirname(__filename);
|
||||
|
||||
// Load environment variables from .env file
|
||||
config({ path: join(__dirname, '..', '.env') });
|
||||
|
||||
export default {
|
||||
port: parseInt(process.env.PORT || '8080', 10),
|
||||
dbPath: process.env.DB_PATH || join(__dirname, '..', 'data', 'sensors.db'),
|
||||
|
||||
// Job intervals
|
||||
aggregationIntervalMs: parseInt(process.env.AGGREGATION_INTERVAL_MS || String(10 * 60 * 1000), 10),
|
||||
cleanupIntervalMs: parseInt(process.env.CLEANUP_INTERVAL_MS || String(60 * 60 * 1000), 10),
|
||||
};
|
||||
179
server/src/db/queries.js
Normal file
179
server/src/db/queries.js
Normal file
@@ -0,0 +1,179 @@
|
||||
import crypto from 'crypto';
|
||||
|
||||
/**
|
||||
* Database query functions for sensor data operations
|
||||
*/
|
||||
|
||||
/**
|
||||
* Validate an API key and return the associated metadata
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @param {string} apiKey - The API key to validate
|
||||
* @returns {object|null} - API key metadata or null if invalid
|
||||
*/
|
||||
export function validateApiKey(db, apiKey) {
|
||||
const stmt = db.prepare(`
|
||||
SELECT id, name, device_prefix
|
||||
FROM api_keys
|
||||
WHERE key = ?
|
||||
`);
|
||||
const result = stmt.get(apiKey);
|
||||
|
||||
if (result) {
|
||||
// Update last_used_at timestamp
|
||||
db.prepare(`
|
||||
UPDATE api_keys SET last_used_at = datetime('now') WHERE id = ?
|
||||
`).run(result.id);
|
||||
}
|
||||
|
||||
return result || null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a new API key
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @param {string} name - Name/description for the API key
|
||||
* @param {string} devicePrefix - Prefix to prepend to device names (e.g., "ac:", "tapo:")
|
||||
* @returns {string} - The generated API key
|
||||
*/
|
||||
export function generateApiKey(db, name, devicePrefix) {
|
||||
const key = crypto.randomBytes(32).toString('hex');
|
||||
|
||||
db.prepare(`
|
||||
INSERT INTO api_keys (key, name, device_prefix)
|
||||
VALUES (?, ?, ?)
|
||||
`).run(key, name, devicePrefix);
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert sensor readings into the database
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @param {string} devicePrefix - Prefix to prepend to device names
|
||||
* @param {Array} readings - Array of {device, channel, value} objects
|
||||
* @param {Date} timestamp - Timestamp for all readings (defaults to now)
|
||||
*/
|
||||
export function insertReadings(db, devicePrefix, readings, timestamp = new Date()) {
|
||||
const isoTimestamp = timestamp.toISOString();
|
||||
|
||||
const stmt = db.prepare(`
|
||||
INSERT INTO sensor_data (timestamp, device, channel, value)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
const insertMany = db.transaction((items) => {
|
||||
for (const reading of items) {
|
||||
const fullDevice = `${devicePrefix}${reading.device}`;
|
||||
stmt.run(isoTimestamp, fullDevice, reading.channel, reading.value);
|
||||
}
|
||||
});
|
||||
|
||||
insertMany(readings);
|
||||
return readings.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate raw data into 10-minute buckets
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @returns {number} - Number of aggregated records created
|
||||
*/
|
||||
export function aggregate10Minutes(db) {
|
||||
// Get the cutoff time (10 minutes ago, rounded down to 10-min boundary)
|
||||
const now = new Date();
|
||||
const cutoff = new Date(Math.floor(now.getTime() / 600000) * 600000 - 600000);
|
||||
const cutoffISO = cutoff.toISOString();
|
||||
|
||||
const result = db.prepare(`
|
||||
INSERT OR REPLACE INTO sensor_data_10m (timestamp, device, channel, value, sample_count)
|
||||
SELECT
|
||||
datetime(strftime('%s', timestamp) / 600 * 600, 'unixepoch') as bucket,
|
||||
device,
|
||||
channel,
|
||||
AVG(value) as avg_value,
|
||||
COUNT(*) as sample_count
|
||||
FROM sensor_data
|
||||
WHERE timestamp < ?
|
||||
AND timestamp >= datetime(?, '-1 hour')
|
||||
GROUP BY bucket, device, channel
|
||||
`).run(cutoffISO, cutoffISO);
|
||||
|
||||
return result.changes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate 10-minute data into 1-hour buckets
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @returns {number} - Number of aggregated records created
|
||||
*/
|
||||
export function aggregate1Hour(db) {
|
||||
// Get the cutoff time (1 hour ago, rounded down to hour boundary)
|
||||
const now = new Date();
|
||||
const cutoff = new Date(Math.floor(now.getTime() / 3600000) * 3600000 - 3600000);
|
||||
const cutoffISO = cutoff.toISOString();
|
||||
|
||||
const result = db.prepare(`
|
||||
INSERT OR REPLACE INTO sensor_data_1h (timestamp, device, channel, value, sample_count)
|
||||
SELECT
|
||||
datetime(strftime('%s', timestamp) / 3600 * 3600, 'unixepoch') as bucket,
|
||||
device,
|
||||
channel,
|
||||
SUM(value * sample_count) / SUM(sample_count) as weighted_avg,
|
||||
SUM(sample_count) as total_samples
|
||||
FROM sensor_data_10m
|
||||
WHERE timestamp < ?
|
||||
AND timestamp >= datetime(?, '-1 day')
|
||||
GROUP BY bucket, device, channel
|
||||
`).run(cutoffISO, cutoffISO);
|
||||
|
||||
return result.changes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up old data according to retention policy
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @returns {object} - Number of deleted records per table
|
||||
*/
|
||||
export function cleanupOldData(db) {
|
||||
const now = new Date();
|
||||
|
||||
// Delete raw data older than 7 days
|
||||
const weekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
|
||||
const rawDeleted = db.prepare(`
|
||||
DELETE FROM sensor_data WHERE timestamp < ?
|
||||
`).run(weekAgo.toISOString());
|
||||
|
||||
// Delete 10-minute data older than 30 days
|
||||
const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);
|
||||
const aggDeleted = db.prepare(`
|
||||
DELETE FROM sensor_data_10m WHERE timestamp < ?
|
||||
`).run(monthAgo.toISOString());
|
||||
|
||||
return {
|
||||
rawDeleted: rawDeleted.changes,
|
||||
aggregatedDeleted: aggDeleted.changes
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* List all API keys (without showing the actual key values)
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @returns {Array} - List of API key metadata
|
||||
*/
|
||||
export function listApiKeys(db) {
|
||||
return db.prepare(`
|
||||
SELECT id, name, device_prefix, created_at, last_used_at,
|
||||
substr(key, 1, 8) || '...' as key_preview
|
||||
FROM api_keys
|
||||
ORDER BY created_at DESC
|
||||
`).all();
|
||||
}
|
||||
|
||||
export default {
|
||||
validateApiKey,
|
||||
generateApiKey,
|
||||
insertReadings,
|
||||
aggregate10Minutes,
|
||||
aggregate1Hour,
|
||||
cleanupOldData,
|
||||
listApiKeys
|
||||
};
|
||||
86
server/src/db/schema.js
Normal file
86
server/src/db/schema.js
Normal file
@@ -0,0 +1,86 @@
|
||||
import Database from 'better-sqlite3';
|
||||
import { fileURLToPath } from 'url';
|
||||
import { dirname, join } from 'path';
|
||||
import { existsSync, mkdirSync } from 'fs';
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = dirname(__filename);
|
||||
|
||||
/**
|
||||
* Initialize the SQLite database with all required tables
|
||||
* @param {string} dbPath - Path to the SQLite database file
|
||||
* @returns {Database} - The initialized database instance
|
||||
*/
|
||||
export function initDatabase(dbPath) {
|
||||
// Ensure data directory exists
|
||||
const dataDir = dirname(dbPath);
|
||||
if (!existsSync(dataDir)) {
|
||||
mkdirSync(dataDir, { recursive: true });
|
||||
}
|
||||
|
||||
const db = new Database(dbPath);
|
||||
|
||||
// Enable WAL mode for better concurrent performance
|
||||
db.pragma('journal_mode = WAL');
|
||||
|
||||
// Create tables
|
||||
db.exec(`
|
||||
-- API keys for agent authentication
|
||||
CREATE TABLE IF NOT EXISTS api_keys (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
key TEXT UNIQUE NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
device_prefix TEXT NOT NULL,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
last_used_at DATETIME
|
||||
);
|
||||
|
||||
-- Raw sensor data (1-minute resolution, kept for 1 week)
|
||||
CREATE TABLE IF NOT EXISTS sensor_data (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp DATETIME NOT NULL,
|
||||
device TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
value REAL NOT NULL
|
||||
);
|
||||
|
||||
-- Index for time-based queries and cleanup
|
||||
CREATE INDEX IF NOT EXISTS idx_sensor_data_time
|
||||
ON sensor_data(timestamp);
|
||||
|
||||
-- Index for device/channel queries
|
||||
CREATE INDEX IF NOT EXISTS idx_sensor_data_device
|
||||
ON sensor_data(device, channel, timestamp);
|
||||
|
||||
-- 10-minute aggregated data (kept for 1 month)
|
||||
CREATE TABLE IF NOT EXISTS sensor_data_10m (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp DATETIME NOT NULL,
|
||||
device TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
sample_count INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_sensor_data_10m_unique
|
||||
ON sensor_data_10m(timestamp, device, channel);
|
||||
|
||||
-- 1-hour aggregated data (kept forever)
|
||||
CREATE TABLE IF NOT EXISTS sensor_data_1h (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp DATETIME NOT NULL,
|
||||
device TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
sample_count INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_sensor_data_1h_unique
|
||||
ON sensor_data_1h(timestamp, device, channel);
|
||||
`);
|
||||
|
||||
console.log('[DB] Database initialized successfully');
|
||||
return db;
|
||||
}
|
||||
|
||||
export default { initDatabase };
|
||||
44
server/src/index.js
Normal file
44
server/src/index.js
Normal file
@@ -0,0 +1,44 @@
|
||||
import config from './config.js';
|
||||
import { initDatabase } from './db/schema.js';
|
||||
import { createWebSocketServer } from './websocket/server.js';
|
||||
import { startAggregationJob } from './jobs/aggregator.js';
|
||||
import { startCleanupJob } from './jobs/cleanup.js';
|
||||
|
||||
console.log('='.repeat(50));
|
||||
console.log('TischlerCtrl Sensor Server');
|
||||
console.log('='.repeat(50));
|
||||
|
||||
// Initialize database
|
||||
const db = initDatabase(config.dbPath);
|
||||
|
||||
// Start WebSocket server
|
||||
const wss = createWebSocketServer({
|
||||
port: config.port,
|
||||
db
|
||||
});
|
||||
|
||||
// Start background jobs
|
||||
const aggregationTimer = startAggregationJob(db, config.aggregationIntervalMs);
|
||||
const cleanupTimer = startCleanupJob(db, config.cleanupIntervalMs);
|
||||
|
||||
// Graceful shutdown
|
||||
function shutdown() {
|
||||
console.log('\n[Server] Shutting down...');
|
||||
|
||||
clearInterval(aggregationTimer);
|
||||
clearInterval(cleanupTimer);
|
||||
|
||||
wss.close(() => {
|
||||
db.close();
|
||||
console.log('[Server] Goodbye!');
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
// Force exit after 5 seconds
|
||||
setTimeout(() => process.exit(1), 5000);
|
||||
}
|
||||
|
||||
process.on('SIGINT', shutdown);
|
||||
process.on('SIGTERM', shutdown);
|
||||
|
||||
console.log('[Server] Ready to accept connections');
|
||||
42
server/src/jobs/aggregator.js
Normal file
42
server/src/jobs/aggregator.js
Normal file
@@ -0,0 +1,42 @@
|
||||
import { aggregate10Minutes, aggregate1Hour, cleanupOldData } from '../db/queries.js';
|
||||
|
||||
/**
|
||||
* Start the aggregation job that runs periodically
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @param {number} intervalMs - Interval in milliseconds (default: 10 minutes)
|
||||
* @returns {NodeJS.Timer} - The interval timer
|
||||
*/
|
||||
export function startAggregationJob(db, intervalMs = 10 * 60 * 1000) {
|
||||
console.log(`[Aggregator] Starting aggregation job (interval: ${intervalMs / 1000}s)`);
|
||||
|
||||
// Run immediately on start
|
||||
runAggregation(db);
|
||||
|
||||
// Then run periodically
|
||||
return setInterval(() => runAggregation(db), intervalMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the aggregation process
|
||||
*/
|
||||
function runAggregation(db) {
|
||||
try {
|
||||
const start = Date.now();
|
||||
|
||||
// Aggregate raw data to 10-minute buckets
|
||||
const count10m = aggregate10Minutes(db);
|
||||
|
||||
// Aggregate 10-minute data to 1-hour buckets
|
||||
const count1h = aggregate1Hour(db);
|
||||
|
||||
const elapsed = Date.now() - start;
|
||||
|
||||
if (count10m > 0 || count1h > 0) {
|
||||
console.log(`[Aggregator] Completed in ${elapsed}ms: ${count10m} 10m records, ${count1h} 1h records`);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('[Aggregator] Error during aggregation:', err.message);
|
||||
}
|
||||
}
|
||||
|
||||
export default { startAggregationJob };
|
||||
36
server/src/jobs/cleanup.js
Normal file
36
server/src/jobs/cleanup.js
Normal file
@@ -0,0 +1,36 @@
|
||||
import { cleanupOldData } from '../db/queries.js';
|
||||
|
||||
/**
|
||||
* Start the cleanup job that runs periodically
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @param {number} intervalMs - Interval in milliseconds (default: 1 hour)
|
||||
* @returns {NodeJS.Timer} - The interval timer
|
||||
*/
|
||||
export function startCleanupJob(db, intervalMs = 60 * 60 * 1000) {
|
||||
console.log(`[Cleanup] Starting cleanup job (interval: ${intervalMs / 1000}s)`);
|
||||
|
||||
// Run after a delay on start (don't compete with aggregator)
|
||||
setTimeout(() => runCleanup(db), 5 * 60 * 1000);
|
||||
|
||||
// Then run periodically
|
||||
return setInterval(() => runCleanup(db), intervalMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the cleanup process
|
||||
*/
|
||||
function runCleanup(db) {
|
||||
try {
|
||||
const start = Date.now();
|
||||
const result = cleanupOldData(db);
|
||||
const elapsed = Date.now() - start;
|
||||
|
||||
if (result.rawDeleted > 0 || result.aggregatedDeleted > 0) {
|
||||
console.log(`[Cleanup] Completed in ${elapsed}ms: deleted ${result.rawDeleted} raw, ${result.aggregatedDeleted} 10m records`);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('[Cleanup] Error during cleanup:', err.message);
|
||||
}
|
||||
}
|
||||
|
||||
export default { startCleanupJob };
|
||||
183
server/src/websocket/server.js
Normal file
183
server/src/websocket/server.js
Normal file
@@ -0,0 +1,183 @@
|
||||
import { WebSocketServer } from 'ws';
|
||||
import { validateApiKey, insertReadings } from '../db/queries.js';
|
||||
|
||||
/**
|
||||
* Create and configure the WebSocket server
|
||||
* @param {object} options - Server options
|
||||
* @param {number} options.port - Port to listen on
|
||||
* @param {Database} options.db - SQLite database instance
|
||||
* @returns {WebSocketServer} - The WebSocket server instance
|
||||
*/
|
||||
export function createWebSocketServer({ port, db }) {
|
||||
const wss = new WebSocketServer({ port });
|
||||
|
||||
// Track authenticated clients
|
||||
const clients = new Map();
|
||||
|
||||
wss.on('connection', (ws, req) => {
|
||||
const clientId = `${req.socket.remoteAddress}:${req.socket.remotePort}`;
|
||||
console.log(`[WS] Client connected: ${clientId}`);
|
||||
|
||||
// Client state
|
||||
const clientState = {
|
||||
authenticated: false,
|
||||
devicePrefix: null,
|
||||
name: null,
|
||||
lastPong: Date.now()
|
||||
};
|
||||
clients.set(ws, clientState);
|
||||
|
||||
// Set up ping/pong for keepalive
|
||||
ws.isAlive = true;
|
||||
ws.on('pong', () => {
|
||||
ws.isAlive = true;
|
||||
clientState.lastPong = Date.now();
|
||||
});
|
||||
|
||||
ws.on('message', (data) => {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
handleMessage(ws, message, clientState, db);
|
||||
} catch (err) {
|
||||
console.error(`[WS] Error parsing message from ${clientId}:`, err.message);
|
||||
sendError(ws, 'Invalid JSON message');
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
console.log(`[WS] Client disconnected: ${clientId} (${clientState.name || 'unauthenticated'})`);
|
||||
clients.delete(ws);
|
||||
});
|
||||
|
||||
ws.on('error', (err) => {
|
||||
console.error(`[WS] Error for ${clientId}:`, err.message);
|
||||
});
|
||||
});
|
||||
|
||||
// Ping interval to detect dead connections
|
||||
const pingInterval = setInterval(() => {
|
||||
wss.clients.forEach((ws) => {
|
||||
if (ws.isAlive === false) {
|
||||
console.log('[WS] Terminating unresponsive client');
|
||||
return ws.terminate();
|
||||
}
|
||||
ws.isAlive = false;
|
||||
ws.ping();
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
wss.on('close', () => {
|
||||
clearInterval(pingInterval);
|
||||
});
|
||||
|
||||
console.log(`[WS] WebSocket server listening on port ${port}`);
|
||||
return wss;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming WebSocket messages
|
||||
* @param {WebSocket} ws - The WebSocket connection
|
||||
* @param {object} message - Parsed message object
|
||||
* @param {object} clientState - Client state object
|
||||
* @param {Database} db - SQLite database instance
|
||||
*/
|
||||
function handleMessage(ws, message, clientState, db) {
|
||||
const { type } = message;
|
||||
|
||||
switch (type) {
|
||||
case 'auth':
|
||||
handleAuth(ws, message, clientState, db);
|
||||
break;
|
||||
|
||||
case 'data':
|
||||
handleData(ws, message, clientState, db);
|
||||
break;
|
||||
|
||||
case 'pong':
|
||||
// Client responded to our ping
|
||||
clientState.lastPong = Date.now();
|
||||
break;
|
||||
|
||||
default:
|
||||
sendError(ws, `Unknown message type: ${type}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle authentication request
|
||||
*/
|
||||
function handleAuth(ws, message, clientState, db) {
|
||||
const { apiKey } = message;
|
||||
|
||||
if (!apiKey) {
|
||||
return sendError(ws, 'Missing apiKey in auth message');
|
||||
}
|
||||
|
||||
const keyInfo = validateApiKey(db, apiKey);
|
||||
|
||||
if (!keyInfo) {
|
||||
send(ws, { type: 'auth', success: false, error: 'Invalid API key' });
|
||||
return;
|
||||
}
|
||||
|
||||
clientState.authenticated = true;
|
||||
clientState.devicePrefix = keyInfo.device_prefix;
|
||||
clientState.name = keyInfo.name;
|
||||
|
||||
console.log(`[WS] Client authenticated: ${keyInfo.name} (prefix: ${keyInfo.device_prefix})`);
|
||||
|
||||
send(ws, {
|
||||
type: 'auth',
|
||||
success: true,
|
||||
devicePrefix: keyInfo.device_prefix,
|
||||
name: keyInfo.name
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle data ingestion
|
||||
*/
|
||||
function handleData(ws, message, clientState, db) {
|
||||
if (!clientState.authenticated) {
|
||||
return sendError(ws, 'Not authenticated. Send auth message first.');
|
||||
}
|
||||
|
||||
const { readings } = message;
|
||||
|
||||
if (!Array.isArray(readings) || readings.length === 0) {
|
||||
return sendError(ws, 'Invalid readings: expected non-empty array');
|
||||
}
|
||||
|
||||
// Validate readings format
|
||||
for (const reading of readings) {
|
||||
if (!reading.device || !reading.channel || reading.value === undefined) {
|
||||
return sendError(ws, 'Invalid reading format: each reading must have device, channel, and value');
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const count = insertReadings(db, clientState.devicePrefix, readings);
|
||||
send(ws, { type: 'ack', count });
|
||||
} catch (err) {
|
||||
console.error('[WS] Error inserting readings:', err.message);
|
||||
sendError(ws, 'Failed to insert readings');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a WebSocket client
|
||||
*/
|
||||
function send(ws, message) {
|
||||
if (ws.readyState === 1) { // OPEN
|
||||
ws.send(JSON.stringify(message));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an error message
|
||||
*/
|
||||
function sendError(ws, error) {
|
||||
send(ws, { type: 'error', error });
|
||||
}
|
||||
|
||||
export default { createWebSocketServer };
|
||||
Reference in New Issue
Block a user