u
This commit is contained in:
@@ -35,14 +35,15 @@ export class ACInfinityClient {
|
||||
|
||||
async login() {
|
||||
try {
|
||||
// AC Infinity API does not accept passwords greater than 25 characters
|
||||
const normalizedPassword = this.password.substring(0, 25);
|
||||
// AC Infinity API does not accept passwords greater than 25 characters - UPDATE: Reference impl uses full password?
|
||||
// const normalizedPassword = this.password.substring(0, 25);
|
||||
const normalizedPassword = this.password;
|
||||
|
||||
const response = await fetch(`${this.host}${API_URL_LOGIN}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'User-Agent': 'ACController/1.9.7 (com.acinfinity.humiture; build:533; iOS 18.5.0) Alamofire/5.10.2',
|
||||
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
},
|
||||
body: new URLSearchParams({
|
||||
appEmail: this.email,
|
||||
@@ -63,6 +64,7 @@ export class ACInfinityClient {
|
||||
console.log('[AC] Successfully logged in to AC Infinity API');
|
||||
return this.userId;
|
||||
} catch (error) {
|
||||
console.error('[AC] Login error details:', error); // Added detailed logging
|
||||
if (error instanceof ACInfinityClientError) {
|
||||
throw error;
|
||||
}
|
||||
@@ -129,6 +131,9 @@ export class ACInfinityClient {
|
||||
for (const device of devices) {
|
||||
const devId = device.devId;
|
||||
const devName = device.devName || `device-${devId}`;
|
||||
// Use deviceInfo if available (newer API structure), otherwise fallback to root/devSettings
|
||||
const info = device.deviceInfo || device;
|
||||
const settings = device.devSettings || info;
|
||||
|
||||
// Normalize device name for use as identifier
|
||||
const deviceId = devName
|
||||
@@ -136,52 +141,65 @@ export class ACInfinityClient {
|
||||
.replace(/[^a-z0-9]+/g, '-')
|
||||
.replace(/^-|-$/g, '');
|
||||
|
||||
// Extract sensor data from device settings or sensor fields
|
||||
// Temperature is stored as Celsius * 100
|
||||
if (device.devSettings?.temperature !== undefined) {
|
||||
// --- Device Level Sensors ---
|
||||
|
||||
// Temperature (Celsius * 100)
|
||||
if (info.temperature !== undefined) {
|
||||
readings.push({
|
||||
device: deviceId,
|
||||
channel: 'temperature',
|
||||
value: device.devSettings.temperature / 100,
|
||||
value: info.temperature / 100,
|
||||
});
|
||||
} else if (device.temperature !== undefined) {
|
||||
} else if (settings.temperature !== undefined) {
|
||||
readings.push({
|
||||
device: deviceId,
|
||||
channel: 'temperature',
|
||||
value: device.temperature / 100,
|
||||
value: settings.temperature / 100,
|
||||
});
|
||||
}
|
||||
|
||||
// Humidity is stored as % * 100
|
||||
if (device.devSettings?.humidity !== undefined) {
|
||||
// Humidity (% * 100)
|
||||
if (info.humidity !== undefined) {
|
||||
readings.push({
|
||||
device: deviceId,
|
||||
channel: 'humidity',
|
||||
value: device.devSettings.humidity / 100,
|
||||
value: info.humidity / 100,
|
||||
});
|
||||
} else if (device.humidity !== undefined) {
|
||||
} else if (settings.humidity !== undefined) {
|
||||
readings.push({
|
||||
device: deviceId,
|
||||
channel: 'humidity',
|
||||
value: device.humidity / 100,
|
||||
value: settings.humidity / 100,
|
||||
});
|
||||
}
|
||||
|
||||
// VPD if available
|
||||
if (device.devSettings?.vpdnums !== undefined) {
|
||||
// VPD
|
||||
if (info.vpdnums !== undefined) {
|
||||
readings.push({
|
||||
device: deviceId,
|
||||
channel: 'vpd',
|
||||
value: device.devSettings.vpdnums / 100,
|
||||
value: info.vpdnums / 100,
|
||||
});
|
||||
} else if (settings.vpdnums !== undefined) {
|
||||
readings.push({
|
||||
device: deviceId,
|
||||
channel: 'vpd',
|
||||
value: settings.vpdnums / 100,
|
||||
});
|
||||
}
|
||||
|
||||
// Check for port-level sensors (some controllers have multiple ports)
|
||||
if (device.devPortList && Array.isArray(device.devPortList)) {
|
||||
for (const port of device.devPortList) {
|
||||
const portId = port.portId || port.port;
|
||||
const portDeviceId = `${deviceId}-port${portId}`;
|
||||
// --- Port Level Sensors/State ---
|
||||
const ports = info.ports || device.devPortList;
|
||||
if (ports && Array.isArray(ports)) {
|
||||
for (const port of ports) {
|
||||
const portId = port.port || port.portId;
|
||||
const portName = port.portName || `port${portId}`;
|
||||
// Create a descriptive suffix for the port device, e.g. "wall-fan" or "wall-port1"
|
||||
// If portName is generic "Port X", use number. If it's specific "Fan", use that.
|
||||
const suffix = portName.toLowerCase().replace(/[^a-z0-9]+/g, '-');
|
||||
const portDeviceId = `${deviceId}-${suffix}`;
|
||||
|
||||
// Port specific sensors (if any - sometimes temp usually on device)
|
||||
if (port.temperature !== undefined) {
|
||||
readings.push({
|
||||
device: portDeviceId,
|
||||
@@ -189,7 +207,6 @@ export class ACInfinityClient {
|
||||
value: port.temperature / 100,
|
||||
});
|
||||
}
|
||||
|
||||
if (port.humidity !== undefined) {
|
||||
readings.push({
|
||||
device: portDeviceId,
|
||||
@@ -197,6 +214,15 @@ export class ACInfinityClient {
|
||||
value: port.humidity / 100,
|
||||
});
|
||||
}
|
||||
|
||||
// Level / Speed (speak)
|
||||
if (port.speak !== undefined) {
|
||||
readings.push({
|
||||
device: portDeviceId,
|
||||
channel: 'level',
|
||||
value: port.speak,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,5 +21,5 @@ export default {
|
||||
pollIntervalMs: parseInt(process.env.POLL_INTERVAL_MS || '60000', 10),
|
||||
|
||||
// AC Infinity API
|
||||
acApiHost: process.env.AC_API_HOST || 'https://www.acinfinity.com',
|
||||
acApiHost: process.env.AC_API_HOST || 'http://www.acinfinityserver.com',
|
||||
};
|
||||
|
||||
@@ -11,21 +11,21 @@ import crypto from 'crypto';
|
||||
* @returns {object|null} - API key metadata or null if invalid
|
||||
*/
|
||||
export function validateApiKey(db, apiKey) {
|
||||
const stmt = db.prepare(`
|
||||
const stmt = db.prepare(`
|
||||
SELECT id, name, device_prefix
|
||||
FROM api_keys
|
||||
WHERE key = ?
|
||||
`);
|
||||
const result = stmt.get(apiKey);
|
||||
const result = stmt.get(apiKey);
|
||||
|
||||
if (result) {
|
||||
// Update last_used_at timestamp
|
||||
db.prepare(`
|
||||
if (result) {
|
||||
// Update last_used_at timestamp
|
||||
db.prepare(`
|
||||
UPDATE api_keys SET last_used_at = datetime('now') WHERE id = ?
|
||||
`).run(result.id);
|
||||
}
|
||||
}
|
||||
|
||||
return result || null;
|
||||
return result || null;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -36,97 +36,103 @@ export function validateApiKey(db, apiKey) {
|
||||
* @returns {string} - The generated API key
|
||||
*/
|
||||
export function generateApiKey(db, name, devicePrefix) {
|
||||
const key = crypto.randomBytes(32).toString('hex');
|
||||
const key = crypto.randomBytes(32).toString('hex');
|
||||
|
||||
db.prepare(`
|
||||
db.prepare(`
|
||||
INSERT INTO api_keys (key, name, device_prefix)
|
||||
VALUES (?, ?, ?)
|
||||
`).run(key, name, devicePrefix);
|
||||
|
||||
return key;
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert sensor readings into the database
|
||||
* Insert sensor readings with RLE (Run-Length Encoding) logic
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @param {string} devicePrefix - Prefix to prepend to device names
|
||||
* @param {Array} readings - Array of {device, channel, value} objects
|
||||
* @param {Date} timestamp - Timestamp for all readings (defaults to now)
|
||||
* @param {Array} readings - Array of readings
|
||||
* @param {Date} timestamp - Timestamp for all readings
|
||||
*/
|
||||
export function insertReadings(db, devicePrefix, readings, timestamp = new Date()) {
|
||||
const isoTimestamp = timestamp.toISOString();
|
||||
export function insertReadingsSmart(db, devicePrefix, readings, timestamp = new Date()) {
|
||||
const isoTimestamp = timestamp.toISOString();
|
||||
|
||||
const stmt = db.prepare(`
|
||||
INSERT INTO sensor_data (timestamp, device, channel, value)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`);
|
||||
const stmtLast = db.prepare(`
|
||||
SELECT id, value, data, data_type
|
||||
FROM sensor_events
|
||||
WHERE device = ? AND channel = ?
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT 1
|
||||
`);
|
||||
|
||||
const insertMany = db.transaction((items) => {
|
||||
for (const reading of items) {
|
||||
const fullDevice = `${devicePrefix}${reading.device}`;
|
||||
stmt.run(isoTimestamp, fullDevice, reading.channel, reading.value);
|
||||
const stmtUpdate = db.prepare(`
|
||||
UPDATE sensor_events SET until = ? WHERE id = ?
|
||||
`);
|
||||
|
||||
const stmtInsert = db.prepare(`
|
||||
INSERT INTO sensor_events (timestamp, until, device, channel, value, data, data_type)
|
||||
VALUES (?, NULL, ?, ?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
const transaction = db.transaction((items) => {
|
||||
let inserted = 0;
|
||||
let updated = 0;
|
||||
|
||||
for (const reading of items) {
|
||||
const fullDevice = `${devicePrefix}${reading.device}`;
|
||||
const channel = reading.channel;
|
||||
|
||||
// Determine type and values
|
||||
let dataType = 'number';
|
||||
let value = null;
|
||||
let data = null;
|
||||
|
||||
if (reading.value !== undefined && reading.value !== null) {
|
||||
dataType = 'number';
|
||||
value = reading.value;
|
||||
} else if (reading.data !== undefined) {
|
||||
dataType = 'json';
|
||||
data = typeof reading.data === 'string' ? reading.data : JSON.stringify(reading.data);
|
||||
} else {
|
||||
continue; // Skip invalid
|
||||
}
|
||||
|
||||
// Check last reading for RLE
|
||||
const last = stmtLast.get(fullDevice, channel);
|
||||
let isDuplicate = false;
|
||||
|
||||
if (last && last.data_type === dataType) {
|
||||
if (dataType === 'number') {
|
||||
// Compare defined numbers with small epsilon? Or exact match?
|
||||
// For sensors, exact match is typical for RLE if "identical".
|
||||
if (Math.abs(last.value - value) < Number.EPSILON) {
|
||||
isDuplicate = true;
|
||||
}
|
||||
} else {
|
||||
// Compare JSON strings
|
||||
if (last.data === data) {
|
||||
isDuplicate = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
insertMany(readings);
|
||||
return readings.length;
|
||||
if (isDuplicate) {
|
||||
stmtUpdate.run(isoTimestamp, last.id);
|
||||
updated++;
|
||||
} else {
|
||||
stmtInsert.run(isoTimestamp, fullDevice, channel, value, data, dataType);
|
||||
inserted++;
|
||||
}
|
||||
}
|
||||
return { inserted, updated };
|
||||
});
|
||||
|
||||
return transaction(readings);
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate raw data into 10-minute buckets
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @returns {number} - Number of aggregated records created
|
||||
*/
|
||||
export function aggregate10Minutes(db) {
|
||||
// Get the cutoff time (10 minutes ago, rounded down to 10-min boundary)
|
||||
const now = new Date();
|
||||
const cutoff = new Date(Math.floor(now.getTime() / 600000) * 600000 - 600000);
|
||||
const cutoffISO = cutoff.toISOString();
|
||||
// Temporary stubs for aggregators until they are redesigned for the new schema
|
||||
export function aggregate10Minutes(db) { return 0; }
|
||||
export function aggregate1Hour(db) { return 0; }
|
||||
|
||||
const result = db.prepare(`
|
||||
INSERT OR REPLACE INTO sensor_data_10m (timestamp, device, channel, value, sample_count)
|
||||
SELECT
|
||||
datetime(strftime('%s', timestamp) / 600 * 600, 'unixepoch') as bucket,
|
||||
device,
|
||||
channel,
|
||||
AVG(value) as avg_value,
|
||||
COUNT(*) as sample_count
|
||||
FROM sensor_data
|
||||
WHERE timestamp < ?
|
||||
AND timestamp >= datetime(?, '-1 hour')
|
||||
GROUP BY bucket, device, channel
|
||||
`).run(cutoffISO, cutoffISO);
|
||||
|
||||
return result.changes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate 10-minute data into 1-hour buckets
|
||||
* @param {Database} db - SQLite database instance
|
||||
* @returns {number} - Number of aggregated records created
|
||||
*/
|
||||
export function aggregate1Hour(db) {
|
||||
// Get the cutoff time (1 hour ago, rounded down to hour boundary)
|
||||
const now = new Date();
|
||||
const cutoff = new Date(Math.floor(now.getTime() / 3600000) * 3600000 - 3600000);
|
||||
const cutoffISO = cutoff.toISOString();
|
||||
|
||||
const result = db.prepare(`
|
||||
INSERT OR REPLACE INTO sensor_data_1h (timestamp, device, channel, value, sample_count)
|
||||
SELECT
|
||||
datetime(strftime('%s', timestamp) / 3600 * 3600, 'unixepoch') as bucket,
|
||||
device,
|
||||
channel,
|
||||
SUM(value * sample_count) / SUM(sample_count) as weighted_avg,
|
||||
SUM(sample_count) as total_samples
|
||||
FROM sensor_data_10m
|
||||
WHERE timestamp < ?
|
||||
AND timestamp >= datetime(?, '-1 day')
|
||||
GROUP BY bucket, device, channel
|
||||
`).run(cutoffISO, cutoffISO);
|
||||
|
||||
return result.changes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up old data according to retention policy
|
||||
@@ -134,24 +140,17 @@ export function aggregate1Hour(db) {
|
||||
* @returns {object} - Number of deleted records per table
|
||||
*/
|
||||
export function cleanupOldData(db) {
|
||||
const now = new Date();
|
||||
const now = new Date();
|
||||
|
||||
// Delete raw data older than 7 days
|
||||
const weekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
|
||||
const rawDeleted = db.prepare(`
|
||||
DELETE FROM sensor_data WHERE timestamp < ?
|
||||
`).run(weekAgo.toISOString());
|
||||
|
||||
// Delete 10-minute data older than 30 days
|
||||
const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);
|
||||
const aggDeleted = db.prepare(`
|
||||
DELETE FROM sensor_data_10m WHERE timestamp < ?
|
||||
// Delete events older than 30 days
|
||||
const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);
|
||||
const eventsDeleted = db.prepare(`
|
||||
DELETE FROM sensor_events WHERE timestamp < ?
|
||||
`).run(monthAgo.toISOString());
|
||||
|
||||
return {
|
||||
rawDeleted: rawDeleted.changes,
|
||||
aggregatedDeleted: aggDeleted.changes
|
||||
};
|
||||
return {
|
||||
eventsDeleted: eventsDeleted.changes
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -160,7 +159,7 @@ export function cleanupOldData(db) {
|
||||
* @returns {Array} - List of API key metadata
|
||||
*/
|
||||
export function listApiKeys(db) {
|
||||
return db.prepare(`
|
||||
return db.prepare(`
|
||||
SELECT id, name, device_prefix, created_at, last_used_at,
|
||||
substr(key, 1, 8) || '...' as key_preview
|
||||
FROM api_keys
|
||||
@@ -169,11 +168,11 @@ export function listApiKeys(db) {
|
||||
}
|
||||
|
||||
export default {
|
||||
validateApiKey,
|
||||
generateApiKey,
|
||||
insertReadings,
|
||||
aggregate10Minutes,
|
||||
aggregate1Hour,
|
||||
cleanupOldData,
|
||||
listApiKeys
|
||||
validateApiKey,
|
||||
generateApiKey,
|
||||
insertReadingsSmart,
|
||||
aggregate10Minutes,
|
||||
aggregate1Hour,
|
||||
cleanupOldData,
|
||||
listApiKeys
|
||||
};
|
||||
|
||||
@@ -12,20 +12,20 @@ const __dirname = dirname(__filename);
|
||||
* @returns {Database} - The initialized database instance
|
||||
*/
|
||||
export function initDatabase(dbPath) {
|
||||
// Ensure data directory exists
|
||||
const dataDir = dirname(dbPath);
|
||||
if (!existsSync(dataDir)) {
|
||||
mkdirSync(dataDir, { recursive: true });
|
||||
}
|
||||
// Ensure data directory exists
|
||||
const dataDir = dirname(dbPath);
|
||||
if (!existsSync(dataDir)) {
|
||||
mkdirSync(dataDir, { recursive: true });
|
||||
}
|
||||
|
||||
const db = new Database(dbPath);
|
||||
const db = new Database(dbPath);
|
||||
|
||||
// Enable WAL mode for better concurrent performance
|
||||
db.pragma('journal_mode = WAL');
|
||||
// Enable WAL mode for better concurrent performance
|
||||
db.pragma('journal_mode = WAL');
|
||||
|
||||
// Create tables
|
||||
db.exec(`
|
||||
-- API keys for agent authentication
|
||||
// Create tables
|
||||
// API keys for agent authentication
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS api_keys (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
key TEXT UNIQUE NOT NULL,
|
||||
@@ -34,53 +34,35 @@ export function initDatabase(dbPath) {
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
last_used_at DATETIME
|
||||
);
|
||||
`);
|
||||
|
||||
-- Raw sensor data (1-minute resolution, kept for 1 week)
|
||||
CREATE TABLE IF NOT EXISTS sensor_data (
|
||||
// --- MIGRATION: Drop old tables if they exist ---
|
||||
// User requested deleting old sensor data but keeping keys.
|
||||
db.exec(`
|
||||
DROP TABLE IF EXISTS sensor_data;
|
||||
DROP TABLE IF EXISTS sensor_data_10m;
|
||||
DROP TABLE IF EXISTS sensor_data_1h;
|
||||
`);
|
||||
|
||||
// --- NEW SCHEMA: Sensor Events with RLE support ---
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS sensor_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp DATETIME NOT NULL,
|
||||
until DATETIME, -- NULL if point, Time if duplicated range end
|
||||
device TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
value REAL NOT NULL
|
||||
value REAL, -- Nullable
|
||||
data TEXT, -- Nullable (JSON)
|
||||
data_type TEXT NOT NULL -- 'number' or 'json'
|
||||
);
|
||||
|
||||
-- Index for time-based queries and cleanup
|
||||
CREATE INDEX IF NOT EXISTS idx_sensor_data_time
|
||||
ON sensor_data(timestamp);
|
||||
|
||||
-- Index for device/channel queries
|
||||
CREATE INDEX IF NOT EXISTS idx_sensor_data_device
|
||||
ON sensor_data(device, channel, timestamp);
|
||||
|
||||
-- 10-minute aggregated data (kept for 1 month)
|
||||
CREATE TABLE IF NOT EXISTS sensor_data_10m (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp DATETIME NOT NULL,
|
||||
device TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
sample_count INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_sensor_data_10m_unique
|
||||
ON sensor_data_10m(timestamp, device, channel);
|
||||
|
||||
-- 1-hour aggregated data (kept forever)
|
||||
CREATE TABLE IF NOT EXISTS sensor_data_1h (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp DATETIME NOT NULL,
|
||||
device TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
value REAL NOT NULL,
|
||||
sample_count INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_sensor_data_1h_unique
|
||||
ON sensor_data_1h(timestamp, device, channel);
|
||||
CREATE INDEX IF NOT EXISTS idx_sensor_events_search
|
||||
ON sensor_events(device, channel, timestamp);
|
||||
`);
|
||||
|
||||
console.log('[DB] Database initialized successfully');
|
||||
return db;
|
||||
console.log('[DB] Database initialized successfully');
|
||||
return db;
|
||||
}
|
||||
|
||||
export default { initDatabase };
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { WebSocketServer } from 'ws';
|
||||
import { validateApiKey, insertReadings } from '../db/queries.js';
|
||||
import { validateApiKey, insertReadingsSmart } from '../db/queries.js';
|
||||
|
||||
/**
|
||||
* Create and configure the WebSocket server
|
||||
@@ -153,30 +153,39 @@ function handleData(ws, message, clientState, db) {
|
||||
|
||||
// Validate readings format
|
||||
for (const reading of readings) {
|
||||
// We strictly require device, channel, and value.
|
||||
// If value is missing (undefined), we skip it, even if 'data' is present.
|
||||
if (!reading.device || !reading.channel || reading.value === undefined) {
|
||||
console.warn(`[WS] Skipped invalid reading from ${clientState.name || 'unknown'}:`, JSON.stringify(reading));
|
||||
// We require device, channel, and EITHER value (number) OR data (json)
|
||||
if (!reading.device || !reading.channel) {
|
||||
console.warn(`[WS] Skipped invalid reading (missing device/channel) from ${clientState.name}:`, JSON.stringify(reading));
|
||||
skippedCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const hasValue = reading.value !== undefined && reading.value !== null;
|
||||
const hasData = reading.data !== undefined;
|
||||
|
||||
if (!hasValue && !hasData) {
|
||||
console.warn(`[WS] Skipped invalid reading (no value/data) from ${clientState.name}:`, JSON.stringify(reading));
|
||||
skippedCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
validReadings.push(reading);
|
||||
}
|
||||
|
||||
if (validReadings.length === 0) {
|
||||
if (skippedCount > 0) {
|
||||
// Acknowledge receipt even if all were skipped, so the agent doesn't retry endlessly if it thinks it's a temp error.
|
||||
// But here the agent probably doesn't handle acks effectively anyway.
|
||||
console.log(`[WS] Received ${skippedCount} readings, but all were valid (non-numeric data dropped).`);
|
||||
console.log(`[WS] Received ${skippedCount} readings, but all were invalid.`);
|
||||
return send(ws, { type: 'ack', count: 0 });
|
||||
}
|
||||
return sendError(ws, 'No valid readings found in batch');
|
||||
}
|
||||
|
||||
try {
|
||||
const count = insertReadings(db, clientState.devicePrefix, validReadings);
|
||||
const result = insertReadingsSmart(db, clientState.devicePrefix, validReadings);
|
||||
const count = result.inserted + result.updated;
|
||||
|
||||
if (skippedCount > 0) {
|
||||
console.log(`[WS] Inserted ${count} valid readings (skipped ${skippedCount} invalid/non-numeric readings).`);
|
||||
console.log(`[WS] Processed ${count} readings (inserted: ${result.inserted}, updated: ${result.updated}, skipped: ${skippedCount}).`);
|
||||
}
|
||||
send(ws, { type: 'ack', count });
|
||||
} catch (err) {
|
||||
|
||||
Reference in New Issue
Block a user