Support uploading and downloading message attachments (#13)

This commit is contained in:
Ulyssa 2023-01-10 19:59:30 -08:00
parent 504b520fe1
commit b6f4b03c12
No known key found for this signature in database
GPG key ID: 1B3965A3D18B9B64
14 changed files with 684 additions and 247 deletions

View file

@ -1,12 +1,14 @@
use std::convert::TryFrom;
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::BufWriter;
use std::str::FromStr;
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::time::Duration;
use gethostname::gethostname;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use tracing::error;
@ -31,7 +33,7 @@ use matrix_sdk::{
VerificationMethod,
},
room::{
message::{MessageType, RoomMessageEventContent, TextMessageEventContent},
message::{MessageType, RoomMessageEventContent},
name::RoomNameEventContent,
topic::RoomTopicEventContent,
},
@ -41,7 +43,6 @@ use matrix_sdk::{
SyncMessageLikeEvent,
SyncStateEvent,
},
OwnedEventId,
OwnedRoomId,
OwnedRoomOrAliasId,
OwnedUserId,
@ -67,6 +68,7 @@ fn initial_devname() -> String {
format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy())
}
#[derive(Debug)]
pub enum LoginStyle {
SessionRestore(Session),
Password(String),
@ -95,8 +97,6 @@ fn oneshot<T>() -> (ClientReply<T>, ClientResponse<T>) {
return (reply, response);
}
type EchoPair = (OwnedEventId, RoomMessageEventContent);
pub enum WorkerTask {
DirectMessages(ClientReply<Vec<(MatrixRoom, DisplayName)>>),
Init(AsyncProgramStore, ClientReply<()>),
@ -108,16 +108,101 @@ pub enum WorkerTask {
Members(OwnedRoomId, ClientReply<IambResult<Vec<RoomMember>>>),
SpaceMembers(OwnedRoomId, ClientReply<IambResult<Vec<OwnedRoomId>>>),
Spaces(ClientReply<Vec<(MatrixRoom, DisplayName)>>),
SendMessage(OwnedRoomId, String, ClientReply<IambResult<EchoPair>>),
SetRoom(OwnedRoomId, SetRoomField, ClientReply<IambResult<()>>),
TypingNotice(OwnedRoomId),
Verify(VerifyAction, SasVerification, ClientReply<IambResult<EditInfo>>),
VerifyRequest(OwnedUserId, ClientReply<IambResult<EditInfo>>),
}
impl Debug for WorkerTask {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
WorkerTask::DirectMessages(_) => {
f.debug_tuple("WorkerTask::DirectMessages")
.field(&format_args!("_"))
.finish()
},
WorkerTask::Init(_, _) => {
f.debug_tuple("WorkerTask::Init")
.field(&format_args!("_"))
.field(&format_args!("_"))
.finish()
},
WorkerTask::LoadOlder(room_id, from, n, _) => {
f.debug_tuple("WorkerTask::LoadOlder")
.field(room_id)
.field(from)
.field(n)
.field(&format_args!("_"))
.finish()
},
WorkerTask::Login(style, _) => {
f.debug_tuple("WorkerTask::Login")
.field(style)
.field(&format_args!("_"))
.finish()
},
WorkerTask::GetRoom(room_id, _) => {
f.debug_tuple("WorkerTask::GetRoom")
.field(room_id)
.field(&format_args!("_"))
.finish()
},
WorkerTask::JoinRoom(s, _) => {
f.debug_tuple("WorkerTask::JoinRoom")
.field(s)
.field(&format_args!("_"))
.finish()
},
WorkerTask::JoinedRooms(_) => {
f.debug_tuple("WorkerTask::JoinedRooms").field(&format_args!("_")).finish()
},
WorkerTask::Members(room_id, _) => {
f.debug_tuple("WorkerTask::Members")
.field(room_id)
.field(&format_args!("_"))
.finish()
},
WorkerTask::SpaceMembers(room_id, _) => {
f.debug_tuple("WorkerTask::SpaceMembers")
.field(room_id)
.field(&format_args!("_"))
.finish()
},
WorkerTask::Spaces(_) => {
f.debug_tuple("WorkerTask::Spaces").field(&format_args!("_")).finish()
},
WorkerTask::SetRoom(room_id, field, _) => {
f.debug_tuple("WorkerTask::SetRoom")
.field(room_id)
.field(field)
.field(&format_args!("_"))
.finish()
},
WorkerTask::TypingNotice(room_id) => {
f.debug_tuple("WorkerTask::TypingNotice").field(room_id).finish()
},
WorkerTask::Verify(act, sasv1, _) => {
f.debug_tuple("WorkerTask::Verify")
.field(act)
.field(sasv1)
.field(&format_args!("_"))
.finish()
},
WorkerTask::VerifyRequest(user_id, _) => {
f.debug_tuple("WorkerTask::VerifyRequest")
.field(user_id)
.field(&format_args!("_"))
.finish()
},
}
}
}
#[derive(Clone)]
pub struct Requester {
pub tx: SyncSender<WorkerTask>,
pub client: Client,
pub tx: UnboundedSender<WorkerTask>,
}
impl Requester {
@ -152,14 +237,6 @@ impl Requester {
return response.recv();
}
pub fn send_message(&self, room_id: OwnedRoomId, msg: String) -> IambResult<EchoPair> {
let (reply, response) = oneshot();
self.tx.send(WorkerTask::SendMessage(room_id, msg, reply)).unwrap();
return response.recv();
}
pub fn direct_messages(&self) -> Vec<(MatrixRoom, DisplayName)> {
let (reply, response) = oneshot();
@ -253,60 +330,57 @@ pub struct ClientWorker {
}
impl ClientWorker {
pub fn spawn(settings: ApplicationSettings) -> Requester {
let (tx, rx) = sync_channel(5);
pub async fn spawn(settings: ApplicationSettings) -> Requester {
let (tx, rx) = unbounded_channel();
let account = &settings.profile;
// Set up a custom client that only uses HTTP/1.
//
// During my testing, I kept stumbling across something weird with sync and HTTP/2 that
// will need to be revisited in the future.
let http = reqwest::Client::builder()
.user_agent(IAMB_USER_AGENT)
.timeout(Duration::from_secs(30))
.pool_idle_timeout(Duration::from_secs(60))
.pool_max_idle_per_host(10)
.tcp_keepalive(Duration::from_secs(10))
.http1_only()
.build()
.unwrap();
// Set up the Matrix client for the selected profile.
let client = Client::builder()
.http_client(Arc::new(http))
.homeserver_url(account.url.clone())
.store_config(StoreConfig::default())
.sled_store(settings.matrix_dir.as_path(), None)
.expect("Failed to setup up sled store for Matrix SDK")
.request_config(RequestConfig::new().timeout(REQ_TIMEOUT).retry_timeout(REQ_TIMEOUT))
.build()
.await
.expect("Failed to instantiate Matrix client");
let mut worker = ClientWorker {
initialized: false,
settings,
client: client.clone(),
sync_handle: None,
};
let _ = tokio::spawn(async move {
let account = &settings.profile;
// Set up a custom client that only uses HTTP/1.
//
// During my testing, I kept stumbling across something weird with sync and HTTP/2 that
// will need to be revisited in the future.
let http = reqwest::Client::builder()
.user_agent(IAMB_USER_AGENT)
.timeout(Duration::from_secs(60))
.pool_idle_timeout(Duration::from_secs(120))
.pool_max_idle_per_host(5)
.http1_only()
.build()
.unwrap();
// Set up the Matrix client for the selected profile.
let client = Client::builder()
.http_client(Arc::new(http))
.homeserver_url(account.url.clone())
.store_config(StoreConfig::default())
.sled_store(settings.matrix_dir.as_path(), None)
.expect("Failed to setup up sled store for Matrix SDK")
.request_config(
RequestConfig::new().timeout(REQ_TIMEOUT).retry_timeout(REQ_TIMEOUT),
)
.build()
.await
.expect("Failed to instantiate Matrix client");
let mut worker = ClientWorker {
initialized: false,
settings,
client,
sync_handle: None,
};
worker.work(rx).await;
});
return Requester { tx };
return Requester { client, tx };
}
async fn work(&mut self, rx: Receiver<WorkerTask>) {
async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) {
loop {
let t = rx.recv_timeout(Duration::from_secs(1));
let t = rx.recv().await;
match t {
Ok(task) => self.run(task).await,
Err(RecvTimeoutError::Timeout) => {},
Err(RecvTimeoutError::Disconnected) => {
Some(task) => self.run(task).await,
None => {
break;
},
}
@ -364,10 +438,6 @@ impl ClientWorker {
assert!(self.initialized);
reply.send(self.spaces().await);
},
WorkerTask::SendMessage(room_id, msg, reply) => {
assert!(self.initialized);
reply.send(self.send_message(room_id, msg).await);
},
WorkerTask::TypingNotice(room_id) => {
assert!(self.initialized);
self.typing_notice(room_id).await;
@ -615,33 +685,6 @@ impl ClientWorker {
Ok(Some(InfoMessage::from("Successfully logged in!")))
}
async fn send_message(&mut self, room_id: OwnedRoomId, msg: String) -> IambResult<EchoPair> {
let room = if let r @ Some(_) = self.client.get_joined_room(&room_id) {
r
} else if self.client.join_room_by_id(&room_id).await.is_ok() {
self.client.get_joined_room(&room_id)
} else {
None
};
if let Some(room) = room {
let msg = TextMessageEventContent::plain(msg);
let msg = MessageType::Text(msg);
let msg = RoomMessageEventContent::new(msg);
// XXX: second parameter can be a locally unique transaction id.
// Useful for doing retries.
let resp = room.send(msg.clone(), None).await.map_err(IambError::from)?;
let event_id = resp.event_id;
// XXX: need to either give error messages and retry when needed!
return Ok((event_id, msg));
} else {
Err(IambError::UnknownRoom(room_id).into())
}
}
async fn direct_message(&mut self, user: OwnedUserId) -> IambResult<(MatrixRoom, DisplayName)> {
for (room, name) in self.direct_messages().await {
if room.get_member(user.as_ref()).await.map_err(IambError::from)?.is_some() {