Implement receipts per thread (#438)

This commit is contained in:
VAWVAW 2025-06-06 01:11:57 +00:00 committed by GitHub
parent f0de97a049
commit f880358a83
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 152 additions and 72 deletions

View file

@ -12,6 +12,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use emojis::Emoji; use emojis::Emoji;
use matrix_sdk::ruma::events::receipt::ReceiptThread;
use ratatui::{ use ratatui::{
buffer::Buffer, buffer::Buffer,
layout::{Alignment, Rect}, layout::{Alignment, Rect},
@ -869,7 +870,6 @@ impl UnreadInfo {
} }
/// Information about room's the user's joined. /// Information about room's the user's joined.
#[derive(Default)]
pub struct RoomInfo { pub struct RoomInfo {
/// The display name for this room. /// The display name for this room.
pub name: Option<String>, pub name: Option<String>,
@ -884,15 +884,13 @@ pub struct RoomInfo {
messages: Messages, messages: Messages,
/// A map of read markers to display on different events. /// A map of read markers to display on different events.
pub event_receipts: HashMap<OwnedEventId, HashSet<OwnedUserId>>, pub event_receipts: HashMap<ReceiptThread, HashMap<OwnedEventId, HashSet<OwnedUserId>>>,
/// A map of the most recent read marker for each user. /// A map of the most recent read marker for each user.
/// ///
/// Every receipt in this map should also have an entry in [`event_receipts`](`Self::event_receipts`), /// Every receipt in this map should also have an entry in [`event_receipts`](`Self::event_receipts`),
/// however not every user has an entry. If a user's most recent receipt is /// however not every user has an entry. If a user's most recent receipt is
/// older than the oldest loaded event, that user will not be included. /// older than the oldest loaded event, that user will not be included.
pub user_receipts: HashMap<OwnedUserId, OwnedEventId>, pub user_receipts: HashMap<ReceiptThread, HashMap<OwnedUserId, OwnedEventId>>,
/// A map of message identifiers to a map of reaction events. /// A map of message identifiers to a map of reaction events.
pub reactions: HashMap<OwnedEventId, MessageReactions>, pub reactions: HashMap<OwnedEventId, MessageReactions>,
@ -918,6 +916,28 @@ pub struct RoomInfo {
pub draw_last: Option<Instant>, pub draw_last: Option<Instant>,
} }
impl Default for RoomInfo {
fn default() -> Self {
Self {
messages: Messages::new(ReceiptThread::Main),
name: Default::default(),
tags: Default::default(),
keys: Default::default(),
event_receipts: Default::default(),
user_receipts: Default::default(),
reactions: Default::default(),
threads: Default::default(),
fetching: Default::default(),
fetch_id: Default::default(),
fetch_last: Default::default(),
users_typing: Default::default(),
display_names: Default::default(),
draw_last: Default::default(),
}
}
}
impl RoomInfo { impl RoomInfo {
pub fn get_thread(&self, root: Option<&EventId>) -> Option<&Messages> { pub fn get_thread(&self, root: Option<&EventId>) -> Option<&Messages> {
if let Some(thread_root) = root { if let Some(thread_root) = root {
@ -929,7 +949,9 @@ impl RoomInfo {
pub fn get_thread_mut(&mut self, root: Option<OwnedEventId>) -> &mut Messages { pub fn get_thread_mut(&mut self, root: Option<OwnedEventId>) -> &mut Messages {
if let Some(thread_root) = root { if let Some(thread_root) = root {
self.threads.entry(thread_root).or_default() self.threads
.entry(thread_root.clone())
.or_insert_with(|| Messages::thread(thread_root))
} else { } else {
&mut self.messages &mut self.messages
} }
@ -1069,7 +1091,9 @@ impl RoomInfo {
}; };
let source = if let Some(thread) = thread { let source = if let Some(thread) = thread {
self.threads.entry(thread.clone()).or_default() self.threads
.entry(thread.clone())
.or_insert_with(|| Messages::thread(thread.clone()))
} else { } else {
&mut self.messages &mut self.messages
}; };
@ -1108,13 +1132,20 @@ impl RoomInfo {
/// Indicates whether this room has unread messages. /// Indicates whether this room has unread messages.
pub fn unreads(&self, settings: &ApplicationSettings) -> UnreadInfo { pub fn unreads(&self, settings: &ApplicationSettings) -> UnreadInfo {
let last_message = self.messages.last_key_value(); let last_message = self.messages.last_key_value();
let last_receipt = self.get_receipt(&settings.profile.user_id); let last_receipt = self
.user_receipts
.get(&ReceiptThread::Main)
.and_then(|receipts| receipts.get(&settings.profile.user_id));
match (last_message, last_receipt) { match (last_message, last_receipt) {
(Some(((ts, recent), _)), Some(last_read)) => { (Some(((ts, recent), _)), Some(last_read)) => {
UnreadInfo { unread: last_read != recent, latest: Some(*ts) } UnreadInfo { unread: last_read != recent, latest: Some(*ts) }
}, },
(Some(((ts, _), _)), None) => UnreadInfo { unread: false, latest: Some(*ts) }, (Some(((ts, _), _)), None) => {
// If we've never loaded/generated a room's receipt (example,
// a newly joined but never viewed room), show it as unread.
UnreadInfo { unread: true, latest: Some(*ts) }
},
(None, _) => UnreadInfo::default(), (None, _) => UnreadInfo::default(),
} }
} }
@ -1142,7 +1173,10 @@ impl RoomInfo {
let event_id = msg.event_id().to_owned(); let event_id = msg.event_id().to_owned();
let key = (msg.origin_server_ts().into(), event_id.clone()); let key = (msg.origin_server_ts().into(), event_id.clone());
let replies = self.threads.entry(thread_root.clone()).or_default(); let replies = self
.threads
.entry(thread_root.clone())
.or_insert_with(|| Messages::thread(thread_root.clone()));
let loc = EventLocation::Message(Some(thread_root), key.clone()); let loc = EventLocation::Message(Some(thread_root), key.clone());
self.keys.insert(event_id, loc); self.keys.insert(event_id, loc);
replies.insert_message(key, msg); replies.insert_message(key, msg);
@ -1205,37 +1239,70 @@ impl RoomInfo {
self.fetch_last.is_some_and(|i| i.elapsed() < ROOM_FETCH_DEBOUNCE) self.fetch_last.is_some_and(|i| i.elapsed() < ROOM_FETCH_DEBOUNCE)
} }
fn clear_receipt(&mut self, user_id: &OwnedUserId) -> Option<()> { fn clear_receipt(&mut self, thread: &ReceiptThread, user_id: &OwnedUserId) -> Option<()> {
let old_event_id = self.user_receipts.get(user_id)?; let old_event_id =
let old_receipts = self.event_receipts.get_mut(old_event_id)?; self.user_receipts.get(thread).and_then(|receipts| receipts.get(user_id))?;
let old_thread = self.event_receipts.get_mut(thread)?;
let old_receipts = old_thread.get_mut(old_event_id)?;
old_receipts.remove(user_id); old_receipts.remove(user_id);
if old_receipts.is_empty() { if old_receipts.is_empty() {
self.event_receipts.remove(old_event_id); old_thread.remove(old_event_id);
}
if old_thread.is_empty() {
self.event_receipts.remove(thread);
} }
None None
} }
pub fn set_receipt(&mut self, user_id: OwnedUserId, event_id: OwnedEventId) { pub fn set_receipt(
self.clear_receipt(&user_id); &mut self,
thread: ReceiptThread,
user_id: OwnedUserId,
event_id: OwnedEventId,
) {
self.clear_receipt(&thread, &user_id);
self.event_receipts self.event_receipts
.entry(thread.clone())
.or_default()
.entry(event_id.clone()) .entry(event_id.clone())
.or_default() .or_default()
.insert(user_id.clone()); .insert(user_id.clone());
self.user_receipts.insert(user_id, event_id); self.user_receipts.entry(thread).or_default().insert(user_id, event_id);
} }
pub fn fully_read(&mut self, user_id: OwnedUserId) { pub fn fully_read(&mut self, user_id: &UserId) {
let Some(((_, event_id), _)) = self.messages.last_key_value() else { let Some(((_, event_id), _)) = self.messages.last_key_value() else {
return; return;
}; };
self.set_receipt(user_id, event_id.clone()); self.set_receipt(ReceiptThread::Main, user_id.to_owned(), event_id.clone());
let newest = self
.threads
.iter()
.filter_map(|(thread_id, messages)| {
let thread = ReceiptThread::Thread(thread_id.to_owned());
messages
.last_key_value()
.map(|((_, event_id), _)| (thread, event_id.to_owned()))
})
.collect::<Vec<_>>();
for (thread, event_id) in newest.into_iter() {
self.set_receipt(thread, user_id.to_owned(), event_id.clone());
}
} }
pub fn get_receipt(&self, user_id: &UserId) -> Option<&OwnedEventId> { pub fn receipts<'a>(
self.user_receipts.get(user_id) &'a self,
user_id: &'a UserId,
) -> impl Iterator<Item = (&'a ReceiptThread, &'a OwnedEventId)> + 'a {
self.user_receipts
.iter()
.filter_map(move |(t, rs)| rs.get(user_id).map(|r| (t, r)))
} }
fn get_typers(&self) -> &[OwnedUserId] { fn get_typers(&self) -> &[OwnedUserId] {

View file

@ -982,7 +982,7 @@ impl ApplicationSettings {
Ok(()) Ok(())
} }
pub fn get_user_char_span<'a>(&self, user_id: &'a UserId) -> Span<'a> { pub fn get_user_char_span(&self, user_id: &UserId) -> Span {
let (color, c) = self let (color, c) = self
.tunables .tunables
.users .users

View file

@ -559,7 +559,7 @@ impl Application {
for room_id in store.application.sync_info.chats() { for room_id in store.application.sync_info.chats() {
if let Some(room) = store.application.rooms.get_mut(room_id) { if let Some(room) = store.application.rooms.get_mut(room_id) {
room.fully_read(user_id.clone()); room.fully_read(user_id);
} }
} }

View file

@ -2,7 +2,6 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::cmp::{Ord, Ordering, PartialOrd}; use std::cmp::{Ord, Ordering, PartialOrd};
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::collections::hash_set;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Display}; use std::fmt::{self, Display};
@ -11,6 +10,7 @@ use std::ops::{Deref, DerefMut};
use chrono::{DateTime, Local as LocalTz}; use chrono::{DateTime, Local as LocalTz};
use humansize::{format_size, DECIMAL}; use humansize::{format_size, DECIMAL};
use matrix_sdk::ruma::events::receipt::ReceiptThread;
use serde_json::json; use serde_json::json;
use unicode_width::UnicodeWidthStr; use unicode_width::UnicodeWidthStr;
@ -77,8 +77,7 @@ type ProtocolPreview<'a> = (&'a Protocol, u16, u16);
pub type MessageKey = (MessageTimeStamp, OwnedEventId); pub type MessageKey = (MessageTimeStamp, OwnedEventId);
#[derive(Default)] pub struct Messages(BTreeMap<MessageKey, Message>, pub ReceiptThread);
pub struct Messages(BTreeMap<MessageKey, Message>);
impl Deref for Messages { impl Deref for Messages {
type Target = BTreeMap<MessageKey, Message>; type Target = BTreeMap<MessageKey, Message>;
@ -95,6 +94,18 @@ impl DerefMut for Messages {
} }
impl Messages { impl Messages {
pub fn new(thread: ReceiptThread) -> Self {
Self(Default::default(), thread)
}
pub fn main() -> Self {
Self::new(ReceiptThread::Main)
}
pub fn thread(root: OwnedEventId) -> Self {
Self::new(ReceiptThread::Thread(root))
}
pub fn insert_message(&mut self, key: MessageKey, msg: impl Into<Message>) { pub fn insert_message(&mut self, key: MessageKey, msg: impl Into<Message>) {
let event_id = key.1.clone(); let event_id = key.1.clone();
let msg = msg.into(); let msg = msg.into();
@ -633,8 +644,8 @@ struct MessageFormatter<'a> {
/// The date the message was sent. /// The date the message was sent.
date: Option<Span<'a>>, date: Option<Span<'a>>,
/// Iterator over the users who have read up to this message. /// The users who have read up to this message.
read: Option<hash_set::Iter<'a, OwnedUserId>>, read: Vec<OwnedUserId>,
} }
impl<'a> MessageFormatter<'a> { impl<'a> MessageFormatter<'a> {
@ -667,13 +678,11 @@ impl<'a> MessageFormatter<'a> {
line.push(time); line.push(time);
// Show read receipts. // Show read receipts.
let user_char = let user_char = |user: OwnedUserId| -> Span { settings.get_user_char_span(&user) };
|user: &'a OwnedUserId| -> Span<'a> { settings.get_user_char_span(user) };
let mut read = self.read.iter_mut().flatten();
let a = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); let a = self.read.pop().map(user_char).unwrap_or_else(|| Span::raw(" "));
let b = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); let b = self.read.pop().map(user_char).unwrap_or_else(|| Span::raw(" "));
let c = read.next().map(user_char).unwrap_or_else(|| Span::raw(" ")); let c = self.read.pop().map(user_char).unwrap_or_else(|| Span::raw(" "));
line.push(Span::raw(" ")); line.push(Span::raw(" "));
line.push(c); line.push(c);
@ -944,7 +953,13 @@ impl Message {
let fill = width - user_gutter - TIME_GUTTER - READ_GUTTER; let fill = width - user_gutter - TIME_GUTTER - READ_GUTTER;
let user = self.show_sender(prev, true, info, settings); let user = self.show_sender(prev, true, info, settings);
let time = self.timestamp.show_time(); let time = self.timestamp.show_time();
let read = info.event_receipts.get(self.event.event_id()).map(|read| read.iter()); let read = info
.event_receipts
.values()
.filter_map(|receipts| receipts.get(self.event.event_id()))
.flat_map(|read| read.iter())
.map(|user_id| user_id.to_owned())
.collect();
MessageFormatter { settings, cols, orig, fill, user, date, time, read } MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else if user_gutter + TIME_GUTTER + MIN_MSG_LEN <= width { } else if user_gutter + TIME_GUTTER + MIN_MSG_LEN <= width {
@ -952,7 +967,7 @@ impl Message {
let fill = width - user_gutter - TIME_GUTTER; let fill = width - user_gutter - TIME_GUTTER;
let user = self.show_sender(prev, true, info, settings); let user = self.show_sender(prev, true, info, settings);
let time = self.timestamp.show_time(); let time = self.timestamp.show_time();
let read = None; let read = Vec::new();
MessageFormatter { settings, cols, orig, fill, user, date, time, read } MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else if user_gutter + MIN_MSG_LEN <= width { } else if user_gutter + MIN_MSG_LEN <= width {
@ -960,7 +975,7 @@ impl Message {
let fill = width - user_gutter; let fill = width - user_gutter;
let user = self.show_sender(prev, true, info, settings); let user = self.show_sender(prev, true, info, settings);
let time = None; let time = None;
let read = None; let read = Vec::new();
MessageFormatter { settings, cols, orig, fill, user, date, time, read } MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else { } else {
@ -968,7 +983,7 @@ impl Message {
let fill = width.saturating_sub(2); let fill = width.saturating_sub(2);
let user = self.show_sender(prev, false, info, settings); let user = self.show_sender(prev, false, info, settings);
let time = None; let time = None;
let read = None; let read = Vec::new();
MessageFormatter { settings, cols, orig, fill, user, date, time, read } MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} }
@ -1281,7 +1296,7 @@ pub mod tests {
assert_eq!(k6, &MSG1_KEY.clone()); assert_eq!(k6, &MSG1_KEY.clone());
// MessageCursor::latest() fails to convert for a room w/o messages. // MessageCursor::latest() fails to convert for a room w/o messages.
let messages_empty = Messages::default(); let messages_empty = Messages::new(ReceiptThread::Main);
assert_eq!(mc6.to_key(&messages_empty), None); assert_eq!(mc6.to_key(&messages_empty), None);
} }

View file

@ -137,7 +137,7 @@ pub fn mock_keys() -> HashMap<OwnedEventId, EventLocation> {
} }
pub fn mock_messages() -> Messages { pub fn mock_messages() -> Messages {
let mut messages = Messages::default(); let mut messages = Messages::main();
messages.insert(MSG1_KEY.clone(), mock_message1()); messages.insert(MSG1_KEY.clone(), mock_message1());
messages.insert(MSG2_KEY.clone(), mock_message2()); messages.insert(MSG2_KEY.clone(), mock_message2());

View file

@ -1428,7 +1428,7 @@ impl StatefulWidget for Scrollback<'_> {
{ {
// If the cursor is at the last message, then update the read marker. // If the cursor is at the last message, then update the read marker.
if let Some((k, _)) = thread.last_key_value() { if let Some((k, _)) = thread.last_key_value() {
info.set_receipt(settings.profile.user_id.clone(), k.1.clone()); info.set_receipt(thread.1.clone(), settings.profile.user_id.clone(), k.1.clone());
} }
} }

View file

@ -210,7 +210,7 @@ async fn update_event_receipts(info: &mut RoomInfo, room: &MatrixRoom, event_id:
}; };
for (user_id, _) in receipts { for (user_id, _) in receipts {
info.set_receipt(user_id, event_id.to_owned()); info.set_receipt(ReceiptThread::Main, user_id, event_id.to_owned());
} }
} }
@ -300,7 +300,7 @@ async fn load_older_one(
let event_id = msg.event_id(); let event_id = msg.event_id();
let receipts = match room let receipts = match room
.load_event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id) .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
.await .await
{ {
Ok(receipts) => receipts.into_iter().map(|(u, _)| u).collect(), Ok(receipts) => receipts.into_iter().map(|(u, _)| u).collect(),
@ -338,7 +338,7 @@ fn load_insert(
let _ = presences.get_or_default(sender); let _ = presences.get_or_default(sender);
for user_id in receipts { for user_id in receipts {
info.set_receipt(user_id, msg.event_id().to_owned()); info.set_receipt(ReceiptThread::Main, user_id, msg.event_id().to_owned());
} }
match msg { match msg {
@ -497,31 +497,32 @@ async fn refresh_rooms_forever(client: &Client, store: &AsyncProgramStore) {
async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) { async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) {
let mut interval = tokio::time::interval(Duration::from_secs(2)); let mut interval = tokio::time::interval(Duration::from_secs(2));
let mut sent = HashMap::<OwnedRoomId, OwnedEventId>::default(); let mut sent: HashMap<OwnedRoomId, HashMap<ReceiptThread, OwnedEventId>> = Default::default();
loop { loop {
interval.tick().await; interval.tick().await;
let locked = store.lock().await; let locked = store.lock().await;
let user_id = &locked.application.settings.profile.user_id; let user_id = &locked.application.settings.profile.user_id;
let updates = client
.joined_rooms() let mut updates = Vec::new();
.into_iter() for room in client.joined_rooms() {
.filter_map(|room| { let room_id = room.room_id();
let room_id = room.room_id().to_owned(); let Some(info) = locked.application.rooms.get(&room_id) else {
let info = locked.application.rooms.get(&room_id)?; continue;
let new_receipt = info.get_receipt(user_id)?; };
let old_receipt = sent.get(&room_id);
if Some(new_receipt) != old_receipt { let changed = info.receipts(user_id).filter_map(|(thread, new_receipt)| {
Some((room_id, new_receipt.clone())) let old_receipt = sent.get(room_id).and_then(|ts| ts.get(thread));
} else { let changed = Some(new_receipt) != old_receipt;
None changed.then(|| (room_id.to_owned(), thread.to_owned(), new_receipt.to_owned()))
});
updates.extend(changed);
} }
})
.collect::<Vec<_>>();
drop(locked); drop(locked);
for (room_id, new_receipt) in updates { for (room_id, thread, new_receipt) in updates {
use matrix_sdk::ruma::api::client::receipt::create_receipt::v3::ReceiptType; use matrix_sdk::ruma::api::client::receipt::create_receipt::v3::ReceiptType;
let Some(room) = client.get_room(&room_id) else { let Some(room) = client.get_room(&room_id) else {
@ -529,15 +530,11 @@ async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) {
}; };
match room match room
.send_single_receipt( .send_single_receipt(ReceiptType::Read, thread.to_owned(), new_receipt.clone())
ReceiptType::Read,
ReceiptThread::Unthreaded,
new_receipt.clone(),
)
.await .await
{ {
Ok(()) => { Ok(()) => {
sent.insert(room_id, new_receipt); sent.entry(room_id).or_default().insert(thread, new_receipt);
}, },
Err(e) => tracing::warn!(?room_id, "Failed to set read receipt: {e}"), Err(e) => tracing::warn!(?room_id, "Failed to set read receipt: {e}"),
} }
@ -1050,11 +1047,12 @@ impl ClientWorker {
let Some(receipts) = receipts.get(&ReceiptType::Read) else { let Some(receipts) = receipts.get(&ReceiptType::Read) else {
continue; continue;
}; };
for (user_id, _) in receipts for (user_id, rcpt) in receipts.iter() {
.iter() info.set_receipt(
.filter(|(_, rcpt)| rcpt.thread == ReceiptThread::Unthreaded) rcpt.thread.clone(),
{ user_id.to_owned(),
info.set_receipt(user_id.to_owned(), event_id.clone()); event_id.clone(),
);
} }
} }
} }