First step toward supporting multiple database writes

This commit is contained in:
Brian Tarricone 2021-08-31 17:29:26 -07:00
parent aefd8cde25
commit c88858e43f
3 changed files with 62 additions and 50 deletions

View File

@@ -45,20 +45,22 @@ pub struct UserAuth {
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InfluxDBConfig {
pub url: String,
pub auth: Option<UserAuth>,
pub db_name: String,
pub measurement: String,
#[serde(rename_all = "kebab-case", tag = "type")]
pub enum Database {
#[serde(rename_all = "camelCase")]
Influxdb {
url: String,
auth: Option<UserAuth>,
db_name: String,
measurement: String,
},
}
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase", tag = "type")]
#[allow(non_camel_case_types)]
#[serde(rename_all = "kebab-case", tag = "type")]
pub enum Payload {
#[serde(rename_all = "camelCase")]
json {
Json {
value_field_path: String,
timestamp_field_path: Option<String>,
},
@@ -79,7 +81,7 @@ pub struct Mapping {
pub struct Config {
pub log_level: Option<LevelFilter>,
pub mqtt: MqttConfig,
pub database: InfluxDBConfig,
pub databases: Vec<Database>,
pub mappings: Vec<Mapping>,
}

View File

@@ -1,7 +1,7 @@
#[macro_use]
extern crate log;
use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, UserAuth};
use config::{Config, Database as ConfigDatabase, MqttAuth, MqttConfig, UserAuth};
use futures::TryFutureExt;
use influxdb::InfluxDbWriteable;
use influxdb::{Client as InfluxClient, Timestamp, Type};
@@ -89,15 +89,19 @@ async fn init_mqtt(config: &MqttConfig) -> Result<(MqttAsyncClient, MqttEventLoo
Ok(MqttAsyncClient::new(options, 100))
}
fn init_db(config: &InfluxDBConfig) -> Result<Database, String> {
let mut client = InfluxClient::new(config.url.clone(), config.db_name.clone());
if let Some(UserAuth { username, password }) = &config.auth {
client = client.with_auth(username, password);
fn init_db(config: &ConfigDatabase) -> Result<Database, String> {
match config {
ConfigDatabase::Influxdb { url, auth, db_name, measurement } => {
let mut client = InfluxClient::new(url.clone(), db_name.clone());
if let Some(UserAuth { username, password }) = auth {
client = client.with_auth(username, password);
}
Ok(Database {
client,
measurement: measurement.clone(),
})
}
}
Ok(Database {
client,
measurement: config.measurement.clone(),
})
}
async fn init_subscriptions(
@@ -121,7 +125,7 @@ async fn init_subscriptions(
async fn handle_publish(
publish: &Publish,
mapping: Arc<Mapping>,
database: Arc<Database>,
databases: Arc<Vec<Database>>,
) -> Result<(), String> {
debug!("Got publish: {:?}; {:?}", publish, publish.payload);
@@ -170,25 +174,28 @@ async fn handle_publish(
.unwrap()
.as_millis()
);
let mut query = Timestamp::Milliseconds(timestamp)
.into_query(&database.measurement)
.add_field(&field_name, influx_value);
for tag in mapping.tags.iter() {
let value = match &tag.1 {
TagValue::Literal(v) => v.clone(),
TagValue::InterpolatedStr(interp) => Type::Text(interp.interpolate(&reference_values)?),
};
query = query.add_tag(&tag.0, value);
}
use tokio_compat_02::FutureExt;
database
.client
.query(&query)
.compat()
.await
.map_err(|err| format!("Failed to write to DB: {}", err))?;
debug!("wrote to influx: {:?}", query);
for database in databases.iter() {
let mut query = Timestamp::Milliseconds(timestamp)
.into_query(&database.measurement)
.add_field(&field_name, influx_value.clone());
for tag in mapping.tags.iter() {
let value = match &tag.1 {
TagValue::Literal(v) => v.clone(),
TagValue::InterpolatedStr(interp) => Type::Text(interp.interpolate(&reference_values)?),
};
query = query.add_tag(&tag.0, value);
}
use tokio_compat_02::FutureExt;
database
.client
.query(&query)
.compat()
.await
.map_err(|err| format!("Failed to write to DB: {}", err))?;
debug!("wrote to influx: {:?}", query);
}
Ok(())
}
@@ -216,17 +223,20 @@ fn find_mapping<'a>(mappings: &'a Vec<Arc<Mapping>>, topic: &String) -> Option<&
async fn run_event_loop(
mut event_loop: MqttEventLoop,
mappings: &Vec<Arc<Mapping>>,
database: Arc<Database>,
mappings: Vec<Mapping>,
databases: Vec<Database>,
) {
let mappings = mappings.into_iter().map(Arc::new).collect();
let databases = Arc::new(databases);
loop {
match event_loop.poll().await {
Ok(Event::Incoming(Packet::Publish(publish))) => {
if let Some(mapping) = find_mapping(&mappings, &publish.topic) {
let mapping = Arc::clone(mapping);
let database = Arc::clone(&database);
let databases = Arc::clone(&databases);
tokio::spawn(async move {
if let Err(err) = handle_publish(&publish, mapping, database).await {
if let Err(err) = handle_publish(&publish, mapping, databases).await {
warn!("{}", err);
}
});
@@ -256,14 +266,11 @@ async fn main() -> Result<(), String> {
}
logger_builder.init();
let mappings: Vec<Arc<Mapping>> = config
let mappings: Vec<Mapping> = config
.mappings
.iter()
.map(Mapping::try_from)
.collect::<Result<Vec<Mapping>, String>>()?
.into_iter()
.map(Arc::new)
.collect();
.collect::<Result<Vec<Mapping>, String>>()?;
let (mut mqtt_client, mqtt_event_loop) = init_mqtt(&config.mqtt).await?;
init_subscriptions(
@@ -276,9 +283,12 @@ async fn main() -> Result<(), String> {
)
.await?;
let database = Arc::new(init_db(&config.database)?);
let databases = config.databases
.iter()
.map(init_db)
.collect::<Result<Vec<Database>, String>>()?;
run_event_loop(mqtt_event_loop, &mappings, database).await;
run_event_loop(mqtt_event_loop, mappings, databases).await;
Ok(())
}

View File

@@ -113,7 +113,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
let payload = match &mapping.payload {
None => Payload::Raw,
Some(ConfigPayload::json { value_field_path, timestamp_field_path }) => {
Some(ConfigPayload::Json { value_field_path, timestamp_field_path }) => {
let value_field_selector = Selector::new(&value_field_path)
.map_err(|err| format!("Value field path '{}' is invalid: {}'", value_field_path, err))?;
let timestamp_field_selector = timestamp_field_path.as_ref()