diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index d92581a1..652451c7 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -1,13 +1,12 @@ #![allow(deprecated)] -use std::{borrow::Borrow, time::Instant, vec}; +use std::borrow::Borrow; use axum::extract::State; use conduwuit::{ - Err, Event, Result, at, debug, err, info, + Err, Result, at, err, matrix::event::gen_event_id_canonical_json, - trace, - utils::stream::{BroadbandExt, IterStream, TryBroadbandExt}, + utils::stream::{IterStream, TryBroadbandExt}, warn, }; use conduwuit_service::Services; @@ -26,14 +25,12 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value}; use crate::Ruma; /// helper method for /send_join v1 and v2 -#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))] async fn create_join_event( services: &Services, origin: &ServerName, room_id: &RoomId, pdu: &RawJsonValue, - omit_members: bool, -) -> Result { +) -> Result { if !services.rooms.metadata.exists(room_id).await { return Err!(Request(NotFound("Room is unknown to this server."))); } @@ -56,10 +53,8 @@ async fn create_join_event( // We do not add the event_id field to the pdu here because of signature and // hashes checks - trace!("Getting room version"); let room_version_id = services.rooms.state.get_room_version(room_id).await?; - trace!("Generating event ID and converting to canonical json"); let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else { // Event could not be converted to canonical json return Err!(Request(BadJson("Could not convert event to canonical json."))); @@ -108,6 +103,7 @@ async fn create_join_event( ))); } + // ACL check sender user server name let sender: OwnedUserId = serde_json::from_value( value .get("sender") @@ -117,6 +113,12 @@ async fn create_join_event( ) .map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?; + services + .rooms + .event_handler + .acl_check(sender.server_name(), room_id) + .await?; + // check if origin server is trying to send for another server if sender.server_name() != origin { return Err!(Request(Forbidden("Not allowed to join on behalf of another server."))); @@ -178,6 +180,11 @@ async fn create_join_event( } } + services + .server_keys + .hash_and_sign_event(&mut value, &room_version_id) + .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?; + let origin: OwnedServerName = serde_json::from_value( value .get("origin") @@ -187,12 +194,6 @@ async fn create_join_event( ) .map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?; - trace!("Signing send_join event"); - services - .server_keys - .hash_and_sign_event(&mut value, &room_version_id) - .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?; - let mutex_lock = services .rooms .event_handler @@ -200,7 +201,6 @@ async fn create_join_event( .lock(room_id) .await; - trace!("Acquired send_join mutex, persisting join event"); let pdu_id = services .rooms .event_handler @@ -210,7 +210,7 @@ async fn create_join_event( .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; drop(mutex_lock); - trace!("Fetching current state IDs"); + let state_ids: Vec = services .rooms .state_accessor @@ -219,23 +219,9 @@ async fn create_join_event( .collect() .await; - trace!(%omit_members, "Constructing current state"); let state = state_ids .iter() .try_stream() - .broad_filter_map(|event_id| async move { - if omit_members { - if let Ok(e) = event_id.as_ref() { - let pdu = services.rooms.timeline.get_pdu(e).await; - if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") { - trace!("omitting member event {e:?} from returned state"); - // skip members - return None; - } - } - } - Some(event_id) - }) .broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id)) .broad_and_then(|pdu| { services @@ -248,7 +234,6 @@ async fn create_join_event( .await?; let starting_events = state_ids.iter().map(Borrow::borrow); - trace!("Constructing auth chain"); let auth_chain = services .rooms .auth_chain @@ -265,37 +250,13 @@ async fn create_join_event( .try_collect() .boxed() .await?; - info!(fast_join = %omit_members, "Sending join event to other servers"); + services.sending.send_pdu_room(room_id, &pdu_id).await?; - debug!("Finished sending join event"); - let servers_in_room: Option> = if !omit_members { - None - } else { - trace!("Fetching list of servers in room"); - let servers: Vec = services - .rooms - .state_cache - .room_servers(room_id) - .map(|sn| sn.as_str().to_owned()) - .collect() - .await; - // If there's no servers, just add us - let servers = if servers.is_empty() { - warn!("Failed to find any servers, adding our own server name as a last resort"); - vec![services.globals.server_name().to_string()] - } else { - trace!("Found {} servers in room", servers.len()); - servers - }; - Some(servers) - }; - debug!("Returning send_join data"); - Ok(create_join_event::v2::RoomState { + + Ok(create_join_event::v1::RoomState { auth_chain, state, event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(), - members_omitted: omit_members, - servers_in_room, }) } @@ -333,23 +294,11 @@ pub(crate) async fn create_join_event_v1_route( } } - let now = Instant::now(); - let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false) + let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu) .boxed() .await?; - let transformed = create_join_event::v1::RoomState { - auth_chain: room_state.auth_chain, - state: room_state.state, - event: room_state.event, - }; - info!( - "Finished sending a join for {} in {} in {:?}", - body.origin(), - &body.room_id, - now.elapsed() - ); - Ok(create_join_event::v1::Response { room_state: transformed }) + Ok(create_join_event::v1::Response { room_state }) } /// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}` @@ -380,17 +329,17 @@ pub(crate) async fn create_join_event_v2_route( } } - let now = Instant::now(); - let room_state = - create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members) + let create_join_event::v1::RoomState { auth_chain, state, event } = + create_join_event(&services, body.origin(), &body.room_id, &body.pdu) .boxed() .await?; - info!( - "Finished sending a join for {} in {} in {:?}", - body.origin(), - &body.room_id, - now.elapsed() - ); + let room_state = create_join_event::v2::RoomState { + members_omitted: false, + auth_chain, + state, + event, + servers_in_room: None, + }; Ok(create_join_event::v2::Response { room_state }) } diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index 028c3e51..f7f7d043 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result