From b2b47ed7a0a04577a4c93d066ee1341c7aa05df9 Mon Sep 17 00:00:00 2001 From: Benjamin Lee Date: Sun, 15 Oct 2023 18:12:39 -0700 Subject: [PATCH] Reduce CPU usage by instead fetching read receipts after related sync events (#168) --- src/base.rs | 53 ++++++++++--------- src/message/mod.rs | 22 ++++---- src/tests.rs | 3 +- src/worker.rs | 126 +++++++++++++++++++++++++++++---------------- 4 files changed, 123 insertions(+), 81 deletions(-) diff --git a/src/base.rs b/src/base.rs index 7089bb4..07d825a 100644 --- a/src/base.rs +++ b/src/base.rs @@ -413,9 +413,6 @@ pub type IambResult = UIResult; /// it's reacting to. pub type MessageReactions = HashMap; -/// Map of read receipts for different events. -pub type Receipts = HashMap>; - /// Errors encountered during application use. #[derive(thiserror::Error, Debug)] pub enum IambError { @@ -549,7 +546,14 @@ pub struct RoomInfo { pub messages: Messages, /// A map of read markers to display on different events. - pub receipts: HashMap>, + pub event_receipts: HashMap>, + + /// A map of the most recent read marker for each user. + /// + /// Every receipt in this map should also have an entry in [`event_receipts`], + /// however not every user has an entry. If a user's most recent receipt is + /// older than the oldest loaded event, that user will not be included. + pub user_receipts: HashMap, /// An event ID for where we should indicate we've read up to. pub read_till: Option, @@ -698,6 +702,27 @@ impl RoomInfo { self.fetch_last.map_or(false, |i| i.elapsed() < ROOM_FETCH_DEBOUNCE) } + fn clear_receipt(&mut self, user_id: &OwnedUserId) -> Option<()> { + let old_event_id = self.user_receipts.get(user_id)?; + let old_receipts = self.event_receipts.get_mut(old_event_id)?; + old_receipts.remove(user_id); + + if old_receipts.is_empty() { + self.event_receipts.remove(old_event_id); + } + + None + } + + pub fn set_receipt(&mut self, user_id: OwnedUserId, event_id: OwnedEventId) { + self.clear_receipt(&user_id); + self.event_receipts + .entry(event_id.clone()) + .or_default() + .insert(user_id.clone()); + self.user_receipts.insert(user_id, event_id); + } + fn get_typers(&self) -> &[OwnedUserId] { if let Some((t, users)) = &self.users_typing { if t.elapsed() < Duration::from_secs(4) { @@ -860,26 +885,6 @@ impl ChatStore { .unwrap_or_else(|| "Untitled Matrix Room".to_string()) } - /// Update the receipts for multiple rooms. - pub async fn set_receipts( - &mut self, - receipts: Vec<(OwnedRoomId, Receipts)>, - ) -> Vec<(OwnedRoomId, OwnedEventId)> { - let mut updates = vec![]; - - for (room_id, receipts) in receipts.into_iter() { - if let Some(info) = self.rooms.get_mut(&room_id) { - info.receipts = receipts; - - if let Some(read_till) = info.read_till.take() { - updates.push((room_id, read_till)); - } - } - } - - return updates; - } - /// Mark a room for loading more scrollback. pub fn mark_for_load(&mut self, room_id: OwnedRoomId) { self.need_load.insert(room_id); diff --git a/src/message/mod.rs b/src/message/mod.rs index de76bfa..118c01e 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -2,10 +2,10 @@ use std::borrow::Cow; use std::cmp::{Ord, Ordering, PartialOrd}; use std::collections::hash_map::DefaultHasher; +use std::collections::hash_set; use std::collections::BTreeMap; use std::convert::TryFrom; use std::hash::{Hash, Hasher}; -use std::slice::Iter; use chrono::{DateTime, Local as LocalTz, NaiveDateTime, TimeZone}; use comrak::{markdown_to_html, ComrakOptions}; @@ -501,7 +501,7 @@ struct MessageFormatter<'a> { date: Option>, /// Iterator over the users who have read up to this message. - read: Iter<'a, OwnedUserId>, + read: Option>, } impl<'a> MessageFormatter<'a> { @@ -533,10 +533,11 @@ impl<'a> MessageFormatter<'a> { // Show read receipts. let user_char = |user: &'a OwnedUserId| -> Span<'a> { settings.get_user_char_span(user) }; + let mut read = self.read.iter_mut().flatten(); - let a = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); - let b = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); - let c = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); + let a = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); + let b = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); + let c = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); line.push(Span::raw(" ")); line.push(c); @@ -650,10 +651,7 @@ impl Message { let fill = width - USER_GUTTER - TIME_GUTTER - READ_GUTTER; let user = self.show_sender(prev, true, info, settings); let time = self.timestamp.show_time(); - let read = match info.receipts.get(self.event.event_id()) { - Some(read) => read.iter(), - None => [].iter(), - }; + let read = info.event_receipts.get(self.event.event_id()).map(|read| read.iter()); MessageFormatter { settings, cols, orig, fill, user, date, time, read } } else if USER_GUTTER + TIME_GUTTER + MIN_MSG_LEN <= width { @@ -661,7 +659,7 @@ impl Message { let fill = width - USER_GUTTER - TIME_GUTTER; let user = self.show_sender(prev, true, info, settings); let time = self.timestamp.show_time(); - let read = [].iter(); + let read = None; MessageFormatter { settings, cols, orig, fill, user, date, time, read } } else if USER_GUTTER + MIN_MSG_LEN <= width { @@ -669,7 +667,7 @@ impl Message { let fill = width - USER_GUTTER; let user = self.show_sender(prev, true, info, settings); let time = None; - let read = [].iter(); + let read = None; MessageFormatter { settings, cols, orig, fill, user, date, time, read } } else { @@ -677,7 +675,7 @@ impl Message { let fill = width.saturating_sub(2); let user = self.show_sender(prev, false, info, settings); let time = None; - let read = [].iter(); + let read = None; MessageFormatter { settings, cols, orig, fill, user, date, time, read } } diff --git a/src/tests.rs b/src/tests.rs index 94d120f..aa687e3 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -153,7 +153,8 @@ pub fn mock_room() -> RoomInfo { keys: mock_keys(), messages: mock_messages(), - receipts: HashMap::new(), + event_receipts: HashMap::new(), + user_receipts: HashMap::new(), read_till: None, reactions: HashMap::new(), diff --git a/src/worker.rs b/src/worker.rs index 646f010..255d41c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -41,6 +41,7 @@ use matrix_sdk::{ }, presence::PresenceEvent, reaction::ReactionEventContent, + receipt::{ReceiptEventContent, ReceiptType}, room::{ encryption::RoomEncryptionEventContent, member::OriginalSyncRoomMemberEvent, @@ -55,12 +56,14 @@ use matrix_sdk::{ AnyTimelineEvent, EmptyStateKey, InitialStateEvent, + SyncEphemeralRoomEvent, SyncMessageLikeEvent, SyncStateEvent, }, room::RoomType, serde::Raw, EventEncryptionAlgorithm, + EventId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, @@ -84,8 +87,8 @@ use crate::{ EventLocation, IambError, IambResult, - Receipts, RoomFetchStatus, + RoomInfo, VerifyAction, }, message::MessageFetchResult, @@ -171,6 +174,19 @@ pub async fn create_room( return Ok(resp.room_id); } +async fn update_event_receipts(info: &mut RoomInfo, room: &MatrixRoom, event_id: &EventId) { + let receipts = match room.event_read_receipts(event_id).await { + Ok(receipts) => receipts, + Err(e) => { + tracing::warn!(?event_id, "failed to get event receipts: {e}"); + return; + }, + }; + for (user_id, _) in receipts { + info.set_receipt(user_id, event_id.to_owned()); + } +} + async fn load_plan(store: &AsyncProgramStore) -> HashMap> { let mut locked = store.lock().await; let ChatStore { need_load, rooms, .. } = &mut locked.application; @@ -200,7 +216,7 @@ async fn load_plan(store: &AsyncProgramStore) -> HashMap, limit: u32, @@ -228,7 +244,12 @@ async fn load_older_one( } } -async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) { +async fn load_insert( + client: &Client, + room_id: OwnedRoomId, + res: MessageFetchResult, + store: AsyncProgramStore, +) { let mut locked = store.lock().await; let ChatStore { need_load, presences, rooms, .. } = &mut locked.application; let info = rooms.get_or_default(room_id.clone()); @@ -240,6 +261,10 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async let sender = msg.sender().to_owned(); let _ = presences.get_or_default(sender); + if let Some(room) = client.get_room(&room_id) { + update_event_receipts(info, &room, msg.event_id()).await; + } + match msg { AnyMessageLikeEvent::RoomEncrypted(msg) => { info.insert_encrypted(msg); @@ -277,8 +302,8 @@ async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize { let store = store.clone(); async move { - let res = load_older_one(client, room_id.as_ref(), fetch_id, limit).await; - load_insert(room_id, res, store).await; + let res = load_older_one(&client, room_id.as_ref(), fetch_id, limit).await; + load_insert(&client, room_id, res, store).await; } }) .collect::>() @@ -357,28 +382,40 @@ async fn refresh_rooms_forever(client: &Client, store: &AsyncProgramStore) { } } -async fn refresh_receipts_forever(client: &Client, store: &AsyncProgramStore) { - // Update the displayed read receipts every 5 seconds. - let mut interval = tokio::time::interval(Duration::from_secs(5)); +async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) { + let mut interval = tokio::time::interval(Duration::from_secs(2)); let mut sent = HashMap::::default(); loop { interval.tick().await; - let receipts = update_receipts(client).await; - let read = store.lock().await.application.set_receipts(receipts).await; - for (room_id, read_till) in read.into_iter() { - if let Some(read_sent) = sent.get(&room_id) { - if read_sent == &read_till { - // Skip unchanged receipts. - continue; + let locked = store.lock().await; + let updates = client + .joined_rooms() + .into_iter() + .filter_map(|room| { + let room_id = room.room_id().to_owned(); + let info = locked.application.rooms.get(&room_id)?; + let new_receipt = info.read_till.as_ref()?; + let old_receipt = sent.get(&room_id); + if Some(new_receipt) != old_receipt { + Some((room_id, new_receipt.clone())) + } else { + None } - } + }) + .collect::>(); + drop(locked); - if let Some(room) = client.get_joined_room(&room_id) { - if room.read_receipt(&read_till).await.is_ok() { - sent.insert(room_id, read_till); - } + for (room_id, new_receipt) in updates { + let Some(room) = client.get_joined_room(&room_id) else { + continue; + }; + match room.read_receipt(&new_receipt).await { + Ok(()) => { + sent.insert(room_id, new_receipt); + }, + Err(e) => tracing::warn!(?room_id, "Failed to set read receipt: {e}"), } } } @@ -413,29 +450,6 @@ fn oneshot() -> (ClientReply, ClientResponse) { return (reply, response); } -async fn update_receipts(client: &Client) -> Vec<(OwnedRoomId, Receipts)> { - let mut rooms = vec![]; - - for room in client.joined_rooms() { - if let Ok(users) = room.active_members_no_sync().await { - let mut receipts = Receipts::new(); - - for member in users { - let res = room.user_read_receipt(member.user_id()).await; - - if let Ok(Some((event_id, _))) = res { - let user_id = member.user_id().to_owned(); - receipts.entry(event_id).or_default().push(user_id); - } - } - - rooms.push((room.room_id().to_owned(), receipts)); - } - } - - return rooms; -} - pub type FetchedRoom = (MatrixRoom, DisplayName, Option); pub enum WorkerTask { @@ -787,6 +801,7 @@ impl ClientWorker { let _ = locked.application.presences.get_or_default(sender); let info = locked.application.get_room_info(room_id.to_owned()); + update_event_receipts(info, &room, ev.event_id()).await; info.insert(ev.into_full_event(room_id.to_owned())); } }, @@ -805,11 +820,34 @@ impl ClientWorker { let _ = locked.application.presences.get_or_default(sender); let info = locked.application.get_room_info(room_id.to_owned()); + update_event_receipts(info, &room, ev.event_id()).await; info.insert_reaction(ev.into_full_event(room_id.to_owned())); } }, ); + let _ = self.client.add_event_handler( + |ev: SyncEphemeralRoomEvent, + room: MatrixRoom, + store: Ctx| { + async move { + let room_id = room.room_id(); + + let mut locked = store.lock().await; + + let info = locked.application.get_room_info(room_id.to_owned()); + for (event_id, receipts) in ev.content.0.into_iter() { + let Some(receipts) = receipts.get(&ReceiptType::Read) else { + continue; + }; + for user_id in receipts.keys() { + info.set_receipt(user_id.to_owned(), event_id.clone()); + } + } + } + }, + ); + let _ = self.client.add_event_handler( |ev: OriginalSyncRoomRedactionEvent, room: MatrixRoom, @@ -992,7 +1030,7 @@ impl ClientWorker { async move { let load = load_older_forever(&client, &store); - let rcpt = refresh_receipts_forever(&client, &store); + let rcpt = send_receipts_forever(&client, &store); let room = refresh_rooms_forever(&client, &store); let ((), (), ()) = tokio::join!(load, rcpt, room); }