Send read reciept and typing indicator EDUs to appservices with receive_ephemeral

This commit is contained in:
Jade Ellis 2024-12-18 03:04:39 +00:00 committed by strawberry
commit 3675c941f8
8 changed files with 135 additions and 46 deletions

View file

@ -25,7 +25,10 @@ pub use self::{
sender::{EDU_LIMIT, PDU_LIMIT},
};
use crate::{
account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId,
account_data,
appservice::NamespaceRegex,
client, globals, presence, pusher, resolver,
rooms::{self, timeline::RawPduId},
server_keys, users, Dep,
};
@ -38,6 +41,7 @@ pub struct Service {
}
struct Services {
alias: Dep<rooms::alias::Service>,
client: Dep<client::Service>,
globals: Dep<globals::Service>,
resolver: Dep<resolver::Service>,
@ -76,6 +80,7 @@ impl crate::Service for Service {
Ok(Arc::new(Self {
server: args.server.clone(),
services: Services {
alias: args.depend::<rooms::alias::Service>("rooms::alias"),
client: args.depend::<client::Service>("client"),
globals: args.depend::<globals::Service>("globals"),
resolver: args.depend::<resolver::Service>("resolver"),
@ -184,6 +189,47 @@ impl Service {
})
}
#[tracing::instrument(skip(self, serialized), level = "debug")]
pub fn send_edu_appservice(&self, appservice_id: String, serialized: Vec<u8>) -> Result {
let dest = Destination::Appservice(appservice_id);
let event = SendingEvent::Edu(serialized);
let _cork = self.db.db.cork();
let keys = self.db.queue_requests(once((&event, &dest)));
self.dispatch(Msg {
dest,
event,
queue_id: keys.into_iter().next().expect("request queue key"),
})
}
#[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
pub async fn send_edu_appservice_room(
&self,
room_id: &RoomId,
serialized: Vec<u8>,
) -> Result<()> {
for appservice in self.services.appservice.read().await.values() {
let matching_aliases = |aliases: NamespaceRegex| {
self.services
.alias
.local_aliases_for_room(room_id)
.ready_any(move |room_alias| aliases.is_match(room_alias.as_str()))
};
if appservice.rooms.is_match(room_id.as_str())
|| matching_aliases(appservice.aliases.clone()).await
|| self
.services
.state_cache
.appservice_in_room(room_id, appservice)
.await
{
self.send_edu_appservice(appservice.registration.id.clone(), serialized.clone())?;
}
}
Ok(())
}
#[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
pub async fn send_edu_room(&self, room_id: &RoomId, serialized: Vec<u8>) -> Result<()> {
let servers = self

View file

@ -1,3 +1,4 @@
use core::str;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
@ -21,7 +22,7 @@ use futures::{
};
use ruma::{
api::{
appservice::event::push_events::v1::Edu as RumaEdu,
appservice::event::push_events::v1::EphemeralData,
federation::transactions::{
edu::{
DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
@ -587,7 +588,7 @@ impl Service {
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
.count(),
);
let mut edu_jsons: Vec<RumaEdu> = Vec::with_capacity(
let mut edu_jsons: Vec<EphemeralData> = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Edu(_)))
@ -600,16 +601,12 @@ impl Service {
pdu_jsons.push(pdu.to_room_event());
}
},
| SendingEvent::Edu(edu) => {
if appservice
.receive_ephemeral
.is_some_and(|receive_edus| receive_edus)
{
| SendingEvent::Edu(edu) =>
if appservice.receive_ephemeral {
if let Ok(edu) = serde_json::from_slice(edu) {
edu_jsons.push(edu);
}
}
},
},
| SendingEvent::Flush => {}, // flush only; no new content
}
}