feat(tapo-agent): add schedule/countdown timer API support

- Fork tapo crate to add missing schedule/timer APIs
- Add get_countdown_rules, get_schedule_rules, get_next_event methods
- New readings: countdown_active, countdown_remain, schedule_count,
  schedule_active_count, next_event_time
- Add local compilation to build script alongside cross-compilation
- Implement offline polling - device collection continues when server disconnects
- Add more device readings: on_time, signal_level, rssi, runtime_today/month, energy_month

Vendored tapo fork in tapo-fork/ with minimal changes to add schedule APIs.
This commit is contained in:
seb
2025-12-23 00:46:42 +01:00
parent f3cca149f9
commit 028763bdb2
274 changed files with 20784 additions and 48 deletions

View File

@@ -197,8 +197,27 @@ async fn collect_device_data(device: &DeviceConfig) -> Vec<Reading> {
channel: "state".to_string(),
value: if info.device_on { 1.0 } else { 0.0 },
});
// Time device has been ON since last state change (seconds)
readings.push(Reading {
device: device.name.clone(),
channel: "on_time".to_string(),
value: info.on_time as f64,
});
// WiFi signal level (0-3)
readings.push(Reading {
device: device.name.clone(),
channel: "signal_level".to_string(),
value: info.signal_level as f64,
});
// WiFi RSSI (dBm, negative value)
readings.push(Reading {
device: device.name.clone(),
channel: "rssi".to_string(),
value: info.rssi as f64,
});
}
// Current power in watts (API returns milliwatts)
if let Ok(energy) = plug.get_current_power().await {
readings.push(Reading {
device: device.name.clone(),
@@ -208,11 +227,74 @@ async fn collect_device_data(device: &DeviceConfig) -> Vec<Reading> {
}
if let Ok(usage) = plug.get_energy_usage().await {
// Today's energy in Wh
readings.push(Reading {
device: device.name.clone(),
channel: "energy_today".to_string(),
value: usage.today_energy as f64,
});
// Today's runtime in minutes
readings.push(Reading {
device: device.name.clone(),
channel: "runtime_today".to_string(),
value: usage.today_runtime as f64,
});
// This month's energy in Wh
readings.push(Reading {
device: device.name.clone(),
channel: "energy_month".to_string(),
value: usage.month_energy as f64,
});
// This month's runtime in minutes
readings.push(Reading {
device: device.name.clone(),
channel: "runtime_month".to_string(),
value: usage.month_runtime as f64,
});
}
// Countdown timer status
if let Ok(countdown) = plug.get_countdown_rules().await {
let active_countdown = countdown.rules.iter().find(|r| r.enable);
readings.push(Reading {
device: device.name.clone(),
channel: "countdown_active".to_string(),
value: if active_countdown.is_some() { 1.0 } else { 0.0 },
});
if let Some(rule) = active_countdown {
readings.push(Reading {
device: device.name.clone(),
channel: "countdown_remain".to_string(),
value: rule.remain as f64,
});
}
}
// Schedule rules count
if let Ok(schedules) = plug.get_schedule_rules().await {
readings.push(Reading {
device: device.name.clone(),
channel: "schedule_count".to_string(),
value: schedules.rules.len() as f64,
});
// Count active schedules
let active_count = schedules.rules.iter().filter(|r| r.enable).count();
readings.push(Reading {
device: device.name.clone(),
channel: "schedule_active_count".to_string(),
value: active_count as f64,
});
}
// Next scheduled event
if let Ok(next) = plug.get_next_event().await {
if let Some(ts) = next.timestamp {
readings.push(Reading {
device: device.name.clone(),
channel: "next_event_time".to_string(),
value: ts as f64,
});
}
}
}
Err(e) => error!("Failed to connect to P110 {}: {}", device.name, e),
@@ -241,6 +323,38 @@ async fn collect_device_data(device: &DeviceConfig) -> Vec<Reading> {
}
async fn run_agent(config: Config) -> Result<(), Box<dyn std::error::Error>> {
use tokio::sync::mpsc;
// Channel for readings from poller to sender
let (tx, mut rx) = mpsc::channel::<Vec<Reading>>(100);
// Spawn device polling task - runs continuously regardless of connection
let poll_interval_secs = config.poll_interval_secs;
let devices = config.devices.clone();
tokio::spawn(async move {
let mut poll_interval = interval(Duration::from_secs(poll_interval_secs));
loop {
poll_interval.tick().await;
let mut all_readings = Vec::new();
for device in &devices {
let readings = collect_device_data(device).await;
all_readings.extend(readings);
}
if !all_readings.is_empty() {
info!("Collected {} readings from devices", all_readings.len());
// Log readings even if not connected
for reading in &all_readings {
info!(" {} {} = {}", reading.device, reading.channel, reading.value);
}
// Try to send to connection task, drop if channel full
let _ = tx.try_send(all_readings);
}
}
});
// Connection and sending loop
let mut reconnect_delay = Duration::from_secs(1);
let max_reconnect_delay = Duration::from_secs(60);
@@ -283,50 +397,43 @@ async fn run_agent(config: Config) -> Result<(), Box<dyn std::error::Error>> {
continue;
}
let mut poll_interval = interval(Duration::from_secs(config.poll_interval_secs));
// Main send loop - receive readings from channel and send to server
loop {
poll_interval.tick().await;
tokio::select! {
// Receive readings from polling task
Some(readings) = rx.recv() => {
info!("Sending {} readings to server", readings.len());
let data = DataMessage {
msg_type: "data".to_string(),
readings,
};
let data_json = serde_json::to_string(&data)?;
let mut all_readings = Vec::new();
for device in &config.devices {
let readings = collect_device_data(device).await;
all_readings.extend(readings);
}
if !all_readings.is_empty() {
info!("Sending {} readings", all_readings.len());
let data = DataMessage {
msg_type: "data".to_string(),
readings: all_readings,
};
let data_json = serde_json::to_string(&data)?;
if let Err(e) = write.send(Message::Text(data_json)).await {
error!("Failed to send data: {}", e);
break;
if let Err(e) = write.send(Message::Text(data_json)).await {
error!("Failed to send data: {}", e);
break;
}
}
}
while let Ok(Some(msg)) = tokio::time::timeout(
Duration::from_millis(100),
read.next(),
)
.await
{
match msg {
Ok(Message::Ping(data)) => {
let _ = write.send(Message::Pong(data)).await;
// Handle incoming WebSocket messages
msg = read.next() => {
match msg {
Some(Ok(Message::Ping(data))) => {
let _ = write.send(Message::Pong(data)).await;
}
Some(Ok(Message::Close(_))) => {
info!("Server closed connection");
break;
}
Some(Err(e)) => {
error!("WebSocket error: {}", e);
break;
}
None => {
info!("Connection closed");
break;
}
_ => {}
}
Ok(Message::Close(_)) => {
info!("Server closed connection");
break;
}
Err(e) => {
error!("WebSocket error: {}", e);
break;
}
_ => {}
}
}
}
@@ -363,12 +470,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Some(Commands::Run) | None => {
let config_path = &cli.config;
let config_content = std::fs::read_to_string(config_path).map_err(|e| {
format!(
"Failed to read config file {}: {}\n\nCreate config with device discovery:\n ./tapo-agent init --server ws://SERVER:8080 --key YOUR_KEY --email tapo@email.com --password tapopass\n\nOr specify broadcast address:\n ./tapo-agent init --server ws://SERVER:8080 --key YOUR_KEY --email tapo@email.com --password tapopass --broadcast 192.168.0.255",
config_path, e
)
})?;
let config_content = match std::fs::read_to_string(config_path) {
Ok(content) => content,
Err(e) => {
eprintln!("Failed to read config file {}: {}", config_path, e);
eprintln!();
eprintln!("Create config with device discovery:");
eprintln!(" ./tapo-agent init --server ws://SERVER:8080 --key YOUR_KEY --email tapo@email.com --password tapopass");
eprintln!();
eprintln!("Or specify broadcast address:");
eprintln!(" ./tapo-agent init --server ws://SERVER:8080 --key YOUR_KEY --email tapo@email.com --password tapopass --broadcast 192.168.0.255");
std::process::exit(1);
}
};
let config: Config = toml::from_str(&config_content)
.map_err(|e| format!("Failed to parse config: {}", e))?;