Add support for reading mail-archive.com RSS feeds

This commit is contained in:
2023-09-20 18:16:30 -07:00
parent 092b29637f
commit dc41aeac14
7 changed files with 1252 additions and 27 deletions

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::HashMap, fs::File, io::BufReader};
use std::{collections::HashMap, fs::File, io::BufReader, path::PathBuf};
use anyhow::Context;
use matrix_sdk::ruma::{OwnedRoomOrAliasId, OwnedUserId};
@ -52,7 +52,7 @@ pub struct RepoConfig {
}
#[derive(Deserialize)]
pub struct GitlabConfig {
pub struct GitlabWebhookConfig {
pub url_prefix: Option<String>,
#[serde(default)]
#[serde(deserialize_with = "crate::matrix::deser_optional_room_or_alias_id")]
@ -62,6 +62,22 @@ pub struct GitlabConfig {
// gitlab.xfce.org/xfce/xfdesktop
}
/// Configuration for a single mailing list to poll.
#[derive(Clone, Deserialize)]
pub struct MailListConfig {
/// List name. The poller interpolates it into the mail-archive.com feed URL,
/// the per-list state file name, and the prefix of each posted message.
pub name: String,
/// Rooms to post this list's messages to. Defaults to empty when omitted, in
/// which case the poller falls back to `MailArchiveConfig::default_rooms`.
#[serde(default)]
pub rooms: Vec<OwnedRoomOrAliasId>,
}
/// Configuration for the mail-archive.com RSS poller.
#[derive(Deserialize)]
pub struct MailArchiveConfig {
/// Rooms used for any list whose own `rooms` is empty. Defaults to empty
/// when omitted.
#[serde(default)]
pub default_rooms: Vec<OwnedRoomOrAliasId>,
/// Delay between two polls of the same list. Required (no serde default).
pub update_interval: u64, // seconds
/// Directory in which the poller keeps one `<list name>.state` file per list.
pub state_dir: PathBuf,
/// The mailing lists to poll; one polling task is spawned per entry.
pub lists: Vec<MailListConfig>,
}
#[derive(Deserialize)]
pub struct Config {
pub bind_address: Option<String>,
@ -69,7 +85,8 @@ pub struct Config {
#[serde(deserialize_with = "crate::matrix::deser_user_id")]
pub user_id: OwnedUserId,
pub password: String,
pub gitlab: GitlabConfig,
pub gitlab_webhook: Option<GitlabWebhookConfig>,
pub mail_archive: Option<MailArchiveConfig>,
}
fn load_blocking(path: &String) -> anyhow::Result<Config> {

View File

@ -26,7 +26,7 @@ use tokio::sync::mpsc;
use warp::{filters::BoxedFilter, reply::Reply, Filter};
use crate::{
config::Config,
config::GitlabWebhookConfig,
gitlab_event::{parse_ref, GitlabEvent, GitlabEventExt},
matrix,
};
@ -66,7 +66,7 @@ pub async fn handle_gitlab_event(
Ok(())
}
pub fn build_webhook_route(config: Arc<Config>, matrix_client: Client) -> anyhow::Result<BoxedFilter<(impl Reply,)>> {
pub fn build_route(config: GitlabWebhookConfig, matrix_client: Client) -> anyhow::Result<BoxedFilter<(impl Reply,)>> {
let (event_tx, mut event_rx) = mpsc::channel::<(GitlabEvent, OwnedRoomOrAliasId)>(100);
tokio::spawn(async move {
while let Some((event, room)) = event_rx.recv().await {
@ -76,7 +76,7 @@ pub fn build_webhook_route(config: Arc<Config>, matrix_client: Client) -> anyhow
}
});
let gitlab_root_path = if let Some(url_prefix) = config.gitlab.url_prefix.as_ref() {
let gitlab_root_path = if let Some(url_prefix) = config.url_prefix.as_ref() {
url_prefix.split('/').fold(warp::any().boxed(), |last, segment| {
if segment.is_empty() {
last
@ -88,14 +88,15 @@ pub fn build_webhook_route(config: Arc<Config>, matrix_client: Client) -> anyhow
warp::any().boxed()
};
let config = Arc::new(config);
let gitlab = gitlab_root_path
.and(warp::path!("hooks" / "gitlab"))
.and(warp::post())
.and(warp::header::<String>("x-gitlab-token"))
.and(warp::body::json())
.then(move |token: String, event: GitlabEvent| {
let config = Arc::clone(&config);
let event_tx = event_tx.clone();
let config = Arc::clone(&config);
async move {
match event {
@ -105,19 +106,19 @@ pub fn build_webhook_route(config: Arc<Config>, matrix_client: Client) -> anyhow
_ => {
let project = event.project();
let config_key = project.web_url.replace("http://", "").replace("https://", "");
if let Some(repo_config) = config.gitlab.repo_configs.get(&config_key) {
if let Some(repo_config) = config.repo_configs.get(&config_key) {
if !constant_time_eq(token.as_bytes(), repo_config.token.as_bytes()) {
warn!("Invalid token for repo '{}'", config_key);
warp::reply::with_status("Invalid token", StatusCode::FORBIDDEN)
} else {
debug!("payload: {:?}", event);
if let Some(room) = repo_config.room.as_ref().or(config.gitlab.default_room.as_ref()) {
if let Some(room) = &repo_config.room.as_ref().or(config.default_room.as_ref()) {
let publish_events = repo_config
.publish_events
.as_ref()
.or(config.gitlab.default_publish_events.as_ref());
.or(config.default_publish_events.as_ref());
if publish_events.map(|ecs| event.should_publish(ecs)).unwrap_or(true) {
if let Err(err) = event_tx.send((event, room.clone())).await {
if let Err(err) = event_tx.send((event, (*room).clone())).await {
warn!("Failed to enqueue payload: {}", err);
}
}

262
src/mail_archive.rs Normal file
View File

@ -0,0 +1,262 @@
// bebot
// Copyright (C) 2023 Brian Tarricone <brian@tarricone.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::{
fmt,
io::{BufReader, BufWriter, ErrorKind},
path::PathBuf,
time::{Duration, SystemTime},
};
use anyhow::Context;
use chrono::{DateTime, Utc};
use futures::{future::join_all, FutureExt};
use matrix_sdk::{
ruma::{events::room::message::RoomMessageEventContent, OwnedRoomOrAliasId},
Client,
};
use reqwest::redirect;
use serde::de;
use tokio::{fs::File, task::JoinHandle, time::sleep};
use crate::{
config::{MailArchiveConfig, MailListConfig},
matrix,
};
/// Per-list polling state, persisted as YAML in the list's state file by
/// `save_list_state` and read back by `load_list_state`.
#[derive(Clone, Copy, Serialize, Deserialize)]
struct ListState {
/// Publication date of the last feed item that was posted. Feed items whose
/// date is not later than this are skipped on the next poll.
last_pub_date: DateTime<Utc>,
}
/// The `<pubDate>` element of an RSS item.
#[derive(Debug, Deserialize)]
struct RssPubDate {
// `$text` is the key quick-xml's serde support uses for an element's text
// content; the text is turned into a UTC timestamp by `deser_rfc2616`.
#[serde(rename = "$text", deserialize_with = "deser_rfc2616")]
value: DateTime<Utc>,
}
/// One `<item>` of the RSS feed. Only the fields the poller uses are declared.
#[derive(Debug, Deserialize)]
struct RssItem {
/// Item title; used as the link text of the posted message.
title: String,
/// Item URL; used as the link target of the posted message.
link: String,
/// Publication date; compared against the saved `ListState` to find new items.
#[serde(rename = "pubDate")]
pub_date: RssPubDate,
}
/// The `<channel>` element of the RSS feed.
#[derive(Debug, Deserialize)]
struct RssChannel {
/// All `<item>` children, in document order.
// NOTE(review): there is no `#[serde(default)]` here, so a feed without any
// `<item>` presumably fails to deserialize — confirm against quick-xml.
#[serde(rename = "item")]
items: Vec<RssItem>,
}
/// Root of the RSS document as deserialized with quick-xml.
#[derive(Debug, Deserialize)]
struct MailRss {
/// The feed's `<channel>` element.
channel: RssChannel,
}
/// Loads the persisted polling state for one list from `state_file`.
///
/// When the file does not exist yet, a fresh state stamped with the current
/// time is written to `state_file` and returned, so that a first run does not
/// publish every item already present in the feed.
///
/// # Errors
///
/// Fails when the file cannot be opened for a reason other than "not found",
/// when the initial state cannot be saved, or when the YAML cannot be parsed.
async fn load_list_state(state_file: &PathBuf) -> anyhow::Result<ListState> {
    let file = match File::open(state_file).await {
        Ok(file) => file,
        Err(err) if err.kind() == ErrorKind::NotFound => {
            // No state yet: pretend the last publication happened right now
            // instead of replaying the whole feed into the rooms.
            let initial = ListState {
                last_pub_date: SystemTime::now().into(),
            };
            save_list_state(initial, state_file).await?;
            return Ok(initial);
        }
        Err(err) => return Err(err.into()),
    };

    // serde_yaml reads synchronously, so hand it a std file on a blocking thread.
    let reader = BufReader::new(file.into_std().await);
    let state: ListState = tokio::task::spawn_blocking(move || serde_yaml::from_reader(reader)).await??;
    Ok(state)
}
async fn save_list_state(list_state: ListState, state_file: &PathBuf) -> anyhow::Result<()> {
let f = File::options()
.write(true)
.truncate(true)
.create(true)
.open(state_file)
.await?;
let w = BufWriter::new(f.into_std().await);
tokio::task::spawn_blocking(move || serde_yaml::to_writer(w, &list_state)).await??;
Ok(())
}
/// Performs one poll of a mailing list: fetches its RSS feed and posts every
/// item newer than the saved state to the given Matrix rooms.
///
/// * `list` - the list being polled; its name prefixes each posted message.
/// * `state_file` - file holding the publication date of the last posted item.
/// * `http_client` / `url` - client and feed URL used to fetch the RSS document.
/// * `matrix_client` / `room_ids` - Matrix client and the rooms to post to.
///
/// Rooms that cannot be joined are logged and skipped. Items are posted oldest
/// first, and the state file is advanced only after an item has been sent to
/// every joined room, so a failure part-way through never causes a room to miss
/// an item on the next poll (at worst an item is re-sent to some rooms).
///
/// # Errors
///
/// Fails when the state cannot be loaded or saved, when no room could be
/// joined, when the feed cannot be fetched or parsed, or when sending a
/// message fails.
async fn handle_list(
    list: &MailListConfig,
    state_file: &PathBuf,
    http_client: &reqwest::Client,
    url: &String,
    matrix_client: &Client,
    room_ids: &[OwnedRoomOrAliasId],
) -> anyhow::Result<()> {
    let list_state = load_list_state(state_file).await?;

    // Join all target rooms concurrently; a room that fails is only warned about.
    let rooms_f = room_ids.iter().map(|room_id| {
        matrix::ensure_room_joined(matrix_client, room_id)
            .map(move |res| res.with_context(|| format!("Failed to join Matrix room '{}'", room_id)))
    });
    let rooms = join_all(rooms_f)
        .await
        .into_iter()
        .filter_map(|room_res| match room_res {
            Ok(room) => Some(room),
            Err(err) => {
                warn!("{:#}", err);
                None
            }
        })
        .collect::<Vec<_>>();

    if rooms.is_empty() {
        return Err(anyhow!("Failed to join all rooms for list '{}'; skipping", list.name));
    }

    let response = http_client
        .get(url)
        .send()
        .await
        .with_context(|| format!("Failed to fetch mail RSS feed from '{}'", url))
        .and_then(|response| {
            if !response.status().is_success() {
                Err(anyhow!(
                    "Failed to fetch mail RSS feed from '{}': server returned status {}",
                    url,
                    response.status().as_u16()
                ))
            } else {
                Ok(response)
            }
        })?;
    let body = response
        .text()
        .await
        .with_context(|| format!("Failed to decode RSS response body for '{}'", url))?;

    // XML parsing is CPU-bound, so keep it off the async executor threads.
    let mail_rss = tokio::task::spawn_blocking(move || quick_xml::de::from_str::<MailRss>(&body))
        .await?
        .with_context(|| format!("Failed to parse RSS feed for '{}'", url))?;

    // The feed is walked in reverse so items are handled oldest first; everything
    // up to and including the last published date is skipped.
    let items = mail_rss
        .channel
        .items
        .into_iter()
        .rev()
        .skip_while(|item| item.pub_date.value <= list_state.last_pub_date)
        .collect::<Vec<_>>();

    for item in &items {
        // "\[list\] [title](link)": escaped brackets render literally, followed
        // by a Markdown link to the archived message.
        let text = format!("\\[{}\\] [{}]({})", list.name, item.title, item.link);

        for room in &rooms {
            let msg = RoomMessageEventContent::text_markdown(text.as_str());
            room.send(msg, None)
                .await
                .with_context(|| format!("Failed to send message to room '{}'", room.room_id()))?;
        }

        // Only now has every room received this item, so it is safe to advance.
        save_list_state(
            ListState {
                last_pub_date: item.pub_date.value,
            },
            state_file,
        )
        .await?;
    }

    Ok(())
}
/// Spawns one background polling task per configured mailing list.
///
/// Each task repeatedly calls `handle_list` for its list, logging (not
/// propagating) any error, and sleeps for the configured update interval
/// between polls. A list uses its own `rooms`, or `default_rooms` when it has
/// none; a list with no rooms at all is reported with a warning and its task
/// finishes immediately.
///
/// Returns the join handles of the spawned tasks, one per list.
///
/// # Errors
///
/// Fails only when the HTTP client cannot be built.
pub fn start_polling(config: MailArchiveConfig, matrix_client: Client) -> anyhow::Result<Vec<JoinHandle<()>>> {
    let http_client = reqwest::Client::builder()
        .user_agent(format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")))
        .gzip(true)
        .redirect(redirect::Policy::default())
        .timeout(Duration::from_secs(8))
        .build()?;

    // An interval of 0 would turn the poll loop into a tight loop of HTTP
    // requests against the archive, so never sleep for less than one second.
    let update_interval = Duration::from_secs(config.update_interval.max(1));

    Ok(config
        .lists
        .into_iter()
        .map(|list| {
            let room_ids = if list.rooms.is_empty() {
                config.default_rooms.clone()
            } else {
                list.rooms.clone()
            };
            let http_client = http_client.clone();
            let matrix_client = matrix_client.clone();
            let url = format!("https://www.mail-archive.com/{}/maillist.xml", list.name);
            let state_file = config.state_dir.join(format!("{}.state", list.name));

            tokio::spawn(async move {
                if room_ids.is_empty() {
                    warn!("No rooms configured for mail list '{}'; not polling it", list.name);
                    return;
                }

                loop {
                    if let Err(err) =
                        handle_list(&list, &state_file, &http_client, &url, &matrix_client, &room_ids).await
                    {
                        warn!("{:#}", err);
                    }
                    sleep(update_interval).await;
                }
            })
        })
        .collect())
}
/// Serde `deserialize_with` helper that turns a textual date into a UTC
/// timestamp.
///
/// The text is handed to `dateparser::parse`; a parse failure is reported as a
/// custom deserialization error.
fn deser_rfc2616<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
where
    D: de::Deserializer<'de>,
{
    /// Visitor that accepts string input only.
    struct DateTextVisitor;

    impl<'de> de::Visitor<'de> for DateTextVisitor {
        type Value = DateTime<Utc>;

        fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "an RFC2616-formatted datetime")
        }

        fn visit_str<E>(self, text: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match dateparser::parse(text) {
                Ok(parsed) => Ok(parsed),
                Err(err) => Err(E::custom(err)),
            }
        }
    }

    deserializer.deserialize_any(DateTextVisitor)
}
#[cfg(test)]
mod test {
    use std::{fs::File, io::BufReader};

    use super::MailRss;

    /// Checks that the sample feed in `test-data/maillist.xml` deserializes
    /// into `MailRss`; the parsed value is printed for manual inspection.
    #[test]
    pub fn test_feed_deser() -> anyhow::Result<()> {
        let fixture_path = format!("{}/test-data/maillist.xml", env!("CARGO_MANIFEST_DIR"));
        let reader = BufReader::new(File::open(fixture_path)?);
        let parsed: MailRss = quick_xml::de::from_reader(reader)?;
        println!("{:#?}", parsed);
        Ok(())
    }
}

View File

@ -22,37 +22,50 @@ extern crate log;
extern crate serde;
mod config;
mod gitlab;
mod gitlab_event;
mod gitlab_webhook;
mod mail_archive;
mod matrix;
use std::{env, net::IpAddr, process::exit, sync::Arc};
use std::{env, net::IpAddr, process::exit};
use anyhow::Context;
use futures::future::join_all;
use warp::Filter;
async fn run() -> anyhow::Result<()> {
let config_path = env::args()
.nth(1)
.ok_or_else(|| anyhow!("Config file should be passed as only parameter"))?;
let config = Arc::new(config::load(config_path).await?);
let mut config = config::load(config_path).await?;
let matrix_client = matrix::connect(&config).await.context("Failed to connect to Matrix")?;
let gitlab = gitlab::build_webhook_route(Arc::clone(&config), matrix_client)?;
let routes = gitlab.with(warp::log("bebot"));
let handles = if let Some(mail_archive) = config.mail_archive.take() {
mail_archive::start_polling(mail_archive, matrix_client.clone())?
} else {
vec![]
};
let addr = config
.bind_address
.as_ref()
.cloned()
.unwrap_or_else(|| "127.0.0.1".to_string())
.parse::<IpAddr>()
.context("Failed to parse bind_address")?;
let port = config.bind_port.unwrap_or(3000);
warp::serve(routes).run((addr, port)).await;
if let Some(gitlab_webhook) = config.gitlab_webhook.take() {
let gitlab = gitlab_webhook::build_route(gitlab_webhook, matrix_client.clone())?;
let routes = gitlab.with(warp::log("bebot"));
Ok(())
let addr = config
.bind_address
.as_ref()
.cloned()
.unwrap_or_else(|| "127.0.0.1".to_string())
.parse::<IpAddr>()
.context("Failed to parse bind_address")?;
let port = config.bind_port.unwrap_or(3000);
warp::serve(routes).run((addr, port)).await;
}
join_all(handles).await;
error!("No functionality is configured; exiting");
exit(1);
}
#[tokio::main]