mod appservice; mod data; mod dest; mod send; mod sender; use std::{fmt::Debug, iter::once, sync::Arc}; use async_trait::async_trait; use conduwuit::{ debug_warn, err, utils::{ReadyExt, TryReadyExt}, warn, Result, Server, }; use futures::{Stream, StreamExt}; use ruma::{ api::{appservice::Registration, OutgoingRequest}, RoomId, ServerName, UserId, }; use tokio::sync::Mutex; use self::data::Data; pub use self::{ dest::Destination, sender::{EDU_LIMIT, PDU_LIMIT}, }; use crate::{ account_data, appservice::NamespaceRegex, client, globals, presence, pusher, resolver, rooms::{self, timeline::RawPduId}, server_keys, users, Dep, }; pub struct Service { server: Arc, services: Services, pub db: Data, sender: loole::Sender, receiver: Mutex>, } struct Services { alias: Dep, client: Dep, globals: Dep, resolver: Dep, state: Dep, state_cache: Dep, user: Dep, users: Dep, presence: Dep, read_receipt: Dep, timeline: Dep, account_data: Dep, appservice: Dep, pusher: Dep, server_keys: Dep, } #[derive(Clone, Debug, PartialEq, Eq)] struct Msg { dest: Destination, event: SendingEvent, queue_id: Vec, } #[allow(clippy::module_name_repetitions)] #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum SendingEvent { Pdu(RawPduId), // pduid Edu(Vec), // pdu json Flush, // none } #[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { let (sender, receiver) = loole::unbounded(); Ok(Arc::new(Self { server: args.server.clone(), services: Services { alias: args.depend::("rooms::alias"), client: args.depend::("client"), globals: args.depend::("globals"), resolver: args.depend::("resolver"), state: args.depend::("rooms::state"), state_cache: args.depend::("rooms::state_cache"), user: args.depend::("rooms::user"), users: args.depend::("users"), presence: args.depend::("presence"), read_receipt: args.depend::("rooms::read_receipt"), timeline: args.depend::("rooms::timeline"), account_data: args.depend::("account_data"), appservice: args.depend::("appservice"), pusher: args.depend::("pusher"), server_keys: args.depend::("server_keys"), }, db: Data::new(&args), sender, receiver: Mutex::new(receiver), })) } async fn worker(self: Arc) -> Result<()> { // trait impl can't be split between files so this just glues to mod sender self.sender().await } fn interrupt(&self) { if !self.sender.is_closed() { self.sender.close(); } } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } impl Service { #[tracing::instrument(skip(self, pdu_id, user, pushkey), level = "debug")] pub fn send_pdu_push(&self, pdu_id: &RawPduId, user: &UserId, pushkey: String) -> Result { let dest = Destination::Push(user.to_owned(), pushkey); let event = SendingEvent::Pdu(*pdu_id); let _cork = self.db.db.cork(); let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, queue_id: keys.into_iter().next().expect("request queue key"), }) } #[tracing::instrument(skip(self), level = "debug")] pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: RawPduId) -> Result { let dest = Destination::Appservice(appservice_id); let event = SendingEvent::Pdu(pdu_id); let _cork = self.db.db.cork(); let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, queue_id: keys.into_iter().next().expect("request queue key"), }) } #[tracing::instrument(skip(self, room_id, pdu_id), level = "debug")] pub async fn send_pdu_room(&self, room_id: &RoomId, pdu_id: &RawPduId) -> Result { let servers = self .services .state_cache .room_servers(room_id) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)); self.send_pdu_servers(servers, pdu_id).await } #[tracing::instrument(skip(self, servers, pdu_id), level = "debug")] pub async fn send_pdu_servers<'a, S>(&self, servers: S, pdu_id: &RawPduId) -> Result where S: Stream + Send + 'a, { let _cork = self.db.db.cork(); let requests = servers .map(|server| { (Destination::Normal(server.into()), SendingEvent::Pdu(pdu_id.to_owned())) }) .collect::>() .await; let keys = self.db.queue_requests(requests.iter().map(|(o, e)| (e, o))); for ((dest, event), queue_id) in requests.into_iter().zip(keys) { self.dispatch(Msg { dest, event, queue_id })?; } Ok(()) } #[tracing::instrument(skip(self, server, serialized), level = "debug")] pub fn send_edu_server(&self, server: &ServerName, serialized: Vec) -> Result<()> { let dest = Destination::Normal(server.to_owned()); let event = SendingEvent::Edu(serialized); let _cork = self.db.db.cork(); let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, queue_id: keys.into_iter().next().expect("request queue key"), }) } #[tracing::instrument(skip(self, serialized), level = "debug")] pub fn send_edu_appservice(&self, appservice_id: &str, serialized: Vec) -> Result { let dest = Destination::Appservice(appservice_id.to_owned()); let event = SendingEvent::Edu(serialized); let _cork = self.db.db.cork(); let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, queue_id: keys.into_iter().next().expect("request queue key"), }) } #[tracing::instrument(skip(self, room_id, serialized), level = "debug")] pub async fn send_edu_appservice_room( &self, room_id: &RoomId, serialized: Vec, ) -> Result<()> { for appservice in self.services.appservice.read().await.values() { let matching_aliases = |aliases: NamespaceRegex| { self.services .alias .local_aliases_for_room(room_id) .ready_any(move |room_alias| aliases.is_match(room_alias.as_str())) }; if appservice.rooms.is_match(room_id.as_str()) || matching_aliases(appservice.aliases.clone()).await || self .services .state_cache .appservice_in_room(room_id, appservice) .await { self.send_edu_appservice(&appservice.registration.id, serialized.clone())?; } } Ok(()) } #[tracing::instrument(skip(self, room_id, serialized), level = "debug")] pub async fn send_edu_room(&self, room_id: &RoomId, serialized: Vec) -> Result<()> { let servers = self .services .state_cache .room_servers(room_id) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)); self.send_edu_servers(servers, serialized).await } #[tracing::instrument(skip(self, servers, serialized), level = "debug")] pub async fn send_edu_servers<'a, S>(&self, servers: S, serialized: Vec) -> Result<()> where S: Stream + Send + 'a, { let _cork = self.db.db.cork(); let requests = servers .map(|server| { (Destination::Normal(server.to_owned()), SendingEvent::Edu(serialized.clone())) }) .collect::>() .await; let keys = self.db.queue_requests(requests.iter().map(|(o, e)| (e, o))); for ((dest, event), queue_id) in requests.into_iter().zip(keys) { self.dispatch(Msg { dest, event, queue_id })?; } Ok(()) } #[tracing::instrument(skip(self, room_id), level = "debug")] pub async fn flush_room(&self, room_id: &RoomId) -> Result<()> { let servers = self .services .state_cache .room_servers(room_id) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)); self.flush_servers(servers).await } #[tracing::instrument(skip(self, servers), level = "debug")] pub async fn flush_servers<'a, S>(&self, servers: S) -> Result<()> where S: Stream + Send + 'a, { servers .map(ToOwned::to_owned) .map(Destination::Normal) .map(Ok) .ready_try_for_each(|dest| { self.dispatch(Msg { dest, event: SendingEvent::Flush, queue_id: Vec::::new(), }) }) .await } /// Sends a request to a federation server #[tracing::instrument(skip_all, name = "request")] pub async fn send_federation_request( &self, dest: &ServerName, request: T, ) -> Result where T: OutgoingRequest + Debug + Send, { let client = &self.services.client.federation; self.send(client, dest, request).await } /// Like send_federation_request() but with a very large timeout #[tracing::instrument(skip_all, name = "synapse")] pub async fn send_synapse_request( &self, dest: &ServerName, request: T, ) -> Result where T: OutgoingRequest + Debug + Send, { let client = &self.services.client.synapse; self.send(client, dest, request).await } /// Sends a request to an appservice /// /// Only returns None if there is no url specified in the appservice /// registration file pub async fn send_appservice_request( &self, registration: Registration, request: T, ) -> Result> where T: OutgoingRequest + Debug + Send, { let client = &self.services.client.appservice; appservice::send_request(client, registration, request).await } /// Clean up queued sending event data /// /// Used after we remove an appservice registration or a user deletes a push /// key #[tracing::instrument(skip(self), level = "debug")] pub async fn cleanup_events( &self, appservice_id: Option<&str>, user_id: Option<&UserId>, push_key: Option<&str>, ) -> Result { match (appservice_id, user_id, push_key) { | (None, Some(user_id), Some(push_key)) => { self.db .delete_all_requests_for(&Destination::Push( user_id.to_owned(), push_key.to_owned(), )) .await; Ok(()) }, | (Some(appservice_id), None, None) => { self.db .delete_all_requests_for(&Destination::Appservice(appservice_id.to_owned())) .await; Ok(()) }, | _ => { debug_warn!("cleanup_events called with too many or too few arguments"); Ok(()) }, } } fn dispatch(&self, msg: Msg) -> Result<()> { debug_assert!(!self.sender.is_full(), "channel full"); debug_assert!(!self.sender.is_closed(), "channel closed"); self.sender.send(msg).map_err(|e| err!("{e}")) } }