mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-07-13 11:37:39 +02:00
rewrite admin handler to use loole channels
This commit is contained in:
parent
2516d44cb1
commit
ddb3b1a2bb
3 changed files with 85 additions and 60 deletions
7
Cargo.lock
generated
7
Cargo.lock
generated
|
@ -484,6 +484,7 @@ dependencies = [
|
||||||
"itertools 0.12.1",
|
"itertools 0.12.1",
|
||||||
"jsonwebtoken",
|
"jsonwebtoken",
|
||||||
"log",
|
"log",
|
||||||
|
"loole",
|
||||||
"lru-cache",
|
"lru-cache",
|
||||||
"nix",
|
"nix",
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
|
@ -1473,6 +1474,12 @@ version = "0.4.21"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
|
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "loole"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c6725f0feab07fcf90f6de5417c06d7fef976fa6e5912fa9e21cb5e4dc6ae5da"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lru-cache"
|
name = "lru-cache"
|
||||||
version = "0.1.2"
|
version = "0.1.2"
|
||||||
|
|
|
@ -70,6 +70,9 @@ cyborgtime = "2.1.1"
|
||||||
bytes = "1.6.0"
|
bytes = "1.6.0"
|
||||||
http = "0.2.12"
|
http = "0.2.12"
|
||||||
|
|
||||||
|
# used to replace the channels of the tokio runtime
|
||||||
|
loole = "0.3.0"
|
||||||
|
|
||||||
# standard date and time tools
|
# standard date and time tools
|
||||||
[dependencies.chrono]
|
[dependencies.chrono]
|
||||||
version = "0.4.37"
|
version = "0.4.37"
|
||||||
|
|
|
@ -23,7 +23,7 @@ use ruma::{
|
||||||
EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
|
EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
|
||||||
};
|
};
|
||||||
use serde_json::value::to_raw_value;
|
use serde_json::value::to_raw_value;
|
||||||
use tokio::sync::{mpsc, Mutex};
|
use tokio::sync::Mutex;
|
||||||
use tracing::{error, warn};
|
use tracing::{error, warn};
|
||||||
|
|
||||||
use super::pdu::PduBuilder;
|
use super::pdu::PduBuilder;
|
||||||
|
@ -91,13 +91,13 @@ pub enum AdminRoomEvent {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
pub sender: mpsc::UnboundedSender<AdminRoomEvent>,
|
pub sender: loole::Sender<AdminRoomEvent>, //mpsc::UnboundedSender<AdminRoomEvent>,
|
||||||
receiver: Mutex<mpsc::UnboundedReceiver<AdminRoomEvent>>,
|
receiver: Mutex<loole::Receiver<AdminRoomEvent>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
pub fn build() -> Arc<Self> {
|
pub fn build() -> Arc<Self> {
|
||||||
let (sender, receiver) = mpsc::unbounded_channel();
|
let (sender, receiver) = loole::unbounded();
|
||||||
Arc::new(Self {
|
Arc::new(Self {
|
||||||
sender,
|
sender,
|
||||||
receiver: Mutex::new(receiver),
|
receiver: Mutex::new(receiver),
|
||||||
|
@ -115,7 +115,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handler(&self) -> Result<()> {
|
async fn handler(&self) -> Result<()> {
|
||||||
let mut receiver = self.receiver.lock().await;
|
let receiver = self.receiver.lock().await;
|
||||||
// TODO: Use futures when we have long admin commands
|
// TODO: Use futures when we have long admin commands
|
||||||
//let mut futures = FuturesUnordered::new();
|
//let mut futures = FuturesUnordered::new();
|
||||||
|
|
||||||
|
@ -125,63 +125,72 @@ impl Service {
|
||||||
if let Ok(Some(conduit_room)) = Self::get_admin_room() {
|
if let Ok(Some(conduit_room)) = Self::get_admin_room() {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(event) = receiver.recv() => {
|
event = receiver.recv_async() => {
|
||||||
let (mut message_content, reply) = match event {
|
match event {
|
||||||
AdminRoomEvent::SendMessage(content) => (content, None),
|
Ok(event) => {
|
||||||
AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
|
let (mut message_content, reply) = match event {
|
||||||
(self.process_admin_message(room_message).await, Some(reply_id))
|
AdminRoomEvent::SendMessage(content) => (content, None),
|
||||||
|
AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
|
||||||
|
(self.process_admin_message(room_message).await, Some(reply_id))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mutex_state = Arc::clone(
|
||||||
|
services().globals
|
||||||
|
.roomid_mutex_state
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.entry(conduit_room.clone())
|
||||||
|
.or_default(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
|
if let Some(reply) = reply {
|
||||||
|
message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } });
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = services().rooms.timeline.build_and_append_pdu(
|
||||||
|
PduBuilder {
|
||||||
|
event_type: TimelineEventType::RoomMessage,
|
||||||
|
content: to_raw_value(&message_content)
|
||||||
|
.expect("event is valid, we just created it"),
|
||||||
|
unsigned: None,
|
||||||
|
state_key: None,
|
||||||
|
redacts: None,
|
||||||
|
},
|
||||||
|
&conduit_user,
|
||||||
|
&conduit_room,
|
||||||
|
&state_lock)
|
||||||
|
.await {
|
||||||
|
error!("Failed to build and append admin room response PDU: \"{e}\"");
|
||||||
|
|
||||||
|
let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output."));
|
||||||
|
|
||||||
|
services().rooms.timeline.build_and_append_pdu(
|
||||||
|
PduBuilder {
|
||||||
|
event_type: TimelineEventType::RoomMessage,
|
||||||
|
content: to_raw_value(&error_room_message)
|
||||||
|
.expect("event is valid, we just created it"),
|
||||||
|
unsigned: None,
|
||||||
|
state_key: None,
|
||||||
|
redacts: None,
|
||||||
|
},
|
||||||
|
&conduit_user,
|
||||||
|
&conduit_room,
|
||||||
|
&state_lock)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
drop(state_lock);
|
||||||
}
|
}
|
||||||
};
|
Err(_) => {
|
||||||
|
// TODO: Handle error, Im too unfamiliar with the codebase to know what to do here
|
||||||
|
|
||||||
let mutex_state = Arc::clone(
|
|
||||||
services().globals
|
|
||||||
.roomid_mutex_state
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.entry(conduit_room.clone())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let state_lock = mutex_state.lock().await;
|
// recv_async returns an error if all senders have been dropped. If the channel is empty, the returned future will yield to the async runtime.
|
||||||
|
|
||||||
if let Some(reply) = reply {
|
}
|
||||||
message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = services().rooms.timeline.build_and_append_pdu(
|
|
||||||
PduBuilder {
|
|
||||||
event_type: TimelineEventType::RoomMessage,
|
|
||||||
content: to_raw_value(&message_content)
|
|
||||||
.expect("event is valid, we just created it"),
|
|
||||||
unsigned: None,
|
|
||||||
state_key: None,
|
|
||||||
redacts: None,
|
|
||||||
},
|
|
||||||
&conduit_user,
|
|
||||||
&conduit_room,
|
|
||||||
&state_lock)
|
|
||||||
.await {
|
|
||||||
error!("Failed to build and append admin room response PDU: \"{e}\"");
|
|
||||||
|
|
||||||
let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output."));
|
|
||||||
|
|
||||||
services().rooms.timeline.build_and_append_pdu(
|
|
||||||
PduBuilder {
|
|
||||||
event_type: TimelineEventType::RoomMessage,
|
|
||||||
content: to_raw_value(&error_room_message)
|
|
||||||
.expect("event is valid, we just created it"),
|
|
||||||
unsigned: None,
|
|
||||||
state_key: None,
|
|
||||||
redacts: None,
|
|
||||||
},
|
|
||||||
&conduit_user,
|
|
||||||
&conduit_room,
|
|
||||||
&state_lock)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
drop(state_lock);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -760,13 +769,19 @@ mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn get_help_short() { get_help_inner("-h"); }
|
fn get_help_short() {
|
||||||
|
get_help_inner("-h");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn get_help_long() { get_help_inner("--help"); }
|
fn get_help_long() {
|
||||||
|
get_help_inner("--help");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn get_help_subcommand() { get_help_inner("help"); }
|
fn get_help_subcommand() {
|
||||||
|
get_help_inner("help");
|
||||||
|
}
|
||||||
|
|
||||||
fn get_help_inner(input: &str) {
|
fn get_help_inner(input: &str) {
|
||||||
let error = AdminCommand::try_parse_from(["argv[0] doesn't matter", input])
|
let error = AdminCommand::try_parse_from(["argv[0] doesn't matter", input])
|
||||||
|
|
Loading…
Add table
Reference in a new issue