summaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/main.rs201
1 files changed, 201 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..ed9c8a7
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,201 @@
+use tokio::sync::Mutex;
+use std::{process::exit, sync::Arc, path::PathBuf, str::FromStr};
+use matrix_sdk::{
+ config::SyncSettings, ruma::{events::room::message::RoomMessageEventContent, OwnedRoomOrAliasId}, Client, Room, encryption::EncryptionSettings,
+};
+use secrecy::{Secret, ExposeSecret};
+use tokio::io::AsyncBufReadExt;
+use futures_util::stream::TryStreamExt;
+use confique::Config;
+use clap::{arg, command, value_parser};
+
+mod ntfy;
+mod matrix_bot;
+
+
+#[derive(serde::Deserialize, confique::Config, Debug, Clone)]
+struct Options {
+ #[config(env="BOT_MATRIX_HOMESERVER")]
+ matrix_homeserver: String,
+ #[config(env="BOT_MATRIX_USERNAME")]
+ matrix_username: String,
+ #[config(env="BOT_MATRIX_PASSWORD")]
+ matrix_password: Secret<String>,
+ #[config(env="BOT_MATRIX_RECOVERY_KEY")]
+ matrix_recovery_key: Option<Secret<String>>,
+ #[config(env="BOT_MATRIX_DEVICE_NAME", default="ntfy bot")]
+ matrix_device_name: String,
+ #[config(env="BOT_MATRIX_ROOMS")]
+ matrix_rooms: Vec<OwnedRoomOrAliasId>,
+ #[config(env="BOT_NTFY_SERVER")]
+ ntfy_server: String,
+ #[config(env="BOT_NTFY_TOPICS", parse_env=confique::env::parse::list_by_comma)]
+ ntfy_topics: Vec<String>,
+ #[config(env="BOT_NTFY_TOKEN")]
+ ntfy_token: Option<Secret<String>>
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+ tracing_subscriber::fmt::init();
+
+ let matches = command!()
+ .arg(arg!(-c --config <FILE> "config file location")
+ .required(false)
+ .value_parser(value_parser!(PathBuf))
+ )
+ .get_matches();
+
+ let config_file = match matches.get_one::<PathBuf>("config") {
+ None => PathBuf::from_str("config.toml")?,
+ Some(path) => path.to_owned()
+ };
+
+ tracing::info!("config file location: {config_file:?}");
+
+ let config = &Options::builder()
+ .file(config_file)
+ .env()
+ .load()?;
+
+ let client = Client::builder()
+ .homeserver_url(config.matrix_homeserver.to_owned())
+ .with_encryption_settings(EncryptionSettings {
+ auto_enable_cross_signing: true,
+ auto_enable_backups: true,
+ ..Default::default()
+ })
+ .build().await?;
+
+ let recovery = client.encryption().recovery();
+
+ client
+ .matrix_auth()
+ .login_username(&config.matrix_username, &config.matrix_password.expose_secret())
+ .initial_device_display_name(&config.matrix_device_name)
+ .await?;
+
+ if let Some(ref key) = config.matrix_recovery_key {
+ recovery.recover(key.expose_secret()).await.unwrap();
+ } else {
+ let key = recovery
+ .enable()
+ .wait_for_backups_to_upload()
+ .await
+ .unwrap();
+ tracing::warn!("recovery key should be set in config: {}", key);
+ }
+
+ tracing::info!("logged in as {}", config.matrix_username);
+
+
+ let configured_rooms =
+ matrix_bot::resolve_room_aliases(&client, &config.matrix_rooms)
+ .await;
+
+ tracing::info!("rooms configured by id {:?}", configured_rooms);
+
+ let rooms_to_notify = Arc::new(Mutex::new(Vec::new()));
+
+ {
+ let rooms_to_notify = rooms_to_notify.clone();
+ let configured_rooms = configured_rooms.clone();
+ client.add_event_handler(
+ |event, client, room|
+ matrix_bot::join_rooms_on_invite(event, client, room, configured_rooms, rooms_to_notify)
+ );
+ }
+
+ client.sync_once(SyncSettings::default()).await?;
+
+ matrix_bot::delete_other_devices(&client, &config.matrix_username, &config.matrix_password)
+ .await?;
+
+ matrix_bot::join_configured_rooms(&client, &configured_rooms).await?;
+
+ let rooms_we_have = client.joined_rooms()
+ .into_iter()
+ .filter(|room| configured_rooms.contains(&room.room_id().into()))
+ .collect::<Vec<_>>();
+ {
+ let mut lock = rooms_to_notify.lock().await;
+ for room in rooms_we_have {
+ lock.push(room)
+ }
+ }
+
+ tokio::spawn(subscriber_loop(config.clone(), rooms_to_notify));
+ client.sync(SyncSettings::default()).await?;
+
+ Ok(())
+}
+
+#[tracing::instrument(skip(config, rooms_to_notify))]
+async fn subscriber_loop(config: Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) {
+ loop {
+ tracing::info!("starting ntfy subscriber");
+ if let Err(e) = read_ntfy(&config, rooms_to_notify.clone()).await {
+ tracing::error!("error while handling ntfy events: {}", e);
+ }
+ }
+}
+
+async fn read_ntfy(config: &Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) -> anyhow::Result<()> {
+ let url = format!("{}/{}/json", config.ntfy_server, config.ntfy_topics.join(","));
+ let req = {
+ let mut req = reqwest::Client::new()
+ .get(url);
+ if let Some(ref token) = config.ntfy_token {
+ req = req.bearer_auth(token.expose_secret());
+ }
+ req.send().await?
+ };
+
+ if !req.status().is_success() {
+ tracing::error!("fatal: ntfy sent status {}", req.status());
+ exit(1);
+ }
+
+ let stream = req
+ .bytes_stream()
+ .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
+ let mut lines = tokio_util::io::StreamReader::new(stream).lines();
+
+ while let Ok(Some(line)) = lines.next_line().await {
+ tracing::debug!("received ntfy message {}", line);
+ match serde_json::from_str::<ntfy::Message>(&line) {
+ Ok(msg) if msg.event == ntfy::Event::Message => {
+ let title = match msg.title {
+ Some(ref title) => title,
+ _ => &msg.topic
+ };
+ let (level, color) = match msg.priority {
+ Some(..=2) => ("DEBUG", "white"),
+ Some(3) | None => ("INFO", "cyan"),
+ Some(4) => ("WARN", "yellow"),
+ Some(5..) => ("ERROR", "red")
+ };
+ // per spec, event=message implies this field exists
+ let body = msg.message.unwrap();
+
+ let event = RoomMessageEventContent::text_html(
+ format!("{level} {title}: {body}"),
+ format!("<font color=\"{color}\">{level} {title}:</font> {body}")
+ );
+ tracing::info!("sending event");
+
+ for room in rooms_to_notify.lock().await.iter() {
+ room.send(event.clone()).await?;
+ }
+ }
+ Ok(msg) => {
+ tracing::debug!("ignoring ntfy message of type {:?}", msg.event);
+ }
+ Err(e) => {
+ tracing::error!("could not parse ntfy message: {}: {}", e, line)
+ }
+ }
+ }
+
+ Ok(())
+}