Modernize
This commit is contained in:
parent
a3c2f84223
commit
fa2542f361
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -1040,6 +1040,7 @@ dependencies = [
|
|||||||
name = "mqtt2db"
|
name = "mqtt2db"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
"futures",
|
"futures",
|
||||||
|
@ -2,9 +2,10 @@
|
|||||||
name = "mqtt2db"
|
name = "mqtt2db"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
authors = ["Brian J. Tarricone <brian@tarricone.org>"]
|
authors = ["Brian J. Tarricone <brian@tarricone.org>"]
|
||||||
edition = "2018"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow = "1"
|
||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
env_logger = "0.9"
|
env_logger = "0.9"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
@ -86,12 +86,11 @@ pub struct Config {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
pub fn parse<P: AsRef<Path>>(filename: P) -> Result<Config, String> {
|
pub fn parse<P: AsRef<Path>>(filename: P) -> anyhow::Result<Config> {
|
||||||
let mut f = File::open(filename).map_err(|err| err.to_string())?;
|
let mut f = File::open(filename)?;
|
||||||
let mut contents = String::new();
|
let mut contents = String::new();
|
||||||
f.read_to_string(&mut contents)
|
f.read_to_string(&mut contents)?;
|
||||||
.map_err(|err| err.to_string())?;
|
let config: Config = from_str(&contents)?;
|
||||||
let config: Config = from_str(&contents).map_err(|err| err.to_string())?;
|
|
||||||
Ok(config)
|
Ok(config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,8 +19,8 @@ pub enum InterpolatedNamePart {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<&str> for InterpolatedName {
|
impl TryFrom<&str> for InterpolatedName {
|
||||||
type Error = String;
|
type Error = anyhow::Error;
|
||||||
fn try_from(s: &str) -> Result<InterpolatedName, String> {
|
fn try_from(s: &str) -> Result<InterpolatedName, Self::Error> {
|
||||||
let mut parts: Vec<InterpolatedNamePart> = Vec::new();
|
let mut parts: Vec<InterpolatedNamePart> = Vec::new();
|
||||||
let mut n_references: usize = 0;
|
let mut n_references: usize = 0;
|
||||||
let mut pos: usize = 0;
|
let mut pos: usize = 0;
|
||||||
@ -28,7 +28,7 @@ impl TryFrom<&str> for InterpolatedName {
|
|||||||
for cap in REFERENCE_RE.captures_iter(s) {
|
for cap in REFERENCE_RE.captures_iter(s) {
|
||||||
let mat = cap
|
let mat = cap
|
||||||
.get(2)
|
.get(2)
|
||||||
.ok_or_else(|| format!("Unable to get full match for name '{}'", s))?;
|
.ok_or_else(|| anyhow!("Unable to get full match for name '{}'", s))?;
|
||||||
if pos < mat.start() {
|
if pos < mat.start() {
|
||||||
parts.push(InterpolatedNamePart::Literal(
|
parts.push(InterpolatedNamePart::Literal(
|
||||||
s.chars()
|
s.chars()
|
||||||
@ -42,12 +42,12 @@ impl TryFrom<&str> for InterpolatedName {
|
|||||||
let num_str = cap
|
let num_str = cap
|
||||||
.get(3)
|
.get(3)
|
||||||
.map(|mat1| mat1.as_str())
|
.map(|mat1| mat1.as_str())
|
||||||
.ok_or_else(|| format!("Unable to get capture group for name '{}'", s))?;
|
.ok_or_else(|| anyhow!("Unable to get capture group for name '{}'", s))?;
|
||||||
let num = num_str
|
let num = num_str
|
||||||
.parse::<usize>()
|
.parse::<usize>()
|
||||||
.map_err(|_| format!("Couldn't parse '{}' as number for name '{}'", num_str, s))?;
|
.map_err(|_| anyhow!("Couldn't parse '{}' as number for name '{}'", num_str, s))?;
|
||||||
if num == 0 {
|
if num == 0 {
|
||||||
Err(format!("Invalid reference number 0 for name '{}'", s))?;
|
Err(anyhow!("Invalid reference number 0 for name '{}'", s))?;
|
||||||
}
|
}
|
||||||
parts.push(InterpolatedNamePart::Reference(num));
|
parts.push(InterpolatedNamePart::Reference(num));
|
||||||
n_references += 1;
|
n_references += 1;
|
||||||
@ -69,7 +69,7 @@ impl TryFrom<&str> for InterpolatedName {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl InterpolatedName {
|
impl InterpolatedName {
|
||||||
pub fn interpolate<S: AsRef<str>>(&self, reference_values: &Vec<S>) -> Result<String, String> {
|
pub fn interpolate<S: AsRef<str>>(&self, reference_values: &Vec<S>) -> anyhow::Result<String> {
|
||||||
self.parts
|
self.parts
|
||||||
.iter()
|
.iter()
|
||||||
.fold(Ok(String::new()), |accum, part| match accum {
|
.fold(Ok(String::new()), |accum, part| match accum {
|
||||||
@ -83,7 +83,7 @@ impl InterpolatedName {
|
|||||||
accum.push_str(reference_value.as_ref());
|
accum.push_str(reference_value.as_ref());
|
||||||
Ok(accum)
|
Ok(accum)
|
||||||
}
|
}
|
||||||
None => Err(format!(
|
None => Err(anyhow!(
|
||||||
"Can't find reference number {} to interpolate",
|
"Can't find reference number {} to interpolate",
|
||||||
num
|
num
|
||||||
)),
|
)),
|
||||||
@ -99,7 +99,7 @@ mod test {
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn interpolated_name_parsing() -> Result<(), String> {
|
fn interpolated_name_parsing() -> anyhow::Result<()> {
|
||||||
use InterpolatedNamePart::*;
|
use InterpolatedNamePart::*;
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@ -135,7 +135,7 @@ mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn interpolation() -> Result<(), String> {
|
fn interpolation() -> anyhow::Result<()> {
|
||||||
let interp = InterpolatedName::try_from("foo$1bar$2 baz $1")?;
|
let interp = InterpolatedName::try_from("foo$1bar$2 baz $1")?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
"foofirstbarsecond baz first".to_string(),
|
"foofirstbarsecond baz first".to_string(),
|
||||||
|
71
src/main.rs
71
src/main.rs
@ -1,8 +1,9 @@
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
extern crate anyhow;
|
||||||
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
use config::{Config, Database as ConfigDatabase, MqttAuth, MqttConfig, UserAuth};
|
use config::{Config, Database as ConfigDatabase, MqttAuth, MqttConfig, UserAuth};
|
||||||
use futures::TryFutureExt;
|
|
||||||
use influxdb::InfluxDbWriteable;
|
use influxdb::InfluxDbWriteable;
|
||||||
use influxdb::{Client as InfluxClient, Timestamp, Type};
|
use influxdb::{Client as InfluxClient, Timestamp, Type};
|
||||||
use mapping::{Mapping, Payload, TagValue, TopicLevel};
|
use mapping::{Mapping, Payload, TagValue, TopicLevel};
|
||||||
@ -13,11 +14,9 @@ use rumqttc::{
|
|||||||
use serde_json::Value as JsonValue;
|
use serde_json::Value as JsonValue;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::path::Path;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
use tokio::fs::File;
|
use tokio::fs;
|
||||||
use tokio::io::AsyncReadExt;
|
|
||||||
use value::ToInfluxType;
|
use value::ToInfluxType;
|
||||||
|
|
||||||
mod config;
|
mod config;
|
||||||
@ -30,31 +29,8 @@ struct Database {
|
|||||||
measurement: String,
|
measurement: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoop), String> {
|
async fn init_mqtt(config: &MqttConfig) -> anyhow::Result<(MqttAsyncClient, MqttEventLoop)> {
|
||||||
async fn file_to_bytevec<P: AsRef<Path>>(file: P) -> Result<Vec<u8>, String> {
|
let mut options = MqttOptions::new(&config.client_id, &config.host, config.port);
|
||||||
let mut f = File::open(&file)
|
|
||||||
.map_err(|err| {
|
|
||||||
format!(
|
|
||||||
"Unable to open {}: {}",
|
|
||||||
file.as_ref().to_string_lossy(),
|
|
||||||
err
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
let mut buf = Vec::new();
|
|
||||||
f.read_to_end(&mut buf)
|
|
||||||
.map_err(|err| {
|
|
||||||
format!(
|
|
||||||
"Unable to read {}: {}",
|
|
||||||
file.as_ref().to_string_lossy(),
|
|
||||||
err
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
Ok(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut options = MqttOptions::new(config.client_id.clone(), config.host.clone(), config.port);
|
|
||||||
if let Some(connect_timeout) = config.connect_timeout {
|
if let Some(connect_timeout) = config.connect_timeout {
|
||||||
options.set_connection_timeout(connect_timeout.as_secs());
|
options.set_connection_timeout(connect_timeout.as_secs());
|
||||||
}
|
}
|
||||||
@ -62,7 +38,7 @@ async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoo
|
|||||||
options.set_keep_alive(keep_alive);
|
options.set_keep_alive(keep_alive);
|
||||||
}
|
}
|
||||||
if let Some(ca_file) = &config.ca_file {
|
if let Some(ca_file) = &config.ca_file {
|
||||||
let ca = file_to_bytevec(ca_file).await?;
|
let ca = fs::read(ca_file).await?;
|
||||||
options.set_transport(Transport::Tls(TlsConfiguration::Simple {
|
options.set_transport(Transport::Tls(TlsConfiguration::Simple {
|
||||||
ca,
|
ca,
|
||||||
alpn: None,
|
alpn: None,
|
||||||
@ -71,8 +47,8 @@ async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoo
|
|||||||
private_key_file,
|
private_key_file,
|
||||||
}) = &config.auth
|
}) = &config.auth
|
||||||
{
|
{
|
||||||
let cert = file_to_bytevec(cert_file).await?;
|
let cert = fs::read(cert_file).await?;
|
||||||
let private_key = file_to_bytevec(private_key_file).await?;
|
let private_key = fs::read(private_key_file).await?;
|
||||||
Some((cert, Key::RSA(private_key)))
|
Some((cert, Key::RSA(private_key)))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@ -87,10 +63,10 @@ async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoo
|
|||||||
Ok(MqttAsyncClient::new(options, 100))
|
Ok(MqttAsyncClient::new(options, 100))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init_db(config: &ConfigDatabase) -> Result<Database, String> {
|
fn init_db(config: &ConfigDatabase) -> anyhow::Result<Database> {
|
||||||
match config {
|
match config {
|
||||||
ConfigDatabase::Influxdb { url, auth, db_name, measurement } => {
|
ConfigDatabase::Influxdb { url, auth, db_name, measurement } => {
|
||||||
let mut client = InfluxClient::new(url.clone(), db_name.clone());
|
let mut client = InfluxClient::new(url, db_name);
|
||||||
if let Some(UserAuth { username, password }) = auth {
|
if let Some(UserAuth { username, password }) = auth {
|
||||||
client = client.with_auth(username, password);
|
client = client.with_auth(username, password);
|
||||||
}
|
}
|
||||||
@ -105,7 +81,7 @@ fn init_db(config: &ConfigDatabase) -> Result<Database, String> {
|
|||||||
async fn init_subscriptions(
|
async fn init_subscriptions(
|
||||||
mqtt_client: &mut MqttAsyncClient,
|
mqtt_client: &mut MqttAsyncClient,
|
||||||
topics: &Vec<&String>,
|
topics: &Vec<&String>,
|
||||||
) -> Result<(), String> {
|
) -> anyhow::Result<()> {
|
||||||
let topics: Vec<SubscribeFilter> = topics
|
let topics: Vec<SubscribeFilter> = topics
|
||||||
.iter()
|
.iter()
|
||||||
.map(|topic| {
|
.map(|topic| {
|
||||||
@ -115,8 +91,7 @@ async fn init_subscriptions(
|
|||||||
.collect();
|
.collect();
|
||||||
mqtt_client
|
mqtt_client
|
||||||
.subscribe_many(topics)
|
.subscribe_many(topics)
|
||||||
.await
|
.await?;
|
||||||
.map_err(|err| err.to_string())?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,7 +99,7 @@ async fn handle_publish(
|
|||||||
publish: &Publish,
|
publish: &Publish,
|
||||||
mapping: Arc<Mapping>,
|
mapping: Arc<Mapping>,
|
||||||
databases: Arc<Vec<Database>>,
|
databases: Arc<Vec<Database>>,
|
||||||
) -> Result<(), String> {
|
) -> anyhow::Result<()> {
|
||||||
debug!("Got publish: {:?}; {:?}", publish, publish.payload);
|
debug!("Got publish: {:?}; {:?}", publish, publish.payload);
|
||||||
|
|
||||||
let reference_values = publish
|
let reference_values = publish
|
||||||
@ -139,27 +114,27 @@ async fn handle_publish(
|
|||||||
let field_name = mapping.field_name.interpolate(&reference_values)?;
|
let field_name = mapping.field_name.interpolate(&reference_values)?;
|
||||||
|
|
||||||
let payload = String::from_utf8(Vec::from(publish.payload.as_ref()))
|
let payload = String::from_utf8(Vec::from(publish.payload.as_ref()))
|
||||||
.map_err(|err| format!("Invalid payload value: {}", err))?;
|
.map_err(|err| anyhow!("Invalid payload value: {}", err))?;
|
||||||
let (influx_value, timestamp) = match &mapping.payload {
|
let (influx_value, timestamp) = match &mapping.payload {
|
||||||
Payload::Raw => (payload.to_influx_type(mapping.value_type)?, None),
|
Payload::Raw => (payload.to_influx_type(mapping.value_type)?, None),
|
||||||
Payload::Json { value_field_selector, timestamp_field_selector } => {
|
Payload::Json { value_field_selector, timestamp_field_selector } => {
|
||||||
let payload_root: JsonValue = serde_json::from_str(&payload)
|
let payload_root: JsonValue = serde_json::from_str(&payload)
|
||||||
.map_err(|err| format!("Failed to parse payload as JSON: {}", err))?;
|
.map_err(|err| anyhow!("Failed to parse payload as JSON: {}", err))?;
|
||||||
let influx_value = value_field_selector
|
let influx_value = value_field_selector
|
||||||
.find(&payload_root)
|
.find(&payload_root)
|
||||||
.next()
|
.next()
|
||||||
.ok_or_else(|| format!("Couldn't find value in payload on topic {}", publish.topic))
|
.ok_or_else(|| anyhow!("Couldn't find value in payload on topic {}", publish.topic))
|
||||||
.and_then(|value| value.to_influx_type(mapping.value_type))?;
|
.and_then(|value| value.to_influx_type(mapping.value_type))?;
|
||||||
let timestamp = timestamp_field_selector
|
let timestamp = timestamp_field_selector
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|selector| selector
|
.map(|selector| selector
|
||||||
.find(&payload_root)
|
.find(&payload_root)
|
||||||
.next()
|
.next()
|
||||||
.ok_or_else(|| format!("Couldn't find timestamp in payload on topic {}", publish.topic))
|
.ok_or_else(|| anyhow!("Couldn't find timestamp in payload on topic {}", publish.topic))
|
||||||
.and_then(|ts_value| ts_value
|
.and_then(|ts_value| ts_value
|
||||||
.as_u64()
|
.as_u64()
|
||||||
.map(|ts| ts as u128)
|
.map(|ts| ts as u128)
|
||||||
.ok_or_else(|| format!("'{}' cannot be converted to a timestamp", ts_value))
|
.ok_or_else(|| anyhow!("'{}' cannot be converted to a timestamp", ts_value))
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
.transpose()?;
|
.transpose()?;
|
||||||
@ -189,7 +164,7 @@ async fn handle_publish(
|
|||||||
.client
|
.client
|
||||||
.query(&query)
|
.query(&query)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format!("Failed to write to DB: {}", err))?;
|
.map_err(|err| anyhow!("Failed to write to DB: {}", err))?;
|
||||||
debug!("wrote to influx: {:?}", query);
|
debug!("wrote to influx: {:?}", query);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,10 +222,10 @@ async fn run_event_loop(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), String> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let config_filename = env::args()
|
let config_filename = env::args()
|
||||||
.nth(1)
|
.nth(1)
|
||||||
.ok_or_else(|| "Missing argument 'config filename'")?;
|
.ok_or_else(|| anyhow!("Missing argument 'config filename'"))?;
|
||||||
let config = Config::parse(&config_filename)?;
|
let config = Config::parse(&config_filename)?;
|
||||||
|
|
||||||
let logger_env = env_logger::Env::new()
|
let logger_env = env_logger::Env::new()
|
||||||
@ -266,7 +241,7 @@ async fn main() -> Result<(), String> {
|
|||||||
.mappings
|
.mappings
|
||||||
.iter()
|
.iter()
|
||||||
.map(Mapping::try_from)
|
.map(Mapping::try_from)
|
||||||
.collect::<Result<Vec<Mapping>, String>>()?;
|
.collect::<anyhow::Result<Vec<Mapping>>>()?;
|
||||||
|
|
||||||
let (mut mqtt_client, mqtt_event_loop) = init_mqtt(&config.mqtt).await?;
|
let (mut mqtt_client, mqtt_event_loop) = init_mqtt(&config.mqtt).await?;
|
||||||
init_subscriptions(
|
init_subscriptions(
|
||||||
@ -282,7 +257,7 @@ async fn main() -> Result<(), String> {
|
|||||||
let databases = config.databases
|
let databases = config.databases
|
||||||
.iter()
|
.iter()
|
||||||
.map(init_db)
|
.map(init_db)
|
||||||
.collect::<Result<Vec<Database>, String>>()?;
|
.collect::<anyhow::Result<Vec<Database>>>()?;
|
||||||
|
|
||||||
run_event_loop(mqtt_event_loop, mappings, databases).await;
|
run_event_loop(mqtt_event_loop, mappings, databases).await;
|
||||||
|
|
||||||
|
@ -14,13 +14,13 @@ pub enum TopicLevel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<&str> for TopicLevel {
|
impl TryFrom<&str> for TopicLevel {
|
||||||
type Error = String;
|
type Error = anyhow::Error;
|
||||||
fn try_from(s: &str) -> Result<Self, Self::Error> {
|
fn try_from(s: &str) -> Result<Self, Self::Error> {
|
||||||
match s {
|
match s {
|
||||||
"+" => Ok(TopicLevel::SingleWildcard),
|
"+" => Ok(TopicLevel::SingleWildcard),
|
||||||
"#" => Ok(TopicLevel::MultiWildcard),
|
"#" => Ok(TopicLevel::MultiWildcard),
|
||||||
s if s.contains("+") || s.contains("#") => {
|
s if s.contains("+") || s.contains("#") => {
|
||||||
Err(format!("Topic level '{}' cannot contain '+' or '#'", s))
|
Err(anyhow!("Topic level '{}' cannot contain '+' or '#'", s))
|
||||||
}
|
}
|
||||||
s => Ok(TopicLevel::Literal(s.to_string())),
|
s => Ok(TopicLevel::Literal(s.to_string())),
|
||||||
}
|
}
|
||||||
@ -34,7 +34,7 @@ pub enum TagValue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<&ConfigTagValue> for TagValue {
|
impl TryFrom<&ConfigTagValue> for TagValue {
|
||||||
type Error = String;
|
type Error = anyhow::Error;
|
||||||
fn try_from(tag_value: &ConfigTagValue) -> Result<Self, Self::Error> {
|
fn try_from(tag_value: &ConfigTagValue) -> Result<Self, Self::Error> {
|
||||||
match tag_value.r#type {
|
match tag_value.r#type {
|
||||||
ValueType::Text => {
|
ValueType::Text => {
|
||||||
@ -79,19 +79,19 @@ pub struct Mapping {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<&ConfigMapping> for Mapping {
|
impl TryFrom<&ConfigMapping> for Mapping {
|
||||||
type Error = String;
|
type Error = anyhow::Error;
|
||||||
fn try_from(mapping: &ConfigMapping) -> Result<Self, Self::Error> {
|
fn try_from(mapping: &ConfigMapping) -> Result<Self, Self::Error> {
|
||||||
let topic = mapping
|
let topic = mapping
|
||||||
.topic
|
.topic
|
||||||
.split("/")
|
.split("/")
|
||||||
.map(|level| TopicLevel::try_from(level))
|
.map(|level| TopicLevel::try_from(level))
|
||||||
.collect::<Result<Vec<TopicLevel>, String>>()?;
|
.collect::<anyhow::Result<Vec<TopicLevel>>>()?;
|
||||||
let pre_multi_levels: Vec<&TopicLevel> = topic
|
let pre_multi_levels: Vec<&TopicLevel> = topic
|
||||||
.iter()
|
.iter()
|
||||||
.take_while(|level| **level != TopicLevel::MultiWildcard)
|
.take_while(|level| **level != TopicLevel::MultiWildcard)
|
||||||
.collect();
|
.collect();
|
||||||
if pre_multi_levels.len() < topic.len() - 1 {
|
if pre_multi_levels.len() < topic.len() - 1 {
|
||||||
Err(format!(
|
Err(anyhow!(
|
||||||
"Topic '{}' has '#' wildcard before last topic level",
|
"Topic '{}' has '#' wildcard before last topic level",
|
||||||
mapping.topic
|
mapping.topic
|
||||||
))?;
|
))?;
|
||||||
@ -103,7 +103,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
|
|||||||
.count();
|
.count();
|
||||||
|
|
||||||
let field_name = match InterpolatedName::try_from(mapping.field_name.as_str()) {
|
let field_name = match InterpolatedName::try_from(mapping.field_name.as_str()) {
|
||||||
Ok(name) if find_max_ref(&name) > max_interp_ref => Err(format!(
|
Ok(name) if find_max_ref(&name) > max_interp_ref => Err(anyhow!(
|
||||||
"Topic '{}' has field name '{}' which has invalid references",
|
"Topic '{}' has field name '{}' which has invalid references",
|
||||||
mapping.topic, mapping.field_name
|
mapping.topic, mapping.field_name
|
||||||
)),
|
)),
|
||||||
@ -115,10 +115,10 @@ impl TryFrom<&ConfigMapping> for Mapping {
|
|||||||
None => Payload::Raw,
|
None => Payload::Raw,
|
||||||
Some(ConfigPayload::Json { value_field_path, timestamp_field_path }) => {
|
Some(ConfigPayload::Json { value_field_path, timestamp_field_path }) => {
|
||||||
let value_field_selector = Selector::new(&value_field_path)
|
let value_field_selector = Selector::new(&value_field_path)
|
||||||
.map_err(|err| format!("Value field path '{}' is invalid: {}'", value_field_path, err))?;
|
.map_err(|err| anyhow!("Value field path '{}' is invalid: {}'", value_field_path, err))?;
|
||||||
let timestamp_field_selector = timestamp_field_path.as_ref()
|
let timestamp_field_selector = timestamp_field_path.as_ref()
|
||||||
.map(|path| Selector::new(path)
|
.map(|path| Selector::new(path)
|
||||||
.map_err(|err| format!("Timestamp field path '{}' is invalid: {}'", path, err))
|
.map_err(|err| anyhow!("Timestamp field path '{}' is invalid: {}'", path, err))
|
||||||
)
|
)
|
||||||
.transpose()?;
|
.transpose()?;
|
||||||
Payload::Json {
|
Payload::Json {
|
||||||
@ -133,7 +133,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|tag| match TagValue::try_from(tag.1) {
|
.map(|tag| match TagValue::try_from(tag.1) {
|
||||||
Ok(TagValue::InterpolatedStr(ref name)) if find_max_ref(name) > max_interp_ref => {
|
Ok(TagValue::InterpolatedStr(ref name)) if find_max_ref(name) > max_interp_ref => {
|
||||||
Err(format!(
|
Err(anyhow!(
|
||||||
"Topic '{}' has tag value '{:?}' which has invalid references",
|
"Topic '{}' has tag value '{:?}' which has invalid references",
|
||||||
mapping.topic, tag.1
|
mapping.topic, tag.1
|
||||||
))
|
))
|
||||||
@ -141,7 +141,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
|
|||||||
Ok(value) => Ok((tag.0.clone(), value)),
|
Ok(value) => Ok((tag.0.clone(), value)),
|
||||||
Err(err) => Err(err),
|
Err(err) => Err(err),
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<(String, TagValue)>, String>>()?;
|
.collect::<anyhow::Result<Vec<(String, TagValue)>>>()?;
|
||||||
|
|
||||||
Ok(Mapping {
|
Ok(Mapping {
|
||||||
topic,
|
topic,
|
||||||
@ -167,7 +167,7 @@ mod test {
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn mapping_parsing() -> Result<(), String> {
|
fn mapping_parsing() -> anyhow::Result<()> {
|
||||||
use TopicLevel::*;
|
use TopicLevel::*;
|
||||||
|
|
||||||
fn mk_cfg_mapping(topic: &str) -> ConfigMapping {
|
fn mk_cfg_mapping(topic: &str) -> ConfigMapping {
|
||||||
|
22
src/value.rs
22
src/value.rs
@ -28,53 +28,53 @@ impl fmt::Display for ValueType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub trait ToInfluxType {
|
pub trait ToInfluxType {
|
||||||
fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String>;
|
fn to_influx_type(&self, value_type: ValueType) -> anyhow::Result<Type>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ToInfluxType for String {
|
impl ToInfluxType for String {
|
||||||
fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String> {
|
fn to_influx_type(&self, value_type: ValueType) -> anyhow::Result<Type> {
|
||||||
match value_type {
|
match value_type {
|
||||||
ValueType::Boolean if self == "true" => Ok(Type::Boolean(true)),
|
ValueType::Boolean if self == "true" => Ok(Type::Boolean(true)),
|
||||||
ValueType::Boolean if self == "false" => Ok(Type::Boolean(false)),
|
ValueType::Boolean if self == "false" => Ok(Type::Boolean(false)),
|
||||||
ValueType::Boolean => Err(format!("Value '{}' is not a valid boolean", self)),
|
ValueType::Boolean => Err(anyhow!("Value '{}' is not a valid boolean", self)),
|
||||||
ValueType::Float => self
|
ValueType::Float => self
|
||||||
.parse::<f64>()
|
.parse::<f64>()
|
||||||
.map(|v| Type::Float(v))
|
.map(|v| Type::Float(v))
|
||||||
.map_err(|err| err.to_string()),
|
.map_err(|err| err.into()),
|
||||||
ValueType::SignedInteger => self
|
ValueType::SignedInteger => self
|
||||||
.parse::<i64>()
|
.parse::<i64>()
|
||||||
.map(|v| Type::SignedInteger(v))
|
.map(|v| Type::SignedInteger(v))
|
||||||
.map_err(|err| err.to_string()),
|
.map_err(|err| err.into()),
|
||||||
ValueType::UnsignedInteger => self
|
ValueType::UnsignedInteger => self
|
||||||
.parse::<u64>()
|
.parse::<u64>()
|
||||||
.map(|v| Type::UnsignedInteger(v))
|
.map(|v| Type::UnsignedInteger(v))
|
||||||
.map_err(|err| err.to_string()),
|
.map_err(|err| err.into()),
|
||||||
ValueType::Text => Ok(Type::Text(self.clone())),
|
ValueType::Text => Ok(Type::Text(self.clone())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ToInfluxType for JsonValue {
|
impl ToInfluxType for JsonValue {
|
||||||
fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String> {
|
fn to_influx_type(&self, value_type: ValueType) -> anyhow::Result<Type> {
|
||||||
match (value_type, self) {
|
match (value_type, self) {
|
||||||
(ValueType::Boolean, JsonValue::Bool(true)) => Ok(Type::Boolean(true)),
|
(ValueType::Boolean, JsonValue::Bool(true)) => Ok(Type::Boolean(true)),
|
||||||
(ValueType::Boolean, JsonValue::Bool(false)) => Ok(Type::Boolean(false)),
|
(ValueType::Boolean, JsonValue::Bool(false)) => Ok(Type::Boolean(false)),
|
||||||
(ValueType::Float, JsonValue::Number(num)) => num
|
(ValueType::Float, JsonValue::Number(num)) => num
|
||||||
.as_f64()
|
.as_f64()
|
||||||
.ok_or_else(|| format!("Cannot be expressed as f64: {}", num))
|
.ok_or_else(|| anyhow!("Cannot be expressed as f64: {}", num))
|
||||||
.map(|v| Type::Float(v)),
|
.map(|v| Type::Float(v)),
|
||||||
(ValueType::SignedInteger, JsonValue::Number(num)) => num
|
(ValueType::SignedInteger, JsonValue::Number(num)) => num
|
||||||
.as_i64()
|
.as_i64()
|
||||||
.ok_or_else(|| format!("Cannot be expressed as i64:{}", num))
|
.ok_or_else(|| anyhow!("Cannot be expressed as i64: {}", num))
|
||||||
.map(|v| Type::SignedInteger(v)),
|
.map(|v| Type::SignedInteger(v)),
|
||||||
(ValueType::UnsignedInteger, JsonValue::Number(num)) => num
|
(ValueType::UnsignedInteger, JsonValue::Number(num)) => num
|
||||||
.as_u64()
|
.as_u64()
|
||||||
.ok_or_else(|| format!("Cannot be expressed as u64:{}", num))
|
.ok_or_else(|| anyhow!("Cannot be expressed as u64: {}", num))
|
||||||
.map(|v| Type::UnsignedInteger(v)),
|
.map(|v| Type::UnsignedInteger(v)),
|
||||||
(ValueType::Text, JsonValue::String(s)) => Ok(Type::Text(s.to_string())),
|
(ValueType::Text, JsonValue::String(s)) => Ok(Type::Text(s.to_string())),
|
||||||
(ValueType::Text, JsonValue::Bool(b)) => Ok(Type::Text(b.to_string())),
|
(ValueType::Text, JsonValue::Bool(b)) => Ok(Type::Text(b.to_string())),
|
||||||
(ValueType::Text, JsonValue::Number(num)) => Ok(Type::Text(num.to_string())),
|
(ValueType::Text, JsonValue::Number(num)) => Ok(Type::Text(num.to_string())),
|
||||||
(other_type, other_value) => Err(format!("Unable to parse self from JSON; need type {} but got '{}'", other_type, other_value)),
|
(other_type, other_value) => Err(anyhow!("Unable to parse self from JSON; need type {} but got '{}'", other_type, other_value)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user