Fix Clippy. Cleanup Event Handling. Configurable Topics

This commit is contained in:
2023-08-31 21:18:11 -03:00
parent c62467417f
commit 3d34a11d5e
9 changed files with 99 additions and 38 deletions

View File

@@ -27,6 +27,8 @@ pub enum ConnectedHomeError {
UserAlreadyExists(String), UserAlreadyExists(String),
#[error("TODO: MQTT ERROR")] #[error("TODO: MQTT ERROR")]
Mqtt, Mqtt,
#[error("Failed to parse event from bytes")]
ParseEvent(Vec<u8>),
} }
impl ConnectedHomeError { impl ConnectedHomeError {

View File

@@ -1,4 +1,5 @@
use crate::mqtt::events::energy::CurrentMeasurement; use crate::{error::ConnectedHomeError, mqtt::events::energy::CurrentMeasurement};
use rumqttc::Publish;
pub mod energy; pub mod energy;
@@ -6,3 +7,12 @@ pub mod energy;
pub enum ConnectedHomeEvent { pub enum ConnectedHomeEvent {
Current(CurrentMeasurement), Current(CurrentMeasurement),
} }
impl TryFrom<&Publish> for ConnectedHomeEvent {
type Error = ConnectedHomeError;
fn try_from(publish: &Publish) -> Result<Self, Self::Error> {
serde_json::from_slice::<ConnectedHomeEvent>(&publish.payload)
.map_err(|_| ConnectedHomeError::ParseEvent(publish.payload.to_vec()))
}
}

View File

@@ -1,7 +1,7 @@
pub mod events; pub mod events;
use crate::util::cli::from_file_or_const; use crate::util::cli::from_file_or_const;
use rumqttc::MqttOptions; use rumqttc::{MqttOptions, QoS, SubscribeFilter};
use std::time::Duration; use std::time::Duration;
#[derive(Debug, Clone, serde::Deserialize)] #[derive(Debug, Clone, serde::Deserialize)]
@@ -14,6 +14,9 @@ pub struct MqttConfig {
#[serde(deserialize_with = "from_file_or_const")] #[serde(deserialize_with = "from_file_or_const")]
pub password: String, pub password: String,
pub client_id: String, pub client_id: String,
// note: consumer property only
#[serde(default)]
pub topics: Vec<String>,
} }
impl MqttConfig { impl MqttConfig {
@@ -27,4 +30,11 @@ impl MqttConfig {
mqttoptions mqttoptions
} }
pub fn sub_filters(&self) -> Vec<SubscribeFilter> {
self.topics
.iter()
.map(|topic| SubscribeFilter::new(topic.to_owned(), QoS::AtLeastOnce))
.collect::<Vec<_>>()
}
} }

View File

@@ -16,4 +16,5 @@ host = "localhost"
port = 1883 port = 1883
username = "file:./.secrets/mqtt_username" username = "file:./.secrets/mqtt_username"
password = "file:./.secrets/mqtt_password" password = "file:./.secrets/mqtt_password"
client-id = "energy_consumer" client-id = "energy_consumer"
topics = ["energy"]

View File

@@ -1,7 +1,7 @@
#[macro_use] #[macro_use]
extern crate slog; extern crate slog;
use crate::{config::Config, context::Context, tasks::energy_monitor::monitor_energy}; use crate::{config::Config, context::Context, tasks::mqtt_consumer::MqttConsumer};
use axum::Router; use axum::Router;
use common::{ use common::{
error::ConnectedHomeResult, error::ConnectedHomeResult,
@@ -37,7 +37,10 @@ async fn main() -> ConnectedHomeResult<()> {
set.spawn(launch_task( set.spawn(launch_task(
"MQTT Consumer".to_string(), "MQTT Consumer".to_string(),
monitor_energy(context.clone()), {
let mut consumer = MqttConsumer::new(context.clone());
async move { consumer.start().await }
},
context.clone(), context.clone(),
ShutdownType::Manual, ShutdownType::Manual,
)); ));

View File

@@ -1,27 +0,0 @@
use crate::context::Context;
use common::{
error::{ConnectedHomeError, ConnectedHomeResult},
mqtt::events::energy::CurrentMeasurement,
};
use rumqttc::{AsyncClient, QoS};
use rust_decimal::prelude::ToPrimitive;
use std::sync::Arc;
pub async fn monitor_energy(ctx: Arc<Context>) -> ConnectedHomeResult<()> {
let (mqtt_client, mut mqtt_event_loop) = AsyncClient::new(ctx.config.mqtt.options(), 10);
mqtt_client.subscribe("energy", QoS::AtMostOnce).await.unwrap();
loop {
match mqtt_event_loop.poll().await.map_err(|_| ConnectedHomeError::Mqtt)? {
rumqttc::Event::Incoming(rumqttc::Packet::Publish(p)) => {
let current_measurement: CurrentMeasurement = serde_json::from_slice(&p.payload).unwrap();
ctx.metrics
.current
.last_measurement
.with_label_values(&[current_measurement.device_id.to_string().as_str()])
.set(current_measurement.amps.to_f64().unwrap());
}
_ => {}
}
}
}

View File

@@ -1 +1,20 @@
pub mod energy_monitor; use crate::context::Context;
use common::{error::ConnectedHomeResult, mqtt::events::ConnectedHomeEvent};
use rust_decimal::prelude::ToPrimitive;
use std::sync::Arc;
pub mod mqtt_consumer;
pub async fn handle_event(ctx: Arc<Context>, event: ConnectedHomeEvent) -> ConnectedHomeResult<()> {
match event {
ConnectedHomeEvent::Current(current_measurement) => {
ctx.metrics
.current
.last_measurement
.with_label_values(&[current_measurement.device_id.to_string().as_str()])
.set(current_measurement.amps.to_f64().unwrap());
} // _ => {},
};
Ok(())
}

View File

@@ -0,0 +1,43 @@
use crate::{context::Context, tasks::handle_event};
use common::{
error::{ConnectedHomeError, ConnectedHomeResult},
mqtt::events::ConnectedHomeEvent,
};
use rumqttc::AsyncClient;
use std::sync::Arc;
pub struct MqttConsumer {
ctx: Arc<Context>,
mqtt_client: AsyncClient,
mqtt_event_loop: rumqttc::EventLoop,
}
impl MqttConsumer {
pub fn new(ctx: Arc<Context>) -> Self {
let (mqtt_client, mqtt_event_loop) = AsyncClient::new(ctx.config.mqtt.options(), 10);
Self {
ctx,
mqtt_client,
mqtt_event_loop,
}
}
pub async fn start(&mut self) -> ConnectedHomeResult<()> {
self.mqtt_client
.subscribe_many(self.ctx.config.mqtt.sub_filters())
.await
.map_err(|_| ConnectedHomeError::Mqtt)?;
loop {
if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(p)) = self
.mqtt_event_loop
.poll()
.await
.map_err(|_| ConnectedHomeError::Mqtt)?
{
let event = ConnectedHomeEvent::try_from(&p)?;
handle_event(self.ctx.clone(), event).await?;
}
}
}
}

View File

@@ -1,7 +1,7 @@
use chrono::Utc; use chrono::Utc;
use common::{ use common::{
error::{ConnectedHomeError, ConnectedHomeResult}, error::{ConnectedHomeError, ConnectedHomeResult},
mqtt::events::energy::CurrentMeasurement, mqtt::events::{energy::CurrentMeasurement, ConnectedHomeEvent},
}; };
use rand::Rng; use rand::Rng;
use rumqttc::{AsyncClient, QoS}; use rumqttc::{AsyncClient, QoS};
@@ -28,15 +28,15 @@ fn sample_current() -> f64 {
} }
async fn publish_current_measurement(mqtt_client: &AsyncClient, device_id: Uuid, amps: f64) -> ConnectedHomeResult<()> { async fn publish_current_measurement(mqtt_client: &AsyncClient, device_id: Uuid, amps: f64) -> ConnectedHomeResult<()> {
let measurement = CurrentMeasurement { let measurement = ConnectedHomeEvent::Current(CurrentMeasurement {
device_id, device_id,
timestamp: Utc::now(), timestamp: Utc::now(),
amps: Decimal::try_from(amps).unwrap(), amps: Decimal::try_from(amps).unwrap(),
}; });
let serialized_measurement = serde_json::to_string(&measurement).unwrap(); let serialized_measurement = serde_json::to_string(&measurement).unwrap();
Ok(mqtt_client mqtt_client
.publish("energy", QoS::AtLeastOnce, true, serialized_measurement) .publish("energy", QoS::AtLeastOnce, true, serialized_measurement)
.await .await
.map_err(|_| ConnectedHomeError::Mqtt)?) .map_err(|_| ConnectedHomeError::Mqtt)
} }