diff options
Diffstat (limited to '')
-rw-r--r-- | src/main.rs | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ed9c8a7 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,201 @@ +use tokio::sync::Mutex; +use std::{process::exit, sync::Arc, path::PathBuf, str::FromStr}; +use matrix_sdk::{ + config::SyncSettings, ruma::{events::room::message::RoomMessageEventContent, OwnedRoomOrAliasId}, Client, Room, encryption::EncryptionSettings, +}; +use secrecy::{Secret, ExposeSecret}; +use tokio::io::AsyncBufReadExt; +use futures_util::stream::TryStreamExt; +use confique::Config; +use clap::{arg, command, value_parser}; + +mod ntfy; +mod matrix_bot; + + +#[derive(serde::Deserialize, confique::Config, Debug, Clone)] +struct Options { + #[config(env="BOT_MATRIX_HOMESERVER")] + matrix_homeserver: String, + #[config(env="BOT_MATRIX_USERNAME")] + matrix_username: String, + #[config(env="BOT_MATRIX_PASSWORD")] + matrix_password: Secret<String>, + #[config(env="BOT_MATRIX_RECOVERY_KEY")] + matrix_recovery_key: Option<Secret<String>>, + #[config(env="BOT_MATRIX_DEVICE_NAME", default="ntfy bot")] + matrix_device_name: String, + #[config(env="BOT_MATRIX_ROOMS")] + matrix_rooms: Vec<OwnedRoomOrAliasId>, + #[config(env="BOT_NTFY_SERVER")] + ntfy_server: String, + #[config(env="BOT_NTFY_TOPICS", parse_env=confique::env::parse::list_by_comma)] + ntfy_topics: Vec<String>, + #[config(env="BOT_NTFY_TOKEN")] + ntfy_token: Option<Secret<String>> +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let matches = command!() + .arg(arg!(-c --config <FILE> "config file location") + .required(false) + .value_parser(value_parser!(PathBuf)) + ) + .get_matches(); + + let config_file = match matches.get_one::<PathBuf>("config") { + None => PathBuf::from_str("config.toml")?, + Some(path) => path.to_owned() + }; + + tracing::info!("config file location: {config_file:?}"); + + let config = &Options::builder() + .file(config_file) + .env() + .load()?; + + let client = Client::builder() + .homeserver_url(config.matrix_homeserver.to_owned()) + .with_encryption_settings(EncryptionSettings { + auto_enable_cross_signing: true, + auto_enable_backups: true, + ..Default::default() + }) + .build().await?; + + let recovery = client.encryption().recovery(); + + client + .matrix_auth() + .login_username(&config.matrix_username, &config.matrix_password.expose_secret()) + .initial_device_display_name(&config.matrix_device_name) + .await?; + + if let Some(ref key) = config.matrix_recovery_key { + recovery.recover(key.expose_secret()).await.unwrap(); + } else { + let key = recovery + .enable() + .wait_for_backups_to_upload() + .await + .unwrap(); + tracing::warn!("recovery key should be set in config: {}", key); + } + + tracing::info!("logged in as {}", config.matrix_username); + + + let configured_rooms = + matrix_bot::resolve_room_aliases(&client, &config.matrix_rooms) + .await; + + tracing::info!("rooms configured by id {:?}", configured_rooms); + + let rooms_to_notify = Arc::new(Mutex::new(Vec::new())); + + { + let rooms_to_notify = rooms_to_notify.clone(); + let configured_rooms = configured_rooms.clone(); + client.add_event_handler( + |event, client, room| + matrix_bot::join_rooms_on_invite(event, client, room, configured_rooms, rooms_to_notify) + ); + } + + client.sync_once(SyncSettings::default()).await?; + + matrix_bot::delete_other_devices(&client, &config.matrix_username, &config.matrix_password) + .await?; + + matrix_bot::join_configured_rooms(&client, &configured_rooms).await?; + + let rooms_we_have = client.joined_rooms() + .into_iter() + .filter(|room| configured_rooms.contains(&room.room_id().into())) + .collect::<Vec<_>>(); + { + let mut lock = rooms_to_notify.lock().await; + for room in rooms_we_have { + lock.push(room) + } + } + + tokio::spawn(subscriber_loop(config.clone(), rooms_to_notify)); + client.sync(SyncSettings::default()).await?; + + Ok(()) +} + +#[tracing::instrument(skip(config, rooms_to_notify))] +async fn subscriber_loop(config: Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) { + loop { + tracing::info!("starting ntfy subscriber"); + if let Err(e) = read_ntfy(&config, rooms_to_notify.clone()).await { + tracing::error!("error while handling ntfy events: {}", e); + } + } +} + +async fn read_ntfy(config: &Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) -> anyhow::Result<()> { + let url = format!("{}/{}/json", config.ntfy_server, config.ntfy_topics.join(",")); + let req = { + let mut req = reqwest::Client::new() + .get(url); + if let Some(ref token) = config.ntfy_token { + req = req.bearer_auth(token.expose_secret()); + } + req.send().await? + }; + + if !req.status().is_success() { + tracing::error!("fatal: ntfy sent status {}", req.status()); + exit(1); + } + + let stream = req + .bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); + let mut lines = tokio_util::io::StreamReader::new(stream).lines(); + + while let Ok(Some(line)) = lines.next_line().await { + tracing::debug!("received ntfy message {}", line); + match serde_json::from_str::<ntfy::Message>(&line) { + Ok(msg) if msg.event == ntfy::Event::Message => { + let title = match msg.title { + Some(ref title) => title, + _ => &msg.topic + }; + let (level, color) = match msg.priority { + Some(..=2) => ("DEBUG", "white"), + Some(3) | None => ("INFO", "cyan"), + Some(4) => ("WARN", "yellow"), + Some(5..) => ("ERROR", "red") + }; + // per spec, event=message implies this field exists + let body = msg.message.unwrap(); + + let event = RoomMessageEventContent::text_html( + format!("{level} {title}: {body}"), + format!("<font color=\"{color}\">{level} {title}:</font> {body}") + ); + tracing::info!("sending event"); + + for room in rooms_to_notify.lock().await.iter() { + room.send(event.clone()).await?; + } + } + Ok(msg) => { + tracing::debug!("ignoring ntfy message of type {:?}", msg.event); + } + Err(e) => { + tracing::error!("could not parse ntfy message: {}: {}", e, line) + } + } + } + + Ok(()) +} |