Initial import
This commit is contained in:
commit
525b5152ef
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
/target
|
||||||
|
.tags
|
2338
Cargo.lock
generated
Normal file
2338
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
19
Cargo.toml
Normal file
19
Cargo.toml
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
[package]
|
||||||
|
name = "mqtt2db"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Brian J. Tarricone <brian@tarricone.org>"]
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
|
clap = "2.33"
|
||||||
|
env_logger = "0.9"
|
||||||
|
futures = "0.3"
|
||||||
|
influxdb = { version = "0.4", features = ["derive"] }
|
||||||
|
lazy_static = "1"
|
||||||
|
log = { version = "0.4", features = ["std", "serde"] }
|
||||||
|
regex = "1"
|
||||||
|
rumqttc = "0.8"
|
||||||
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
serde_yaml = "0.8"
|
||||||
|
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "rt", "rt-multi-thread"] }
|
83
src/config.rs
Normal file
83
src/config.rs
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
use log::LevelFilter;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde_yaml::from_str;
|
||||||
|
use std::io::Read;
|
||||||
|
use std::{collections::HashMap, fs::File, path::Path, time::Duration};
|
||||||
|
|
||||||
|
use crate::value::ValueType;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase", untagged)]
|
||||||
|
pub enum MqttAuth {
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
UserPass { username: String, password: String },
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
Certificate {
|
||||||
|
cert_file: String,
|
||||||
|
private_key_file: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct MqttConfig {
|
||||||
|
pub host: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub client_id: String,
|
||||||
|
pub auth: Option<MqttAuth>,
|
||||||
|
pub ca_file: Option<String>,
|
||||||
|
pub connect_timeout: Option<Duration>,
|
||||||
|
pub keep_alive: Option<Duration>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct TagValue {
|
||||||
|
pub r#type: ValueType,
|
||||||
|
pub value: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct UserAuth {
|
||||||
|
pub username: String,
|
||||||
|
pub password: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct InfluxDBConfig {
|
||||||
|
pub url: String,
|
||||||
|
pub auth: Option<UserAuth>,
|
||||||
|
pub db_name: String,
|
||||||
|
pub measurement: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct Mapping {
|
||||||
|
pub topic: String,
|
||||||
|
pub field_name: String,
|
||||||
|
pub value_type: ValueType,
|
||||||
|
pub tags: HashMap<String, TagValue>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct Config {
|
||||||
|
pub log_level: Option<LevelFilter>,
|
||||||
|
pub mqtt: MqttConfig,
|
||||||
|
pub database: InfluxDBConfig,
|
||||||
|
pub mappings: Vec<Mapping>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Config {
|
||||||
|
pub fn parse<P: AsRef<Path>>(filename: P) -> Result<Config, String> {
|
||||||
|
let mut f = File::open(filename).map_err(|err| err.to_string())?;
|
||||||
|
let mut contents = String::new();
|
||||||
|
f.read_to_string(&mut contents)
|
||||||
|
.map_err(|err| err.to_string())?;
|
||||||
|
let config: Config = from_str(&contents).map_err(|err| err.to_string())?;
|
||||||
|
Ok(config)
|
||||||
|
}
|
||||||
|
}
|
152
src/interpolate.rs
Normal file
152
src/interpolate.rs
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
use lazy_static::lazy_static;
|
||||||
|
use regex::Regex;
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref REFERENCE_RE: Regex = Regex::new(r"(^|[^\\])(\$(\d+))").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub struct InterpolatedName {
|
||||||
|
pub parts: Vec<InterpolatedNamePart>,
|
||||||
|
n_references: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub enum InterpolatedNamePart {
|
||||||
|
Literal(String),
|
||||||
|
Reference(usize),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<&str> for InterpolatedName {
|
||||||
|
type Error = String;
|
||||||
|
fn try_from(s: &str) -> Result<InterpolatedName, String> {
|
||||||
|
let mut parts: Vec<InterpolatedNamePart> = Vec::new();
|
||||||
|
let mut n_references: usize = 0;
|
||||||
|
let mut pos: usize = 0;
|
||||||
|
|
||||||
|
for cap in REFERENCE_RE.captures_iter(s) {
|
||||||
|
let mat = cap
|
||||||
|
.get(2)
|
||||||
|
.ok_or_else(|| format!("Unable to get full match for name '{}'", s))?;
|
||||||
|
if pos < mat.start() {
|
||||||
|
parts.push(InterpolatedNamePart::Literal(
|
||||||
|
s.chars()
|
||||||
|
.skip(pos)
|
||||||
|
.take(mat.start() - pos)
|
||||||
|
.collect::<String>()
|
||||||
|
.replace("\\", ""),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let num_str = cap
|
||||||
|
.get(3)
|
||||||
|
.map(|mat1| mat1.as_str())
|
||||||
|
.ok_or_else(|| format!("Unable to get capture group for name '{}'", s))?;
|
||||||
|
let num = num_str
|
||||||
|
.parse::<usize>()
|
||||||
|
.map_err(|_| format!("Couldn't parse '{}' as number for name '{}'", num_str, s))?;
|
||||||
|
if num == 0 {
|
||||||
|
Err(format!("Invalid reference number 0 for name '{}'", s))?;
|
||||||
|
}
|
||||||
|
parts.push(InterpolatedNamePart::Reference(num));
|
||||||
|
n_references += 1;
|
||||||
|
|
||||||
|
pos = mat.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
if pos < s.len() {
|
||||||
|
parts.push(InterpolatedNamePart::Literal(
|
||||||
|
s.chars().skip(pos).collect::<String>().replace("\\", ""),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(InterpolatedName {
|
||||||
|
parts,
|
||||||
|
n_references,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InterpolatedName {
|
||||||
|
pub fn interpolate<S: AsRef<str>>(&self, reference_values: &Vec<S>) -> Result<String, String> {
|
||||||
|
self.parts
|
||||||
|
.iter()
|
||||||
|
.fold(Ok(String::new()), |accum, part| match accum {
|
||||||
|
Ok(mut accum) => match part {
|
||||||
|
InterpolatedNamePart::Literal(s) => {
|
||||||
|
accum.push_str(s.as_str());
|
||||||
|
Ok(accum)
|
||||||
|
}
|
||||||
|
InterpolatedNamePart::Reference(num) => match reference_values.get(*num - 1) {
|
||||||
|
Some(reference_value) => {
|
||||||
|
accum.push_str(reference_value.as_ref());
|
||||||
|
Ok(accum)
|
||||||
|
}
|
||||||
|
None => Err(format!(
|
||||||
|
"Can't find reference number {} to interpolate",
|
||||||
|
num
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Err(err) => Err(err),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn interpolated_name_parsing() -> Result<(), String> {
|
||||||
|
use InterpolatedNamePart::*;
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![Literal("foo".to_string())],
|
||||||
|
InterpolatedName::try_from("foo")?.parts
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(vec![Reference(1)], InterpolatedName::try_from("$1")?.parts);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![
|
||||||
|
Literal("foo".to_string()),
|
||||||
|
Reference(1),
|
||||||
|
Literal("bar".to_string()),
|
||||||
|
Reference(2),
|
||||||
|
Literal(" baz".to_string())
|
||||||
|
],
|
||||||
|
InterpolatedName::try_from("foo$1bar$2 baz")?.parts
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![
|
||||||
|
Literal("$1foo".to_string()),
|
||||||
|
Reference(1),
|
||||||
|
Literal("$2".to_string())
|
||||||
|
],
|
||||||
|
InterpolatedName::try_from("\\$1foo$1\\$2")?.parts
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(InterpolatedName::try_from("$0").is_err());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn interpolation() -> Result<(), String> {
|
||||||
|
let interp = InterpolatedName::try_from("foo$1bar$2 baz $1")?;
|
||||||
|
assert_eq!(
|
||||||
|
"foofirstbarsecond baz first".to_string(),
|
||||||
|
interp
|
||||||
|
.interpolate(&vec!["first".to_string(), "second".to_string()])
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
|
let empty: Vec<String> = vec![];
|
||||||
|
assert!(interp.interpolate(&empty).is_err());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
254
src/main.rs
Normal file
254
src/main.rs
Normal file
@ -0,0 +1,254 @@
|
|||||||
|
#[macro_use]
|
||||||
|
extern crate log;
|
||||||
|
|
||||||
|
use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, UserAuth};
|
||||||
|
use futures::TryFutureExt;
|
||||||
|
use influxdb::InfluxDbWriteable;
|
||||||
|
use influxdb::{Client as InfluxClient, Timestamp, Type};
|
||||||
|
use log::LevelFilter;
|
||||||
|
use mapping::{Mapping, TagValue, TopicLevel};
|
||||||
|
use rumqttc::{
|
||||||
|
AsyncClient as MqttAsyncClient, Event, EventLoop as MqttEventLoop, Key, MqttOptions, Packet,
|
||||||
|
Publish, QoS, SubscribeFilter, TlsConfiguration, Transport,
|
||||||
|
};
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
use std::env;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
mod config;
|
||||||
|
mod interpolate;
|
||||||
|
mod mapping;
|
||||||
|
mod value;
|
||||||
|
|
||||||
|
struct Database {
|
||||||
|
client: InfluxClient,
|
||||||
|
measurement: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoop), String> {
|
||||||
|
async fn file_to_bytevec<P: AsRef<Path>>(file: P) -> Result<Vec<u8>, String> {
|
||||||
|
let mut f = File::open(&file)
|
||||||
|
.map_err(|err| {
|
||||||
|
format!(
|
||||||
|
"Unable to open {}: {}",
|
||||||
|
file.as_ref().to_string_lossy(),
|
||||||
|
err
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
f.read_to_end(&mut buf)
|
||||||
|
.map_err(|err| {
|
||||||
|
format!(
|
||||||
|
"Unable to read {}: {}",
|
||||||
|
file.as_ref().to_string_lossy(),
|
||||||
|
err
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
Ok(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut options = MqttOptions::new(config.client_id.clone(), config.host.clone(), config.port);
|
||||||
|
if let Some(connect_timeout) = config.connect_timeout {
|
||||||
|
options.set_connection_timeout(connect_timeout.as_secs());
|
||||||
|
}
|
||||||
|
if let Some(keep_alive) = config.keep_alive {
|
||||||
|
let keep_alive_secs = u16::try_from(keep_alive.as_secs())
|
||||||
|
.map_err(|_| "Keep alive time must be between 0 and 65535".to_string())?;
|
||||||
|
options.set_keep_alive(keep_alive_secs);
|
||||||
|
}
|
||||||
|
if let Some(ca_file) = &config.ca_file {
|
||||||
|
let ca = file_to_bytevec(ca_file).await?;
|
||||||
|
options.set_transport(Transport::Tls(TlsConfiguration::Simple {
|
||||||
|
ca,
|
||||||
|
alpn: None,
|
||||||
|
client_auth: if let Some(MqttAuth::Certificate {
|
||||||
|
cert_file,
|
||||||
|
private_key_file,
|
||||||
|
}) = &config.auth
|
||||||
|
{
|
||||||
|
let cert = file_to_bytevec(cert_file).await?;
|
||||||
|
let private_key = file_to_bytevec(private_key_file).await?;
|
||||||
|
Some((cert, Key::RSA(private_key)))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(MqttAuth::UserPass { username, password }) = &config.auth {
|
||||||
|
options.set_credentials(username, password);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(MqttAsyncClient::new(options, 100))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn init_db(config: &InfluxDBConfig) -> Result<Database, String> {
|
||||||
|
let mut client = InfluxClient::new(config.url.clone(), config.db_name.clone());
|
||||||
|
if let Some(UserAuth { username, password }) = &config.auth {
|
||||||
|
client = client.with_auth(username, password);
|
||||||
|
}
|
||||||
|
Ok(Database {
|
||||||
|
client,
|
||||||
|
measurement: config.measurement.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn init_subscriptions(
|
||||||
|
mqtt_client: &mut MqttAsyncClient,
|
||||||
|
topics: &Vec<&String>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let topics: Vec<SubscribeFilter> = topics
|
||||||
|
.iter()
|
||||||
|
.map(|topic| {
|
||||||
|
info!("Subscribing to topic '{}'", topic);
|
||||||
|
SubscribeFilter::new((*topic).clone(), QoS::AtLeastOnce)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
mqtt_client
|
||||||
|
.subscribe_many(topics)
|
||||||
|
.await
|
||||||
|
.map_err(|err| err.to_string())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_publish(
|
||||||
|
publish: &Publish,
|
||||||
|
mapping: Arc<Mapping>,
|
||||||
|
database: Arc<Database>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
debug!("Got publish: {:?}; {:?}", publish, publish.payload);
|
||||||
|
|
||||||
|
let reference_values = publish
|
||||||
|
.topic
|
||||||
|
.split("/")
|
||||||
|
.zip(mapping.topic.iter())
|
||||||
|
.flat_map(|pair| match pair.1 {
|
||||||
|
TopicLevel::SingleWildcard => Some(pair.0),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
.collect::<Vec<&str>>();
|
||||||
|
let field_name = mapping.field_name.interpolate(&reference_values)?;
|
||||||
|
|
||||||
|
let value = String::from_utf8(Vec::from(publish.payload.as_ref()))
|
||||||
|
.map_err(|err| format!("Invalid payload value: {}", err))?;
|
||||||
|
let influx_value = mapping.value_type.parse(&value)?;
|
||||||
|
|
||||||
|
let now = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_millis();
|
||||||
|
let mut query = Timestamp::Milliseconds(now)
|
||||||
|
.into_query(&database.measurement)
|
||||||
|
.add_field(&field_name, influx_value);
|
||||||
|
for tag in mapping.tags.iter() {
|
||||||
|
let value = match &tag.1 {
|
||||||
|
TagValue::Literal(v) => v.clone(),
|
||||||
|
TagValue::InterpolatedStr(interp) => Type::Text(interp.interpolate(&reference_values)?),
|
||||||
|
};
|
||||||
|
query = query.add_tag(&tag.0, value);
|
||||||
|
}
|
||||||
|
database
|
||||||
|
.client
|
||||||
|
.query(&query)
|
||||||
|
.await
|
||||||
|
.map_err(|err| format!("Failed to write to DB: {}", err))?;
|
||||||
|
debug!("wrote to influx: {:?}", query);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_mapping<'a>(mappings: &'a Vec<Arc<Mapping>>, topic: &String) -> Option<&'a Arc<Mapping>> {
|
||||||
|
let levels: Vec<&str> = topic.split("/").collect();
|
||||||
|
mappings.iter().find(|mapping| {
|
||||||
|
let mut iter = levels.iter();
|
||||||
|
for expected_level in mapping.topic.iter() {
|
||||||
|
let maybe_cur_level = iter.next();
|
||||||
|
match (expected_level, maybe_cur_level) {
|
||||||
|
(TopicLevel::SingleWildcard, Some(_)) => (), // current level exists and anything matches
|
||||||
|
(TopicLevel::MultiWildcard, _) => return true, // rest of topic, if any, will match no matter what
|
||||||
|
(TopicLevel::Literal(expected_literal), Some(cur_level))
|
||||||
|
if expected_literal == cur_level =>
|
||||||
|
{
|
||||||
|
()
|
||||||
|
} // current level matches
|
||||||
|
_ => return false, // current level doesn't match or doesn't exist
|
||||||
|
}
|
||||||
|
}
|
||||||
|
iter.next().is_none() // only matches if we consumed all topic levels
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_event_loop(
|
||||||
|
mut event_loop: MqttEventLoop,
|
||||||
|
mappings: &Vec<Arc<Mapping>>,
|
||||||
|
database: Arc<Database>,
|
||||||
|
) {
|
||||||
|
loop {
|
||||||
|
match event_loop.poll().await {
|
||||||
|
Ok(Event::Incoming(Packet::Publish(publish))) => {
|
||||||
|
if let Some(mapping) = find_mapping(&mappings, &publish.topic) {
|
||||||
|
let mapping = Arc::clone(mapping);
|
||||||
|
let database = Arc::clone(&database);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(err) = handle_publish(&publish, mapping, database).await {
|
||||||
|
warn!("{}", err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
warn!("Topic {} not found in mappings", publish.topic);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Event::Incoming(_event)) => (),
|
||||||
|
Ok(Event::Outgoing(_event)) => (),
|
||||||
|
Err(err) => warn!("Error from MQTT loop: {}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), String> {
|
||||||
|
let config_filename = env::args()
|
||||||
|
.nth(1)
|
||||||
|
.ok_or_else(|| "Missing argument 'config filename'")?;
|
||||||
|
let config = Config::parse(&config_filename)?;
|
||||||
|
|
||||||
|
let logger_env = env_logger::Env::new()
|
||||||
|
.filter("MQTT2DB_LOG")
|
||||||
|
.write_style("MQTT2DB_LOG_STYLE");
|
||||||
|
let mut logger_builder = env_logger::Builder::from_env(logger_env);
|
||||||
|
logger_builder
|
||||||
|
.filter_level(config.log_level.unwrap_or(LevelFilter::Info))
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let mappings: Vec<Arc<Mapping>> = config
|
||||||
|
.mappings
|
||||||
|
.iter()
|
||||||
|
.map(Mapping::try_from)
|
||||||
|
.collect::<Result<Vec<Mapping>, String>>()?
|
||||||
|
.into_iter()
|
||||||
|
.map(Arc::new)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let (mut mqtt_client, mqtt_event_loop) = init_mqtt(&config.mqtt).await?;
|
||||||
|
init_subscriptions(
|
||||||
|
&mut mqtt_client,
|
||||||
|
&config
|
||||||
|
.mappings
|
||||||
|
.iter()
|
||||||
|
.map(|mapping| &mapping.topic)
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let database = Arc::new(init_db(&config.database)?);
|
||||||
|
|
||||||
|
run_event_loop(mqtt_event_loop, &mappings, database).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
202
src/mapping.rs
Normal file
202
src/mapping.rs
Normal file
@ -0,0 +1,202 @@
|
|||||||
|
use influxdb::Type;
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
use crate::config::{Mapping as ConfigMapping, TagValue as ConfigTagValue};
|
||||||
|
use crate::interpolate::{InterpolatedName, InterpolatedNamePart};
|
||||||
|
use crate::value::ValueType;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub enum TopicLevel {
|
||||||
|
Literal(String),
|
||||||
|
SingleWildcard,
|
||||||
|
MultiWildcard,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<&str> for TopicLevel {
|
||||||
|
type Error = String;
|
||||||
|
fn try_from(s: &str) -> Result<Self, Self::Error> {
|
||||||
|
match s {
|
||||||
|
"+" => Ok(TopicLevel::SingleWildcard),
|
||||||
|
"#" => Ok(TopicLevel::MultiWildcard),
|
||||||
|
s if s.contains("+") || s.contains("#") => {
|
||||||
|
Err(format!("Topic level '{}' cannot contain '+' or '#'", s))
|
||||||
|
}
|
||||||
|
s => Ok(TopicLevel::Literal(s.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum TagValue {
|
||||||
|
InterpolatedStr(InterpolatedName),
|
||||||
|
Literal(Type),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<&ConfigTagValue> for TagValue {
|
||||||
|
type Error = String;
|
||||||
|
fn try_from(tag_value: &ConfigTagValue) -> Result<Self, Self::Error> {
|
||||||
|
match tag_value.r#type {
|
||||||
|
ValueType::Text => {
|
||||||
|
let interp = InterpolatedName::try_from(tag_value.value.as_str())?;
|
||||||
|
match interp.parts.get(0) {
|
||||||
|
Some(InterpolatedNamePart::Literal(literal)) if interp.parts.len() == 1 => {
|
||||||
|
Ok(TagValue::Literal(Type::Text(literal.clone())))
|
||||||
|
}
|
||||||
|
_ => Ok(TagValue::InterpolatedStr(interp)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
other => other.parse(&tag_value.value).map(TagValue::Literal),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Mapping {
|
||||||
|
pub topic: Vec<TopicLevel>,
|
||||||
|
pub field_name: InterpolatedName,
|
||||||
|
pub value_type: ValueType,
|
||||||
|
pub tags: Vec<(String, TagValue)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<&ConfigMapping> for Mapping {
|
||||||
|
type Error = String;
|
||||||
|
fn try_from(mapping: &ConfigMapping) -> Result<Self, Self::Error> {
|
||||||
|
let topic = mapping
|
||||||
|
.topic
|
||||||
|
.split("/")
|
||||||
|
.map(|level| TopicLevel::try_from(level))
|
||||||
|
.collect::<Result<Vec<TopicLevel>, String>>()?;
|
||||||
|
let pre_multi_levels: Vec<&TopicLevel> = topic
|
||||||
|
.iter()
|
||||||
|
.take_while(|level| **level != TopicLevel::MultiWildcard)
|
||||||
|
.collect();
|
||||||
|
if pre_multi_levels.len() < topic.len() - 1 {
|
||||||
|
Err(format!(
|
||||||
|
"Topic '{}' has '#' wildcard before last topic level",
|
||||||
|
mapping.topic
|
||||||
|
))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let max_interp_ref = topic
|
||||||
|
.iter()
|
||||||
|
.filter(|level| **level == TopicLevel::SingleWildcard)
|
||||||
|
.count();
|
||||||
|
|
||||||
|
let field_name = match InterpolatedName::try_from(mapping.field_name.as_str()) {
|
||||||
|
Ok(name) if find_max_ref(&name) > max_interp_ref => Err(format!(
|
||||||
|
"Topic '{}' has field name '{}' which has invalid references",
|
||||||
|
mapping.topic, mapping.field_name
|
||||||
|
)),
|
||||||
|
Ok(name) => Ok(name),
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}?;
|
||||||
|
|
||||||
|
let tags = mapping
|
||||||
|
.tags
|
||||||
|
.iter()
|
||||||
|
.map(|tag| match TagValue::try_from(tag.1) {
|
||||||
|
Ok(TagValue::InterpolatedStr(ref name)) if find_max_ref(name) > max_interp_ref => {
|
||||||
|
Err(format!(
|
||||||
|
"Topic '{}' has tag value '{:?}' which has invalid references",
|
||||||
|
mapping.topic, tag.1
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Ok(value) => Ok((tag.0.clone(), value)),
|
||||||
|
Err(err) => Err(err),
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<(String, TagValue)>, String>>()?;
|
||||||
|
|
||||||
|
Ok(Mapping {
|
||||||
|
topic,
|
||||||
|
field_name,
|
||||||
|
value_type: mapping.value_type,
|
||||||
|
tags,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_max_ref(name: &InterpolatedName) -> usize {
|
||||||
|
name.parts.iter().fold(0, |max_ref, part| match part {
|
||||||
|
InterpolatedNamePart::Reference(num) if *num > max_ref => *num,
|
||||||
|
_ => max_ref,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn mapping_parsing() -> Result<(), String> {
|
||||||
|
use TopicLevel::*;
|
||||||
|
|
||||||
|
fn mk_cfg_mapping(topic: &str) -> ConfigMapping {
|
||||||
|
ConfigMapping {
|
||||||
|
topic: topic.to_string(),
|
||||||
|
field_name: "".to_string(),
|
||||||
|
value_type: ValueType::Text,
|
||||||
|
tags: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![Literal("foo".to_string()), Literal("bar".to_string())],
|
||||||
|
Mapping::try_from(&mk_cfg_mapping("foo/bar"))?.topic
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![
|
||||||
|
Literal("".to_string()),
|
||||||
|
Literal("foo".to_string()),
|
||||||
|
Literal("bar".to_string())
|
||||||
|
],
|
||||||
|
Mapping::try_from(&mk_cfg_mapping("/foo/bar"))?.topic
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![
|
||||||
|
Literal("foo".to_string()),
|
||||||
|
Literal("bar".to_string()),
|
||||||
|
Literal("".to_string())
|
||||||
|
],
|
||||||
|
Mapping::try_from(&mk_cfg_mapping("foo/bar/"))?.topic
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![
|
||||||
|
Literal("foo".to_string()),
|
||||||
|
Literal("bar".to_string()),
|
||||||
|
MultiWildcard
|
||||||
|
],
|
||||||
|
Mapping::try_from(&mk_cfg_mapping("foo/bar/#"))?.topic
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![
|
||||||
|
Literal("foo".to_string()),
|
||||||
|
SingleWildcard,
|
||||||
|
Literal("bar".to_string())
|
||||||
|
],
|
||||||
|
Mapping::try_from(&mk_cfg_mapping("foo/+/bar"))?.topic
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
vec![
|
||||||
|
Literal("foo".to_string()),
|
||||||
|
SingleWildcard,
|
||||||
|
Literal("bar".to_string()),
|
||||||
|
MultiWildcard
|
||||||
|
],
|
||||||
|
Mapping::try_from(&mk_cfg_mapping("foo/+/bar/#"))?.topic
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(Mapping::try_from(&mk_cfg_mapping("foo/#/bar")).is_err());
|
||||||
|
assert!(Mapping::try_from(&mk_cfg_mapping("foo/bar#")).is_err());
|
||||||
|
assert!(Mapping::try_from(&mk_cfg_mapping("foo/bar+baz/quux")).is_err());
|
||||||
|
assert!(Mapping::try_from(&mk_cfg_mapping("foo/bar#baz/quux")).is_err());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
35
src/value.rs
Normal file
35
src/value.rs
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
use influxdb::Type;
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, Deserialize)]
|
||||||
|
#[serde(rename_all = "kebab-case")]
|
||||||
|
pub enum ValueType {
|
||||||
|
Boolean,
|
||||||
|
Float,
|
||||||
|
SignedInteger,
|
||||||
|
UnsignedInteger,
|
||||||
|
Text,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ValueType {
|
||||||
|
pub fn parse(self, value: &String) -> Result<Type, String> {
|
||||||
|
match self {
|
||||||
|
ValueType::Boolean if value == "true" => Ok(Type::Boolean(true)),
|
||||||
|
ValueType::Boolean if value == "false" => Ok(Type::Boolean(false)),
|
||||||
|
ValueType::Boolean => Err(format!("Value '{}' is not a valid boolean", value)),
|
||||||
|
ValueType::Float => value
|
||||||
|
.parse::<f64>()
|
||||||
|
.map(|v| Type::Float(v))
|
||||||
|
.map_err(|err| err.to_string()),
|
||||||
|
ValueType::SignedInteger => value
|
||||||
|
.parse::<i64>()
|
||||||
|
.map(|v| Type::SignedInteger(v))
|
||||||
|
.map_err(|err| err.to_string()),
|
||||||
|
ValueType::UnsignedInteger => value
|
||||||
|
.parse::<u64>()
|
||||||
|
.map(|v| Type::UnsignedInteger(v))
|
||||||
|
.map_err(|err| err.to_string()),
|
||||||
|
ValueType::Text => Ok(Type::Text(value.clone())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user