summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs201
-rw-r--r--src/matrix_bot.rs134
-rw-r--r--src/ntfy.rs28
3 files changed, 363 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..ed9c8a7
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,201 @@
+use tokio::sync::Mutex;
+use std::{process::exit, sync::Arc, path::PathBuf, str::FromStr};
+use matrix_sdk::{
+ config::SyncSettings, ruma::{events::room::message::RoomMessageEventContent, OwnedRoomOrAliasId}, Client, Room, encryption::EncryptionSettings,
+};
+use secrecy::{Secret, ExposeSecret};
+use tokio::io::AsyncBufReadExt;
+use futures_util::stream::TryStreamExt;
+use confique::Config;
+use clap::{arg, command, value_parser};
+
+mod ntfy;
+mod matrix_bot;
+
+
+#[derive(serde::Deserialize, confique::Config, Debug, Clone)]
+struct Options {
+ #[config(env="BOT_MATRIX_HOMESERVER")]
+ matrix_homeserver: String,
+ #[config(env="BOT_MATRIX_USERNAME")]
+ matrix_username: String,
+ #[config(env="BOT_MATRIX_PASSWORD")]
+ matrix_password: Secret<String>,
+ #[config(env="BOT_MATRIX_RECOVERY_KEY")]
+ matrix_recovery_key: Option<Secret<String>>,
+ #[config(env="BOT_MATRIX_DEVICE_NAME", default="ntfy bot")]
+ matrix_device_name: String,
+ #[config(env="BOT_MATRIX_ROOMS")]
+ matrix_rooms: Vec<OwnedRoomOrAliasId>,
+ #[config(env="BOT_NTFY_SERVER")]
+ ntfy_server: String,
+ #[config(env="BOT_NTFY_TOPICS", parse_env=confique::env::parse::list_by_comma)]
+ ntfy_topics: Vec<String>,
+ #[config(env="BOT_NTFY_TOKEN")]
+ ntfy_token: Option<Secret<String>>
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+ tracing_subscriber::fmt::init();
+
+ let matches = command!()
+ .arg(arg!(-c --config <FILE> "config file location")
+ .required(false)
+ .value_parser(value_parser!(PathBuf))
+ )
+ .get_matches();
+
+ let config_file = match matches.get_one::<PathBuf>("config") {
+ None => PathBuf::from_str("config.toml")?,
+ Some(path) => path.to_owned()
+ };
+
+ tracing::info!("config file location: {config_file:?}");
+
+ let config = &Options::builder()
+ .file(config_file)
+ .env()
+ .load()?;
+
+ let client = Client::builder()
+ .homeserver_url(config.matrix_homeserver.to_owned())
+ .with_encryption_settings(EncryptionSettings {
+ auto_enable_cross_signing: true,
+ auto_enable_backups: true,
+ ..Default::default()
+ })
+ .build().await?;
+
+ let recovery = client.encryption().recovery();
+
+ client
+ .matrix_auth()
+ .login_username(&config.matrix_username, &config.matrix_password.expose_secret())
+ .initial_device_display_name(&config.matrix_device_name)
+ .await?;
+
+ if let Some(ref key) = config.matrix_recovery_key {
+ recovery.recover(key.expose_secret()).await.unwrap();
+ } else {
+ let key = recovery
+ .enable()
+ .wait_for_backups_to_upload()
+ .await
+ .unwrap();
+ tracing::warn!("recovery key should be set in config: {}", key);
+ }
+
+ tracing::info!("logged in as {}", config.matrix_username);
+
+
+ let configured_rooms =
+ matrix_bot::resolve_room_aliases(&client, &config.matrix_rooms)
+ .await;
+
+ tracing::info!("rooms configured by id {:?}", configured_rooms);
+
+ let rooms_to_notify = Arc::new(Mutex::new(Vec::new()));
+
+ {
+ let rooms_to_notify = rooms_to_notify.clone();
+ let configured_rooms = configured_rooms.clone();
+ client.add_event_handler(
+ |event, client, room|
+ matrix_bot::join_rooms_on_invite(event, client, room, configured_rooms, rooms_to_notify)
+ );
+ }
+
+ client.sync_once(SyncSettings::default()).await?;
+
+ matrix_bot::delete_other_devices(&client, &config.matrix_username, &config.matrix_password)
+ .await?;
+
+ matrix_bot::join_configured_rooms(&client, &configured_rooms).await?;
+
+ let rooms_we_have = client.joined_rooms()
+ .into_iter()
+ .filter(|room| configured_rooms.contains(&room.room_id().into()))
+ .collect::<Vec<_>>();
+ {
+ let mut lock = rooms_to_notify.lock().await;
+ for room in rooms_we_have {
+ lock.push(room)
+ }
+ }
+
+ tokio::spawn(subscriber_loop(config.clone(), rooms_to_notify));
+ client.sync(SyncSettings::default()).await?;
+
+ Ok(())
+}
+
+#[tracing::instrument(skip(config, rooms_to_notify))]
+async fn subscriber_loop(config: Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) {
+ loop {
+ tracing::info!("starting ntfy subscriber");
+ if let Err(e) = read_ntfy(&config, rooms_to_notify.clone()).await {
+ tracing::error!("error while handling ntfy events: {}", e);
+ }
+ }
+}
+
+async fn read_ntfy(config: &Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) -> anyhow::Result<()> {
+ let url = format!("{}/{}/json", config.ntfy_server, config.ntfy_topics.join(","));
+ let req = {
+ let mut req = reqwest::Client::new()
+ .get(url);
+ if let Some(ref token) = config.ntfy_token {
+ req = req.bearer_auth(token.expose_secret());
+ }
+ req.send().await?
+ };
+
+ if !req.status().is_success() {
+ tracing::error!("fatal: ntfy sent status {}", req.status());
+ exit(1);
+ }
+
+ let stream = req
+ .bytes_stream()
+ .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
+ let mut lines = tokio_util::io::StreamReader::new(stream).lines();
+
+ while let Ok(Some(line)) = lines.next_line().await {
+ tracing::debug!("received ntfy message {}", line);
+ match serde_json::from_str::<ntfy::Message>(&line) {
+ Ok(msg) if msg.event == ntfy::Event::Message => {
+ let title = match msg.title {
+ Some(ref title) => title,
+ _ => &msg.topic
+ };
+ let (level, color) = match msg.priority {
+ Some(..=2) => ("DEBUG", "white"),
+ Some(3) | None => ("INFO", "cyan"),
+ Some(4) => ("WARN", "yellow"),
+ Some(5..) => ("ERROR", "red")
+ };
+ // per spec, event=message implies this field exists
+ let body = msg.message.unwrap();
+
+ let event = RoomMessageEventContent::text_html(
+ format!("{level} {title}: {body}"),
+ format!("<font color=\"{color}\">{level} {title}:</font> {body}")
+ );
+ tracing::info!("sending event");
+
+ for room in rooms_to_notify.lock().await.iter() {
+ room.send(event.clone()).await?;
+ }
+ }
+ Ok(msg) => {
+ tracing::debug!("ignoring ntfy message of type {:?}", msg.event);
+ }
+ Err(e) => {
+ tracing::error!("could not parse ntfy message: {}: {}", e, line)
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/matrix_bot.rs b/src/matrix_bot.rs
new file mode 100644
index 0000000..7618b0d
--- /dev/null
+++ b/src/matrix_bot.rs
@@ -0,0 +1,134 @@
+use std::{sync::Arc, ops::Deref};
+use tokio::sync::Mutex;
+
+use anyhow::Context;
+use matrix_sdk::{
+ ruma::{events::room::member::StrippedRoomMemberEvent, api::client::uiaa, OwnedRoomId, OwnedRoomOrAliasId}, Client, Room,
+};
+use secrecy::{Secret, ExposeSecret};
+use tokio::time::{sleep, Duration};
+
+#[tracing::instrument(skip(client, configured_rooms, room, state))]
+pub async fn join_rooms_on_invite(
+ event: StrippedRoomMemberEvent,
+ client: Client,
+ room: Room,
+ configured_rooms: Vec<OwnedRoomId>,
+ state: Arc<Mutex<Vec<Room>>>
+) {
+ if event.state_key == client.user_id().unwrap() {
+ let room_id = room.room_id().to_owned();
+ if configured_rooms.contains(&room.room_id().into()) {
+ tokio::spawn(async move {
+ join_room_with_cooldown(&room).await;
+ state.lock().await.push(room);
+ tracing::info!("Successfully joined room {room_id}");
+ });
+ }
+ else {
+ tracing::warn!("was invited to room {room_id} which is not configured");
+ if let Err(e) = room.leave().await {
+ tracing::error!("error leaving room {room_id}: {e}");
+ }
+ }
+ }
+}
+
+async fn join_room_with_cooldown(room: &Room) {
+ tracing::info!("Autojoining room {}", room.room_id());
+ let mut delay = 2;
+ let room_id = room.room_id().to_owned();
+
+ while let Err(err) = room.join().await {
+ // retry autojoin due to synapse sending invites, before the
+ // invited user can join for more information see
+ // https://github.com/matrix-org/synapse/issues/4345
+ tracing::error!("Failed to join room {room_id} ({err:?}), retrying in {delay}s");
+
+ sleep(Duration::from_secs(delay)).await;
+ delay *= 2;
+
+ if delay > 3600 {
+ eprintln!("Can't join room {} ({err:?})", room.room_id());
+ break;
+ }
+ }
+}
+
+pub async fn resolve_room_aliases(client: &Client, aliases: &Vec<OwnedRoomOrAliasId>) -> Vec<OwnedRoomId> {
+ futures_util::future::join_all (aliases
+ .iter()
+ .map(|name| async {match name.deref().try_into() {
+ Ok(alias) => Some(client.resolve_room_alias(alias).await.ok()?.room_id),
+ Err(id) => Some(id.into())
+ }}))
+ .await
+ .into_iter()
+ .filter_map(|o: Option<_>| o)
+ .collect()
+}
+
+#[tracing::instrument(skip(client))]
+pub async fn join_configured_rooms(client: &Client, configured_rooms: &Vec<OwnedRoomId>) -> anyhow::Result<()> {
+ let joined_rooms = client.joined_rooms();
+
+ let missing_rooms = configured_rooms
+ .iter()
+ .filter(|room_id| !joined_rooms.iter().any(|room| room.room_id() == *room_id))
+ .collect::<Vec<_>>();
+
+ let extra_rooms = joined_rooms
+ .iter()
+ .filter(|room| !configured_rooms.contains(&room.room_id().into()))
+ .collect::<Vec<_>>();
+
+ for room_id in missing_rooms {
+ tracing::info!("attempting to join room {room_id:?}");
+ if let Err(e) = client.join_room_by_id(room_id).await {
+ tracing::error!("could not join room {}", e)
+ }
+ }
+
+ for room in extra_rooms {
+ tracing::info!("leaving room {}", room.room_id());
+ if let Err(e) = room.leave().await {
+ tracing::error!("could not leave room {}", e)
+ }
+ }
+
+ Ok(())
+}
+
+#[tracing::instrument(skip(password))]
+pub async fn delete_other_devices(client: &Client, username: &str, password: &Secret<String>) -> anyhow::Result<()> {
+ let own_id = client
+ .device_id()
+ .with_context(|| "no own device id?")?;
+
+ let devices = client.devices().await?
+ .devices
+ .into_iter()
+ .map(|device| device.device_id)
+ .filter(|id| id != own_id)
+ .collect::<Vec<_>>();
+
+ tracing::info!("deleting devices {:?}", devices);
+
+ // deleting devices is a funny double-request thing since it requires the
+ // user's password; see the documentation on delete_devices()
+ if let Err(e) = client.delete_devices(&devices, None).await {
+ if let Some(info) = e.as_uiaa_response() {
+ let mut password = uiaa::Password::new(
+ uiaa::UserIdentifier::UserIdOrLocalpart(username.to_owned()),
+ password.expose_secret().to_owned(),
+ );
+ password.session = info.session.clone();
+
+ client
+ .delete_devices(&devices, Some(uiaa::AuthData::Password(password)))
+ .await?;
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/ntfy.rs b/src/ntfy.rs
new file mode 100644
index 0000000..48e9f36
--- /dev/null
+++ b/src/ntfy.rs
@@ -0,0 +1,28 @@
+
+
+/// see https://docs.ntfy.sh/subscribe/api/#json-message-format
+#[derive(Debug, serde::Deserialize)]
+#[allow(unused)]
+pub struct Message {
+ pub id: String,
+ pub time: u64,
+ pub expires: Option<u64>,
+ pub event: Event,
+ pub topic: String,
+ pub message: Option<String>,
+ pub title: Option<String>,
+ pub tags: Option<Vec<String>>,
+ pub priority: Option<usize>,
+ pub click: Option<String>,
+ // omitted: actions, attachment
+}
+
+
+#[derive(Debug, serde::Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum Event {
+ Open,
+ Keepalive,
+ Message,
+ PollRequest
+}