move all other services to loole channels

This commit is contained in:
iraizo 2024-04-06 18:13:05 +02:00
parent 546d0efe21
commit ce0ac32143
No known key found for this signature in database
GPG key ID: 12A9EF3A7EA88D6C
3 changed files with 34 additions and 24 deletions

View file

@ -91,7 +91,7 @@ pub enum AdminRoomEvent {
} }
pub struct Service { pub struct Service {
pub sender: loole::Sender<AdminRoomEvent>, //mpsc::UnboundedSender<AdminRoomEvent>, pub sender: loole::Sender<AdminRoomEvent>,
receiver: Mutex<loole::Receiver<AdminRoomEvent>>, receiver: Mutex<loole::Receiver<AdminRoomEvent>>,
} }

View file

@ -10,10 +10,7 @@ use ruma::{
OwnedUserId, UInt, UserId, OwnedUserId, UInt, UserId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{ use tokio::{sync::Mutex, time::sleep};
sync::{mpsc, Mutex},
time::sleep,
};
use tracing::{debug, error}; use tracing::{debug, error};
use crate::{services, utils, Config, Error, Result}; use crate::{services, utils, Config, Error, Result};
@ -71,14 +68,14 @@ impl Presence {
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
pub timer_sender: mpsc::UnboundedSender<(OwnedUserId, Duration)>, pub timer_sender: loole::Sender<(OwnedUserId, Duration)>,
timer_receiver: Mutex<mpsc::UnboundedReceiver<(OwnedUserId, Duration)>>, timer_receiver: Mutex<loole::Receiver<(OwnedUserId, Duration)>>,
timeout_remote_users: bool, timeout_remote_users: bool,
} }
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (timer_sender, timer_receiver) = mpsc::unbounded_channel(); let (timer_sender, timer_receiver) = loole::unbounded();
Arc::new(Self { Arc::new(Self {
db, db,
@ -176,9 +173,18 @@ impl Service {
let mut receiver = self.timer_receiver.lock().await; let mut receiver = self.timer_receiver.lock().await;
loop { loop {
tokio::select! { tokio::select! {
Some((user_id, timeout)) = receiver.recv() => { event = receiver.recv_async() => {
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
presence_timers.push(presence_timer(user_id, timeout)); match event {
Ok((user_id, timeout)) => {
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
presence_timers.push(presence_timer(user_id, timeout));
}
Err(e) => {
// TODO: Handle error better? I have no idea what to do here.
error!("Failed to receive presence timer: {}", e);
}
}
} }
Some(user_id) = presence_timers.next() => { Some(user_id) = presence_timers.next() => {

View file

@ -27,7 +27,7 @@ use ruma::{
}; };
use tokio::{ use tokio::{
select, select,
sync::{mpsc, Mutex, Semaphore}, sync::{Mutex, Semaphore},
}; };
use tracing::{error, warn}; use tracing::{error, warn};
@ -43,8 +43,8 @@ pub struct Service {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>, pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>, receiver: Mutex<loole::Receiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
startup_netburst: bool, startup_netburst: bool,
startup_netburst_keep: i64, startup_netburst_keep: i64,
timeout: u64, timeout: u64,
@ -73,7 +73,7 @@ enum TransactionStatus {
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = loole::unbounded();
Arc::new(Self { Arc::new(Self {
db, db,
sender, sender,
@ -275,7 +275,7 @@ impl Service {
#[tracing::instrument(skip(self), name = "sender")] #[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let receiver = self.receiver.lock().await;
let mut futures = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
@ -306,7 +306,7 @@ impl Service {
} }
loop { loop {
select! { tokio::select! {
Some(response) = futures.next() => { Some(response) = futures.next() => {
match response { match response {
Ok(outgoing_kind) => { Ok(outgoing_kind) => {
@ -343,13 +343,17 @@ impl Service {
} }
}; };
}, },
Some((outgoing_kind, event, key)) = receiver.recv() => {
if let Ok(Some(events)) = self.select_events( event = receiver.recv_async() => {
&outgoing_kind, // TODO: Error handling for this
vec![(event, key)], if let Ok((outgoing_kind, event, key)) = event {
&mut current_transaction_status, if let Ok(Some(events)) = self.select_events(
) { &outgoing_kind,
futures.push(handle_events(outgoing_kind, events)); vec![(event, key)],
&mut current_transaction_status,
) {
futures.push(handle_events(outgoing_kind, events));
}
} }
} }
} }