diff --git a/crates/common/src/error.rs b/crates/common/src/error.rs index d57fef9..a45b609 100644 --- a/crates/common/src/error.rs +++ b/crates/common/src/error.rs @@ -27,6 +27,8 @@ pub enum ConnectedHomeError { UserAlreadyExists(String), #[error("TODO: MQTT ERROR")] Mqtt, + #[error("Failed to parse event from bytes")] + ParseEvent(Vec), } impl ConnectedHomeError { diff --git a/crates/common/src/mqtt/events/mod.rs b/crates/common/src/mqtt/events/mod.rs index dc32f51..e2e50c4 100644 --- a/crates/common/src/mqtt/events/mod.rs +++ b/crates/common/src/mqtt/events/mod.rs @@ -1,4 +1,5 @@ -use crate::mqtt::events::energy::CurrentMeasurement; +use crate::{error::ConnectedHomeError, mqtt::events::energy::CurrentMeasurement}; +use rumqttc::Publish; pub mod energy; @@ -6,3 +7,12 @@ pub mod energy; pub enum ConnectedHomeEvent { Current(CurrentMeasurement), } + +impl TryFrom<&Publish> for ConnectedHomeEvent { + type Error = ConnectedHomeError; + + fn try_from(publish: &Publish) -> Result { + serde_json::from_slice::(&publish.payload) + .map_err(|_| ConnectedHomeError::ParseEvent(publish.payload.to_vec())) + } +} diff --git a/crates/common/src/mqtt/mod.rs b/crates/common/src/mqtt/mod.rs index e6cf6ff..e0f5083 100644 --- a/crates/common/src/mqtt/mod.rs +++ b/crates/common/src/mqtt/mod.rs @@ -1,7 +1,7 @@ pub mod events; use crate::util::cli::from_file_or_const; -use rumqttc::MqttOptions; +use rumqttc::{MqttOptions, QoS, SubscribeFilter}; use std::time::Duration; #[derive(Debug, Clone, serde::Deserialize)] @@ -14,6 +14,9 @@ pub struct MqttConfig { #[serde(deserialize_with = "from_file_or_const")] pub password: String, pub client_id: String, + // note: consumer property only + #[serde(default)] + pub topics: Vec, } impl MqttConfig { @@ -27,4 +30,11 @@ impl MqttConfig { mqttoptions } + + pub fn sub_filters(&self) -> Vec { + self.topics + .iter() + .map(|topic| SubscribeFilter::new(topic.to_owned(), QoS::AtLeastOnce)) + .collect::>() + } } diff --git a/crates/consumer/config-local.toml.tmpl b/crates/consumer/config-local.toml.tmpl index 7f5eb75..e5ea813 100644 --- a/crates/consumer/config-local.toml.tmpl +++ b/crates/consumer/config-local.toml.tmpl @@ -16,4 +16,5 @@ host = "localhost" port = 1883 username = "file:./.secrets/mqtt_username" password = "file:./.secrets/mqtt_password" -client-id = "energy_consumer" \ No newline at end of file +client-id = "energy_consumer" +topics = ["energy"] \ No newline at end of file diff --git a/crates/consumer/src/main.rs b/crates/consumer/src/main.rs index 6d3dcfd..d7b1b65 100644 --- a/crates/consumer/src/main.rs +++ b/crates/consumer/src/main.rs @@ -1,7 +1,7 @@ #[macro_use] extern crate slog; -use crate::{config::Config, context::Context, tasks::energy_monitor::monitor_energy}; +use crate::{config::Config, context::Context, tasks::mqtt_consumer::MqttConsumer}; use axum::Router; use common::{ error::ConnectedHomeResult, @@ -37,7 +37,10 @@ async fn main() -> ConnectedHomeResult<()> { set.spawn(launch_task( "MQTT Consumer".to_string(), - monitor_energy(context.clone()), + { + let mut consumer = MqttConsumer::new(context.clone()); + async move { consumer.start().await } + }, context.clone(), ShutdownType::Manual, )); diff --git a/crates/consumer/src/tasks/energy_monitor.rs b/crates/consumer/src/tasks/energy_monitor.rs deleted file mode 100644 index 966004c..0000000 --- a/crates/consumer/src/tasks/energy_monitor.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::context::Context; -use common::{ - error::{ConnectedHomeError, ConnectedHomeResult}, - mqtt::events::energy::CurrentMeasurement, -}; -use rumqttc::{AsyncClient, QoS}; -use rust_decimal::prelude::ToPrimitive; -use std::sync::Arc; - -pub async fn monitor_energy(ctx: Arc) -> ConnectedHomeResult<()> { - let (mqtt_client, mut mqtt_event_loop) = AsyncClient::new(ctx.config.mqtt.options(), 10); - - mqtt_client.subscribe("energy", QoS::AtMostOnce).await.unwrap(); - loop { - match mqtt_event_loop.poll().await.map_err(|_| ConnectedHomeError::Mqtt)? { - rumqttc::Event::Incoming(rumqttc::Packet::Publish(p)) => { - let current_measurement: CurrentMeasurement = serde_json::from_slice(&p.payload).unwrap(); - ctx.metrics - .current - .last_measurement - .with_label_values(&[current_measurement.device_id.to_string().as_str()]) - .set(current_measurement.amps.to_f64().unwrap()); - } - _ => {} - } - } -} diff --git a/crates/consumer/src/tasks/mod.rs b/crates/consumer/src/tasks/mod.rs index 5e43595..c9d82a3 100644 --- a/crates/consumer/src/tasks/mod.rs +++ b/crates/consumer/src/tasks/mod.rs @@ -1 +1,20 @@ -pub mod energy_monitor; +use crate::context::Context; +use common::{error::ConnectedHomeResult, mqtt::events::ConnectedHomeEvent}; +use rust_decimal::prelude::ToPrimitive; +use std::sync::Arc; + +pub mod mqtt_consumer; + +pub async fn handle_event(ctx: Arc, event: ConnectedHomeEvent) -> ConnectedHomeResult<()> { + match event { + ConnectedHomeEvent::Current(current_measurement) => { + ctx.metrics + .current + .last_measurement + .with_label_values(&[current_measurement.device_id.to_string().as_str()]) + .set(current_measurement.amps.to_f64().unwrap()); + } // _ => {}, + }; + + Ok(()) +} diff --git a/crates/consumer/src/tasks/mqtt_consumer.rs b/crates/consumer/src/tasks/mqtt_consumer.rs new file mode 100644 index 0000000..95ae004 --- /dev/null +++ b/crates/consumer/src/tasks/mqtt_consumer.rs @@ -0,0 +1,43 @@ +use crate::{context::Context, tasks::handle_event}; +use common::{ + error::{ConnectedHomeError, ConnectedHomeResult}, + mqtt::events::ConnectedHomeEvent, +}; +use rumqttc::AsyncClient; +use std::sync::Arc; + +pub struct MqttConsumer { + ctx: Arc, + mqtt_client: AsyncClient, + mqtt_event_loop: rumqttc::EventLoop, +} + +impl MqttConsumer { + pub fn new(ctx: Arc) -> Self { + let (mqtt_client, mqtt_event_loop) = AsyncClient::new(ctx.config.mqtt.options(), 10); + Self { + ctx, + mqtt_client, + mqtt_event_loop, + } + } + + pub async fn start(&mut self) -> ConnectedHomeResult<()> { + self.mqtt_client + .subscribe_many(self.ctx.config.mqtt.sub_filters()) + .await + .map_err(|_| ConnectedHomeError::Mqtt)?; + + loop { + if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(p)) = self + .mqtt_event_loop + .poll() + .await + .map_err(|_| ConnectedHomeError::Mqtt)? + { + let event = ConnectedHomeEvent::try_from(&p)?; + handle_event(self.ctx.clone(), event).await?; + } + } + } +} diff --git a/crates/producer/src/tasks/current_monitor.rs b/crates/producer/src/tasks/current_monitor.rs index eb5988b..4ca0ec3 100644 --- a/crates/producer/src/tasks/current_monitor.rs +++ b/crates/producer/src/tasks/current_monitor.rs @@ -1,7 +1,7 @@ use chrono::Utc; use common::{ error::{ConnectedHomeError, ConnectedHomeResult}, - mqtt::events::energy::CurrentMeasurement, + mqtt::events::{energy::CurrentMeasurement, ConnectedHomeEvent}, }; use rand::Rng; use rumqttc::{AsyncClient, QoS}; @@ -28,15 +28,15 @@ fn sample_current() -> f64 { } async fn publish_current_measurement(mqtt_client: &AsyncClient, device_id: Uuid, amps: f64) -> ConnectedHomeResult<()> { - let measurement = CurrentMeasurement { + let measurement = ConnectedHomeEvent::Current(CurrentMeasurement { device_id, timestamp: Utc::now(), amps: Decimal::try_from(amps).unwrap(), - }; + }); let serialized_measurement = serde_json::to_string(&measurement).unwrap(); - Ok(mqtt_client + mqtt_client .publish("energy", QoS::AtLeastOnce, true, serialized_measurement) .await - .map_err(|_| ConnectedHomeError::Mqtt)?) + .map_err(|_| ConnectedHomeError::Mqtt) }