use tokio::sync::Mutex; use std::{process::exit, sync::Arc, path::PathBuf, str::FromStr}; use matrix_sdk::{ config::SyncSettings, ruma::{events::room::message::RoomMessageEventContent, OwnedRoomOrAliasId}, Client, Room, encryption::EncryptionSettings, }; use secrecy::{Secret, ExposeSecret}; use tokio::io::AsyncBufReadExt; use futures_util::stream::TryStreamExt; use confique::Config; use clap::{arg, command, value_parser}; mod ntfy; mod matrix_bot; #[derive(serde::Deserialize, confique::Config, Debug, Clone)] struct Options { #[config(env="BOT_MATRIX_HOMESERVER")] matrix_homeserver: String, #[config(env="BOT_MATRIX_USERNAME")] matrix_username: String, #[config(env="BOT_MATRIX_PASSWORD")] matrix_password: Secret, #[config(env="BOT_MATRIX_RECOVERY_KEY")] matrix_recovery_key: Option>, #[config(env="BOT_MATRIX_DEVICE_NAME", default="ntfy bot")] matrix_device_name: String, #[config(env="BOT_MATRIX_ROOMS")] matrix_rooms: Vec, #[config(env="BOT_NTFY_SERVER")] ntfy_server: String, #[config(env="BOT_NTFY_TOPICS", parse_env=confique::env::parse::list_by_comma)] ntfy_topics: Vec, #[config(env="BOT_NTFY_TOKEN")] ntfy_token: Option> } #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let matches = command!() .arg(arg!(-c --config "config file location") .required(false) .value_parser(value_parser!(PathBuf)) ) .get_matches(); let config_file = match matches.get_one::("config") { None => PathBuf::from_str("config.toml")?, Some(path) => path.to_owned() }; tracing::info!("config file location: {config_file:?}"); let config = &Options::builder() .file(config_file) .env() .load()?; let client = Client::builder() .homeserver_url(config.matrix_homeserver.to_owned()) .with_encryption_settings(EncryptionSettings { auto_enable_cross_signing: true, auto_enable_backups: true, ..Default::default() }) .build().await?; let recovery = client.encryption().recovery(); client .matrix_auth() .login_username(&config.matrix_username, &config.matrix_password.expose_secret()) .initial_device_display_name(&config.matrix_device_name) .await?; if let Some(ref key) = config.matrix_recovery_key { recovery.recover(key.expose_secret()).await.unwrap(); } else { let key = recovery .enable() .wait_for_backups_to_upload() .await .unwrap(); tracing::warn!("recovery key should be set in config: {}", key); } tracing::info!("logged in as {}", config.matrix_username); let configured_rooms = matrix_bot::resolve_room_aliases(&client, &config.matrix_rooms) .await; tracing::info!("rooms configured by id {:?}", configured_rooms); let rooms_to_notify = Arc::new(Mutex::new(Vec::new())); { let rooms_to_notify = rooms_to_notify.clone(); let configured_rooms = configured_rooms.clone(); client.add_event_handler( |event, client, room| matrix_bot::join_rooms_on_invite(event, client, room, configured_rooms, rooms_to_notify) ); } client.sync_once(SyncSettings::default()).await?; matrix_bot::delete_other_devices(&client, &config.matrix_username, &config.matrix_password) .await?; matrix_bot::join_configured_rooms(&client, &configured_rooms).await?; let rooms_we_have = client.joined_rooms() .into_iter() .filter(|room| configured_rooms.contains(&room.room_id().into())) .collect::>(); { let mut lock = rooms_to_notify.lock().await; for room in rooms_we_have { lock.push(room) } } tokio::spawn(subscriber_loop(config.clone(), rooms_to_notify)); client.sync(SyncSettings::default()).await?; Ok(()) } #[tracing::instrument(skip(config, rooms_to_notify))] async fn subscriber_loop(config: Options, rooms_to_notify: Arc>>) { loop { tracing::info!("starting ntfy subscriber"); if let Err(e) = read_ntfy(&config, rooms_to_notify.clone()).await { tracing::error!("error while handling ntfy events: {}", e); } } } async fn read_ntfy(config: &Options, rooms_to_notify: Arc>>) -> anyhow::Result<()> { let url = format!("{}/{}/json", config.ntfy_server, config.ntfy_topics.join(",")); let req = { let mut req = reqwest::Client::new() .get(url); if let Some(ref token) = config.ntfy_token { req = req.bearer_auth(token.expose_secret()); } req.send().await? }; if !req.status().is_success() { tracing::error!("fatal: ntfy sent status {}", req.status()); exit(1); } let stream = req .bytes_stream() .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); let mut lines = tokio_util::io::StreamReader::new(stream).lines(); while let Ok(Some(line)) = lines.next_line().await { tracing::debug!("received ntfy message {}", line); match serde_json::from_str::(&line) { Ok(msg) if msg.event == ntfy::Event::Message => { let title = match msg.title { Some(ref title) => title, _ => &msg.topic }; let (level, color) = match msg.priority { Some(..=2) => ("DEBUG", "white"), Some(3) | None => ("INFO", "cyan"), Some(4) => ("WARN", "yellow"), Some(5..) => ("ERROR", "red") }; // per spec, event=message implies this field exists let body = msg.message.unwrap(); let event = RoomMessageEventContent::text_html( format!("{level} {title}: {body}"), format!("{level} {title}: {body}") ); tracing::info!("sending event"); for room in rooms_to_notify.lock().await.iter() { room.send(event.clone()).await?; } } Ok(msg) => { tracing::debug!("ignoring ntfy message of type {:?}", msg.event); } Err(e) => { tracing::error!("could not parse ntfy message: {}: {}", e, line) } } } Ok(()) }