From ce0ac32143d04bba352e5195261b449b98d3a5eb Mon Sep 17 00:00:00 2001 From: iraizo Date: Sat, 6 Apr 2024 18:13:05 +0200 Subject: [PATCH] move all other services to loole channels --- src/service/admin/mod.rs | 2 +- src/service/presence/mod.rs | 26 ++++++++++++++++---------- src/service/sending/mod.rs | 30 +++++++++++++++++------------- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index c7655580..5754a435 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -91,7 +91,7 @@ pub enum AdminRoomEvent { } pub struct Service { - pub sender: loole::Sender, //mpsc::UnboundedSender, + pub sender: loole::Sender, receiver: Mutex>, } diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 1b8672f7..fd5b1c58 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -10,10 +10,7 @@ use ruma::{ OwnedUserId, UInt, UserId, }; use serde::{Deserialize, Serialize}; -use tokio::{ - sync::{mpsc, Mutex}, - time::sleep, -}; +use tokio::{sync::Mutex, time::sleep}; use tracing::{debug, error}; use crate::{services, utils, Config, Error, Result}; @@ -71,14 +68,14 @@ impl Presence { pub struct Service { pub db: &'static dyn Data, - pub timer_sender: mpsc::UnboundedSender<(OwnedUserId, Duration)>, - timer_receiver: Mutex>, + pub timer_sender: loole::Sender<(OwnedUserId, Duration)>, + timer_receiver: Mutex>, timeout_remote_users: bool, } impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { - let (timer_sender, timer_receiver) = mpsc::unbounded_channel(); + let (timer_sender, timer_receiver) = loole::unbounded(); Arc::new(Self { db, @@ -176,9 +173,18 @@ impl Service { let mut receiver = self.timer_receiver.lock().await; loop { tokio::select! { - Some((user_id, timeout)) = receiver.recv() => { - debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len()); - presence_timers.push(presence_timer(user_id, timeout)); + event = receiver.recv_async() => { + + match event { + Ok((user_id, timeout)) => { + debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len()); + presence_timers.push(presence_timer(user_id, timeout)); + } + Err(e) => { + // TODO: Handle error better? I have no idea what to do here. + error!("Failed to receive presence timer: {}", e); + } + } } Some(user_id) = presence_timers.next() => { diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index c802278b..b884a504 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -27,7 +27,7 @@ use ruma::{ }; use tokio::{ select, - sync::{mpsc, Mutex, Semaphore}, + sync::{Mutex, Semaphore}, }; use tracing::{error, warn}; @@ -43,8 +43,8 @@ pub struct Service { /// The state for a given state hash. pub(super) maximum_requests: Arc, - pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, - receiver: Mutex)>>, + pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec)>, + receiver: Mutex)>>, startup_netburst: bool, startup_netburst_keep: i64, timeout: u64, @@ -73,7 +73,7 @@ enum TransactionStatus { impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = loole::unbounded(); Arc::new(Self { db, sender, @@ -275,7 +275,7 @@ impl Service { #[tracing::instrument(skip(self), name = "sender")] async fn handler(&self) -> Result<()> { - let mut receiver = self.receiver.lock().await; + let receiver = self.receiver.lock().await; let mut futures = FuturesUnordered::new(); let mut current_transaction_status = HashMap::::new(); @@ -306,7 +306,7 @@ impl Service { } loop { - select! { + tokio::select! { Some(response) = futures.next() => { match response { Ok(outgoing_kind) => { @@ -343,13 +343,17 @@ impl Service { } }; }, - Some((outgoing_kind, event, key)) = receiver.recv() => { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, - vec![(event, key)], - &mut current_transaction_status, - ) { - futures.push(handle_events(outgoing_kind, events)); + + event = receiver.recv_async() => { + // TODO: Error handling for this + if let Ok((outgoing_kind, event, key)) = event { + if let Ok(Some(events)) = self.select_events( + &outgoing_kind, + vec![(event, key)], + &mut current_transaction_status, + ) { + futures.push(handle_events(outgoing_kind, events)); + } } } }