diff --git a/Cargo.lock b/Cargo.lock index fe2a4f4f..6ce78a43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -484,6 +484,7 @@ dependencies = [ "itertools 0.12.1", "jsonwebtoken", "log", + "loole", "lru-cache", "nix", "num_cpus", @@ -1473,6 +1474,12 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "loole" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6725f0feab07fcf90f6de5417c06d7fef976fa6e5912fa9e21cb5e4dc6ae5da" + [[package]] name = "lru-cache" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 5f7a3fde..0a217f25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,9 @@ cyborgtime = "2.1.1" bytes = "1.6.0" http = "0.2.12" +# used to replace the channels of the tokio runtime +loole = "0.3.0" + # standard date and time tools [dependencies.chrono] version = "0.4.37" diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 398e1d62..c9b6ac63 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -23,7 +23,7 @@ use ruma::{ EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::Mutex; use tracing::{error, warn}; use super::pdu::PduBuilder; @@ -91,13 +91,13 @@ pub enum AdminRoomEvent { } pub struct Service { - pub sender: mpsc::UnboundedSender, - receiver: Mutex>, + pub sender: loole::Sender, //mpsc::UnboundedSender, + receiver: Mutex>, } impl Service { pub fn build() -> Arc { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = loole::unbounded(); Arc::new(Self { sender, receiver: Mutex::new(receiver), @@ -115,7 +115,7 @@ impl Service { } async fn handler(&self) -> Result<()> { - let mut receiver = self.receiver.lock().await; + let receiver = self.receiver.lock().await; // TODO: Use futures when we have long admin commands //let mut futures = FuturesUnordered::new(); @@ -125,63 +125,72 @@ impl Service { if let Ok(Some(conduit_room)) = Self::get_admin_room() { loop { tokio::select! { - Some(event) = receiver.recv() => { - let (mut message_content, reply) = match event { - AdminRoomEvent::SendMessage(content) => (content, None), - AdminRoomEvent::ProcessMessage(room_message, reply_id) => { - (self.process_admin_message(room_message).await, Some(reply_id)) + event = receiver.recv_async() => { + match event { + Ok(event) => { + let (mut message_content, reply) = match event { + AdminRoomEvent::SendMessage(content) => (content, None), + AdminRoomEvent::ProcessMessage(room_message, reply_id) => { + (self.process_admin_message(room_message).await, Some(reply_id)) + } + }; + + let mutex_state = Arc::clone( + services().globals + .roomid_mutex_state + .write() + .await + .entry(conduit_room.clone()) + .or_default(), + ); + + let state_lock = mutex_state.lock().await; + + if let Some(reply) = reply { + message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } }); + } + + if let Err(e) = services().rooms.timeline.build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMessage, + content: to_raw_value(&message_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &state_lock) + .await { + error!("Failed to build and append admin room response PDU: \"{e}\""); + + let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output.")); + + services().rooms.timeline.build_and_append_pdu( + PduBuilder { + event_type: TimelineEventType::RoomMessage, + content: to_raw_value(&error_room_message) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &state_lock) + .await?; + } + drop(state_lock); } - }; + Err(_) => { + // TODO: Handle error, Im too unfamiliar with the codebase to know what to do here - let mutex_state = Arc::clone( - services().globals - .roomid_mutex_state - .write() - .await - .entry(conduit_room.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + // recv_async returns an error if all senders have been dropped. If the channel is empty, the returned future will yield to the async runtime. - if let Some(reply) = reply { - message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } }); + } } - - if let Err(e) = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMessage, - content: to_raw_value(&message_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &state_lock) - .await { - error!("Failed to build and append admin room response PDU: \"{e}\""); - - let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output.")); - - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMessage, - content: to_raw_value(&error_room_message) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &state_lock) - .await?; - } - - - drop(state_lock); } } } @@ -760,13 +769,19 @@ mod test { use super::*; #[test] - fn get_help_short() { get_help_inner("-h"); } + fn get_help_short() { + get_help_inner("-h"); + } #[test] - fn get_help_long() { get_help_inner("--help"); } + fn get_help_long() { + get_help_inner("--help"); + } #[test] - fn get_help_subcommand() { get_help_inner("help"); } + fn get_help_subcommand() { + get_help_inner("help"); + } fn get_help_inner(input: &str) { let error = AdminCommand::try_parse_from(["argv[0] doesn't matter", input])