//! # Async Matrix Client Worker //! //! The worker thread handles asynchronous work, and can receive messages from the main thread that //! block on a reply from the async worker. use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::{Debug, Formatter}; use std::ops::Deref; use std::str::FromStr; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::Arc; use std::time::{Duration, Instant}; use futures::{stream::FuturesUnordered, StreamExt}; use gethostname::gethostname; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; use tracing::{error, warn}; use url::Url; use matrix_sdk::{ config::{RequestConfig, SyncSettings}, encryption::verification::{SasVerification, Verification}, encryption::{BackupDownloadStrategy, EncryptionSettings}, event_handler::Ctx, matrix_auth::MatrixSession, reqwest, room::{Messages, MessagesOptions, Room as MatrixRoom, RoomMember}, ruma::{ api::client::{ filter::{FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter}, room::create_room::v3::{CreationContent, Request as CreateRoomRequest, RoomPreset}, room::Visibility, space::get_hierarchy::v1::Request as SpaceHierarchyRequest, }, assign, events::{ key::verification::{ done::{OriginalSyncKeyVerificationDoneEvent, ToDeviceKeyVerificationDoneEvent}, key::{OriginalSyncKeyVerificationKeyEvent, ToDeviceKeyVerificationKeyEvent}, request::ToDeviceKeyVerificationRequestEvent, start::{OriginalSyncKeyVerificationStartEvent, ToDeviceKeyVerificationStartEvent}, VerificationMethod, }, presence::PresenceEvent, reaction::ReactionEventContent, receipt::ReceiptType, receipt::{ReceiptEventContent, ReceiptThread}, room::{ encryption::RoomEncryptionEventContent, member::OriginalSyncRoomMemberEvent, message::{MessageType, RoomMessageEventContent}, name::RoomNameEventContent, redaction::{OriginalSyncRoomRedactionEvent, SyncRoomRedactionEvent}, }, tag::Tags, typing::SyncTypingEvent, AnyInitialStateEvent, AnyMessageLikeEvent, AnyTimelineEvent, EmptyStateKey, InitialStateEvent, SyncEphemeralRoomEvent, SyncMessageLikeEvent, SyncStateEvent, }, room::RoomType, serde::Raw, EventEncryptionAlgorithm, EventId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, RoomId, RoomVersionId, }, Client, ClientBuildError, DisplayName, RoomMemberships, }; use modalkit::errors::UIError; use modalkit::prelude::{EditInfo, InfoMessage}; use crate::base::Need; use crate::{ base::{ AsyncProgramStore, ChatStore, CreateRoomFlags, CreateRoomType, EventLocation, IambError, IambResult, RoomFetchStatus, RoomInfo, VerifyAction, }, message::MessageFetchResult, ApplicationSettings, }; const DEFAULT_ENCRYPTION_SETTINGS: EncryptionSettings = EncryptionSettings { auto_enable_cross_signing: true, auto_enable_backups: true, backup_download_strategy: BackupDownloadStrategy::AfterDecryptionFailure, }; const IAMB_DEVICE_NAME: &str = "iamb"; const IAMB_USER_AGENT: &str = "iamb"; const MIN_MSG_LOAD: u32 = 50; fn initial_devname() -> String { format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy()) } async fn is_direct(room: &MatrixRoom) -> bool { room.deref().is_direct().await.unwrap_or_default() } pub async fn create_room( client: &Client, room_alias_name: Option, rt: CreateRoomType, flags: CreateRoomFlags, ) -> IambResult { let mut creation_content = None; let mut initial_state = vec![]; let mut is_direct = false; let mut preset = None; let mut invite = vec![]; let visibility = if flags.contains(CreateRoomFlags::PUBLIC) { Visibility::Public } else { Visibility::Private }; match rt { CreateRoomType::Direct(user) => { invite.push(user); is_direct = true; preset = Some(RoomPreset::TrustedPrivateChat); }, CreateRoomType::Space => { let mut cc = CreationContent::new(); cc.room_type = Some(RoomType::Space); let raw_cc = Raw::new(&cc).map_err(IambError::from)?; creation_content = Some(raw_cc); }, CreateRoomType::Room => {}, } // Set up encryption. if flags.contains(CreateRoomFlags::ENCRYPTED) { // XXX: Once matrix-sdk uses ruma 0.8, then this can skip the cast. let algo = EventEncryptionAlgorithm::MegolmV1AesSha2; let content = RoomEncryptionEventContent::new(algo); let encr = InitialStateEvent { content, state_key: EmptyStateKey }; let encr_raw = Raw::new(&encr).map_err(IambError::from)?; let encr_raw = encr_raw.cast::(); initial_state.push(encr_raw); } let request = assign!(CreateRoomRequest::new(), { room_alias_name, creation_content, initial_state, invite, is_direct, visibility, preset, }); let resp = client.create_room(request).await.map_err(IambError::from)?; if is_direct { if let Some(room) = client.get_room(resp.room_id()) { room.set_is_direct(true).await.map_err(IambError::from)?; } else { error!( room_id = resp.room_id().as_str(), "Couldn't set is_direct for new direct message room" ); } } return Ok(resp.room_id().to_owned()); } async fn update_event_receipts(info: &mut RoomInfo, room: &MatrixRoom, event_id: &EventId) { let receipts = match room .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id) .await { Ok(receipts) => receipts, Err(e) => { tracing::warn!(?event_id, "failed to get event receipts: {e}"); return; }, }; for (user_id, _) in receipts { info.set_receipt(user_id, event_id.to_owned()); } } #[derive(Debug)] enum Plan { Messages(OwnedRoomId, Option), Members(OwnedRoomId), } async fn load_plans(store: &AsyncProgramStore) -> Vec { let mut locked = store.lock().await; let ChatStore { need_load, rooms, .. } = &mut locked.application; let mut plan = vec![]; for (room_id, mut need) in std::mem::take(need_load).into_iter() { if need.contains(Need::MESSAGES) { let info = rooms.get_or_default(room_id.clone()); if !info.recently_fetched() && !info.fetching { info.fetch_last = Instant::now().into(); info.fetching = true; let fetch_id = match &info.fetch_id { RoomFetchStatus::Done => continue, RoomFetchStatus::HaveMore(fetch_id) => Some(fetch_id.clone()), RoomFetchStatus::NotStarted => None, }; plan.push(Plan::Messages(room_id.to_owned(), fetch_id)); need.remove(Need::MESSAGES); } } if need.contains(Need::MEMBERS) { plan.push(Plan::Members(room_id.to_owned())); need.remove(Need::MEMBERS); } if !need.is_empty() { need_load.insert(room_id, need); } } return plan; } async fn run_plan(client: &Client, store: &AsyncProgramStore, plan: Plan) { match plan { Plan::Messages(room_id, fetch_id) => { let limit = MIN_MSG_LOAD; let client = client.clone(); let store = store.clone(); let res = load_older_one(&client, &room_id, fetch_id, limit).await; load_insert(room_id, res, store).await; }, Plan::Members(room_id) => { let res = members_load(client, &room_id).await; members_insert(room_id, res, store).await }, } } async fn load_older_one( client: &Client, room_id: &RoomId, fetch_id: Option, limit: u32, ) -> MessageFetchResult { if let Some(room) = client.get_room(room_id) { let mut opts = match &fetch_id { Some(id) => MessagesOptions::backward().from(id.as_str()), None => MessagesOptions::backward(), }; opts.limit = limit.into(); let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?; let msgs = chunk.into_iter().filter_map(|ev| { match ev.event.deserialize() { Ok(AnyTimelineEvent::MessageLike(msg)) => Some(msg), Ok(AnyTimelineEvent::State(_)) => None, Err(_) => None, } }); Ok((end, msgs.collect())) } else { Err(IambError::UnknownRoom(room_id.to_owned()).into()) } } async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) { let mut locked = store.lock().await; let ChatStore { presences, rooms, worker, picker, settings, .. } = &mut locked.application; let info = rooms.get_or_default(room_id.clone()); info.fetching = false; let client = &worker.client; match res { Ok((fetch_id, msgs)) => { for msg in msgs.into_iter() { let sender = msg.sender().to_owned(); let _ = presences.get_or_default(sender); if let Some(room) = client.get_room(&room_id) { update_event_receipts(info, &room, msg.event_id()).await; } match msg { AnyMessageLikeEvent::RoomEncrypted(msg) => { info.insert_encrypted(msg); }, AnyMessageLikeEvent::RoomMessage(msg) => { info.insert_with_preview( room_id.clone(), store.clone(), *picker, msg, settings, client.media(), ); }, AnyMessageLikeEvent::Reaction(ev) => { info.insert_reaction(ev); }, _ => continue, } } info.fetch_id = fetch_id.map_or(RoomFetchStatus::Done, RoomFetchStatus::HaveMore); }, Err(e) => { warn!(room_id = room_id.as_str(), err = e.to_string(), "Failed to load older messages"); // Wait and try again. locked.application.need_load.insert(room_id, Need::MESSAGES); }, } } async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize { // Plans are run in parallel. Any room *may* have several plans. load_plans(store) .await .into_iter() .map(|plan| run_plan(client, store, plan)) .collect::>() .count() .await } async fn members_load(client: &Client, room_id: &RoomId) -> IambResult> { if let Some(room) = client.get_room(room_id) { Ok(room .members_no_sync(RoomMemberships::all()) .await .map_err(IambError::from)?) } else { Err(IambError::UnknownRoom(room_id.to_owned()).into()) } } async fn members_insert( room_id: OwnedRoomId, res: IambResult>, store: &AsyncProgramStore, ) { if let Ok(members) = res { let mut locked = store.lock().await; let ChatStore { rooms, .. } = &mut locked.application; let info = rooms.get_or_default(room_id.clone()); for member in members { let user_id = member.user_id(); let display_name = member.display_name().map_or(user_id.to_string(), |str| str.to_string()); info.display_names.insert(user_id.to_owned(), display_name); } } // else ??? } async fn load_older_forever(client: &Client, store: &AsyncProgramStore) { // Load any pending older messages or members every 2 seconds. let mut interval = tokio::time::interval(Duration::from_secs(2)); loop { interval.tick().await; load_older(client, store).await; } } async fn refresh_rooms(client: &Client, store: &AsyncProgramStore) { let mut names = vec![]; let mut spaces = vec![]; let mut rooms = vec![]; let mut dms = vec![]; for room in client.invited_rooms().into_iter() { let name = room.display_name().await.unwrap_or(DisplayName::Empty).to_string(); let tags = room.tags().await.unwrap_or_default(); names.push((room.room_id().to_owned(), name)); if is_direct(&room).await { dms.push(Arc::new((room, tags))); } else if room.is_space() { spaces.push(Arc::new((room, tags))); } else { rooms.push(Arc::new((room, tags))); } } for room in client.joined_rooms().into_iter() { let name = room.display_name().await.unwrap_or(DisplayName::Empty).to_string(); let tags = room.tags().await.unwrap_or_default(); names.push((room.room_id().to_owned(), name)); if is_direct(&room).await { dms.push(Arc::new((room, tags))); } else if room.is_space() { spaces.push(Arc::new((room, tags))); } else { rooms.push(Arc::new((room, tags))); } } let mut locked = store.lock().await; locked.application.sync_info.spaces = spaces; locked.application.sync_info.rooms = rooms; locked.application.sync_info.dms = dms; for (room_id, name) in names { locked.application.set_room_name(&room_id, &name); } } async fn refresh_rooms_forever(client: &Client, store: &AsyncProgramStore) { let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { refresh_rooms(client, store).await; interval.tick().await; } } async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) { let mut interval = tokio::time::interval(Duration::from_secs(2)); let mut sent = HashMap::::default(); loop { interval.tick().await; let locked = store.lock().await; let user_id = &locked.application.settings.profile.user_id; let updates = client .joined_rooms() .into_iter() .filter_map(|room| { let room_id = room.room_id().to_owned(); let info = locked.application.rooms.get(&room_id)?; let new_receipt = info.get_receipt(user_id)?; let old_receipt = sent.get(&room_id); if Some(new_receipt) != old_receipt { Some((room_id, new_receipt.clone())) } else { None } }) .collect::>(); drop(locked); for (room_id, new_receipt) in updates { use matrix_sdk::ruma::api::client::receipt::create_receipt::v3::ReceiptType; let Some(room) = client.get_room(&room_id) else { continue; }; match room .send_single_receipt( ReceiptType::Read, ReceiptThread::Unthreaded, new_receipt.clone(), ) .await { Ok(()) => { sent.insert(room_id, new_receipt); }, Err(e) => tracing::warn!(?room_id, "Failed to set read receipt: {e}"), } } } } pub async fn do_first_sync(client: &Client, store: &AsyncProgramStore) { // Perform an initial, lazily-loaded sync. let mut room = RoomEventFilter::default(); room.lazy_load_options = LazyLoadOptions::Enabled { include_redundant_members: false }; let mut room_ev = RoomFilter::default(); room_ev.state = room; let mut filter = FilterDefinition::default(); filter.room = room_ev; let settings = SyncSettings::new().filter(filter.into()); if let Err(e) = client.sync_once(settings).await { tracing::error!(err = e.to_string(), "Failed to perform initial sync; will retry later"); return; } // Populate sync_info with our initial set of rooms/dms/spaces. refresh_rooms(client, store).await; // Insert Need::Messages to fetch accurate recent timestamps in the background. let mut locked = store.lock().await; let ChatStore { sync_info, need_load, .. } = &mut locked.application; for room in sync_info.rooms.iter() { let room_id = room.as_ref().0.room_id().to_owned(); need_load.insert(room_id, Need::MESSAGES); } for room in sync_info.dms.iter() { let room_id = room.as_ref().0.room_id().to_owned(); need_load.insert(room_id, Need::MESSAGES); } } #[derive(Debug)] pub enum LoginStyle { SessionRestore(MatrixSession), Password(String), SingleSignOn, } pub struct ClientResponse(Receiver); pub struct ClientReply(SyncSender); impl ClientResponse { fn recv(self) -> T { self.0.recv().expect("failed to receive response from client thread") } } impl ClientReply { fn send(self, t: T) { self.0.send(t).unwrap(); } } fn oneshot() -> (ClientReply, ClientResponse) { let (tx, rx) = sync_channel(1); let reply = ClientReply(tx); let response = ClientResponse(rx); return (reply, response); } pub type FetchedRoom = (MatrixRoom, DisplayName, Option); pub enum WorkerTask { Init(AsyncProgramStore, ClientReply<()>), Login(LoginStyle, ClientReply>), Logout(String, ClientReply>), GetInviter(MatrixRoom, ClientReply>>), GetRoom(OwnedRoomId, ClientReply>), JoinRoom(String, ClientReply>), Members(OwnedRoomId, ClientReply>>), SpaceMembers(OwnedRoomId, ClientReply>>), TypingNotice(OwnedRoomId), Verify(VerifyAction, SasVerification, ClientReply>), VerifyRequest(OwnedUserId, ClientReply>), } impl Debug for WorkerTask { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { match self { WorkerTask::Init(_, _) => { f.debug_tuple("WorkerTask::Init") .field(&format_args!("_")) .field(&format_args!("_")) .finish() }, WorkerTask::Login(style, _) => { f.debug_tuple("WorkerTask::Login") .field(style) .field(&format_args!("_")) .finish() }, WorkerTask::Logout(user_id, _) => { f.debug_tuple("WorkerTask::Logout").field(user_id).finish() }, WorkerTask::GetInviter(invite, _) => { f.debug_tuple("WorkerTask::GetInviter").field(invite).finish() }, WorkerTask::GetRoom(room_id, _) => { f.debug_tuple("WorkerTask::GetRoom") .field(room_id) .field(&format_args!("_")) .finish() }, WorkerTask::JoinRoom(s, _) => { f.debug_tuple("WorkerTask::JoinRoom") .field(s) .field(&format_args!("_")) .finish() }, WorkerTask::Members(room_id, _) => { f.debug_tuple("WorkerTask::Members") .field(room_id) .field(&format_args!("_")) .finish() }, WorkerTask::SpaceMembers(room_id, _) => { f.debug_tuple("WorkerTask::SpaceMembers") .field(room_id) .field(&format_args!("_")) .finish() }, WorkerTask::TypingNotice(room_id) => { f.debug_tuple("WorkerTask::TypingNotice").field(room_id).finish() }, WorkerTask::Verify(act, sasv1, _) => { f.debug_tuple("WorkerTask::Verify") .field(act) .field(sasv1) .field(&format_args!("_")) .finish() }, WorkerTask::VerifyRequest(user_id, _) => { f.debug_tuple("WorkerTask::VerifyRequest") .field(user_id) .field(&format_args!("_")) .finish() }, } } } async fn create_client_inner( homeserver: &Option, settings: &ApplicationSettings, ) -> Result { let req_timeout = Duration::from_secs(settings.tunables.request_timeout); // Set up the HTTP client. let http = reqwest::Client::builder() .user_agent(IAMB_USER_AGENT) .timeout(req_timeout) .pool_idle_timeout(Duration::from_secs(60)) .pool_max_idle_per_host(10) .tcp_keepalive(Duration::from_secs(10)) .build() .unwrap(); let req_config = RequestConfig::new().timeout(req_timeout).retry_timeout(req_timeout); // Set up the Matrix client for the selected profile. let builder = Client::builder() .http_client(http) .sqlite_store(settings.sqlite_dir.as_path(), None) .request_config(req_config) .with_encryption_settings(DEFAULT_ENCRYPTION_SETTINGS.clone()); let builder = if let Some(url) = homeserver { // Use the explicitly specified homeserver. builder.homeserver_url(url.as_str()) } else { // Try to discover the homeserver from the user ID. let account = &settings.profile; builder.server_name(account.user_id.server_name()) }; builder.build().await } pub async fn create_client(settings: &ApplicationSettings) -> Client { let account = &settings.profile; let res = match create_client_inner(&account.url, settings).await { Err(ClientBuildError::AutoDiscovery(_)) => { let url = format!("https://{}/", account.user_id.server_name().as_str()); let url = Url::parse(&url).unwrap(); create_client_inner(&Some(url), settings).await }, res => res, }; res.expect("Failed to instantiate client") } #[derive(Clone)] pub struct Requester { pub client: Client, pub tx: UnboundedSender, } impl Requester { pub fn init(&self, store: AsyncProgramStore) { let (reply, response) = oneshot(); self.tx.send(WorkerTask::Init(store, reply)).unwrap(); return response.recv(); } pub fn login(&self, style: LoginStyle) -> IambResult { let (reply, response) = oneshot(); self.tx.send(WorkerTask::Login(style, reply)).unwrap(); return response.recv(); } pub fn logout(&self, user_id: String) -> IambResult { let (reply, response) = oneshot(); self.tx.send(WorkerTask::Logout(user_id, reply)).unwrap(); return response.recv(); } pub fn get_inviter(&self, invite: MatrixRoom) -> IambResult> { let (reply, response) = oneshot(); self.tx.send(WorkerTask::GetInviter(invite, reply)).unwrap(); return response.recv(); } pub fn get_room(&self, room_id: OwnedRoomId) -> IambResult { let (reply, response) = oneshot(); self.tx.send(WorkerTask::GetRoom(room_id, reply)).unwrap(); return response.recv(); } pub fn join_room(&self, name: String) -> IambResult { let (reply, response) = oneshot(); self.tx.send(WorkerTask::JoinRoom(name, reply)).unwrap(); return response.recv(); } pub fn members(&self, room_id: OwnedRoomId) -> IambResult> { let (reply, response) = oneshot(); self.tx.send(WorkerTask::Members(room_id, reply)).unwrap(); return response.recv(); } pub fn space_members(&self, space: OwnedRoomId) -> IambResult> { let (reply, response) = oneshot(); self.tx.send(WorkerTask::SpaceMembers(space, reply)).unwrap(); return response.recv(); } pub fn typing_notice(&self, room_id: OwnedRoomId) { self.tx.send(WorkerTask::TypingNotice(room_id)).unwrap(); } pub fn verify(&self, act: VerifyAction, sas: SasVerification) -> IambResult { let (reply, response) = oneshot(); self.tx.send(WorkerTask::Verify(act, sas, reply)).unwrap(); return response.recv(); } pub fn verify_request(&self, user_id: OwnedUserId) -> IambResult { let (reply, response) = oneshot(); self.tx.send(WorkerTask::VerifyRequest(user_id, reply)).unwrap(); return response.recv(); } } pub struct ClientWorker { initialized: bool, settings: ApplicationSettings, client: Client, load_handle: Option>, sync_handle: Option>, } impl ClientWorker { pub async fn spawn(client: Client, settings: ApplicationSettings) -> Requester { let (tx, rx) = unbounded_channel(); let mut worker = ClientWorker { initialized: false, settings, client: client.clone(), load_handle: None, sync_handle: None, }; tokio::spawn(async move { worker.work(rx).await; }); return Requester { client, tx }; } async fn work(&mut self, mut rx: UnboundedReceiver) { loop { let t = rx.recv().await; match t { Some(task) => self.run(task).await, None => { break; }, } } if let Some(handle) = self.sync_handle.take() { handle.abort(); } } async fn run(&mut self, task: WorkerTask) { match task { WorkerTask::Init(store, reply) => { assert_eq!(self.initialized, false); self.init(store).await; reply.send(()); }, WorkerTask::JoinRoom(room_id, reply) => { assert!(self.initialized); reply.send(self.join_room(room_id).await); }, WorkerTask::GetInviter(invited, reply) => { assert!(self.initialized); reply.send(self.get_inviter(invited).await); }, WorkerTask::GetRoom(room_id, reply) => { assert!(self.initialized); reply.send(self.get_room(room_id).await); }, WorkerTask::Login(style, reply) => { assert!(self.initialized); reply.send(self.login_and_sync(style).await); }, WorkerTask::Logout(user_id, reply) => { assert!(self.initialized); reply.send(self.logout(user_id).await); }, WorkerTask::Members(room_id, reply) => { assert!(self.initialized); reply.send(self.members(room_id).await); }, WorkerTask::SpaceMembers(space, reply) => { assert!(self.initialized); reply.send(self.space_members(space).await); }, WorkerTask::TypingNotice(room_id) => { assert!(self.initialized); self.typing_notice(room_id).await; }, WorkerTask::Verify(act, sas, reply) => { assert!(self.initialized); reply.send(self.verify(act, sas).await); }, WorkerTask::VerifyRequest(user_id, reply) => { assert!(self.initialized); reply.send(self.verify_request(user_id).await); }, } } async fn init(&mut self, store: AsyncProgramStore) { self.client.add_event_handler_context(store.clone()); let _ = self.client.add_event_handler( |ev: SyncTypingEvent, room: MatrixRoom, store: Ctx| { async move { let room_id = room.room_id().to_owned(); let mut locked = store.lock().await; let users = ev .content .user_ids .into_iter() .filter(|u| u != &locked.application.settings.profile.user_id) .collect(); locked.application.get_room_info(room_id).set_typing(users); } }, ); let _ = self.client .add_event_handler(|ev: PresenceEvent, store: Ctx| { async move { let mut locked = store.lock().await; locked.application.presences.insert(ev.sender, ev.content.presence); } }); let _ = self.client.add_event_handler( |ev: SyncStateEvent, room: MatrixRoom, store: Ctx| { async move { if let SyncStateEvent::Original(ev) = ev { let room_id = room.room_id().to_owned(); let room_name = Some(ev.content.name); let mut locked = store.lock().await; let info = locked.application.rooms.get_or_default(room_id.clone()); info.name = room_name; } } }, ); let _ = self.client.add_event_handler( |ev: SyncMessageLikeEvent, room: MatrixRoom, client: Client, store: Ctx| { async move { let room_id = room.room_id(); if let Some(msg) = ev.as_original() { if let MessageType::VerificationRequest(_) = msg.content.msgtype { if let Some(request) = client .encryption() .get_verification_request(ev.sender(), ev.event_id()) .await { request.accept().await.expect("Failed to accept request"); } } } let mut locked = store.lock().await; let sender = ev.sender().to_owned(); let _ = locked.application.presences.get_or_default(sender); let ChatStore { rooms, picker, settings, .. } = &mut locked.application; let info = rooms.get_or_default(room_id.to_owned()); update_event_receipts(info, &room, ev.event_id()).await; let full_ev = ev.into_full_event(room_id.to_owned()); info.insert_with_preview( room_id.to_owned(), store.clone(), *picker, full_ev, settings, client.media(), ); } }, ); let _ = self.client.add_event_handler( |ev: SyncMessageLikeEvent, room: MatrixRoom, store: Ctx| { async move { let room_id = room.room_id(); let mut locked = store.lock().await; let sender = ev.sender().to_owned(); let _ = locked.application.presences.get_or_default(sender); let info = locked.application.get_room_info(room_id.to_owned()); update_event_receipts(info, &room, ev.event_id()).await; info.insert_reaction(ev.into_full_event(room_id.to_owned())); } }, ); let _ = self.client.add_event_handler( |ev: SyncEphemeralRoomEvent, room: MatrixRoom, store: Ctx| { async move { let room_id = room.room_id(); let mut locked = store.lock().await; let info = locked.application.get_room_info(room_id.to_owned()); for (event_id, receipts) in ev.content.0.into_iter() { let Some(receipts) = receipts.get(&ReceiptType::Read) else { continue; }; for user_id in receipts.keys() { info.set_receipt(user_id.to_owned(), event_id.clone()); } } } }, ); let _ = self.client.add_event_handler( |ev: OriginalSyncRoomRedactionEvent, room: MatrixRoom, store: Ctx| { async move { let room_id = room.room_id(); let room_info = room.clone_info(); let room_version = room_info.room_version().unwrap_or(&RoomVersionId::V1); let mut locked = store.lock().await; let info = locked.application.get_room_info(room_id.to_owned()); let Some(redacts) = &ev.redacts else { return; }; match info.keys.get(redacts) { None => return, Some(EventLocation::Message(key)) => { if let Some(msg) = info.messages.get_mut(key) { let ev = SyncRoomRedactionEvent::Original(ev); msg.redact(ev, room_version); } }, Some(EventLocation::Reaction(event_id)) => { if let Some(reactions) = info.reactions.get_mut(event_id) { reactions.remove(redacts); } info.keys.remove(redacts); }, } } }, ); let _ = self.client.add_event_handler( |ev: OriginalSyncRoomMemberEvent, room: MatrixRoom, client: Client, store: Ctx| { async move { let room_id = room.room_id(); let user_id = ev.state_key; let ambiguous_name = ev.content.displayname.as_deref().unwrap_or_else(|| user_id.localpart()); let ambiguous = client .store() .get_users_with_display_name(room_id, ambiguous_name) .await .map(|users| users.len() > 1) .unwrap_or_default(); let mut locked = store.lock().await; let info = locked.application.get_room_info(room_id.to_owned()); if ambiguous { info.display_names.remove(&user_id); } else if let Some(display) = ev.content.displayname { info.display_names.insert(user_id, display); } else { info.display_names.remove(&user_id); } } }, ); let _ = self.client.add_event_handler( |ev: OriginalSyncKeyVerificationStartEvent, client: Client, store: Ctx| { async move { let tx_id = ev.content.relates_to.event_id.as_ref(); if let Some(Verification::SasV1(sas)) = client.encryption().get_verification(&ev.sender, tx_id).await { sas.accept().await.unwrap(); store.lock().await.application.insert_sas(sas) } } }, ); let _ = self.client.add_event_handler( |ev: OriginalSyncKeyVerificationKeyEvent, client: Client, store: Ctx| { async move { let tx_id = ev.content.relates_to.event_id.as_ref(); if let Some(Verification::SasV1(sas)) = client.encryption().get_verification(&ev.sender, tx_id).await { store.lock().await.application.insert_sas(sas); } } }, ); let _ = self.client.add_event_handler( |ev: OriginalSyncKeyVerificationDoneEvent, client: Client, store: Ctx| { async move { let tx_id = ev.content.relates_to.event_id.as_ref(); if let Some(Verification::SasV1(sas)) = client.encryption().get_verification(&ev.sender, tx_id).await { store.lock().await.application.insert_sas(sas); } } }, ); let _ = self.client.add_event_handler( |ev: ToDeviceKeyVerificationRequestEvent, client: Client| { async move { let request = client .encryption() .get_verification_request(&ev.sender, &ev.content.transaction_id) .await; if let Some(request) = request { request.accept().await.unwrap(); } } }, ); let _ = self.client.add_event_handler( |ev: ToDeviceKeyVerificationStartEvent, client: Client, store: Ctx| { async move { let tx_id = ev.content.transaction_id; if let Some(Verification::SasV1(sas)) = client.encryption().get_verification(&ev.sender, tx_id.as_ref()).await { sas.accept().await.unwrap(); store.lock().await.application.insert_sas(sas); } } }, ); let _ = self.client.add_event_handler( |ev: ToDeviceKeyVerificationKeyEvent, client: Client, store: Ctx| { async move { let tx_id = ev.content.transaction_id; if let Some(Verification::SasV1(sas)) = client.encryption().get_verification(&ev.sender, tx_id.as_ref()).await { store.lock().await.application.insert_sas(sas); } } }, ); let _ = self.client.add_event_handler( |ev: ToDeviceKeyVerificationDoneEvent, client: Client, store: Ctx| { async move { let tx_id = ev.content.transaction_id; if let Some(Verification::SasV1(sas)) = client.encryption().get_verification(&ev.sender, tx_id.as_ref()).await { store.lock().await.application.insert_sas(sas); } } }, ); self.load_handle = tokio::spawn({ let client = self.client.clone(); async move { let load = load_older_forever(&client, &store); let rcpt = send_receipts_forever(&client, &store); let room = refresh_rooms_forever(&client, &store); let ((), (), ()) = tokio::join!(load, rcpt, room); } }) .into(); self.initialized = true; } async fn login_and_sync(&mut self, style: LoginStyle) -> IambResult { let client = self.client.clone(); match style { LoginStyle::SessionRestore(session) => { client.restore_session(session).await.map_err(IambError::from)?; }, LoginStyle::Password(password) => { let resp = client .matrix_auth() .login_username(&self.settings.profile.user_id, &password) .initial_device_display_name(initial_devname().as_str()) .send() .await .map_err(IambError::from)?; let session = MatrixSession::from(&resp); self.settings.write_session(session)?; }, LoginStyle::SingleSignOn => { let resp = client .matrix_auth() .login_sso(|url| { let opened = format!( "The following URL should have been opened in your browser:\n {url}" ); async move { tokio::task::spawn_blocking(move || open::that(url)); println!("\n{opened}\n"); Ok(()) } }) .initial_device_display_name(initial_devname().as_str()) .send() .await .map_err(IambError::from)?; let session = MatrixSession::from(&resp); self.settings.write_session(session)?; }, } self.sync_handle = tokio::spawn(async move { loop { let settings = SyncSettings::default(); let _ = client.sync(settings).await; } }) .into(); Ok(Some(InfoMessage::from("* Successfully logged in!"))) } async fn logout(&mut self, user_id: String) -> IambResult { // Verify that the user is logging out of the correct profile. let curr = self.settings.profile.user_id.as_ref(); if user_id != curr { let msg = format!("Incorrect user ID (currently logged in as {curr})"); let err = UIError::Failure(msg); return Err(err); } // Send the logout request. if let Err(e) = self.client.matrix_auth().logout().await { let msg = format!("Failed to logout: {e}"); let err = UIError::Failure(msg); return Err(err); } // Remove the session.json file. std::fs::remove_file(&self.settings.session_json)?; Ok(Some(InfoMessage::from("Sucessfully logged out"))) } async fn direct_message(&mut self, user: OwnedUserId) -> IambResult { for room in self.client.rooms() { if !is_direct(&room).await { continue; } if room.get_member(user.as_ref()).await.map_err(IambError::from)?.is_some() { return Ok(room.room_id().to_owned()); } } let rt = CreateRoomType::Direct(user.clone()); let flags = CreateRoomFlags::ENCRYPTED; create_room(&self.client, None, rt, flags).await.map_err(|e| { error!( user_id = user.as_str(), err = e.to_string(), "Failed to create direct message room" ); let msg = format!("Could not open a room with {user}"); UIError::Failure(msg) }) } async fn get_inviter(&mut self, invited: MatrixRoom) -> IambResult> { let details = invited.invite_details().await.map_err(IambError::from)?; Ok(details.inviter) } async fn get_room(&mut self, room_id: OwnedRoomId) -> IambResult { if let Some(room) = self.client.get_room(&room_id) { let name = room.display_name().await.map_err(IambError::from)?; let tags = room.tags().await.map_err(IambError::from)?; Ok((room, name, tags)) } else { Err(IambError::UnknownRoom(room_id).into()) } } async fn join_room(&mut self, name: String) -> IambResult { if let Ok(alias_id) = OwnedRoomOrAliasId::from_str(name.as_str()) { match self.client.join_room_by_id_or_alias(&alias_id, &[]).await { Ok(resp) => Ok(resp.room_id().to_owned()), Err(e) => { let msg = e.to_string(); let err = UIError::Failure(msg); return Err(err); }, } } else if let Ok(user) = OwnedUserId::try_from(name.as_str()) { self.direct_message(user).await } else { let msg = format!("{:?} is not a valid room or user name", name.as_str()); let err = UIError::Failure(msg); return Err(err); } } async fn members(&mut self, room_id: OwnedRoomId) -> IambResult> { if let Some(room) = self.client.get_room(room_id.as_ref()) { Ok(room.members(RoomMemberships::ACTIVE).await.map_err(IambError::from)?) } else { Err(IambError::UnknownRoom(room_id).into()) } } async fn space_members(&mut self, space: OwnedRoomId) -> IambResult> { let mut req = SpaceHierarchyRequest::new(space); req.limit = Some(1000u32.into()); req.max_depth = Some(1u32.into()); let resp = self.client.send(req, None).await.map_err(IambError::from)?; let rooms = resp.rooms.into_iter().map(|chunk| chunk.room_id).collect(); Ok(rooms) } async fn typing_notice(&mut self, room_id: OwnedRoomId) { if let Some(room) = self.client.get_room(room_id.as_ref()) { let _ = room.typing_notice(true).await; } } async fn verify(&self, action: VerifyAction, sas: SasVerification) -> IambResult { match action { VerifyAction::Accept => { sas.accept().await.map_err(IambError::from)?; Ok(Some(InfoMessage::from("Accepted verification request"))) }, VerifyAction::Confirm => { if sas.is_done() || sas.is_cancelled() { let msg = "Can only confirm in-progress verifications!"; let err = UIError::Failure(msg.into()); return Err(err); } sas.confirm().await.map_err(IambError::from)?; Ok(Some(InfoMessage::from("Confirmed verification"))) }, VerifyAction::Cancel => { if sas.is_done() || sas.is_cancelled() { let msg = "Can only cancel in-progress verifications!"; let err = UIError::Failure(msg.into()); return Err(err); } sas.cancel().await.map_err(IambError::from)?; Ok(Some(InfoMessage::from("Cancelled verification"))) }, VerifyAction::Mismatch => { if sas.is_done() || sas.is_cancelled() { let msg = "Can only cancel in-progress verifications!"; let err = UIError::Failure(msg.into()); return Err(err); } sas.mismatch().await.map_err(IambError::from)?; Ok(Some(InfoMessage::from("Cancelled verification"))) }, } } async fn verify_request(&self, user_id: OwnedUserId) -> IambResult { let enc = self.client.encryption(); match enc.get_user_identity(user_id.as_ref()).await.map_err(IambError::from)? { Some(identity) => { let methods = vec![VerificationMethod::SasV1]; let request = identity.request_verification_with_methods(methods); let _req = request.await.map_err(IambError::from)?; let info = format!("Sent verification request to {user_id}"); Ok(Some(InfoMessage::from(info))) }, None => { let msg = format!("Could not find identity information for {user_id}"); let err = UIError::Failure(msg); Err(err) }, } } }