# Sensor Data Collection System A Node.js server that collects sensor data from multiple agents via WebSocket, stores it in SQLite with automatic data summarization and retention policies. ## Architecture Overview ```mermaid graph TB subgraph "Central Server (Node.js)" WS[WebSocket Server :8080] DB[(SQLite Database)] AGG[Aggregation Job] WS --> DB AGG --> DB end subgraph "AC Infinity Agent (Node.js)" AC[AC Infinity Client] AC -->|polls every 60s| ACAPI[AC Infinity Cloud API] AC -->|WebSocket| WS end subgraph "Tapo Agent (Rust)" TAPO[Tapo Client] TAPO -->|polls every 60s| PLUG[Tapo P100/P110] TAPO -->|WebSocket| WS end subgraph "Custom CLI Agent" CLI[Shell Script] CLI -->|WebSocket| WS end ``` --- ## User Review Required > [!IMPORTANT] > **Tapo Agent Language Choice**: I recommend **Rust** for the Tapo agent because: > - Compiles to a single ~2MB static binary > - Uses ~5-10MB RAM at runtime > - Excellent [tapo crate](https://crates.io/crates/tapo) already exists > - Easy cross-compilation for Raspberry Pi > > Alternatively, I could write it in **Go** (would need to implement protocol from scratch) or as a **Node.js** agent (but you mentioned wanting it lightweight). > [!IMPORTANT] > **AC Infinity Credentials**: The AC Infinity API requires email/password authentication to their cloud service. These will need to be stored in configuration. --- ## Project Structure ``` tischlerctrl/ ├── server/ │ ├── package.json │ ├── src/ │ │ ├── index.js # Entry point │ │ ├── config.js # Configuration loader │ │ ├── db/ │ │ │ ├── schema.js # SQLite schema + migrations │ │ │ └── queries.js # Database operations │ │ ├── websocket/ │ │ │ ├── server.js # WebSocket server │ │ │ └── handlers.js # Message handlers │ │ └── jobs/ │ │ ├── aggregator.js # Data summarization job │ │ └── cleanup.js # Data retention cleanup │ └── data/ │ └── sensors.db # SQLite database file │ ├── agents/ │ ├── ac-infinity/ │ │ ├── package.json │ │ └── src/ │ │ ├── index.js # Entry point │ │ ├── config.js # Configuration │ │ ├── ac-client.js # AC Infinity API client │ │ └── ws-client.js # WebSocket client with reconnect │ │ │ ├── tapo/ │ │ ├── Cargo.toml │ │ └── src/ │ │ └── main.rs # Rust Tapo agent │ │ │ └── cli/ │ └── sensor-send # Shell script CLI tool │ ├── .env.example # Example environment variables └── README.md ``` --- ## Proposed Changes ### Server - Database Schema #### [NEW] [schema.js](file:///home/seb/src/tischlerctrl/server/src/db/schema.js) SQLite tables: ```sql -- API keys for agent authentication CREATE TABLE api_keys ( id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT UNIQUE NOT NULL, name TEXT NOT NULL, -- e.g., "ac-infinity-agent" device_prefix TEXT NOT NULL, -- e.g., "ac:" or "tapo:" created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_used_at DATETIME ); -- Raw sensor data (1-minute resolution, kept for 1 week) CREATE TABLE sensor_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME NOT NULL, device TEXT NOT NULL, -- e.g., "ac:controller-69-grow" channel TEXT NOT NULL, -- e.g., "temperature", "humidity", "power" value REAL NOT NULL, INDEX idx_sensor_data_time (timestamp), INDEX idx_sensor_data_device (device, channel) ); -- 10-minute aggregated data (kept for 1 month) CREATE TABLE sensor_data_10m ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME NOT NULL, -- Rounded to 10-min boundary device TEXT NOT NULL, channel TEXT NOT NULL, value REAL NOT NULL, -- Averaged value sample_count INTEGER NOT NULL, UNIQUE(timestamp, device, channel) ); -- 1-hour aggregated data (kept forever) CREATE TABLE sensor_data_1h ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME NOT NULL, -- Rounded to 1-hour boundary device TEXT NOT NULL, channel TEXT NOT NULL, value REAL NOT NULL, -- Averaged value sample_count INTEGER NOT NULL, UNIQUE(timestamp, device, channel) ); ``` --- ### Server - WebSocket Protocol #### [NEW] [server.js](file:///home/seb/src/tischlerctrl/server/src/websocket/server.js) **Authentication Flow:** 1. Client connects to `ws://server:8080` 2. Client sends: `{ "type": "auth", "apiKey": "xxx" }` 3. Server validates API key, responds: `{ "type": "auth", "success": true, "devicePrefix": "ac:" }` 4. Client is now authenticated and can send data **Data Ingestion Message:** ```json { "type": "data", "readings": [ { "device": "controller-69-grow", "channel": "temperature", "value": 24.5 }, { "device": "controller-69-grow", "channel": "humidity", "value": 65.2 } ] } ``` Server prepends `devicePrefix` to device names and adds timestamp. **Keepalive:** - Server sends `ping` every 30 seconds - Client responds with `pong` - Connection closed after 90 seconds of no response --- ### Server - Aggregation Jobs #### [NEW] [aggregator.js](file:///home/seb/src/tischlerctrl/server/src/jobs/aggregator.js) Runs every 10 minutes: 1. **10-minute aggregation**: - Select data from `sensor_data` older than 10 minutes - Group by device, channel, and 10-minute bucket - Calculate average, insert into `sensor_data_10m` 2. **1-hour aggregation**: - Select data from `sensor_data_10m` older than 1 hour - Group by device, channel, and 1-hour bucket - Calculate weighted average, insert into `sensor_data_1h` #### [NEW] [cleanup.js](file:///home/seb/src/tischlerctrl/server/src/jobs/cleanup.js) Runs every hour: - Delete from `sensor_data` where timestamp < NOW - 7 days - Delete from `sensor_data_10m` where timestamp < NOW - 30 days --- ### AC Infinity Agent #### [NEW] [ac-client.js](file:///home/seb/src/tischlerctrl/agents/ac-infinity/src/ac-client.js) Port of the TypeScript AC Infinity client to JavaScript ES modules: - `login(email, password)` → Returns userId token - `getDevicesListAll()` → Returns all controllers with sensor readings - Polling interval: 60 seconds - Extracts: temperature, humidity, VPD (if available) per controller **Data extraction from API response:** ```javascript // Each device in response has: // - devId, devName // - devSettings.temperature (°C * 100) // - devSettings.humidity (% * 100) // We normalize and send to server ``` #### [NEW] [ws-client.js](file:///home/seb/src/tischlerctrl/agents/ac-infinity/src/ws-client.js) WebSocket client with: - Auto-reconnect with exponential backoff (1s → 2s → 4s → ... → 60s max) - Authentication on connect - Heartbeat response - Message queue during disconnection --- ### Tapo Agent (Rust) #### [NEW] [main.rs](file:///home/seb/src/tischlerctrl/agents/tapo/src/main.rs) Uses [tapo crate](https://crates.io/crates/tapo) for P100/P110 communication. **Features:** - Configuration via environment variables or TOML file - WebSocket client with tungstenite crate - Auto-reconnect with backoff - Polls devices every 60 seconds **Data collected:** | Device | Channel | Description | |--------|---------|-------------| | P100 | `state` | 0 = off, 1 = on | | P110 | `state` | 0 = off, 1 = on | | P110 | `power` | Current power in watts | | P110 | `energy_today` | Energy used today in Wh | **Build for Raspberry Pi:** ```bash # Cross-compile for ARM cross build --release --target armv7-unknown-linux-gnueabihf # Binary: ~2MB, runs with ~8MB RAM ``` --- ### Custom CLI Agent #### [NEW] [sensor-send](file:///home/seb/src/tischlerctrl/agents/cli/sensor-send) A shell script using `websocat` (lightweight WebSocket CLI tool): ```bash #!/bin/bash # Usage: sensor-send --device=mydevice --channel=temp --value=23.5 API_KEY="${SENSOR_API_KEY:-}" SERVER="${SENSOR_SERVER:-ws://localhost:8080}" sensor-send mydevice temperature 23.5 ``` Requires: `websocat` (single binary, ~3MB, available via cargo or apt) --- ## Configuration Examples ### Server `.env` ```bash PORT=8080 DB_PATH=./data/sensors.db # Generate API keys via CLI: node src/cli/generate-key.js "ac-infinity" "ac:" ``` ### AC Infinity Agent `.env` ```bash SERVER_URL=ws://192.168.1.100:8080 API_KEY=your-api-key-here AC_EMAIL=your@email.com AC_PASSWORD=your-password POLL_INTERVAL_MS=60000 ``` ### Tapo Agent `config.toml` ```toml server_url = "ws://192.168.1.100:8080" api_key = "your-api-key-here" poll_interval_secs = 60 [[devices]] ip = "192.168.1.50" name = "grow-light-plug" type = "P110" # or "P100" tapo_email = "your@email.com" tapo_password = "your-tapo-password" ``` --- ## Verification Plan ### Automated Tests 1. **Server unit tests**: Database operations, aggregation logic 2. **Integration test**: Start server, connect mock agent, verify data flow 3. **Run commands**: ```bash cd server && npm test cd agents/ac-infinity && npm test ``` ### Manual Verification 1. Start server, verify WebSocket accepts connections 2. Send test data via CLI agent, verify it appears in database 3. Wait 10+ minutes, verify aggregation runs and data appears in `sensor_data_10m` 4. Connect AC Infinity agent with real credentials, verify sensor readings 5. Deploy Tapo agent to Raspberry Pi, verify plug data collection