From f880358a83829fbd8191c0fe89bfee1b6b7ceba1 Mon Sep 17 00:00:00 2001 From: VAWVAW Date: Fri, 6 Jun 2025 01:11:57 +0000 Subject: [PATCH] Implement receipts per thread (#438) --- src/base.rs | 109 ++++++++++++++++++++++++++------- src/config.rs | 2 +- src/main.rs | 2 +- src/message/mod.rs | 47 +++++++++----- src/tests.rs | 2 +- src/windows/room/scrollback.rs | 2 +- src/worker.rs | 60 +++++++++--------- 7 files changed, 152 insertions(+), 72 deletions(-) diff --git a/src/base.rs b/src/base.rs index 362a7c0..1cd3071 100644 --- a/src/base.rs +++ b/src/base.rs @@ -12,6 +12,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use emojis::Emoji; +use matrix_sdk::ruma::events::receipt::ReceiptThread; use ratatui::{ buffer::Buffer, layout::{Alignment, Rect}, @@ -869,7 +870,6 @@ impl UnreadInfo { } /// Information about room's the user's joined. -#[derive(Default)] pub struct RoomInfo { /// The display name for this room. pub name: Option, @@ -884,15 +884,13 @@ pub struct RoomInfo { messages: Messages, /// A map of read markers to display on different events. - pub event_receipts: HashMap>, - + pub event_receipts: HashMap>>, /// A map of the most recent read marker for each user. /// /// Every receipt in this map should also have an entry in [`event_receipts`](`Self::event_receipts`), /// however not every user has an entry. If a user's most recent receipt is /// older than the oldest loaded event, that user will not be included. - pub user_receipts: HashMap, - + pub user_receipts: HashMap>, /// A map of message identifiers to a map of reaction events. pub reactions: HashMap, @@ -918,6 +916,28 @@ pub struct RoomInfo { pub draw_last: Option, } +impl Default for RoomInfo { + fn default() -> Self { + Self { + messages: Messages::new(ReceiptThread::Main), + + name: Default::default(), + tags: Default::default(), + keys: Default::default(), + event_receipts: Default::default(), + user_receipts: Default::default(), + reactions: Default::default(), + threads: Default::default(), + fetching: Default::default(), + fetch_id: Default::default(), + fetch_last: Default::default(), + users_typing: Default::default(), + display_names: Default::default(), + draw_last: Default::default(), + } + } +} + impl RoomInfo { pub fn get_thread(&self, root: Option<&EventId>) -> Option<&Messages> { if let Some(thread_root) = root { @@ -929,7 +949,9 @@ impl RoomInfo { pub fn get_thread_mut(&mut self, root: Option) -> &mut Messages { if let Some(thread_root) = root { - self.threads.entry(thread_root).or_default() + self.threads + .entry(thread_root.clone()) + .or_insert_with(|| Messages::thread(thread_root)) } else { &mut self.messages } @@ -1069,7 +1091,9 @@ impl RoomInfo { }; let source = if let Some(thread) = thread { - self.threads.entry(thread.clone()).or_default() + self.threads + .entry(thread.clone()) + .or_insert_with(|| Messages::thread(thread.clone())) } else { &mut self.messages }; @@ -1108,13 +1132,20 @@ impl RoomInfo { /// Indicates whether this room has unread messages. pub fn unreads(&self, settings: &ApplicationSettings) -> UnreadInfo { let last_message = self.messages.last_key_value(); - let last_receipt = self.get_receipt(&settings.profile.user_id); + let last_receipt = self + .user_receipts + .get(&ReceiptThread::Main) + .and_then(|receipts| receipts.get(&settings.profile.user_id)); match (last_message, last_receipt) { (Some(((ts, recent), _)), Some(last_read)) => { UnreadInfo { unread: last_read != recent, latest: Some(*ts) } }, - (Some(((ts, _), _)), None) => UnreadInfo { unread: false, latest: Some(*ts) }, + (Some(((ts, _), _)), None) => { + // If we've never loaded/generated a room's receipt (example, + // a newly joined but never viewed room), show it as unread. + UnreadInfo { unread: true, latest: Some(*ts) } + }, (None, _) => UnreadInfo::default(), } } @@ -1142,7 +1173,10 @@ impl RoomInfo { let event_id = msg.event_id().to_owned(); let key = (msg.origin_server_ts().into(), event_id.clone()); - let replies = self.threads.entry(thread_root.clone()).or_default(); + let replies = self + .threads + .entry(thread_root.clone()) + .or_insert_with(|| Messages::thread(thread_root.clone())); let loc = EventLocation::Message(Some(thread_root), key.clone()); self.keys.insert(event_id, loc); replies.insert_message(key, msg); @@ -1205,37 +1239,70 @@ impl RoomInfo { self.fetch_last.is_some_and(|i| i.elapsed() < ROOM_FETCH_DEBOUNCE) } - fn clear_receipt(&mut self, user_id: &OwnedUserId) -> Option<()> { - let old_event_id = self.user_receipts.get(user_id)?; - let old_receipts = self.event_receipts.get_mut(old_event_id)?; + fn clear_receipt(&mut self, thread: &ReceiptThread, user_id: &OwnedUserId) -> Option<()> { + let old_event_id = + self.user_receipts.get(thread).and_then(|receipts| receipts.get(user_id))?; + let old_thread = self.event_receipts.get_mut(thread)?; + let old_receipts = old_thread.get_mut(old_event_id)?; old_receipts.remove(user_id); if old_receipts.is_empty() { - self.event_receipts.remove(old_event_id); + old_thread.remove(old_event_id); + } + if old_thread.is_empty() { + self.event_receipts.remove(thread); } None } - pub fn set_receipt(&mut self, user_id: OwnedUserId, event_id: OwnedEventId) { - self.clear_receipt(&user_id); + pub fn set_receipt( + &mut self, + thread: ReceiptThread, + user_id: OwnedUserId, + event_id: OwnedEventId, + ) { + self.clear_receipt(&thread, &user_id); self.event_receipts + .entry(thread.clone()) + .or_default() .entry(event_id.clone()) .or_default() .insert(user_id.clone()); - self.user_receipts.insert(user_id, event_id); + self.user_receipts.entry(thread).or_default().insert(user_id, event_id); } - pub fn fully_read(&mut self, user_id: OwnedUserId) { + pub fn fully_read(&mut self, user_id: &UserId) { let Some(((_, event_id), _)) = self.messages.last_key_value() else { return; }; - self.set_receipt(user_id, event_id.clone()); + self.set_receipt(ReceiptThread::Main, user_id.to_owned(), event_id.clone()); + + let newest = self + .threads + .iter() + .filter_map(|(thread_id, messages)| { + let thread = ReceiptThread::Thread(thread_id.to_owned()); + + messages + .last_key_value() + .map(|((_, event_id), _)| (thread, event_id.to_owned())) + }) + .collect::>(); + + for (thread, event_id) in newest.into_iter() { + self.set_receipt(thread, user_id.to_owned(), event_id.clone()); + } } - pub fn get_receipt(&self, user_id: &UserId) -> Option<&OwnedEventId> { - self.user_receipts.get(user_id) + pub fn receipts<'a>( + &'a self, + user_id: &'a UserId, + ) -> impl Iterator + 'a { + self.user_receipts + .iter() + .filter_map(move |(t, rs)| rs.get(user_id).map(|r| (t, r))) } fn get_typers(&self) -> &[OwnedUserId] { diff --git a/src/config.rs b/src/config.rs index 4a11252..b712c73 100644 --- a/src/config.rs +++ b/src/config.rs @@ -982,7 +982,7 @@ impl ApplicationSettings { Ok(()) } - pub fn get_user_char_span<'a>(&self, user_id: &'a UserId) -> Span<'a> { + pub fn get_user_char_span(&self, user_id: &UserId) -> Span { let (color, c) = self .tunables .users diff --git a/src/main.rs b/src/main.rs index 4ca4f96..9b8fe46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -559,7 +559,7 @@ impl Application { for room_id in store.application.sync_info.chats() { if let Some(room) = store.application.rooms.get_mut(room_id) { - room.fully_read(user_id.clone()); + room.fully_read(user_id); } } diff --git a/src/message/mod.rs b/src/message/mod.rs index e53c874..4608303 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -2,7 +2,6 @@ use std::borrow::Cow; use std::cmp::{Ord, Ordering, PartialOrd}; use std::collections::hash_map::DefaultHasher; -use std::collections::hash_set; use std::collections::BTreeMap; use std::convert::{TryFrom, TryInto}; use std::fmt::{self, Display}; @@ -11,6 +10,7 @@ use std::ops::{Deref, DerefMut}; use chrono::{DateTime, Local as LocalTz}; use humansize::{format_size, DECIMAL}; +use matrix_sdk::ruma::events::receipt::ReceiptThread; use serde_json::json; use unicode_width::UnicodeWidthStr; @@ -77,8 +77,7 @@ type ProtocolPreview<'a> = (&'a Protocol, u16, u16); pub type MessageKey = (MessageTimeStamp, OwnedEventId); -#[derive(Default)] -pub struct Messages(BTreeMap); +pub struct Messages(BTreeMap, pub ReceiptThread); impl Deref for Messages { type Target = BTreeMap; @@ -95,6 +94,18 @@ impl DerefMut for Messages { } impl Messages { + pub fn new(thread: ReceiptThread) -> Self { + Self(Default::default(), thread) + } + + pub fn main() -> Self { + Self::new(ReceiptThread::Main) + } + + pub fn thread(root: OwnedEventId) -> Self { + Self::new(ReceiptThread::Thread(root)) + } + pub fn insert_message(&mut self, key: MessageKey, msg: impl Into) { let event_id = key.1.clone(); let msg = msg.into(); @@ -633,8 +644,8 @@ struct MessageFormatter<'a> { /// The date the message was sent. date: Option>, - /// Iterator over the users who have read up to this message. - read: Option>, + /// The users who have read up to this message. + read: Vec, } impl<'a> MessageFormatter<'a> { @@ -667,13 +678,11 @@ impl<'a> MessageFormatter<'a> { line.push(time); // Show read receipts. - let user_char = - |user: &'a OwnedUserId| -> Span<'a> { settings.get_user_char_span(user) }; - let mut read = self.read.iter_mut().flatten(); + let user_char = |user: OwnedUserId| -> Span { settings.get_user_char_span(&user) }; - let a = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); - let b = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); - let c = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); + let a = self.read.pop().map(user_char).unwrap_or_else(|| Span::raw(" ")); + let b = self.read.pop().map(user_char).unwrap_or_else(|| Span::raw(" ")); + let c = self.read.pop().map(user_char).unwrap_or_else(|| Span::raw(" ")); line.push(Span::raw(" ")); line.push(c); @@ -944,7 +953,13 @@ impl Message { let fill = width - user_gutter - TIME_GUTTER - READ_GUTTER; let user = self.show_sender(prev, true, info, settings); let time = self.timestamp.show_time(); - let read = info.event_receipts.get(self.event.event_id()).map(|read| read.iter()); + let read = info + .event_receipts + .values() + .filter_map(|receipts| receipts.get(self.event.event_id())) + .flat_map(|read| read.iter()) + .map(|user_id| user_id.to_owned()) + .collect(); MessageFormatter { settings, cols, orig, fill, user, date, time, read } } else if user_gutter + TIME_GUTTER + MIN_MSG_LEN <= width { @@ -952,7 +967,7 @@ impl Message { let fill = width - user_gutter - TIME_GUTTER; let user = self.show_sender(prev, true, info, settings); let time = self.timestamp.show_time(); - let read = None; + let read = Vec::new(); MessageFormatter { settings, cols, orig, fill, user, date, time, read } } else if user_gutter + MIN_MSG_LEN <= width { @@ -960,7 +975,7 @@ impl Message { let fill = width - user_gutter; let user = self.show_sender(prev, true, info, settings); let time = None; - let read = None; + let read = Vec::new(); MessageFormatter { settings, cols, orig, fill, user, date, time, read } } else { @@ -968,7 +983,7 @@ impl Message { let fill = width.saturating_sub(2); let user = self.show_sender(prev, false, info, settings); let time = None; - let read = None; + let read = Vec::new(); MessageFormatter { settings, cols, orig, fill, user, date, time, read } } @@ -1281,7 +1296,7 @@ pub mod tests { assert_eq!(k6, &MSG1_KEY.clone()); // MessageCursor::latest() fails to convert for a room w/o messages. - let messages_empty = Messages::default(); + let messages_empty = Messages::new(ReceiptThread::Main); assert_eq!(mc6.to_key(&messages_empty), None); } diff --git a/src/tests.rs b/src/tests.rs index 38a745c..52cb859 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -137,7 +137,7 @@ pub fn mock_keys() -> HashMap { } pub fn mock_messages() -> Messages { - let mut messages = Messages::default(); + let mut messages = Messages::main(); messages.insert(MSG1_KEY.clone(), mock_message1()); messages.insert(MSG2_KEY.clone(), mock_message2()); diff --git a/src/windows/room/scrollback.rs b/src/windows/room/scrollback.rs index 1566931..3d8ec7f 100644 --- a/src/windows/room/scrollback.rs +++ b/src/windows/room/scrollback.rs @@ -1428,7 +1428,7 @@ impl StatefulWidget for Scrollback<'_> { { // If the cursor is at the last message, then update the read marker. if let Some((k, _)) = thread.last_key_value() { - info.set_receipt(settings.profile.user_id.clone(), k.1.clone()); + info.set_receipt(thread.1.clone(), settings.profile.user_id.clone(), k.1.clone()); } } diff --git a/src/worker.rs b/src/worker.rs index 3d08584..144c34c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -210,7 +210,7 @@ async fn update_event_receipts(info: &mut RoomInfo, room: &MatrixRoom, event_id: }; for (user_id, _) in receipts { - info.set_receipt(user_id, event_id.to_owned()); + info.set_receipt(ReceiptThread::Main, user_id, event_id.to_owned()); } } @@ -300,7 +300,7 @@ async fn load_older_one( let event_id = msg.event_id(); let receipts = match room - .load_event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id) + .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id) .await { Ok(receipts) => receipts.into_iter().map(|(u, _)| u).collect(), @@ -338,7 +338,7 @@ fn load_insert( let _ = presences.get_or_default(sender); for user_id in receipts { - info.set_receipt(user_id, msg.event_id().to_owned()); + info.set_receipt(ReceiptThread::Main, user_id, msg.event_id().to_owned()); } match msg { @@ -497,31 +497,32 @@ async fn refresh_rooms_forever(client: &Client, store: &AsyncProgramStore) { async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) { let mut interval = tokio::time::interval(Duration::from_secs(2)); - let mut sent = HashMap::::default(); + let mut sent: HashMap> = Default::default(); loop { interval.tick().await; let locked = store.lock().await; let user_id = &locked.application.settings.profile.user_id; - let updates = client - .joined_rooms() - .into_iter() - .filter_map(|room| { - let room_id = room.room_id().to_owned(); - let info = locked.application.rooms.get(&room_id)?; - let new_receipt = info.get_receipt(user_id)?; - let old_receipt = sent.get(&room_id); - if Some(new_receipt) != old_receipt { - Some((room_id, new_receipt.clone())) - } else { - None - } - }) - .collect::>(); + + let mut updates = Vec::new(); + for room in client.joined_rooms() { + let room_id = room.room_id(); + let Some(info) = locked.application.rooms.get(&room_id) else { + continue; + }; + + let changed = info.receipts(user_id).filter_map(|(thread, new_receipt)| { + let old_receipt = sent.get(room_id).and_then(|ts| ts.get(thread)); + let changed = Some(new_receipt) != old_receipt; + changed.then(|| (room_id.to_owned(), thread.to_owned(), new_receipt.to_owned())) + }); + + updates.extend(changed); + } drop(locked); - for (room_id, new_receipt) in updates { + for (room_id, thread, new_receipt) in updates { use matrix_sdk::ruma::api::client::receipt::create_receipt::v3::ReceiptType; let Some(room) = client.get_room(&room_id) else { @@ -529,15 +530,11 @@ async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) { }; match room - .send_single_receipt( - ReceiptType::Read, - ReceiptThread::Unthreaded, - new_receipt.clone(), - ) + .send_single_receipt(ReceiptType::Read, thread.to_owned(), new_receipt.clone()) .await { Ok(()) => { - sent.insert(room_id, new_receipt); + sent.entry(room_id).or_default().insert(thread, new_receipt); }, Err(e) => tracing::warn!(?room_id, "Failed to set read receipt: {e}"), } @@ -1050,11 +1047,12 @@ impl ClientWorker { let Some(receipts) = receipts.get(&ReceiptType::Read) else { continue; }; - for (user_id, _) in receipts - .iter() - .filter(|(_, rcpt)| rcpt.thread == ReceiptThread::Unthreaded) - { - info.set_receipt(user_id.to_owned(), event_id.clone()); + for (user_id, rcpt) in receipts.iter() { + info.set_receipt( + rcpt.thread.clone(), + user_id.to_owned(), + event_id.clone(), + ); } } }