diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml index 52f5e6e0..1a0e4f4e 100644 --- a/.forgejo/workflows/release-image.yml +++ b/.forgejo/workflows/release-image.yml @@ -101,6 +101,7 @@ jobs: with: persist-credentials: false - name: Install rust + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: rust-toolchain uses: ./.forgejo/actions/rust-toolchain @@ -111,6 +112,7 @@ jobs: driver: ${{ env.BUILDKIT_ENDPOINT != '' && 'remote' || 'docker-container' }} endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }} - name: Set up QEMU + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: docker/setup-qemu-action@v3 # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. - name: Login to builtin registry @@ -140,15 +142,21 @@ jobs: run: | calculatedSha=$(git rev-parse --short ${{ github.sha }}) echo "COMMIT_SHORT_SHA=$calculatedSha" >> $GITHUB_ENV + echo "Short SHA: $calculatedSha" - name: Get Git commit timestamps - run: echo "TIMESTAMP=$(git log -1 --pretty=%ct)" >> $GITHUB_ENV + run: | + timestamp=$(git log -1 --pretty=%ct) + echo "TIMESTAMP=$timestamp" >> $GITHUB_ENV + echo "Commit timestamp: $timestamp" - uses: ./.forgejo/actions/timelord + if: ${{ env.BUILDKIT_ENDPOINT == '' }} with: key: timelord-v0 path: . - name: Cache Rust registry + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: actions/cache@v3 with: path: | @@ -158,6 +166,7 @@ jobs: .cargo/registry/src key: rust-registry-image-${{hashFiles('**/Cargo.lock') }} - name: Cache cargo target + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-cargo-target uses: actions/cache@v3 with: @@ -165,6 +174,7 @@ jobs: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }} key: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}-${{hashFiles('**/Cargo.lock') }}-${{steps.rust-toolchain.outputs.rustc_version}} - name: Cache apt cache + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-apt uses: actions/cache@v3 with: @@ -172,6 +182,7 @@ jobs: var-cache-apt-${{ matrix.slug }} key: var-cache-apt-${{ matrix.slug }} - name: Cache apt lib + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-apt-lib uses: actions/cache@v3 with: @@ -179,6 +190,7 @@ jobs: var-lib-apt-${{ matrix.slug }} key: var-lib-apt-${{ matrix.slug }} - name: inject cache into docker + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: https://github.com/reproducible-containers/buildkit-cache-dance@v3.3.0 with: cache-map: | diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 652451c7..d92581a1 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -1,12 +1,13 @@ #![allow(deprecated)] -use std::borrow::Borrow; +use std::{borrow::Borrow, time::Instant, vec}; use axum::extract::State; use conduwuit::{ - Err, Result, at, err, + Err, Event, Result, at, debug, err, info, matrix::event::gen_event_id_canonical_json, - utils::stream::{IterStream, TryBroadbandExt}, + trace, + utils::stream::{BroadbandExt, IterStream, TryBroadbandExt}, warn, }; use conduwuit_service::Services; @@ -25,12 +26,14 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value}; use crate::Ruma; /// helper method for /send_join v1 and v2 +#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))] async fn create_join_event( services: &Services, origin: &ServerName, room_id: &RoomId, pdu: &RawJsonValue, -) -> Result { + omit_members: bool, +) -> Result { if !services.rooms.metadata.exists(room_id).await { return Err!(Request(NotFound("Room is unknown to this server."))); } @@ -53,8 +56,10 @@ async fn create_join_event( // We do not add the event_id field to the pdu here because of signature and // hashes checks + trace!("Getting room version"); let room_version_id = services.rooms.state.get_room_version(room_id).await?; + trace!("Generating event ID and converting to canonical json"); let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else { // Event could not be converted to canonical json return Err!(Request(BadJson("Could not convert event to canonical json."))); @@ -103,7 +108,6 @@ async fn create_join_event( ))); } - // ACL check sender user server name let sender: OwnedUserId = serde_json::from_value( value .get("sender") @@ -113,12 +117,6 @@ async fn create_join_event( ) .map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?; - services - .rooms - .event_handler - .acl_check(sender.server_name(), room_id) - .await?; - // check if origin server is trying to send for another server if sender.server_name() != origin { return Err!(Request(Forbidden("Not allowed to join on behalf of another server."))); @@ -180,11 +178,6 @@ async fn create_join_event( } } - services - .server_keys - .hash_and_sign_event(&mut value, &room_version_id) - .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?; - let origin: OwnedServerName = serde_json::from_value( value .get("origin") @@ -194,6 +187,12 @@ async fn create_join_event( ) .map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?; + trace!("Signing send_join event"); + services + .server_keys + .hash_and_sign_event(&mut value, &room_version_id) + .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?; + let mutex_lock = services .rooms .event_handler @@ -201,6 +200,7 @@ async fn create_join_event( .lock(room_id) .await; + trace!("Acquired send_join mutex, persisting join event"); let pdu_id = services .rooms .event_handler @@ -210,7 +210,7 @@ async fn create_join_event( .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; drop(mutex_lock); - + trace!("Fetching current state IDs"); let state_ids: Vec = services .rooms .state_accessor @@ -219,9 +219,23 @@ async fn create_join_event( .collect() .await; + trace!(%omit_members, "Constructing current state"); let state = state_ids .iter() .try_stream() + .broad_filter_map(|event_id| async move { + if omit_members { + if let Ok(e) = event_id.as_ref() { + let pdu = services.rooms.timeline.get_pdu(e).await; + if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") { + trace!("omitting member event {e:?} from returned state"); + // skip members + return None; + } + } + } + Some(event_id) + }) .broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id)) .broad_and_then(|pdu| { services @@ -234,6 +248,7 @@ async fn create_join_event( .await?; let starting_events = state_ids.iter().map(Borrow::borrow); + trace!("Constructing auth chain"); let auth_chain = services .rooms .auth_chain @@ -250,13 +265,37 @@ async fn create_join_event( .try_collect() .boxed() .await?; - + info!(fast_join = %omit_members, "Sending join event to other servers"); services.sending.send_pdu_room(room_id, &pdu_id).await?; - - Ok(create_join_event::v1::RoomState { + debug!("Finished sending join event"); + let servers_in_room: Option> = if !omit_members { + None + } else { + trace!("Fetching list of servers in room"); + let servers: Vec = services + .rooms + .state_cache + .room_servers(room_id) + .map(|sn| sn.as_str().to_owned()) + .collect() + .await; + // If there's no servers, just add us + let servers = if servers.is_empty() { + warn!("Failed to find any servers, adding our own server name as a last resort"); + vec![services.globals.server_name().to_string()] + } else { + trace!("Found {} servers in room", servers.len()); + servers + }; + Some(servers) + }; + debug!("Returning send_join data"); + Ok(create_join_event::v2::RoomState { auth_chain, state, event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(), + members_omitted: omit_members, + servers_in_room, }) } @@ -294,11 +333,23 @@ pub(crate) async fn create_join_event_v1_route( } } - let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu) + let now = Instant::now(); + let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false) .boxed() .await?; + let transformed = create_join_event::v1::RoomState { + auth_chain: room_state.auth_chain, + state: room_state.state, + event: room_state.event, + }; + info!( + "Finished sending a join for {} in {} in {:?}", + body.origin(), + &body.room_id, + now.elapsed() + ); - Ok(create_join_event::v1::Response { room_state }) + Ok(create_join_event::v1::Response { room_state: transformed }) } /// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}` @@ -329,17 +380,17 @@ pub(crate) async fn create_join_event_v2_route( } } - let create_join_event::v1::RoomState { auth_chain, state, event } = - create_join_event(&services, body.origin(), &body.room_id, &body.pdu) + let now = Instant::now(); + let room_state = + create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members) .boxed() .await?; - let room_state = create_join_event::v2::RoomState { - members_omitted: false, - auth_chain, - state, - event, - servers_in_room: None, - }; + info!( + "Finished sending a join for {} in {} in {:?}", + body.origin(), + &body.room_id, + now.elapsed() + ); Ok(create_join_event::v2::Response { room_state }) } diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index f7f7d043..028c3e51 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result