feat: Implement a rule engine for scriptable device automation, including an example timer light rule.
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
node_modules/
|
node_modules/
|
||||||
logs/
|
logs/
|
||||||
devices.db
|
devices.db
|
||||||
|
rules/timer_state.json
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
{
|
{
|
||||||
"SNDC-0D4P10WW": {
|
"SNDC-0D4P10WW-light": {
|
||||||
"name": "shellyplusrgbwpm",
|
"name": "shellyplusrgbwpm",
|
||||||
"gen": 2,
|
"gen": 2,
|
||||||
"inputs": 4,
|
"inputs": 4,
|
||||||
|
|||||||
186
rule_engine.js
Normal file
186
rule_engine.js
Normal file
@@ -0,0 +1,186 @@
|
|||||||
|
/**
|
||||||
|
* Rule Engine
|
||||||
|
* Loads and executes JS rule scripts from the rules/ directory
|
||||||
|
*/
|
||||||
|
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
import { fileURLToPath, pathToFileURL } from 'url';
|
||||||
|
|
||||||
|
const __filename = fileURLToPath(import.meta.url);
|
||||||
|
const __dirname = path.dirname(__filename);
|
||||||
|
|
||||||
|
const RULES_DIR = path.join(__dirname, 'rules');
|
||||||
|
|
||||||
|
let rules = [];
|
||||||
|
let db = null;
|
||||||
|
let sendRPCToDevice = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize rule engine with database and RPC function
|
||||||
|
*/
|
||||||
|
export function initRuleEngine(database, rpcFunction) {
|
||||||
|
db = database;
|
||||||
|
sendRPCToDevice = rpcFunction;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load all rule scripts from rules/ directory
|
||||||
|
*/
|
||||||
|
export async function loadRules() {
|
||||||
|
rules = [];
|
||||||
|
|
||||||
|
if (!fs.existsSync(RULES_DIR)) {
|
||||||
|
console.log('Rules directory not found, creating...');
|
||||||
|
fs.mkdirSync(RULES_DIR, { recursive: true });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const files = fs.readdirSync(RULES_DIR).filter(f => f.endsWith('.js'));
|
||||||
|
|
||||||
|
for (const file of files) {
|
||||||
|
try {
|
||||||
|
const filePath = path.join(RULES_DIR, file);
|
||||||
|
// Add timestamp to bust cache on reload
|
||||||
|
const fileUrl = pathToFileURL(filePath).href + '?t=' + Date.now();
|
||||||
|
const module = await import(fileUrl);
|
||||||
|
const rule = module.default || module;
|
||||||
|
rule._filename = file;
|
||||||
|
rules.push(rule);
|
||||||
|
console.log(`Loaded rule: ${file}`);
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`Error loading rule ${file}:`, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Loaded ${rules.length} rule(s)`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current state of a channel
|
||||||
|
*/
|
||||||
|
function getChannelState(mac, component, field) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
db.get(
|
||||||
|
`SELECT e.event FROM events e
|
||||||
|
JOIN channels c ON e.channel_id = c.id
|
||||||
|
WHERE c.mac = ? AND c.component = ? AND c.field = ?
|
||||||
|
ORDER BY e.id DESC LIMIT 1`,
|
||||||
|
[mac, component, field],
|
||||||
|
(err, row) => {
|
||||||
|
if (err) reject(err);
|
||||||
|
else resolve(row ? row.event : null);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all current channel states
|
||||||
|
*/
|
||||||
|
function getAllChannelStates() {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
db.all(
|
||||||
|
`SELECT c.mac, c.component, c.field, c.type,
|
||||||
|
(SELECT e.event FROM events e WHERE e.channel_id = c.id ORDER BY e.id DESC LIMIT 1) as value
|
||||||
|
FROM channels c`,
|
||||||
|
[],
|
||||||
|
(err, rows) => {
|
||||||
|
if (err) reject(err);
|
||||||
|
else resolve(rows || []);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create context object passed to rule scripts
|
||||||
|
*/
|
||||||
|
function createContext(triggerEvent) {
|
||||||
|
return {
|
||||||
|
// The event that triggered this evaluation
|
||||||
|
trigger: triggerEvent,
|
||||||
|
|
||||||
|
// Get state of a specific channel
|
||||||
|
getState: getChannelState,
|
||||||
|
|
||||||
|
// Get all channel states
|
||||||
|
getAllStates: getAllChannelStates,
|
||||||
|
|
||||||
|
// Set light level (0-100)
|
||||||
|
setLevel: async (mac, component, level) => {
|
||||||
|
if (sendRPCToDevice) {
|
||||||
|
await sendRPCToDevice(mac, 'Light.Set', { id: parseInt(component.split(':')[1]), brightness: level });
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
// Set switch output
|
||||||
|
setOutput: async (mac, component, on) => {
|
||||||
|
if (sendRPCToDevice) {
|
||||||
|
await sendRPCToDevice(mac, 'Switch.Set', { id: parseInt(component.split(':')[1]), on: on });
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
// Generic RPC call
|
||||||
|
sendRPC: async (mac, method, params) => {
|
||||||
|
if (sendRPCToDevice) {
|
||||||
|
await sendRPCToDevice(mac, method, params);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
// Logging
|
||||||
|
log: (...args) => console.log('[Rule]', ...args)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run all rules after an event occurs
|
||||||
|
*/
|
||||||
|
export async function runRules(mac, component, field, type, event) {
|
||||||
|
const triggerEvent = { mac, component, field, type, event };
|
||||||
|
const ctx = createContext(triggerEvent);
|
||||||
|
|
||||||
|
for (const rule of rules) {
|
||||||
|
try {
|
||||||
|
if (typeof rule.run === 'function') {
|
||||||
|
await rule.run(ctx);
|
||||||
|
} else if (typeof rule === 'function') {
|
||||||
|
await rule(ctx);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`Error running rule ${rule._filename || 'unknown'}:`, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reload rules from disk
|
||||||
|
*/
|
||||||
|
export async function reloadRules() {
|
||||||
|
console.log('Reloading rules...');
|
||||||
|
await loadRules();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Watch rules directory for changes and auto-reload
|
||||||
|
*/
|
||||||
|
export function watchRules() {
|
||||||
|
if (!fs.existsSync(RULES_DIR)) {
|
||||||
|
fs.mkdirSync(RULES_DIR, { recursive: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
let reloadTimeout = null;
|
||||||
|
|
||||||
|
fs.watch(RULES_DIR, (eventType, filename) => {
|
||||||
|
if (filename && filename.endsWith('.js')) {
|
||||||
|
// Debounce reloads
|
||||||
|
if (reloadTimeout) clearTimeout(reloadTimeout);
|
||||||
|
reloadTimeout = setTimeout(async () => {
|
||||||
|
console.log(`Rule file changed: ${filename}`);
|
||||||
|
await reloadRules();
|
||||||
|
}, 500);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log('Watching rules/ directory for changes');
|
||||||
|
}
|
||||||
144
rules/example_button_log.js
Normal file
144
rules/example_button_log.js
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
/**
|
||||||
|
* Timer light rule
|
||||||
|
*
|
||||||
|
* - btn_down: Light goes off
|
||||||
|
* - long_push: Flash to confirm, enter "count mode" - counts seconds until next btn_down, then light goes on
|
||||||
|
* - Normal btn_down (no long_push): Light goes off, then turns back on after the stored count elapsed
|
||||||
|
*
|
||||||
|
* Also syncs remote switch CC8DA243B0A0 inversely (light off = switch on, light on = switch off)
|
||||||
|
*/
|
||||||
|
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
import { fileURLToPath } from 'url';
|
||||||
|
|
||||||
|
const __filename = fileURLToPath(import.meta.url);
|
||||||
|
const __dirname = path.dirname(__filename);
|
||||||
|
|
||||||
|
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
|
||||||
|
const STATE_FILE = path.join(__dirname, 'timer_state.json');
|
||||||
|
const REMOTE_SWITCH_MAC = 'CC8DA243B0A0';
|
||||||
|
|
||||||
|
// Helper to set local light and sync remote switch inversely
|
||||||
|
async function setLight(ctx, mac, on, brightness) {
|
||||||
|
await ctx.sendRPC(mac, 'Light.Set', { id: 0, on, brightness });
|
||||||
|
// Remote switch is inverse: light off = switch on, light on = switch off
|
||||||
|
await ctx.sendRPC(REMOTE_SWITCH_MAC, 'Switch.Set', { id: 0, on: !on });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load persisted state
|
||||||
|
function loadPersistedState() {
|
||||||
|
try {
|
||||||
|
if (fs.existsSync(STATE_FILE)) {
|
||||||
|
return JSON.parse(fs.readFileSync(STATE_FILE, 'utf8'));
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Error loading state:', err);
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save state to file
|
||||||
|
function saveState(state) {
|
||||||
|
try {
|
||||||
|
// Only save storedDuration per MAC
|
||||||
|
const toSave = {};
|
||||||
|
for (const [mac, data] of Object.entries(state)) {
|
||||||
|
toSave[mac] = { storedDuration: data.storedDuration };
|
||||||
|
}
|
||||||
|
fs.writeFileSync(STATE_FILE, JSON.stringify(toSave, null, 2));
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Error saving state:', err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// State for devices (in memory, with persistence for storedDuration)
|
||||||
|
const persistedState = loadPersistedState();
|
||||||
|
const deviceState = new Map();
|
||||||
|
|
||||||
|
function getState(mac) {
|
||||||
|
if (!deviceState.has(mac)) {
|
||||||
|
const persisted = persistedState[mac] || {};
|
||||||
|
deviceState.set(mac, {
|
||||||
|
countMode: false,
|
||||||
|
countStart: 0,
|
||||||
|
storedDuration: persisted.storedDuration || 5000, // Default 5 seconds
|
||||||
|
timer: null
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return deviceState.get(mac);
|
||||||
|
}
|
||||||
|
|
||||||
|
export default {
|
||||||
|
async run(ctx) {
|
||||||
|
if (ctx.trigger.field !== 'button') return;
|
||||||
|
|
||||||
|
const mac = ctx.trigger.mac;
|
||||||
|
const state = getState(mac);
|
||||||
|
|
||||||
|
ctx.log(`Event: ${ctx.trigger.event}, countMode: ${state.countMode}, storedDuration: ${state.storedDuration}ms`);
|
||||||
|
|
||||||
|
// Handle btn_down
|
||||||
|
if (ctx.trigger.event === 'btn_down') {
|
||||||
|
// Clear any pending timer
|
||||||
|
if (state.timer) {
|
||||||
|
clearTimeout(state.timer);
|
||||||
|
state.timer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Turn light off (remote switch turns on)
|
||||||
|
await setLight(ctx, mac, false, 0);
|
||||||
|
|
||||||
|
if (state.countMode) {
|
||||||
|
// We're in count mode - calculate elapsed time and turn light on
|
||||||
|
const elapsed = Date.now() - state.countStart;
|
||||||
|
state.storedDuration = elapsed;
|
||||||
|
state.countMode = false;
|
||||||
|
ctx.log(`Count mode ended. Stored duration: ${elapsed}ms`);
|
||||||
|
|
||||||
|
// Persist the new duration
|
||||||
|
persistedState[mac] = { storedDuration: elapsed };
|
||||||
|
saveState(persistedState);
|
||||||
|
|
||||||
|
// Turn light on immediately (remote switch turns off)
|
||||||
|
await setLight(ctx, mac, true, 20);
|
||||||
|
} else {
|
||||||
|
// Normal mode - schedule light to turn on after stored duration
|
||||||
|
ctx.log(`Light off. Will turn on in ${state.storedDuration}ms`);
|
||||||
|
|
||||||
|
state.timer = setTimeout(async () => {
|
||||||
|
ctx.log(`Timer elapsed. Turning light on.`);
|
||||||
|
await setLight(ctx, mac, true, 20);
|
||||||
|
state.timer = null;
|
||||||
|
}, state.storedDuration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle long_push - enter count mode
|
||||||
|
if (ctx.trigger.event === 'long_push' && !state.countMode) {
|
||||||
|
// Clear any pending timer
|
||||||
|
if (state.timer) {
|
||||||
|
clearTimeout(state.timer);
|
||||||
|
state.timer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.log('Entering count mode...');
|
||||||
|
|
||||||
|
// Flash to confirm (don't sync remote during flash)
|
||||||
|
for (let i = 0; i < 2; i++) {
|
||||||
|
await ctx.sendRPC(mac, 'Light.Set', { id: 0, on: true, brightness: 20 });
|
||||||
|
await sleep(200);
|
||||||
|
await ctx.sendRPC(mac, 'Light.Set', { id: 0, on: false, brightness: 0 });
|
||||||
|
await sleep(200);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure light stays off during count mode (remote switch on)
|
||||||
|
await setLight(ctx, mac, false, 0);
|
||||||
|
|
||||||
|
// Enter count mode
|
||||||
|
state.countMode = true;
|
||||||
|
state.countStart = Date.now();
|
||||||
|
ctx.log('Count mode active. Light stays off. Press button to set duration and turn on.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
41
server.js
41
server.js
@@ -3,6 +3,7 @@ import fs from 'fs';
|
|||||||
import path from 'path';
|
import path from 'path';
|
||||||
import sqlite3 from 'sqlite3';
|
import sqlite3 from 'sqlite3';
|
||||||
import { fileURLToPath } from 'url';
|
import { fileURLToPath } from 'url';
|
||||||
|
import { initRuleEngine, loadRules, runRules, watchRules } from './rule_engine.js';
|
||||||
|
|
||||||
const __filename = fileURLToPath(import.meta.url);
|
const __filename = fileURLToPath(import.meta.url);
|
||||||
const __dirname = path.dirname(__filename);
|
const __dirname = path.dirname(__filename);
|
||||||
@@ -46,6 +47,35 @@ const macConnectionsMap = new Map();
|
|||||||
// Cache for channel IDs to avoid repeated DB lookups
|
// Cache for channel IDs to avoid repeated DB lookups
|
||||||
const channelCache = new Map(); // key: "mac:component:field" -> channel_id
|
const channelCache = new Map(); // key: "mac:component:field" -> channel_id
|
||||||
|
|
||||||
|
// Map MAC to WebSocket for sending RPC commands
|
||||||
|
const macToSocket = new Map();
|
||||||
|
|
||||||
|
// Send RPC command to a device by MAC
|
||||||
|
async function sendRPCToDevice(mac, method, params = {}) {
|
||||||
|
const connections = macConnectionsMap.get(mac);
|
||||||
|
if (!connections || connections.size === 0) {
|
||||||
|
console.log(`[RPC] No connection for MAC=${mac}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the first connection ID for this MAC
|
||||||
|
const connectionId = connections.values().next().value;
|
||||||
|
|
||||||
|
// Find the socket
|
||||||
|
for (const client of wss.clients) {
|
||||||
|
if (client.connectionId === connectionId) {
|
||||||
|
const rpcId = Math.floor(Math.random() * 10000);
|
||||||
|
const rpcRequest = { id: rpcId, src: 'server', method, params };
|
||||||
|
console.log(`[RPC] Sending to ${mac}: ${method}`, params);
|
||||||
|
client.send(JSON.stringify(rpcRequest));
|
||||||
|
return rpcId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`[RPC] Socket not found for MAC=${mac}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
// Get or create a channel, returns channel_id via callback
|
// Get or create a channel, returns channel_id via callback
|
||||||
function getOrCreateChannel(mac, component, field, type, callback) {
|
function getOrCreateChannel(mac, component, field, type, callback) {
|
||||||
const cacheKey = `${mac}:${component}:${field}`;
|
const cacheKey = `${mac}:${component}:${field}`;
|
||||||
@@ -105,6 +135,7 @@ function checkAndLogEvent(mac, component, field, type, event, connectionId = nul
|
|||||||
if (connectionId) console.log(`[ID: ${connectionId}] Event logged: ${event} on ${component} (${field})`);
|
if (connectionId) console.log(`[ID: ${connectionId}] Event logged: ${event} on ${component} (${field})`);
|
||||||
db.run("INSERT INTO events (channel_id, event, timestamp) VALUES (?, ?, ?)", [channelId, String(event), new Date().toISOString()]);
|
db.run("INSERT INTO events (channel_id, event, timestamp) VALUES (?, ?, ?)", [channelId, String(event), new Date().toISOString()]);
|
||||||
forwardToUpstream();
|
forwardToUpstream();
|
||||||
|
runRules(mac, component, field, type, event);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,6 +152,7 @@ function checkAndLogEvent(mac, component, field, type, event, connectionId = nul
|
|||||||
if (connectionId) console.log(`[ID: ${connectionId}] Status change logged: ${event} on ${component} (${field})`);
|
if (connectionId) console.log(`[ID: ${connectionId}] Status change logged: ${event} on ${component} (${field})`);
|
||||||
db.run("INSERT INTO events (channel_id, event, timestamp) VALUES (?, ?, ?)", [channelId, currentEventStr, new Date().toISOString()]);
|
db.run("INSERT INTO events (channel_id, event, timestamp) VALUES (?, ?, ?)", [channelId, currentEventStr, new Date().toISOString()]);
|
||||||
forwardToUpstream();
|
forwardToUpstream();
|
||||||
|
runRules(mac, component, field, type, currentEventStr);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -130,6 +162,15 @@ const wss = new WebSocketServer({ port: 8080 });
|
|||||||
|
|
||||||
console.log('Shelly Agent Server listening on port 8080');
|
console.log('Shelly Agent Server listening on port 8080');
|
||||||
|
|
||||||
|
// Initialize and load rules
|
||||||
|
initRuleEngine(db, sendRPCToDevice);
|
||||||
|
loadRules().then(() => {
|
||||||
|
console.log('Rule engine ready');
|
||||||
|
watchRules(); // Auto-reload rules when files change
|
||||||
|
}).catch(err => {
|
||||||
|
console.error('Error loading rules:', err);
|
||||||
|
});
|
||||||
|
|
||||||
// Global counter for connection IDs
|
// Global counter for connection IDs
|
||||||
let connectionIdCounter = 0;
|
let connectionIdCounter = 0;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user