mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-11 01:32:49 +02:00
Compare commits
No commits in common. "09de586dc702b9adcb3298e974470066f198c43b" and "6cf3c839e4b2b6906f4017a3f77591937f37af6e" have entirely different histories.
09de586dc7
...
6cf3c839e4
2 changed files with 32 additions and 83 deletions
|
@ -1,13 +1,12 @@
|
|||
#![allow(deprecated)]
|
||||
|
||||
use std::{borrow::Borrow, time::Instant, vec};
|
||||
use std::borrow::Borrow;
|
||||
|
||||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
Err, Event, Result, at, debug, err, info,
|
||||
Err, Result, at, err,
|
||||
matrix::event::gen_event_id_canonical_json,
|
||||
trace,
|
||||
utils::stream::{BroadbandExt, IterStream, TryBroadbandExt},
|
||||
utils::stream::{IterStream, TryBroadbandExt},
|
||||
warn,
|
||||
};
|
||||
use conduwuit_service::Services;
|
||||
|
@ -26,14 +25,12 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
|||
use crate::Ruma;
|
||||
|
||||
/// helper method for /send_join v1 and v2
|
||||
#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))]
|
||||
async fn create_join_event(
|
||||
services: &Services,
|
||||
origin: &ServerName,
|
||||
room_id: &RoomId,
|
||||
pdu: &RawJsonValue,
|
||||
omit_members: bool,
|
||||
) -> Result<create_join_event::v2::RoomState> {
|
||||
) -> Result<create_join_event::v1::RoomState> {
|
||||
if !services.rooms.metadata.exists(room_id).await {
|
||||
return Err!(Request(NotFound("Room is unknown to this server.")));
|
||||
}
|
||||
|
@ -56,10 +53,8 @@ async fn create_join_event(
|
|||
|
||||
// We do not add the event_id field to the pdu here because of signature and
|
||||
// hashes checks
|
||||
trace!("Getting room version");
|
||||
let room_version_id = services.rooms.state.get_room_version(room_id).await?;
|
||||
|
||||
trace!("Generating event ID and converting to canonical json");
|
||||
let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
|
||||
// Event could not be converted to canonical json
|
||||
return Err!(Request(BadJson("Could not convert event to canonical json.")));
|
||||
|
@ -108,6 +103,7 @@ async fn create_join_event(
|
|||
)));
|
||||
}
|
||||
|
||||
// ACL check sender user server name
|
||||
let sender: OwnedUserId = serde_json::from_value(
|
||||
value
|
||||
.get("sender")
|
||||
|
@ -117,6 +113,12 @@ async fn create_join_event(
|
|||
)
|
||||
.map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?;
|
||||
|
||||
services
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(sender.server_name(), room_id)
|
||||
.await?;
|
||||
|
||||
// check if origin server is trying to send for another server
|
||||
if sender.server_name() != origin {
|
||||
return Err!(Request(Forbidden("Not allowed to join on behalf of another server.")));
|
||||
|
@ -178,6 +180,11 @@ async fn create_join_event(
|
|||
}
|
||||
}
|
||||
|
||||
services
|
||||
.server_keys
|
||||
.hash_and_sign_event(&mut value, &room_version_id)
|
||||
.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
|
||||
|
||||
let origin: OwnedServerName = serde_json::from_value(
|
||||
value
|
||||
.get("origin")
|
||||
|
@ -187,12 +194,6 @@ async fn create_join_event(
|
|||
)
|
||||
.map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?;
|
||||
|
||||
trace!("Signing send_join event");
|
||||
services
|
||||
.server_keys
|
||||
.hash_and_sign_event(&mut value, &room_version_id)
|
||||
.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
|
||||
|
||||
let mutex_lock = services
|
||||
.rooms
|
||||
.event_handler
|
||||
|
@ -200,7 +201,6 @@ async fn create_join_event(
|
|||
.lock(room_id)
|
||||
.await;
|
||||
|
||||
trace!("Acquired send_join mutex, persisting join event");
|
||||
let pdu_id = services
|
||||
.rooms
|
||||
.event_handler
|
||||
|
@ -210,7 +210,7 @@ async fn create_join_event(
|
|||
.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
|
||||
|
||||
drop(mutex_lock);
|
||||
trace!("Fetching current state IDs");
|
||||
|
||||
let state_ids: Vec<OwnedEventId> = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
|
@ -219,23 +219,9 @@ async fn create_join_event(
|
|||
.collect()
|
||||
.await;
|
||||
|
||||
trace!(%omit_members, "Constructing current state");
|
||||
let state = state_ids
|
||||
.iter()
|
||||
.try_stream()
|
||||
.broad_filter_map(|event_id| async move {
|
||||
if omit_members {
|
||||
if let Ok(e) = event_id.as_ref() {
|
||||
let pdu = services.rooms.timeline.get_pdu(e).await;
|
||||
if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") {
|
||||
trace!("omitting member event {e:?} from returned state");
|
||||
// skip members
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(event_id)
|
||||
})
|
||||
.broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id))
|
||||
.broad_and_then(|pdu| {
|
||||
services
|
||||
|
@ -248,7 +234,6 @@ async fn create_join_event(
|
|||
.await?;
|
||||
|
||||
let starting_events = state_ids.iter().map(Borrow::borrow);
|
||||
trace!("Constructing auth chain");
|
||||
let auth_chain = services
|
||||
.rooms
|
||||
.auth_chain
|
||||
|
@ -265,37 +250,13 @@ async fn create_join_event(
|
|||
.try_collect()
|
||||
.boxed()
|
||||
.await?;
|
||||
info!(fast_join = %omit_members, "Sending join event to other servers");
|
||||
|
||||
services.sending.send_pdu_room(room_id, &pdu_id).await?;
|
||||
debug!("Finished sending join event");
|
||||
let servers_in_room: Option<Vec<_>> = if !omit_members {
|
||||
None
|
||||
} else {
|
||||
trace!("Fetching list of servers in room");
|
||||
let servers: Vec<String> = services
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_servers(room_id)
|
||||
.map(|sn| sn.as_str().to_owned())
|
||||
.collect()
|
||||
.await;
|
||||
// If there's no servers, just add us
|
||||
let servers = if servers.is_empty() {
|
||||
warn!("Failed to find any servers, adding our own server name as a last resort");
|
||||
vec![services.globals.server_name().to_string()]
|
||||
} else {
|
||||
trace!("Found {} servers in room", servers.len());
|
||||
servers
|
||||
};
|
||||
Some(servers)
|
||||
};
|
||||
debug!("Returning send_join data");
|
||||
Ok(create_join_event::v2::RoomState {
|
||||
|
||||
Ok(create_join_event::v1::RoomState {
|
||||
auth_chain,
|
||||
state,
|
||||
event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(),
|
||||
members_omitted: omit_members,
|
||||
servers_in_room,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -333,23 +294,11 @@ pub(crate) async fn create_join_event_v1_route(
|
|||
}
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false)
|
||||
let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu)
|
||||
.boxed()
|
||||
.await?;
|
||||
let transformed = create_join_event::v1::RoomState {
|
||||
auth_chain: room_state.auth_chain,
|
||||
state: room_state.state,
|
||||
event: room_state.event,
|
||||
};
|
||||
info!(
|
||||
"Finished sending a join for {} in {} in {:?}",
|
||||
body.origin(),
|
||||
&body.room_id,
|
||||
now.elapsed()
|
||||
);
|
||||
|
||||
Ok(create_join_event::v1::Response { room_state: transformed })
|
||||
Ok(create_join_event::v1::Response { room_state })
|
||||
}
|
||||
|
||||
/// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}`
|
||||
|
@ -380,17 +329,17 @@ pub(crate) async fn create_join_event_v2_route(
|
|||
}
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
let room_state =
|
||||
create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members)
|
||||
let create_join_event::v1::RoomState { auth_chain, state, event } =
|
||||
create_join_event(&services, body.origin(), &body.room_id, &body.pdu)
|
||||
.boxed()
|
||||
.await?;
|
||||
info!(
|
||||
"Finished sending a join for {} in {} in {:?}",
|
||||
body.origin(),
|
||||
&body.room_id,
|
||||
now.elapsed()
|
||||
);
|
||||
let room_state = create_join_event::v2::RoomState {
|
||||
members_omitted: false,
|
||||
auth_chain,
|
||||
state,
|
||||
event,
|
||||
servers_in_room: None,
|
||||
};
|
||||
|
||||
Ok(create_join_event::v2::Response { room_state })
|
||||
}
|
||||
|
|
|
@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDif
|
|||
.ok()
|
||||
.take_if(|parent| *parent != 0);
|
||||
|
||||
debug_assert!(value.len().is_multiple_of(STRIDE), "value not aligned to stride");
|
||||
debug_assert!(value.len() % STRIDE == 0, "value not aligned to stride");
|
||||
let _num_values = value.len() / STRIDE;
|
||||
|
||||
let mut add_mode = true;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue