diff --git a/agents/tapo/src/main.rs b/agents/tapo/src/main.rs index 9459950..82240e9 100644 --- a/agents/tapo/src/main.rs +++ b/agents/tapo/src/main.rs @@ -178,6 +178,7 @@ async fn discover_and_create_config( server_url: server, api_key: key, poll_interval_secs: 60, + command_url: None, devices, }; @@ -408,6 +409,36 @@ async fn collect_device_data(device: &DeviceConfig) -> Vec { readings } +// Switch a device on or off +async fn switch_device(device: &DeviceConfig, turn_on: bool) -> Result<(), Box> { + let client = ApiClient::new(&device.tapo_email, &device.tapo_password); + + match device.device_type.as_str() { + "P110" | "P115" => { + let plug = client.p110(&device.ip).await?; + if turn_on { + plug.on().await?; + } else { + plug.off().await?; + } + } + "P100" | "P105" => { + let plug = client.p100(&device.ip).await?; + if turn_on { + plug.on().await?; + } else { + plug.off().await?; + } + } + _ => { + return Err(format!("Unknown device type: {}", device.device_type).into()); + } + } + + info!("[Switch] Device {} turned {}", device.name, if turn_on { "ON" } else { "OFF" }); + Ok(()) +} + async fn run_agent(config: Config) -> Result<(), Box> { use tokio::sync::mpsc; @@ -449,6 +480,9 @@ async fn run_agent(config: Config) -> Result<(), Box> { } }); + // Clone devices for command handling in main loop + let devices_for_commands = config.devices.clone(); + // Connection and sending loop let mut reconnect_delay = Duration::from_secs(1); let max_reconnect_delay = Duration::from_secs(60); @@ -512,6 +546,35 @@ async fn run_agent(config: Config) -> Result<(), Box> { // Handle incoming WebSocket messages msg = read.next() => { match msg { + Some(Ok(Message::Text(text))) => { + // Handle incoming commands from server + if let Ok(cmd) = serde_json::from_str::(&text) { + if cmd.get("type").and_then(|v| v.as_str()) == Some("command") { + let device_name = cmd.get("device").and_then(|v| v.as_str()).unwrap_or(""); + let action = cmd.get("action").and_then(|v| v.as_str()).unwrap_or(""); + let value = cmd.get("value").and_then(|v| v.as_i64()).unwrap_or(0); + + info!("[Command] Received: device={}, action={}, value={}", device_name, action, value); + + // Find matching device in our config + if let Some(device) = devices_for_commands.iter().find(|d| d.name == device_name) { + if action == "set_state" { + let turn_on = value > 0; + info!("[Command] Switching {} {}", device_name, if turn_on { "ON" } else { "OFF" }); + + let device_clone = device.clone(); + tokio::spawn(async move { + if let Err(e) = switch_device(&device_clone, turn_on).await { + error!("[Command] Failed to switch {}: {}", device_clone.name, e); + } + }); + } + } else { + warn!("[Command] Unknown device: {}", device_name); + } + } + } + } Some(Ok(Message::Ping(data))) => { let _ = write.send(Message::Pong(data)).await; } diff --git a/server/.env.example b/server/.env.example deleted file mode 100644 index f687807..0000000 --- a/server/.env.example +++ /dev/null @@ -1,7 +0,0 @@ -# Server Environment Configuration -PORT=8080 -DB_PATH=./data/sensors.db - -# Job intervals (optional, defaults shown) -# AGGREGATION_INTERVAL_MS=600000 -# CLEANUP_INTERVAL_MS=3600000 diff --git a/server/package-lock.json b/server/package-lock.json deleted file mode 100644 index f161594..0000000 --- a/server/package-lock.json +++ /dev/null @@ -1,501 +0,0 @@ -{ - "name": "tischlerctrl-server", - "version": "1.0.0", - "lockfileVersion": 3, - "requires": true, - "packages": { - "": { - "name": "tischlerctrl-server", - "version": "1.0.0", - "dependencies": { - "better-sqlite3": "^11.6.0", - "dotenv": "^16.4.7", - "ws": "^8.18.0" - }, - "engines": { - "node": ">=18.0.0" - } - }, - "node_modules/base64-js": { - "version": "1.5.1", - "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", - "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], - "license": "MIT" - }, - "node_modules/better-sqlite3": { - "version": "11.10.0", - "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-11.10.0.tgz", - "integrity": "sha512-EwhOpyXiOEL/lKzHz9AW1msWFNzGc/z+LzeB3/jnFJpxu+th2yqvzsSWas1v9jgs9+xiXJcD5A8CJxAG2TaghQ==", - "hasInstallScript": true, - "license": "MIT", - "dependencies": { - "bindings": "^1.5.0", - "prebuild-install": "^7.1.1" - } - }, - "node_modules/bindings": { - "version": "1.5.0", - "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz", - "integrity": "sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==", - "license": "MIT", - "dependencies": { - "file-uri-to-path": "1.0.0" - } - }, - "node_modules/bl": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", - "integrity": "sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==", - "license": "MIT", - "dependencies": { - "buffer": "^5.5.0", - "inherits": "^2.0.4", - "readable-stream": "^3.4.0" - } - }, - "node_modules/buffer": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", - "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], - "license": "MIT", - "dependencies": { - "base64-js": "^1.3.1", - "ieee754": "^1.1.13" - } - }, - "node_modules/chownr": { - "version": "1.1.4", - "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", - "integrity": "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==", - "license": "ISC" - }, - "node_modules/decompress-response": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/decompress-response/-/decompress-response-6.0.0.tgz", - "integrity": "sha512-aW35yZM6Bb/4oJlZncMH2LCoZtJXTRxES17vE3hoRiowU2kWHaJKFkSBDnDR+cm9J+9QhXmREyIfv0pji9ejCQ==", - "license": "MIT", - "dependencies": { - "mimic-response": "^3.1.0" - }, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, - "node_modules/deep-extend": { - "version": "0.6.0", - "resolved": "https://registry.npmjs.org/deep-extend/-/deep-extend-0.6.0.tgz", - "integrity": "sha512-LOHxIOaPYdHlJRtCQfDIVZtfw/ufM8+rVj649RIHzcm/vGwQRXFt6OPqIFWsm2XEMrNIEtWR64sY1LEKD2vAOA==", - "license": "MIT", - "engines": { - "node": ">=4.0.0" - } - }, - "node_modules/detect-libc": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.1.2.tgz", - "integrity": "sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==", - "license": "Apache-2.0", - "engines": { - "node": ">=8" - } - }, - "node_modules/dotenv": { - "version": "16.6.1", - "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.6.1.tgz", - "integrity": "sha512-uBq4egWHTcTt33a72vpSG0z3HnPuIl6NqYcTrKEg2azoEyl2hpW0zqlxysq2pK9HlDIHyHyakeYaYnSAwd8bow==", - "license": "BSD-2-Clause", - "engines": { - "node": ">=12" - }, - "funding": { - "url": "https://dotenvx.com" - } - }, - "node_modules/end-of-stream": { - "version": "1.4.5", - "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.5.tgz", - "integrity": "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==", - "license": "MIT", - "dependencies": { - "once": "^1.4.0" - } - }, - "node_modules/expand-template": { - "version": "2.0.3", - "resolved": "https://registry.npmjs.org/expand-template/-/expand-template-2.0.3.tgz", - "integrity": "sha512-XYfuKMvj4O35f/pOXLObndIRvyQ+/+6AhODh+OKWj9S9498pHHn/IMszH+gt0fBCRWMNfk1ZSp5x3AifmnI2vg==", - "license": "(MIT OR WTFPL)", - "engines": { - "node": ">=6" - } - }, - "node_modules/file-uri-to-path": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz", - "integrity": "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==", - "license": "MIT" - }, - "node_modules/fs-constants": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", - "integrity": "sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow==", - "license": "MIT" - }, - "node_modules/github-from-package": { - "version": "0.0.0", - "resolved": "https://registry.npmjs.org/github-from-package/-/github-from-package-0.0.0.tgz", - "integrity": "sha512-SyHy3T1v2NUXn29OsWdxmK6RwHD+vkj3v8en8AOBZ1wBQ/hCAQ5bAQTD02kW4W9tUp/3Qh6J8r9EvntiyCmOOw==", - "license": "MIT" - }, - "node_modules/ieee754": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", - "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], - "license": "BSD-3-Clause" - }, - "node_modules/inherits": { - "version": "2.0.4", - "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", - "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", - "license": "ISC" - }, - "node_modules/ini": { - "version": "1.3.8", - "resolved": "https://registry.npmjs.org/ini/-/ini-1.3.8.tgz", - "integrity": "sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew==", - "license": "ISC" - }, - "node_modules/mimic-response": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/mimic-response/-/mimic-response-3.1.0.tgz", - "integrity": "sha512-z0yWI+4FDrrweS8Zmt4Ej5HdJmky15+L2e6Wgn3+iK5fWzb6T3fhNFq2+MeTRb064c6Wr4N/wv0DzQTjNzHNGQ==", - "license": "MIT", - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, - "node_modules/minimist": { - "version": "1.2.8", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz", - "integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==", - "license": "MIT", - "funding": { - "url": "https://github.com/sponsors/ljharb" - } - }, - "node_modules/mkdirp-classic": { - "version": "0.5.3", - "resolved": "https://registry.npmjs.org/mkdirp-classic/-/mkdirp-classic-0.5.3.tgz", - "integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==", - "license": "MIT" - }, - "node_modules/napi-build-utils": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/napi-build-utils/-/napi-build-utils-2.0.0.tgz", - "integrity": "sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA==", - "license": "MIT" - }, - "node_modules/node-abi": { - "version": "3.85.0", - "resolved": "https://registry.npmjs.org/node-abi/-/node-abi-3.85.0.tgz", - "integrity": "sha512-zsFhmbkAzwhTft6nd3VxcG0cvJsT70rL+BIGHWVq5fi6MwGrHwzqKaxXE+Hl2GmnGItnDKPPkO5/LQqjVkIdFg==", - "license": "MIT", - "dependencies": { - "semver": "^7.3.5" - }, - "engines": { - "node": ">=10" - } - }, - "node_modules/once": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", - "integrity": "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==", - "license": "ISC", - "dependencies": { - "wrappy": "1" - } - }, - "node_modules/prebuild-install": { - "version": "7.1.3", - "resolved": "https://registry.npmjs.org/prebuild-install/-/prebuild-install-7.1.3.tgz", - "integrity": "sha512-8Mf2cbV7x1cXPUILADGI3wuhfqWvtiLA1iclTDbFRZkgRQS0NqsPZphna9V+HyTEadheuPmjaJMsbzKQFOzLug==", - "license": "MIT", - "dependencies": { - "detect-libc": "^2.0.0", - "expand-template": "^2.0.3", - "github-from-package": "0.0.0", - "minimist": "^1.2.3", - "mkdirp-classic": "^0.5.3", - "napi-build-utils": "^2.0.0", - "node-abi": "^3.3.0", - "pump": "^3.0.0", - "rc": "^1.2.7", - "simple-get": "^4.0.0", - "tar-fs": "^2.0.0", - "tunnel-agent": "^0.6.0" - }, - "bin": { - "prebuild-install": "bin.js" - }, - "engines": { - "node": ">=10" - } - }, - "node_modules/pump": { - "version": "3.0.3", - "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.3.tgz", - "integrity": "sha512-todwxLMY7/heScKmntwQG8CXVkWUOdYxIvY2s0VWAAMh/nd8SoYiRaKjlr7+iCs984f2P8zvrfWcDDYVb73NfA==", - "license": "MIT", - "dependencies": { - "end-of-stream": "^1.1.0", - "once": "^1.3.1" - } - }, - "node_modules/rc": { - "version": "1.2.8", - "resolved": "https://registry.npmjs.org/rc/-/rc-1.2.8.tgz", - "integrity": "sha512-y3bGgqKj3QBdxLbLkomlohkvsA8gdAiUQlSBJnBhfn+BPxg4bc62d8TcBW15wavDfgexCgccckhcZvywyQYPOw==", - "license": "(BSD-2-Clause OR MIT OR Apache-2.0)", - "dependencies": { - "deep-extend": "^0.6.0", - "ini": "~1.3.0", - "minimist": "^1.2.0", - "strip-json-comments": "~2.0.1" - }, - "bin": { - "rc": "cli.js" - } - }, - "node_modules/readable-stream": { - "version": "3.6.2", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", - "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", - "license": "MIT", - "dependencies": { - "inherits": "^2.0.3", - "string_decoder": "^1.1.1", - "util-deprecate": "^1.0.1" - }, - "engines": { - "node": ">= 6" - } - }, - "node_modules/safe-buffer": { - "version": "5.2.1", - "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", - "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], - "license": "MIT" - }, - "node_modules/semver": { - "version": "7.7.3", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.3.tgz", - "integrity": "sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==", - "license": "ISC", - "bin": { - "semver": "bin/semver.js" - }, - "engines": { - "node": ">=10" - } - }, - "node_modules/simple-concat": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/simple-concat/-/simple-concat-1.0.1.tgz", - "integrity": "sha512-cSFtAPtRhljv69IK0hTVZQ+OfE9nePi/rtJmw5UjHeVyVroEqJXP1sFztKUy1qU+xvz3u/sfYJLa947b7nAN2Q==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], - "license": "MIT" - }, - "node_modules/simple-get": { - "version": "4.0.1", - "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-4.0.1.tgz", - "integrity": "sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], - "license": "MIT", - "dependencies": { - "decompress-response": "^6.0.0", - "once": "^1.3.1", - "simple-concat": "^1.0.0" - } - }, - "node_modules/string_decoder": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", - "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", - "license": "MIT", - "dependencies": { - "safe-buffer": "~5.2.0" - } - }, - "node_modules/strip-json-comments": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-2.0.1.tgz", - "integrity": "sha512-4gB8na07fecVVkOI6Rs4e7T6NOTki5EmL7TUduTs6bu3EdnSycntVJ4re8kgZA+wx9IueI2Y11bfbgwtzuE0KQ==", - "license": "MIT", - "engines": { - "node": ">=0.10.0" - } - }, - "node_modules/tar-fs": { - "version": "2.1.4", - "resolved": "https://registry.npmjs.org/tar-fs/-/tar-fs-2.1.4.tgz", - "integrity": "sha512-mDAjwmZdh7LTT6pNleZ05Yt65HC3E+NiQzl672vQG38jIrehtJk/J3mNwIg+vShQPcLF/LV7CMnDW6vjj6sfYQ==", - "license": "MIT", - "dependencies": { - "chownr": "^1.1.1", - "mkdirp-classic": "^0.5.2", - "pump": "^3.0.0", - "tar-stream": "^2.1.4" - } - }, - "node_modules/tar-stream": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/tar-stream/-/tar-stream-2.2.0.tgz", - "integrity": "sha512-ujeqbceABgwMZxEJnk2HDY2DlnUZ+9oEcb1KzTVfYHio0UE6dG71n60d8D2I4qNvleWrrXpmjpt7vZeF1LnMZQ==", - "license": "MIT", - "dependencies": { - "bl": "^4.0.3", - "end-of-stream": "^1.4.1", - "fs-constants": "^1.0.0", - "inherits": "^2.0.3", - "readable-stream": "^3.1.1" - }, - "engines": { - "node": ">=6" - } - }, - "node_modules/tunnel-agent": { - "version": "0.6.0", - "resolved": "https://registry.npmjs.org/tunnel-agent/-/tunnel-agent-0.6.0.tgz", - "integrity": "sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w==", - "license": "Apache-2.0", - "dependencies": { - "safe-buffer": "^5.0.1" - }, - "engines": { - "node": "*" - } - }, - "node_modules/util-deprecate": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", - "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", - "license": "MIT" - }, - "node_modules/wrappy": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", - "license": "ISC" - }, - "node_modules/ws": { - "version": "8.18.3", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz", - "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", - "license": "MIT", - "engines": { - "node": ">=10.0.0" - }, - "peerDependencies": { - "bufferutil": "^4.0.1", - "utf-8-validate": ">=5.0.2" - }, - "peerDependenciesMeta": { - "bufferutil": { - "optional": true - }, - "utf-8-validate": { - "optional": true - } - } - } - } -} diff --git a/server/package.json b/server/package.json deleted file mode 100644 index 9262565..0000000 --- a/server/package.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "name": "tischlerctrl-server", - "version": "1.0.0", - "description": "Sensor data collection server with WebSocket API", - "type": "module", - "main": "src/index.js", - "scripts": { - "start": "node src/index.js", - "dev": "node --watch src/index.js", - "generate-key": "node src/cli/generate-key.js" - }, - "dependencies": { - "better-sqlite3": "^11.6.0", - "dotenv": "^16.4.7", - "ws": "^8.18.0" - }, - "engines": { - "node": ">=18.0.0" - } -} diff --git a/server/src/cli/generate-key.js b/server/src/cli/generate-key.js deleted file mode 100644 index bc4b7c1..0000000 --- a/server/src/cli/generate-key.js +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env node - -/** - * CLI tool to generate API keys for agents - * Usage: node generate-key.js - * Example: node generate-key.js "ac-infinity-agent" "ac:" - */ - -import { fileURLToPath } from 'url'; -import { dirname, join } from 'path'; -import { initDatabase } from '../db/schema.js'; -import { generateApiKey, listApiKeys } from '../db/queries.js'; - -const __filename = fileURLToPath(import.meta.url); -const __dirname = dirname(__filename); - -const dbPath = process.env.DB_PATH || join(__dirname, '..', '..', 'data', 'sensors.db'); - -const args = process.argv.slice(2); - -if (args.length === 0 || args[0] === '--list') { - // List existing keys - const db = initDatabase(dbPath); - const keys = listApiKeys(db); - - if (keys.length === 0) { - console.log('No API keys found.'); - } else { - console.log('\nExisting API keys:\n'); - console.log('ID | Name | Prefix | Preview | Last Used'); - console.log('-'.repeat(75)); - for (const key of keys) { - const lastUsed = key.last_used_at || 'never'; - console.log(`${key.id.toString().padEnd(3)} | ${key.name.padEnd(20)} | ${key.device_prefix.padEnd(7)} | ${key.key_preview.padEnd(12)} | ${lastUsed}`); - } - } - - console.log('\nUsage: node generate-key.js '); - console.log('Example: node generate-key.js "ac-infinity-agent" "ac:"'); - - db.close(); - process.exit(0); -} - -if (args.length < 2) { - console.error('Error: Both name and device_prefix are required'); - console.error('Usage: node generate-key.js '); - process.exit(1); -} - -const [name, devicePrefix] = args; - -const db = initDatabase(dbPath); -const key = generateApiKey(db, name, devicePrefix); - -console.log('\n✓ API key generated successfully!\n'); -console.log(`Name: ${name}`); -console.log(`Device Prefix: ${devicePrefix}`); -console.log(`API Key: ${key}`); -console.log('\n⚠ Save this key securely - it cannot be recovered!\n'); - -db.close(); diff --git a/server/src/config.js b/server/src/config.js deleted file mode 100644 index b29f708..0000000 --- a/server/src/config.js +++ /dev/null @@ -1,18 +0,0 @@ -import { config } from 'dotenv'; -import { fileURLToPath } from 'url'; -import { dirname, join } from 'path'; - -const __filename = fileURLToPath(import.meta.url); -const __dirname = dirname(__filename); - -// Load environment variables from .env file -config({ path: join(__dirname, '..', '.env') }); - -export default { - port: parseInt(process.env.PORT || '8080', 10), - dbPath: process.env.DB_PATH || join(__dirname, '..', 'data', 'sensors.db'), - - // Job intervals - aggregationIntervalMs: parseInt(process.env.AGGREGATION_INTERVAL_MS || String(10 * 60 * 1000), 10), - cleanupIntervalMs: parseInt(process.env.CLEANUP_INTERVAL_MS || String(60 * 60 * 1000), 10), -}; diff --git a/server/src/db/queries.js b/server/src/db/queries.js deleted file mode 100644 index 5a0ca44..0000000 --- a/server/src/db/queries.js +++ /dev/null @@ -1,178 +0,0 @@ -import crypto from 'crypto'; - -/** - * Database query functions for sensor data operations - */ - -/** - * Validate an API key and return the associated metadata - * @param {Database} db - SQLite database instance - * @param {string} apiKey - The API key to validate - * @returns {object|null} - API key metadata or null if invalid - */ -export function validateApiKey(db, apiKey) { - const stmt = db.prepare(` - SELECT id, name, device_prefix - FROM api_keys - WHERE key = ? - `); - const result = stmt.get(apiKey); - - if (result) { - // Update last_used_at timestamp - db.prepare(` - UPDATE api_keys SET last_used_at = datetime('now') WHERE id = ? - `).run(result.id); - } - - return result || null; -} - -/** - * Generate a new API key - * @param {Database} db - SQLite database instance - * @param {string} name - Name/description for the API key - * @param {string} devicePrefix - Prefix to prepend to device names (e.g., "ac:", "tapo:") - * @returns {string} - The generated API key - */ -export function generateApiKey(db, name, devicePrefix) { - const key = crypto.randomBytes(32).toString('hex'); - - db.prepare(` - INSERT INTO api_keys (key, name, device_prefix) - VALUES (?, ?, ?) - `).run(key, name, devicePrefix); - - return key; -} - -/** - * Insert sensor readings with RLE (Run-Length Encoding) logic - * @param {Database} db - SQLite database instance - * @param {string} devicePrefix - Prefix to prepend to device names - * @param {Array} readings - Array of readings - * @param {Date} timestamp - Timestamp for all readings - */ -export function insertReadingsSmart(db, devicePrefix, readings, timestamp = new Date()) { - const isoTimestamp = timestamp.toISOString(); - - const stmtLast = db.prepare(` - SELECT id, value, data, data_type - FROM sensor_events - WHERE device = ? AND channel = ? - ORDER BY timestamp DESC - LIMIT 1 - `); - - const stmtUpdate = db.prepare(` - UPDATE sensor_events SET until = ? WHERE id = ? - `); - - const stmtInsert = db.prepare(` - INSERT INTO sensor_events (timestamp, until, device, channel, value, data, data_type) - VALUES (?, NULL, ?, ?, ?, ?, ?) - `); - - const transaction = db.transaction((items) => { - let inserted = 0; - let updated = 0; - - for (const reading of items) { - const fullDevice = `${devicePrefix}${reading.device}`; - const channel = reading.channel; - - // Determine type and values - let dataType = 'number'; - let value = null; - let data = null; - - if (reading.value !== undefined && reading.value !== null) { - dataType = 'number'; - value = reading.value; - } else if (reading.data !== undefined) { - dataType = 'json'; - data = typeof reading.data === 'string' ? reading.data : JSON.stringify(reading.data); - } else { - continue; // Skip invalid - } - - // Check last reading for RLE - const last = stmtLast.get(fullDevice, channel); - let isDuplicate = false; - - if (last && last.data_type === dataType) { - if (dataType === 'number') { - // Compare defined numbers with small epsilon? Or exact match? - // For sensors, exact match is typical for RLE if "identical". - if (Math.abs(last.value - value) < Number.EPSILON) { - isDuplicate = true; - } - } else { - // Compare JSON strings - if (last.data === data) { - isDuplicate = true; - } - } - } - - if (isDuplicate) { - stmtUpdate.run(isoTimestamp, last.id); - updated++; - } else { - stmtInsert.run(isoTimestamp, fullDevice, channel, value, data, dataType); - inserted++; - } - } - return { inserted, updated }; - }); - - return transaction(readings); -} - -// Temporary stubs for aggregators until they are redesigned for the new schema -export function aggregate10Minutes(db) { return 0; } -export function aggregate1Hour(db) { return 0; } - - -/** - * Clean up old data according to retention policy - * @param {Database} db - SQLite database instance - * @returns {object} - Number of deleted records per table - */ -export function cleanupOldData(db) { - const now = new Date(); - - // Delete events older than 30 days - const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000); - const eventsDeleted = db.prepare(` - DELETE FROM sensor_events WHERE timestamp < ? - `).run(monthAgo.toISOString()); - - return { - eventsDeleted: eventsDeleted.changes - }; -} - -/** - * List all API keys (without showing the actual key values) - * @param {Database} db - SQLite database instance - * @returns {Array} - List of API key metadata - */ -export function listApiKeys(db) { - return db.prepare(` - SELECT id, name, device_prefix, created_at, last_used_at, - substr(key, 1, 8) || '...' as key_preview - FROM api_keys - ORDER BY created_at DESC - `).all(); -} - -export default { - validateApiKey, - generateApiKey, - insertReadingsSmart, - aggregate10Minutes, - aggregate1Hour, - cleanupOldData, - listApiKeys -}; diff --git a/server/src/db/schema.js b/server/src/db/schema.js deleted file mode 100644 index 40fa543..0000000 --- a/server/src/db/schema.js +++ /dev/null @@ -1,115 +0,0 @@ -import Database from 'better-sqlite3'; -import { fileURLToPath } from 'url'; -import { dirname, join } from 'path'; -import { existsSync, mkdirSync } from 'fs'; - -const __filename = fileURLToPath(import.meta.url); -const __dirname = dirname(__filename); - -/** - * Initialize the SQLite database with all required tables - * @param {string} dbPath - Path to the SQLite database file - * @returns {Database} - The initialized database instance - */ -export function initDatabase(dbPath) { - // Ensure data directory exists - const dataDir = dirname(dbPath); - if (!existsSync(dataDir)) { - mkdirSync(dataDir, { recursive: true }); - } - - const db = new Database(dbPath); - - // Enable WAL mode for better concurrent performance - db.pragma('journal_mode = WAL'); - - // Create tables - // API keys for agent authentication - db.exec(` - CREATE TABLE IF NOT EXISTS api_keys ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - key TEXT UNIQUE NOT NULL, - name TEXT NOT NULL, - device_prefix TEXT NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - last_used_at DATETIME - ); - `); - - // --- MIGRATION: Drop old tables if they exist --- - // User requested deleting old sensor data but keeping keys. - db.exec(` - DROP TABLE IF EXISTS sensor_data; - DROP TABLE IF EXISTS sensor_data_10m; - DROP TABLE IF EXISTS sensor_data_1h; - `); - - // --- NEW SCHEMA: Sensor Events with RLE support --- - db.exec(` - CREATE TABLE IF NOT EXISTS sensor_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp DATETIME NOT NULL, - until DATETIME, -- NULL if point, Time if duplicated range end - device TEXT NOT NULL, - channel TEXT NOT NULL, - value REAL, -- Nullable - data TEXT, -- Nullable (JSON) - data_type TEXT NOT NULL -- 'number' or 'json' - ); - - CREATE INDEX IF NOT EXISTS idx_sensor_events_search - ON sensor_events(device, channel, timestamp); - - -- Phase 2: Authentication & Views - CREATE TABLE IF NOT EXISTS users ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - username TEXT UNIQUE NOT NULL, - password_hash TEXT NOT NULL, - role TEXT NOT NULL CHECK(role IN ('admin', 'normal')), - created_at DATETIME DEFAULT CURRENT_TIMESTAMP - ); - - CREATE TABLE IF NOT EXISTS views ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT UNIQUE NOT NULL, - config TEXT NOT NULL, -- JSON string of view configuration - created_by INTEGER, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY(created_by) REFERENCES users(id) - ); - - -- Rules for automation - CREATE TABLE IF NOT EXISTS rules ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - type TEXT DEFAULT 'static', - enabled INTEGER DEFAULT 1, - position INTEGER DEFAULT 0, - conditions TEXT NOT NULL, -- JSON (nested AND/OR structure) - action TEXT NOT NULL, -- JSON object - created_by INTEGER, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY(created_by) REFERENCES users(id) - ); - - -- Output events with RLE (same structure as sensor_events) - CREATE TABLE IF NOT EXISTS output_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp DATETIME NOT NULL, - until DATETIME, - channel TEXT NOT NULL, - value REAL, - data TEXT, - data_type TEXT NOT NULL - ); - - CREATE INDEX IF NOT EXISTS idx_output_events_search - ON output_events(channel, timestamp); - `); - - console.log('[DB] Database initialized successfully'); - return db; -} - -export default { initDatabase }; diff --git a/server/src/index.js b/server/src/index.js deleted file mode 100644 index 79dd204..0000000 --- a/server/src/index.js +++ /dev/null @@ -1,44 +0,0 @@ -import config from './config.js'; -import { initDatabase } from './db/schema.js'; -import { createWebSocketServer } from './websocket/server.js'; -import { startAggregationJob } from './jobs/aggregator.js'; -import { startCleanupJob } from './jobs/cleanup.js'; - -console.log('='.repeat(50)); -console.log('TischlerCtrl Sensor Server'); -console.log('='.repeat(50)); - -// Initialize database -const db = initDatabase(config.dbPath); - -// Start WebSocket server -const wss = createWebSocketServer({ - port: config.port, - db -}); - -// Start background jobs -const aggregationTimer = startAggregationJob(db, config.aggregationIntervalMs); -const cleanupTimer = startCleanupJob(db, config.cleanupIntervalMs); - -// Graceful shutdown -function shutdown() { - console.log('\n[Server] Shutting down...'); - - clearInterval(aggregationTimer); - clearInterval(cleanupTimer); - - wss.close(() => { - db.close(); - console.log('[Server] Goodbye!'); - process.exit(0); - }); - - // Force exit after 5 seconds - setTimeout(() => process.exit(1), 5000); -} - -process.on('SIGINT', shutdown); -process.on('SIGTERM', shutdown); - -console.log('[Server] Ready to accept connections'); diff --git a/server/src/jobs/aggregator.js b/server/src/jobs/aggregator.js deleted file mode 100644 index 172c28c..0000000 --- a/server/src/jobs/aggregator.js +++ /dev/null @@ -1,42 +0,0 @@ -import { aggregate10Minutes, aggregate1Hour, cleanupOldData } from '../db/queries.js'; - -/** - * Start the aggregation job that runs periodically - * @param {Database} db - SQLite database instance - * @param {number} intervalMs - Interval in milliseconds (default: 10 minutes) - * @returns {NodeJS.Timer} - The interval timer - */ -export function startAggregationJob(db, intervalMs = 10 * 60 * 1000) { - console.log(`[Aggregator] Starting aggregation job (interval: ${intervalMs / 1000}s)`); - - // Run immediately on start - runAggregation(db); - - // Then run periodically - return setInterval(() => runAggregation(db), intervalMs); -} - -/** - * Run the aggregation process - */ -function runAggregation(db) { - try { - const start = Date.now(); - - // Aggregate raw data to 10-minute buckets - const count10m = aggregate10Minutes(db); - - // Aggregate 10-minute data to 1-hour buckets - const count1h = aggregate1Hour(db); - - const elapsed = Date.now() - start; - - if (count10m > 0 || count1h > 0) { - console.log(`[Aggregator] Completed in ${elapsed}ms: ${count10m} 10m records, ${count1h} 1h records`); - } - } catch (err) { - console.error('[Aggregator] Error during aggregation:', err.message); - } -} - -export default { startAggregationJob }; diff --git a/server/src/jobs/cleanup.js b/server/src/jobs/cleanup.js deleted file mode 100644 index cc62c24..0000000 --- a/server/src/jobs/cleanup.js +++ /dev/null @@ -1,36 +0,0 @@ -import { cleanupOldData } from '../db/queries.js'; - -/** - * Start the cleanup job that runs periodically - * @param {Database} db - SQLite database instance - * @param {number} intervalMs - Interval in milliseconds (default: 1 hour) - * @returns {NodeJS.Timer} - The interval timer - */ -export function startCleanupJob(db, intervalMs = 60 * 60 * 1000) { - console.log(`[Cleanup] Starting cleanup job (interval: ${intervalMs / 1000}s)`); - - // Run after a delay on start (don't compete with aggregator) - setTimeout(() => runCleanup(db), 5 * 60 * 1000); - - // Then run periodically - return setInterval(() => runCleanup(db), intervalMs); -} - -/** - * Run the cleanup process - */ -function runCleanup(db) { - try { - const start = Date.now(); - const result = cleanupOldData(db); - const elapsed = Date.now() - start; - - if (result.rawDeleted > 0 || result.aggregatedDeleted > 0) { - console.log(`[Cleanup] Completed in ${elapsed}ms: deleted ${result.rawDeleted} raw, ${result.aggregatedDeleted} 10m records`); - } - } catch (err) { - console.error('[Cleanup] Error during cleanup:', err.message); - } -} - -export default { startCleanupJob }; diff --git a/server/src/websocket/server.js b/server/src/websocket/server.js deleted file mode 100644 index da81b32..0000000 --- a/server/src/websocket/server.js +++ /dev/null @@ -1,213 +0,0 @@ -import { WebSocketServer } from 'ws'; -import { validateApiKey, insertReadingsSmart } from '../db/queries.js'; - -/** - * Create and configure the WebSocket server - * @param {object} options - Server options - * @param {number} options.port - Port to listen on - * @param {Database} options.db - SQLite database instance - * @returns {WebSocketServer} - The WebSocket server instance - */ -export function createWebSocketServer({ port, db }) { - const wss = new WebSocketServer({ port }); - - // Track authenticated clients - const clients = new Map(); - - wss.on('connection', (ws, req) => { - const clientId = `${req.socket.remoteAddress}:${req.socket.remotePort}`; - console.log(`[WS] Client connected: ${clientId}`); - - // Client state - const clientState = { - authenticated: false, - devicePrefix: null, - name: null, - lastPong: Date.now() - }; - clients.set(ws, clientState); - - // Set up ping/pong for keepalive - ws.isAlive = true; - ws.on('pong', () => { - ws.isAlive = true; - clientState.lastPong = Date.now(); - }); - - ws.on('message', (data) => { - try { - const message = JSON.parse(data.toString()); - handleMessage(ws, message, clientState, db); - } catch (err) { - console.error(`[WS] Error parsing message from ${clientId}:`, err.message); - sendError(ws, 'Invalid JSON message'); - } - }); - - ws.on('close', () => { - console.log(`[WS] Client disconnected: ${clientId} (${clientState.name || 'unauthenticated'})`); - clients.delete(ws); - }); - - ws.on('error', (err) => { - console.error(`[WS] Error for ${clientId}:`, err.message); - }); - }); - - // Ping interval to detect dead connections - const pingInterval = setInterval(() => { - wss.clients.forEach((ws) => { - if (ws.isAlive === false) { - console.log('[WS] Terminating unresponsive client'); - return ws.terminate(); - } - ws.isAlive = false; - ws.ping(); - }); - }, 30000); - - wss.on('close', () => { - clearInterval(pingInterval); - }); - - console.log(`[WS] WebSocket server listening on port ${port}`); - return wss; -} - -/** - * Handle incoming WebSocket messages - * @param {WebSocket} ws - The WebSocket connection - * @param {object} message - Parsed message object - * @param {object} clientState - Client state object - * @param {Database} db - SQLite database instance - */ -function handleMessage(ws, message, clientState, db) { - const { type } = message; - - switch (type) { - case 'auth': - handleAuth(ws, message, clientState, db); - break; - - case 'data': - handleData(ws, message, clientState, db); - break; - - case 'pong': - // Client responded to our ping - clientState.lastPong = Date.now(); - break; - - default: - sendError(ws, `Unknown message type: ${type}`); - } -} - -/** - * Handle authentication request - */ -function handleAuth(ws, message, clientState, db) { - const { apiKey } = message; - - if (!apiKey) { - return sendError(ws, 'Missing apiKey in auth message'); - } - - const keyInfo = validateApiKey(db, apiKey); - - if (!keyInfo) { - send(ws, { type: 'auth', success: false, error: 'Invalid API key' }); - return; - } - - clientState.authenticated = true; - clientState.devicePrefix = keyInfo.device_prefix; - clientState.name = keyInfo.name; - - console.log(`[WS] Client authenticated: ${keyInfo.name} (prefix: ${keyInfo.device_prefix})`); - - send(ws, { - type: 'auth', - success: true, - devicePrefix: keyInfo.device_prefix, - name: keyInfo.name - }); -} - -/** - * Handle data ingestion - */ -function handleData(ws, message, clientState, db) { - if (!clientState.authenticated) { - return sendError(ws, 'Not authenticated. Send auth message first.'); - } - - const { readings } = message; - - if (!Array.isArray(readings) || readings.length === 0) { - return sendError(ws, 'Invalid readings: expected non-empty array'); - } - - const validReadings = []; - let skippedCount = 0; - - // Validate readings format - for (const reading of readings) { - // We require device, channel, and EITHER value (number) OR data (json) - if (!reading.device || !reading.channel) { - console.warn(`[WS] Skipped invalid reading (missing device/channel) from ${clientState.name}:`, JSON.stringify(reading)); - skippedCount++; - continue; - } - - const hasValue = reading.value !== undefined && reading.value !== null; - const hasData = reading.data !== undefined; - - if (!hasValue && !hasData) { - console.warn(`[WS] Skipped invalid reading (no value/data) from ${clientState.name}:`, JSON.stringify(reading)); - skippedCount++; - continue; - } - - validReadings.push(reading); - } - - if (validReadings.length === 0) { - if (skippedCount > 0) { - console.log(`[WS] Received ${skippedCount} readings, but all were invalid.`); - return send(ws, { type: 'ack', count: 0 }); - } - return sendError(ws, 'No valid readings found in batch'); - } - - try { - const result = insertReadingsSmart(db, clientState.devicePrefix, validReadings); - const count = result.inserted + result.updated; - - if (skippedCount > 0) { - console.log(`[WS] Processed ${count} readings (inserted: ${result.inserted}, updated: ${result.updated}, skipped: ${skippedCount}).`); - } - send(ws, { type: 'ack', count }); - } catch (err) { - console.error('[WS] Error inserting readings:', err.message); - sendError(ws, 'Failed to insert readings'); - } -} - -/** - * Send a message to a WebSocket client - */ -function send(ws, message) { - if (ws.readyState === 1) { // OPEN - ws.send(JSON.stringify(message)); - } -} - -/** - * Send an error message - */ -function sendError(ws, error) { - send(ws, { type: 'error', error }); -} - -export default { createWebSocketServer }; diff --git a/uiserver/webpack.config.js b/uiserver/webpack.config.js index e04e4d9..10901c4 100644 --- a/uiserver/webpack.config.js +++ b/uiserver/webpack.config.js @@ -4,6 +4,7 @@ const Database = require('better-sqlite3'); const { config } = require('dotenv'); const bcrypt = require('bcryptjs'); const jwt = require('jsonwebtoken'); +const { WebSocketServer } = require('ws'); // Load env vars config(); @@ -11,6 +12,7 @@ config(); // Database connection for Dev Server API const dbPath = process.env.DB_PATH || path.resolve(__dirname, '../server/data/sensors.db'); const JWT_SECRET = process.env.JWT_SECRET || 'dev-secret-key-change-me'; +const WS_PORT = process.env.WS_PORT || 3962; let db; try { @@ -20,6 +22,204 @@ try { console.error(`[UI Server] Failed to connect to database at ${dbPath}:`, err.message); } +// ============================================= +// WebSocket Server for Agents (port 3962) +// ============================================= + +// Track authenticated clients by devicePrefix +const agentClients = new Map(); // devicePrefix -> Set + +function validateApiKey(apiKey) { + if (!db) return null; + try { + const stmt = db.prepare('SELECT * FROM api_keys WHERE key = ? AND enabled = 1'); + return stmt.get(apiKey); + } catch (err) { + console.error('[WS] Error validating API key:', err.message); + return null; + } +} + +function insertReadingsSmart(devicePrefix, readings) { + if (!db) throw new Error('Database not connected'); + + let inserted = 0; + let updated = 0; + + const insertStmt = db.prepare(` + INSERT INTO sensor_events (timestamp, channel, device, value, data, data_type) + VALUES (?, ?, ?, ?, ?, ?) + `); + + const updateUntilStmt = db.prepare(` + UPDATE sensor_events SET until = ? WHERE id = ? + `); + + const getLastStmt = db.prepare(` + SELECT id, value, data FROM sensor_events + WHERE device = ? AND channel = ? + ORDER BY timestamp DESC LIMIT 1 + `); + + const now = new Date().toISOString(); + + for (const reading of readings) { + const device = `${devicePrefix}${reading.device}`; + const channel = reading.channel; + const value = reading.value ?? null; + const data = reading.data ? JSON.stringify(reading.data) : null; + const dataType = value !== null ? 'number' : 'json'; + + // Check last value for RLE + const last = getLastStmt.get(device, channel); + + if (last) { + const lastValue = last.value; + const lastData = last.data; + + // If same value, just update 'until' + if (value !== null && lastValue === value) { + updateUntilStmt.run(now, last.id); + updated++; + continue; + } + if (data !== null && lastData === data) { + updateUntilStmt.run(now, last.id); + updated++; + continue; + } + } + + // Insert new reading + insertStmt.run(now, channel, device, value, data, dataType); + inserted++; + } + + return { inserted, updated }; +} + +function createAgentWebSocketServer() { + const wss = new WebSocketServer({ port: WS_PORT }); + + wss.on('connection', (ws, req) => { + const clientId = `${req.socket.remoteAddress}:${req.socket.remotePort}`; + console.log(`[WS] Client connected: ${clientId}`); + + const clientState = { + authenticated: false, + devicePrefix: null, + name: null + }; + + ws.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + handleAgentMessage(ws, message, clientState, clientId); + } catch (err) { + console.error(`[WS] Error parsing message from ${clientId}:`, err.message); + ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' })); + } + }); + + ws.on('close', () => { + console.log(`[WS] Client disconnected: ${clientId} (${clientState.name || 'unauthenticated'})`); + if (clientState.devicePrefix && agentClients.has(clientState.devicePrefix)) { + agentClients.get(clientState.devicePrefix).delete(ws); + } + }); + + ws.on('error', (err) => { + console.error(`[WS] Error for ${clientId}:`, err.message); + }); + }); + + console.log(`[WS] WebSocket server listening on port ${WS_PORT}`); + return wss; +} + +function handleAgentMessage(ws, message, clientState, clientId) { + const { type } = message; + + switch (type) { + case 'auth': + const { apiKey } = message; + if (!apiKey) { + ws.send(JSON.stringify({ type: 'auth', success: false, error: 'Missing apiKey' })); + return; + } + + const keyInfo = validateApiKey(apiKey); + if (!keyInfo) { + ws.send(JSON.stringify({ type: 'auth', success: false, error: 'Invalid API key' })); + return; + } + + clientState.authenticated = true; + clientState.devicePrefix = keyInfo.device_prefix; + clientState.name = keyInfo.name; + + // Track this connection + if (!agentClients.has(keyInfo.device_prefix)) { + agentClients.set(keyInfo.device_prefix, new Set()); + } + agentClients.get(keyInfo.device_prefix).add(ws); + + console.log(`[WS] Client authenticated: ${keyInfo.name} (prefix: ${keyInfo.device_prefix})`); + ws.send(JSON.stringify({ type: 'auth', success: true, devicePrefix: keyInfo.device_prefix, name: keyInfo.name })); + break; + + case 'data': + if (!clientState.authenticated) { + ws.send(JSON.stringify({ type: 'error', error: 'Not authenticated' })); + return; + } + + const { readings } = message; + if (!Array.isArray(readings) || readings.length === 0) { + ws.send(JSON.stringify({ type: 'error', error: 'Invalid readings' })); + return; + } + + try { + const validReadings = readings.filter(r => r.device && r.channel && (r.value !== undefined || r.data !== undefined)); + const result = insertReadingsSmart(clientState.devicePrefix, validReadings); + ws.send(JSON.stringify({ type: 'ack', count: result.inserted + result.updated })); + } catch (err) { + console.error('[WS] Error inserting readings:', err.message); + ws.send(JSON.stringify({ type: 'error', error: 'Failed to insert readings' })); + } + break; + + default: + ws.send(JSON.stringify({ type: 'error', error: `Unknown message type: ${type}` })); + } +} + +// Send command to all agents with the given device prefix +function sendCommandToDevicePrefix(devicePrefix, command) { + const clients = agentClients.get(devicePrefix); + if (!clients || clients.size === 0) { + console.log(`[WS] No connected agents for prefix: ${devicePrefix}`); + return false; + } + + const message = JSON.stringify({ type: 'command', ...command }); + let sent = 0; + + for (const ws of clients) { + if (ws.readyState === 1) { // OPEN + ws.send(message); + sent++; + } + } + + console.log(`[WS] Sent command to ${sent} agent(s) with prefix ${devicePrefix}:`, command); + return sent > 0; +} + +// Start the WebSocket server +const agentWss = createAgentWebSocketServer(); + module.exports = { entry: './src/index.js', output: { @@ -456,7 +656,9 @@ module.exports = { `); const last = lastStmt.get(channel); - if (last && Math.abs(last.value - value) < Number.EPSILON) { + const valueChanged = !last || Math.abs(last.value - value) >= Number.EPSILON; + + if (!valueChanged) { // Same value - update the until timestamp (RLE) const updateStmt = db.prepare('UPDATE output_events SET until = ? WHERE id = ?'); updateStmt.run(now, last.id); @@ -468,6 +670,16 @@ module.exports = { `); insertStmt.run(now, channel, value); console.log(`[RuleRunner] Output changed: ${channel} = ${value}`); + + // Send command to bound physical device + const binding = OUTPUT_BINDINGS[channel]; + if (binding) { + sendCommandToDevicePrefix(`${binding.device}:`, { + device: binding.channel, + action: 'set_state', + value: value > 0 ? 1 : 0 + }); + } } }