From c88858e43f039168249d925af51d846484c1c196 Mon Sep 17 00:00:00 2001
From: "Brian J. Tarricone" <brian@tarricone.org>
Date: Tue, 31 Aug 2021 17:29:26 -0700
Subject: [PATCH] First step toward supporting multiple database writes

---
 src/config.rs  | 22 +++++++------
 src/main.rs    | 88 ++++++++++++++++++++++++++++----------------------
 src/mapping.rs |  2 +-
 3 files changed, 62 insertions(+), 50 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index 56faadc..6c07ad6 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -45,20 +45,22 @@ pub struct UserAuth {
 }
 
 #[derive(Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct InfluxDBConfig {
-    pub url: String,
-    pub auth: Option<UserAuth>,
-    pub db_name: String,
-    pub measurement: String,
+#[serde(rename_all = "kebab-case", tag = "type")]
+pub enum Database {
+    #[serde(rename_all = "camelCase")]
+    Influxdb {
+        url: String,
+        auth: Option<UserAuth>,
+        db_name: String,
+        measurement: String,
+    },
 }
 
 #[derive(Debug, Deserialize, Clone)]
-#[serde(rename_all = "camelCase", tag = "type")]
-#[allow(non_camel_case_types)]
+#[serde(rename_all = "kebab-case", tag = "type")]
 pub enum Payload {
     #[serde(rename_all = "camelCase")]
-    json {
+    Json {
         value_field_path: String,
         timestamp_field_path: Option<String>,
     },
@@ -79,7 +81,7 @@ pub struct Mapping {
 pub struct Config {
     pub log_level: Option<LevelFilter>,
     pub mqtt: MqttConfig,
-    pub database: InfluxDBConfig,
+    pub databases: Vec<Database>,
     pub mappings: Vec<Mapping>,
 }
 
diff --git a/src/main.rs b/src/main.rs
index eb0182c..09bb83b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,7 @@
 #[macro_use]
 extern crate log;
 
-use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, UserAuth};
+use config::{Config, Database as ConfigDatabase, MqttAuth, MqttConfig, UserAuth};
 use futures::TryFutureExt;
 use influxdb::InfluxDbWriteable;
 use influxdb::{Client as InfluxClient, Timestamp, Type};
@@ -89,15 +89,19 @@ async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoop), String> {
     Ok(MqttAsyncClient::new(options, 100))
 }
 
-fn init_db(config: &InfluxDBConfig) -> Result<Database, String> {
-    let mut client = InfluxClient::new(config.url.clone(), config.db_name.clone());
-    if let Some(UserAuth { username, password }) = &config.auth {
-        client = client.with_auth(username, password);
+fn init_db(config: &ConfigDatabase) -> Result<Database, String> {
+    match config {
+        ConfigDatabase::Influxdb { url, auth, db_name, measurement } => {
+            let mut client = InfluxClient::new(url.clone(), db_name.clone());
+            if let Some(UserAuth { username, password }) = auth {
+                client = client.with_auth(username, password);
+            }
+            Ok(Database {
+                client,
+                measurement: measurement.clone(),
+            })
+        }
     }
-    Ok(Database {
-        client,
-        measurement: config.measurement.clone(),
-    })
 }
 
 async fn init_subscriptions(
@@ -121,7 +125,7 @@ async fn init_subscriptions(
 async fn handle_publish(
     publish: &Publish,
     mapping: Arc<Mapping>,
-    database: Arc<Database>,
+    databases: Arc<Vec<Database>>,
 ) -> Result<(), String> {
     debug!("Got publish: {:?}; {:?}", publish, publish.payload);
 
@@ -170,25 +174,28 @@ async fn handle_publish(
             .unwrap()
             .as_millis()
     );
 
-    let mut query = Timestamp::Milliseconds(timestamp)
-        .into_query(&database.measurement)
-        .add_field(&field_name, influx_value);
-    for tag in mapping.tags.iter() {
-        let value = match &tag.1 {
-            TagValue::Literal(v) => v.clone(),
-            TagValue::InterpolatedStr(interp) => Type::Text(interp.interpolate(&reference_values)?),
-        };
-        query = query.add_tag(&tag.0, value);
-    }
-    use tokio_compat_02::FutureExt;
-    database
-        .client
-        .query(&query)
-        .compat()
-        .await
-        .map_err(|err| format!("Failed to write to DB: {}", err))?;
-    debug!("wrote to influx: {:?}", query);
+    for database in databases.iter() {
+        let mut query = Timestamp::Milliseconds(timestamp)
+            .into_query(&database.measurement)
+            .add_field(&field_name, influx_value.clone());
+        for tag in mapping.tags.iter() {
+            let value = match &tag.1 {
+                TagValue::Literal(v) => v.clone(),
+                TagValue::InterpolatedStr(interp) => Type::Text(interp.interpolate(&reference_values)?),
+            };
+            query = query.add_tag(&tag.0, value);
+        }
+
+        use tokio_compat_02::FutureExt;
+        database
+            .client
+            .query(&query)
+            .compat()
+            .await
+            .map_err(|err| format!("Failed to write to DB: {}", err))?;
+        debug!("wrote to influx: {:?}", query);
+    }
 
     Ok(())
 }
@@ -216,17 +223,20 @@ fn find_mapping<'a>(mappings: &'a Vec<Arc<Mapping>>, topic: &String) -> Option<&'a Arc<Mapping>> {
 async fn run_event_loop(
     mut event_loop: MqttEventLoop,
-    mappings: &Vec<Arc<Mapping>>,
-    database: Arc<Database>,
+    mappings: Vec<Mapping>,
+    databases: Vec<Database>,
 ) {
+    let mappings = mappings.into_iter().map(Arc::new).collect();
+    let databases = Arc::new(databases);
+
     loop {
         match event_loop.poll().await {
             Ok(Event::Incoming(Packet::Publish(publish))) => {
                 if let Some(mapping) = find_mapping(&mappings, &publish.topic) {
                     let mapping = Arc::clone(mapping);
-                    let database = Arc::clone(&database);
+                    let databases = Arc::clone(&databases);
                     tokio::spawn(async move {
-                        if let Err(err) = handle_publish(&publish, mapping, database).await {
+                        if let Err(err) = handle_publish(&publish, mapping, databases).await {
                             warn!("{}", err);
                         }
                     });
                 }
@@ -256,14 +266,11 @@ async fn main() -> Result<(), String> {
     }
     logger_builder.init();
 
-    let mappings: Vec<Arc<Mapping>> = config
+    let mappings: Vec<Mapping> = config
        .mappings
        .iter()
        .map(Mapping::try_from)
-        .collect::<Result<Vec<_>, String>>()?
-        .into_iter()
-        .map(Arc::new)
-        .collect();
+        .collect::<Result<Vec<_>, String>>()?;
 
     let (mut mqtt_client, mqtt_event_loop) = init_mqtt(&config.mqtt).await?;
     init_subscriptions(
@@ -276,9 +283,12 @@ async fn main() -> Result<(), String> {
     )
     .await?;
 
-    let database = Arc::new(init_db(&config.database)?);
+    let databases = config.databases
+        .iter()
+        .map(init_db)
+        .collect::<Result<Vec<_>, String>>()?;
 
-    run_event_loop(mqtt_event_loop, &mappings, database).await;
+    run_event_loop(mqtt_event_loop, mappings, databases).await;
 
     Ok(())
 }
diff --git a/src/mapping.rs b/src/mapping.rs
index 8ebf00d..d207618 100644
--- a/src/mapping.rs
+++ b/src/mapping.rs
@@ -113,7 +113,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
 
         let payload = match &mapping.payload {
             None => Payload::Raw,
-            Some(ConfigPayload::json { value_field_path, timestamp_field_path }) => {
+            Some(ConfigPayload::Json { value_field_path, timestamp_field_path }) => {
                 let value_field_selector = Selector::new(&value_field_path)
                     .map_err(|err| format!("Value field path '{}' is invalid: {}'", value_field_path, err))?;
                 let timestamp_field_selector = timestamp_field_path.as_ref()