From fa2542f3612dbc141c15110828e03a285d2bb5c4 Mon Sep 17 00:00:00 2001
From: "Brian J. Tarricone"
Date: Wed, 8 Jun 2022 15:36:08 -0700
Subject: [PATCH] Modernize

---
 Cargo.lock         |  1 +
 Cargo.toml         |  3 +-
 src/config.rs      |  9 +++---
 src/interpolate.rs | 20 ++++++-------
 src/main.rs        | 71 +++++++++++++++-------------------------------
 src/mapping.rs     | 24 ++++++++--------
 src/value.rs       | 22 +++++++-------
 7 files changed, 63 insertions(+), 87 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c3fcd61..ff60cd8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1040,6 +1040,7 @@ dependencies = [
 name = "mqtt2db"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "chrono",
  "env_logger",
  "futures",
diff --git a/Cargo.toml b/Cargo.toml
index dbf80f7..6cdf273 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,9 +2,10 @@
 name = "mqtt2db"
 version = "0.1.0"
 authors = ["Brian J. Tarricone "]
-edition = "2018"
+edition = "2021"
 
 [dependencies]
+anyhow = "1"
 chrono = { version = "0.4", features = ["serde"] }
 env_logger = "0.9"
 futures = "0.3"
diff --git a/src/config.rs b/src/config.rs
index 6c07ad6..ef0a5b5 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -86,12 +86,11 @@ pub struct Config {
 }
 
 impl Config {
-    pub fn parse<P: AsRef<Path>>(filename: P) -> Result<Self, String> {
-        let mut f = File::open(filename).map_err(|err| err.to_string())?;
+    pub fn parse<P: AsRef<Path>>(filename: P) -> anyhow::Result<Self> {
+        let mut f = File::open(filename)?;
         let mut contents = String::new();
-        f.read_to_string(&mut contents)
-            .map_err(|err| err.to_string())?;
-        let config: Config = from_str(&contents).map_err(|err| err.to_string())?;
+        f.read_to_string(&mut contents)?;
+        let config: Config = from_str(&contents)?;
         Ok(config)
     }
 }
diff --git a/src/interpolate.rs b/src/interpolate.rs
index 0700023..17814da 100644
--- a/src/interpolate.rs
+++ b/src/interpolate.rs
@@ -19,8 +19,8 @@ pub enum InterpolatedNamePart {
 }
 
 impl TryFrom<&str> for InterpolatedName {
-    type Error = String;
-    fn try_from(s: &str) -> Result<Self, String> {
+    type Error = anyhow::Error;
+    fn try_from(s: &str) -> Result<Self, Self::Error> {
         let mut parts: Vec<InterpolatedNamePart> = Vec::new();
         let mut n_references: usize = 0;
         let mut pos: usize = 0;
@@ -28,7 +28,7 @@ impl TryFrom<&str> for InterpolatedName {
         for cap in REFERENCE_RE.captures_iter(s) {
             let mat = cap
                 .get(2)
-                .ok_or_else(|| format!("Unable to get full match for name '{}'", s))?;
+                .ok_or_else(|| anyhow!("Unable to get full match for name '{}'", s))?;
             if pos < mat.start() {
                 parts.push(InterpolatedNamePart::Literal(
                     s.chars()
@@ -42,12 +42,12 @@ impl TryFrom<&str> for InterpolatedName {
             let num_str = cap
                 .get(3)
                 .map(|mat1| mat1.as_str())
-                .ok_or_else(|| format!("Unable to get capture group for name '{}'", s))?;
+                .ok_or_else(|| anyhow!("Unable to get capture group for name '{}'", s))?;
             let num = num_str
                 .parse::<usize>()
-                .map_err(|_| format!("Couldn't parse '{}' as number for name '{}'", num_str, s))?;
+                .map_err(|_| anyhow!("Couldn't parse '{}' as number for name '{}'", num_str, s))?;
             if num == 0 {
-                Err(format!("Invalid reference number 0 for name '{}'", s))?;
+                Err(anyhow!("Invalid reference number 0 for name '{}'", s))?;
             }
             parts.push(InterpolatedNamePart::Reference(num));
             n_references += 1;
@@ -69,7 +69,7 @@ impl TryFrom<&str> for InterpolatedName {
 }
 
 impl InterpolatedName {
-    pub fn interpolate<S: AsRef<str>>(&self, reference_values: &Vec<S>) -> Result<String, String> {
+    pub fn interpolate<S: AsRef<str>>(&self, reference_values: &Vec<S>) -> anyhow::Result<String> {
         self.parts
             .iter()
             .fold(Ok(String::new()), |accum, part| match accum {
@@ -83,7 +83,7 @@ impl InterpolatedName {
                             accum.push_str(reference_value.as_ref());
                             Ok(accum)
                         }
-                        None => Err(format!(
+                        None => Err(anyhow!(
                             "Can't find reference number {} to interpolate",
                             num
                         )),
@@ -99,7 +99,7 @@ mod test {
     use super::*;
 
     #[test]
-    fn interpolated_name_parsing() -> Result<(), String> {
+    fn interpolated_name_parsing() -> anyhow::Result<()> {
         use InterpolatedNamePart::*;
 
         assert_eq!(
@@ -135,7 +135,7 @@
     }
 
     #[test]
-    fn interpolation() -> Result<(), String> {
+    fn interpolation() -> anyhow::Result<()> {
         let interp = InterpolatedName::try_from("foo$1bar$2 baz $1")?;
         assert_eq!(
             "foofirstbarsecond baz first".to_string(),
diff --git a/src/main.rs b/src/main.rs
index 7115439..0283b5d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,8 +1,9 @@
 #[macro_use]
+extern crate anyhow;
+#[macro_use]
 extern crate log;
 
 use config::{Config, Database as ConfigDatabase, MqttAuth, MqttConfig, UserAuth};
-use futures::TryFutureExt;
 use influxdb::InfluxDbWriteable;
 use influxdb::{Client as InfluxClient, Timestamp, Type};
 use mapping::{Mapping, Payload, TagValue, TopicLevel};
@@ -13,11 +14,9 @@ use rumqttc::{
 use serde_json::Value as JsonValue;
 use std::convert::TryFrom;
 use std::env;
-use std::path::Path;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
-use tokio::fs::File;
-use tokio::io::AsyncReadExt;
+use tokio::fs;
 use value::ToInfluxType;
 
 mod config;
@@ -30,31 +29,8 @@ struct Database {
     measurement: String,
 }
 
-async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoop), String> {
-    async fn file_to_bytevec<P: AsRef<Path>>(file: P) -> Result<Vec<u8>, String> {
-        let mut f = File::open(&file)
-            .map_err(|err| {
-                format!(
-                    "Unable to open {}: {}",
-                    file.as_ref().to_string_lossy(),
-                    err
-                )
-            })
-            .await?;
-        let mut buf = Vec::new();
-        f.read_to_end(&mut buf)
-            .map_err(|err| {
-                format!(
-                    "Unable to read {}: {}",
-                    file.as_ref().to_string_lossy(),
-                    err
-                )
-            })
-            .await?;
-        Ok(buf)
-    }
-
-    let mut options = MqttOptions::new(config.client_id.clone(), config.host.clone(), config.port);
+async fn init_mqtt(config: &MqttConfig) -> anyhow::Result<(MqttAsyncClient, MqttEventLoop)> {
+    let mut options = MqttOptions::new(&config.client_id, &config.host, config.port);
     if let Some(connect_timeout) = config.connect_timeout {
         options.set_connection_timeout(connect_timeout.as_secs());
     }
@@ -62,7 +38,7 @@ async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoo
         options.set_keep_alive(keep_alive);
     }
     if let Some(ca_file) = &config.ca_file {
-        let ca = file_to_bytevec(ca_file).await?;
+        let ca = fs::read(ca_file).await?;
         options.set_transport(Transport::Tls(TlsConfiguration::Simple {
             ca,
             alpn: None,
@@ -71,8 +47,8 @@ async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoo
                 private_key_file,
             }) = &config.auth
             {
-                let cert = file_to_bytevec(cert_file).await?;
-                let private_key = file_to_bytevec(private_key_file).await?;
+                let cert = fs::read(cert_file).await?;
+                let private_key = fs::read(private_key_file).await?;
                 Some((cert, Key::RSA(private_key)))
             } else {
                 None
@@ -87,10 +63,10 @@ async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoo
     Ok(MqttAsyncClient::new(options, 100))
 }
 
-fn init_db(config: &ConfigDatabase) -> Result<Database, String> {
+fn init_db(config: &ConfigDatabase) -> anyhow::Result<Database> {
     match config {
         ConfigDatabase::Influxdb { url, auth, db_name, measurement } => {
-            let mut client = InfluxClient::new(url.clone(), db_name.clone());
+            let mut client = InfluxClient::new(url, db_name);
             if let Some(UserAuth { username, password }) = auth {
                 client = client.with_auth(username, password);
             }
@@ -105,7 +81,7 @@ fn init_db(config: &ConfigDatabase) -> Result<Database, String> {
 async fn init_subscriptions(
     mqtt_client: &mut MqttAsyncClient,
     topics: &Vec<&String>,
-) -> Result<(), String> {
+) -> anyhow::Result<()> {
     let topics: Vec<SubscribeFilter> = topics
         .iter()
         .map(|topic| {
@@ -115,8 +91,7 @@ async fn init_subscriptions(
         .collect();
     mqtt_client
        .subscribe_many(topics)
-        .await
-        .map_err(|err| err.to_string())?;
+        .await?;
     Ok(())
 }
 
@@ -124,7 +99,7 @@ async fn handle_publish(
     publish: &Publish,
     mapping: Arc<Mapping>,
     databases: Arc<Vec<Database>>,
-) -> Result<(), String> {
+) -> anyhow::Result<()> {
     debug!("Got publish: {:?}; {:?}", publish, publish.payload);
 
     let reference_values = publish
@@ -139,27 +114,27 @@ async fn handle_publish(
     let field_name = mapping.field_name.interpolate(&reference_values)?;
 
     let payload = String::from_utf8(Vec::from(publish.payload.as_ref()))
-        .map_err(|err| format!("Invalid payload value: {}", err))?;
+        .map_err(|err| anyhow!("Invalid payload value: {}", err))?;
     let (influx_value, timestamp) = match &mapping.payload {
         Payload::Raw => (payload.to_influx_type(mapping.value_type)?, None),
         Payload::Json { value_field_selector, timestamp_field_selector } => {
             let payload_root: JsonValue = serde_json::from_str(&payload)
-                .map_err(|err| format!("Failed to parse payload as JSON: {}", err))?;
+                .map_err(|err| anyhow!("Failed to parse payload as JSON: {}", err))?;
             let influx_value = value_field_selector
                 .find(&payload_root)
                 .next()
-                .ok_or_else(|| format!("Couldn't find value in payload on topic {}", publish.topic))
+                .ok_or_else(|| anyhow!("Couldn't find value in payload on topic {}", publish.topic))
                 .and_then(|value| value.to_influx_type(mapping.value_type))?;
             let timestamp = timestamp_field_selector
                 .as_ref()
                 .map(|selector| selector
                     .find(&payload_root)
                     .next()
-                    .ok_or_else(|| format!("Couldn't find timestamp in payload on topic {}", publish.topic))
+                    .ok_or_else(|| anyhow!("Couldn't find timestamp in payload on topic {}", publish.topic))
                     .and_then(|ts_value| ts_value
                         .as_u64()
                         .map(|ts| ts as u128)
-                        .ok_or_else(|| format!("'{}' cannot be converted to a timestamp", ts_value))
+                        .ok_or_else(|| anyhow!("'{}' cannot be converted to a timestamp", ts_value))
                     )
                 )
                 .transpose()?;
@@ -189,7 +164,7 @@ async fn handle_publish(
             .client
             .query(&query)
             .await
-            .map_err(|err| format!("Failed to write to DB: {}", err))?;
+            .map_err(|err| anyhow!("Failed to write to DB: {}", err))?;
         debug!("wrote to influx: {:?}", query);
     }
 
@@ -247,10 +222,10 @@ async fn run_event_loop(
 }
 
 #[tokio::main]
-async fn main() -> Result<(), String> {
+async fn main() -> anyhow::Result<()> {
     let config_filename = env::args()
         .nth(1)
-        .ok_or_else(|| "Missing argument 'config filename'")?;
+        .ok_or_else(|| anyhow!("Missing argument 'config filename'"))?;
     let config = Config::parse(&config_filename)?;
 
     let logger_env = env_logger::Env::new()
@@ -266,7 +241,7 @@ async fn main() -> Result<(), String> {
         .mappings
         .iter()
         .map(Mapping::try_from)
-        .collect::<Result<Vec<Mapping>, String>>()?;
+        .collect::<anyhow::Result<Vec<Mapping>>>()?;
 
     let (mut mqtt_client, mqtt_event_loop) = init_mqtt(&config.mqtt).await?;
     init_subscriptions(
@@ -282,7 +257,7 @@ async fn main() -> Result<(), String> {
     let databases = config.databases
         .iter()
         .map(init_db)
-        .collect::<Result<Vec<Database>, String>>()?;
+        .collect::<anyhow::Result<Vec<Database>>>()?;
 
     run_event_loop(mqtt_event_loop, mappings, databases).await;
 
diff --git a/src/mapping.rs b/src/mapping.rs
index d207618..f70833b 100644
--- a/src/mapping.rs
+++ b/src/mapping.rs
@@ -14,13 +14,13 @@ pub enum TopicLevel {
 }
 
 impl TryFrom<&str> for TopicLevel {
-    type Error = String;
+    type Error = anyhow::Error;
     fn try_from(s: &str) -> Result<Self, Self::Error> {
         match s {
             "+" => Ok(TopicLevel::SingleWildcard),
             "#" => Ok(TopicLevel::MultiWildcard),
             s if s.contains("+") || s.contains("#") => {
-                Err(format!("Topic level '{}' cannot contain '+' or '#'", s))
+                Err(anyhow!("Topic level '{}' cannot contain '+' or '#'", s))
             }
             s => Ok(TopicLevel::Literal(s.to_string())),
         }
@@ -34,7 +34,7 @@ pub enum TagValue {
 }
 
 impl TryFrom<&ConfigTagValue> for TagValue {
-    type Error = String;
+    type Error = anyhow::Error;
     fn try_from(tag_value: &ConfigTagValue) -> Result<Self, Self::Error> {
         match tag_value.r#type {
             ValueType::Text => {
@@ -79,19 +79,19 @@ pub struct Mapping {
 }
 
 impl TryFrom<&ConfigMapping> for Mapping {
-    type Error = String;
+    type Error = anyhow::Error;
     fn try_from(mapping: &ConfigMapping) -> Result<Self, Self::Error> {
         let topic = mapping
             .topic
             .split("/")
             .map(|level| TopicLevel::try_from(level))
-            .collect::<Result<Vec<TopicLevel>, String>>()?;
+            .collect::<anyhow::Result<Vec<TopicLevel>>>()?;
         let pre_multi_levels: Vec<&TopicLevel> = topic
             .iter()
             .take_while(|level| **level != TopicLevel::MultiWildcard)
             .collect();
         if pre_multi_levels.len() < topic.len() - 1 {
-            Err(format!(
+            Err(anyhow!(
                 "Topic '{}' has '#' wildcard before last topic level",
                 mapping.topic
             ))?;
@@ -103,7 +103,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
             .count();
 
         let field_name = match InterpolatedName::try_from(mapping.field_name.as_str()) {
-            Ok(name) if find_max_ref(&name) > max_interp_ref => Err(format!(
+            Ok(name) if find_max_ref(&name) > max_interp_ref => Err(anyhow!(
                 "Topic '{}' has field name '{}' which has invalid references",
                 mapping.topic, mapping.field_name
             )),
@@ -115,10 +115,10 @@ impl TryFrom<&ConfigMapping> for Mapping {
             None => Payload::Raw,
             Some(ConfigPayload::Json { value_field_path, timestamp_field_path }) => {
                 let value_field_selector = Selector::new(&value_field_path)
-                    .map_err(|err| format!("Value field path '{}' is invalid: {}'", value_field_path, err))?;
+                    .map_err(|err| anyhow!("Value field path '{}' is invalid: {}'", value_field_path, err))?;
                 let timestamp_field_selector = timestamp_field_path.as_ref()
                     .map(|path| Selector::new(path)
                        .map_err(|err| anyhow!("Timestamp field path '{}' is invalid: {}'", path, err))
                     )
                     .transpose()?;
                 Payload::Json {
@@ -133,7 +133,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
             .iter()
             .map(|tag| match TagValue::try_from(tag.1) {
                 Ok(TagValue::InterpolatedStr(ref name)) if find_max_ref(name) > max_interp_ref => {
-                    Err(format!(
+                    Err(anyhow!(
                         "Topic '{}' has tag value '{:?}' which has invalid references",
                         mapping.topic, tag.1
                     ))
@@ -141,7 +141,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
                 Ok(value) => Ok((tag.0.clone(), value)),
                 Err(err) => Err(err),
             })
-            .collect::<Result<HashMap<String, TagValue>, String>>()?;
+            .collect::<anyhow::Result<HashMap<String, TagValue>>>()?;
 
         Ok(Mapping {
             topic,
@@ -167,7 +167,7 @@ mod test {
     use super::*;
 
     #[test]
-    fn mapping_parsing() -> Result<(), String> {
+    fn mapping_parsing() -> anyhow::Result<()> {
         use TopicLevel::*;
 
         fn mk_cfg_mapping(topic: &str) -> ConfigMapping {
diff --git a/src/value.rs b/src/value.rs
index 9b75d01..fcd58de 100644
--- a/src/value.rs
+++ b/src/value.rs
@@ -28,53 +28,53 @@ impl fmt::Display for ValueType {
 }
 
 pub trait ToInfluxType {
-    fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String>;
+    fn to_influx_type(&self, value_type: ValueType) -> anyhow::Result<Type>;
 }
 
 impl ToInfluxType for String {
-    fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String> {
+    fn to_influx_type(&self, value_type: ValueType) -> anyhow::Result<Type> {
         match value_type {
             ValueType::Boolean if self == "true" => Ok(Type::Boolean(true)),
             ValueType::Boolean if self == "false" => Ok(Type::Boolean(false)),
-            ValueType::Boolean => Err(format!("Value '{}' is not a valid boolean", self)),
+            ValueType::Boolean => Err(anyhow!("Value '{}' is not a valid boolean", self)),
             ValueType::Float => self
                 .parse::<f64>()
                 .map(|v| Type::Float(v))
-                .map_err(|err| err.to_string()),
+                .map_err(|err| err.into()),
             ValueType::SignedInteger => self
                 .parse::<i64>()
                 .map(|v| Type::SignedInteger(v))
-                .map_err(|err| err.to_string()),
+                .map_err(|err| err.into()),
             ValueType::UnsignedInteger => self
                 .parse::<u64>()
                 .map(|v| Type::UnsignedInteger(v))
-                .map_err(|err| err.to_string()),
+                .map_err(|err| err.into()),
             ValueType::Text => Ok(Type::Text(self.clone())),
         }
     }
 }
 
 impl ToInfluxType for JsonValue {
-    fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String> {
+    fn to_influx_type(&self, value_type: ValueType) -> anyhow::Result<Type> {
         match (value_type, self) {
             (ValueType::Boolean, JsonValue::Bool(true)) => Ok(Type::Boolean(true)),
             (ValueType::Boolean, JsonValue::Bool(false)) => Ok(Type::Boolean(false)),
             (ValueType::Float, JsonValue::Number(num)) => num
                 .as_f64()
-                .ok_or_else(|| format!("Cannot be expressed as f64: {}", num))
+                .ok_or_else(|| anyhow!("Cannot be expressed as f64: {}", num))
                 .map(|v| Type::Float(v)),
             (ValueType::SignedInteger, JsonValue::Number(num)) => num
                 .as_i64()
-                .ok_or_else(|| format!("Cannot be expressed as i64:{}", num))
+                .ok_or_else(|| anyhow!("Cannot be expressed as i64: {}", num))
                 .map(|v| Type::SignedInteger(v)),
             (ValueType::UnsignedInteger, JsonValue::Number(num)) => num
                 .as_u64()
-                .ok_or_else(|| format!("Cannot be expressed as u64:{}", num))
+                .ok_or_else(|| anyhow!("Cannot be expressed as u64: {}", num))
                 .map(|v| Type::UnsignedInteger(v)),
             (ValueType::Text, JsonValue::String(s)) => Ok(Type::Text(s.to_string())),
             (ValueType::Text, JsonValue::Bool(b)) => Ok(Type::Text(b.to_string())),
             (ValueType::Text, JsonValue::Number(num)) => Ok(Type::Text(num.to_string())),
-            (other_type, other_value) => Err(format!("Unable to parse self from JSON; need type {} but got '{}'", other_type, other_value)),
+            (other_type, other_value) => Err(anyhow!("Unable to parse self from JSON; need type {} but got '{}'", other_type, other_value)),
         }
     }
 }