Support JSON in MQTT payloads
The impetus for this is to allow embedding an event timestamp with the data, since MQTT doesn't tag messages with the time they were sent. So instead of having a payload value like "some value", you could have:

{ "timestamp": 1630360029124, "value": "some value" }

...and then configure the mapping in mqtt2db to be a JSON mapping, with timestamp field name "timestamp" and value field name "value", and we just figure things out. If no payload type is specified for a mapping, we just assume it's a raw value, as before.
parent 0b95f5bf7c
commit 2984561a9a
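As a quick illustration of the new mapping option (a sketch, not part of this commit, and assuming the config is loaded with serde_yaml, as the dependencies suggest), the snippet below deserializes a mapping's payload section on its own. It duplicates the Payload enum added to src/config.rs so it compiles standalone; the YAML keys follow that enum's serde attributes (tag = "type", rename_all = "camelCase"), and the surrounding mapping keys (topic, fieldName, valueType, tags) are left out.

// Illustrative only: Payload is copied from the new src/config.rs so this compiles on its own.
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase", tag = "type")]
#[allow(non_camel_case_types)]
pub enum Payload {
    #[serde(rename_all = "camelCase")]
    json {
        value_field_name: String,
        timestamp_field_name: Option<String>,
    },
}

fn main() {
    // A mapping's `payload` section as it might appear in the YAML config.
    let yaml = "
type: json
valueFieldName: value
timestampFieldName: timestamp
";
    let payload: Payload = serde_yaml::from_str(yaml).expect("payload section should deserialize");
    // Prints: json { value_field_name: "value", timestamp_field_name: Some("timestamp") }
    println!("{:?}", payload);
}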
Cargo.lock (generated)
@@ -1056,6 +1056,7 @@ dependencies = [
  "regex",
  "rumqttc",
  "serde",
+ "serde_json",
  "serde_yaml",
  "tokio 1.10.1",
  "tokio-compat-02",
Cargo.toml
@@ -15,6 +15,7 @@ log = { version = "0.4", features = ["std", "serde"] }
 regex = "1"
 rumqttc = "0.8"
 serde = { version = "1", features = ["derive"] }
+serde_json = "1"
 serde_yaml = "0.8"
 tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "rt", "rt-multi-thread"] }
 tokio-compat-02 = "0.2"
src/config.rs
@@ -53,10 +53,22 @@ pub struct InfluxDBConfig {
     pub measurement: String,
 }
 
+#[derive(Debug, Deserialize, Clone)]
+#[serde(rename_all = "camelCase", tag = "type")]
+#[allow(non_camel_case_types)]
+pub enum Payload {
+    #[serde(rename_all = "camelCase")]
+    json {
+        value_field_name: String,
+        timestamp_field_name: Option<String>,
+    },
+}
+
 #[derive(Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Mapping {
     pub topic: String,
+    pub payload: Option<Payload>,
     pub field_name: String,
     pub value_type: ValueType,
     pub tags: HashMap<String, TagValue>,
src/main.rs
@@ -1,7 +1,7 @@
 #[macro_use]
 extern crate log;
 
-use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, UserAuth};
+use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, Payload, UserAuth};
 use futures::TryFutureExt;
 use influxdb::InfluxDbWriteable;
 use influxdb::{Client as InfluxClient, Timestamp, Type};
@@ -11,6 +11,7 @@ use rumqttc::{
     AsyncClient as MqttAsyncClient, Event, EventLoop as MqttEventLoop, Key, MqttOptions, Packet,
     Publish, QoS, SubscribeFilter, TlsConfiguration, Transport,
 };
+use serde_json::Value as JsonValue;
 use std::convert::TryFrom;
 use std::env;
 use std::path::Path;
@@ -18,6 +19,7 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
+use value::ToInfluxType;
 
 mod config;
 mod interpolate;
@@ -135,15 +137,41 @@ async fn handle_publish(
         .collect::<Vec<&str>>();
     let field_name = mapping.field_name.interpolate(&reference_values)?;
 
-    let value = String::from_utf8(Vec::from(publish.payload.as_ref()))
+    let payload = String::from_utf8(Vec::from(publish.payload.as_ref()))
         .map_err(|err| format!("Invalid payload value: {}", err))?;
-    let influx_value = mapping.value_type.parse(&value)?;
+    let (influx_value, timestamp) = match &mapping.payload {
+        None => (payload.to_influx_type(mapping.value_type)?, None),
+        Some(Payload::json { value_field_name, timestamp_field_name }) => {
+            let payload_root: JsonValue = serde_json::from_str(&payload).map_err(|err| format!("Failed to parse payload as JSON: {}", err))?;
+            match payload_root {
+                JsonValue::Object(mut map) => map
+                    .remove(value_field_name)
+                    .ok_or_else(|| format!("Missing field '{}' in payload for '{}'", value_field_name, publish.topic))
+                    .and_then(|value| value.to_influx_type(mapping.value_type))
+                    .and_then(|influx_value| timestamp_field_name
+                        .as_ref()
+                        .and_then(|tsf| map
+                            .remove(tsf)
+                            .map(|timestamp_value| timestamp_value
+                                .as_u64()
+                                .map(|ts| ts as u128)
+                                .ok_or_else(|| format!("'{}' cannot be converted to timestamp", timestamp_value))
+                            )
+                        )
+                        .transpose()
+                        .map(|ts| (influx_value, ts))
+                    )?,
+                _ => return Err(format!("Payload for {} was not a JSON object", publish.topic)),
+            }
+        },
+    };
 
-    let now = SystemTime::now()
+    let timestamp = timestamp.unwrap_or_else(|| SystemTime::now()
         .duration_since(UNIX_EPOCH)
         .unwrap()
-        .as_millis();
-    let mut query = Timestamp::Milliseconds(now)
+        .as_millis()
+    );
+    let mut query = Timestamp::Milliseconds(timestamp)
         .into_query(&database.measurement)
        .add_field(&field_name, influx_value);
     for tag in mapping.tags.iter() {
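The new match arm above is a fairly dense Option/Result chain, so here is a rough standalone sketch of the extraction it performs (not code from the repository: the helper name is invented and the error handling is simplified; a timestamp field that is present but not an unsigned integer is silently dropped here rather than reported).

// Hypothetical helper, for illustration only: pull the value and optional
// timestamp out of a JSON MQTT payload, as the new handle_publish arm does.
use serde_json::Value as JsonValue;

fn extract_json_payload(
    payload: &str,
    value_field: &str,
    timestamp_field: Option<&str>,
) -> Result<(JsonValue, Option<u128>), String> {
    let root: JsonValue = serde_json::from_str(payload)
        .map_err(|err| format!("Failed to parse payload as JSON: {}", err))?;
    match root {
        JsonValue::Object(mut map) => {
            let value = map
                .remove(value_field)
                .ok_or_else(|| format!("Missing field '{}' in payload", value_field))?;
            // Simplification: a malformed timestamp is dropped rather than reported.
            let timestamp = timestamp_field
                .and_then(|field| map.remove(field))
                .and_then(|ts| ts.as_u64())
                .map(|ts| ts as u128);
            Ok((value, timestamp))
        }
        _ => Err("Payload was not a JSON object".to_string()),
    }
}

fn main() {
    let payload = r#"{ "timestamp": 1630360029124, "value": "some value" }"#;
    let (value, timestamp) = extract_json_payload(payload, "value", Some("timestamp")).unwrap();
    // value = "some value", timestamp = Some(1630360029124)
    println!("value = {}, timestamp = {:?}", value, timestamp);
}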
@@ -1,9 +1,9 @@
 use influxdb::Type;
 use std::convert::TryFrom;
 
-use crate::config::{Mapping as ConfigMapping, TagValue as ConfigTagValue};
+use crate::config::{Mapping as ConfigMapping, Payload, TagValue as ConfigTagValue};
 use crate::interpolate::{InterpolatedName, InterpolatedNamePart};
-use crate::value::ValueType;
+use crate::value::{ToInfluxType, ValueType};
 
 #[derive(Clone, Debug, PartialEq)]
 pub enum TopicLevel {
@@ -45,7 +45,7 @@ impl TryFrom<&ConfigTagValue> for TagValue {
                     _ => Ok(TagValue::InterpolatedStr(interp)),
                 }
             }
-            other => other.parse(&tag_value.value).map(TagValue::Literal),
+            other => tag_value.value.to_influx_type(other).map(TagValue::Literal),
         }
     }
 }
@@ -53,6 +53,7 @@ impl TryFrom<&ConfigTagValue> for TagValue {
 #[derive(Debug)]
 pub struct Mapping {
     pub topic: Vec<TopicLevel>,
+    pub payload: Option<Payload>,
     pub field_name: InterpolatedName,
     pub value_type: ValueType,
     pub tags: Vec<(String, TagValue)>,
@@ -108,6 +109,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
 
         Ok(Mapping {
             topic,
+            payload: mapping.payload.as_ref().map(|p| p.clone()),
             field_name,
             value_type: mapping.value_type,
             tags,
@@ -135,6 +137,7 @@ mod test {
     fn mk_cfg_mapping(topic: &str) -> ConfigMapping {
         ConfigMapping {
             topic: topic.to_string(),
+            payload: None,
            field_name: "".to_string(),
             value_type: ValueType::Text,
             tags: HashMap::new(),
src/value.rs
@@ -1,5 +1,7 @@
 use influxdb::Type;
 use serde::Deserialize;
+use serde_json::Value as JsonValue;
+use std::fmt;
 
 #[derive(Copy, Clone, Debug, Deserialize)]
 #[serde(rename_all = "kebab-case")]
@@ -11,25 +13,68 @@ pub enum ValueType {
     Text,
 }
 
-impl ValueType {
-    pub fn parse(self, value: &String) -> Result<Type, String> {
-        match self {
-            ValueType::Boolean if value == "true" => Ok(Type::Boolean(true)),
-            ValueType::Boolean if value == "false" => Ok(Type::Boolean(false)),
-            ValueType::Boolean => Err(format!("Value '{}' is not a valid boolean", value)),
-            ValueType::Float => value
+impl fmt::Display for ValueType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        use ValueType::*;
+        let s = match self {
+            Boolean => "boolean",
+            Float => "float",
+            SignedInteger => "signed integer",
+            UnsignedInteger => "unsigned integer",
+            Text => "text",
+        };
+        write!(f, "{}", s)
+    }
+}
+
+pub trait ToInfluxType {
+    fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String>;
+}
+
+impl ToInfluxType for String {
+    fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String> {
+        match value_type {
+            ValueType::Boolean if self == "true" => Ok(Type::Boolean(true)),
+            ValueType::Boolean if self == "false" => Ok(Type::Boolean(false)),
+            ValueType::Boolean => Err(format!("Value '{}' is not a valid boolean", self)),
+            ValueType::Float => self
                 .parse::<f64>()
                 .map(|v| Type::Float(v))
                 .map_err(|err| err.to_string()),
-            ValueType::SignedInteger => value
+            ValueType::SignedInteger => self
                 .parse::<i64>()
                 .map(|v| Type::SignedInteger(v))
                 .map_err(|err| err.to_string()),
-            ValueType::UnsignedInteger => value
+            ValueType::UnsignedInteger => self
                 .parse::<u64>()
                 .map(|v| Type::UnsignedInteger(v))
                 .map_err(|err| err.to_string()),
-            ValueType::Text => Ok(Type::Text(value.clone())),
+            ValueType::Text => Ok(Type::Text(self.clone())),
+        }
+    }
+}
+
+impl ToInfluxType for JsonValue {
+    fn to_influx_type(&self, value_type: ValueType) -> Result<Type, String> {
+        match (value_type, self) {
+            (ValueType::Boolean, JsonValue::Bool(true)) => Ok(Type::Boolean(true)),
+            (ValueType::Boolean, JsonValue::Bool(false)) => Ok(Type::Boolean(false)),
+            (ValueType::Float, JsonValue::Number(num)) => num
+                .as_f64()
+                .ok_or_else(|| format!("Cannot be expressed as f64: {}", num))
+                .map(|v| Type::Float(v)),
+            (ValueType::SignedInteger, JsonValue::Number(num)) => num
+                .as_i64()
+                .ok_or_else(|| format!("Cannot be expressed as i64:{}", num))
+                .map(|v| Type::SignedInteger(v)),
+            (ValueType::UnsignedInteger, JsonValue::Number(num)) => num
+                .as_u64()
+                .ok_or_else(|| format!("Cannot be expressed as u64:{}", num))
+                .map(|v| Type::UnsignedInteger(v)),
+            (ValueType::Text, JsonValue::String(s)) => Ok(Type::Text(s.to_string())),
+            (ValueType::Text, JsonValue::Bool(b)) => Ok(Type::Text(b.to_string())),
+            (ValueType::Text, JsonValue::Number(num)) => Ok(Type::Text(num.to_string())),
+            (other_type, other_value) => Err(format!("Unable to parse self from JSON; need type {} but got '{}'", other_type, other_value)),
         }
     }
 }