Load receipts for room before acquiring lock (#213)

This commit is contained in:
Ulyssa 2024-03-06 23:49:35 -08:00 committed by GitHub
parent dd001af365
commit ec81b72f2c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 68 additions and 31 deletions

View file

@ -1119,6 +1119,10 @@ impl RoomNeeds {
pub fn insert(&mut self, room_id: OwnedRoomId, need: Need) { pub fn insert(&mut self, room_id: OwnedRoomId, need: Need) {
self.needs.entry(room_id).or_default().insert(need); self.needs.entry(room_id).or_default().insert(need);
} }
pub fn rooms(&self) -> usize {
self.needs.len()
}
} }
impl IntoIterator for RoomNeeds { impl IntoIterator for RoomNeeds {

View file

@ -33,7 +33,6 @@ use matrix_sdk::ruma::{
}, },
redaction::SyncRoomRedactionEvent, redaction::SyncRoomRedactionEvent,
}, },
AnyMessageLikeEvent,
RedactContent, RedactContent,
RedactedUnsigned, RedactedUnsigned,
}, },
@ -57,7 +56,7 @@ use ratatui_image::protocol::Protocol;
use crate::config::ImagePreviewSize; use crate::config::ImagePreviewSize;
use crate::{ use crate::{
base::{IambResult, RoomInfo}, base::RoomInfo,
config::ApplicationSettings, config::ApplicationSettings,
message::html::{parse_matrix_html, StyleTree}, message::html::{parse_matrix_html, StyleTree},
util::{space, space_span, take_width, wrapped_text}, util::{space, space_span, take_width, wrapped_text},
@ -66,7 +65,6 @@ use crate::{
mod html; mod html;
mod printer; mod printer;
pub type MessageFetchResult = IambResult<(Option<String>, Vec<AnyMessageLikeEvent>)>;
pub type MessageKey = (MessageTimeStamp, OwnedEventId); pub type MessageKey = (MessageTimeStamp, OwnedEventId);
pub type Messages = BTreeMap<MessageKey, Message>; pub type Messages = BTreeMap<MessageKey, Message>;

View file

@ -5,7 +5,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::ops::Deref; use std::ops::{Deref, DerefMut};
use std::str::FromStr; use std::str::FromStr;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc; use std::sync::Arc;
@ -14,6 +14,7 @@ use std::time::{Duration, Instant};
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
use gethostname::gethostname; use gethostname::gethostname;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::Semaphore;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{error, warn}; use tracing::{error, warn};
use url::Url; use url::Url;
@ -94,11 +95,11 @@ use crate::{
EventLocation, EventLocation,
IambError, IambError,
IambResult, IambResult,
ProgramStore,
RoomFetchStatus, RoomFetchStatus,
RoomInfo, RoomInfo,
VerifyAction, VerifyAction,
}, },
message::MessageFetchResult,
ApplicationSettings, ApplicationSettings,
}; };
@ -112,6 +113,9 @@ const IAMB_DEVICE_NAME: &str = "iamb";
const IAMB_USER_AGENT: &str = "iamb"; const IAMB_USER_AGENT: &str = "iamb";
const MIN_MSG_LOAD: u32 = 50; const MIN_MSG_LOAD: u32 = 50;
type MessageFetchResult =
IambResult<(Option<String>, Vec<(AnyMessageLikeEvent, Vec<OwnedUserId>)>)>;
fn initial_devname() -> String { fn initial_devname() -> String {
format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy()) format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy())
} }
@ -217,7 +221,7 @@ enum Plan {
async fn load_plans(store: &AsyncProgramStore) -> Vec<Plan> { async fn load_plans(store: &AsyncProgramStore) -> Vec<Plan> {
let mut locked = store.lock().await; let mut locked = store.lock().await;
let ChatStore { need_load, rooms, .. } = &mut locked.application; let ChatStore { need_load, rooms, .. } = &mut locked.application;
let mut plan = vec![]; let mut plan = Vec::with_capacity(need_load.rooms() * 2);
for (room_id, mut need) in std::mem::take(need_load).into_iter() { for (room_id, mut need) in std::mem::take(need_load).into_iter() {
if need.contains(Need::MESSAGES) { if need.contains(Need::MESSAGES) {
@ -249,21 +253,25 @@ async fn load_plans(store: &AsyncProgramStore) -> Vec<Plan> {
return plan; return plan;
} }
async fn run_plan(client: &Client, store: &AsyncProgramStore, plan: Plan) { async fn run_plan(client: &Client, store: &AsyncProgramStore, plan: Plan, permits: &Semaphore) {
let permit = permits.acquire().await;
match plan { match plan {
Plan::Messages(room_id, fetch_id) => { Plan::Messages(room_id, fetch_id) => {
let limit = MIN_MSG_LOAD; let limit = MIN_MSG_LOAD;
let client = client.clone(); let client = client.clone();
let store = store.clone(); let store_clone = store.clone();
let res = load_older_one(&client, &room_id, fetch_id, limit).await; let res = load_older_one(&client, &room_id, fetch_id, limit).await;
load_insert(room_id, res, store).await; let mut locked = store.lock().await;
load_insert(room_id, res, locked.deref_mut(), store_clone);
}, },
Plan::Members(room_id) => { Plan::Members(room_id) => {
let res = members_load(client, &room_id).await; let res = members_load(client, &room_id).await;
members_insert(room_id, res, store).await let mut locked = store.lock().await;
members_insert(room_id, res, locked.deref_mut());
}, },
} }
drop(permit);
} }
async fn load_older_one( async fn load_older_one(
@ -281,22 +289,42 @@ async fn load_older_one(
let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?; let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?;
let msgs = chunk.into_iter().filter_map(|ev| { let mut msgs = vec![];
match ev.event.deserialize() {
Ok(AnyTimelineEvent::MessageLike(msg)) => Some(msg),
Ok(AnyTimelineEvent::State(_)) => None,
Err(_) => None,
}
});
Ok((end, msgs.collect())) for ev in chunk.into_iter() {
let msg = match ev.event.deserialize() {
Ok(AnyTimelineEvent::MessageLike(msg)) => msg,
Ok(AnyTimelineEvent::State(_)) => continue,
Err(_) => continue,
};
let event_id = msg.event_id();
let receipts = match room
.load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
.await
{
Ok(receipts) => receipts.into_iter().map(|(u, _)| u).collect(),
Err(e) => {
tracing::warn!(?event_id, "failed to get event receipts: {e}");
vec![]
},
};
msgs.push((msg, receipts));
}
Ok((end, msgs))
} else { } else {
Err(IambError::UnknownRoom(room_id.to_owned()).into()) Err(IambError::UnknownRoom(room_id.to_owned()).into())
} }
} }
async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) { fn load_insert(
let mut locked = store.lock().await; room_id: OwnedRoomId,
res: MessageFetchResult,
locked: &mut ProgramStore,
store: AsyncProgramStore,
) {
let ChatStore { presences, rooms, worker, picker, settings, .. } = &mut locked.application; let ChatStore { presences, rooms, worker, picker, settings, .. } = &mut locked.application;
let info = rooms.get_or_default(room_id.clone()); let info = rooms.get_or_default(room_id.clone());
info.fetching = false; info.fetching = false;
@ -304,12 +332,12 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async
match res { match res {
Ok((fetch_id, msgs)) => { Ok((fetch_id, msgs)) => {
for msg in msgs.into_iter() { for (msg, receipts) in msgs.into_iter() {
let sender = msg.sender().to_owned(); let sender = msg.sender().to_owned();
let _ = presences.get_or_default(sender); let _ = presences.get_or_default(sender);
if let Some(room) = client.get_room(&room_id) { for user_id in receipts {
update_event_receipts(info, &room, msg.event_id()).await; info.set_receipt(user_id, msg.event_id().to_owned());
} }
match msg { match msg {
@ -345,11 +373,19 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async
} }
async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize { async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize {
// This is an arbitrary limit on how much work we do in parallel to avoid
// spawning too many tasks at startup and overwhelming the client. We
// should normally only surpass this limit at startup when doing an initial.
// fetch for each room.
const LIMIT: usize = 15;
// Plans are run in parallel. Any room *may* have several plans. // Plans are run in parallel. Any room *may* have several plans.
load_plans(store) let plans = load_plans(store).await;
.await let permits = Semaphore::new(LIMIT);
plans
.into_iter() .into_iter()
.map(|plan| run_plan(client, store, plan)) .map(|plan| run_plan(client, store, plan, &permits))
.collect::<FuturesUnordered<_>>() .collect::<FuturesUnordered<_>>()
.count() .count()
.await .await
@ -366,14 +402,13 @@ async fn members_load(client: &Client, room_id: &RoomId) -> IambResult<Vec<RoomM
} }
} }
async fn members_insert( fn members_insert(
room_id: OwnedRoomId, room_id: OwnedRoomId,
res: IambResult<Vec<RoomMember>>, res: IambResult<Vec<RoomMember>>,
store: &AsyncProgramStore, store: &mut ProgramStore,
) { ) {
if let Ok(members) = res { if let Ok(members) = res {
let mut locked = store.lock().await; let ChatStore { rooms, .. } = &mut store.application;
let ChatStore { rooms, .. } = &mut locked.application;
let info = rooms.get_or_default(room_id.clone()); let info = rooms.get_or_default(room_id.clone());
for member in members { for member in members {
@ -672,7 +707,7 @@ async fn create_client_inner(
.http_client(http) .http_client(http)
.sqlite_store(settings.sqlite_dir.as_path(), None) .sqlite_store(settings.sqlite_dir.as_path(), None)
.request_config(req_config) .request_config(req_config)
.with_encryption_settings(DEFAULT_ENCRYPTION_SETTINGS.clone()); .with_encryption_settings(DEFAULT_ENCRYPTION_SETTINGS);
let builder = if let Some(url) = homeserver { let builder = if let Some(url) = homeserver {
// Use the explicitly specified homeserver. // Use the explicitly specified homeserver.