diff options
Diffstat (limited to '')
-rw-r--r-- | src/main.rs | 201 | ||||
-rw-r--r-- | src/matrix_bot.rs | 134 | ||||
-rw-r--r-- | src/ntfy.rs | 28 |
3 files changed, 363 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ed9c8a7 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,201 @@ +use tokio::sync::Mutex; +use std::{process::exit, sync::Arc, path::PathBuf, str::FromStr}; +use matrix_sdk::{ + config::SyncSettings, ruma::{events::room::message::RoomMessageEventContent, OwnedRoomOrAliasId}, Client, Room, encryption::EncryptionSettings, +}; +use secrecy::{Secret, ExposeSecret}; +use tokio::io::AsyncBufReadExt; +use futures_util::stream::TryStreamExt; +use confique::Config; +use clap::{arg, command, value_parser}; + +mod ntfy; +mod matrix_bot; + + +#[derive(serde::Deserialize, confique::Config, Debug, Clone)] +struct Options { + #[config(env="BOT_MATRIX_HOMESERVER")] + matrix_homeserver: String, + #[config(env="BOT_MATRIX_USERNAME")] + matrix_username: String, + #[config(env="BOT_MATRIX_PASSWORD")] + matrix_password: Secret<String>, + #[config(env="BOT_MATRIX_RECOVERY_KEY")] + matrix_recovery_key: Option<Secret<String>>, + #[config(env="BOT_MATRIX_DEVICE_NAME", default="ntfy bot")] + matrix_device_name: String, + #[config(env="BOT_MATRIX_ROOMS")] + matrix_rooms: Vec<OwnedRoomOrAliasId>, + #[config(env="BOT_NTFY_SERVER")] + ntfy_server: String, + #[config(env="BOT_NTFY_TOPICS", parse_env=confique::env::parse::list_by_comma)] + ntfy_topics: Vec<String>, + #[config(env="BOT_NTFY_TOKEN")] + ntfy_token: Option<Secret<String>> +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let matches = command!() + .arg(arg!(-c --config <FILE> "config file location") + .required(false) + .value_parser(value_parser!(PathBuf)) + ) + .get_matches(); + + let config_file = match matches.get_one::<PathBuf>("config") { + None => PathBuf::from_str("config.toml")?, + Some(path) => path.to_owned() + }; + + tracing::info!("config file location: {config_file:?}"); + + let config = &Options::builder() + .file(config_file) + .env() + .load()?; + + let client = Client::builder() + .homeserver_url(config.matrix_homeserver.to_owned()) + .with_encryption_settings(EncryptionSettings { + auto_enable_cross_signing: true, + auto_enable_backups: true, + ..Default::default() + }) + .build().await?; + + let recovery = client.encryption().recovery(); + + client + .matrix_auth() + .login_username(&config.matrix_username, &config.matrix_password.expose_secret()) + .initial_device_display_name(&config.matrix_device_name) + .await?; + + if let Some(ref key) = config.matrix_recovery_key { + recovery.recover(key.expose_secret()).await.unwrap(); + } else { + let key = recovery + .enable() + .wait_for_backups_to_upload() + .await + .unwrap(); + tracing::warn!("recovery key should be set in config: {}", key); + } + + tracing::info!("logged in as {}", config.matrix_username); + + + let configured_rooms = + matrix_bot::resolve_room_aliases(&client, &config.matrix_rooms) + .await; + + tracing::info!("rooms configured by id {:?}", configured_rooms); + + let rooms_to_notify = Arc::new(Mutex::new(Vec::new())); + + { + let rooms_to_notify = rooms_to_notify.clone(); + let configured_rooms = configured_rooms.clone(); + client.add_event_handler( + |event, client, room| + matrix_bot::join_rooms_on_invite(event, client, room, configured_rooms, rooms_to_notify) + ); + } + + client.sync_once(SyncSettings::default()).await?; + + matrix_bot::delete_other_devices(&client, &config.matrix_username, &config.matrix_password) + .await?; + + matrix_bot::join_configured_rooms(&client, &configured_rooms).await?; + + let rooms_we_have = client.joined_rooms() + .into_iter() + .filter(|room| configured_rooms.contains(&room.room_id().into())) + .collect::<Vec<_>>(); + { + let mut lock = rooms_to_notify.lock().await; + for room in rooms_we_have { + lock.push(room) + } + } + + tokio::spawn(subscriber_loop(config.clone(), rooms_to_notify)); + client.sync(SyncSettings::default()).await?; + + Ok(()) +} + +#[tracing::instrument(skip(config, rooms_to_notify))] +async fn subscriber_loop(config: Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) { + loop { + tracing::info!("starting ntfy subscriber"); + if let Err(e) = read_ntfy(&config, rooms_to_notify.clone()).await { + tracing::error!("error while handling ntfy events: {}", e); + } + } +} + +async fn read_ntfy(config: &Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) -> anyhow::Result<()> { + let url = format!("{}/{}/json", config.ntfy_server, config.ntfy_topics.join(",")); + let req = { + let mut req = reqwest::Client::new() + .get(url); + if let Some(ref token) = config.ntfy_token { + req = req.bearer_auth(token.expose_secret()); + } + req.send().await? + }; + + if !req.status().is_success() { + tracing::error!("fatal: ntfy sent status {}", req.status()); + exit(1); + } + + let stream = req + .bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); + let mut lines = tokio_util::io::StreamReader::new(stream).lines(); + + while let Ok(Some(line)) = lines.next_line().await { + tracing::debug!("received ntfy message {}", line); + match serde_json::from_str::<ntfy::Message>(&line) { + Ok(msg) if msg.event == ntfy::Event::Message => { + let title = match msg.title { + Some(ref title) => title, + _ => &msg.topic + }; + let (level, color) = match msg.priority { + Some(..=2) => ("DEBUG", "white"), + Some(3) | None => ("INFO", "cyan"), + Some(4) => ("WARN", "yellow"), + Some(5..) => ("ERROR", "red") + }; + // per spec, event=message implies this field exists + let body = msg.message.unwrap(); + + let event = RoomMessageEventContent::text_html( + format!("{level} {title}: {body}"), + format!("<font color=\"{color}\">{level} {title}:</font> {body}") + ); + tracing::info!("sending event"); + + for room in rooms_to_notify.lock().await.iter() { + room.send(event.clone()).await?; + } + } + Ok(msg) => { + tracing::debug!("ignoring ntfy message of type {:?}", msg.event); + } + Err(e) => { + tracing::error!("could not parse ntfy message: {}: {}", e, line) + } + } + } + + Ok(()) +} diff --git a/src/matrix_bot.rs b/src/matrix_bot.rs new file mode 100644 index 0000000..7618b0d --- /dev/null +++ b/src/matrix_bot.rs @@ -0,0 +1,134 @@ +use std::{sync::Arc, ops::Deref}; +use tokio::sync::Mutex; + +use anyhow::Context; +use matrix_sdk::{ + ruma::{events::room::member::StrippedRoomMemberEvent, api::client::uiaa, OwnedRoomId, OwnedRoomOrAliasId}, Client, Room, +}; +use secrecy::{Secret, ExposeSecret}; +use tokio::time::{sleep, Duration}; + +#[tracing::instrument(skip(client, configured_rooms, room, state))] +pub async fn join_rooms_on_invite( + event: StrippedRoomMemberEvent, + client: Client, + room: Room, + configured_rooms: Vec<OwnedRoomId>, + state: Arc<Mutex<Vec<Room>>> +) { + if event.state_key == client.user_id().unwrap() { + let room_id = room.room_id().to_owned(); + if configured_rooms.contains(&room.room_id().into()) { + tokio::spawn(async move { + join_room_with_cooldown(&room).await; + state.lock().await.push(room); + tracing::info!("Successfully joined room {room_id}"); + }); + } + else { + tracing::warn!("was invited to room {room_id} which is not configured"); + if let Err(e) = room.leave().await { + tracing::error!("error leaving room {room_id}: {e}"); + } + } + } +} + +async fn join_room_with_cooldown(room: &Room) { + tracing::info!("Autojoining room {}", room.room_id()); + let mut delay = 2; + let room_id = room.room_id().to_owned(); + + while let Err(err) = room.join().await { + // retry autojoin due to synapse sending invites, before the + // invited user can join for more information see + // https://github.com/matrix-org/synapse/issues/4345 + tracing::error!("Failed to join room {room_id} ({err:?}), retrying in {delay}s"); + + sleep(Duration::from_secs(delay)).await; + delay *= 2; + + if delay > 3600 { + eprintln!("Can't join room {} ({err:?})", room.room_id()); + break; + } + } +} + +pub async fn resolve_room_aliases(client: &Client, aliases: &Vec<OwnedRoomOrAliasId>) -> Vec<OwnedRoomId> { + futures_util::future::join_all (aliases + .iter() + .map(|name| async {match name.deref().try_into() { + Ok(alias) => Some(client.resolve_room_alias(alias).await.ok()?.room_id), + Err(id) => Some(id.into()) + }})) + .await + .into_iter() + .filter_map(|o: Option<_>| o) + .collect() +} + +#[tracing::instrument(skip(client))] +pub async fn join_configured_rooms(client: &Client, configured_rooms: &Vec<OwnedRoomId>) -> anyhow::Result<()> { + let joined_rooms = client.joined_rooms(); + + let missing_rooms = configured_rooms + .iter() + .filter(|room_id| !joined_rooms.iter().any(|room| room.room_id() == *room_id)) + .collect::<Vec<_>>(); + + let extra_rooms = joined_rooms + .iter() + .filter(|room| !configured_rooms.contains(&room.room_id().into())) + .collect::<Vec<_>>(); + + for room_id in missing_rooms { + tracing::info!("attempting to join room {room_id:?}"); + if let Err(e) = client.join_room_by_id(room_id).await { + tracing::error!("could not join room {}", e) + } + } + + for room in extra_rooms { + tracing::info!("leaving room {}", room.room_id()); + if let Err(e) = room.leave().await { + tracing::error!("could not leave room {}", e) + } + } + + Ok(()) +} + +#[tracing::instrument(skip(password))] +pub async fn delete_other_devices(client: &Client, username: &str, password: &Secret<String>) -> anyhow::Result<()> { + let own_id = client + .device_id() + .with_context(|| "no own device id?")?; + + let devices = client.devices().await? + .devices + .into_iter() + .map(|device| device.device_id) + .filter(|id| id != own_id) + .collect::<Vec<_>>(); + + tracing::info!("deleting devices {:?}", devices); + + // deleting devices is a funny double-request thing since it requires the + // user's password; see the documentation on delete_devices() + if let Err(e) = client.delete_devices(&devices, None).await { + if let Some(info) = e.as_uiaa_response() { + let mut password = uiaa::Password::new( + uiaa::UserIdentifier::UserIdOrLocalpart(username.to_owned()), + password.expose_secret().to_owned(), + ); + password.session = info.session.clone(); + + client + .delete_devices(&devices, Some(uiaa::AuthData::Password(password))) + .await?; + } + } + + Ok(()) +} diff --git a/src/ntfy.rs b/src/ntfy.rs new file mode 100644 index 0000000..48e9f36 --- /dev/null +++ b/src/ntfy.rs @@ -0,0 +1,28 @@ + + +/// see https://docs.ntfy.sh/subscribe/api/#json-message-format +#[derive(Debug, serde::Deserialize)] +#[allow(unused)] +pub struct Message { + pub id: String, + pub time: u64, + pub expires: Option<u64>, + pub event: Event, + pub topic: String, + pub message: Option<String>, + pub title: Option<String>, + pub tags: Option<Vec<String>>, + pub priority: Option<usize>, + pub click: Option<String>, + // omitted: actions, attachment +} + + +#[derive(Debug, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Event { + Open, + Keepalive, + Message, + PollRequest +} |