diff --git a/Cargo.lock b/Cargo.lock index 2137098..57a3aaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e61f2b7f93d2c7d2b08263acaa4a363b3e276806c68af6134c44f523bf1aacd" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aead" version = "0.3.2" @@ -245,8 +260,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" dependencies = [ "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", ] [[package]] @@ -272,6 +287,21 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backtrace" +version = "0.3.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base-x" version = "0.2.8" @@ -497,8 +527,8 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e98e2ad1a782e33928b96fc3948e7c355e5af34ba4de7670fe8bac2a3b2006d" dependencies = [ - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", ] [[package]] @@ -568,6 +598,15 @@ dependencies = [ "termcolor", ] +[[package]] +name = "error-chain" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3" +dependencies = [ + "backtrace", +] + [[package]] name = "event-listener" version = "2.5.1" @@ -671,8 +710,8 @@ dependencies = [ "autocfg", "proc-macro-hack", "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", ] [[package]] @@ -750,6 +789,12 @@ dependencies = [ "polyval", ] +[[package]] +name = "gimli" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" + [[package]] name = "gloo-timers" version = "0.2.1" @@ -913,8 +958,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "321345ebf687827f6254b36bb3077f00bdee36f609905d1e65d5905cc92156ab" dependencies = [ "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", ] [[package]] @@ -941,6 +986,19 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonpath" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8061db09019f1879ba586685694fe18279f597b1b3a9dd308f35e596be6cdf7d" +dependencies = [ + "error-chain", + "pest", + "pest_derive", + "serde", + "serde_json", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -1020,6 +1078,16 @@ dependencies = [ "unicase", ] +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "mio" version = "0.7.13" @@ -1051,6 +1119,7 @@ dependencies = [ "env_logger", "futures", "influxdb", + "jsonpath", "lazy_static", "log", "regex", @@ -1120,6 +1189,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39f37e50073ccad23b6d09bcb5b263f4e76d3bb6038e4a3c08e52162ffa8abc2" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.8.0" @@ -1144,6 +1222,23 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pest" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fce5d8b5cc33983fc74f78ad552b5522ab41442c4ca91606e4236eb4b5ceefc" + +[[package]] +name = "pest_derive" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3294f437119209b084c797604295f40227cffa35c57220b1e99a6ff3bf8ee4" +dependencies = [ + "pest", + "quote 0.3.15", + "syn 0.11.11", +] + [[package]] name = "pin-project" version = "1.0.8" @@ -1160,8 +1255,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" dependencies = [ "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", ] [[package]] @@ -1236,9 +1331,15 @@ version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c7ed8b8c7b886ea3ed7dde405212185f423ab44682667c8c6dd14aa1d9f6612" dependencies = [ - "unicode-xid", + "unicode-xid 0.2.2", ] +[[package]] +name = "quote" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" + [[package]] name = "quote" version = "1.0.9" @@ -1379,6 +1480,12 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "rustc_version" version = "0.2.3" @@ -1461,8 +1568,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" dependencies = [ "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", ] [[package]] @@ -1603,10 +1710,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef" dependencies = [ "proc-macro2", - "quote", + "quote 1.0.9", "serde", "serde_derive", - "syn", + "syn 1.0.75", ] [[package]] @@ -1617,12 +1724,12 @@ checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11" dependencies = [ "base-x", "proc-macro2", - "quote", + "quote 1.0.9", "serde", "serde_derive", "serde_json", "sha1", - "syn", + "syn 1.0.75", ] [[package]] @@ -1665,6 +1772,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "syn" +version = "0.11.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" +dependencies = [ + "quote 0.3.15", + "synom", + "unicode-xid 0.0.4", +] + [[package]] name = "syn" version = "1.0.75" @@ -1672,8 +1790,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f58f7e8eaa0009c5fec437aabf511bd9933e4b2d7407bd05273c01a8906ea7" dependencies = [ "proc-macro2", - "quote", - "unicode-xid", + "quote 1.0.9", + "unicode-xid 0.2.2", +] + +[[package]] +name = "synom" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" +dependencies = [ + "unicode-xid 0.0.4", ] [[package]] @@ -1710,8 +1837,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa3884228611f5cd3608e2d409bf7dce832e4eb3135e3f11addbd7e41bd68e71" dependencies = [ "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", ] [[package]] @@ -1757,9 +1884,9 @@ checksum = "fd3c141a1b43194f3f56a1411225df8646c55781d5f26db825b3d98507eb482f" dependencies = [ "proc-macro-hack", "proc-macro2", - "quote", + "quote 1.0.9", "standback", - "syn", + "syn 1.0.75", ] [[package]] @@ -1827,8 +1954,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110" dependencies = [ "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", ] [[package]] @@ -1889,6 +2016,12 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" +[[package]] +name = "unicode-xid" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" + [[package]] name = "unicode-xid" version = "0.2.2" @@ -1984,8 +2117,8 @@ dependencies = [ "lazy_static", "log", "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", "wasm-bindgen-shared", ] @@ -2007,7 +2140,7 @@ version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44468aa53335841d9d6b6c023eaab07c0cd4bddbcfdee3e2bb1e8d2cb8069fef" dependencies = [ - "quote", + "quote 1.0.9", "wasm-bindgen-macro-support", ] @@ -2018,8 +2151,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0195807922713af1e67dc66132c7328206ed9766af3858164fb583eedc25fbad" dependencies = [ "proc-macro2", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.75", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index 3eb5b9a..826a9ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ clap = "2.33" env_logger = "0.9" futures = "0.3" influxdb = { version = "0.4", default-features = false, features = ["derive", "use-serde", "h1-client-rustls"] } +jsonpath = "0.1" lazy_static = "1" log = { version = "0.4", features = ["std", "serde"] } regex = "1" diff --git a/src/config.rs b/src/config.rs index 7e4327d..56faadc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -59,8 +59,8 @@ pub struct InfluxDBConfig { pub enum Payload { #[serde(rename_all = "camelCase")] json { - value_field_name: String, - timestamp_field_name: Option, + value_field_path: String, + timestamp_field_path: Option, }, } diff --git a/src/main.rs b/src/main.rs index 8461906..5148763 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,12 @@ #[macro_use] extern crate log; -use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, Payload, UserAuth}; +use config::{Config, InfluxDBConfig, MqttAuth, MqttConfig, UserAuth}; use futures::TryFutureExt; use influxdb::InfluxDbWriteable; use influxdb::{Client as InfluxClient, Timestamp, Type}; use log::LevelFilter; -use mapping::{Mapping, TagValue, TopicLevel}; +use mapping::{Mapping, Payload, TagValue, TopicLevel}; use rumqttc::{ AsyncClient as MqttAsyncClient, Event, EventLoop as MqttEventLoop, Key, MqttOptions, Packet, Publish, QoS, SubscribeFilter, TlsConfiguration, Transport, @@ -140,29 +140,29 @@ async fn handle_publish( let payload = String::from_utf8(Vec::from(publish.payload.as_ref())) .map_err(|err| format!("Invalid payload value: {}", err))?; let (influx_value, timestamp) = match &mapping.payload { - None => (payload.to_influx_type(mapping.value_type)?, None), - Some(Payload::json { value_field_name, timestamp_field_name }) => { - let payload_root: JsonValue = serde_json::from_str(&payload).map_err(|err| format!("Failed to parse payload as JSON: {}", err))?; - match payload_root { - JsonValue::Object(mut map) => map - .remove(value_field_name) - .ok_or_else(|| format!("Missing field '{}' in payload for '{}'", value_field_name, publish.topic)) - .and_then(|value| value.to_influx_type(mapping.value_type)) - .and_then(|influx_value| timestamp_field_name - .as_ref() - .and_then(|tsf| map - .remove(tsf) - .map(|timestamp_value| timestamp_value - .as_u64() - .map(|ts| ts as u128) - .ok_or_else(|| format!("'{}' cannot be converted to timestamp", timestamp_value)) - ) - ) - .transpose() - .map(|ts| (influx_value, ts)) - )?, - _ => return Err(format!("Payload for {} was not a JSON object", publish.topic)), - } + Payload::Raw => (payload.to_influx_type(mapping.value_type)?, None), + Payload::Json { value_field_selector, timestamp_field_selector } => { + let payload_root: JsonValue = serde_json::from_str(&payload) + .map_err(|err| format!("Failed to parse payload as JSON: {}", err))?; + let influx_value = value_field_selector + .find(&payload_root) + .next() + .ok_or_else(|| format!("Couldn't find value in payload on topic {}", publish.topic)) + .and_then(|value| value.to_influx_type(mapping.value_type))?; + let timestamp = timestamp_field_selector + .as_ref() + .map(|selector| selector + .find(&payload_root) + .next() + .ok_or_else(|| format!("Couldn't find timestamp in payload on topic {}", publish.topic)) + .and_then(|ts_value| ts_value + .as_u64() + .map(|ts| ts as u128) + .ok_or_else(|| format!("'{}' cannot be converted to a timestamp", ts_value)) + ) + ) + .transpose()?; + (influx_value, timestamp) }, }; diff --git a/src/mapping.rs b/src/mapping.rs index 4fe6430..8ebf00d 100644 --- a/src/mapping.rs +++ b/src/mapping.rs @@ -1,7 +1,8 @@ use influxdb::Type; -use std::convert::TryFrom; +use jsonpath::Selector; +use std::{convert::TryFrom, fmt}; -use crate::config::{Mapping as ConfigMapping, Payload, TagValue as ConfigTagValue}; +use crate::config::{Mapping as ConfigMapping, Payload as ConfigPayload, TagValue as ConfigTagValue}; use crate::interpolate::{InterpolatedName, InterpolatedNamePart}; use crate::value::{ToInfluxType, ValueType}; @@ -50,10 +51,28 @@ impl TryFrom<&ConfigTagValue> for TagValue { } } +pub enum Payload { + Raw, + Json { + value_field_selector: Selector, + timestamp_field_selector: Option, + }, +} + +impl fmt::Debug for Payload { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use Payload::*; + match self { + Raw => write!(f, "Raw"), + Json { .. } => write!(f, "Json {{ ... }}"), + } + } +} + #[derive(Debug)] pub struct Mapping { pub topic: Vec, - pub payload: Option, + pub payload: Payload, pub field_name: InterpolatedName, pub value_type: ValueType, pub tags: Vec<(String, TagValue)>, @@ -92,6 +111,23 @@ impl TryFrom<&ConfigMapping> for Mapping { Err(err) => Err(err), }?; + let payload = match &mapping.payload { + None => Payload::Raw, + Some(ConfigPayload::json { value_field_path, timestamp_field_path }) => { + let value_field_selector = Selector::new(&value_field_path) + .map_err(|err| format!("Value field path '{}' is invalid: {}'", value_field_path, err))?; + let timestamp_field_selector = timestamp_field_path.as_ref() + .map(|path| Selector::new(path) + .map_err(|err| format!("Timestamp field path '{}' is invalid: {}'", path, err)) + ) + .transpose()?; + Payload::Json { + value_field_selector, + timestamp_field_selector, + } + } + }; + let tags = mapping .tags .iter() @@ -109,7 +145,7 @@ impl TryFrom<&ConfigMapping> for Mapping { Ok(Mapping { topic, - payload: mapping.payload.as_ref().map(|p| p.clone()), + payload, field_name, value_type: mapping.value_type, tags,