commit f3cca149f9788d5bf32be93113f963578c7dc866 Author: sebseb7 Date: Mon Dec 22 23:32:55 2025 +0100 Initial commit: tischlerctrl home automation project diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fd8ffc4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# Rust +**/target/ +**/*.rs.bk +Cargo.lock + +# Node.js +node_modules/ +npm-debug.log* + +# Environment files +.env +.env.local +.env.*.local + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Build artifacts +dist/ + +# Logs +*.log diff --git a/README.md b/README.md new file mode 100644 index 0000000..c8da157 --- /dev/null +++ b/README.md @@ -0,0 +1,139 @@ +# TischlerCtrl - Sensor Data Collection System + +A Node.js server that collects sensor data from multiple agents via WebSocket, stores it in SQLite with automatic data summarization and retention policies. + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Central Server (Node.js) │ +│ ┌─────────────┐ ┌──────────────┐ ┌──────────────────┐ │ +│ │ WebSocket │ │ SQLite DB │ │ Aggregation & │ │ +│ │ Server │──│ sensor_data │ │ Cleanup Jobs │ │ +│ │ :8080 │ │ sensor_10m │ │ (10m, 1h) │ │ +│ └─────────────┘ │ sensor_1h │ └──────────────────┘ │ +└────────┬─────────┴──────────────────────────────────────────┘ + │ + ┌────┴────┬──────────────┐ + │ │ │ +┌───▼───┐ ┌───▼───┐ ┌─────▼─────┐ +│ AC │ │ Tapo │ │ CLI │ +│Infinity│ │ Agent │ │ Agent │ +│ Agent │ │(Rust) │ │ (bash) │ +└───────┘ └───────┘ └───────────┘ +``` + +## Quick Start + +### 1. Start the Server + +```bash +cd server +cp .env.example .env +npm install +npm start +``` + +### 2. Generate API Keys + +```bash +cd server +node src/cli/generate-key.js "ac-infinity-agent" "ac:" +node src/cli/generate-key.js "tapo-agent" "tapo:" +node src/cli/generate-key.js "custom" "custom:" +``` + +### 3. Configure and Start AC Infinity Agent + +```bash +cd agents/ac-infinity +cp .env.example .env +# Edit .env with your AC Infinity credentials and API key +npm install +npm start +``` + +### 4. Build and Deploy Tapo Agent (Rust) + +```bash +cd agents/tapo +cp config.toml.example config.toml +# Edit config.toml with your Tapo devices and API key + +# Build for local machine +cargo build --release + +# Or cross-compile for Raspberry Pi (requires cross) +# cargo install cross +# cross build --release --target armv7-unknown-linux-gnueabihf + +# Run +./target/release/tapo-agent +# Or: RUST_LOG=info ./target/release/tapo-agent +``` + +### 5. Use CLI Agent + +```bash +# Install websocat (one-time) +cargo install websocat +# Or: sudo apt install websocat + +# Send data +export SENSOR_API_KEY="your-custom-api-key" +export SENSOR_SERVER="ws://localhost:8080" +./agents/cli/sensor-send mydevice temperature 24.5 +``` + +## Data Retention Policy + +| Resolution | Retention | Source | +|------------|-----------|--------| +| Raw (1 min) | 7 days | `sensor_data` | +| 10 minutes | 30 days | `sensor_data_10m` | +| 1 hour | Forever | `sensor_data_1h` | + +Data is averaged when aggregating to higher resolutions. + +## WebSocket Protocol + +### Authentication +```json +→ {"type": "auth", "apiKey": "your-api-key"} +← {"type": "auth", "success": true, "devicePrefix": "ac:"} +``` + +### Send Data +```json +→ {"type": "data", "readings": [ + {"device": "ctrl1", "channel": "temperature", "value": 24.5}, + {"device": "ctrl1", "channel": "humidity", "value": 65.0} + ]} +← {"type": "ack", "count": 2} +``` + +## Project Structure + +``` +tischlerctrl/ +├── server/ # Central data collection server +│ ├── src/ +│ │ ├── index.js # Entry point +│ │ ├── config.js # Configuration +│ │ ├── db/ # Database schema & queries +│ │ ├── websocket/ # WebSocket server +│ │ ├── jobs/ # Aggregation & cleanup jobs +│ │ └── cli/ # CLI tools (generate-key) +│ └── data/ # SQLite database files +│ +├── agents/ +│ ├── ac-infinity/ # Node.js AC Infinity agent +│ ├── tapo/ # Rust Tapo smart plug agent +│ └── cli/ # Bash CLI tool +│ +└── README.md +``` + +## License + +MIT diff --git a/agents/ac-infinity/.env.example b/agents/ac-infinity/.env.example new file mode 100644 index 0000000..49c4d2e --- /dev/null +++ b/agents/ac-infinity/.env.example @@ -0,0 +1,12 @@ +# AC Infinity Agent Environment Configuration + +# WebSocket server connection +SERVER_URL=ws://localhost:8080 +API_KEY=your-api-key-here + +# AC Infinity credentials +AC_EMAIL=your@email.com +AC_PASSWORD=your-password + +# Polling interval in milliseconds (default: 60000 = 1 minute) +POLL_INTERVAL_MS=60000 diff --git a/agents/ac-infinity/package-lock.json b/agents/ac-infinity/package-lock.json new file mode 100644 index 0000000..d85d0cd --- /dev/null +++ b/agents/ac-infinity/package-lock.json @@ -0,0 +1,52 @@ +{ + "name": "ac-infinity-agent", + "version": "1.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "ac-infinity-agent", + "version": "1.0.0", + "dependencies": { + "dotenv": "^16.4.7", + "ws": "^8.18.0" + }, + "engines": { + "node": ">=18.0.0" + } + }, + "node_modules/dotenv": { + "version": "16.6.1", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.6.1.tgz", + "integrity": "sha512-uBq4egWHTcTt33a72vpSG0z3HnPuIl6NqYcTrKEg2azoEyl2hpW0zqlxysq2pK9HlDIHyHyakeYaYnSAwd8bow==", + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://dotenvx.com" + } + }, + "node_modules/ws": { + "version": "8.18.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz", + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + } + } +} diff --git a/agents/ac-infinity/package.json b/agents/ac-infinity/package.json new file mode 100644 index 0000000..de757e0 --- /dev/null +++ b/agents/ac-infinity/package.json @@ -0,0 +1,17 @@ +{ + "name": "ac-infinity-agent", + "version": "1.0.0", + "description": "AC Infinity sensor data collection agent", + "type": "module", + "main": "src/index.js", + "scripts": { + "start": "node src/index.js" + }, + "dependencies": { + "dotenv": "^16.4.7", + "ws": "^8.18.0" + }, + "engines": { + "node": ">=18.0.0" + } +} \ No newline at end of file diff --git a/agents/ac-infinity/src/ac-client.js b/agents/ac-infinity/src/ac-client.js new file mode 100644 index 0000000..dff01db --- /dev/null +++ b/agents/ac-infinity/src/ac-client.js @@ -0,0 +1,208 @@ +/** + * AC Infinity API Client + * Ported from TypeScript homebridge-acinfinity plugin + */ + +const API_URL_LOGIN = '/api/user/appUserLogin'; +const API_URL_GET_DEVICE_INFO_LIST_ALL = '/api/user/devInfoListAll'; + +export class ACInfinityClientError extends Error { + constructor(message) { + super(message); + this.name = 'ACInfinityClientError'; + } +} + +export class ACInfinityClientCannotConnect extends ACInfinityClientError { + constructor() { + super('Cannot connect to AC Infinity API'); + } +} + +export class ACInfinityClientInvalidAuth extends ACInfinityClientError { + constructor() { + super('Invalid authentication credentials'); + } +} + +export class ACInfinityClient { + constructor(host, email, password) { + this.host = host; + this.email = email; + this.password = password; + this.userId = null; + } + + async login() { + try { + // AC Infinity API does not accept passwords greater than 25 characters + const normalizedPassword = this.password.substring(0, 25); + + const response = await fetch(`${this.host}${API_URL_LOGIN}`, { + method: 'POST', + headers: { + 'User-Agent': 'ACController/1.9.7 (com.acinfinity.humiture; build:533; iOS 18.5.0) Alamofire/5.10.2', + 'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8', + }, + body: new URLSearchParams({ + appEmail: this.email, + appPasswordl: normalizedPassword, // Note: intentional typo in API + }), + }); + + const data = await response.json(); + + if (data.code !== 200) { + if (data.code === 10001) { + throw new ACInfinityClientInvalidAuth(); + } + throw new ACInfinityClientError(`Login failed: ${JSON.stringify(data)}`); + } + + this.userId = data.data.appId; + console.log('[AC] Successfully logged in to AC Infinity API'); + return this.userId; + } catch (error) { + if (error instanceof ACInfinityClientError) { + throw error; + } + throw new ACInfinityClientCannotConnect(); + } + } + + isLoggedIn() { + return this.userId !== null; + } + + getAuthHeaders() { + if (!this.userId) { + throw new ACInfinityClientError('Client is not logged in'); + } + return { + token: this.userId, + phoneType: '1', + appVersion: '1.9.7', + }; + } + + async getDevicesListAll() { + if (!this.isLoggedIn()) { + throw new ACInfinityClientError('AC Infinity client is not logged in'); + } + + try { + const response = await fetch(`${this.host}${API_URL_GET_DEVICE_INFO_LIST_ALL}`, { + method: 'POST', + headers: { + 'User-Agent': 'ACController/1.9.7 (com.acinfinity.humiture; build:533; iOS 18.5.0) Alamofire/5.10.2', + 'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8', + ...this.getAuthHeaders(), + }, + body: new URLSearchParams({ + userId: this.userId, + }), + }); + + const data = await response.json(); + + if (data.code !== 200) { + throw new ACInfinityClientError(`Request failed: ${JSON.stringify(data)}`); + } + + return data.data || []; + } catch (error) { + if (error instanceof ACInfinityClientError) { + throw error; + } + throw new ACInfinityClientCannotConnect(); + } + } + + /** + * Extract sensor readings from device list + * @returns {Array} Array of {device, channel, value} objects + */ + async getSensorReadings() { + const devices = await this.getDevicesListAll(); + const readings = []; + + for (const device of devices) { + const devId = device.devId; + const devName = device.devName || `device-${devId}`; + + // Normalize device name for use as identifier + const deviceId = devName + .toLowerCase() + .replace(/[^a-z0-9]+/g, '-') + .replace(/^-|-$/g, ''); + + // Extract sensor data from device settings or sensor fields + // Temperature is stored as Celsius * 100 + if (device.devSettings?.temperature !== undefined) { + readings.push({ + device: deviceId, + channel: 'temperature', + value: device.devSettings.temperature / 100, + }); + } else if (device.temperature !== undefined) { + readings.push({ + device: deviceId, + channel: 'temperature', + value: device.temperature / 100, + }); + } + + // Humidity is stored as % * 100 + if (device.devSettings?.humidity !== undefined) { + readings.push({ + device: deviceId, + channel: 'humidity', + value: device.devSettings.humidity / 100, + }); + } else if (device.humidity !== undefined) { + readings.push({ + device: deviceId, + channel: 'humidity', + value: device.humidity / 100, + }); + } + + // VPD if available + if (device.devSettings?.vpdnums !== undefined) { + readings.push({ + device: deviceId, + channel: 'vpd', + value: device.devSettings.vpdnums / 100, + }); + } + + // Check for port-level sensors (some controllers have multiple ports) + if (device.devPortList && Array.isArray(device.devPortList)) { + for (const port of device.devPortList) { + const portId = port.portId || port.port; + const portDeviceId = `${deviceId}-port${portId}`; + + if (port.temperature !== undefined) { + readings.push({ + device: portDeviceId, + channel: 'temperature', + value: port.temperature / 100, + }); + } + + if (port.humidity !== undefined) { + readings.push({ + device: portDeviceId, + channel: 'humidity', + value: port.humidity / 100, + }); + } + } + } + } + + return readings; + } +} + +export default ACInfinityClient; diff --git a/agents/ac-infinity/src/config.js b/agents/ac-infinity/src/config.js new file mode 100644 index 0000000..6e7c4ec --- /dev/null +++ b/agents/ac-infinity/src/config.js @@ -0,0 +1,25 @@ +import { config } from 'dotenv'; +import { fileURLToPath } from 'url'; +import { dirname, join } from 'path'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +// Load environment variables +config({ path: join(__dirname, '..', '.env') }); + +export default { + // WebSocket server connection + serverUrl: process.env.SERVER_URL || 'ws://localhost:8080', + apiKey: process.env.API_KEY || '', + + // AC Infinity credentials + acEmail: process.env.AC_EMAIL || '', + acPassword: process.env.AC_PASSWORD || '', + + // Polling interval (default: 60 seconds) + pollIntervalMs: parseInt(process.env.POLL_INTERVAL_MS || '60000', 10), + + // AC Infinity API + acApiHost: process.env.AC_API_HOST || 'https://www.acinfinity.com', +}; diff --git a/agents/ac-infinity/src/index.js b/agents/ac-infinity/src/index.js new file mode 100644 index 0000000..b998f92 --- /dev/null +++ b/agents/ac-infinity/src/index.js @@ -0,0 +1,89 @@ +import config from './config.js'; +import ACInfinityClient from './ac-client.js'; +import WSClient from './ws-client.js'; + +console.log('='.repeat(50)); +console.log('AC Infinity Agent'); +console.log('='.repeat(50)); + +// Validate configuration +if (!config.apiKey) { + console.error('Error: API_KEY is required'); + process.exit(1); +} +if (!config.acEmail || !config.acPassword) { + console.error('Error: AC_EMAIL and AC_PASSWORD are required'); + process.exit(1); +} + +// Initialize clients +const acClient = new ACInfinityClient( + config.acApiHost, + config.acEmail, + config.acPassword +); + +const wsClient = new WSClient(config.serverUrl, config.apiKey); + +// Polling function +async function pollSensors() { + try { + const readings = await acClient.getSensorReadings(); + + if (readings.length > 0) { + console.log(`[Poll] Sending ${readings.length} readings`); + wsClient.sendReadings(readings); + } else { + console.log('[Poll] No readings available'); + } + } catch (err) { + console.error('[Poll] Error:', err.message); + + // Re-login if authentication failed + if (err.message.includes('not logged in')) { + console.log('[Poll] Attempting re-login...'); + try { + await acClient.login(); + } catch (loginErr) { + console.error('[Poll] Re-login failed:', loginErr.message); + } + } + } +} + +// Main function +async function main() { + try { + // Login to AC Infinity + await acClient.login(); + + // Connect to WebSocket server + await wsClient.connect(); + + // Start polling + console.log(`[Main] Starting polling every ${config.pollIntervalMs / 1000}s`); + + // Poll immediately + await pollSensors(); + + // Then poll at interval + setInterval(pollSensors, config.pollIntervalMs); + + } catch (err) { + console.error('[Main] Fatal error:', err.message); + process.exit(1); + } +} + +// Graceful shutdown +function shutdown() { + console.log('\n[Agent] Shutting down...'); + wsClient.close(); + process.exit(0); +} + +process.on('SIGINT', shutdown); +process.on('SIGTERM', shutdown); + +// Start +main(); diff --git a/agents/ac-infinity/src/ws-client.js b/agents/ac-infinity/src/ws-client.js new file mode 100644 index 0000000..c71d344 --- /dev/null +++ b/agents/ac-infinity/src/ws-client.js @@ -0,0 +1,194 @@ +import WebSocket from 'ws'; + +/** + * WebSocket client with auto-reconnect and authentication + */ +export class WSClient { + constructor(url, apiKey, options = {}) { + this.url = url; + this.apiKey = apiKey; + this.options = { + reconnectBaseMs: options.reconnectBaseMs || 1000, + reconnectMaxMs: options.reconnectMaxMs || 60000, + pingIntervalMs: options.pingIntervalMs || 30000, + ...options, + }; + + this.ws = null; + this.authenticated = false; + this.devicePrefix = null; + this.reconnectAttempts = 0; + this.reconnectTimer = null; + this.pingTimer = null; + this.messageQueue = []; + this.onReadyCallback = null; + } + + /** + * Connect to the WebSocket server + * @returns {Promise} Resolves when authenticated + */ + connect() { + return new Promise((resolve, reject) => { + this.onReadyCallback = resolve; + this._connect(); + }); + } + + _connect() { + console.log(`[WS] Connecting to ${this.url}...`); + + this.ws = new WebSocket(this.url); + + this.ws.on('open', () => { + console.log('[WS] Connected, authenticating...'); + this.reconnectAttempts = 0; + + // Send authentication + this._send({ type: 'auth', apiKey: this.apiKey }); + }); + + this.ws.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + this._handleMessage(message); + } catch (err) { + console.error('[WS] Error parsing message:', err.message); + } + }); + + this.ws.on('ping', () => { + this.ws.pong(); + }); + + this.ws.on('close', (code, reason) => { + console.log(`[WS] Connection closed: ${code} ${reason}`); + this._cleanup(); + this._scheduleReconnect(); + }); + + this.ws.on('error', (err) => { + console.error('[WS] Error:', err.message); + }); + } + + _handleMessage(message) { + switch (message.type) { + case 'auth': + if (message.success) { + console.log(`[WS] Authenticated as ${message.name}`); + this.authenticated = true; + this.devicePrefix = message.devicePrefix; + + // Start ping timer + this._startPingTimer(); + + // Flush queued messages + this._flushQueue(); + + // Resolve connect promise + if (this.onReadyCallback) { + this.onReadyCallback(); + this.onReadyCallback = null; + } + } else { + console.error('[WS] Authentication failed:', message.error); + } + break; + + case 'ack': + // Data acknowledged + break; + + case 'error': + console.error('[WS] Server error:', message.error); + break; + + default: + console.log('[WS] Unknown message type:', message.type); + } + } + + _startPingTimer() { + this._stopPingTimer(); + this.pingTimer = setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this._send({ type: 'pong' }); + } + }, this.options.pingIntervalMs); + } + + _stopPingTimer() { + if (this.pingTimer) { + clearInterval(this.pingTimer); + this.pingTimer = null; + } + } + + _cleanup() { + this._stopPingTimer(); + this.authenticated = false; + } + + _scheduleReconnect() { + if (this.reconnectTimer) return; + + const delay = Math.min( + this.options.reconnectBaseMs * Math.pow(2, this.reconnectAttempts), + this.options.reconnectMaxMs + ); + + console.log(`[WS] Reconnecting in ${delay}ms...`); + this.reconnectAttempts++; + + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this._connect(); + }, delay); + } + + _send(message) { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(message)); + } + } + + _flushQueue() { + while (this.messageQueue.length > 0) { + const message = this.messageQueue.shift(); + this._send(message); + } + } + + /** + * Send sensor readings to the server + * @param {Array} readings - Array of {device, channel, value} objects + */ + sendReadings(readings) { + const message = { type: 'data', readings }; + + if (this.authenticated) { + this._send(message); + } else { + // Queue for later + this.messageQueue.push(message); + } + } + + /** + * Close the connection + */ + close() { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + this._cleanup(); + if (this.ws) { + this.ws.close(); + this.ws = null; + } + } +} + +export default WSClient; diff --git a/agents/cli/sensor-send b/agents/cli/sensor-send new file mode 100755 index 0000000..36bc9d7 --- /dev/null +++ b/agents/cli/sensor-send @@ -0,0 +1,119 @@ +#!/bin/bash +# +# sensor-send - CLI tool to send sensor data to TischlerCtrl server +# +# Usage: +# sensor-send +# +# Environment variables: +# SENSOR_API_KEY - API key for authentication (required) +# SENSOR_SERVER - WebSocket server URL (default: ws://localhost:8080) +# +# Examples: +# sensor-send growbox temperature 24.5 +# sensor-send pump-1 pressure 1.2 +# +# Dependencies: +# - websocat (install via: cargo install websocat) +# + +set -e + +# Configuration +API_KEY="${SENSOR_API_KEY:-}" +SERVER="${SENSOR_SERVER:-ws://localhost:8080}" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +NC='\033[0m' # No Color + +# Usage function +usage() { + echo "Usage: sensor-send " + echo "" + echo "Environment variables:" + echo " SENSOR_API_KEY - API key for authentication (required)" + echo " SENSOR_SERVER - WebSocket server URL (default: ws://localhost:8080)" + echo "" + echo "Examples:" + echo " sensor-send growbox temperature 24.5" + echo " sensor-send pump-1 pressure 1.2" + exit 1 +} + +# Check for websocat +if ! command -v websocat &> /dev/null; then + echo -e "${RED}Error: websocat is not installed${NC}" + echo "Install via: cargo install websocat" + echo "Or: apt install websocat (on some systems)" + exit 1 +fi + +# Check arguments +if [ $# -lt 3 ]; then + usage +fi + +DEVICE="$1" +CHANNEL="$2" +VALUE="$3" + +# Validate value is a number +if ! [[ "$VALUE" =~ ^-?[0-9]*\.?[0-9]+$ ]]; then + echo -e "${RED}Error: value must be a number${NC}" + exit 1 +fi + +# Check API key +if [ -z "$API_KEY" ]; then + echo -e "${RED}Error: SENSOR_API_KEY environment variable is required${NC}" + exit 1 +fi + +# Create JSON messages +AUTH_MSG=$(cat <&1) || { + echo -e "${RED}Error: Failed to connect to server${NC}" + exit 1 +} + +# Check response +if echo "$RESPONSE" | grep -q '"success":true'; then + if echo "$RESPONSE" | grep -q '"type":"ack"'; then + echo -e "${GREEN}✓ Data sent successfully${NC}" + echo " Device: $DEVICE" + echo " Channel: $CHANNEL" + echo " Value: $VALUE" + exit 0 + fi +fi + +# Parse error if present +if echo "$RESPONSE" | grep -q '"type":"error"'; then + ERROR=$(echo "$RESPONSE" | grep -o '"error":"[^"]*"' | cut -d'"' -f4) + echo -e "${RED}Error: $ERROR${NC}" + exit 1 +fi + +# Auth failed +if echo "$RESPONSE" | grep -q '"success":false'; then + echo -e "${RED}Error: Authentication failed${NC}" + exit 1 +fi + +# Unknown response +echo -e "${RED}Error: Unexpected response from server${NC}" +echo "$RESPONSE" +exit 1 diff --git a/agents/tapo/Cargo.toml b/agents/tapo/Cargo.toml new file mode 100644 index 0000000..27f7a79 --- /dev/null +++ b/agents/tapo/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "tapo-agent" +version = "1.0.0" +edition = "2021" +description = "Tapo smart plug sensor data collection agent" + +[dependencies] +tapo = "0.8" +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = { version = "0.24", default-features = false, features = ["connect", "rustls-tls-native-roots"] } +futures-util = "0.3" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +toml = "0.8" +log = "0.4" +env_logger = "0.11" +clap = { version = "4", features = ["derive"] } +# Add reqwest with rustls to override tapo's default +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } + +[profile.release] +lto = true +codegen-units = 1 +strip = true diff --git a/agents/tapo/Cross.toml b/agents/tapo/Cross.toml new file mode 100644 index 0000000..50fe74d --- /dev/null +++ b/agents/tapo/Cross.toml @@ -0,0 +1,10 @@ +[build.env] +passthrough = [ + "RUST_BACKTRACE", +] + +[target.armv7-unknown-linux-gnueabihf] +image = "ghcr.io/cross-rs/armv7-unknown-linux-gnueabihf:main" + +[target.aarch64-unknown-linux-gnu] +image = "ghcr.io/cross-rs/aarch64-unknown-linux-gnu:main" diff --git a/agents/tapo/build-all.sh b/agents/tapo/build-all.sh new file mode 100755 index 0000000..f93eb18 --- /dev/null +++ b/agents/tapo/build-all.sh @@ -0,0 +1,148 @@ +#!/bin/bash +# +# Build Tapo agent for various Raspberry Pi targets +# +# Targets: +# - Pi 2, Pi 3, Pi 4 (32-bit): armv7-unknown-linux-gnueabihf +# - Pi 3, Pi 4 (64-bit): aarch64-unknown-linux-gnu +# +# Usage: ./build-all.sh +# + +set -e + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +echo "==========================================" +echo "Tapo Agent Cross-Compilation Build" +echo "==========================================" +echo "" + +# ============================================ +# Prerequisites Check +# ============================================ + +MISSING_DEPS=0 + +echo -e "${BLUE}Checking prerequisites...${NC}" +echo "" + +# Check for Rust/Cargo +if ! command -v cargo &> /dev/null; then + echo -e "${RED}✗ Rust/Cargo not found${NC}" + echo " Install with:" + echo -e " ${YELLOW}curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh${NC}" + echo " source \$HOME/.cargo/env" + echo "" + MISSING_DEPS=1 +else + RUST_VERSION=$(rustc --version | cut -d' ' -f2) + echo -e "${GREEN}✓ Rust/Cargo installed${NC} (v$RUST_VERSION)" +fi + +# Check for Docker +if ! command -v docker &> /dev/null; then + echo -e "${RED}✗ Docker not found${NC}" + echo " Install with:" + echo -e " ${YELLOW}sudo apt update && sudo apt install -y docker.io${NC}" + echo -e " ${YELLOW}sudo usermod -aG docker \$USER${NC}" + echo " (log out and back in after adding to docker group)" + echo "" + MISSING_DEPS=1 +else + DOCKER_VERSION=$(docker --version | cut -d' ' -f3 | tr -d ',') + echo -e "${GREEN}✓ Docker installed${NC} (v$DOCKER_VERSION)" + + # Check if Docker daemon is running + if ! docker info &> /dev/null; then + echo -e "${RED}✗ Docker daemon not running or no permission${NC}" + echo " Try:" + echo -e " ${YELLOW}sudo systemctl start docker${NC}" + echo " Or if permission denied:" + echo -e " ${YELLOW}sudo usermod -aG docker \$USER${NC}" + echo " (log out and back in)" + echo "" + MISSING_DEPS=1 + else + echo -e "${GREEN}✓ Docker daemon running${NC}" + fi +fi + +# Check for cross +if ! command -v cross &> /dev/null; then + echo -e "${YELLOW}! cross not found - will install automatically${NC}" + NEED_CROSS=1 +else + CROSS_VERSION=$(cross --version 2>/dev/null | head -1 | cut -d' ' -f2 || echo "unknown") + echo -e "${GREEN}✓ cross installed${NC} (v$CROSS_VERSION)" + NEED_CROSS=0 +fi + +echo "" + +# Exit if missing dependencies +if [ $MISSING_DEPS -eq 1 ]; then + echo -e "${RED}Please install missing dependencies and try again.${NC}" + exit 1 +fi + +# Install cross if needed +if [ "${NEED_CROSS:-0}" -eq 1 ]; then + echo -e "${YELLOW}Installing 'cross' for cross-compilation...${NC}" + cargo install cross --git https://github.com/cross-rs/cross + echo "" +fi + +# ============================================ +# Build +# ============================================ + +# Create output directory +mkdir -p dist + +# Define targets +declare -A TARGETS=( + ["armv7-unknown-linux-gnueabihf"]="pi2_pi3_pi4_32bit" + ["aarch64-unknown-linux-gnu"]="pi3_pi4_64bit" +) + +echo -e "${BLUE}Starting builds...${NC}" +echo "" + +for target in "${!TARGETS[@]}"; do + name="${TARGETS[$target]}" + echo -e "${GREEN}Building for $target ($name)...${NC}" + + cross build --release --target "$target" + + # Copy binary to dist folder with descriptive name + cp "target/$target/release/tapo-agent" "dist/tapo-agent-$name" + + # Get binary size + size=$(du -h "dist/tapo-agent-$name" | cut -f1) + echo -e " → ${GREEN}dist/tapo-agent-$name${NC} ($size)" + echo "" +done + +echo "==========================================" +echo -e "${GREEN}Build complete!${NC} Binaries in dist/" +echo "==========================================" +ls -lh dist/ + +echo "" +echo "To deploy to Raspberry Pi:" +echo -e " ${YELLOW}scp dist/tapo-agent-pi3_pi4_64bit pi@raspberrypi:~/tapo-agent${NC}" +echo -e " ${YELLOW}ssh pi@raspberrypi 'chmod +x ~/tapo-agent && ./tapo-agent'${NC}" + +echo "" +echo -e "${BLUE}Upload to bashupload.com for web console deploy (3 days, 1 download):${NC}" +echo -e " ${YELLOW}curl https://bashupload.com -F=@dist/tapo-agent-pi3_pi4_64bit${NC}" +echo -e " ${YELLOW}curl https://bashupload.com -F=@dist/tapo-agent-pi2_pi3_pi4_32bit${NC}" +echo "" +echo "Then on Pi, download and run:" +echo -e " ${YELLOW}curl -sSL https://bashupload.com/XXXXX -o tapo-agent && chmod +x tapo-agent${NC}" diff --git a/agents/tapo/config.toml.example b/agents/tapo/config.toml.example new file mode 100644 index 0000000..23c9c73 --- /dev/null +++ b/agents/tapo/config.toml.example @@ -0,0 +1,22 @@ +# Tapo Agent Configuration Example + +server_url = "ws://192.168.1.100:8080" +api_key = "your-api-key-here" +poll_interval_secs = 60 + +# Define your Tapo devices below +# Each device needs: ip, name, type (P100 or P110), tapo_email, tapo_password + +[[devices]] +ip = "192.168.1.50" +name = "grow-light-plug" +type = "P110" +tapo_email = "your@email.com" +tapo_password = "your-tapo-password" + +[[devices]] +ip = "192.168.1.51" +name = "fan-plug" +type = "P100" +tapo_email = "your@email.com" +tapo_password = "your-tapo-password" diff --git a/agents/tapo/src/main.rs b/agents/tapo/src/main.rs new file mode 100644 index 0000000..69448f0 --- /dev/null +++ b/agents/tapo/src/main.rs @@ -0,0 +1,383 @@ +use clap::{Parser, Subcommand}; +use futures_util::{SinkExt, StreamExt}; +use log::{error, info, warn}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use tapo::{ApiClient, DiscoveryResult}; +use tokio::time::{interval, sleep}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; + +#[derive(Parser)] +#[command(name = "tapo-agent")] +#[command(about = "Tapo smart plug sensor data collection agent")] +struct Cli { + #[command(subcommand)] + command: Option, + + /// Path to config file + #[arg(short, long, default_value = "config.toml")] + config: String, +} + +#[derive(Subcommand)] +enum Commands { + /// Initialize configuration file by discovering devices + Init { + /// Server WebSocket URL + #[arg(long)] + server: String, + + /// API key for authentication + #[arg(long)] + key: String, + + /// Tapo account email + #[arg(long)] + email: String, + + /// Tapo account password + #[arg(long)] + password: String, + + /// Broadcast address for discovery (default: 192.168.1.255) + #[arg(long, default_value = "192.168.1.255")] + broadcast: String, + + /// Discovery timeout in seconds + #[arg(long, default_value = "10")] + timeout: u64, + + /// Output config file path + #[arg(short, long, default_value = "config.toml")] + output: String, + }, + /// Run the agent (default if no subcommand) + Run, +} + +#[derive(Debug, Deserialize, Serialize)] +struct Config { + server_url: String, + api_key: String, + poll_interval_secs: u64, + devices: Vec, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +struct DeviceConfig { + ip: String, + name: String, + #[serde(rename = "type")] + device_type: String, + tapo_email: String, + tapo_password: String, +} + +#[derive(Debug, Serialize)] +struct AuthMessage { + #[serde(rename = "type")] + msg_type: String, + #[serde(rename = "apiKey")] + api_key: String, +} + +#[derive(Debug, Serialize)] +struct DataMessage { + #[serde(rename = "type")] + msg_type: String, + readings: Vec, +} + +#[derive(Debug, Serialize, Clone)] +struct Reading { + device: String, + channel: String, + value: f64, +} + +#[derive(Debug, Deserialize)] +struct ServerResponse { + #[serde(rename = "type")] + msg_type: String, + success: Option, + error: Option, +} + +async fn discover_and_create_config( + server: String, + key: String, + email: String, + password: String, + broadcast: String, + timeout: u64, + output: String, +) -> Result<(), Box> { + println!("Discovering Tapo devices on {} ({}s timeout)...", broadcast, timeout); + + let api_client = ApiClient::new(&email, &password); + let mut discovery = api_client.discover_devices(&broadcast, timeout).await?; + + let mut devices = Vec::new(); + + while let Some(discovery_result) = discovery.next().await { + if let Ok(device) = discovery_result { + match device { + DiscoveryResult::Plug { device_info, .. } => { + println!( + " Found Plug: {} ({}) at {}", + device_info.nickname, device_info.model, device_info.ip + ); + devices.push(DeviceConfig { + ip: device_info.ip, + name: device_info.nickname.replace(" ", "-").to_lowercase(), + device_type: "P100".to_string(), + tapo_email: email.clone(), + tapo_password: password.clone(), + }); + } + DiscoveryResult::PlugEnergyMonitoring { device_info, .. } => { + println!( + " Found Energy Plug: {} ({}) at {}", + device_info.nickname, device_info.model, device_info.ip + ); + devices.push(DeviceConfig { + ip: device_info.ip, + name: device_info.nickname.replace(" ", "-").to_lowercase(), + device_type: "P110".to_string(), + tapo_email: email.clone(), + tapo_password: password.clone(), + }); + } + DiscoveryResult::GenericDevice { device_info, .. } => { + println!( + " Found Unknown Device: {:?} ({}) at {} - skipping", + device_info.nickname, device_info.model, device_info.ip + ); + } + _ => { + // Light bulbs and other devices - skip for now + } + } + } + } + + if devices.is_empty() { + return Err("No plugs discovered. Check your broadcast address and ensure devices are on the same network.".into()); + } + + println!("\nDiscovered {} plug(s)", devices.len()); + + let config = Config { + server_url: server, + api_key: key, + poll_interval_secs: 60, + devices, + }; + + let toml_str = toml::to_string_pretty(&config)?; + std::fs::write(&output, &toml_str)?; + + println!("✓ Config written to: {}", output); + println!("\nRun the agent with: RUST_LOG=info ./tapo-agent"); + + Ok(()) +} + +async fn collect_device_data(device: &DeviceConfig) -> Vec { + let mut readings = Vec::new(); + let client = ApiClient::new(&device.tapo_email, &device.tapo_password); + + match device.device_type.as_str() { + "P110" => { + match client.p110(&device.ip).await { + Ok(plug) => { + if let Ok(info) = plug.get_device_info().await { + readings.push(Reading { + device: device.name.clone(), + channel: "state".to_string(), + value: if info.device_on { 1.0 } else { 0.0 }, + }); + } + + if let Ok(energy) = plug.get_current_power().await { + readings.push(Reading { + device: device.name.clone(), + channel: "power".to_string(), + value: energy.current_power as f64 / 1000.0, + }); + } + + if let Ok(usage) = plug.get_energy_usage().await { + readings.push(Reading { + device: device.name.clone(), + channel: "energy_today".to_string(), + value: usage.today_energy as f64, + }); + } + } + Err(e) => error!("Failed to connect to P110 {}: {}", device.name, e), + } + } + "P100" | "P105" => { + match client.p100(&device.ip).await { + Ok(plug) => { + if let Ok(info) = plug.get_device_info().await { + readings.push(Reading { + device: device.name.clone(), + channel: "state".to_string(), + value: if info.device_on { 1.0 } else { 0.0 }, + }); + } + } + Err(e) => error!("Failed to connect to P100 {}: {}", device.name, e), + } + } + _ => { + warn!("Unknown device type: {}", device.device_type); + } + } + + readings +} + +async fn run_agent(config: Config) -> Result<(), Box> { + let mut reconnect_delay = Duration::from_secs(1); + let max_reconnect_delay = Duration::from_secs(60); + + loop { + info!("Connecting to {}...", config.server_url); + + match connect_async(&config.server_url).await { + Ok((ws_stream, _)) => { + info!("Connected to server"); + reconnect_delay = Duration::from_secs(1); + + let (mut write, mut read) = ws_stream.split(); + + let auth = AuthMessage { + msg_type: "auth".to_string(), + api_key: config.api_key.clone(), + }; + let auth_json = serde_json::to_string(&auth)?; + write.send(Message::Text(auth_json)).await?; + + let authenticated = if let Some(Ok(msg)) = read.next().await { + if let Message::Text(text) = msg { + let response: ServerResponse = serde_json::from_str(&text)?; + if response.msg_type == "auth" && response.success == Some(true) { + info!("Authenticated successfully"); + true + } else { + error!("Authentication failed: {:?}", response.error); + false + } + } else { + false + } + } else { + false + }; + + if !authenticated { + sleep(reconnect_delay).await; + continue; + } + + let mut poll_interval = interval(Duration::from_secs(config.poll_interval_secs)); + + loop { + poll_interval.tick().await; + + let mut all_readings = Vec::new(); + for device in &config.devices { + let readings = collect_device_data(device).await; + all_readings.extend(readings); + } + + if !all_readings.is_empty() { + info!("Sending {} readings", all_readings.len()); + let data = DataMessage { + msg_type: "data".to_string(), + readings: all_readings, + }; + let data_json = serde_json::to_string(&data)?; + + if let Err(e) = write.send(Message::Text(data_json)).await { + error!("Failed to send data: {}", e); + break; + } + } + + while let Ok(Some(msg)) = tokio::time::timeout( + Duration::from_millis(100), + read.next(), + ) + .await + { + match msg { + Ok(Message::Ping(data)) => { + let _ = write.send(Message::Pong(data)).await; + } + Ok(Message::Close(_)) => { + info!("Server closed connection"); + break; + } + Err(e) => { + error!("WebSocket error: {}", e); + break; + } + _ => {} + } + } + } + } + Err(e) => { + error!("Connection failed: {}", e); + } + } + + warn!("Reconnecting in {:?}...", reconnect_delay); + sleep(reconnect_delay).await; + reconnect_delay = std::cmp::min(reconnect_delay * 2, max_reconnect_delay); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let cli = Cli::parse(); + + match cli.command { + Some(Commands::Init { + server, + key, + email, + password, + broadcast, + timeout, + output, + }) => { + discover_and_create_config(server, key, email, password, broadcast, timeout, output).await?; + } + Some(Commands::Run) | None => { + let config_path = &cli.config; + + let config_content = std::fs::read_to_string(config_path).map_err(|e| { + format!( + "Failed to read config file {}: {}\n\nCreate config with device discovery:\n ./tapo-agent init --server ws://SERVER:8080 --key YOUR_KEY --email tapo@email.com --password tapopass\n\nOr specify broadcast address:\n ./tapo-agent init --server ws://SERVER:8080 --key YOUR_KEY --email tapo@email.com --password tapopass --broadcast 192.168.0.255", + config_path, e + ) + })?; + + let config: Config = toml::from_str(&config_content) + .map_err(|e| format!("Failed to parse config: {}", e))?; + + info!("Tapo Agent starting with {} devices", config.devices.len()); + + run_agent(config).await?; + } + } + + Ok(()) +} diff --git a/implementation_plan.md b/implementation_plan.md new file mode 100644 index 0000000..baa9749 --- /dev/null +++ b/implementation_plan.md @@ -0,0 +1,335 @@ +# Sensor Data Collection System + +A Node.js server that collects sensor data from multiple agents via WebSocket, stores it in SQLite with automatic data summarization and retention policies. + +## Architecture Overview + +```mermaid +graph TB + subgraph "Central Server (Node.js)" + WS[WebSocket Server :8080] + DB[(SQLite Database)] + AGG[Aggregation Job] + WS --> DB + AGG --> DB + end + + subgraph "AC Infinity Agent (Node.js)" + AC[AC Infinity Client] + AC -->|polls every 60s| ACAPI[AC Infinity Cloud API] + AC -->|WebSocket| WS + end + + subgraph "Tapo Agent (Rust)" + TAPO[Tapo Client] + TAPO -->|polls every 60s| PLUG[Tapo P100/P110] + TAPO -->|WebSocket| WS + end + + subgraph "Custom CLI Agent" + CLI[Shell Script] + CLI -->|WebSocket| WS + end +``` + +--- + +## User Review Required + +> [!IMPORTANT] +> **Tapo Agent Language Choice**: I recommend **Rust** for the Tapo agent because: +> - Compiles to a single ~2MB static binary +> - Uses ~5-10MB RAM at runtime +> - Excellent [tapo crate](https://crates.io/crates/tapo) already exists +> - Easy cross-compilation for Raspberry Pi +> +> Alternatively, I could write it in **Go** (would need to implement protocol from scratch) or as a **Node.js** agent (but you mentioned wanting it lightweight). + +> [!IMPORTANT] +> **AC Infinity Credentials**: The AC Infinity API requires email/password authentication to their cloud service. These will need to be stored in configuration. + +--- + +## Project Structure + +``` +tischlerctrl/ +├── server/ +│ ├── package.json +│ ├── src/ +│ │ ├── index.js # Entry point +│ │ ├── config.js # Configuration loader +│ │ ├── db/ +│ │ │ ├── schema.js # SQLite schema + migrations +│ │ │ └── queries.js # Database operations +│ │ ├── websocket/ +│ │ │ ├── server.js # WebSocket server +│ │ │ └── handlers.js # Message handlers +│ │ └── jobs/ +│ │ ├── aggregator.js # Data summarization job +│ │ └── cleanup.js # Data retention cleanup +│ └── data/ +│ └── sensors.db # SQLite database file +│ +├── agents/ +│ ├── ac-infinity/ +│ │ ├── package.json +│ │ └── src/ +│ │ ├── index.js # Entry point +│ │ ├── config.js # Configuration +│ │ ├── ac-client.js # AC Infinity API client +│ │ └── ws-client.js # WebSocket client with reconnect +│ │ +│ ├── tapo/ +│ │ ├── Cargo.toml +│ │ └── src/ +│ │ └── main.rs # Rust Tapo agent +│ │ +│ └── cli/ +│ └── sensor-send # Shell script CLI tool +│ +├── .env.example # Example environment variables +└── README.md +``` + +--- + +## Proposed Changes + +### Server - Database Schema + +#### [NEW] [schema.js](file:///home/seb/src/tischlerctrl/server/src/db/schema.js) + +SQLite tables: + +```sql +-- API keys for agent authentication +CREATE TABLE api_keys ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + key TEXT UNIQUE NOT NULL, + name TEXT NOT NULL, -- e.g., "ac-infinity-agent" + device_prefix TEXT NOT NULL, -- e.g., "ac:" or "tapo:" + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + last_used_at DATETIME +); + +-- Raw sensor data (1-minute resolution, kept for 1 week) +CREATE TABLE sensor_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, + device TEXT NOT NULL, -- e.g., "ac:controller-69-grow" + channel TEXT NOT NULL, -- e.g., "temperature", "humidity", "power" + value REAL NOT NULL, + INDEX idx_sensor_data_time (timestamp), + INDEX idx_sensor_data_device (device, channel) +); + +-- 10-minute aggregated data (kept for 1 month) +CREATE TABLE sensor_data_10m ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, -- Rounded to 10-min boundary + device TEXT NOT NULL, + channel TEXT NOT NULL, + value REAL NOT NULL, -- Averaged value + sample_count INTEGER NOT NULL, + UNIQUE(timestamp, device, channel) +); + +-- 1-hour aggregated data (kept forever) +CREATE TABLE sensor_data_1h ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, -- Rounded to 1-hour boundary + device TEXT NOT NULL, + channel TEXT NOT NULL, + value REAL NOT NULL, -- Averaged value + sample_count INTEGER NOT NULL, + UNIQUE(timestamp, device, channel) +); +``` + +--- + +### Server - WebSocket Protocol + +#### [NEW] [server.js](file:///home/seb/src/tischlerctrl/server/src/websocket/server.js) + +**Authentication Flow:** +1. Client connects to `ws://server:8080` +2. Client sends: `{ "type": "auth", "apiKey": "xxx" }` +3. Server validates API key, responds: `{ "type": "auth", "success": true, "devicePrefix": "ac:" }` +4. Client is now authenticated and can send data + +**Data Ingestion Message:** +```json +{ + "type": "data", + "readings": [ + { "device": "controller-69-grow", "channel": "temperature", "value": 24.5 }, + { "device": "controller-69-grow", "channel": "humidity", "value": 65.2 } + ] +} +``` + +Server prepends `devicePrefix` to device names and adds timestamp. + +**Keepalive:** +- Server sends `ping` every 30 seconds +- Client responds with `pong` +- Connection closed after 90 seconds of no response + +--- + +### Server - Aggregation Jobs + +#### [NEW] [aggregator.js](file:///home/seb/src/tischlerctrl/server/src/jobs/aggregator.js) + +Runs every 10 minutes: + +1. **10-minute aggregation**: + - Select data from `sensor_data` older than 10 minutes + - Group by device, channel, and 10-minute bucket + - Calculate average, insert into `sensor_data_10m` + +2. **1-hour aggregation**: + - Select data from `sensor_data_10m` older than 1 hour + - Group by device, channel, and 1-hour bucket + - Calculate weighted average, insert into `sensor_data_1h` + +#### [NEW] [cleanup.js](file:///home/seb/src/tischlerctrl/server/src/jobs/cleanup.js) + +Runs every hour: +- Delete from `sensor_data` where timestamp < NOW - 7 days +- Delete from `sensor_data_10m` where timestamp < NOW - 30 days + +--- + +### AC Infinity Agent + +#### [NEW] [ac-client.js](file:///home/seb/src/tischlerctrl/agents/ac-infinity/src/ac-client.js) + +Port of the TypeScript AC Infinity client to JavaScript ES modules: + +- `login(email, password)` → Returns userId token +- `getDevicesListAll()` → Returns all controllers with sensor readings +- Polling interval: 60 seconds +- Extracts: temperature, humidity, VPD (if available) per controller + +**Data extraction from API response:** +```javascript +// Each device in response has: +// - devId, devName +// - devSettings.temperature (°C * 100) +// - devSettings.humidity (% * 100) +// We normalize and send to server +``` + +#### [NEW] [ws-client.js](file:///home/seb/src/tischlerctrl/agents/ac-infinity/src/ws-client.js) + +WebSocket client with: +- Auto-reconnect with exponential backoff (1s → 2s → 4s → ... → 60s max) +- Authentication on connect +- Heartbeat response +- Message queue during disconnection + +--- + +### Tapo Agent (Rust) + +#### [NEW] [main.rs](file:///home/seb/src/tischlerctrl/agents/tapo/src/main.rs) + +Uses [tapo crate](https://crates.io/crates/tapo) for P100/P110 communication. + +**Features:** +- Configuration via environment variables or TOML file +- WebSocket client with tungstenite crate +- Auto-reconnect with backoff +- Polls devices every 60 seconds + +**Data collected:** +| Device | Channel | Description | +|--------|---------|-------------| +| P100 | `state` | 0 = off, 1 = on | +| P110 | `state` | 0 = off, 1 = on | +| P110 | `power` | Current power in watts | +| P110 | `energy_today` | Energy used today in Wh | + +**Build for Raspberry Pi:** +```bash +# Cross-compile for ARM +cross build --release --target armv7-unknown-linux-gnueabihf +# Binary: ~2MB, runs with ~8MB RAM +``` + +--- + +### Custom CLI Agent + +#### [NEW] [sensor-send](file:///home/seb/src/tischlerctrl/agents/cli/sensor-send) + +A shell script using `websocat` (lightweight WebSocket CLI tool): + +```bash +#!/bin/bash +# Usage: sensor-send --device=mydevice --channel=temp --value=23.5 + +API_KEY="${SENSOR_API_KEY:-}" +SERVER="${SENSOR_SERVER:-ws://localhost:8080}" + +sensor-send mydevice temperature 23.5 +``` + +Requires: `websocat` (single binary, ~3MB, available via cargo or apt) + +--- + +## Configuration Examples + +### Server `.env` +```bash +PORT=8080 +DB_PATH=./data/sensors.db +# Generate API keys via CLI: node src/cli/generate-key.js "ac-infinity" "ac:" +``` + +### AC Infinity Agent `.env` +```bash +SERVER_URL=ws://192.168.1.100:8080 +API_KEY=your-api-key-here +AC_EMAIL=your@email.com +AC_PASSWORD=your-password +POLL_INTERVAL_MS=60000 +``` + +### Tapo Agent `config.toml` +```toml +server_url = "ws://192.168.1.100:8080" +api_key = "your-api-key-here" +poll_interval_secs = 60 + +[[devices]] +ip = "192.168.1.50" +name = "grow-light-plug" +type = "P110" # or "P100" +tapo_email = "your@email.com" +tapo_password = "your-tapo-password" +``` + +--- + +## Verification Plan + +### Automated Tests +1. **Server unit tests**: Database operations, aggregation logic +2. **Integration test**: Start server, connect mock agent, verify data flow +3. **Run commands**: + ```bash + cd server && npm test + cd agents/ac-infinity && npm test + ``` + +### Manual Verification +1. Start server, verify WebSocket accepts connections +2. Send test data via CLI agent, verify it appears in database +3. Wait 10+ minutes, verify aggregation runs and data appears in `sensor_data_10m` +4. Connect AC Infinity agent with real credentials, verify sensor readings +5. Deploy Tapo agent to Raspberry Pi, verify plug data collection diff --git a/promptlog.txt b/promptlog.txt new file mode 100644 index 0000000..a71c861 --- /dev/null +++ b/promptlog.txt @@ -0,0 +1,258 @@ +Sensor Data Collection System +A Node.js server that collects sensor data from multiple agents via WebSocket, stores it in SQLite with automatic data summarization and retention policies. + +Architecture Overview +Custom CLI Agent +Tapo Agent (Rust) +AC Infinity Agent (Node.js) +Central Server (Node.js) +polls every 60s +WebSocket +polls every 60s +WebSocket +WebSocket +WebSocket Server :8080 +SQLite Database +Aggregation Job +AC Infinity Client +AC Infinity Cloud API +Tapo Client +Tapo P100/P110 +Shell Script +User Review Required +IMPORTANT + +Tapo Agent Language Choice: I recommend Rust for the Tapo agent because: + +Compiles to a single ~2MB static binary +Uses ~5-10MB RAM at runtime +Excellent tapo crate already exists +Easy cross-compilation for Raspberry Pi +Alternatively, I could write it in Go (would need to implement protocol from scratch) or as a Node.js agent (but you mentioned wanting it lightweight). + +IMPORTANT + +AC Infinity Credentials: The AC Infinity API requires email/password authentication to their cloud service. These will need to be stored in configuration. + +Project Structure +tischlerctrl/ +├── server/ +│ ├── package.json +│ ├── src/ +│ │ ├── index.js # Entry point +│ │ ├── config.js # Configuration loader +│ │ ├── db/ +│ │ │ ├── schema.js # SQLite schema + migrations +│ │ │ └── queries.js # Database operations +│ │ ├── websocket/ +│ │ │ ├── server.js # WebSocket server +│ │ │ └── handlers.js # Message handlers +│ │ └── jobs/ +│ │ ├── aggregator.js # Data summarization job +│ │ └── cleanup.js # Data retention cleanup +│ └── data/ +│ └── sensors.db # SQLite database file +│ +├── agents/ +│ ├── ac-infinity/ +│ │ ├── package.json +│ │ └── src/ +│ │ ├── index.js # Entry point +│ │ ├── config.js # Configuration +│ │ ├── ac-client.js # AC Infinity API client +│ │ └── ws-client.js # WebSocket client with reconnect +│ │ +│ ├── tapo/ +│ │ ├── Cargo.toml +│ │ └── src/ +│ │ └── main.rs # Rust Tapo agent +│ │ +│ └── cli/ +│ └── sensor-send # Shell script CLI tool +│ +├── .env.example # Example environment variables +└── README.md +Proposed Changes +Server - Database Schema +[NEW] +schema.js +SQLite tables: + +-- API keys for agent authentication +CREATE TABLE api_keys ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + key TEXT UNIQUE NOT NULL, + name TEXT NOT NULL, -- e.g., "ac-infinity-agent" + device_prefix TEXT NOT NULL, -- e.g., "ac:" or "tapo:" + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + last_used_at DATETIME +); +-- Raw sensor data (1-minute resolution, kept for 1 week) +CREATE TABLE sensor_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, + device TEXT NOT NULL, -- e.g., "ac:controller-69-grow" + channel TEXT NOT NULL, -- e.g., "temperature", "humidity", "power" + value REAL NOT NULL, + INDEX idx_sensor_data_time (timestamp), + INDEX idx_sensor_data_device (device, channel) +); +-- 10-minute aggregated data (kept for 1 month) +CREATE TABLE sensor_data_10m ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, -- Rounded to 10-min boundary + device TEXT NOT NULL, + channel TEXT NOT NULL, + value REAL NOT NULL, -- Averaged value + sample_count INTEGER NOT NULL, + UNIQUE(timestamp, device, channel) +); +-- 1-hour aggregated data (kept forever) +CREATE TABLE sensor_data_1h ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, -- Rounded to 1-hour boundary + device TEXT NOT NULL, + channel TEXT NOT NULL, + value REAL NOT NULL, -- Averaged value + sample_count INTEGER NOT NULL, + UNIQUE(timestamp, device, channel) +); +Server - WebSocket Protocol +[NEW] +server.js +Authentication Flow: + +Client connects to ws://server:8080 +Client sends: { "type": "auth", "apiKey": "xxx" } +Server validates API key, responds: { "type": "auth", "success": true, "devicePrefix": "ac:" } +Client is now authenticated and can send data +Data Ingestion Message: + +{ + "type": "data", + "readings": [ + { "device": "controller-69-grow", "channel": "temperature", "value": 24.5 }, + { "device": "controller-69-grow", "channel": "humidity", "value": 65.2 } + ] +} +Server prepends devicePrefix to device names and adds timestamp. + +Keepalive: + +Server sends ping every 30 seconds +Client responds with pong +Connection closed after 90 seconds of no response +Server - Aggregation Jobs +[NEW] +aggregator.js +Runs every 10 minutes: + +10-minute aggregation: + +Select data from sensor_data older than 10 minutes +Group by device, channel, and 10-minute bucket +Calculate average, insert into sensor_data_10m +1-hour aggregation: + +Select data from sensor_data_10m older than 1 hour +Group by device, channel, and 1-hour bucket +Calculate weighted average, insert into sensor_data_1h +[NEW] +cleanup.js +Runs every hour: + +Delete from sensor_data where timestamp < NOW - 7 days +Delete from sensor_data_10m where timestamp < NOW - 30 days +AC Infinity Agent +[NEW] +ac-client.js +Port of the TypeScript AC Infinity client to JavaScript ES modules: + +login(email, password) → Returns userId token +getDevicesListAll() → Returns all controllers with sensor readings +Polling interval: 60 seconds +Extracts: temperature, humidity, VPD (if available) per controller +Data extraction from API response: + +// Each device in response has: +// - devId, devName +// - devSettings.temperature (°C * 100) +// - devSettings.humidity (% * 100) +// We normalize and send to server +[NEW] +ws-client.js +WebSocket client with: + +Auto-reconnect with exponential backoff (1s → 2s → 4s → ... → 60s max) +Authentication on connect +Heartbeat response +Message queue during disconnection +Tapo Agent (Rust) +[NEW] +main.rs +Uses tapo crate for P100/P110 communication. + +Features: + +Configuration via environment variables or TOML file +WebSocket client with tungstenite crate +Auto-reconnect with backoff +Polls devices every 60 seconds +Data collected: + +Device Channel Description +P100 state 0 = off, 1 = on +P110 state 0 = off, 1 = on +P110 power Current power in watts +P110 energy_today Energy used today in Wh +Build for Raspberry Pi: + +# Cross-compile for ARM +cross build --release --target armv7-unknown-linux-gnueabihf +# Binary: ~2MB, runs with ~8MB RAM +Custom CLI Agent +[NEW] +sensor-send +A shell script using websocat (lightweight WebSocket CLI tool): + +#!/bin/bash +# Usage: sensor-send --device=mydevice --channel=temp --value=23.5 +API_KEY="${SENSOR_API_KEY:-}" +SERVER="${SENSOR_SERVER:-ws://localhost:8080}" +sensor-send mydevice temperature 23.5 +Requires: websocat (single binary, ~3MB, available via cargo or apt) + +Configuration Examples +Server .env +PORT=8080 +DB_PATH=./data/sensors.db +# Generate API keys via CLI: node src/cli/generate-key.js "ac-infinity" "ac:" +AC Infinity Agent .env +SERVER_URL=ws://192.168.1.100:8080 +API_KEY=your-api-key-here +AC_EMAIL=your@email.com +AC_PASSWORD=your-password +POLL_INTERVAL_MS=60000 +Tapo Agent config.toml +server_url = "ws://192.168.1.100:8080" +api_key = "your-api-key-here" +poll_interval_secs = 60 +[[devices]] +ip = "192.168.1.50" +name = "grow-light-plug" +type = "P110" # or "P100" +tapo_email = "your@email.com" +tapo_password = "your-tapo-password" +Verification Plan +Automated Tests +Server unit tests: Database operations, aggregation logic +Integration test: Start server, connect mock agent, verify data flow +Run commands: +cd server && npm test +cd agents/ac-infinity && npm test +Manual Verification +Start server, verify WebSocket accepts connections +Send test data via CLI agent, verify it appears in database +Wait 10+ minutes, verify aggregation runs and data appears in sensor_data_10m +Connect AC Infinity agent with real credentials, verify sensor readings +Deploy Tapo agent to Raspberry Pi, verify plug data collection \ No newline at end of file diff --git a/server/.env.example b/server/.env.example new file mode 100644 index 0000000..f687807 --- /dev/null +++ b/server/.env.example @@ -0,0 +1,7 @@ +# Server Environment Configuration +PORT=8080 +DB_PATH=./data/sensors.db + +# Job intervals (optional, defaults shown) +# AGGREGATION_INTERVAL_MS=600000 +# CLEANUP_INTERVAL_MS=3600000 diff --git a/server/data/sensors.db b/server/data/sensors.db new file mode 100644 index 0000000..c269d1f Binary files /dev/null and b/server/data/sensors.db differ diff --git a/server/data/sensors.db-shm b/server/data/sensors.db-shm new file mode 100644 index 0000000..536aeac Binary files /dev/null and b/server/data/sensors.db-shm differ diff --git a/server/data/sensors.db-wal b/server/data/sensors.db-wal new file mode 100644 index 0000000..c478dba Binary files /dev/null and b/server/data/sensors.db-wal differ diff --git a/server/package-lock.json b/server/package-lock.json new file mode 100644 index 0000000..f161594 --- /dev/null +++ b/server/package-lock.json @@ -0,0 +1,501 @@ +{ + "name": "tischlerctrl-server", + "version": "1.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "tischlerctrl-server", + "version": "1.0.0", + "dependencies": { + "better-sqlite3": "^11.6.0", + "dotenv": "^16.4.7", + "ws": "^8.18.0" + }, + "engines": { + "node": ">=18.0.0" + } + }, + "node_modules/base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, + "node_modules/better-sqlite3": { + "version": "11.10.0", + "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-11.10.0.tgz", + "integrity": "sha512-EwhOpyXiOEL/lKzHz9AW1msWFNzGc/z+LzeB3/jnFJpxu+th2yqvzsSWas1v9jgs9+xiXJcD5A8CJxAG2TaghQ==", + "hasInstallScript": true, + "license": "MIT", + "dependencies": { + "bindings": "^1.5.0", + "prebuild-install": "^7.1.1" + } + }, + "node_modules/bindings": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz", + "integrity": "sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==", + "license": "MIT", + "dependencies": { + "file-uri-to-path": "1.0.0" + } + }, + "node_modules/bl": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", + "integrity": "sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==", + "license": "MIT", + "dependencies": { + "buffer": "^5.5.0", + "inherits": "^2.0.4", + "readable-stream": "^3.4.0" + } + }, + "node_modules/buffer": { + "version": "5.7.1", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", + "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT", + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.1.13" + } + }, + "node_modules/chownr": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", + "integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==", + "license": "ISC" + }, + "node_modules/decompress-response": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/decompress-response/-/decompress-response-6.0.0.tgz", + "integrity": "sha512-aW35yZM6Bb/4oJlZncMH2LCoZtJXTRxES17vE3hoRiowU2kWHaJKFkSBDnDR+cm9J+9QhXmREyIfv0pji9ejCQ==", + "license": "MIT", + "dependencies": { + "mimic-response": "^3.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/deep-extend": { + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/deep-extend/-/deep-extend-0.6.0.tgz", + "integrity": "sha512-LOHxIOaPYdHlJRtCQfDIVZtfw/ufM8+rVj649RIHzcm/vGwQRXFt6OPqIFWsm2XEMrNIEtWR64sY1LEKD2vAOA==", + "license": "MIT", + "engines": { + "node": ">=4.0.0" + } + }, + "node_modules/detect-libc": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.1.2.tgz", + "integrity": "sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==", + "license": "Apache-2.0", + "engines": { + "node": ">=8" + } + }, + "node_modules/dotenv": { + "version": "16.6.1", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.6.1.tgz", + "integrity": "sha512-uBq4egWHTcTt33a72vpSG0z3HnPuIl6NqYcTrKEg2azoEyl2hpW0zqlxysq2pK9HlDIHyHyakeYaYnSAwd8bow==", + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://dotenvx.com" + } + }, + "node_modules/end-of-stream": { + "version": "1.4.5", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.5.tgz", + "integrity": "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==", + "license": "MIT", + "dependencies": { + "once": "^1.4.0" + } + }, + "node_modules/expand-template": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/expand-template/-/expand-template-2.0.3.tgz", + "integrity": "sha512-XYfuKMvj4O35f/pOXLObndIRvyQ+/+6AhODh+OKWj9S9498pHHn/IMszH+gt0fBCRWMNfk1ZSp5x3AifmnI2vg==", + "license": "(MIT OR WTFPL)", + "engines": { + "node": ">=6" + } + }, + "node_modules/file-uri-to-path": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz", + "integrity": "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==", + "license": "MIT" + }, + "node_modules/fs-constants": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", + "integrity": "sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow==", + "license": "MIT" + }, + "node_modules/github-from-package": { + "version": "0.0.0", + "resolved": "https://registry.npmjs.org/github-from-package/-/github-from-package-0.0.0.tgz", + "integrity": "sha512-SyHy3T1v2NUXn29OsWdxmK6RwHD+vkj3v8en8AOBZ1wBQ/hCAQ5bAQTD02kW4W9tUp/3Qh6J8r9EvntiyCmOOw==", + "license": "MIT" + }, + "node_modules/ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "BSD-3-Clause" + }, + "node_modules/inherits": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", + "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", + "license": "ISC" + }, + "node_modules/ini": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/ini/-/ini-1.3.8.tgz", + "integrity": "sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew==", + "license": "ISC" + }, + "node_modules/mimic-response": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/mimic-response/-/mimic-response-3.1.0.tgz", + "integrity": "sha512-z0yWI+4FDrrweS8Zmt4Ej5HdJmky15+L2e6Wgn3+iK5fWzb6T3fhNFq2+MeTRb064c6Wr4N/wv0DzQTjNzHNGQ==", + "license": "MIT", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/minimist": { + "version": "1.2.8", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz", + "integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==", + "license": "MIT", + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, + "node_modules/mkdirp-classic": { + "version": "0.5.3", + "resolved": "https://registry.npmjs.org/mkdirp-classic/-/mkdirp-classic-0.5.3.tgz", + "integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==", + "license": "MIT" + }, + "node_modules/napi-build-utils": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/napi-build-utils/-/napi-build-utils-2.0.0.tgz", + "integrity": "sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA==", + "license": "MIT" + }, + "node_modules/node-abi": { + "version": "3.85.0", + "resolved": "https://registry.npmjs.org/node-abi/-/node-abi-3.85.0.tgz", + "integrity": "sha512-zsFhmbkAzwhTft6nd3VxcG0cvJsT70rL+BIGHWVq5fi6MwGrHwzqKaxXE+Hl2GmnGItnDKPPkO5/LQqjVkIdFg==", + "license": "MIT", + "dependencies": { + "semver": "^7.3.5" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/once": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", + "integrity": "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==", + "license": "ISC", + "dependencies": { + "wrappy": "1" + } + }, + "node_modules/prebuild-install": { + "version": "7.1.3", + "resolved": "https://registry.npmjs.org/prebuild-install/-/prebuild-install-7.1.3.tgz", + "integrity": "sha512-8Mf2cbV7x1cXPUILADGI3wuhfqWvtiLA1iclTDbFRZkgRQS0NqsPZphna9V+HyTEadheuPmjaJMsbzKQFOzLug==", + "license": "MIT", + "dependencies": { + "detect-libc": "^2.0.0", + "expand-template": "^2.0.3", + "github-from-package": "0.0.0", + "minimist": "^1.2.3", + "mkdirp-classic": "^0.5.3", + "napi-build-utils": "^2.0.0", + "node-abi": "^3.3.0", + "pump": "^3.0.0", + "rc": "^1.2.7", + "simple-get": "^4.0.0", + "tar-fs": "^2.0.0", + "tunnel-agent": "^0.6.0" + }, + "bin": { + "prebuild-install": "bin.js" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/pump": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.3.tgz", + "integrity": "sha512-todwxLMY7/heScKmntwQG8CXVkWUOdYxIvY2s0VWAAMh/nd8SoYiRaKjlr7+iCs984f2P8zvrfWcDDYVb73NfA==", + "license": "MIT", + "dependencies": { + "end-of-stream": "^1.1.0", + "once": "^1.3.1" + } + }, + "node_modules/rc": { + "version": "1.2.8", + "resolved": "https://registry.npmjs.org/rc/-/rc-1.2.8.tgz", + "integrity": "sha512-y3bGgqKj3QBdxLbLkomlohkvsA8gdAiUQlSBJnBhfn+BPxg4bc62d8TcBW15wavDfgexCgccckhcZvywyQYPOw==", + "license": "(BSD-2-Clause OR MIT OR Apache-2.0)", + "dependencies": { + "deep-extend": "^0.6.0", + "ini": "~1.3.0", + "minimist": "^1.2.0", + "strip-json-comments": "~2.0.1" + }, + "bin": { + "rc": "cli.js" + } + }, + "node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "license": "MIT", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, + "node_modules/semver": { + "version": "7.7.3", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.3.tgz", + "integrity": "sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==", + "license": "ISC", + "bin": { + "semver": "bin/semver.js" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/simple-concat": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/simple-concat/-/simple-concat-1.0.1.tgz", + "integrity": "sha512-cSFtAPtRhljv69IK0hTVZQ+OfE9nePi/rtJmw5UjHeVyVroEqJXP1sFztKUy1qU+xvz3u/sfYJLa947b7nAN2Q==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, + "node_modules/simple-get": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-4.0.1.tgz", + "integrity": "sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT", + "dependencies": { + "decompress-response": "^6.0.0", + "once": "^1.3.1", + "simple-concat": "^1.0.0" + } + }, + "node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "license": "MIT", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, + "node_modules/strip-json-comments": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-2.0.1.tgz", + "integrity": "sha512-4gB8na07fecVVkOI6Rs4e7T6NOTki5EmL7TUduTs6bu3EdnSycntVJ4re8kgZA+wx9IueI2Y11bfbgwtzuE0KQ==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/tar-fs": { + "version": "2.1.4", + "resolved": "https://registry.npmjs.org/tar-fs/-/tar-fs-2.1.4.tgz", + "integrity": "sha512-mDAjwmZdh7LTT6pNleZ05Yt65HC3E+NiQzl672vQG38jIrehtJk/J3mNwIg+vShQPcLF/LV7CMnDW6vjj6sfYQ==", + "license": "MIT", + "dependencies": { + "chownr": "^1.1.1", + "mkdirp-classic": "^0.5.2", + "pump": "^3.0.0", + "tar-stream": "^2.1.4" + } + }, + "node_modules/tar-stream": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/tar-stream/-/tar-stream-2.2.0.tgz", + "integrity": "sha512-ujeqbceABgwMZxEJnk2HDY2DlnUZ+9oEcb1KzTVfYHio0UE6dG71n60d8D2I4qNvleWrrXpmjpt7vZeF1LnMZQ==", + "license": "MIT", + "dependencies": { + "bl": "^4.0.3", + "end-of-stream": "^1.4.1", + "fs-constants": "^1.0.0", + "inherits": "^2.0.3", + "readable-stream": "^3.1.1" + }, + "engines": { + "node": ">=6" + } + }, + "node_modules/tunnel-agent": { + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/tunnel-agent/-/tunnel-agent-0.6.0.tgz", + "integrity": "sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w==", + "license": "Apache-2.0", + "dependencies": { + "safe-buffer": "^5.0.1" + }, + "engines": { + "node": "*" + } + }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", + "license": "MIT" + }, + "node_modules/wrappy": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", + "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", + "license": "ISC" + }, + "node_modules/ws": { + "version": "8.18.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz", + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + } + } +} diff --git a/server/package.json b/server/package.json new file mode 100644 index 0000000..9262565 --- /dev/null +++ b/server/package.json @@ -0,0 +1,20 @@ +{ + "name": "tischlerctrl-server", + "version": "1.0.0", + "description": "Sensor data collection server with WebSocket API", + "type": "module", + "main": "src/index.js", + "scripts": { + "start": "node src/index.js", + "dev": "node --watch src/index.js", + "generate-key": "node src/cli/generate-key.js" + }, + "dependencies": { + "better-sqlite3": "^11.6.0", + "dotenv": "^16.4.7", + "ws": "^8.18.0" + }, + "engines": { + "node": ">=18.0.0" + } +} diff --git a/server/src/cli/generate-key.js b/server/src/cli/generate-key.js new file mode 100644 index 0000000..bc4b7c1 --- /dev/null +++ b/server/src/cli/generate-key.js @@ -0,0 +1,62 @@ +#!/usr/bin/env node + +/** + * CLI tool to generate API keys for agents + * Usage: node generate-key.js + * Example: node generate-key.js "ac-infinity-agent" "ac:" + */ + +import { fileURLToPath } from 'url'; +import { dirname, join } from 'path'; +import { initDatabase } from '../db/schema.js'; +import { generateApiKey, listApiKeys } from '../db/queries.js'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +const dbPath = process.env.DB_PATH || join(__dirname, '..', '..', 'data', 'sensors.db'); + +const args = process.argv.slice(2); + +if (args.length === 0 || args[0] === '--list') { + // List existing keys + const db = initDatabase(dbPath); + const keys = listApiKeys(db); + + if (keys.length === 0) { + console.log('No API keys found.'); + } else { + console.log('\nExisting API keys:\n'); + console.log('ID | Name | Prefix | Preview | Last Used'); + console.log('-'.repeat(75)); + for (const key of keys) { + const lastUsed = key.last_used_at || 'never'; + console.log(`${key.id.toString().padEnd(3)} | ${key.name.padEnd(20)} | ${key.device_prefix.padEnd(7)} | ${key.key_preview.padEnd(12)} | ${lastUsed}`); + } + } + + console.log('\nUsage: node generate-key.js '); + console.log('Example: node generate-key.js "ac-infinity-agent" "ac:"'); + + db.close(); + process.exit(0); +} + +if (args.length < 2) { + console.error('Error: Both name and device_prefix are required'); + console.error('Usage: node generate-key.js '); + process.exit(1); +} + +const [name, devicePrefix] = args; + +const db = initDatabase(dbPath); +const key = generateApiKey(db, name, devicePrefix); + +console.log('\n✓ API key generated successfully!\n'); +console.log(`Name: ${name}`); +console.log(`Device Prefix: ${devicePrefix}`); +console.log(`API Key: ${key}`); +console.log('\n⚠ Save this key securely - it cannot be recovered!\n'); + +db.close(); diff --git a/server/src/config.js b/server/src/config.js new file mode 100644 index 0000000..b29f708 --- /dev/null +++ b/server/src/config.js @@ -0,0 +1,18 @@ +import { config } from 'dotenv'; +import { fileURLToPath } from 'url'; +import { dirname, join } from 'path'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +// Load environment variables from .env file +config({ path: join(__dirname, '..', '.env') }); + +export default { + port: parseInt(process.env.PORT || '8080', 10), + dbPath: process.env.DB_PATH || join(__dirname, '..', 'data', 'sensors.db'), + + // Job intervals + aggregationIntervalMs: parseInt(process.env.AGGREGATION_INTERVAL_MS || String(10 * 60 * 1000), 10), + cleanupIntervalMs: parseInt(process.env.CLEANUP_INTERVAL_MS || String(60 * 60 * 1000), 10), +}; diff --git a/server/src/db/queries.js b/server/src/db/queries.js new file mode 100644 index 0000000..5f955c0 --- /dev/null +++ b/server/src/db/queries.js @@ -0,0 +1,179 @@ +import crypto from 'crypto'; + +/** + * Database query functions for sensor data operations + */ + +/** + * Validate an API key and return the associated metadata + * @param {Database} db - SQLite database instance + * @param {string} apiKey - The API key to validate + * @returns {object|null} - API key metadata or null if invalid + */ +export function validateApiKey(db, apiKey) { + const stmt = db.prepare(` + SELECT id, name, device_prefix + FROM api_keys + WHERE key = ? + `); + const result = stmt.get(apiKey); + + if (result) { + // Update last_used_at timestamp + db.prepare(` + UPDATE api_keys SET last_used_at = datetime('now') WHERE id = ? + `).run(result.id); + } + + return result || null; +} + +/** + * Generate a new API key + * @param {Database} db - SQLite database instance + * @param {string} name - Name/description for the API key + * @param {string} devicePrefix - Prefix to prepend to device names (e.g., "ac:", "tapo:") + * @returns {string} - The generated API key + */ +export function generateApiKey(db, name, devicePrefix) { + const key = crypto.randomBytes(32).toString('hex'); + + db.prepare(` + INSERT INTO api_keys (key, name, device_prefix) + VALUES (?, ?, ?) + `).run(key, name, devicePrefix); + + return key; +} + +/** + * Insert sensor readings into the database + * @param {Database} db - SQLite database instance + * @param {string} devicePrefix - Prefix to prepend to device names + * @param {Array} readings - Array of {device, channel, value} objects + * @param {Date} timestamp - Timestamp for all readings (defaults to now) + */ +export function insertReadings(db, devicePrefix, readings, timestamp = new Date()) { + const isoTimestamp = timestamp.toISOString(); + + const stmt = db.prepare(` + INSERT INTO sensor_data (timestamp, device, channel, value) + VALUES (?, ?, ?, ?) + `); + + const insertMany = db.transaction((items) => { + for (const reading of items) { + const fullDevice = `${devicePrefix}${reading.device}`; + stmt.run(isoTimestamp, fullDevice, reading.channel, reading.value); + } + }); + + insertMany(readings); + return readings.length; +} + +/** + * Aggregate raw data into 10-minute buckets + * @param {Database} db - SQLite database instance + * @returns {number} - Number of aggregated records created + */ +export function aggregate10Minutes(db) { + // Get the cutoff time (10 minutes ago, rounded down to 10-min boundary) + const now = new Date(); + const cutoff = new Date(Math.floor(now.getTime() / 600000) * 600000 - 600000); + const cutoffISO = cutoff.toISOString(); + + const result = db.prepare(` + INSERT OR REPLACE INTO sensor_data_10m (timestamp, device, channel, value, sample_count) + SELECT + datetime(strftime('%s', timestamp) / 600 * 600, 'unixepoch') as bucket, + device, + channel, + AVG(value) as avg_value, + COUNT(*) as sample_count + FROM sensor_data + WHERE timestamp < ? + AND timestamp >= datetime(?, '-1 hour') + GROUP BY bucket, device, channel + `).run(cutoffISO, cutoffISO); + + return result.changes; +} + +/** + * Aggregate 10-minute data into 1-hour buckets + * @param {Database} db - SQLite database instance + * @returns {number} - Number of aggregated records created + */ +export function aggregate1Hour(db) { + // Get the cutoff time (1 hour ago, rounded down to hour boundary) + const now = new Date(); + const cutoff = new Date(Math.floor(now.getTime() / 3600000) * 3600000 - 3600000); + const cutoffISO = cutoff.toISOString(); + + const result = db.prepare(` + INSERT OR REPLACE INTO sensor_data_1h (timestamp, device, channel, value, sample_count) + SELECT + datetime(strftime('%s', timestamp) / 3600 * 3600, 'unixepoch') as bucket, + device, + channel, + SUM(value * sample_count) / SUM(sample_count) as weighted_avg, + SUM(sample_count) as total_samples + FROM sensor_data_10m + WHERE timestamp < ? + AND timestamp >= datetime(?, '-1 day') + GROUP BY bucket, device, channel + `).run(cutoffISO, cutoffISO); + + return result.changes; +} + +/** + * Clean up old data according to retention policy + * @param {Database} db - SQLite database instance + * @returns {object} - Number of deleted records per table + */ +export function cleanupOldData(db) { + const now = new Date(); + + // Delete raw data older than 7 days + const weekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000); + const rawDeleted = db.prepare(` + DELETE FROM sensor_data WHERE timestamp < ? + `).run(weekAgo.toISOString()); + + // Delete 10-minute data older than 30 days + const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000); + const aggDeleted = db.prepare(` + DELETE FROM sensor_data_10m WHERE timestamp < ? + `).run(monthAgo.toISOString()); + + return { + rawDeleted: rawDeleted.changes, + aggregatedDeleted: aggDeleted.changes + }; +} + +/** + * List all API keys (without showing the actual key values) + * @param {Database} db - SQLite database instance + * @returns {Array} - List of API key metadata + */ +export function listApiKeys(db) { + return db.prepare(` + SELECT id, name, device_prefix, created_at, last_used_at, + substr(key, 1, 8) || '...' as key_preview + FROM api_keys + ORDER BY created_at DESC + `).all(); +} + +export default { + validateApiKey, + generateApiKey, + insertReadings, + aggregate10Minutes, + aggregate1Hour, + cleanupOldData, + listApiKeys +}; diff --git a/server/src/db/schema.js b/server/src/db/schema.js new file mode 100644 index 0000000..7f27615 --- /dev/null +++ b/server/src/db/schema.js @@ -0,0 +1,86 @@ +import Database from 'better-sqlite3'; +import { fileURLToPath } from 'url'; +import { dirname, join } from 'path'; +import { existsSync, mkdirSync } from 'fs'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +/** + * Initialize the SQLite database with all required tables + * @param {string} dbPath - Path to the SQLite database file + * @returns {Database} - The initialized database instance + */ +export function initDatabase(dbPath) { + // Ensure data directory exists + const dataDir = dirname(dbPath); + if (!existsSync(dataDir)) { + mkdirSync(dataDir, { recursive: true }); + } + + const db = new Database(dbPath); + + // Enable WAL mode for better concurrent performance + db.pragma('journal_mode = WAL'); + + // Create tables + db.exec(` + -- API keys for agent authentication + CREATE TABLE IF NOT EXISTS api_keys ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + key TEXT UNIQUE NOT NULL, + name TEXT NOT NULL, + device_prefix TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + last_used_at DATETIME + ); + + -- Raw sensor data (1-minute resolution, kept for 1 week) + CREATE TABLE IF NOT EXISTS sensor_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, + device TEXT NOT NULL, + channel TEXT NOT NULL, + value REAL NOT NULL + ); + + -- Index for time-based queries and cleanup + CREATE INDEX IF NOT EXISTS idx_sensor_data_time + ON sensor_data(timestamp); + + -- Index for device/channel queries + CREATE INDEX IF NOT EXISTS idx_sensor_data_device + ON sensor_data(device, channel, timestamp); + + -- 10-minute aggregated data (kept for 1 month) + CREATE TABLE IF NOT EXISTS sensor_data_10m ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, + device TEXT NOT NULL, + channel TEXT NOT NULL, + value REAL NOT NULL, + sample_count INTEGER NOT NULL + ); + + CREATE UNIQUE INDEX IF NOT EXISTS idx_sensor_data_10m_unique + ON sensor_data_10m(timestamp, device, channel); + + -- 1-hour aggregated data (kept forever) + CREATE TABLE IF NOT EXISTS sensor_data_1h ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, + device TEXT NOT NULL, + channel TEXT NOT NULL, + value REAL NOT NULL, + sample_count INTEGER NOT NULL + ); + + CREATE UNIQUE INDEX IF NOT EXISTS idx_sensor_data_1h_unique + ON sensor_data_1h(timestamp, device, channel); + `); + + console.log('[DB] Database initialized successfully'); + return db; +} + +export default { initDatabase }; diff --git a/server/src/index.js b/server/src/index.js new file mode 100644 index 0000000..79dd204 --- /dev/null +++ b/server/src/index.js @@ -0,0 +1,44 @@ +import config from './config.js'; +import { initDatabase } from './db/schema.js'; +import { createWebSocketServer } from './websocket/server.js'; +import { startAggregationJob } from './jobs/aggregator.js'; +import { startCleanupJob } from './jobs/cleanup.js'; + +console.log('='.repeat(50)); +console.log('TischlerCtrl Sensor Server'); +console.log('='.repeat(50)); + +// Initialize database +const db = initDatabase(config.dbPath); + +// Start WebSocket server +const wss = createWebSocketServer({ + port: config.port, + db +}); + +// Start background jobs +const aggregationTimer = startAggregationJob(db, config.aggregationIntervalMs); +const cleanupTimer = startCleanupJob(db, config.cleanupIntervalMs); + +// Graceful shutdown +function shutdown() { + console.log('\n[Server] Shutting down...'); + + clearInterval(aggregationTimer); + clearInterval(cleanupTimer); + + wss.close(() => { + db.close(); + console.log('[Server] Goodbye!'); + process.exit(0); + }); + + // Force exit after 5 seconds + setTimeout(() => process.exit(1), 5000); +} + +process.on('SIGINT', shutdown); +process.on('SIGTERM', shutdown); + +console.log('[Server] Ready to accept connections'); diff --git a/server/src/jobs/aggregator.js b/server/src/jobs/aggregator.js new file mode 100644 index 0000000..172c28c --- /dev/null +++ b/server/src/jobs/aggregator.js @@ -0,0 +1,42 @@ +import { aggregate10Minutes, aggregate1Hour, cleanupOldData } from '../db/queries.js'; + +/** + * Start the aggregation job that runs periodically + * @param {Database} db - SQLite database instance + * @param {number} intervalMs - Interval in milliseconds (default: 10 minutes) + * @returns {NodeJS.Timer} - The interval timer + */ +export function startAggregationJob(db, intervalMs = 10 * 60 * 1000) { + console.log(`[Aggregator] Starting aggregation job (interval: ${intervalMs / 1000}s)`); + + // Run immediately on start + runAggregation(db); + + // Then run periodically + return setInterval(() => runAggregation(db), intervalMs); +} + +/** + * Run the aggregation process + */ +function runAggregation(db) { + try { + const start = Date.now(); + + // Aggregate raw data to 10-minute buckets + const count10m = aggregate10Minutes(db); + + // Aggregate 10-minute data to 1-hour buckets + const count1h = aggregate1Hour(db); + + const elapsed = Date.now() - start; + + if (count10m > 0 || count1h > 0) { + console.log(`[Aggregator] Completed in ${elapsed}ms: ${count10m} 10m records, ${count1h} 1h records`); + } + } catch (err) { + console.error('[Aggregator] Error during aggregation:', err.message); + } +} + +export default { startAggregationJob }; diff --git a/server/src/jobs/cleanup.js b/server/src/jobs/cleanup.js new file mode 100644 index 0000000..cc62c24 --- /dev/null +++ b/server/src/jobs/cleanup.js @@ -0,0 +1,36 @@ +import { cleanupOldData } from '../db/queries.js'; + +/** + * Start the cleanup job that runs periodically + * @param {Database} db - SQLite database instance + * @param {number} intervalMs - Interval in milliseconds (default: 1 hour) + * @returns {NodeJS.Timer} - The interval timer + */ +export function startCleanupJob(db, intervalMs = 60 * 60 * 1000) { + console.log(`[Cleanup] Starting cleanup job (interval: ${intervalMs / 1000}s)`); + + // Run after a delay on start (don't compete with aggregator) + setTimeout(() => runCleanup(db), 5 * 60 * 1000); + + // Then run periodically + return setInterval(() => runCleanup(db), intervalMs); +} + +/** + * Run the cleanup process + */ +function runCleanup(db) { + try { + const start = Date.now(); + const result = cleanupOldData(db); + const elapsed = Date.now() - start; + + if (result.rawDeleted > 0 || result.aggregatedDeleted > 0) { + console.log(`[Cleanup] Completed in ${elapsed}ms: deleted ${result.rawDeleted} raw, ${result.aggregatedDeleted} 10m records`); + } + } catch (err) { + console.error('[Cleanup] Error during cleanup:', err.message); + } +} + +export default { startCleanupJob }; diff --git a/server/src/websocket/server.js b/server/src/websocket/server.js new file mode 100644 index 0000000..fcbbc14 --- /dev/null +++ b/server/src/websocket/server.js @@ -0,0 +1,183 @@ +import { WebSocketServer } from 'ws'; +import { validateApiKey, insertReadings } from '../db/queries.js'; + +/** + * Create and configure the WebSocket server + * @param {object} options - Server options + * @param {number} options.port - Port to listen on + * @param {Database} options.db - SQLite database instance + * @returns {WebSocketServer} - The WebSocket server instance + */ +export function createWebSocketServer({ port, db }) { + const wss = new WebSocketServer({ port }); + + // Track authenticated clients + const clients = new Map(); + + wss.on('connection', (ws, req) => { + const clientId = `${req.socket.remoteAddress}:${req.socket.remotePort}`; + console.log(`[WS] Client connected: ${clientId}`); + + // Client state + const clientState = { + authenticated: false, + devicePrefix: null, + name: null, + lastPong: Date.now() + }; + clients.set(ws, clientState); + + // Set up ping/pong for keepalive + ws.isAlive = true; + ws.on('pong', () => { + ws.isAlive = true; + clientState.lastPong = Date.now(); + }); + + ws.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + handleMessage(ws, message, clientState, db); + } catch (err) { + console.error(`[WS] Error parsing message from ${clientId}:`, err.message); + sendError(ws, 'Invalid JSON message'); + } + }); + + ws.on('close', () => { + console.log(`[WS] Client disconnected: ${clientId} (${clientState.name || 'unauthenticated'})`); + clients.delete(ws); + }); + + ws.on('error', (err) => { + console.error(`[WS] Error for ${clientId}:`, err.message); + }); + }); + + // Ping interval to detect dead connections + const pingInterval = setInterval(() => { + wss.clients.forEach((ws) => { + if (ws.isAlive === false) { + console.log('[WS] Terminating unresponsive client'); + return ws.terminate(); + } + ws.isAlive = false; + ws.ping(); + }); + }, 30000); + + wss.on('close', () => { + clearInterval(pingInterval); + }); + + console.log(`[WS] WebSocket server listening on port ${port}`); + return wss; +} + +/** + * Handle incoming WebSocket messages + * @param {WebSocket} ws - The WebSocket connection + * @param {object} message - Parsed message object + * @param {object} clientState - Client state object + * @param {Database} db - SQLite database instance + */ +function handleMessage(ws, message, clientState, db) { + const { type } = message; + + switch (type) { + case 'auth': + handleAuth(ws, message, clientState, db); + break; + + case 'data': + handleData(ws, message, clientState, db); + break; + + case 'pong': + // Client responded to our ping + clientState.lastPong = Date.now(); + break; + + default: + sendError(ws, `Unknown message type: ${type}`); + } +} + +/** + * Handle authentication request + */ +function handleAuth(ws, message, clientState, db) { + const { apiKey } = message; + + if (!apiKey) { + return sendError(ws, 'Missing apiKey in auth message'); + } + + const keyInfo = validateApiKey(db, apiKey); + + if (!keyInfo) { + send(ws, { type: 'auth', success: false, error: 'Invalid API key' }); + return; + } + + clientState.authenticated = true; + clientState.devicePrefix = keyInfo.device_prefix; + clientState.name = keyInfo.name; + + console.log(`[WS] Client authenticated: ${keyInfo.name} (prefix: ${keyInfo.device_prefix})`); + + send(ws, { + type: 'auth', + success: true, + devicePrefix: keyInfo.device_prefix, + name: keyInfo.name + }); +} + +/** + * Handle data ingestion + */ +function handleData(ws, message, clientState, db) { + if (!clientState.authenticated) { + return sendError(ws, 'Not authenticated. Send auth message first.'); + } + + const { readings } = message; + + if (!Array.isArray(readings) || readings.length === 0) { + return sendError(ws, 'Invalid readings: expected non-empty array'); + } + + // Validate readings format + for (const reading of readings) { + if (!reading.device || !reading.channel || reading.value === undefined) { + return sendError(ws, 'Invalid reading format: each reading must have device, channel, and value'); + } + } + + try { + const count = insertReadings(db, clientState.devicePrefix, readings); + send(ws, { type: 'ack', count }); + } catch (err) { + console.error('[WS] Error inserting readings:', err.message); + sendError(ws, 'Failed to insert readings'); + } +} + +/** + * Send a message to a WebSocket client + */ +function send(ws, message) { + if (ws.readyState === 1) { // OPEN + ws.send(JSON.stringify(message)); + } +} + +/** + * Send an error message + */ +function sendError(ws, error) { + send(ws, { type: 'error', error }); +} + +export default { createWebSocketServer };