diff --git a/src/base.rs b/src/base.rs index b16ed60..76cc595 100644 --- a/src/base.rs +++ b/src/base.rs @@ -7,7 +7,6 @@ use std::time::{Duration, Instant}; use emojis::Emoji; use tokio::sync::Mutex as AsyncMutex; -use tracing::warn; use matrix_sdk::{ encryption::verification::SasVerification, @@ -23,7 +22,6 @@ use matrix_sdk::{ RoomMessageEventContent, }, tag::{TagName, Tags}, - AnyMessageLikeEvent, MessageLikeEvent, }, presence::PresenceState, @@ -412,6 +410,9 @@ pub struct RoomInfo { /// A map of message identifiers to a map of reaction events. pub reactions: HashMap, + /// Whether the scrollback for this room is currently being fetched. + pub fetching: bool, + /// Where to continue fetching from when we continue loading scrollback history. pub fetch_id: RoomFetchStatus, @@ -520,7 +521,7 @@ impl RoomInfo { } } - fn recently_fetched(&self) -> bool { + pub fn recently_fetched(&self) -> bool { self.fetch_last.map_or(false, |i| i.elapsed() < ROOM_FETCH_DEBOUNCE) } @@ -668,61 +669,6 @@ impl ChatStore { self.need_load.insert(room_id); } - pub fn load_older(&mut self, limit: u32) { - let ChatStore { need_load, presences, rooms, worker, .. } = self; - - for room_id in std::mem::take(need_load).into_iter() { - let info = rooms.get_or_default(room_id.clone()); - - if info.recently_fetched() { - need_load.insert(room_id); - continue; - } else { - info.fetch_last = Instant::now().into(); - } - - let fetch_id = match &info.fetch_id { - RoomFetchStatus::Done => continue, - RoomFetchStatus::HaveMore(fetch_id) => Some(fetch_id.clone()), - RoomFetchStatus::NotStarted => None, - }; - - let res = worker.load_older(room_id.clone(), fetch_id, limit); - - match res { - Ok((fetch_id, msgs)) => { - for msg in msgs.into_iter() { - let sender = msg.sender().to_owned(); - let _ = presences.get_or_default(sender); - - match msg { - AnyMessageLikeEvent::RoomMessage(msg) => { - info.insert(msg); - }, - AnyMessageLikeEvent::Reaction(ev) => { - info.insert_reaction(ev); - }, - _ => continue, - } - } - - info.fetch_id = - fetch_id.map_or(RoomFetchStatus::Done, RoomFetchStatus::HaveMore); - }, - Err(e) => { - warn!( - room_id = room_id.as_str(), - err = e.to_string(), - "Failed to load older messages" - ); - - // Wait and try again. - need_load.insert(room_id); - }, - } - } - } - pub fn get_room_info(&mut self, room_id: OwnedRoomId) -> &mut RoomInfo { self.rooms.get_or_default(room_id) } diff --git a/src/main.rs b/src/main.rs index 3137ce1..11faf53 100644 --- a/src/main.rs +++ b/src/main.rs @@ -101,14 +101,6 @@ use modalkit::{ }, }; -const MIN_MSG_LOAD: u32 = 50; - -fn msg_load_req(area: Rect) -> u32 { - let n = area.height as u32; - - n.max(MIN_MSG_LOAD) -} - struct Application { store: AsyncProgramStore, worker: Requester, @@ -190,8 +182,6 @@ impl Application { } f.set_cursor(cx, cy); } - - store.application.load_older(msg_load_req(area)); })?; Ok(()) diff --git a/src/tests.rs b/src/tests.rs index b2cbfc9..34b840b 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -153,6 +153,7 @@ pub fn mock_room() -> RoomInfo { read_till: None, reactions: HashMap::new(), + fetching: false, fetch_id: RoomFetchStatus::NotStarted, fetch_last: None, users_typing: None, diff --git a/src/worker.rs b/src/worker.rs index 12eaa79..25c4da8 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::{Debug, Formatter}; use std::fs::File; @@ -5,12 +6,12 @@ use std::io::BufWriter; use std::str::FromStr; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use gethostname::gethostname; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; -use tracing::error; +use tracing::{error, warn}; use matrix_sdk::{ config::{RequestConfig, StoreConfig, SyncSettings}, @@ -44,6 +45,7 @@ use matrix_sdk::{ tag::Tags, typing::SyncTypingEvent, AnyInitialStateEvent, + AnyMessageLikeEvent, AnyTimelineEvent, EmptyStateKey, InitialStateEvent, @@ -56,6 +58,7 @@ use matrix_sdk::{ OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, + RoomId, RoomVersionId, }, Client, @@ -68,12 +71,14 @@ use modalkit::editing::action::{EditInfo, InfoMessage, UIError}; use crate::{ base::{ AsyncProgramStore, + ChatStore, CreateRoomFlags, CreateRoomType, EventLocation, IambError, IambResult, Receipts, + RoomFetchStatus, VerifyAction, }, message::MessageFetchResult, @@ -82,6 +87,7 @@ use crate::{ const IAMB_DEVICE_NAME: &str = "iamb"; const IAMB_USER_AGENT: &str = "iamb"; +const MIN_MSG_LOAD: u32 = 50; fn initial_devname() -> String { format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy()) @@ -158,6 +164,113 @@ pub async fn create_room( return Ok(resp.room_id); } +async fn load_plan(store: &AsyncProgramStore) -> HashMap> { + let mut locked = store.lock().await; + let ChatStore { need_load, rooms, .. } = &mut locked.application; + let mut plan = HashMap::new(); + + for room_id in std::mem::take(need_load).into_iter() { + let info = rooms.get_or_default(room_id.clone()); + + if info.recently_fetched() || info.fetching { + need_load.insert(room_id); + continue; + } else { + info.fetch_last = Instant::now().into(); + info.fetching = true; + } + + let fetch_id = match &info.fetch_id { + RoomFetchStatus::Done => continue, + RoomFetchStatus::HaveMore(fetch_id) => Some(fetch_id.clone()), + RoomFetchStatus::NotStarted => None, + }; + + plan.insert(room_id, fetch_id); + } + + return plan; +} + +async fn load_older_one( + client: Client, + room_id: &RoomId, + fetch_id: Option, + limit: u32, +) -> MessageFetchResult { + if let Some(room) = client.get_room(room_id) { + let mut opts = match &fetch_id { + Some(id) => MessagesOptions::backward().from(id.as_str()), + None => MessagesOptions::backward(), + }; + opts.limit = limit.into(); + + let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?; + + let msgs = chunk.into_iter().filter_map(|ev| { + match ev.event.deserialize() { + Ok(AnyTimelineEvent::MessageLike(msg)) => Some(msg), + Ok(AnyTimelineEvent::State(_)) => None, + Err(_) => None, + } + }); + + Ok((end, msgs.collect())) + } else { + Err(IambError::UnknownRoom(room_id.to_owned()).into()) + } +} + +async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) { + let mut locked = store.lock().await; + let ChatStore { need_load, presences, rooms, .. } = &mut locked.application; + let info = rooms.get_or_default(room_id.clone()); + info.fetching = false; + + match res { + Ok((fetch_id, msgs)) => { + for msg in msgs.into_iter() { + let sender = msg.sender().to_owned(); + let _ = presences.get_or_default(sender); + + match msg { + AnyMessageLikeEvent::RoomMessage(msg) => { + info.insert(msg); + }, + AnyMessageLikeEvent::Reaction(ev) => { + info.insert_reaction(ev); + }, + _ => continue, + } + } + + info.fetch_id = fetch_id.map_or(RoomFetchStatus::Done, RoomFetchStatus::HaveMore); + }, + Err(e) => { + warn!(room_id = room_id.as_str(), err = e.to_string(), "Failed to load older messages"); + + // Wait and try again. + need_load.insert(room_id); + }, + } +} + +async fn load_older(client: &Client, store: &AsyncProgramStore) { + let limit = MIN_MSG_LOAD; + let plan = load_plan(store).await; + + // Fetch each room separately, so they don't block each other. + for (room_id, fetch_id) in plan.into_iter() { + let client = client.clone(); + let store = store.clone(); + + tokio::spawn(async move { + let res = load_older_one(client, room_id.as_ref(), fetch_id, limit).await; + load_insert(room_id, res, store).await; + }); + } +} + #[derive(Debug)] pub enum LoginStyle { SessionRestore(Session), @@ -216,7 +329,6 @@ pub enum WorkerTask { ActiveRooms(ClientReply>), DirectMessages(ClientReply>), Init(AsyncProgramStore, ClientReply<()>), - LoadOlder(OwnedRoomId, Option, u32, ClientReply), Login(LoginStyle, ClientReply>), GetInviter(Invited, ClientReply>>), GetRoom(OwnedRoomId, ClientReply>), @@ -246,14 +358,6 @@ impl Debug for WorkerTask { .field(&format_args!("_")) .finish() }, - WorkerTask::LoadOlder(room_id, from, n, _) => { - f.debug_tuple("WorkerTask::LoadOlder") - .field(room_id) - .field(from) - .field(n) - .field(&format_args!("_")) - .finish() - }, WorkerTask::Login(style, _) => { f.debug_tuple("WorkerTask::Login") .field(style) @@ -325,21 +429,6 @@ impl Requester { return response.recv(); } - pub fn load_older( - &self, - room_id: OwnedRoomId, - fetch_id: Option, - limit: u32, - ) -> MessageFetchResult { - let (reply, response) = oneshot(); - - self.tx - .send(WorkerTask::LoadOlder(room_id, fetch_id, limit, reply)) - .unwrap(); - - return response.recv(); - } - pub fn login(&self, style: LoginStyle) -> IambResult { let (reply, response) = oneshot(); @@ -437,8 +526,9 @@ pub struct ClientWorker { initialized: bool, settings: ApplicationSettings, client: Client, - sync_handle: Option>, + load_handle: Option>, rcpt_handle: Option>, + sync_handle: Option>, } impl ClientWorker { @@ -476,8 +566,9 @@ impl ClientWorker { initialized: false, settings, client: client.clone(), - sync_handle: None, + load_handle: None, rcpt_handle: None, + sync_handle: None, }; tokio::spawn(async move { @@ -535,10 +626,6 @@ impl ClientWorker { assert!(self.initialized); reply.send(self.active_rooms().await); }, - WorkerTask::LoadOlder(room_id, fetch_id, limit, reply) => { - assert!(self.initialized); - reply.send(self.load_older(room_id, fetch_id, limit).await); - }, WorkerTask::Login(style, reply) => { assert!(self.initialized); reply.send(self.login_and_sync(style).await); @@ -811,17 +898,34 @@ impl ClientWorker { }, ); - let client = self.client.clone(); + self.rcpt_handle = tokio::spawn({ + let store = store.clone(); + let client = self.client.clone(); - self.rcpt_handle = tokio::spawn(async move { - // Update the displayed read receipts ever 5 seconds. - let mut interval = tokio::time::interval(Duration::from_secs(5)); + async move { + // Update the displayed read receipts every 5 seconds. + let mut interval = tokio::time::interval(Duration::from_secs(5)); - loop { - interval.tick().await; + loop { + interval.tick().await; - let receipts = update_receipts(&client).await; - store.lock().await.application.set_receipts(receipts).await; + let receipts = update_receipts(&client).await; + store.lock().await.application.set_receipts(receipts).await; + } + } + }) + .into(); + + self.load_handle = tokio::spawn({ + let client = self.client.clone(); + + async move { + // Load older messages every 2 seconds. + let mut interval = tokio::time::interval(Duration::from_secs(2)); + loop { + interval.tick().await; + load_older(&client, &store).await; + } } }) .into(); @@ -991,35 +1095,6 @@ impl ClientWorker { return rooms; } - async fn load_older( - &mut self, - room_id: OwnedRoomId, - fetch_id: Option, - limit: u32, - ) -> MessageFetchResult { - if let Some(room) = self.client.get_room(room_id.as_ref()) { - let mut opts = match &fetch_id { - Some(id) => MessagesOptions::backward().from(id.as_str()), - None => MessagesOptions::backward(), - }; - opts.limit = limit.into(); - - let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?; - - let msgs = chunk.into_iter().filter_map(|ev| { - match ev.event.deserialize() { - Ok(AnyTimelineEvent::MessageLike(msg)) => Some(msg), - Ok(AnyTimelineEvent::State(_)) => None, - Err(_) => None, - } - }); - - Ok((end, msgs.collect())) - } else { - Err(IambError::UnknownRoom(room_id).into()) - } - } - async fn members(&mut self, room_id: OwnedRoomId) -> IambResult> { if let Some(room) = self.client.get_room(room_id.as_ref()) { Ok(room.active_members().await.map_err(IambError::from)?)