Use JSON path expressions for JSON payload support

This commit is contained in:
Brian Tarricone 2021-08-30 23:41:02 -07:00
parent 2984561a9a
commit 45a3bb3c50
5 changed files with 231 additions and 61 deletions

193
Cargo.lock generated
View File

@ -2,6 +2,21 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e61f2b7f93d2c7d2b08263acaa4a363b3e276806c68af6134c44f523bf1aacd"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.3.2"
@ -245,8 +260,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
]
[[package]]
@ -272,6 +287,21 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "backtrace"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "base-x"
version = "0.2.8"
@ -497,8 +527,8 @@ version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e98e2ad1a782e33928b96fc3948e7c355e5af34ba4de7670fe8bac2a3b2006d"
dependencies = [
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
]
[[package]]
@ -568,6 +598,15 @@ dependencies = [
"termcolor",
]
[[package]]
name = "error-chain"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3"
dependencies = [
"backtrace",
]
[[package]]
name = "event-listener"
version = "2.5.1"
@ -671,8 +710,8 @@ dependencies = [
"autocfg",
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
]
[[package]]
@ -750,6 +789,12 @@ dependencies = [
"polyval",
]
[[package]]
name = "gimli"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7"
[[package]]
name = "gloo-timers"
version = "0.2.1"
@ -913,8 +958,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "321345ebf687827f6254b36bb3077f00bdee36f609905d1e65d5905cc92156ab"
dependencies = [
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
]
[[package]]
@ -941,6 +986,19 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonpath"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8061db09019f1879ba586685694fe18279f597b1b3a9dd308f35e596be6cdf7d"
dependencies = [
"error-chain",
"pest",
"pest_derive",
"serde",
"serde_json",
]
[[package]]
name = "kv-log-macro"
version = "1.0.7"
@ -1020,6 +1078,16 @@ dependencies = [
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
dependencies = [
"adler",
"autocfg",
]
[[package]]
name = "mio"
version = "0.7.13"
@ -1051,6 +1119,7 @@ dependencies = [
"env_logger",
"futures",
"influxdb",
"jsonpath",
"lazy_static",
"log",
"regex",
@ -1120,6 +1189,15 @@ dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39f37e50073ccad23b6d09bcb5b263f4e76d3bb6038e4a3c08e52162ffa8abc2"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.8.0"
@ -1144,6 +1222,23 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pest"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fce5d8b5cc33983fc74f78ad552b5522ab41442c4ca91606e4236eb4b5ceefc"
[[package]]
name = "pest_derive"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3294f437119209b084c797604295f40227cffa35c57220b1e99a6ff3bf8ee4"
dependencies = [
"pest",
"quote 0.3.15",
"syn 0.11.11",
]
[[package]]
name = "pin-project"
version = "1.0.8"
@ -1160,8 +1255,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
]
[[package]]
@ -1236,9 +1331,15 @@ version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c7ed8b8c7b886ea3ed7dde405212185f423ab44682667c8c6dd14aa1d9f6612"
dependencies = [
"unicode-xid",
"unicode-xid 0.2.2",
]
[[package]]
name = "quote"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"
[[package]]
name = "quote"
version = "1.0.9"
@ -1379,6 +1480,12 @@ dependencies = [
"webpki",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustc_version"
version = "0.2.3"
@ -1461,8 +1568,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b"
dependencies = [
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
]
[[package]]
@ -1603,10 +1710,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef"
dependencies = [
"proc-macro2",
"quote",
"quote 1.0.9",
"serde",
"serde_derive",
"syn",
"syn 1.0.75",
]
[[package]]
@ -1617,12 +1724,12 @@ checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11"
dependencies = [
"base-x",
"proc-macro2",
"quote",
"quote 1.0.9",
"serde",
"serde_derive",
"serde_json",
"sha1",
"syn",
"syn 1.0.75",
]
[[package]]
@ -1665,6 +1772,17 @@ dependencies = [
"serde_json",
]
[[package]]
name = "syn"
version = "0.11.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad"
dependencies = [
"quote 0.3.15",
"synom",
"unicode-xid 0.0.4",
]
[[package]]
name = "syn"
version = "1.0.75"
@ -1672,8 +1790,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7f58f7e8eaa0009c5fec437aabf511bd9933e4b2d7407bd05273c01a8906ea7"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
"quote 1.0.9",
"unicode-xid 0.2.2",
]
[[package]]
name = "synom"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6"
dependencies = [
"unicode-xid 0.0.4",
]
[[package]]
@ -1710,8 +1837,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa3884228611f5cd3608e2d409bf7dce832e4eb3135e3f11addbd7e41bd68e71"
dependencies = [
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
]
[[package]]
@ -1757,9 +1884,9 @@ checksum = "fd3c141a1b43194f3f56a1411225df8646c55781d5f26db825b3d98507eb482f"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"quote 1.0.9",
"standback",
"syn",
"syn 1.0.75",
]
[[package]]
@ -1827,8 +1954,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110"
dependencies = [
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
]
[[package]]
@ -1889,6 +2016,12 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc"
[[package]]
name = "unicode-xid"
version = "0.2.2"
@ -1984,8 +2117,8 @@ dependencies = [
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
"wasm-bindgen-shared",
]
@ -2007,7 +2140,7 @@ version = "0.2.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44468aa53335841d9d6b6c023eaab07c0cd4bddbcfdee3e2bb1e8d2cb8069fef"
dependencies = [
"quote",
"quote 1.0.9",
"wasm-bindgen-macro-support",
]
@ -2018,8 +2151,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0195807922713af1e67dc66132c7328206ed9766af3858164fb583eedc25fbad"
dependencies = [
"proc-macro2",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.75",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]

View File

@ -10,6 +10,7 @@ clap = "2.33"
env_logger = "0.9"
futures = "0.3"
influxdb = { version = "0.4", default-features = false, features = ["derive", "use-serde", "h1-client-rustls"] }
jsonpath = "0.1"
lazy_static = "1"
log = { version = "0.4", features = ["std", "serde"] }
regex = "1"

View File

@ -59,8 +59,8 @@ pub struct InfluxDBConfig {
pub enum Payload {
#[serde(rename_all = "camelCase")]
json {
value_field_name: String,
timestamp_field_name: Option<String>,
value_field_path: String,
timestamp_field_path: Option<String>,
},
}

View File

@ -1,12 +1,12 @@
#[macro_use]
extern crate log;
use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, Payload, UserAuth};
use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, UserAuth};
use futures::TryFutureExt;
use influxdb::InfluxDbWriteable;
use influxdb::{Client as InfluxClient, Timestamp, Type};
use log::LevelFilter;
use mapping::{Mapping, TagValue, TopicLevel};
use mapping::{Mapping, Payload, TagValue, TopicLevel};
use rumqttc::{
AsyncClient as MqttAsyncClient, Event, EventLoop as MqttEventLoop, Key, MqttOptions, Packet,
Publish, QoS, SubscribeFilter, TlsConfiguration, Transport,
@ -140,29 +140,29 @@ async fn handle_publish(
let payload = String::from_utf8(Vec::from(publish.payload.as_ref()))
.map_err(|err| format!("Invalid payload value: {}", err))?;
let (influx_value, timestamp) = match &mapping.payload {
None => (payload.to_influx_type(mapping.value_type)?, None),
Some(Payload::json { value_field_name, timestamp_field_name }) => {
let payload_root: JsonValue = serde_json::from_str(&payload).map_err(|err| format!("Failed to parse payload as JSON: {}", err))?;
match payload_root {
JsonValue::Object(mut map) => map
.remove(value_field_name)
.ok_or_else(|| format!("Missing field '{}' in payload for '{}'", value_field_name, publish.topic))
.and_then(|value| value.to_influx_type(mapping.value_type))
.and_then(|influx_value| timestamp_field_name
.as_ref()
.and_then(|tsf| map
.remove(tsf)
.map(|timestamp_value| timestamp_value
.as_u64()
.map(|ts| ts as u128)
.ok_or_else(|| format!("'{}' cannot be converted to timestamp", timestamp_value))
)
)
.transpose()
.map(|ts| (influx_value, ts))
)?,
_ => return Err(format!("Payload for {} was not a JSON object", publish.topic)),
}
Payload::Raw => (payload.to_influx_type(mapping.value_type)?, None),
Payload::Json { value_field_selector, timestamp_field_selector } => {
let payload_root: JsonValue = serde_json::from_str(&payload)
.map_err(|err| format!("Failed to parse payload as JSON: {}", err))?;
let influx_value = value_field_selector
.find(&payload_root)
.next()
.ok_or_else(|| format!("Couldn't find value in payload on topic {}", publish.topic))
.and_then(|value| value.to_influx_type(mapping.value_type))?;
let timestamp = timestamp_field_selector
.as_ref()
.map(|selector| selector
.find(&payload_root)
.next()
.ok_or_else(|| format!("Couldn't find timestamp in payload on topic {}", publish.topic))
.and_then(|ts_value| ts_value
.as_u64()
.map(|ts| ts as u128)
.ok_or_else(|| format!("'{}' cannot be converted to a timestamp", ts_value))
)
)
.transpose()?;
(influx_value, timestamp)
},
};

View File

@ -1,7 +1,8 @@
use influxdb::Type;
use std::convert::TryFrom;
use jsonpath::Selector;
use std::{convert::TryFrom, fmt};
use crate::config::{Mapping as ConfigMapping, Payload, TagValue as ConfigTagValue};
use crate::config::{Mapping as ConfigMapping, Payload as ConfigPayload, TagValue as ConfigTagValue};
use crate::interpolate::{InterpolatedName, InterpolatedNamePart};
use crate::value::{ToInfluxType, ValueType};
@ -50,10 +51,28 @@ impl TryFrom<&ConfigTagValue> for TagValue {
}
}
pub enum Payload {
Raw,
Json {
value_field_selector: Selector,
timestamp_field_selector: Option<Selector>,
},
}
impl fmt::Debug for Payload {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use Payload::*;
match self {
Raw => write!(f, "Raw"),
Json { .. } => write!(f, "Json {{ ... }}"),
}
}
}
#[derive(Debug)]
pub struct Mapping {
pub topic: Vec<TopicLevel>,
pub payload: Option<Payload>,
pub payload: Payload,
pub field_name: InterpolatedName,
pub value_type: ValueType,
pub tags: Vec<(String, TagValue)>,
@ -92,6 +111,23 @@ impl TryFrom<&ConfigMapping> for Mapping {
Err(err) => Err(err),
}?;
let payload = match &mapping.payload {
None => Payload::Raw,
Some(ConfigPayload::json { value_field_path, timestamp_field_path }) => {
let value_field_selector = Selector::new(&value_field_path)
.map_err(|err| format!("Value field path '{}' is invalid: {}'", value_field_path, err))?;
let timestamp_field_selector = timestamp_field_path.as_ref()
.map(|path| Selector::new(path)
.map_err(|err| format!("Timestamp field path '{}' is invalid: {}'", path, err))
)
.transpose()?;
Payload::Json {
value_field_selector,
timestamp_field_selector,
}
}
};
let tags = mapping
.tags
.iter()
@ -109,7 +145,7 @@ impl TryFrom<&ConfigMapping> for Mapping {
Ok(Mapping {
topic,
payload: mapping.payload.as_ref().map(|p| p.clone()),
payload,
field_name,
value_type: mapping.value_type,
tags,