send read receipts and typing indicators to appservices

This commit is contained in:
Jade Ellis 2025-01-29 20:30:45 +00:00
parent eb7d893c86
commit d26d5cfa3a
No known key found for this signature in database
GPG key ID: 8705A2A3EBF77BD2
6 changed files with 148 additions and 52 deletions

View file

@ -72,14 +72,10 @@ pub(crate) async fn set_read_marker_route(
services services
.rooms .rooms
.read_receipt .read_receipt
.readreceipt_update( .readreceipt_update(sender_user, &body.room_id, ruma::events::receipt::ReceiptEvent {
sender_user, content: ruma::events::receipt::ReceiptEventContent(receipt_content),
&body.room_id, room_id: body.room_id.clone(),
&ruma::events::receipt::ReceiptEvent { })
content: ruma::events::receipt::ReceiptEventContent(receipt_content),
room_id: body.room_id.clone(),
},
)
.await; .await;
} }
@ -171,7 +167,7 @@ pub(crate) async fn create_receipt_route(
.readreceipt_update( .readreceipt_update(
sender_user, sender_user,
&body.room_id, &body.room_id,
&ruma::events::receipt::ReceiptEvent { ruma::events::receipt::ReceiptEvent {
content: ruma::events::receipt::ReceiptEventContent(receipt_content), content: ruma::events::receipt::ReceiptEventContent(receipt_content),
room_id: body.room_id.clone(), room_id: body.room_id.clone(),
}, },

View file

@ -791,8 +791,8 @@ async fn load_joined_room(
let typings = services let typings = services
.rooms .rooms
.typing .typing
.typings_all(room_id, sender_user) .typings_user(room_id, sender_user)
.await?; .await;
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?]) Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
}) })

View file

@ -370,7 +370,7 @@ async fn handle_edu_receipt_room_user(
services services
.rooms .rooms
.read_receipt .read_receipt
.readreceipt_update(user_id, room_id, &ReceiptEvent { .readreceipt_update(user_id, room_id, ReceiptEvent {
content: ReceiptEventContent(content.into()), content: ReceiptEventContent(content.into()),
room_id: room_id.to_owned(), room_id: room_id.to_owned(),
}) })

View file

@ -2,9 +2,10 @@ mod data;
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::BTreeMap, sync::Arc};
use conduwuit::{debug, err, warn, PduCount, PduId, RawPduId, Result}; use conduwuit::{debug, err, result::LogErr, warn, PduCount, PduId, RawPduId, Result};
use futures::{try_join, Stream, TryFutureExt}; use futures::{try_join, Stream, TryFutureExt};
use ruma::{ use ruma::{
api::appservice::event::push_events::v1::EphemeralData,
events::{ events::{
receipt::{ReceiptEvent, ReceiptEventContent, Receipts}, receipt::{ReceiptEvent, ReceiptEventContent, Receipts},
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
@ -14,7 +15,11 @@ use ruma::{
}; };
use self::data::{Data, ReceiptItem}; use self::data::{Data, ReceiptItem};
use crate::{rooms, sending, Dep}; use crate::{
rooms,
sending::{self, EduBuf},
Dep,
};
pub struct Service { pub struct Service {
services: Services, services: Services,
@ -48,14 +53,24 @@ impl Service {
&self, &self,
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
event: &ReceiptEvent, event: ReceiptEvent,
) { ) {
self.db.readreceipt_update(user_id, room_id, event).await; self.db.readreceipt_update(user_id, room_id, &event).await;
self.services self.services
.sending .sending
.flush_room(room_id) .flush_room(room_id)
.await .await
.expect("room flush failed"); .expect("room flush failed");
// update appservices
let edu = EphemeralData::Receipt(event);
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &edu).expect("Serialized EphemeralData::Receipt");
_ = self
.services
.sending
.send_edu_appservice_room(room_id, buf)
.await
.log_err();
} }
/// Gets the latest private read receipt from the user in the room /// Gets the latest private read receipt from the user in the room

View file

@ -7,8 +7,11 @@ use conduwuit::{
}; };
use futures::StreamExt; use futures::StreamExt;
use ruma::{ use ruma::{
api::federation::transactions::edu::{Edu, TypingContent}, api::{
events::SyncEphemeralRoomEvent, appservice::event::push_events::v1::EphemeralData,
federation::transactions::edu::{Edu, TypingContent},
},
events::{typing::TypingEventContent, EphemeralRoomEvent, SyncEphemeralRoomEvent},
OwnedRoomId, OwnedUserId, RoomId, UserId, OwnedRoomId, OwnedUserId, RoomId, UserId,
}; };
use tokio::sync::{broadcast, RwLock}; use tokio::sync::{broadcast, RwLock};
@ -76,6 +79,9 @@ impl Service {
trace!("receiver found what it was looking for and is no longer interested"); trace!("receiver found what it was looking for and is no longer interested");
} }
// update appservices
self.appservice_send(room_id).await?;
// update federation // update federation
if self.services.globals.user_is_local(user_id) { if self.services.globals.user_is_local(user_id) {
self.federation_send(room_id, user_id, true).await?; self.federation_send(room_id, user_id, true).await?;
@ -104,6 +110,9 @@ impl Service {
trace!("receiver found what it was looking for and is no longer interested"); trace!("receiver found what it was looking for and is no longer interested");
} }
// update appservices
self.appservice_send(room_id).await?;
// update federation // update federation
if self.services.globals.user_is_local(user_id) { if self.services.globals.user_is_local(user_id) {
self.federation_send(room_id, user_id, false).await?; self.federation_send(room_id, user_id, false).await?;
@ -139,29 +148,33 @@ impl Service {
} }
}; };
if !removable.is_empty() { if removable.is_empty() {
let typing = &mut self.typing.write().await; return Ok(());
let room = typing.entry(room_id.to_owned()).or_default(); }
for user in &removable { let typing = &mut self.typing.write().await;
debug_info!("typing timeout {user:?} in {room_id:?}"); let room = typing.entry(room_id.to_owned()).or_default();
room.remove(user); for user in &removable {
} debug_info!("typing timeout {user:?} in {room_id:?}");
room.remove(user);
}
// update clients // update clients
self.last_typing_update self.last_typing_update
.write() .write()
.await .await
.insert(room_id.to_owned(), self.services.globals.next_count()?); .insert(room_id.to_owned(), self.services.globals.next_count()?);
if self.typing_update_sender.send(room_id.to_owned()).is_err() { if self.typing_update_sender.send(room_id.to_owned()).is_err() {
trace!("receiver found what it was looking for and is no longer interested"); trace!("receiver found what it was looking for and is no longer interested");
} }
// update federation // update appservices
for user in &removable { self.appservice_send(room_id).await?;
if self.services.globals.user_is_local(user) {
self.federation_send(room_id, user, false).await?; // update federation
} for user in &removable {
if self.services.globals.user_is_local(user) {
self.federation_send(room_id, user, false).await?;
} }
} }
@ -180,18 +193,31 @@ impl Service {
.unwrap_or(0)) .unwrap_or(0))
} }
/// Returns a new typing EDU. /// Returns a new typing EDU's content.
pub async fn typings_all( pub async fn typings_content(&self, room_id: &RoomId) -> TypingEventContent {
&self,
room_id: &RoomId,
sender_user: &UserId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
let Some(typing_indicators) = room_typing_indicators else { let Some(typing_indicators) = room_typing_indicators else {
return Ok(SyncEphemeralRoomEvent { return TypingEventContent { user_ids: Vec::new() };
content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() }, };
});
let user_ids: Vec<_> = typing_indicators.into_keys().collect();
TypingEventContent { user_ids }
}
/// Returns a new typing EDU, filtered for a specific user
pub async fn typings_user(
&self,
room_id: &RoomId,
sender_user: &UserId,
) -> SyncEphemeralRoomEvent<TypingEventContent> {
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
let Some(typing_indicators) = room_typing_indicators else {
return SyncEphemeralRoomEvent {
content: TypingEventContent { user_ids: Vec::new() },
};
}; };
let user_ids: Vec<_> = typing_indicators let user_ids: Vec<_> = typing_indicators
@ -208,9 +234,7 @@ impl Service {
.collect() .collect()
.await; .await;
Ok(SyncEphemeralRoomEvent { SyncEphemeralRoomEvent { content: TypingEventContent { user_ids } }
content: ruma::events::typing::TypingEventContent { user_ids },
})
} }
async fn federation_send( async fn federation_send(
@ -238,4 +262,19 @@ impl Service {
Ok(()) Ok(())
} }
async fn appservice_send(&self, room_id: &RoomId) -> Result<()> {
let edu = EphemeralData::Typing(EphemeralRoomEvent {
content: self.typings_content(room_id).await,
room_id: room_id.into(),
});
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &edu).expect("Serialized Edu::Typing");
self.services
.sending
.send_edu_appservice_room(room_id, buf)
.await
}
} }

View file

@ -30,8 +30,11 @@ pub use self::{
sender::{EDU_LIMIT, PDU_LIMIT}, sender::{EDU_LIMIT, PDU_LIMIT},
}; };
use crate::{ use crate::{
account_data, client, federation, globals, presence, pusher, rooms, account_data,
rooms::timeline::RawPduId, users, Dep, appservice::NamespaceRegex,
client, federation, globals, presence, pusher,
rooms::{self, timeline::RawPduId},
users, Dep,
}; };
pub struct Service { pub struct Service {
@ -42,6 +45,7 @@ pub struct Service {
} }
struct Services { struct Services {
alias: Dep<rooms::alias::Service>,
client: Dep<client::Service>, client: Dep<client::Service>,
globals: Dep<globals::Service>, globals: Dep<globals::Service>,
state: Dep<rooms::state::Service>, state: Dep<rooms::state::Service>,
@ -86,6 +90,7 @@ impl crate::Service for Service {
db: Data::new(&args), db: Data::new(&args),
server: args.server.clone(), server: args.server.clone(),
services: Services { services: Services {
alias: args.depend::<rooms::alias::Service>("rooms::alias"),
client: args.depend::<client::Service>("client"), client: args.depend::<client::Service>("client"),
globals: args.depend::<globals::Service>("globals"), globals: args.depend::<globals::Service>("globals"),
state: args.depend::<rooms::state::Service>("rooms::state"), state: args.depend::<rooms::state::Service>("rooms::state"),
@ -214,6 +219,47 @@ impl Service {
}) })
} }
#[tracing::instrument(skip(self, serialized), level = "debug")]
pub fn send_edu_appservice(&self, appservice_id: &str, serialized: EduBuf) -> Result {
let dest = Destination::Appservice(appservice_id.to_owned());
let event = SendingEvent::Edu(serialized);
let _cork = self.db.db.cork();
let keys = self.db.queue_requests(once((&event, &dest)));
self.dispatch(Msg {
dest,
event,
queue_id: keys.into_iter().next().expect("request queue key"),
})
}
#[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
pub async fn send_edu_appservice_room(
&self,
room_id: &RoomId,
serialized: EduBuf,
) -> Result<()> {
for appservice in self.services.appservice.read().await.values() {
let matching_aliases = |aliases: NamespaceRegex| {
self.services
.alias
.local_aliases_for_room(room_id)
.ready_any(move |room_alias| aliases.is_match(room_alias.as_str()))
};
if appservice.rooms.is_match(room_id.as_str())
|| matching_aliases(appservice.aliases.clone()).await
|| self
.services
.state_cache
.appservice_in_room(room_id, appservice)
.await
{
self.send_edu_appservice(&appservice.registration.id, serialized.clone())?;
}
}
Ok(())
}
#[tracing::instrument(skip(self, room_id, serialized), level = "debug")] #[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
pub async fn send_edu_room(&self, room_id: &RoomId, serialized: EduBuf) -> Result { pub async fn send_edu_room(&self, room_id: &RoomId, serialized: EduBuf) -> Result {
let servers = self let servers = self