feat(PR977): Support omitting members in the send_join response

This commit is contained in:
nexy7574 2025-09-03 14:20:50 +01:00
commit 2cdccbf2fe
No known key found for this signature in database
2 changed files with 80 additions and 19 deletions

View file

@ -1,12 +1,13 @@
#![allow(deprecated)] #![allow(deprecated)]
use std::borrow::Borrow; use std::{borrow::Borrow, time::Instant};
use axum::extract::State; use axum::extract::State;
use conduwuit::{ use conduwuit::{
Err, Result, at, err, Err, Event, Result, at, debug, err, info,
matrix::event::gen_event_id_canonical_json, matrix::event::gen_event_id_canonical_json,
utils::stream::{IterStream, TryBroadbandExt}, trace,
utils::stream::{BroadbandExt, IterStream, TryBroadbandExt},
warn, warn,
}; };
use conduwuit_service::Services; use conduwuit_service::Services;
@ -25,12 +26,14 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use crate::Ruma; use crate::Ruma;
/// helper method for /send_join v1 and v2 /// helper method for /send_join v1 and v2
#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))]
async fn create_join_event( async fn create_join_event(
services: &Services, services: &Services,
origin: &ServerName, origin: &ServerName,
room_id: &RoomId, room_id: &RoomId,
pdu: &RawJsonValue, pdu: &RawJsonValue,
) -> Result<create_join_event::v1::RoomState> { omit_members: bool,
) -> Result<create_join_event::v2::RoomState> {
if !services.rooms.metadata.exists(room_id).await { if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(NotFound("Room is unknown to this server."))); return Err!(Request(NotFound("Room is unknown to this server.")));
} }
@ -201,6 +204,7 @@ async fn create_join_event(
.lock(room_id) .lock(room_id)
.await; .await;
debug!("Acquired send_join mutex, persisting join event");
let pdu_id = services let pdu_id = services
.rooms .rooms
.event_handler .event_handler
@ -210,7 +214,7 @@ async fn create_join_event(
.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
drop(mutex_lock); drop(mutex_lock);
debug!("Fetching current state IDs");
let state_ids: Vec<OwnedEventId> = services let state_ids: Vec<OwnedEventId> = services
.rooms .rooms
.state_accessor .state_accessor
@ -219,9 +223,25 @@ async fn create_join_event(
.collect() .collect()
.await; .await;
#[allow(clippy::unnecessary_unwrap)]
let state = state_ids let state = state_ids
.iter() .iter()
.try_stream() .try_stream()
.broad_filter_map(|event_id| async move {
if omit_members && event_id.is_ok() {
let pdu = services
.rooms
.timeline
.get_pdu(event_id.as_ref().unwrap())
.await;
if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") {
trace!("omitting member event {event_id:?} from returned state");
// skip members
return None;
}
}
Some(event_id)
})
.broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id)) .broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id))
.broad_and_then(|pdu| { .broad_and_then(|pdu| {
services services
@ -238,6 +258,17 @@ async fn create_join_event(
.rooms .rooms
.auth_chain .auth_chain
.event_ids_iter(room_id, starting_events) .event_ids_iter(room_id, starting_events)
.broad_filter_map(|event_id| async {
if omit_members && event_id.as_ref().is_ok_and(|e| state_ids.contains(e)) {
// Don't include this event if it's already in the state
trace!(
"omitting member event {event_id:?} from returned auth chain as it is \
already in state"
);
return None;
}
Some(event_id)
})
.broad_and_then(|event_id| async move { .broad_and_then(|event_id| async move {
services.rooms.timeline.get_pdu_json(&event_id).await services.rooms.timeline.get_pdu_json(&event_id).await
}) })
@ -252,11 +283,27 @@ async fn create_join_event(
.await?; .await?;
services.sending.send_pdu_room(room_id, &pdu_id).await?; services.sending.send_pdu_room(room_id, &pdu_id).await?;
let servers_in_room: Option<Vec<_>> = if omit_members {
Ok(create_join_event::v1::RoomState { None
} else {
debug!("Fetching list of servers in room");
Some(
services
.rooms
.state_cache
.room_servers(room_id)
.map(|sn| sn.as_str().to_owned())
.collect()
.await,
)
};
debug!("Returning send_join data");
Ok(create_join_event::v2::RoomState {
auth_chain, auth_chain,
state, state,
event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(), event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(),
members_omitted: omit_members,
servers_in_room,
}) })
} }
@ -294,11 +341,24 @@ pub(crate) async fn create_join_event_v1_route(
} }
} }
let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu) info!("Providing send_join for {} in {}", body.origin(), &body.room_id);
let now = Instant::now();
let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false)
.boxed() .boxed()
.await?; .await?;
let transformed = create_join_event::v1::RoomState {
auth_chain: room_state.auth_chain,
state: room_state.state,
event: room_state.event,
};
info!(
"Finished creating the send_join payload for {} in {} in {:?}",
body.origin(),
&body.room_id,
now.elapsed()
);
Ok(create_join_event::v1::Response { room_state }) Ok(create_join_event::v1::Response { room_state: transformed })
} }
/// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}` /// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}`
@ -329,17 +389,18 @@ pub(crate) async fn create_join_event_v2_route(
} }
} }
let create_join_event::v1::RoomState { auth_chain, state, event } = info!("Providing send_join for {} in {}", body.origin(), &body.room_id);
create_join_event(&services, body.origin(), &body.room_id, &body.pdu) let now = Instant::now();
let room_state =
create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members)
.boxed() .boxed()
.await?; .await?;
let room_state = create_join_event::v2::RoomState { info!(
members_omitted: false, "Finished creating the send_join payload for {} in {} in {:?}",
auth_chain, body.origin(),
state, &body.room_id,
event, now.elapsed()
servers_in_room: None, );
};
Ok(create_join_event::v2::Response { room_state }) Ok(create_join_event::v2::Response { room_state })
} }

View file

@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDif
.ok() .ok()
.take_if(|parent| *parent != 0); .take_if(|parent| *parent != 0);
debug_assert!(value.len() % STRIDE == 0, "value not aligned to stride"); debug_assert!(value.len().is_multiple_of(STRIDE), "value not aligned to stride");
let _num_values = value.len() / STRIDE; let _num_values = value.len() / STRIDE;
let mut add_mode = true; let mut add_mode = true;