Fetch scrollback history independently of main loop (#39)

This commit is contained in:
Ulyssa 2023-03-13 10:46:26 -07:00
parent 6a0722795a
commit 61897ea6f2
No known key found for this signature in database
GPG key ID: 1B3965A3D18B9B64
4 changed files with 149 additions and 137 deletions

View file

@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Formatter};
use std::fs::File;
@ -5,12 +6,12 @@ use std::io::BufWriter;
use std::str::FromStr;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use gethostname::gethostname;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use tracing::error;
use tracing::{error, warn};
use matrix_sdk::{
config::{RequestConfig, StoreConfig, SyncSettings},
@ -44,6 +45,7 @@ use matrix_sdk::{
tag::Tags,
typing::SyncTypingEvent,
AnyInitialStateEvent,
AnyMessageLikeEvent,
AnyTimelineEvent,
EmptyStateKey,
InitialStateEvent,
@ -56,6 +58,7 @@ use matrix_sdk::{
OwnedRoomId,
OwnedRoomOrAliasId,
OwnedUserId,
RoomId,
RoomVersionId,
},
Client,
@ -68,12 +71,14 @@ use modalkit::editing::action::{EditInfo, InfoMessage, UIError};
use crate::{
base::{
AsyncProgramStore,
ChatStore,
CreateRoomFlags,
CreateRoomType,
EventLocation,
IambError,
IambResult,
Receipts,
RoomFetchStatus,
VerifyAction,
},
message::MessageFetchResult,
@ -82,6 +87,7 @@ use crate::{
const IAMB_DEVICE_NAME: &str = "iamb";
const IAMB_USER_AGENT: &str = "iamb";
const MIN_MSG_LOAD: u32 = 50;
fn initial_devname() -> String {
format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy())
@ -158,6 +164,113 @@ pub async fn create_room(
return Ok(resp.room_id);
}
async fn load_plan(store: &AsyncProgramStore) -> HashMap<OwnedRoomId, Option<String>> {
let mut locked = store.lock().await;
let ChatStore { need_load, rooms, .. } = &mut locked.application;
let mut plan = HashMap::new();
for room_id in std::mem::take(need_load).into_iter() {
let info = rooms.get_or_default(room_id.clone());
if info.recently_fetched() || info.fetching {
need_load.insert(room_id);
continue;
} else {
info.fetch_last = Instant::now().into();
info.fetching = true;
}
let fetch_id = match &info.fetch_id {
RoomFetchStatus::Done => continue,
RoomFetchStatus::HaveMore(fetch_id) => Some(fetch_id.clone()),
RoomFetchStatus::NotStarted => None,
};
plan.insert(room_id, fetch_id);
}
return plan;
}
async fn load_older_one(
client: Client,
room_id: &RoomId,
fetch_id: Option<String>,
limit: u32,
) -> MessageFetchResult {
if let Some(room) = client.get_room(room_id) {
let mut opts = match &fetch_id {
Some(id) => MessagesOptions::backward().from(id.as_str()),
None => MessagesOptions::backward(),
};
opts.limit = limit.into();
let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?;
let msgs = chunk.into_iter().filter_map(|ev| {
match ev.event.deserialize() {
Ok(AnyTimelineEvent::MessageLike(msg)) => Some(msg),
Ok(AnyTimelineEvent::State(_)) => None,
Err(_) => None,
}
});
Ok((end, msgs.collect()))
} else {
Err(IambError::UnknownRoom(room_id.to_owned()).into())
}
}
async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) {
let mut locked = store.lock().await;
let ChatStore { need_load, presences, rooms, .. } = &mut locked.application;
let info = rooms.get_or_default(room_id.clone());
info.fetching = false;
match res {
Ok((fetch_id, msgs)) => {
for msg in msgs.into_iter() {
let sender = msg.sender().to_owned();
let _ = presences.get_or_default(sender);
match msg {
AnyMessageLikeEvent::RoomMessage(msg) => {
info.insert(msg);
},
AnyMessageLikeEvent::Reaction(ev) => {
info.insert_reaction(ev);
},
_ => continue,
}
}
info.fetch_id = fetch_id.map_or(RoomFetchStatus::Done, RoomFetchStatus::HaveMore);
},
Err(e) => {
warn!(room_id = room_id.as_str(), err = e.to_string(), "Failed to load older messages");
// Wait and try again.
need_load.insert(room_id);
},
}
}
async fn load_older(client: &Client, store: &AsyncProgramStore) {
let limit = MIN_MSG_LOAD;
let plan = load_plan(store).await;
// Fetch each room separately, so they don't block each other.
for (room_id, fetch_id) in plan.into_iter() {
let client = client.clone();
let store = store.clone();
tokio::spawn(async move {
let res = load_older_one(client, room_id.as_ref(), fetch_id, limit).await;
load_insert(room_id, res, store).await;
});
}
}
#[derive(Debug)]
pub enum LoginStyle {
SessionRestore(Session),
@ -216,7 +329,6 @@ pub enum WorkerTask {
ActiveRooms(ClientReply<Vec<FetchedRoom>>),
DirectMessages(ClientReply<Vec<FetchedRoom>>),
Init(AsyncProgramStore, ClientReply<()>),
LoadOlder(OwnedRoomId, Option<String>, u32, ClientReply<MessageFetchResult>),
Login(LoginStyle, ClientReply<IambResult<EditInfo>>),
GetInviter(Invited, ClientReply<IambResult<Option<RoomMember>>>),
GetRoom(OwnedRoomId, ClientReply<IambResult<FetchedRoom>>),
@ -246,14 +358,6 @@ impl Debug for WorkerTask {
.field(&format_args!("_"))
.finish()
},
WorkerTask::LoadOlder(room_id, from, n, _) => {
f.debug_tuple("WorkerTask::LoadOlder")
.field(room_id)
.field(from)
.field(n)
.field(&format_args!("_"))
.finish()
},
WorkerTask::Login(style, _) => {
f.debug_tuple("WorkerTask::Login")
.field(style)
@ -325,21 +429,6 @@ impl Requester {
return response.recv();
}
pub fn load_older(
&self,
room_id: OwnedRoomId,
fetch_id: Option<String>,
limit: u32,
) -> MessageFetchResult {
let (reply, response) = oneshot();
self.tx
.send(WorkerTask::LoadOlder(room_id, fetch_id, limit, reply))
.unwrap();
return response.recv();
}
pub fn login(&self, style: LoginStyle) -> IambResult<EditInfo> {
let (reply, response) = oneshot();
@ -437,8 +526,9 @@ pub struct ClientWorker {
initialized: bool,
settings: ApplicationSettings,
client: Client,
sync_handle: Option<JoinHandle<()>>,
load_handle: Option<JoinHandle<()>>,
rcpt_handle: Option<JoinHandle<()>>,
sync_handle: Option<JoinHandle<()>>,
}
impl ClientWorker {
@ -476,8 +566,9 @@ impl ClientWorker {
initialized: false,
settings,
client: client.clone(),
sync_handle: None,
load_handle: None,
rcpt_handle: None,
sync_handle: None,
};
tokio::spawn(async move {
@ -535,10 +626,6 @@ impl ClientWorker {
assert!(self.initialized);
reply.send(self.active_rooms().await);
},
WorkerTask::LoadOlder(room_id, fetch_id, limit, reply) => {
assert!(self.initialized);
reply.send(self.load_older(room_id, fetch_id, limit).await);
},
WorkerTask::Login(style, reply) => {
assert!(self.initialized);
reply.send(self.login_and_sync(style).await);
@ -811,17 +898,34 @@ impl ClientWorker {
},
);
let client = self.client.clone();
self.rcpt_handle = tokio::spawn({
let store = store.clone();
let client = self.client.clone();
self.rcpt_handle = tokio::spawn(async move {
// Update the displayed read receipts ever 5 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(5));
async move {
// Update the displayed read receipts every 5 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
loop {
interval.tick().await;
let receipts = update_receipts(&client).await;
store.lock().await.application.set_receipts(receipts).await;
let receipts = update_receipts(&client).await;
store.lock().await.application.set_receipts(receipts).await;
}
}
})
.into();
self.load_handle = tokio::spawn({
let client = self.client.clone();
async move {
// Load older messages every 2 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(2));
loop {
interval.tick().await;
load_older(&client, &store).await;
}
}
})
.into();
@ -991,35 +1095,6 @@ impl ClientWorker {
return rooms;
}
async fn load_older(
&mut self,
room_id: OwnedRoomId,
fetch_id: Option<String>,
limit: u32,
) -> MessageFetchResult {
if let Some(room) = self.client.get_room(room_id.as_ref()) {
let mut opts = match &fetch_id {
Some(id) => MessagesOptions::backward().from(id.as_str()),
None => MessagesOptions::backward(),
};
opts.limit = limit.into();
let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?;
let msgs = chunk.into_iter().filter_map(|ev| {
match ev.event.deserialize() {
Ok(AnyTimelineEvent::MessageLike(msg)) => Some(msg),
Ok(AnyTimelineEvent::State(_)) => None,
Err(_) => None,
}
});
Ok((end, msgs.collect()))
} else {
Err(IambError::UnknownRoom(room_id).into())
}
}
async fn members(&mut self, room_id: OwnedRoomId) -> IambResult<Vec<RoomMember>> {
if let Some(room) = self.client.get_room(room_id.as_ref()) {
Ok(room.active_members().await.map_err(IambError::from)?)