From 2cdccbf2fe16fee50c8d93a9816c66c4ec6bf698 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 3 Sep 2025 14:20:50 +0100 Subject: [PATCH] feat(PR977): Support omitting members in the send_join response --- src/api/server/send_join.rs | 97 ++++++++++++++++++----- src/service/rooms/state_compressor/mod.rs | 2 +- 2 files changed, 80 insertions(+), 19 deletions(-) diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 652451c7..3aebbbe7 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -1,12 +1,13 @@ #![allow(deprecated)] -use std::borrow::Borrow; +use std::{borrow::Borrow, time::Instant}; use axum::extract::State; use conduwuit::{ - Err, Result, at, err, + Err, Event, Result, at, debug, err, info, matrix::event::gen_event_id_canonical_json, - utils::stream::{IterStream, TryBroadbandExt}, + trace, + utils::stream::{BroadbandExt, IterStream, TryBroadbandExt}, warn, }; use conduwuit_service::Services; @@ -25,12 +26,14 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value}; use crate::Ruma; /// helper method for /send_join v1 and v2 +#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))] async fn create_join_event( services: &Services, origin: &ServerName, room_id: &RoomId, pdu: &RawJsonValue, -) -> Result { + omit_members: bool, +) -> Result { if !services.rooms.metadata.exists(room_id).await { return Err!(Request(NotFound("Room is unknown to this server."))); } @@ -201,6 +204,7 @@ async fn create_join_event( .lock(room_id) .await; + debug!("Acquired send_join mutex, persisting join event"); let pdu_id = services .rooms .event_handler @@ -210,7 +214,7 @@ async fn create_join_event( .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; drop(mutex_lock); - + debug!("Fetching current state IDs"); let state_ids: Vec = services .rooms .state_accessor @@ -219,9 +223,25 @@ async fn create_join_event( .collect() .await; + #[allow(clippy::unnecessary_unwrap)] let state = state_ids .iter() .try_stream() + .broad_filter_map(|event_id| async move { + if omit_members && event_id.is_ok() { + let pdu = services + .rooms + .timeline + .get_pdu(event_id.as_ref().unwrap()) + .await; + if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") { + trace!("omitting member event {event_id:?} from returned state"); + // skip members + return None; + } + } + Some(event_id) + }) .broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id)) .broad_and_then(|pdu| { services @@ -238,6 +258,17 @@ async fn create_join_event( .rooms .auth_chain .event_ids_iter(room_id, starting_events) + .broad_filter_map(|event_id| async { + if omit_members && event_id.as_ref().is_ok_and(|e| state_ids.contains(e)) { + // Don't include this event if it's already in the state + trace!( + "omitting member event {event_id:?} from returned auth chain as it is \ + already in state" + ); + return None; + } + Some(event_id) + }) .broad_and_then(|event_id| async move { services.rooms.timeline.get_pdu_json(&event_id).await }) @@ -252,11 +283,27 @@ async fn create_join_event( .await?; services.sending.send_pdu_room(room_id, &pdu_id).await?; - - Ok(create_join_event::v1::RoomState { + let servers_in_room: Option> = if omit_members { + None + } else { + debug!("Fetching list of servers in room"); + Some( + services + .rooms + .state_cache + .room_servers(room_id) + .map(|sn| sn.as_str().to_owned()) + .collect() + .await, + ) + }; + debug!("Returning send_join data"); + Ok(create_join_event::v2::RoomState { auth_chain, state, event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(), + members_omitted: omit_members, + servers_in_room, }) } @@ -294,11 +341,24 @@ pub(crate) async fn create_join_event_v1_route( } } - let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu) + info!("Providing send_join for {} in {}", body.origin(), &body.room_id); + let now = Instant::now(); + let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false) .boxed() .await?; + let transformed = create_join_event::v1::RoomState { + auth_chain: room_state.auth_chain, + state: room_state.state, + event: room_state.event, + }; + info!( + "Finished creating the send_join payload for {} in {} in {:?}", + body.origin(), + &body.room_id, + now.elapsed() + ); - Ok(create_join_event::v1::Response { room_state }) + Ok(create_join_event::v1::Response { room_state: transformed }) } /// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}` @@ -329,17 +389,18 @@ pub(crate) async fn create_join_event_v2_route( } } - let create_join_event::v1::RoomState { auth_chain, state, event } = - create_join_event(&services, body.origin(), &body.room_id, &body.pdu) + info!("Providing send_join for {} in {}", body.origin(), &body.room_id); + let now = Instant::now(); + let room_state = + create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members) .boxed() .await?; - let room_state = create_join_event::v2::RoomState { - members_omitted: false, - auth_chain, - state, - event, - servers_in_room: None, - }; + info!( + "Finished creating the send_join payload for {} in {} in {:?}", + body.origin(), + &body.room_id, + now.elapsed() + ); Ok(create_join_event::v2::Response { room_state }) } diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index f7f7d043..028c3e51 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result