Sensor Data Collection System

A Node.js server that collects sensor data from multiple agents via WebSocket and stores it in SQLite, with automatic data summarization and retention policies.

Architecture Overview

graph TB
    subgraph "Central Server (Node.js)"
        WS[WebSocket Server :8080]
        DB[(SQLite Database)]
        AGG[Aggregation Job]
        WS --> DB
        AGG --> DB
    end
    
    subgraph "AC Infinity Agent (Node.js)"
        AC[AC Infinity Client]
        AC -->|polls every 60s| ACAPI[AC Infinity Cloud API]
        AC -->|WebSocket| WS
    end
    
    subgraph "Tapo Agent (Rust)"
        TAPO[Tapo Client]
        TAPO -->|polls every 60s| PLUG[Tapo P100/P110]
        TAPO -->|WebSocket| WS
    end
    
    subgraph "Custom CLI Agent"
        CLI[Shell Script]
        CLI -->|WebSocket| WS
    end

User Review Required

Important

Tapo Agent Language Choice: I recommend Rust for the Tapo agent because:

  • Compiles to a single ~2MB static binary
  • Uses ~5-10MB RAM at runtime
  • Excellent tapo crate already exists
  • Easy cross-compilation for Raspberry Pi

Alternatively, I could write it in Go (which would mean implementing the protocol from scratch) or as a Node.js agent (though you mentioned wanting it lightweight).

Important

AC Infinity Credentials: The AC Infinity API requires email/password authentication to their cloud service. These will need to be stored in configuration.


Project Structure

tischlerctrl/
├── server/
│   ├── package.json
│   ├── src/
│   │   ├── index.js              # Entry point
│   │   ├── config.js             # Configuration loader
│   │   ├── db/
│   │   │   ├── schema.js         # SQLite schema + migrations
│   │   │   └── queries.js        # Database operations
│   │   ├── websocket/
│   │   │   ├── server.js         # WebSocket server
│   │   │   └── handlers.js       # Message handlers
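│   │   ├── jobs/
│   │   │   ├── aggregator.js     # Data summarization job
│   │   │   └── cleanup.js        # Data retention cleanup
│   │   └── cli/
│   │       └── generate-key.js   # API key generator (referenced in the server .env example)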
│   └── data/
│       └── sensors.db            # SQLite database file
│
├── agents/
│   ├── ac-infinity/
│   │   ├── package.json
│   │   └── src/
│   │       ├── index.js          # Entry point
│   │       ├── config.js         # Configuration
│   │       ├── ac-client.js      # AC Infinity API client
│   │       └── ws-client.js      # WebSocket client with reconnect
│   │
│   ├── tapo/
│   │   ├── Cargo.toml
│   │   └── src/
│   │       └── main.rs           # Rust Tapo agent
│   │
│   └── cli/
│       └── sensor-send           # Shell script CLI tool
│
├── .env.example                  # Example environment variables
└── README.md

Proposed Changes

Server - Database Schema

[NEW] schema.js

SQLite tables:

-- API keys for agent authentication
CREATE TABLE api_keys (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    key TEXT UNIQUE NOT NULL,
    name TEXT NOT NULL,           -- e.g., "ac-infinity-agent"
    device_prefix TEXT NOT NULL,  -- e.g., "ac:" or "tapo:"
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    last_used_at DATETIME
);

-- Raw sensor data (1-minute resolution, kept for 1 week)
CREATE TABLE sensor_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp DATETIME NOT NULL,
    device TEXT NOT NULL,         -- e.g., "ac:controller-69-grow"
    channel TEXT NOT NULL,        -- e.g., "temperature", "humidity", "power"
    value REAL NOT NULL
);

-- SQLite does not accept inline INDEX clauses, so indexes are created separately
CREATE INDEX idx_sensor_data_time ON sensor_data (timestamp);
CREATE INDEX idx_sensor_data_device ON sensor_data (device, channel);

-- 10-minute aggregated data (kept for 1 month)
CREATE TABLE sensor_data_10m (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp DATETIME NOT NULL,  -- Rounded to 10-min boundary
    device TEXT NOT NULL,
    channel TEXT NOT NULL,
    value REAL NOT NULL,          -- Averaged value
    sample_count INTEGER NOT NULL,
    UNIQUE(timestamp, device, channel)
);

-- 1-hour aggregated data (kept forever)
CREATE TABLE sensor_data_1h (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp DATETIME NOT NULL,  -- Rounded to 1-hour boundary
    device TEXT NOT NULL,
    channel TEXT NOT NULL,
    value REAL NOT NULL,          -- Averaged value
    sample_count INTEGER NOT NULL,
    UNIQUE(timestamp, device, channel)
);

Server - WebSocket Protocol

[NEW] server.js

Authentication Flow:

  1. Client connects to ws://server:8080
  2. Client sends: { "type": "auth", "apiKey": "xxx" }
  3. Server validates API key, responds: { "type": "auth", "success": true, "devicePrefix": "ac:" }
  4. Client is now authenticated and can send data

Data Ingestion Message:

{
  "type": "data",
  "readings": [
    { "device": "controller-69-grow", "channel": "temperature", "value": 24.5 },
    { "device": "controller-69-grow", "channel": "humidity", "value": 65.2 }
  ]
}

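The server prepends the authenticated devicePrefix to each device name (e.g., "controller-69-grow" becomes "ac:controller-69-grow") and adds the timestamp on receipt.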
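The authentication flow and data message above could be handled roughly as follows. This is a minimal sketch, not the planned implementation: `handleMessage`, the `session` object, the `apiKeys` Map standing in for the api_keys table, and the `error` reply type are all hypothetical names introduced for illustration.

```javascript
// Hypothetical per-connection message handler.
// `session` holds per-connection state; `apiKeys` is a Map stand-in for the api_keys table.
// Returns the reply to send (or null) and the rows to insert into sensor_data.
function handleMessage(session, raw, apiKeys, now = new Date()) {
  let msg;
  try {
    msg = JSON.parse(raw);
  } catch {
    // The "error" message type is an assumption; the plan does not define one.
    return { reply: { type: "error", error: "invalid JSON" }, rows: [] };
  }

  if (msg.type === "auth") {
    const entry = apiKeys.get(msg.apiKey);
    if (!entry) {
      return { reply: { type: "auth", success: false }, rows: [] };
    }
    session.devicePrefix = entry.devicePrefix;
    return {
      reply: { type: "auth", success: true, devicePrefix: entry.devicePrefix },
      rows: [],
    };
  }

  if (msg.type === "data") {
    if (session.devicePrefix === undefined) {
      return { reply: { type: "error", error: "not authenticated" }, rows: [] };
    }
    const timestamp = now.toISOString(); // server-side timestamp
    const rows = (msg.readings ?? [])
      .filter((r) => Number.isFinite(r.value)) // skip malformed readings
      .map((r) => ({
        timestamp,
        device: session.devicePrefix + r.device, // e.g. "ac:" + "controller-69-grow"
        channel: r.channel,
        value: r.value,
      }));
    return { reply: null, rows };
  }

  return { reply: { type: "error", error: "unknown message type" }, rows: [] };
}
```

Keeping the handler free of socket and database calls makes it easy to unit test; the WebSocket layer would send `reply` and pass `rows` to the insert query.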

Keepalive:

  • Server sends ping every 30 seconds
  • Client responds with pong
  • Connection closed after 90 seconds of no response
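The keepalive rules above might be sketched as below. It assumes a socket object with `on("pong")`, `ping()`, and `terminate()`, which the `ws` package's WebSocket provides; `createKeepalive` and its options are hypothetical names, and the clock is injectable so the logic can be tested without timers.

```javascript
// Hypothetical keepalive helper: returns a tick() function to be called
// once per ping interval, e.g. setInterval(tick, 30000).
function createKeepalive(socket, { timeoutMs = 90000, now = Date.now } = {}) {
  let lastPongAt = now(); // treat the moment of connection as the first sign of life

  socket.on("pong", () => {
    lastPongAt = now();
  });

  return function tick() {
    if (now() - lastPongAt >= timeoutMs) {
      socket.terminate(); // no pong for 90 seconds: drop the connection
      return "closed";
    }
    socket.ping();
    return "pinged";
  };
}
```

The caller is expected to clear the interval once `tick()` returns "closed" or the socket emits its close event.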

Server - Aggregation Jobs

[NEW] aggregator.js

Runs every 10 minutes:

  1. 10-minute aggregation:
    • Select data from sensor_data older than 10 minutes
    • Group by device, channel, and 10-minute bucket
    • Calculate average, insert into sensor_data_10m
  2. 1-hour aggregation:
    • Select data from sensor_data_10m older than 1 hour
    • Group by device, channel, and 1-hour bucket
    • Calculate average weighted by sample_count, insert into sensor_data_1h
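The bucketing and weighted averaging can be illustrated in plain JavaScript. This is a sketch of the arithmetic only, assuming timestamps are epoch milliseconds; the real job would more likely do this in SQL, and `bucketStart` and `aggregate` are hypothetical helper names.

```javascript
// Round an epoch-millisecond timestamp down to the start of its bucket.
function bucketStart(timestampMs, bucketMs) {
  return Math.floor(timestampMs / bucketMs) * bucketMs;
}

// Group rows by (bucket, device, channel) and average their values.
// Raw rows carry no sample_count and count as one sample each;
// 10-minute rows are weighted by their sample_count.
function aggregate(rows, bucketMs) {
  const groups = new Map();
  for (const row of rows) {
    const timestamp = bucketStart(row.timestamp, bucketMs);
    const key = JSON.stringify([timestamp, row.device, row.channel]);
    const weight = row.sample_count ?? 1;
    const group = groups.get(key) ?? {
      timestamp,
      device: row.device,
      channel: row.channel,
      sum: 0,
      sample_count: 0,
    };
    group.sum += row.value * weight;
    group.sample_count += weight;
    groups.set(key, group);
  }
  // Replace the running sum with the (weighted) average.
  return [...groups.values()].map(({ sum, ...rest }) => ({
    ...rest,
    value: sum / rest.sample_count,
  }));
}
```

Weighting by sample_count matters when buckets are unevenly filled: a 10-minute row built from two samples should count twice as much in the hourly average as a row built from one.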

[NEW] cleanup.js

Runs every hour:

  • Delete from sensor_data where timestamp < NOW - 7 days
  • Delete from sensor_data_10m where timestamp < NOW - 30 days
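The retention rules translate into two parameterized DELETE statements. The sketch below only builds the statements and leaves execution to whichever SQLite driver is chosen; `cleanupStatements` is a hypothetical name, and it assumes timestamps are stored as ISO 8601 UTC strings so that string comparison matches chronological order.

```javascript
const DAY_MS = 24 * 60 * 60 * 1000;

// Build the cleanup statements for a given "now".
// Assumption: the timestamp column holds ISO 8601 UTC strings.
function cleanupStatements(now = new Date()) {
  const cutoff = (days) => new Date(now.getTime() - days * DAY_MS).toISOString();
  return [
    { sql: "DELETE FROM sensor_data WHERE timestamp < ?", params: [cutoff(7)] },
    { sql: "DELETE FROM sensor_data_10m WHERE timestamp < ?", params: [cutoff(30)] },
  ];
}
```

sensor_data_1h has no entry because hourly data is kept forever.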

AC Infinity Agent

[NEW] ac-client.js

Port of the TypeScript AC Infinity client to JavaScript ES modules:

  • login(email, password) → Returns userId token
  • getDevicesListAll() → Returns all controllers with sensor readings
  • Polling interval: 60 seconds
  • Extracts: temperature, humidity, VPD (if available) per controller

Data extraction from API response:

// Each device in response has:
// - devId, devName
// - devSettings.temperature (°C * 100)
// - devSettings.humidity (% * 100)
// We normalize and send to server
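A normalization step along these lines could turn the API response into protocol readings. It is a sketch under assumptions: the field names follow the comment above, using `devName` unchanged as the device name is a guess, and `extractReadings` is a hypothetical helper.

```javascript
// Convert AC Infinity device entries into readings for the "data" message.
// Assumes devSettings.temperature and devSettings.humidity are integers scaled by 100.
function extractReadings(devices) {
  const readings = [];
  for (const dev of devices) {
    const settings = dev.devSettings ?? {};
    if (Number.isFinite(settings.temperature)) {
      readings.push({
        device: dev.devName, // assumption: devName is used as-is
        channel: "temperature",
        value: settings.temperature / 100, // °C
      });
    }
    if (Number.isFinite(settings.humidity)) {
      readings.push({
        device: dev.devName,
        channel: "humidity",
        value: settings.humidity / 100, // %
      });
    }
  }
  return readings;
}
```

Missing or non-numeric fields are skipped rather than sent as zero, so a sensor that drops out does not distort the averages.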

[NEW] ws-client.js

WebSocket client with:

  • Auto-reconnect with exponential backoff (1s → 2s → 4s → ... → 60s max)
  • Authentication on connect
  • Heartbeat response
  • Message queue during disconnection
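The reconnect delay and the offline queue might look like the sketch below. `backoffDelay` and `OutboundQueue` are hypothetical names, and the queue size cap is an added assumption to keep memory bounded during long outages.

```javascript
// Delay before reconnect attempt N (0-based): 1s, 2s, 4s, ... capped at 60s.
function backoffDelay(attempt, baseMs = 1000, maxMs = 60000) {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Buffers messages while disconnected and replays them in order on reconnect.
class OutboundQueue {
  constructor(maxSize = 1000) { // maxSize is an assumption, not part of the plan
    this.maxSize = maxSize;
    this.items = [];
  }

  push(message) {
    this.items.push(message);
    if (this.items.length > this.maxSize) {
      this.items.shift(); // drop the oldest message when full
    }
  }

  // Call after a successful auth response; `send` is e.g. (m) => socket.send(m).
  flush(send) {
    while (this.items.length > 0) {
      send(this.items.shift());
    }
  }
}
```

The attempt counter should reset to zero after a successful authentication so that the next outage starts again at one second.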

Tapo Agent (Rust)

[NEW] main.rs

Uses tapo crate for P100/P110 communication.

Features:

  • Configuration via environment variables or TOML file
  • WebSocket client with tungstenite crate
  • Auto-reconnect with backoff
  • Polls devices every 60 seconds

Data collected:

| Device | Channel      | Description             |
|--------|--------------|-------------------------|
| P100   | state        | 0 = off, 1 = on         |
| P110   | state        | 0 = off, 1 = on         |
| P110   | power        | Current power in watts  |
| P110   | energy_today | Energy used today in Wh |

Build for Raspberry Pi:

# Cross-compile for ARM
cross build --release --target armv7-unknown-linux-gnueabihf
# Binary: ~2MB, runs with ~8MB RAM

Custom CLI Agent

[NEW] sensor-send

A shell script using websocat (lightweight WebSocket CLI tool):

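#!/bin/bash
# Usage: sensor-send <device> <channel> <value>
# Example: sensor-send mydevice temperature 23.5

API_KEY="${SENSOR_API_KEY:-}"
SERVER="${SENSOR_SERVER:-ws://localhost:8080}"
DEVICE="$1"; CHANNEL="$2"; VALUE="$3"

# Sketch: send the auth message, then one reading.
# websocat forwards each line of stdin as one text message.
{
  printf '{"type":"auth","apiKey":"%s"}\n' "$API_KEY"
  printf '{"type":"data","readings":[{"device":"%s","channel":"%s","value":%s}]}\n' "$DEVICE" "$CHANNEL" "$VALUE"
} | websocat "$SERVER"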

Requires: websocat (single binary, ~3MB, available via cargo or apt)


Configuration Examples

Server .env

PORT=8080
DB_PATH=./data/sensors.db
# Generate API keys via CLI: node src/cli/generate-key.js "ac-infinity" "ac:"

AC Infinity Agent .env

SERVER_URL=ws://192.168.1.100:8080
API_KEY=your-api-key-here
AC_EMAIL=your@email.com
AC_PASSWORD=your-password
POLL_INTERVAL_MS=60000
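The agent's config.js could validate these variables at startup. This is a sketch only: `loadAgentConfig` is a hypothetical name, and it assumes the variables have already been loaded into `process.env` (for example by a .env loader).

```javascript
// Read and validate the AC Infinity agent settings from an env-like object.
function loadAgentConfig(env = process.env) {
  const required = ["SERVER_URL", "API_KEY", "AC_EMAIL", "AC_PASSWORD"];
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(", ")}`);
  }

  // POLL_INTERVAL_MS is optional and defaults to the 60-second polling interval.
  const pollIntervalMs = Number(env.POLL_INTERVAL_MS ?? 60000);
  if (!Number.isInteger(pollIntervalMs) || pollIntervalMs <= 0) {
    throw new Error("POLL_INTERVAL_MS must be a positive integer");
  }

  return {
    serverUrl: env.SERVER_URL,
    apiKey: env.API_KEY,
    acEmail: env.AC_EMAIL,
    acPassword: env.AC_PASSWORD,
    pollIntervalMs,
  };
}
```

Failing fast on missing credentials gives a clearer error than a rejected login or WebSocket handshake later on.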

Tapo Agent config.toml

server_url = "ws://192.168.1.100:8080"
api_key = "your-api-key-here"
poll_interval_secs = 60

[[devices]]
ip = "192.168.1.50"
name = "grow-light-plug"
type = "P110"  # or "P100"
tapo_email = "your@email.com"
tapo_password = "your-tapo-password"

Verification Plan

Automated Tests

  1. Server unit tests: Database operations, aggregation logic
  2. Integration test: Start server, connect mock agent, verify data flow
  3. Run commands:
    cd server && npm test
    cd agents/ac-infinity && npm test
    

Manual Verification

  1. Start server, verify WebSocket accepts connections
  2. Send test data via CLI agent, verify it appears in database
  3. Wait 10+ minutes, verify aggregation runs and data appears in sensor_data_10m
  4. Connect AC Infinity agent with real credentials, verify sensor readings
  5. Deploy Tapo agent to Raspberry Pi, verify plug data collection