9.6 KiB
Sensor Data Collection System
A Node.js server that collects sensor data from multiple agents via WebSocket, stores it in SQLite with automatic data summarization and retention policies.
Architecture Overview
graph TB
subgraph "Central Server (Node.js)"
WS[WebSocket Server :8080]
DB[(SQLite Database)]
AGG[Aggregation Job]
WS --> DB
AGG --> DB
end
subgraph "AC Infinity Agent (Node.js)"
AC[AC Infinity Client]
AC -->|polls every 60s| ACAPI[AC Infinity Cloud API]
AC -->|WebSocket| WS
end
subgraph "Tapo Agent (Rust)"
TAPO[Tapo Client]
TAPO -->|polls every 60s| PLUG[Tapo P100/P110]
TAPO -->|WebSocket| WS
end
subgraph "Custom CLI Agent"
CLI[Shell Script]
CLI -->|WebSocket| WS
end
User Review Required
Important
Tapo Agent Language Choice: I recommend Rust for the Tapo agent because:
- Compiles to a single ~2MB static binary
- Uses ~5-10MB RAM at runtime
- Excellent tapo crate already exists
- Easy cross-compilation for Raspberry Pi
Alternatively, I could write it in Go (would need to implement protocol from scratch) or as a Node.js agent (but you mentioned wanting it lightweight).
Important
AC Infinity Credentials: The AC Infinity API requires email/password authentication to their cloud service. These will need to be stored in configuration.
Project Structure
tischlerctrl/
├── server/
│ ├── package.json
│ ├── src/
│ │ ├── index.js # Entry point
│ │ ├── config.js # Configuration loader
│ │ ├── db/
│ │ │ ├── schema.js # SQLite schema + migrations
│ │ │ └── queries.js # Database operations
│ │ ├── websocket/
│ │ │ ├── server.js # WebSocket server
│ │ │ └── handlers.js # Message handlers
│ │ └── jobs/
│ │ ├── aggregator.js # Data summarization job
│ │ └── cleanup.js # Data retention cleanup
│ └── data/
│ └── sensors.db # SQLite database file
│
├── agents/
│ ├── ac-infinity/
│ │ ├── package.json
│ │ └── src/
│ │ ├── index.js # Entry point
│ │ ├── config.js # Configuration
│ │ ├── ac-client.js # AC Infinity API client
│ │ └── ws-client.js # WebSocket client with reconnect
│ │
│ ├── tapo/
│ │ ├── Cargo.toml
│ │ └── src/
│ │ └── main.rs # Rust Tapo agent
│ │
│ └── cli/
│ └── sensor-send # Shell script CLI tool
│
├── .env.example # Example environment variables
└── README.md
Proposed Changes
Server - Database Schema
[NEW] schema.js
SQLite tables:
-- API keys for agent authentication
CREATE TABLE api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT UNIQUE NOT NULL,
name TEXT NOT NULL, -- e.g., "ac-infinity-agent"
device_prefix TEXT NOT NULL, -- e.g., "ac:" or "tapo:"
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_used_at DATETIME
);
-- Raw sensor data (1-minute resolution, kept for 1 week)
CREATE TABLE sensor_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
device TEXT NOT NULL, -- e.g., "ac:controller-69-grow"
channel TEXT NOT NULL, -- e.g., "temperature", "humidity", "power"
value REAL NOT NULL,
INDEX idx_sensor_data_time (timestamp),
INDEX idx_sensor_data_device (device, channel)
);
-- 10-minute aggregated data (kept for 1 month)
CREATE TABLE sensor_data_10m (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL, -- Rounded to 10-min boundary
device TEXT NOT NULL,
channel TEXT NOT NULL,
value REAL NOT NULL, -- Averaged value
sample_count INTEGER NOT NULL,
UNIQUE(timestamp, device, channel)
);
-- 1-hour aggregated data (kept forever)
CREATE TABLE sensor_data_1h (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL, -- Rounded to 1-hour boundary
device TEXT NOT NULL,
channel TEXT NOT NULL,
value REAL NOT NULL, -- Averaged value
sample_count INTEGER NOT NULL,
UNIQUE(timestamp, device, channel)
);
Server - WebSocket Protocol
[NEW] server.js
Authentication Flow:
- Client connects to
ws://server:8080 - Client sends:
{ "type": "auth", "apiKey": "xxx" } - Server validates API key, responds:
{ "type": "auth", "success": true, "devicePrefix": "ac:" } - Client is now authenticated and can send data
Data Ingestion Message:
{
"type": "data",
"readings": [
{ "device": "controller-69-grow", "channel": "temperature", "value": 24.5 },
{ "device": "controller-69-grow", "channel": "humidity", "value": 65.2 }
]
}
Server prepends devicePrefix to device names and adds timestamp.
Keepalive:
- Server sends
pingevery 30 seconds - Client responds with
pong - Connection closed after 90 seconds of no response
Server - Aggregation Jobs
[NEW] aggregator.js
Runs every 10 minutes:
-
10-minute aggregation:
- Select data from
sensor_dataolder than 10 minutes - Group by device, channel, and 10-minute bucket
- Calculate average, insert into
sensor_data_10m
- Select data from
-
1-hour aggregation:
- Select data from
sensor_data_10molder than 1 hour - Group by device, channel, and 1-hour bucket
- Calculate weighted average, insert into
sensor_data_1h
- Select data from
[NEW] cleanup.js
Runs every hour:
- Delete from
sensor_datawhere timestamp < NOW - 7 days - Delete from
sensor_data_10mwhere timestamp < NOW - 30 days
AC Infinity Agent
[NEW] ac-client.js
Port of the TypeScript AC Infinity client to JavaScript ES modules:
login(email, password)→ Returns userId tokengetDevicesListAll()→ Returns all controllers with sensor readings- Polling interval: 60 seconds
- Extracts: temperature, humidity, VPD (if available) per controller
Data extraction from API response:
// Each device in response has:
// - devId, devName
// - devSettings.temperature (°C * 100)
// - devSettings.humidity (% * 100)
// We normalize and send to server
[NEW] ws-client.js
WebSocket client with:
- Auto-reconnect with exponential backoff (1s → 2s → 4s → ... → 60s max)
- Authentication on connect
- Heartbeat response
- Message queue during disconnection
Tapo Agent (Rust)
[NEW] main.rs
Uses tapo crate for P100/P110 communication.
Features:
- Configuration via environment variables or TOML file
- WebSocket client with tungstenite crate
- Auto-reconnect with backoff
- Polls devices every 60 seconds
Data collected:
| Device | Channel | Description |
|---|---|---|
| P100 | state |
0 = off, 1 = on |
| P110 | state |
0 = off, 1 = on |
| P110 | power |
Current power in watts |
| P110 | energy_today |
Energy used today in Wh |
Build for Raspberry Pi:
# Cross-compile for ARM
cross build --release --target armv7-unknown-linux-gnueabihf
# Binary: ~2MB, runs with ~8MB RAM
Custom CLI Agent
[NEW] sensor-send
A shell script using websocat (lightweight WebSocket CLI tool):
#!/bin/bash
# Usage: sensor-send --device=mydevice --channel=temp --value=23.5
API_KEY="${SENSOR_API_KEY:-}"
SERVER="${SENSOR_SERVER:-ws://localhost:8080}"
sensor-send mydevice temperature 23.5
Requires: websocat (single binary, ~3MB, available via cargo or apt)
Configuration Examples
Server .env
PORT=8080
DB_PATH=./data/sensors.db
# Generate API keys via CLI: node src/cli/generate-key.js "ac-infinity" "ac:"
AC Infinity Agent .env
SERVER_URL=ws://192.168.1.100:8080
API_KEY=your-api-key-here
AC_EMAIL=your@email.com
AC_PASSWORD=your-password
POLL_INTERVAL_MS=60000
Tapo Agent config.toml
server_url = "ws://192.168.1.100:8080"
api_key = "your-api-key-here"
poll_interval_secs = 60
[[devices]]
ip = "192.168.1.50"
name = "grow-light-plug"
type = "P110" # or "P100"
tapo_email = "your@email.com"
tapo_password = "your-tapo-password"
Verification Plan
Automated Tests
- Server unit tests: Database operations, aggregation logic
- Integration test: Start server, connect mock agent, verify data flow
- Run commands:
cd server && npm test cd agents/ac-infinity && npm test
Manual Verification
- Start server, verify WebSocket accepts connections
- Send test data via CLI agent, verify it appears in database
- Wait 10+ minutes, verify aggregation runs and data appears in
sensor_data_10m - Connect AC Infinity agent with real credentials, verify sensor readings
- Deploy Tapo agent to Raspberry Pi, verify plug data collection