mod appservice; mod data; mod dest; mod send; mod sender; use std::{fmt::Debug, iter::once, sync::Arc}; use async_trait::async_trait; use conduit::{ err, utils::{ReadyExt, TryReadyExt}, warn, Result, Server, }; use futures::{Stream, StreamExt}; use ruma::{ api::{appservice::Registration, OutgoingRequest}, RoomId, ServerName, UserId, }; use tokio::sync::Mutex; use self::data::Data; pub use self::{ dest::Destination, sender::{EDU_LIMIT, PDU_LIMIT}, }; use crate::{ account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId, server_keys, users, Dep, }; pub struct Service { server: Arc, services: Services, pub db: Data, sender: loole::Sender, receiver: Mutex>, } struct Services { client: Dep, globals: Dep, resolver: Dep, state: Dep, state_cache: Dep, user: Dep, users: Dep, presence: Dep, read_receipt: Dep, timeline: Dep, account_data: Dep, appservice: Dep, pusher: Dep, server_keys: Dep, } #[derive(Clone, Debug, PartialEq, Eq)] struct Msg { dest: Destination, event: SendingEvent, queue_id: Vec, } #[allow(clippy::module_name_repetitions)] #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum SendingEvent { Pdu(RawPduId), // pduid Edu(Vec), // pdu json Flush, // none } #[async_trait] impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { let (sender, receiver) = loole::unbounded(); Ok(Arc::new(Self { server: args.server.clone(), services: Services { client: args.depend::("client"), globals: args.depend::("globals"), resolver: args.depend::("resolver"), state: args.depend::("rooms::state"), state_cache: args.depend::("rooms::state_cache"), user: args.depend::("rooms::user"), users: args.depend::("users"), presence: args.depend::("presence"), read_receipt: args.depend::("rooms::read_receipt"), timeline: args.depend::("rooms::timeline"), account_data: args.depend::("account_data"), appservice: args.depend::("appservice"), pusher: args.depend::("pusher"), server_keys: args.depend::("server_keys"), }, db: Data::new(&args), sender, receiver: Mutex::new(receiver), })) } async fn worker(self: Arc) -> Result<()> { // trait impl can't be split between files so this just glues to mod sender self.sender().await } fn interrupt(&self) { if !self.sender.is_closed() { self.sender.close(); } } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } impl Service { #[tracing::instrument(skip(self, pdu_id, user, pushkey), level = "debug")] pub fn send_pdu_push(&self, pdu_id: &RawPduId, user: &UserId, pushkey: String) -> Result { let dest = Destination::Push(user.to_owned(), pushkey); let event = SendingEvent::Pdu(*pdu_id); let _cork = self.db.db.cork(); let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, queue_id: keys.into_iter().next().expect("request queue key"), }) } #[tracing::instrument(skip(self), level = "debug")] pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: RawPduId) -> Result { let dest = Destination::Appservice(appservice_id); let event = SendingEvent::Pdu(pdu_id); let _cork = self.db.db.cork(); let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, queue_id: keys.into_iter().next().expect("request queue key"), }) } #[tracing::instrument(skip(self, room_id, pdu_id), level = "debug")] pub async fn send_pdu_room(&self, room_id: &RoomId, pdu_id: &RawPduId) -> Result { let servers = self .services .state_cache .room_servers(room_id) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)); self.send_pdu_servers(servers, pdu_id).await } #[tracing::instrument(skip(self, servers, pdu_id), level = "debug")] pub async fn send_pdu_servers<'a, S>(&self, servers: S, pdu_id: &RawPduId) -> Result where S: Stream + Send + 'a, { let _cork = self.db.db.cork(); let requests = servers .map(|server| (Destination::Normal(server.into()), SendingEvent::Pdu(pdu_id.to_owned()))) .collect::>() .await; let keys = self.db.queue_requests(requests.iter().map(|(o, e)| (e, o))); for ((dest, event), queue_id) in requests.into_iter().zip(keys) { self.dispatch(Msg { dest, event, queue_id, })?; } Ok(()) } #[tracing::instrument(skip(self, server, serialized), level = "debug")] pub fn send_edu_server(&self, server: &ServerName, serialized: Vec) -> Result<()> { let dest = Destination::Normal(server.to_owned()); let event = SendingEvent::Edu(serialized); let _cork = self.db.db.cork(); let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, queue_id: keys.into_iter().next().expect("request queue key"), }) } #[tracing::instrument(skip(self, room_id, serialized), level = "debug")] pub async fn send_edu_room(&self, room_id: &RoomId, serialized: Vec) -> Result<()> { let servers = self .services .state_cache .room_servers(room_id) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)); self.send_edu_servers(servers, serialized).await } #[tracing::instrument(skip(self, servers, serialized), level = "debug")] pub async fn send_edu_servers<'a, S>(&self, servers: S, serialized: Vec) -> Result<()> where S: Stream + Send + 'a, { let _cork = self.db.db.cork(); let requests = servers .map(|server| (Destination::Normal(server.to_owned()), SendingEvent::Edu(serialized.clone()))) .collect::>() .await; let keys = self.db.queue_requests(requests.iter().map(|(o, e)| (e, o))); for ((dest, event), queue_id) in requests.into_iter().zip(keys) { self.dispatch(Msg { dest, event, queue_id, })?; } Ok(()) } #[tracing::instrument(skip(self, room_id), level = "debug")] pub async fn flush_room(&self, room_id: &RoomId) -> Result<()> { let servers = self .services .state_cache .room_servers(room_id) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)); self.flush_servers(servers).await } #[tracing::instrument(skip(self, servers), level = "debug")] pub async fn flush_servers<'a, S>(&self, servers: S) -> Result<()> where S: Stream + Send + 'a, { servers .map(ToOwned::to_owned) .map(Destination::Normal) .map(Ok) .ready_try_for_each(|dest| { self.dispatch(Msg { dest, event: SendingEvent::Flush, queue_id: Vec::::new(), }) }) .await } /// Sends a request to a federation server #[tracing::instrument(skip_all, name = "request")] pub async fn send_federation_request(&self, dest: &ServerName, request: T) -> Result where T: OutgoingRequest + Debug + Send, { let client = &self.services.client.federation; self.send(client, dest, request).await } /// Like send_federation_request() but with a very large timeout #[tracing::instrument(skip_all, name = "synapse")] pub async fn send_synapse_request(&self, dest: &ServerName, request: T) -> Result where T: OutgoingRequest + Debug + Send, { let client = &self.services.client.synapse; self.send(client, dest, request).await } /// Sends a request to an appservice /// /// Only returns None if there is no url specified in the appservice /// registration file pub async fn send_appservice_request( &self, registration: Registration, request: T, ) -> Result> where T: OutgoingRequest + Debug + Send, { let client = &self.services.client.appservice; appservice::send_request(client, registration, request).await } /// Cleanup event data /// Used for instance after we remove an appservice registration #[tracing::instrument(skip(self), level = "debug")] pub async fn cleanup_events(&self, appservice_id: String) { self.db .delete_all_requests_for(&Destination::Appservice(appservice_id)) .await; } fn dispatch(&self, msg: Msg) -> Result<()> { debug_assert!(!self.sender.is_full(), "channel full"); debug_assert!(!self.sender.is_closed(), "channel closed"); self.sender.send(msg).map_err(|e| err!("{e}")) } }