diff --git a/agents/ac-infinity/src/ac-client.js b/agents/ac-infinity/src/ac-client.js
index dff01db..3b21046 100644
--- a/agents/ac-infinity/src/ac-client.js
+++ b/agents/ac-infinity/src/ac-client.js
@@ -35,14 +35,15 @@ export class ACInfinityClient {
   async login() {
     try {
-      // AC Infinity API does not accept passwords greater than 25 characters
-      const normalizedPassword = this.password.substring(0, 25);
+      // AC Infinity API does not accept passwords greater than 25 characters - UPDATE: Reference impl uses full password?
+      // const normalizedPassword = this.password.substring(0, 25);
+      const normalizedPassword = this.password;
       const response = await fetch(`${this.host}${API_URL_LOGIN}`, {
         method: 'POST',
         headers: {
           'User-Agent': 'ACController/1.9.7 (com.acinfinity.humiture; build:533; iOS 18.5.0) Alamofire/5.10.2',
-          'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
+          'Content-Type': 'application/x-www-form-urlencoded',
         },
         body: new URLSearchParams({
           appEmail: this.email,
@@ -63,6 +64,7 @@ export class ACInfinityClient {
       console.log('[AC] Successfully logged in to AC Infinity API');
       return this.userId;
     } catch (error) {
+      console.error('[AC] Login error details:', error); // Added detailed logging
       if (error instanceof ACInfinityClientError) {
         throw error;
       }
@@ -129,6 +131,9 @@ export class ACInfinityClient {
     for (const device of devices) {
       const devId = device.devId;
       const devName = device.devName || `device-${devId}`;
+      // Use deviceInfo if available (newer API structure), otherwise fall back to root/devSettings
+      const info = device.deviceInfo || device;
+      const settings = device.devSettings || info;

       // Normalize device name for use as identifier
       const deviceId = devName
@@ -136,52 +141,65 @@ export class ACInfinityClient {
         .replace(/[^a-z0-9]+/g, '-')
         .replace(/^-|-$/g, '');

-      // Extract sensor data from device settings or sensor fields
-      // Temperature is stored as Celsius * 100
-      if (device.devSettings?.temperature !== undefined) {
+      // --- Device Level Sensors ---
+
+      // Temperature (Celsius * 100)
+      if (info.temperature !== undefined) {
         readings.push({
           device: deviceId,
           channel: 'temperature',
-          value: device.devSettings.temperature / 100,
+          value: info.temperature / 100,
         });
-      } else if (device.temperature !== undefined) {
+      } else if (settings.temperature !== undefined) {
         readings.push({
           device: deviceId,
           channel: 'temperature',
-          value: device.temperature / 100,
+          value: settings.temperature / 100,
         });
       }

-      // Humidity is stored as % * 100
-      if (device.devSettings?.humidity !== undefined) {
+      // Humidity (% * 100)
+      if (info.humidity !== undefined) {
         readings.push({
           device: deviceId,
           channel: 'humidity',
-          value: device.devSettings.humidity / 100,
+          value: info.humidity / 100,
         });
-      } else if (device.humidity !== undefined) {
+      } else if (settings.humidity !== undefined) {
         readings.push({
           device: deviceId,
           channel: 'humidity',
-          value: device.humidity / 100,
+          value: settings.humidity / 100,
         });
       }

-      // VPD if available
-      if (device.devSettings?.vpdnums !== undefined) {
+      // VPD
+      if (info.vpdnums !== undefined) {
         readings.push({
           device: deviceId,
           channel: 'vpd',
-          value: device.devSettings.vpdnums / 100,
+          value: info.vpdnums / 100,
+        });
+      } else if (settings.vpdnums !== undefined) {
+        readings.push({
+          device: deviceId,
+          channel: 'vpd',
+          value: settings.vpdnums / 100,
         });
       }

-      // Check for port-level sensors (some controllers have multiple ports)
-      if (device.devPortList && Array.isArray(device.devPortList)) {
-        for (const port of device.devPortList) {
-          const portId = port.portId || port.port;
-          const portDeviceId = `${deviceId}-port${portId}`;
+      // --- Port Level Sensors/State ---
+      const ports = info.ports || device.devPortList;
+      if (ports && Array.isArray(ports)) {
+        for (const port of ports) {
+          const portId = port.port || port.portId;
+          const portName = port.portName || `port${portId}`;
+          // Create a descriptive suffix for the port device, e.g. "wall-fan" or "wall-port1"
+          // If portName is generic ("Port X"), use the number; if it's specific ("Fan"), use that.
+          const suffix = portName.toLowerCase().replace(/[^a-z0-9]+/g, '-');
+          const portDeviceId = `${deviceId}-${suffix}`;
+          // Port-specific sensors (if any - temperature/humidity are usually reported at the device level)
           if (port.temperature !== undefined) {
             readings.push({
               device: portDeviceId,
@@ -189,7 +207,6 @@ export class ACInfinityClient {
               value: port.temperature / 100,
             });
           }
-
           if (port.humidity !== undefined) {
             readings.push({
               device: portDeviceId,
@@ -197,6 +214,15 @@ export class ACInfinityClient {
               value: port.humidity / 100,
             });
           }
+
+          // Level / Speed (speak)
+          if (port.speak !== undefined) {
+            readings.push({
+              device: portDeviceId,
+              channel: 'level',
+              value: port.speak,
+            });
+          }
         }
       }
     }
diff --git a/agents/ac-infinity/src/config.js b/agents/ac-infinity/src/config.js
index 6e7c4ec..a357289 100644
--- a/agents/ac-infinity/src/config.js
+++ b/agents/ac-infinity/src/config.js
@@ -21,5 +21,5 @@ export default {
   pollIntervalMs: parseInt(process.env.POLL_INTERVAL_MS || '60000', 10),

   // AC Infinity API
-  acApiHost: process.env.AC_API_HOST || 'https://www.acinfinity.com',
+  acApiHost: process.env.AC_API_HOST || 'http://www.acinfinityserver.com',
 };
diff --git a/server/src/db/queries.js b/server/src/db/queries.js
index 5f955c0..5a0ca44 100644
--- a/server/src/db/queries.js
+++ b/server/src/db/queries.js
@@ -11,21 +11,21 @@ import crypto from 'crypto';
  * @returns {object|null} - API key metadata or null if invalid
  */
 export function validateApiKey(db, apiKey) {
-  const stmt = db.prepare(`
+  const stmt = db.prepare(`
     SELECT id, name, device_prefix
     FROM api_keys
     WHERE key = ?
   `);
-  const result = stmt.get(apiKey);
+  const result = stmt.get(apiKey);

-  if (result) {
-    // Update last_used_at timestamp
-    db.prepare(`
+  if (result) {
+    // Update last_used_at timestamp
+    db.prepare(`
       UPDATE api_keys
       SET last_used_at = datetime('now')
       WHERE id = ?
     `).run(result.id);
-  }
+  }

-  return result || null;
+  return result || null;
 }
@@ -36,97 +36,103 @@ export function validateApiKey(db, apiKey) {
  * @returns {string} - The generated API key
  */
 export function generateApiKey(db, name, devicePrefix) {
-  const key = crypto.randomBytes(32).toString('hex');
+  const key = crypto.randomBytes(32).toString('hex');

-  db.prepare(`
+  db.prepare(`
     INSERT INTO api_keys (key, name, device_prefix)
     VALUES (?, ?, ?)
   `).run(key, name, devicePrefix);

-  return key;
+  return key;
 }

 /**
- * Insert sensor readings into the database
+ * Insert sensor readings with RLE (Run-Length Encoding) logic
  * @param {Database} db - SQLite database instance
  * @param {string} devicePrefix - Prefix to prepend to device names
- * @param {Array} readings - Array of {device, channel, value} objects
- * @param {Date} timestamp - Timestamp for all readings (defaults to now)
+ * @param {Array} readings - Array of {device, channel, value|data} objects
+ * @param {Date} timestamp - Timestamp for all readings
  */
-export function insertReadings(db, devicePrefix, readings, timestamp = new Date()) {
-  const isoTimestamp = timestamp.toISOString();
+export function insertReadingsSmart(db, devicePrefix, readings, timestamp = new Date()) {
+  const isoTimestamp = timestamp.toISOString();

-  const stmt = db.prepare(`
-    INSERT INTO sensor_data (timestamp, device, channel, value)
-    VALUES (?, ?, ?, ?)
-  `);
+  const stmtLast = db.prepare(`
+    SELECT id, value, data, data_type
+    FROM sensor_events
+    WHERE device = ? AND channel = ?
+    ORDER BY timestamp DESC
+    LIMIT 1
+  `);

-  const insertMany = db.transaction((items) => {
-    for (const reading of items) {
-      const fullDevice = `${devicePrefix}${reading.device}`;
-      stmt.run(isoTimestamp, fullDevice, reading.channel, reading.value);
+  const stmtUpdate = db.prepare(`
+    UPDATE sensor_events SET until = ? WHERE id = ?
+  `);
+
+  const stmtInsert = db.prepare(`
+    INSERT INTO sensor_events (timestamp, until, device, channel, value, data, data_type)
+    VALUES (?, NULL, ?, ?, ?, ?, ?)
+  `);
+
+  const transaction = db.transaction((items) => {
+    let inserted = 0;
+    let updated = 0;
+
+    for (const reading of items) {
+      const fullDevice = `${devicePrefix}${reading.device}`;
+      const channel = reading.channel;
+
+      // Determine type and values
+      let dataType = 'number';
+      let value = null;
+      let data = null;
+
+      if (reading.value !== undefined && reading.value !== null) {
+        dataType = 'number';
+        value = reading.value;
+      } else if (reading.data !== undefined) {
+        dataType = 'json';
+        data = typeof reading.data === 'string' ? reading.data : JSON.stringify(reading.data);
+      } else {
+        continue; // Skip invalid
+      }
+
+      // Check last reading for RLE
+      const last = stmtLast.get(fullDevice, channel);
+      let isDuplicate = false;
+
+      if (last && last.data_type === dataType) {
+        if (dataType === 'number') {
+          // Compare numbers with a small epsilon rather than strict equality;
+          // RLE only collapses readings that are effectively identical.
+          if (Math.abs(last.value - value) < Number.EPSILON) {
+            isDuplicate = true;
+          }
+        } else {
+          // Compare JSON strings
+          if (last.data === data) {
+            isDuplicate = true;
+          }
         }
-  });
+      }

-  insertMany(readings);
-  return readings.length;
+      if (isDuplicate) {
+        stmtUpdate.run(isoTimestamp, last.id);
+        updated++;
+      } else {
+        stmtInsert.run(isoTimestamp, fullDevice, channel, value, data, dataType);
+        inserted++;
+      }
+    }
+    return { inserted, updated };
+  });
+
+  return transaction(readings);
 }

-/**
- * Aggregate raw data into 10-minute buckets
- * @param {Database} db - SQLite database instance
- * @returns {number} - Number of aggregated records created
- */
-export function aggregate10Minutes(db) {
-  // Get the cutoff time (10 minutes ago, rounded down to 10-min boundary)
-  const now = new Date();
-  const cutoff = new Date(Math.floor(now.getTime() / 600000) * 600000 - 600000);
-  const cutoffISO = cutoff.toISOString();
+// Temporary stubs for aggregators until they are redesigned for the new schema
+export function aggregate10Minutes(db) { return 0; }
+export function aggregate1Hour(db) { return 0; }

-  const result = db.prepare(`
-    INSERT OR REPLACE INTO sensor_data_10m (timestamp, device, channel, value, sample_count)
-    SELECT
-      datetime(strftime('%s', timestamp) / 600 * 600, 'unixepoch') as bucket,
-      device,
-      channel,
-      AVG(value) as avg_value,
-      COUNT(*) as sample_count
-    FROM sensor_data
-    WHERE timestamp < ?
-      AND timestamp >= datetime(?, '-1 hour')
-    GROUP BY bucket, device, channel
-  `).run(cutoffISO, cutoffISO);
-
-  return result.changes;
-}
-
-/**
- * Aggregate 10-minute data into 1-hour buckets
- * @param {Database} db - SQLite database instance
- * @returns {number} - Number of aggregated records created
- */
-export function aggregate1Hour(db) {
-  // Get the cutoff time (1 hour ago, rounded down to hour boundary)
-  const now = new Date();
-  const cutoff = new Date(Math.floor(now.getTime() / 3600000) * 3600000 - 3600000);
-  const cutoffISO = cutoff.toISOString();
-
-  const result = db.prepare(`
-    INSERT OR REPLACE INTO sensor_data_1h (timestamp, device, channel, value, sample_count)
-    SELECT
-      datetime(strftime('%s', timestamp) / 3600 * 3600, 'unixepoch') as bucket,
-      device,
-      channel,
-      SUM(value * sample_count) / SUM(sample_count) as weighted_avg,
-      SUM(sample_count) as total_samples
-    FROM sensor_data_10m
-    WHERE timestamp < ?
-      AND timestamp >= datetime(?, '-1 day')
-    GROUP BY bucket, device, channel
-  `).run(cutoffISO, cutoffISO);
-
-  return result.changes;
-}

 /**
  * Clean up old data according to retention policy
@@ -134,24 +140,17 @@ export function aggregate1Hour(db) {
  * @returns {object} - Number of deleted records per table
  */
 export function cleanupOldData(db) {
-  const now = new Date();
+  const now = new Date();

-  // Delete raw data older than 7 days
-  const weekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
-  const rawDeleted = db.prepare(`
-    DELETE FROM sensor_data WHERE timestamp < ?
-  `).run(weekAgo.toISOString());
-
-  // Delete 10-minute data older than 30 days
-  const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);
-  const aggDeleted = db.prepare(`
-    DELETE FROM sensor_data_10m WHERE timestamp < ?
+  // Delete events older than 30 days
+  const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);
+  const eventsDeleted = db.prepare(`
+    DELETE FROM sensor_events WHERE timestamp < ?
   `).run(monthAgo.toISOString());

-  return {
-    rawDeleted: rawDeleted.changes,
-    aggregatedDeleted: aggDeleted.changes
-  };
+  return {
+    eventsDeleted: eventsDeleted.changes
+  };
 }

 /**
@@ -160,7 +159,7 @@ export function cleanupOldData(db) {
  * @returns {Array} - List of API key metadata
  */
 export function listApiKeys(db) {
-  return db.prepare(`
+  return db.prepare(`
     SELECT id, name, device_prefix, created_at, last_used_at,
            substr(key, 1, 8) || '...' as key_preview
     FROM api_keys
@@ -169,11 +168,11 @@
 }

 export default {
-  validateApiKey,
-  generateApiKey,
-  insertReadings,
-  aggregate10Minutes,
-  aggregate1Hour,
-  cleanupOldData,
-  listApiKeys
+  validateApiKey,
+  generateApiKey,
+  insertReadingsSmart,
+  aggregate10Minutes,
+  aggregate1Hour,
+  cleanupOldData,
+  listApiKeys
 };
diff --git a/server/src/db/schema.js b/server/src/db/schema.js
index 7f27615..1ea835d 100644
--- a/server/src/db/schema.js
+++ b/server/src/db/schema.js
@@ -12,20 +12,20 @@ const __dirname = dirname(__filename);
  * @returns {Database} - The initialized database instance
  */
 export function initDatabase(dbPath) {
-  // Ensure data directory exists
-  const dataDir = dirname(dbPath);
-  if (!existsSync(dataDir)) {
-    mkdirSync(dataDir, { recursive: true });
-  }
+  // Ensure data directory exists
+  const dataDir = dirname(dbPath);
+  if (!existsSync(dataDir)) {
+    mkdirSync(dataDir, { recursive: true });
+  }

-  const db = new Database(dbPath);
+  const db = new Database(dbPath);

-  // Enable WAL mode for better concurrent performance
-  db.pragma('journal_mode = WAL');
+  // Enable WAL mode for better concurrent performance
+  db.pragma('journal_mode = WAL');

-  // Create tables
-  db.exec(`
-    -- API keys for agent authentication
+  // Create tables
+  // API keys for agent authentication
+  db.exec(`
     CREATE TABLE IF NOT EXISTS api_keys (
       id INTEGER PRIMARY KEY AUTOINCREMENT,
       key TEXT UNIQUE NOT NULL,
@@ -34,53 +34,35 @@ export function initDatabase(dbPath) {
       created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
       last_used_at DATETIME
     );
+  `);

-    -- Raw sensor data (1-minute resolution, kept for 1 week)
-    CREATE TABLE IF NOT EXISTS sensor_data (
+  // --- MIGRATION: Drop old tables if they exist ---
+  // User requested deleting old sensor data but keeping keys.
+  db.exec(`
+    DROP TABLE IF EXISTS sensor_data;
+    DROP TABLE IF EXISTS sensor_data_10m;
+    DROP TABLE IF EXISTS sensor_data_1h;
+  `);
+
+  // --- NEW SCHEMA: Sensor Events with RLE support ---
+  db.exec(`
+    CREATE TABLE IF NOT EXISTS sensor_events (
       id INTEGER PRIMARY KEY AUTOINCREMENT,
       timestamp DATETIME NOT NULL,
+      until DATETIME,          -- NULL for a point event; otherwise the end of the run of identical readings
       device TEXT NOT NULL,
       channel TEXT NOT NULL,
-      value REAL NOT NULL
+      value REAL,              -- Nullable
+      data TEXT,               -- Nullable (JSON)
+      data_type TEXT NOT NULL  -- 'number' or 'json'
     );

-    -- Index for time-based queries and cleanup
-    CREATE INDEX IF NOT EXISTS idx_sensor_data_time
-      ON sensor_data(timestamp);
-
-    -- Index for device/channel queries
-    CREATE INDEX IF NOT EXISTS idx_sensor_data_device
-      ON sensor_data(device, channel, timestamp);
-
-    -- 10-minute aggregated data (kept for 1 month)
-    CREATE TABLE IF NOT EXISTS sensor_data_10m (
-      id INTEGER PRIMARY KEY AUTOINCREMENT,
-      timestamp DATETIME NOT NULL,
-      device TEXT NOT NULL,
-      channel TEXT NOT NULL,
-      value REAL NOT NULL,
-      sample_count INTEGER NOT NULL
-    );
-
-    CREATE UNIQUE INDEX IF NOT EXISTS idx_sensor_data_10m_unique
-      ON sensor_data_10m(timestamp, device, channel);
-
-    -- 1-hour aggregated data (kept forever)
-    CREATE TABLE IF NOT EXISTS sensor_data_1h (
-      id INTEGER PRIMARY KEY AUTOINCREMENT,
-      timestamp DATETIME NOT NULL,
-      device TEXT NOT NULL,
-      channel TEXT NOT NULL,
-      value REAL NOT NULL,
-      sample_count INTEGER NOT NULL
-    );
-
-    CREATE UNIQUE INDEX IF NOT EXISTS idx_sensor_data_1h_unique
-      ON sensor_data_1h(timestamp, device, channel);
+    CREATE INDEX IF NOT EXISTS idx_sensor_events_search
+      ON sensor_events(device, channel, timestamp);
   `);

-  console.log('[DB] Database initialized successfully');
-  return db;
+  console.log('[DB] Database initialized successfully');
+  return db;
 }

 export default { initDatabase };
diff --git a/server/src/websocket/server.js b/server/src/websocket/server.js
index 5c230d5..da81b32 100644
--- a/server/src/websocket/server.js
+++ b/server/src/websocket/server.js
@@ -1,5 +1,5 @@
 import { WebSocketServer } from 'ws';
-import { validateApiKey, insertReadings } from '../db/queries.js';
+import { validateApiKey, insertReadingsSmart } from '../db/queries.js';

 /**
  * Create and configure the WebSocket server
@@ -153,30 +153,39 @@ function handleData(ws, message, clientState, db) {

   // Validate readings format
   for (const reading of readings) {
-    // We strictly require device, channel, and value.
-    // If value is missing (undefined), we skip it, even if 'data' is present.
-    if (!reading.device || !reading.channel || reading.value === undefined) {
-      console.warn(`[WS] Skipped invalid reading from ${clientState.name || 'unknown'}:`, JSON.stringify(reading));
+    // We require device, channel, and EITHER value (number) OR data (json)
+    if (!reading.device || !reading.channel) {
+      console.warn(`[WS] Skipped invalid reading (missing device/channel) from ${clientState.name}:`, JSON.stringify(reading));
       skippedCount++;
       continue;
     }
+
+    const hasValue = reading.value !== undefined && reading.value !== null;
+    const hasData = reading.data !== undefined;
+
+    if (!hasValue && !hasData) {
+      console.warn(`[WS] Skipped invalid reading (no value/data) from ${clientState.name}:`, JSON.stringify(reading));
+      skippedCount++;
+      continue;
+    }
+
     validReadings.push(reading);
   }

   if (validReadings.length === 0) {
     if (skippedCount > 0) {
-      // Acknowledge receipt even if all were skipped, so the agent doesn't retry endlessly if it thinks it's a temp error.
-      // But here the agent probably doesn't handle acks effectively anyway.
-      console.log(`[WS] Received ${skippedCount} readings, but all were valid (non-numeric data dropped).`);
+      console.log(`[WS] Received ${skippedCount} readings, but all were invalid.`);
       return send(ws, { type: 'ack', count: 0 });
     }
     return sendError(ws, 'No valid readings found in batch');
   }

   try {
-    const count = insertReadings(db, clientState.devicePrefix, validReadings);
+    const result = insertReadingsSmart(db, clientState.devicePrefix, validReadings);
+    const count = result.inserted + result.updated;
+
     if (skippedCount > 0) {
-      console.log(`[WS] Inserted ${count} valid readings (skipped ${skippedCount} invalid/non-numeric readings).`);
+      console.log(`[WS] Processed ${count} readings (inserted: ${result.inserted}, updated: ${result.updated}, skipped: ${skippedCount}).`);
     }
     send(ws, { type: 'ack', count });
   } catch (err) {
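
Since aggregate10Minutes/aggregate1Hour are stubbed out until they are redesigned for the new schema, the sketch below shows one way a consumer could expand RLE-compressed sensor_events rows back into evenly spaced samples. It is illustrative only and not part of this patch: the helper name expandRuns and the stepMs parameter are invented for the example, and it assumes the same better-sqlite3-style synchronous API that queries.js already uses.

// Hypothetical helper (illustrative, not part of this diff): expand RLE runs from
// sensor_events back into evenly spaced samples, e.g. for charting or aggregation.
export function expandRuns(db, device, channel, fromISO, toISO, stepMs = 60000) {
  // A run [timestamp, until] means the same reading repeated for that whole span;
  // until IS NULL marks a single point. Select every run overlapping the window.
  const rows = db.prepare(`
    SELECT timestamp, until, value, data, data_type
    FROM sensor_events
    WHERE device = ? AND channel = ?
      AND timestamp <= ?
      AND COALESCE(until, timestamp) >= ?
    ORDER BY timestamp
  `).all(device, channel, toISO, fromISO);

  const samples = [];
  for (const row of rows) {
    const start = new Date(row.timestamp).getTime();
    const end = new Date(row.until || row.timestamp).getTime();
    const reading = row.data_type === 'json' ? JSON.parse(row.data) : row.value;

    // Re-emit the collapsed reading once per step across the run
    // (callers may clamp t to the requested window if needed).
    for (let t = start; t <= end; t += stepMs) {
      samples.push({ timestamp: new Date(t).toISOString(), value: reading });
    }
  }
  return samples;
}

A redesigned aggregate10Minutes could reuse the same overlap condition (timestamp <= windowEnd AND COALESCE(until, timestamp) >= windowStart) and average the expanded samples per 10-minute bucket.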