Reduce CPU usage by instead fetching read receipts after related sync events (#168)

This commit is contained in:
Benjamin Lee 2023-10-15 18:12:39 -07:00 committed by GitHub
parent df3148b9f5
commit b2b47ed7a0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 123 additions and 81 deletions

View file

@ -413,9 +413,6 @@ pub type IambResult<T> = UIResult<T, IambInfo>;
/// it's reacting to. /// it's reacting to.
pub type MessageReactions = HashMap<OwnedEventId, (String, OwnedUserId)>; pub type MessageReactions = HashMap<OwnedEventId, (String, OwnedUserId)>;
/// Map of read receipts for different events.
pub type Receipts = HashMap<OwnedEventId, Vec<OwnedUserId>>;
/// Errors encountered during application use. /// Errors encountered during application use.
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum IambError { pub enum IambError {
@ -549,7 +546,14 @@ pub struct RoomInfo {
pub messages: Messages, pub messages: Messages,
/// A map of read markers to display on different events. /// A map of read markers to display on different events.
pub receipts: HashMap<OwnedEventId, Vec<OwnedUserId>>, pub event_receipts: HashMap<OwnedEventId, HashSet<OwnedUserId>>,
/// A map of the most recent read marker for each user.
///
/// Every receipt in this map should also have an entry in [`event_receipts`],
/// however not every user has an entry. If a user's most recent receipt is
/// older than the oldest loaded event, that user will not be included.
pub user_receipts: HashMap<OwnedUserId, OwnedEventId>,
/// An event ID for where we should indicate we've read up to. /// An event ID for where we should indicate we've read up to.
pub read_till: Option<OwnedEventId>, pub read_till: Option<OwnedEventId>,
@ -698,6 +702,27 @@ impl RoomInfo {
self.fetch_last.map_or(false, |i| i.elapsed() < ROOM_FETCH_DEBOUNCE) self.fetch_last.map_or(false, |i| i.elapsed() < ROOM_FETCH_DEBOUNCE)
} }
fn clear_receipt(&mut self, user_id: &OwnedUserId) -> Option<()> {
let old_event_id = self.user_receipts.get(user_id)?;
let old_receipts = self.event_receipts.get_mut(old_event_id)?;
old_receipts.remove(user_id);
if old_receipts.is_empty() {
self.event_receipts.remove(old_event_id);
}
None
}
pub fn set_receipt(&mut self, user_id: OwnedUserId, event_id: OwnedEventId) {
self.clear_receipt(&user_id);
self.event_receipts
.entry(event_id.clone())
.or_default()
.insert(user_id.clone());
self.user_receipts.insert(user_id, event_id);
}
fn get_typers(&self) -> &[OwnedUserId] { fn get_typers(&self) -> &[OwnedUserId] {
if let Some((t, users)) = &self.users_typing { if let Some((t, users)) = &self.users_typing {
if t.elapsed() < Duration::from_secs(4) { if t.elapsed() < Duration::from_secs(4) {
@ -860,26 +885,6 @@ impl ChatStore {
.unwrap_or_else(|| "Untitled Matrix Room".to_string()) .unwrap_or_else(|| "Untitled Matrix Room".to_string())
} }
/// Update the receipts for multiple rooms.
pub async fn set_receipts(
&mut self,
receipts: Vec<(OwnedRoomId, Receipts)>,
) -> Vec<(OwnedRoomId, OwnedEventId)> {
let mut updates = vec![];
for (room_id, receipts) in receipts.into_iter() {
if let Some(info) = self.rooms.get_mut(&room_id) {
info.receipts = receipts;
if let Some(read_till) = info.read_till.take() {
updates.push((room_id, read_till));
}
}
}
return updates;
}
/// Mark a room for loading more scrollback. /// Mark a room for loading more scrollback.
pub fn mark_for_load(&mut self, room_id: OwnedRoomId) { pub fn mark_for_load(&mut self, room_id: OwnedRoomId) {
self.need_load.insert(room_id); self.need_load.insert(room_id);

View file

@ -2,10 +2,10 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::cmp::{Ord, Ordering, PartialOrd}; use std::cmp::{Ord, Ordering, PartialOrd};
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::collections::hash_set;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::slice::Iter;
use chrono::{DateTime, Local as LocalTz, NaiveDateTime, TimeZone}; use chrono::{DateTime, Local as LocalTz, NaiveDateTime, TimeZone};
use comrak::{markdown_to_html, ComrakOptions}; use comrak::{markdown_to_html, ComrakOptions};
@ -501,7 +501,7 @@ struct MessageFormatter<'a> {
date: Option<Span<'a>>, date: Option<Span<'a>>,
/// Iterator over the users who have read up to this message. /// Iterator over the users who have read up to this message.
read: Iter<'a, OwnedUserId>, read: Option<hash_set::Iter<'a, OwnedUserId>>,
} }
impl<'a> MessageFormatter<'a> { impl<'a> MessageFormatter<'a> {
@ -533,10 +533,11 @@ impl<'a> MessageFormatter<'a> {
// Show read receipts. // Show read receipts.
let user_char = let user_char =
|user: &'a OwnedUserId| -> Span<'a> { settings.get_user_char_span(user) }; |user: &'a OwnedUserId| -> Span<'a> { settings.get_user_char_span(user) };
let mut read = self.read.iter_mut().flatten();
let a = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); let a = read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
let b = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); let b = read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
let c = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); let c = read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
line.push(Span::raw(" ")); line.push(Span::raw(" "));
line.push(c); line.push(c);
@ -650,10 +651,7 @@ impl Message {
let fill = width - USER_GUTTER - TIME_GUTTER - READ_GUTTER; let fill = width - USER_GUTTER - TIME_GUTTER - READ_GUTTER;
let user = self.show_sender(prev, true, info, settings); let user = self.show_sender(prev, true, info, settings);
let time = self.timestamp.show_time(); let time = self.timestamp.show_time();
let read = match info.receipts.get(self.event.event_id()) { let read = info.event_receipts.get(self.event.event_id()).map(|read| read.iter());
Some(read) => read.iter(),
None => [].iter(),
};
MessageFormatter { settings, cols, orig, fill, user, date, time, read } MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else if USER_GUTTER + TIME_GUTTER + MIN_MSG_LEN <= width { } else if USER_GUTTER + TIME_GUTTER + MIN_MSG_LEN <= width {
@ -661,7 +659,7 @@ impl Message {
let fill = width - USER_GUTTER - TIME_GUTTER; let fill = width - USER_GUTTER - TIME_GUTTER;
let user = self.show_sender(prev, true, info, settings); let user = self.show_sender(prev, true, info, settings);
let time = self.timestamp.show_time(); let time = self.timestamp.show_time();
let read = [].iter(); let read = None;
MessageFormatter { settings, cols, orig, fill, user, date, time, read } MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else if USER_GUTTER + MIN_MSG_LEN <= width { } else if USER_GUTTER + MIN_MSG_LEN <= width {
@ -669,7 +667,7 @@ impl Message {
let fill = width - USER_GUTTER; let fill = width - USER_GUTTER;
let user = self.show_sender(prev, true, info, settings); let user = self.show_sender(prev, true, info, settings);
let time = None; let time = None;
let read = [].iter(); let read = None;
MessageFormatter { settings, cols, orig, fill, user, date, time, read } MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else { } else {
@ -677,7 +675,7 @@ impl Message {
let fill = width.saturating_sub(2); let fill = width.saturating_sub(2);
let user = self.show_sender(prev, false, info, settings); let user = self.show_sender(prev, false, info, settings);
let time = None; let time = None;
let read = [].iter(); let read = None;
MessageFormatter { settings, cols, orig, fill, user, date, time, read } MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} }

View file

@ -153,7 +153,8 @@ pub fn mock_room() -> RoomInfo {
keys: mock_keys(), keys: mock_keys(),
messages: mock_messages(), messages: mock_messages(),
receipts: HashMap::new(), event_receipts: HashMap::new(),
user_receipts: HashMap::new(),
read_till: None, read_till: None,
reactions: HashMap::new(), reactions: HashMap::new(),

View file

@ -41,6 +41,7 @@ use matrix_sdk::{
}, },
presence::PresenceEvent, presence::PresenceEvent,
reaction::ReactionEventContent, reaction::ReactionEventContent,
receipt::{ReceiptEventContent, ReceiptType},
room::{ room::{
encryption::RoomEncryptionEventContent, encryption::RoomEncryptionEventContent,
member::OriginalSyncRoomMemberEvent, member::OriginalSyncRoomMemberEvent,
@ -55,12 +56,14 @@ use matrix_sdk::{
AnyTimelineEvent, AnyTimelineEvent,
EmptyStateKey, EmptyStateKey,
InitialStateEvent, InitialStateEvent,
SyncEphemeralRoomEvent,
SyncMessageLikeEvent, SyncMessageLikeEvent,
SyncStateEvent, SyncStateEvent,
}, },
room::RoomType, room::RoomType,
serde::Raw, serde::Raw,
EventEncryptionAlgorithm, EventEncryptionAlgorithm,
EventId,
OwnedEventId, OwnedEventId,
OwnedRoomId, OwnedRoomId,
OwnedRoomOrAliasId, OwnedRoomOrAliasId,
@ -84,8 +87,8 @@ use crate::{
EventLocation, EventLocation,
IambError, IambError,
IambResult, IambResult,
Receipts,
RoomFetchStatus, RoomFetchStatus,
RoomInfo,
VerifyAction, VerifyAction,
}, },
message::MessageFetchResult, message::MessageFetchResult,
@ -171,6 +174,19 @@ pub async fn create_room(
return Ok(resp.room_id); return Ok(resp.room_id);
} }
async fn update_event_receipts(info: &mut RoomInfo, room: &MatrixRoom, event_id: &EventId) {
let receipts = match room.event_read_receipts(event_id).await {
Ok(receipts) => receipts,
Err(e) => {
tracing::warn!(?event_id, "failed to get event receipts: {e}");
return;
},
};
for (user_id, _) in receipts {
info.set_receipt(user_id, event_id.to_owned());
}
}
async fn load_plan(store: &AsyncProgramStore) -> HashMap<OwnedRoomId, Option<String>> { async fn load_plan(store: &AsyncProgramStore) -> HashMap<OwnedRoomId, Option<String>> {
let mut locked = store.lock().await; let mut locked = store.lock().await;
let ChatStore { need_load, rooms, .. } = &mut locked.application; let ChatStore { need_load, rooms, .. } = &mut locked.application;
@ -200,7 +216,7 @@ async fn load_plan(store: &AsyncProgramStore) -> HashMap<OwnedRoomId, Option<Str
} }
async fn load_older_one( async fn load_older_one(
client: Client, client: &Client,
room_id: &RoomId, room_id: &RoomId,
fetch_id: Option<String>, fetch_id: Option<String>,
limit: u32, limit: u32,
@ -228,7 +244,12 @@ async fn load_older_one(
} }
} }
async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) { async fn load_insert(
client: &Client,
room_id: OwnedRoomId,
res: MessageFetchResult,
store: AsyncProgramStore,
) {
let mut locked = store.lock().await; let mut locked = store.lock().await;
let ChatStore { need_load, presences, rooms, .. } = &mut locked.application; let ChatStore { need_load, presences, rooms, .. } = &mut locked.application;
let info = rooms.get_or_default(room_id.clone()); let info = rooms.get_or_default(room_id.clone());
@ -240,6 +261,10 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async
let sender = msg.sender().to_owned(); let sender = msg.sender().to_owned();
let _ = presences.get_or_default(sender); let _ = presences.get_or_default(sender);
if let Some(room) = client.get_room(&room_id) {
update_event_receipts(info, &room, msg.event_id()).await;
}
match msg { match msg {
AnyMessageLikeEvent::RoomEncrypted(msg) => { AnyMessageLikeEvent::RoomEncrypted(msg) => {
info.insert_encrypted(msg); info.insert_encrypted(msg);
@ -277,8 +302,8 @@ async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize {
let store = store.clone(); let store = store.clone();
async move { async move {
let res = load_older_one(client, room_id.as_ref(), fetch_id, limit).await; let res = load_older_one(&client, room_id.as_ref(), fetch_id, limit).await;
load_insert(room_id, res, store).await; load_insert(&client, room_id, res, store).await;
} }
}) })
.collect::<FuturesUnordered<_>>() .collect::<FuturesUnordered<_>>()
@ -357,28 +382,40 @@ async fn refresh_rooms_forever(client: &Client, store: &AsyncProgramStore) {
} }
} }
async fn refresh_receipts_forever(client: &Client, store: &AsyncProgramStore) { async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) {
// Update the displayed read receipts every 5 seconds. let mut interval = tokio::time::interval(Duration::from_secs(2));
let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut sent = HashMap::<OwnedRoomId, OwnedEventId>::default(); let mut sent = HashMap::<OwnedRoomId, OwnedEventId>::default();
loop { loop {
interval.tick().await; interval.tick().await;
let receipts = update_receipts(client).await;
let read = store.lock().await.application.set_receipts(receipts).await;
for (room_id, read_till) in read.into_iter() { let locked = store.lock().await;
if let Some(read_sent) = sent.get(&room_id) { let updates = client
if read_sent == &read_till { .joined_rooms()
// Skip unchanged receipts. .into_iter()
continue; .filter_map(|room| {
let room_id = room.room_id().to_owned();
let info = locked.application.rooms.get(&room_id)?;
let new_receipt = info.read_till.as_ref()?;
let old_receipt = sent.get(&room_id);
if Some(new_receipt) != old_receipt {
Some((room_id, new_receipt.clone()))
} else {
None
} }
} })
.collect::<Vec<_>>();
drop(locked);
if let Some(room) = client.get_joined_room(&room_id) { for (room_id, new_receipt) in updates {
if room.read_receipt(&read_till).await.is_ok() { let Some(room) = client.get_joined_room(&room_id) else {
sent.insert(room_id, read_till); continue;
} };
match room.read_receipt(&new_receipt).await {
Ok(()) => {
sent.insert(room_id, new_receipt);
},
Err(e) => tracing::warn!(?room_id, "Failed to set read receipt: {e}"),
} }
} }
} }
@ -413,29 +450,6 @@ fn oneshot<T>() -> (ClientReply<T>, ClientResponse<T>) {
return (reply, response); return (reply, response);
} }
async fn update_receipts(client: &Client) -> Vec<(OwnedRoomId, Receipts)> {
let mut rooms = vec![];
for room in client.joined_rooms() {
if let Ok(users) = room.active_members_no_sync().await {
let mut receipts = Receipts::new();
for member in users {
let res = room.user_read_receipt(member.user_id()).await;
if let Ok(Some((event_id, _))) = res {
let user_id = member.user_id().to_owned();
receipts.entry(event_id).or_default().push(user_id);
}
}
rooms.push((room.room_id().to_owned(), receipts));
}
}
return rooms;
}
pub type FetchedRoom = (MatrixRoom, DisplayName, Option<Tags>); pub type FetchedRoom = (MatrixRoom, DisplayName, Option<Tags>);
pub enum WorkerTask { pub enum WorkerTask {
@ -787,6 +801,7 @@ impl ClientWorker {
let _ = locked.application.presences.get_or_default(sender); let _ = locked.application.presences.get_or_default(sender);
let info = locked.application.get_room_info(room_id.to_owned()); let info = locked.application.get_room_info(room_id.to_owned());
update_event_receipts(info, &room, ev.event_id()).await;
info.insert(ev.into_full_event(room_id.to_owned())); info.insert(ev.into_full_event(room_id.to_owned()));
} }
}, },
@ -805,11 +820,34 @@ impl ClientWorker {
let _ = locked.application.presences.get_or_default(sender); let _ = locked.application.presences.get_or_default(sender);
let info = locked.application.get_room_info(room_id.to_owned()); let info = locked.application.get_room_info(room_id.to_owned());
update_event_receipts(info, &room, ev.event_id()).await;
info.insert_reaction(ev.into_full_event(room_id.to_owned())); info.insert_reaction(ev.into_full_event(room_id.to_owned()));
} }
}, },
); );
let _ = self.client.add_event_handler(
|ev: SyncEphemeralRoomEvent<ReceiptEventContent>,
room: MatrixRoom,
store: Ctx<AsyncProgramStore>| {
async move {
let room_id = room.room_id();
let mut locked = store.lock().await;
let info = locked.application.get_room_info(room_id.to_owned());
for (event_id, receipts) in ev.content.0.into_iter() {
let Some(receipts) = receipts.get(&ReceiptType::Read) else {
continue;
};
for user_id in receipts.keys() {
info.set_receipt(user_id.to_owned(), event_id.clone());
}
}
}
},
);
let _ = self.client.add_event_handler( let _ = self.client.add_event_handler(
|ev: OriginalSyncRoomRedactionEvent, |ev: OriginalSyncRoomRedactionEvent,
room: MatrixRoom, room: MatrixRoom,
@ -992,7 +1030,7 @@ impl ClientWorker {
async move { async move {
let load = load_older_forever(&client, &store); let load = load_older_forever(&client, &store);
let rcpt = refresh_receipts_forever(&client, &store); let rcpt = send_receipts_forever(&client, &store);
let room = refresh_rooms_forever(&client, &store); let room = refresh_rooms_forever(&client, &store);
let ((), (), ()) = tokio::join!(load, rcpt, room); let ((), (), ()) = tokio::join!(load, rcpt, room);
} }