diff --git a/src/api/client/read_marker.rs b/src/api/client/read_marker.rs index 89fe003a..ab7cc6ad 100644 --- a/src/api/client/read_marker.rs +++ b/src/api/client/read_marker.rs @@ -72,14 +72,10 @@ pub(crate) async fn set_read_marker_route( services .rooms .read_receipt - .readreceipt_update( - sender_user, - &body.room_id, - &ruma::events::receipt::ReceiptEvent { - content: ruma::events::receipt::ReceiptEventContent(receipt_content), - room_id: body.room_id.clone(), - }, - ) + .readreceipt_update(sender_user, &body.room_id, ruma::events::receipt::ReceiptEvent { + content: ruma::events::receipt::ReceiptEventContent(receipt_content), + room_id: body.room_id.clone(), + }) .await; } @@ -171,7 +167,7 @@ pub(crate) async fn create_receipt_route( .readreceipt_update( sender_user, &body.room_id, - &ruma::events::receipt::ReceiptEvent { + ruma::events::receipt::ReceiptEvent { content: ruma::events::receipt::ReceiptEventContent(receipt_content), room_id: body.room_id.clone(), }, diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index cd4dfc90..28f97c04 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -791,8 +791,8 @@ async fn load_joined_room( let typings = services .rooms .typing - .typings_all(room_id, sender_user) - .await?; + .typings_user(room_id, sender_user) + .await; Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?]) }) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 2e615a0c..1ec4bcbc 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -370,7 +370,7 @@ async fn handle_edu_receipt_room_user( services .rooms .read_receipt - .readreceipt_update(user_id, room_id, &ReceiptEvent { + .readreceipt_update(user_id, room_id, ReceiptEvent { content: ReceiptEventContent(content.into()), room_id: room_id.to_owned(), }) diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index 2bc21355..adc57dc3 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -2,9 +2,10 @@ mod data; use std::{collections::BTreeMap, sync::Arc}; -use conduwuit::{debug, err, warn, PduCount, PduId, RawPduId, Result}; +use conduwuit::{debug, err, result::LogErr, warn, PduCount, PduId, RawPduId, Result}; use futures::{try_join, Stream, TryFutureExt}; use ruma::{ + api::appservice::event::push_events::v1::EphemeralData, events::{ receipt::{ReceiptEvent, ReceiptEventContent, Receipts}, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, @@ -14,7 +15,11 @@ use ruma::{ }; use self::data::{Data, ReceiptItem}; -use crate::{rooms, sending, Dep}; +use crate::{ + rooms, + sending::{self, EduBuf}, + Dep, +}; pub struct Service { services: Services, @@ -48,14 +53,24 @@ impl Service { &self, user_id: &UserId, room_id: &RoomId, - event: &ReceiptEvent, + event: ReceiptEvent, ) { - self.db.readreceipt_update(user_id, room_id, event).await; + self.db.readreceipt_update(user_id, room_id, &event).await; self.services .sending .flush_room(room_id) .await .expect("room flush failed"); + // update appservices + let edu = EphemeralData::Receipt(event); + let mut buf = EduBuf::new(); + serde_json::to_writer(&mut buf, &edu).expect("Serialized EphemeralData::Receipt"); + _ = self + .services + .sending + .send_edu_appservice_room(room_id, buf) + .await + .log_err(); } /// Gets the latest private read receipt from the user in the room diff --git a/src/service/rooms/typing/mod.rs b/src/service/rooms/typing/mod.rs index c710b33a..0d5a380a 100644 --- a/src/service/rooms/typing/mod.rs +++ b/src/service/rooms/typing/mod.rs @@ -7,8 +7,11 @@ use conduwuit::{ }; use futures::StreamExt; use ruma::{ - api::federation::transactions::edu::{Edu, TypingContent}, - events::SyncEphemeralRoomEvent, + api::{ + appservice::event::push_events::v1::EphemeralData, + federation::transactions::edu::{Edu, TypingContent}, + }, + events::{typing::TypingEventContent, EphemeralRoomEvent, SyncEphemeralRoomEvent}, OwnedRoomId, OwnedUserId, RoomId, UserId, }; use tokio::sync::{broadcast, RwLock}; @@ -76,6 +79,9 @@ impl Service { trace!("receiver found what it was looking for and is no longer interested"); } + // update appservices + self.appservice_send(room_id).await?; + // update federation if self.services.globals.user_is_local(user_id) { self.federation_send(room_id, user_id, true).await?; @@ -104,6 +110,9 @@ impl Service { trace!("receiver found what it was looking for and is no longer interested"); } + // update appservices + self.appservice_send(room_id).await?; + // update federation if self.services.globals.user_is_local(user_id) { self.federation_send(room_id, user_id, false).await?; @@ -139,29 +148,33 @@ impl Service { } }; - if !removable.is_empty() { - let typing = &mut self.typing.write().await; - let room = typing.entry(room_id.to_owned()).or_default(); - for user in &removable { - debug_info!("typing timeout {user:?} in {room_id:?}"); - room.remove(user); - } + if removable.is_empty() { + return Ok(()); + } + let typing = &mut self.typing.write().await; + let room = typing.entry(room_id.to_owned()).or_default(); + for user in &removable { + debug_info!("typing timeout {user:?} in {room_id:?}"); + room.remove(user); + } - // update clients - self.last_typing_update - .write() - .await - .insert(room_id.to_owned(), self.services.globals.next_count()?); + // update clients + self.last_typing_update + .write() + .await + .insert(room_id.to_owned(), self.services.globals.next_count()?); - if self.typing_update_sender.send(room_id.to_owned()).is_err() { - trace!("receiver found what it was looking for and is no longer interested"); - } + if self.typing_update_sender.send(room_id.to_owned()).is_err() { + trace!("receiver found what it was looking for and is no longer interested"); + } - // update federation - for user in &removable { - if self.services.globals.user_is_local(user) { - self.federation_send(room_id, user, false).await?; - } + // update appservices + self.appservice_send(room_id).await?; + + // update federation + for user in &removable { + if self.services.globals.user_is_local(user) { + self.federation_send(room_id, user, false).await?; } } @@ -180,18 +193,31 @@ impl Service { .unwrap_or(0)) } - /// Returns a new typing EDU. - pub async fn typings_all( - &self, - room_id: &RoomId, - sender_user: &UserId, - ) -> Result> { + /// Returns a new typing EDU's content. + pub async fn typings_content(&self, room_id: &RoomId) -> TypingEventContent { let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); let Some(typing_indicators) = room_typing_indicators else { - return Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() }, - }); + return TypingEventContent { user_ids: Vec::new() }; + }; + + let user_ids: Vec<_> = typing_indicators.into_keys().collect(); + + TypingEventContent { user_ids } + } + + /// Returns a new typing EDU, filtered for a specific user + pub async fn typings_user( + &self, + room_id: &RoomId, + sender_user: &UserId, + ) -> SyncEphemeralRoomEvent { + let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); + + let Some(typing_indicators) = room_typing_indicators else { + return SyncEphemeralRoomEvent { + content: TypingEventContent { user_ids: Vec::new() }, + }; }; let user_ids: Vec<_> = typing_indicators @@ -208,9 +234,7 @@ impl Service { .collect() .await; - Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { user_ids }, - }) + SyncEphemeralRoomEvent { content: TypingEventContent { user_ids } } } async fn federation_send( @@ -238,4 +262,19 @@ impl Service { Ok(()) } + + async fn appservice_send(&self, room_id: &RoomId) -> Result<()> { + let edu = EphemeralData::Typing(EphemeralRoomEvent { + content: self.typings_content(room_id).await, + room_id: room_id.into(), + }); + + let mut buf = EduBuf::new(); + serde_json::to_writer(&mut buf, &edu).expect("Serialized Edu::Typing"); + + self.services + .sending + .send_edu_appservice_room(room_id, buf) + .await + } } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index b146ad49..04530171 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -30,8 +30,11 @@ pub use self::{ sender::{EDU_LIMIT, PDU_LIMIT}, }; use crate::{ - account_data, client, federation, globals, presence, pusher, rooms, - rooms::timeline::RawPduId, users, Dep, + account_data, + appservice::NamespaceRegex, + client, federation, globals, presence, pusher, + rooms::{self, timeline::RawPduId}, + users, Dep, }; pub struct Service { @@ -42,6 +45,7 @@ pub struct Service { } struct Services { + alias: Dep, client: Dep, globals: Dep, state: Dep, @@ -86,6 +90,7 @@ impl crate::Service for Service { db: Data::new(&args), server: args.server.clone(), services: Services { + alias: args.depend::("rooms::alias"), client: args.depend::("client"), globals: args.depend::("globals"), state: args.depend::("rooms::state"), @@ -214,6 +219,47 @@ impl Service { }) } + #[tracing::instrument(skip(self, serialized), level = "debug")] + pub fn send_edu_appservice(&self, appservice_id: &str, serialized: EduBuf) -> Result { + let dest = Destination::Appservice(appservice_id.to_owned()); + let event = SendingEvent::Edu(serialized); + let _cork = self.db.db.cork(); + let keys = self.db.queue_requests(once((&event, &dest))); + self.dispatch(Msg { + dest, + event, + queue_id: keys.into_iter().next().expect("request queue key"), + }) + } + + #[tracing::instrument(skip(self, room_id, serialized), level = "debug")] + pub async fn send_edu_appservice_room( + &self, + room_id: &RoomId, + serialized: EduBuf, + ) -> Result<()> { + for appservice in self.services.appservice.read().await.values() { + let matching_aliases = |aliases: NamespaceRegex| { + self.services + .alias + .local_aliases_for_room(room_id) + .ready_any(move |room_alias| aliases.is_match(room_alias.as_str())) + }; + + if appservice.rooms.is_match(room_id.as_str()) + || matching_aliases(appservice.aliases.clone()).await + || self + .services + .state_cache + .appservice_in_room(room_id, appservice) + .await + { + self.send_edu_appservice(&appservice.registration.id, serialized.clone())?; + } + } + Ok(()) + } + #[tracing::instrument(skip(self, room_id, serialized), level = "debug")] pub async fn send_edu_room(&self, room_id: &RoomId, serialized: EduBuf) -> Result { let servers = self