diff options
| author | stuebinm | 2024-03-27 23:10:02 +0100 | 
|---|---|---|
| committer | stuebinm | 2024-03-27 23:10:02 +0100 | 
| commit | bade89a506c380a7d4cab4fdd765e28686c14776 (patch) | |
| tree | 451bd9fdee32b03c159bcbff71699afaaef48341 /src | |
simple bot to play around with
Diffstat (limited to '')
| -rw-r--r-- | src/main.rs | 201 | ||||
| -rw-r--r-- | src/matrix_bot.rs | 134 | ||||
| -rw-r--r-- | src/ntfy.rs | 28 | 
3 files changed, 363 insertions, 0 deletions
| diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ed9c8a7 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,201 @@ +use tokio::sync::Mutex; +use std::{process::exit, sync::Arc, path::PathBuf, str::FromStr}; +use matrix_sdk::{ +    config::SyncSettings, ruma::{events::room::message::RoomMessageEventContent, OwnedRoomOrAliasId}, Client, Room, encryption::EncryptionSettings, +}; +use secrecy::{Secret, ExposeSecret}; +use tokio::io::AsyncBufReadExt; +use futures_util::stream::TryStreamExt; +use confique::Config; +use clap::{arg, command, value_parser}; + +mod ntfy; +mod matrix_bot; + + +#[derive(serde::Deserialize, confique::Config, Debug, Clone)] +struct Options { +    #[config(env="BOT_MATRIX_HOMESERVER")] +    matrix_homeserver: String, +    #[config(env="BOT_MATRIX_USERNAME")] +    matrix_username: String, +    #[config(env="BOT_MATRIX_PASSWORD")] +    matrix_password: Secret<String>, +    #[config(env="BOT_MATRIX_RECOVERY_KEY")] +    matrix_recovery_key: Option<Secret<String>>, +    #[config(env="BOT_MATRIX_DEVICE_NAME", default="ntfy bot")] +    matrix_device_name: String, +    #[config(env="BOT_MATRIX_ROOMS")] +    matrix_rooms: Vec<OwnedRoomOrAliasId>, +    #[config(env="BOT_NTFY_SERVER")] +    ntfy_server: String, +    #[config(env="BOT_NTFY_TOPICS", parse_env=confique::env::parse::list_by_comma)] +    ntfy_topics: Vec<String>, +    #[config(env="BOT_NTFY_TOKEN")] +    ntfy_token: Option<Secret<String>> +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { +    tracing_subscriber::fmt::init(); + +    let matches = command!() +        .arg(arg!(-c --config <FILE> "config file location") +             .required(false) +             .value_parser(value_parser!(PathBuf)) +        ) +        .get_matches(); + +    let config_file = match matches.get_one::<PathBuf>("config") { +        None => PathBuf::from_str("config.toml")?, +        Some(path) => path.to_owned() +    }; + +    tracing::info!("config file location: {config_file:?}"); + +    let config = &Options::builder() +        .file(config_file) +        .env() +        .load()?; + +    let client = Client::builder() +        .homeserver_url(config.matrix_homeserver.to_owned()) +        .with_encryption_settings(EncryptionSettings { +            auto_enable_cross_signing: true, +            auto_enable_backups: true, +            ..Default::default() +        }) +        .build().await?; + +    let recovery = client.encryption().recovery(); + +    client +        .matrix_auth() +        .login_username(&config.matrix_username, &config.matrix_password.expose_secret()) +        .initial_device_display_name(&config.matrix_device_name) +        .await?; + +    if let Some(ref key) = config.matrix_recovery_key { +        recovery.recover(key.expose_secret()).await.unwrap(); +    } else { +        let key = recovery +            .enable() +            .wait_for_backups_to_upload() +            .await +            .unwrap(); +        tracing::warn!("recovery key should be set in config: {}", key); +    } + +    tracing::info!("logged in as {}", config.matrix_username); + + +    let configured_rooms = +        matrix_bot::resolve_room_aliases(&client, &config.matrix_rooms) +        .await; + +    tracing::info!("rooms configured by id {:?}", configured_rooms); + +    let rooms_to_notify = Arc::new(Mutex::new(Vec::new())); + +    { +        let rooms_to_notify = rooms_to_notify.clone(); +        let configured_rooms = configured_rooms.clone(); +        client.add_event_handler( +            |event, client, room| +            matrix_bot::join_rooms_on_invite(event, client, room, configured_rooms, rooms_to_notify) +        ); +    } + +    client.sync_once(SyncSettings::default()).await?; + +    matrix_bot::delete_other_devices(&client, &config.matrix_username, &config.matrix_password) +        .await?; + +    matrix_bot::join_configured_rooms(&client, &configured_rooms).await?; + +    let rooms_we_have = client.joined_rooms() +        .into_iter() +        .filter(|room| configured_rooms.contains(&room.room_id().into())) +        .collect::<Vec<_>>(); +    { +        let mut lock = rooms_to_notify.lock().await; +        for room in rooms_we_have { +            lock.push(room) +        } +    } + +    tokio::spawn(subscriber_loop(config.clone(), rooms_to_notify)); +    client.sync(SyncSettings::default()).await?; + +    Ok(()) +} + +#[tracing::instrument(skip(config, rooms_to_notify))] +async fn subscriber_loop(config: Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) { +   loop { +       tracing::info!("starting ntfy subscriber"); +       if let Err(e) = read_ntfy(&config, rooms_to_notify.clone()).await { +           tracing::error!("error while handling ntfy events: {}", e); +       } +   } +} + +async fn read_ntfy(config: &Options, rooms_to_notify: Arc<Mutex<Vec<Room>>>) -> anyhow::Result<()> { +    let url = format!("{}/{}/json", config.ntfy_server, config.ntfy_topics.join(",")); +    let req = { +        let mut req = reqwest::Client::new() +            .get(url); +        if let Some(ref token) = config.ntfy_token { +            req = req.bearer_auth(token.expose_secret()); +        } +        req.send().await? +    }; + +    if !req.status().is_success() { +        tracing::error!("fatal: ntfy sent status {}", req.status()); +        exit(1); +    } + +    let stream = req +        .bytes_stream() +        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); +    let mut lines = tokio_util::io::StreamReader::new(stream).lines(); + +    while let Ok(Some(line)) = lines.next_line().await { +        tracing::debug!("received ntfy message {}", line); +        match serde_json::from_str::<ntfy::Message>(&line) { +            Ok(msg) if msg.event == ntfy::Event::Message => { +                let title = match msg.title { +                    Some(ref title) => title, +                    _ => &msg.topic +                }; +                let (level, color) = match msg.priority { +                    Some(..=2) => ("DEBUG", "white"), +                    Some(3) | None => ("INFO", "cyan"), +                    Some(4) => ("WARN", "yellow"), +                    Some(5..) => ("ERROR", "red") +                }; +                // per spec, event=message implies this field exists +                let body = msg.message.unwrap(); + +                let event = RoomMessageEventContent::text_html( +                    format!("{level} {title}: {body}"), +                    format!("<font color=\"{color}\">{level} {title}:</font> {body}") +                ); +                tracing::info!("sending event"); + +                for room in rooms_to_notify.lock().await.iter() { +                    room.send(event.clone()).await?; +                } +            } +            Ok(msg) => { +                tracing::debug!("ignoring ntfy message of type {:?}", msg.event); +            } +            Err(e) => { +                tracing::error!("could not parse ntfy message: {}: {}", e, line) +            } +        } +    } + +    Ok(()) +} diff --git a/src/matrix_bot.rs b/src/matrix_bot.rs new file mode 100644 index 0000000..7618b0d --- /dev/null +++ b/src/matrix_bot.rs @@ -0,0 +1,134 @@ +use std::{sync::Arc, ops::Deref}; +use tokio::sync::Mutex; + +use anyhow::Context; +use matrix_sdk::{ +    ruma::{events::room::member::StrippedRoomMemberEvent, api::client::uiaa, OwnedRoomId, OwnedRoomOrAliasId}, Client, Room, +}; +use secrecy::{Secret, ExposeSecret}; +use tokio::time::{sleep, Duration}; + +#[tracing::instrument(skip(client, configured_rooms, room, state))] +pub async fn join_rooms_on_invite( +    event: StrippedRoomMemberEvent, +    client: Client, +    room: Room, +    configured_rooms: Vec<OwnedRoomId>, +    state: Arc<Mutex<Vec<Room>>> +) { +    if event.state_key == client.user_id().unwrap() { +        let room_id = room.room_id().to_owned(); +        if configured_rooms.contains(&room.room_id().into()) { +            tokio::spawn(async move { +                join_room_with_cooldown(&room).await; +                state.lock().await.push(room); +                tracing::info!("Successfully joined room {room_id}"); +            }); +        } +        else { +            tracing::warn!("was invited to room {room_id} which is not configured"); +            if let Err(e) = room.leave().await { +                tracing::error!("error leaving room {room_id}: {e}"); +            } +        } +    } +} + +async fn join_room_with_cooldown(room: &Room) { +    tracing::info!("Autojoining room {}", room.room_id()); +    let mut delay = 2; +    let room_id = room.room_id().to_owned(); + +    while let Err(err) = room.join().await { +        // retry autojoin due to synapse sending invites, before the +        // invited user can join for more information see +        // https://github.com/matrix-org/synapse/issues/4345 +        tracing::error!("Failed to join room {room_id} ({err:?}), retrying in {delay}s"); + +        sleep(Duration::from_secs(delay)).await; +        delay *= 2; + +        if delay > 3600 { +            eprintln!("Can't join room {} ({err:?})", room.room_id()); +            break; +        } +    } +} + +pub async fn resolve_room_aliases(client: &Client, aliases: &Vec<OwnedRoomOrAliasId>) -> Vec<OwnedRoomId> { +    futures_util::future::join_all (aliases +        .iter() +        .map(|name| async {match name.deref().try_into() { +            Ok(alias) => Some(client.resolve_room_alias(alias).await.ok()?.room_id), +            Err(id) => Some(id.into()) +        }})) +        .await +        .into_iter() +        .filter_map(|o: Option<_>| o) +        .collect() +} + +#[tracing::instrument(skip(client))] +pub async fn join_configured_rooms(client: &Client, configured_rooms: &Vec<OwnedRoomId>) -> anyhow::Result<()> { +    let joined_rooms = client.joined_rooms(); + +    let missing_rooms = configured_rooms +        .iter() +        .filter(|room_id| !joined_rooms.iter().any(|room| room.room_id() == *room_id)) +        .collect::<Vec<_>>(); + +    let extra_rooms = joined_rooms +        .iter() +        .filter(|room| !configured_rooms.contains(&room.room_id().into())) +        .collect::<Vec<_>>(); + +    for room_id in missing_rooms { +        tracing::info!("attempting to join room {room_id:?}"); +        if let Err(e) = client.join_room_by_id(room_id).await { +            tracing::error!("could not join room {}", e) +        } +    } + +    for room in extra_rooms { +        tracing::info!("leaving room {}", room.room_id()); +        if let Err(e) = room.leave().await { +            tracing::error!("could not leave room {}", e) +        } +    } + +    Ok(()) +} + +#[tracing::instrument(skip(password))] +pub async fn delete_other_devices(client: &Client, username: &str, password: &Secret<String>) -> anyhow::Result<()> { +    let own_id = client +        .device_id() +        .with_context(|| "no own device id?")?; + +    let devices = client.devices().await? +        .devices +        .into_iter() +        .map(|device| device.device_id) +        .filter(|id| id != own_id) +        .collect::<Vec<_>>(); + +    tracing::info!("deleting devices {:?}", devices); + +    // deleting devices is a funny double-request thing since it requires the +    // user's password; see the documentation on delete_devices() +    if let Err(e) = client.delete_devices(&devices, None).await { +        if let Some(info) = e.as_uiaa_response() { +            let mut password = uiaa::Password::new( +                uiaa::UserIdentifier::UserIdOrLocalpart(username.to_owned()), +                password.expose_secret().to_owned(), +            ); +            password.session = info.session.clone(); + +            client +                .delete_devices(&devices, Some(uiaa::AuthData::Password(password))) +                .await?; +        } +    } + +    Ok(()) +} diff --git a/src/ntfy.rs b/src/ntfy.rs new file mode 100644 index 0000000..48e9f36 --- /dev/null +++ b/src/ntfy.rs @@ -0,0 +1,28 @@ + + +/// see https://docs.ntfy.sh/subscribe/api/#json-message-format +#[derive(Debug, serde::Deserialize)] +#[allow(unused)] +pub struct Message { +    pub id: String, +    pub time: u64, +    pub expires: Option<u64>, +    pub event: Event, +    pub topic: String, +    pub message: Option<String>, +    pub title: Option<String>, +    pub tags: Option<Vec<String>>, +    pub priority: Option<usize>, +    pub click: Option<String>, +    // omitted: actions, attachment +} + + +#[derive(Debug, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Event { +    Open, +    Keepalive, +    Message, +    PollRequest +} | 
