mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-10 04:22:50 +02:00
feat(fast-joins): Implement using fast joins sorta
This commit is contained in:
parent
57eae642be
commit
35f69844e1
1 changed files with 132 additions and 5 deletions
|
@ -3,7 +3,7 @@ use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc};
|
|||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{
|
||||
Err, Result, debug, debug_info, debug_warn, err, error, info,
|
||||
Err, Result, debug, debug_error, debug_info, debug_warn, err, error, info,
|
||||
matrix::{
|
||||
StateKey,
|
||||
event::{gen_event_id, gen_event_id_canonical_json},
|
||||
|
@ -20,14 +20,14 @@ use conduwuit::{
|
|||
};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
|
||||
RoomVersionId, UserId,
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName,
|
||||
OwnedUserId, RoomId, RoomVersionId, UserId,
|
||||
api::{
|
||||
client::{
|
||||
error::ErrorKind,
|
||||
membership::{ThirdPartySigned, join_room_by_id, join_room_by_id_or_alias},
|
||||
},
|
||||
federation::{self},
|
||||
federation::{self, event::get_room_state},
|
||||
},
|
||||
canonical_json::to_canonical_value,
|
||||
events::{
|
||||
|
@ -691,6 +691,121 @@ async fn join_room_by_id_helper_remote(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_state_in_background(
|
||||
services: &Services,
|
||||
room_id: OwnedRoomId,
|
||||
event_id: OwnedEventId,
|
||||
via: Vec<OwnedServerName>,
|
||||
) -> Result<()> {
|
||||
let room_version = services.rooms.state.get_room_version(&room_id).await?;
|
||||
|
||||
let mut state: HashMap<u64, OwnedEventId> = HashMap::new();
|
||||
|
||||
let mut cur_response = None;
|
||||
for server_name in via {
|
||||
cur_response = services
|
||||
.sending
|
||||
.send_federation_request(&server_name, get_room_state::v1::Request {
|
||||
room_id: room_id.clone(),
|
||||
event_id: event_id.clone(),
|
||||
})
|
||||
.await
|
||||
.ok();
|
||||
if cur_response.is_some() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let remote_state_response = cur_response
|
||||
.ok_or_else(|| err!(BadServerResponse("Could not fetch state from any server")))?;
|
||||
|
||||
for pdu in remote_state_response.pdus.clone() {
|
||||
match services.rooms.event_handler.parse_incoming_pdu(&pdu).await {
|
||||
| Ok(t) => t,
|
||||
| Err(e) => {
|
||||
warn!("Could not parse PDU, ignoring: {e}");
|
||||
continue;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
info!("Going through room_state response PDUs");
|
||||
for result in remote_state_response.pdus.iter().map(|pdu| {
|
||||
services
|
||||
.server_keys
|
||||
.validate_and_add_event_id(pdu, &room_version)
|
||||
}) {
|
||||
let Ok((evt_id, value)) = result.await else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let pdu = PduEvent::from_id_val(&evt_id, value.clone()).map_err(|e| {
|
||||
debug_error!("Invalid PDU in fetching remote room state PDUs response: {value:#?}");
|
||||
err!(BadServerResponse(debug_error!("Invalid PDU in send_join response: {e:?}")))
|
||||
})?;
|
||||
|
||||
services.rooms.outlier.add_pdu_outlier(&evt_id, &value);
|
||||
|
||||
if let Some(state_key) = &pdu.state_key {
|
||||
let shortstatekey = services
|
||||
.rooms
|
||||
.short
|
||||
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
|
||||
.await;
|
||||
|
||||
state.insert(shortstatekey, pdu.event_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
info!("Going through auth_chain response");
|
||||
for result in remote_state_response.auth_chain.iter().map(|pdu| {
|
||||
services
|
||||
.server_keys
|
||||
.validate_and_add_event_id(pdu, &room_version)
|
||||
}) {
|
||||
let Ok((event_id, value)) = result.await else {
|
||||
continue;
|
||||
};
|
||||
|
||||
services.rooms.outlier.add_pdu_outlier(&event_id, &value);
|
||||
}
|
||||
|
||||
let new_room_state = services
|
||||
.rooms
|
||||
.event_handler
|
||||
.resolve_state(&room_id, &room_version, state)
|
||||
.await?;
|
||||
|
||||
info!("Forcing new room state");
|
||||
let state_lock = services.rooms.state.mutex.lock(&room_id).await;
|
||||
let HashSetCompressStateEvent {
|
||||
shortstatehash: short_state_hash,
|
||||
added,
|
||||
removed,
|
||||
} = services
|
||||
.rooms
|
||||
.state_compressor
|
||||
.save_state(room_id.as_ref(), new_room_state)
|
||||
.await?;
|
||||
|
||||
services
|
||||
.rooms
|
||||
.state
|
||||
.force_state(room_id.as_ref(), short_state_hash, added, removed, &state_lock)
|
||||
.await?;
|
||||
|
||||
info!(
|
||||
"Updating joined counts for room just in case (e.g. we may have found a difference in \
|
||||
the room's m.room.member state"
|
||||
);
|
||||
services
|
||||
.rooms
|
||||
.state_cache
|
||||
.update_joined_count(&room_id)
|
||||
.await;
|
||||
drop(state_lock);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local")]
|
||||
async fn join_room_by_id_helper_local(
|
||||
services: &Services,
|
||||
|
@ -875,6 +990,7 @@ async fn join_room_by_id_helper_local(
|
|||
// It has enough fields to be called a proper event now
|
||||
let join_event = join_event_stub;
|
||||
|
||||
info!("Performing fancy snazzy new v2 fast join");
|
||||
let send_join_response = services
|
||||
.sending
|
||||
.send_synapse_request(
|
||||
|
@ -882,7 +998,7 @@ async fn join_room_by_id_helper_local(
|
|||
federation::membership::create_join_event::v2::Request {
|
||||
room_id: room_id.to_owned(),
|
||||
event_id: event_id.clone(),
|
||||
omit_members: false,
|
||||
omit_members: true,
|
||||
pdu: services
|
||||
.sending
|
||||
.convert_to_outgoing_federation_event(join_event.clone())
|
||||
|
@ -910,6 +1026,17 @@ async fn join_room_by_id_helper_local(
|
|||
.handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
|
||||
.boxed()
|
||||
.await?;
|
||||
// TODO: Actually do this in the background
|
||||
// tokio::spawn(fetch_state_in_background(
|
||||
// services.clone(),
|
||||
// room_id.to_owned(),
|
||||
// event_id.clone(),
|
||||
// servers.to_vec(),
|
||||
// ));
|
||||
info!("Fetching state in background");
|
||||
fetch_state_in_background(services, room_id.to_owned(), event_id, servers.to_vec())
|
||||
.boxed()
|
||||
.await?;
|
||||
} else {
|
||||
return Err(error);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue