Reduce number of Tokio workers (#129)

This commit is contained in:
Ulyssa 2023-07-05 15:25:42 -07:00
parent 8d4539831f
commit 61aba80be1
No known key found for this signature in database
GPG key ID: 1B3965A3D18B9B64
7 changed files with 232 additions and 258 deletions

View file

@ -8,6 +8,7 @@ use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::{stream::FuturesUnordered, StreamExt};
use gethostname::gethostname;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
@ -260,19 +261,122 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async
}
}
async fn load_older(client: &Client, store: &AsyncProgramStore) {
async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize {
let limit = MIN_MSG_LOAD;
let plan = load_plan(store).await;
// Fetch each room separately, so they don't block each other.
for (room_id, fetch_id) in plan.into_iter() {
let client = client.clone();
let store = store.clone();
load_plan(store)
.await
.into_iter()
.map(|(room_id, fetch_id)| {
let client = client.clone();
let store = store.clone();
tokio::spawn(async move {
let res = load_older_one(client, room_id.as_ref(), fetch_id, limit).await;
load_insert(room_id, res, store).await;
});
async move {
let res = load_older_one(client, room_id.as_ref(), fetch_id, limit).await;
load_insert(room_id, res, store).await;
}
})
.collect::<FuturesUnordered<_>>()
.count()
.await
}
async fn load_older_forever(client: &Client, store: &AsyncProgramStore) {
// Load older messages every 2 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(2));
loop {
interval.tick().await;
load_older(client, store).await;
}
}
async fn refresh_rooms(client: &Client, store: &AsyncProgramStore) {
let mut names = vec![];
let mut spaces = vec![];
let mut rooms = vec![];
let mut dms = vec![];
for room in client.invited_rooms().into_iter() {
let name = room.display_name().await.unwrap_or(DisplayName::Empty).to_string();
names.push((room.room_id().to_owned(), name));
if room.is_direct() {
let tags = room.tags().await.unwrap_or_default();
dms.push(Arc::new((room.into(), tags)));
} else if room.is_space() {
spaces.push(room.into());
} else {
let tags = room.tags().await.unwrap_or_default();
rooms.push(Arc::new((room.into(), tags)));
}
}
for room in client.joined_rooms().into_iter() {
let name = room.display_name().await.unwrap_or(DisplayName::Empty).to_string();
names.push((room.room_id().to_owned(), name));
if room.is_direct() {
let tags = room.tags().await.unwrap_or_default();
dms.push(Arc::new((room.into(), tags)));
} else if room.is_space() {
spaces.push(room.into());
} else {
let tags = room.tags().await.unwrap_or_default();
rooms.push(Arc::new((room.into(), tags)));
}
}
let mut locked = store.lock().await;
locked.application.sync_info.spaces = spaces;
locked.application.sync_info.rooms = rooms;
locked.application.sync_info.dms = dms;
for (room_id, name) in names {
locked.application.set_room_name(&room_id, &name);
}
}
async fn refresh_rooms_forever(client: &Client, store: &AsyncProgramStore) {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
refresh_rooms(client, store).await;
}
}
async fn refresh_receipts_forever(client: &Client, store: &AsyncProgramStore) {
// Update the displayed read receipts every 5 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut sent = HashMap::<OwnedRoomId, OwnedEventId>::default();
loop {
interval.tick().await;
let receipts = update_receipts(client).await;
let read = store.lock().await.application.set_receipts(receipts).await;
for (room_id, read_till) in read.into_iter() {
if let Some(read_sent) = sent.get(&room_id) {
if read_sent == &read_till {
// Skip unchanged receipts.
continue;
}
}
if let Some(room) = client.get_joined_room(&room_id) {
if room.read_receipt(&read_till).await.is_ok() {
sent.insert(room_id, read_till);
}
}
}
}
}
@ -331,8 +435,6 @@ async fn update_receipts(client: &Client) -> Vec<(OwnedRoomId, Receipts)> {
pub type FetchedRoom = (MatrixRoom, DisplayName, Option<Tags>);
pub enum WorkerTask {
ActiveRooms(ClientReply<Vec<FetchedRoom>>),
DirectMessages(ClientReply<Vec<FetchedRoom>>),
Init(AsyncProgramStore, ClientReply<()>),
Login(LoginStyle, ClientReply<IambResult<EditInfo>>),
GetInviter(Invited, ClientReply<IambResult<Option<RoomMember>>>),
@ -340,7 +442,6 @@ pub enum WorkerTask {
JoinRoom(String, ClientReply<IambResult<OwnedRoomId>>),
Members(OwnedRoomId, ClientReply<IambResult<Vec<RoomMember>>>),
SpaceMembers(OwnedRoomId, ClientReply<IambResult<Vec<OwnedRoomId>>>),
Spaces(ClientReply<Vec<(MatrixRoom, DisplayName)>>),
TypingNotice(OwnedRoomId),
Verify(VerifyAction, SasVerification, ClientReply<IambResult<EditInfo>>),
VerifyRequest(OwnedUserId, ClientReply<IambResult<EditInfo>>),
@ -349,14 +450,6 @@ pub enum WorkerTask {
impl Debug for WorkerTask {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
WorkerTask::ActiveRooms(_) => {
f.debug_tuple("WorkerTask::ActiveRooms").field(&format_args!("_")).finish()
},
WorkerTask::DirectMessages(_) => {
f.debug_tuple("WorkerTask::DirectMessages")
.field(&format_args!("_"))
.finish()
},
WorkerTask::Init(_, _) => {
f.debug_tuple("WorkerTask::Init")
.field(&format_args!("_"))
@ -396,9 +489,6 @@ impl Debug for WorkerTask {
.field(&format_args!("_"))
.finish()
},
WorkerTask::Spaces(_) => {
f.debug_tuple("WorkerTask::Spaces").field(&format_args!("_")).finish()
},
WorkerTask::TypingNotice(room_id) => {
f.debug_tuple("WorkerTask::TypingNotice").field(room_id).finish()
},
@ -442,14 +532,6 @@ impl Requester {
return response.recv();
}
pub fn direct_messages(&self) -> Vec<FetchedRoom> {
let (reply, response) = oneshot();
self.tx.send(WorkerTask::DirectMessages(reply)).unwrap();
return response.recv();
}
pub fn get_inviter(&self, invite: Invited) -> IambResult<Option<RoomMember>> {
let (reply, response) = oneshot();
@ -474,14 +556,6 @@ impl Requester {
return response.recv();
}
pub fn active_rooms(&self) -> Vec<FetchedRoom> {
let (reply, response) = oneshot();
self.tx.send(WorkerTask::ActiveRooms(reply)).unwrap();
return response.recv();
}
pub fn members(&self, room_id: OwnedRoomId) -> IambResult<Vec<RoomMember>> {
let (reply, response) = oneshot();
@ -498,14 +572,6 @@ impl Requester {
return response.recv();
}
pub fn spaces(&self) -> Vec<(MatrixRoom, DisplayName)> {
let (reply, response) = oneshot();
self.tx.send(WorkerTask::Spaces(reply)).unwrap();
return response.recv();
}
pub fn typing_notice(&self, room_id: OwnedRoomId) {
self.tx.send(WorkerTask::TypingNotice(room_id)).unwrap();
}
@ -532,7 +598,6 @@ pub struct ClientWorker {
settings: ApplicationSettings,
client: Client,
load_handle: Option<JoinHandle<()>>,
rcpt_handle: Option<JoinHandle<()>>,
sync_handle: Option<JoinHandle<()>>,
}
@ -571,7 +636,6 @@ impl ClientWorker {
settings,
client: client.clone(),
load_handle: None,
rcpt_handle: None,
sync_handle: None,
};
@ -597,18 +661,10 @@ impl ClientWorker {
if let Some(handle) = self.sync_handle.take() {
handle.abort();
}
if let Some(handle) = self.rcpt_handle.take() {
handle.abort();
}
}
async fn run(&mut self, task: WorkerTask) {
match task {
WorkerTask::DirectMessages(reply) => {
assert!(self.initialized);
reply.send(self.direct_messages().await);
},
WorkerTask::Init(store, reply) => {
assert_eq!(self.initialized, false);
self.init(store).await;
@ -626,10 +682,6 @@ impl ClientWorker {
assert!(self.initialized);
reply.send(self.get_room(room_id).await);
},
WorkerTask::ActiveRooms(reply) => {
assert!(self.initialized);
reply.send(self.active_rooms().await);
},
WorkerTask::Login(style, reply) => {
assert!(self.initialized);
reply.send(self.login_and_sync(style).await);
@ -642,10 +694,6 @@ impl ClientWorker {
assert!(self.initialized);
reply.send(self.space_members(space).await);
},
WorkerTask::Spaces(reply) => {
assert!(self.initialized);
reply.send(self.spaces().await);
},
WorkerTask::TypingNotice(room_id) => {
assert!(self.initialized);
self.typing_notice(room_id).await;
@ -903,50 +951,14 @@ impl ClientWorker {
},
);
self.rcpt_handle = tokio::spawn({
let store = store.clone();
let client = self.client.clone();
let mut sent = HashMap::<OwnedRoomId, OwnedEventId>::default();
async move {
// Update the displayed read receipts every 5 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
let receipts = update_receipts(&client).await;
let read = store.lock().await.application.set_receipts(receipts).await;
for (room_id, read_till) in read.into_iter() {
if let Some(read_sent) = sent.get(&room_id) {
if read_sent == &read_till {
// Skip unchanged receipts.
continue;
}
}
if let Some(room) = client.get_joined_room(&room_id) {
if room.read_receipt(&read_till).await.is_ok() {
sent.insert(room_id, read_till);
}
}
}
}
}
})
.into();
self.load_handle = tokio::spawn({
let client = self.client.clone();
async move {
// Load older messages every 2 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(2));
loop {
interval.tick().await;
load_older(&client, &store).await;
}
let load = load_older_forever(&client, &store);
let rcpt = refresh_receipts_forever(&client, &store);
let room = refresh_rooms_forever(&client, &store);
let ((), (), ()) = tokio::join!(load, rcpt, room);
}
})
.into();
@ -1002,31 +1014,30 @@ impl ClientWorker {
Ok(Some(InfoMessage::from("Successfully logged in!")))
}
async fn direct_message(&mut self, user: OwnedUserId) -> IambResult<FetchedRoom> {
for (room, name, tags) in self.direct_messages().await {
async fn direct_message(&mut self, user: OwnedUserId) -> IambResult<OwnedRoomId> {
for room in self.client.rooms() {
if !room.is_direct() {
continue;
}
if room.get_member(user.as_ref()).await.map_err(IambError::from)?.is_some() {
return Ok((room, name, tags));
return Ok(room.room_id().to_owned());
}
}
let rt = CreateRoomType::Direct(user.clone());
let flags = CreateRoomFlags::ENCRYPTED;
match create_room(&self.client, None, rt, flags).await {
Ok(room_id) => self.get_room(room_id).await,
Err(e) => {
error!(
user_id = user.as_str(),
err = e.to_string(),
"Failed to create direct message room"
);
create_room(&self.client, None, rt, flags).await.map_err(|e| {
error!(
user_id = user.as_str(),
err = e.to_string(),
"Failed to create direct message room"
);
let msg = format!("Could not open a room with {user}");
let err = UIError::Failure(msg);
Err(err)
},
}
let msg = format!("Could not open a room with {user}");
UIError::Failure(msg)
})
}
async fn get_inviter(&mut self, invited: Invited) -> IambResult<Option<RoomMember>> {
@ -1058,9 +1069,7 @@ impl ClientWorker {
},
}
} else if let Ok(user) = OwnedUserId::try_from(name.as_str()) {
let room = self.direct_message(user).await?.0;
return Ok(room.room_id().to_owned());
self.direct_message(user).await
} else {
let msg = format!("{:?} is not a valid room or user name", name.as_str());
let err = UIError::Failure(msg);
@ -1069,62 +1078,6 @@ impl ClientWorker {
}
}
async fn direct_messages(&self) -> Vec<FetchedRoom> {
let mut rooms = vec![];
for room in self.client.invited_rooms().into_iter() {
if !room.is_direct() {
continue;
}
let name = room.display_name().await.unwrap_or(DisplayName::Empty);
let tags = room.tags().await.unwrap_or_default();
rooms.push((room.into(), name, tags));
}
for room in self.client.joined_rooms().into_iter() {
if !room.is_direct() {
continue;
}
let name = room.display_name().await.unwrap_or(DisplayName::Empty);
let tags = room.tags().await.unwrap_or_default();
rooms.push((room.into(), name, tags));
}
return rooms;
}
async fn active_rooms(&self) -> Vec<FetchedRoom> {
let mut rooms = vec![];
for room in self.client.invited_rooms().into_iter() {
if room.is_space() || room.is_direct() {
continue;
}
let name = room.display_name().await.unwrap_or(DisplayName::Empty);
let tags = room.tags().await.unwrap_or_default();
rooms.push((room.into(), name, tags));
}
for room in self.client.joined_rooms().into_iter() {
if room.is_space() || room.is_direct() {
continue;
}
let name = room.display_name().await.unwrap_or(DisplayName::Empty);
let tags = room.tags().await.unwrap_or_default();
rooms.push((room.into(), name, tags));
}
return rooms;
}
async fn members(&mut self, room_id: OwnedRoomId) -> IambResult<Vec<RoomMember>> {
if let Some(room) = self.client.get_room(room_id.as_ref()) {
Ok(room.active_members().await.map_err(IambError::from)?)
@ -1145,32 +1098,6 @@ impl ClientWorker {
Ok(rooms)
}
async fn spaces(&self) -> Vec<(MatrixRoom, DisplayName)> {
let mut spaces = vec![];
for room in self.client.invited_rooms().into_iter() {
if !room.is_space() {
continue;
}
let name = room.display_name().await.unwrap_or(DisplayName::Empty);
spaces.push((room.into(), name));
}
for room in self.client.joined_rooms().into_iter() {
if !room.is_space() {
continue;
}
let name = room.display_name().await.unwrap_or(DisplayName::Empty);
spaces.push((room.into(), name));
}
return spaces;
}
async fn typing_notice(&mut self, room_id: OwnedRoomId) {
if let Some(room) = self.client.get_joined_room(room_id.as_ref()) {
let _ = room.typing_notice(true).await;