diff --git a/Cargo.lock b/Cargo.lock index 2c5ec58..2137098 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1056,6 +1056,7 @@ dependencies = [ "regex", "rumqttc", "serde", + "serde_json", "serde_yaml", "tokio 1.10.1", "tokio-compat-02", diff --git a/Cargo.toml b/Cargo.toml index f6e5acc..3eb5b9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ log = { version = "0.4", features = ["std", "serde"] } regex = "1" rumqttc = "0.8" serde = { version = "1", features = ["derive"] } +serde_json = "1" serde_yaml = "0.8" tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "rt", "rt-multi-thread"] } tokio-compat-02 = "0.2" diff --git a/src/config.rs b/src/config.rs index a7e67b3..7e4327d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -53,10 +53,22 @@ pub struct InfluxDBConfig { pub measurement: String, } +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "camelCase", tag = "type")] +#[allow(non_camel_case_types)] +pub enum Payload { + #[serde(rename_all = "camelCase")] + json { + value_field_name: String, + timestamp_field_name: Option, + }, +} + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Mapping { pub topic: String, + pub payload: Option, pub field_name: String, pub value_type: ValueType, pub tags: HashMap, diff --git a/src/main.rs b/src/main.rs index b1579fd..8461906 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ #[macro_use] extern crate log; -use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, UserAuth}; +use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, Payload, UserAuth}; use futures::TryFutureExt; use influxdb::InfluxDbWriteable; use influxdb::{Client as InfluxClient, Timestamp, Type}; @@ -11,6 +11,7 @@ use rumqttc::{ AsyncClient as MqttAsyncClient, Event, EventLoop as MqttEventLoop, Key, MqttOptions, Packet, Publish, QoS, SubscribeFilter, TlsConfiguration, Transport, }; +use serde_json::Value as JsonValue; use std::convert::TryFrom; use std::env; use std::path::Path; @@ -18,6 +19,7 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::fs::File; use tokio::io::AsyncReadExt; +use value::ToInfluxType; mod config; mod interpolate; @@ -135,15 +137,41 @@ async fn handle_publish( .collect::>(); let field_name = mapping.field_name.interpolate(&reference_values)?; - let value = String::from_utf8(Vec::from(publish.payload.as_ref())) + let payload = String::from_utf8(Vec::from(publish.payload.as_ref())) .map_err(|err| format!("Invalid payload value: {}", err))?; - let influx_value = mapping.value_type.parse(&value)?; + let (influx_value, timestamp) = match &mapping.payload { + None => (payload.to_influx_type(mapping.value_type)?, None), + Some(Payload::json { value_field_name, timestamp_field_name }) => { + let payload_root: JsonValue = serde_json::from_str(&payload).map_err(|err| format!("Failed to parse payload as JSON: {}", err))?; + match payload_root { + JsonValue::Object(mut map) => map + .remove(value_field_name) + .ok_or_else(|| format!("Missing field '{}' in payload for '{}'", value_field_name, publish.topic)) + .and_then(|value| value.to_influx_type(mapping.value_type)) + .and_then(|influx_value| timestamp_field_name + .as_ref() + .and_then(|tsf| map + .remove(tsf) + .map(|timestamp_value| timestamp_value + .as_u64() + .map(|ts| ts as u128) + .ok_or_else(|| format!("'{}' cannot be converted to timestamp", timestamp_value)) + ) + ) + .transpose() + .map(|ts| (influx_value, ts)) + )?, + _ => return Err(format!("Payload for {} was not a JSON object", publish.topic)), + } + }, + }; - let now = SystemTime::now() + let timestamp = timestamp.unwrap_or_else(|| SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() - .as_millis(); - let mut query = Timestamp::Milliseconds(now) + .as_millis() + ); + let mut query = Timestamp::Milliseconds(timestamp) .into_query(&database.measurement) .add_field(&field_name, influx_value); for tag in mapping.tags.iter() { diff --git a/src/mapping.rs b/src/mapping.rs index 2328006..4fe6430 100644 --- a/src/mapping.rs +++ b/src/mapping.rs @@ -1,9 +1,9 @@ use influxdb::Type; use std::convert::TryFrom; -use crate::config::{Mapping as ConfigMapping, TagValue as ConfigTagValue}; +use crate::config::{Mapping as ConfigMapping, Payload, TagValue as ConfigTagValue}; use crate::interpolate::{InterpolatedName, InterpolatedNamePart}; -use crate::value::ValueType; +use crate::value::{ToInfluxType, ValueType}; #[derive(Clone, Debug, PartialEq)] pub enum TopicLevel { @@ -45,7 +45,7 @@ impl TryFrom<&ConfigTagValue> for TagValue { _ => Ok(TagValue::InterpolatedStr(interp)), } } - other => other.parse(&tag_value.value).map(TagValue::Literal), + other => tag_value.value.to_influx_type(other).map(TagValue::Literal), } } } @@ -53,6 +53,7 @@ impl TryFrom<&ConfigTagValue> for TagValue { #[derive(Debug)] pub struct Mapping { pub topic: Vec, + pub payload: Option, pub field_name: InterpolatedName, pub value_type: ValueType, pub tags: Vec<(String, TagValue)>, @@ -108,6 +109,7 @@ impl TryFrom<&ConfigMapping> for Mapping { Ok(Mapping { topic, + payload: mapping.payload.as_ref().map(|p| p.clone()), field_name, value_type: mapping.value_type, tags, @@ -135,6 +137,7 @@ mod test { fn mk_cfg_mapping(topic: &str) -> ConfigMapping { ConfigMapping { topic: topic.to_string(), + payload: None, field_name: "".to_string(), value_type: ValueType::Text, tags: HashMap::new(), diff --git a/src/value.rs b/src/value.rs index f2fcd82..9b75d01 100644 --- a/src/value.rs +++ b/src/value.rs @@ -1,5 +1,7 @@ use influxdb::Type; use serde::Deserialize; +use serde_json::Value as JsonValue; +use std::fmt; #[derive(Copy, Clone, Debug, Deserialize)] #[serde(rename_all = "kebab-case")] @@ -11,25 +13,68 @@ pub enum ValueType { Text, } -impl ValueType { - pub fn parse(self, value: &String) -> Result { - match self { - ValueType::Boolean if value == "true" => Ok(Type::Boolean(true)), - ValueType::Boolean if value == "false" => Ok(Type::Boolean(false)), - ValueType::Boolean => Err(format!("Value '{}' is not a valid boolean", value)), - ValueType::Float => value +impl fmt::Display for ValueType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use ValueType::*; + let s = match self { + Boolean => "boolean", + Float => "float", + SignedInteger => "signed integer", + UnsignedInteger => "unsigned integer", + Text => "text", + }; + write!(f, "{}", s) + } +} + +pub trait ToInfluxType { + fn to_influx_type(&self, value_type: ValueType) -> Result; +} + +impl ToInfluxType for String { + fn to_influx_type(&self, value_type: ValueType) -> Result { + match value_type { + ValueType::Boolean if self == "true" => Ok(Type::Boolean(true)), + ValueType::Boolean if self == "false" => Ok(Type::Boolean(false)), + ValueType::Boolean => Err(format!("Value '{}' is not a valid boolean", self)), + ValueType::Float => self .parse::() .map(|v| Type::Float(v)) .map_err(|err| err.to_string()), - ValueType::SignedInteger => value + ValueType::SignedInteger => self .parse::() .map(|v| Type::SignedInteger(v)) .map_err(|err| err.to_string()), - ValueType::UnsignedInteger => value + ValueType::UnsignedInteger => self .parse::() .map(|v| Type::UnsignedInteger(v)) .map_err(|err| err.to_string()), - ValueType::Text => Ok(Type::Text(value.clone())), + ValueType::Text => Ok(Type::Text(self.clone())), + } + } +} + +impl ToInfluxType for JsonValue { + fn to_influx_type(&self, value_type: ValueType) -> Result { + match (value_type, self) { + (ValueType::Boolean, JsonValue::Bool(true)) => Ok(Type::Boolean(true)), + (ValueType::Boolean, JsonValue::Bool(false)) => Ok(Type::Boolean(false)), + (ValueType::Float, JsonValue::Number(num)) => num + .as_f64() + .ok_or_else(|| format!("Cannot be expressed as f64: {}", num)) + .map(|v| Type::Float(v)), + (ValueType::SignedInteger, JsonValue::Number(num)) => num + .as_i64() + .ok_or_else(|| format!("Cannot be expressed as i64:{}", num)) + .map(|v| Type::SignedInteger(v)), + (ValueType::UnsignedInteger, JsonValue::Number(num)) => num + .as_u64() + .ok_or_else(|| format!("Cannot be expressed as u64:{}", num)) + .map(|v| Type::UnsignedInteger(v)), + (ValueType::Text, JsonValue::String(s)) => Ok(Type::Text(s.to_string())), + (ValueType::Text, JsonValue::Bool(b)) => Ok(Type::Text(b.to_string())), + (ValueType::Text, JsonValue::Number(num)) => Ok(Type::Text(num.to_string())), + (other_type, other_value) => Err(format!("Unable to parse self from JSON; need type {} but got '{}'", other_type, other_value)), } } }