From 35f69844e1e7ed29e4d53e5f219f59f905e7d128 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Thu, 31 Jul 2025 19:52:19 +0100 Subject: [PATCH] feat(fast-joins): Implement using fast joins sorta --- src/api/client/membership/join.rs | 137 ++++++++++++++++++++++++++++-- 1 file changed, 132 insertions(+), 5 deletions(-) diff --git a/src/api/client/membership/join.rs b/src/api/client/membership/join.rs index f3434bf5..62ac876a 100644 --- a/src/api/client/membership/join.rs +++ b/src/api/client/membership/join.rs @@ -3,7 +3,7 @@ use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc}; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Result, debug, debug_info, debug_warn, err, error, info, + Err, Result, debug, debug_error, debug_info, debug_warn, err, error, info, matrix::{ StateKey, event::{gen_event_id, gen_event_id_canonical_json}, @@ -20,14 +20,14 @@ use conduwuit::{ }; use futures::{FutureExt, StreamExt}; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, - RoomVersionId, UserId, + CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, + OwnedUserId, RoomId, RoomVersionId, UserId, api::{ client::{ error::ErrorKind, membership::{ThirdPartySigned, join_room_by_id, join_room_by_id_or_alias}, }, - federation::{self}, + federation::{self, event::get_room_state}, }, canonical_json::to_canonical_value, events::{ @@ -691,6 +691,121 @@ async fn join_room_by_id_helper_remote( Ok(()) } +async fn fetch_state_in_background( + services: &Services, + room_id: OwnedRoomId, + event_id: OwnedEventId, + via: Vec, +) -> Result<()> { + let room_version = services.rooms.state.get_room_version(&room_id).await?; + + let mut state: HashMap = HashMap::new(); + + let mut cur_response = None; + for server_name in via { + cur_response = services + .sending + .send_federation_request(&server_name, get_room_state::v1::Request { + room_id: room_id.clone(), + event_id: event_id.clone(), + }) + .await + .ok(); + if cur_response.is_some() { + break; + } + } + let remote_state_response = cur_response + .ok_or_else(|| err!(BadServerResponse("Could not fetch state from any server")))?; + + for pdu in remote_state_response.pdus.clone() { + match services.rooms.event_handler.parse_incoming_pdu(&pdu).await { + | Ok(t) => t, + | Err(e) => { + warn!("Could not parse PDU, ignoring: {e}"); + continue; + }, + }; + } + + info!("Going through room_state response PDUs"); + for result in remote_state_response.pdus.iter().map(|pdu| { + services + .server_keys + .validate_and_add_event_id(pdu, &room_version) + }) { + let Ok((evt_id, value)) = result.await else { + continue; + }; + + let pdu = PduEvent::from_id_val(&evt_id, value.clone()).map_err(|e| { + debug_error!("Invalid PDU in fetching remote room state PDUs response: {value:#?}"); + err!(BadServerResponse(debug_error!("Invalid PDU in send_join response: {e:?}"))) + })?; + + services.rooms.outlier.add_pdu_outlier(&evt_id, &value); + + if let Some(state_key) = &pdu.state_key { + let shortstatekey = services + .rooms + .short + .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key) + .await; + + state.insert(shortstatekey, pdu.event_id.clone()); + } + } + + info!("Going through auth_chain response"); + for result in remote_state_response.auth_chain.iter().map(|pdu| { + services + .server_keys + .validate_and_add_event_id(pdu, &room_version) + }) { + let Ok((event_id, value)) = result.await else { + continue; + }; + + services.rooms.outlier.add_pdu_outlier(&event_id, &value); + } + + let new_room_state = services + .rooms + .event_handler + .resolve_state(&room_id, &room_version, state) + .await?; + + info!("Forcing new room state"); + let state_lock = services.rooms.state.mutex.lock(&room_id).await; + let HashSetCompressStateEvent { + shortstatehash: short_state_hash, + added, + removed, + } = services + .rooms + .state_compressor + .save_state(room_id.as_ref(), new_room_state) + .await?; + + services + .rooms + .state + .force_state(room_id.as_ref(), short_state_hash, added, removed, &state_lock) + .await?; + + info!( + "Updating joined counts for room just in case (e.g. we may have found a difference in \ + the room's m.room.member state" + ); + services + .rooms + .state_cache + .update_joined_count(&room_id) + .await; + drop(state_lock); + Ok(()) +} + #[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local")] async fn join_room_by_id_helper_local( services: &Services, @@ -875,6 +990,7 @@ async fn join_room_by_id_helper_local( // It has enough fields to be called a proper event now let join_event = join_event_stub; + info!("Performing fancy snazzy new v2 fast join"); let send_join_response = services .sending .send_synapse_request( @@ -882,7 +998,7 @@ async fn join_room_by_id_helper_local( federation::membership::create_join_event::v2::Request { room_id: room_id.to_owned(), event_id: event_id.clone(), - omit_members: false, + omit_members: true, pdu: services .sending .convert_to_outgoing_federation_event(join_event.clone()) @@ -910,6 +1026,17 @@ async fn join_room_by_id_helper_local( .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true) .boxed() .await?; + // TODO: Actually do this in the background + // tokio::spawn(fetch_state_in_background( + // services.clone(), + // room_id.to_owned(), + // event_id.clone(), + // servers.to_vec(), + // )); + info!("Fetching state in background"); + fetch_state_in_background(services, room_id.to_owned(), event_id, servers.to_vec()) + .boxed() + .await?; } else { return Err(error); }