diff --git a/.forgejo/actions/detect-runner-os/action.yml b/.forgejo/actions/detect-runner-os/action.yml index 6ada1d5d..be8f9b0a 100644 --- a/.forgejo/actions/detect-runner-os/action.yml +++ b/.forgejo/actions/detect-runner-os/action.yml @@ -13,6 +13,12 @@ outputs: slug: description: 'Combined OS slug (e.g. Ubuntu-22.04)' value: ${{ steps.detect.outputs.slug }} + node_major: + description: 'Major version of Node.js if available (e.g. 22)' + value: ${{ steps.detect.outputs.node_major }} + node_version: + description: 'Full Node.js version if available (e.g. 22.19.0)' + value: ${{ steps.detect.outputs.node_version }} runs: using: composite @@ -30,7 +36,20 @@ runs: # Create combined slug OS_SLUG="${OS_NAME}-${OS_VERSION}" - # Set outputs + # Detect Node.js version if available + if command -v node >/dev/null 2>&1; then + NODE_VERSION=$(node --version | sed 's/v//') + NODE_MAJOR=$(echo $NODE_VERSION | cut -d. -f1) + echo "node_version=${NODE_VERSION}" >> $GITHUB_OUTPUT + echo "node_major=${NODE_MAJOR}" >> $GITHUB_OUTPUT + echo "🔍 Detected Node.js: v${NODE_VERSION}" + else + echo "node_version=" >> $GITHUB_OUTPUT + echo "node_major=" >> $GITHUB_OUTPUT + echo "🔍 Node.js not found" + fi + + # Set OS outputs echo "name=${OS_NAME}" >> $GITHUB_OUTPUT echo "version=${OS_VERSION}" >> $GITHUB_OUTPUT echo "slug=${OS_SLUG}" >> $GITHUB_OUTPUT diff --git a/.forgejo/actions/sccache/action.yml b/.forgejo/actions/sccache/action.yml index b5e5dcf4..b2441109 100644 --- a/.forgejo/actions/sccache/action.yml +++ b/.forgejo/actions/sccache/action.yml @@ -2,18 +2,12 @@ name: sccache description: | Install sccache for caching builds in GitHub Actions. -inputs: - token: - description: 'A Github PAT' - required: false runs: using: composite steps: - name: Install sccache - uses: https://github.com/mozilla-actions/sccache-action@v0.0.9 - with: - token: ${{ inputs.token }} + uses: https://git.tomfos.tr/tom/sccache-action@v1 - name: Configure sccache uses: https://github.com/actions/github-script@v7 with: diff --git a/.forgejo/actions/setup-rust/action.yml b/.forgejo/actions/setup-rust/action.yml index 091da8c2..a8736a75 100644 --- a/.forgejo/actions/setup-rust/action.yml +++ b/.forgejo/actions/setup-rust/action.yml @@ -88,19 +88,9 @@ runs: # Shared toolchain cache across all Rust versions key: toolchain-${{ steps.runner-os.outputs.slug }} - - name: Debug GitHub token availability - shell: bash - run: | - if [ -z "${{ inputs.github-token }}" ]; then - echo "⚠️ No GitHub token provided - sccache will use fallback download method" - else - echo "✅ GitHub token provided for sccache" - fi - name: Setup sccache - uses: https://github.com/mozilla-actions/sccache-action@v0.0.9 - with: - token: ${{ inputs.github-token }} + uses: https://git.tomfos.tr/tom/sccache-action@v1 - name: Cache build artifacts id: build-cache diff --git a/.forgejo/workflows/documentation.yml b/.forgejo/workflows/documentation.yml index caa0e1e2..dc0dd4c1 100644 --- a/.forgejo/workflows/documentation.yml +++ b/.forgejo/workflows/documentation.yml @@ -49,10 +49,23 @@ jobs: cp ./docs/static/_headers ./public/_headers echo "Copied .well-known files and _headers to ./public" + - name: Detect runner environment + id: runner-env + uses: ./.forgejo/actions/detect-runner-os + - name: Setup Node.js + if: steps.runner-env.outputs.node_major == '' || steps.runner-env.outputs.node_major < '20' uses: https://github.com/actions/setup-node@v5 with: - node-version: 20 + node-version: 22 + + - name: Cache npm dependencies + uses: actions/cache@v3 + with: + path: ~/.npm + key: ${{ steps.runner-env.outputs.slug }}-node-${{ hashFiles('**/package-lock.json') }} + restore-keys: | + ${{ steps.runner-env.outputs.slug }}-node- - name: Install dependencies run: npm install --save-dev wrangler@latest diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml index 7b29b7ca..b7423567 100644 --- a/.forgejo/workflows/release-image.yml +++ b/.forgejo/workflows/release-image.yml @@ -11,7 +11,6 @@ on: - ".gitignore" - "renovate.json" - "pkg/**" - - "docker/**" - "docs/**" push: branches: @@ -23,7 +22,6 @@ on: - ".gitignore" - "renovate.json" - "pkg/**" - - "docker/**" - "docs/**" # Allows you to run this workflow manually from the Actions tab workflow_dispatch: @@ -55,6 +53,9 @@ jobs: let images = [] if (process.env.BUILTIN_REGISTRY_ENABLED === "true") { images.push(builtinImage) + } else { + // Fallback to official registry for forks/PRs without credentials + images.push('forgejo.ellis.link/continuwuation/continuwuity') } core.setOutput('images', images.join("\n")) core.setOutput('images_list', images.join(",")) @@ -100,6 +101,7 @@ jobs: with: persist-credentials: false - name: Install rust + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: rust-toolchain uses: ./.forgejo/actions/rust-toolchain @@ -110,9 +112,11 @@ jobs: driver: ${{ env.BUILDKIT_ENDPOINT != '' && 'remote' || 'docker-container' }} endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }} - name: Set up QEMU + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: docker/setup-qemu-action@v3 # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. - name: Login to builtin registry + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: docker/login-action@v3 with: registry: ${{ env.BUILTIN_REGISTRY }} @@ -138,15 +142,21 @@ jobs: run: | calculatedSha=$(git rev-parse --short ${{ github.sha }}) echo "COMMIT_SHORT_SHA=$calculatedSha" >> $GITHUB_ENV + echo "Short SHA: $calculatedSha" - name: Get Git commit timestamps - run: echo "TIMESTAMP=$(git log -1 --pretty=%ct)" >> $GITHUB_ENV + run: | + timestamp=$(git log -1 --pretty=%ct) + echo "TIMESTAMP=$timestamp" >> $GITHUB_ENV + echo "Commit timestamp: $timestamp" - uses: ./.forgejo/actions/timelord + if: ${{ env.BUILDKIT_ENDPOINT == '' }} with: key: timelord-v0 path: . - name: Cache Rust registry + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: actions/cache@v3 with: path: | @@ -156,6 +166,7 @@ jobs: .cargo/registry/src key: rust-registry-image-${{hashFiles('**/Cargo.lock') }} - name: Cache cargo target + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-cargo-target uses: actions/cache@v3 with: @@ -163,6 +174,7 @@ jobs: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }} key: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}-${{hashFiles('**/Cargo.lock') }}-${{steps.rust-toolchain.outputs.rustc_version}} - name: Cache apt cache + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-apt uses: actions/cache@v3 with: @@ -170,6 +182,7 @@ jobs: var-cache-apt-${{ matrix.slug }} key: var-cache-apt-${{ matrix.slug }} - name: Cache apt lib + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-apt-lib uses: actions/cache@v3 with: @@ -177,6 +190,7 @@ jobs: var-lib-apt-${{ matrix.slug }} key: var-lib-apt-${{ matrix.slug }} - name: inject cache into docker + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: https://github.com/reproducible-containers/buildkit-cache-dance@v3.3.0 with: cache-map: | @@ -199,7 +213,7 @@ jobs: context: . file: "docker/Dockerfile" build-args: | - GIT_COMMIT_HASH=${{ github.sha }}) + GIT_COMMIT_HASH=${{ github.sha }} GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }} GIT_REMOTE_URL=${{github.event.repository.html_url }} GIT_REMOTE_COMMIT_URL=${{github.event.head_commit.url }} @@ -209,27 +223,23 @@ jobs: cache-from: type=gha # cache-to: type=gha,mode=max sbom: true - outputs: type=image,"name=${{ needs.define-variables.outputs.images_list }}",push-by-digest=true,name-canonical=true,push=true + outputs: | + ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || format('type=image,"name={0}",push=false', needs.define-variables.outputs.images_list) }} + type=local,dest=/tmp/binaries env: SOURCE_DATE_EPOCH: ${{ env.TIMESTAMP }} # For publishing multi-platform manifests - name: Export digest + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} run: | mkdir -p /tmp/digests digest="${{ steps.build.outputs.digest }}" touch "/tmp/digests/${digest#sha256:}" - - name: Extract binary from container (image) - id: extract-binary-image - run: | - mkdir -p /tmp/binaries - digest="${{ steps.build.outputs.digest }}" - echo "container_id=$(docker create --platform ${{ matrix.platform }} ${{ needs.define-variables.outputs.images_list }}@$digest)" >> $GITHUB_OUTPUT - - name: Extract binary from container (copy) - run: docker cp ${{ steps.extract-binary-image.outputs.container_id }}:/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }} - - name: Extract binary from container (cleanup) - run: docker rm ${{ steps.extract-binary-image.outputs.container_id }} + # Binary extracted via local output for all builds + - name: Rename extracted binary + run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }} - name: Upload binary artifact uses: forgejo/upload-artifact@v4 @@ -239,6 +249,7 @@ jobs: if-no-files-found: error - name: Upload digest + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: forgejo/upload-artifact@v4 with: name: digests-${{ matrix.slug }} @@ -251,6 +262,7 @@ jobs: needs: [define-variables, build-image] steps: - name: Download digests + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: forgejo/download-artifact@v4 with: path: /tmp/digests @@ -258,6 +270,7 @@ jobs: merge-multiple: true # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. - name: Login to builtin registry + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: docker/login-action@v3 with: registry: ${{ env.BUILTIN_REGISTRY }} @@ -265,6 +278,7 @@ jobs: password: ${{ secrets.BUILTIN_REGISTRY_PASSWORD || secrets.GITHUB_TOKEN }} - name: Set up Docker Buildx + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: docker/setup-buildx-action@v3 with: # Use persistent BuildKit if BUILDKIT_ENDPOINT is set (e.g. tcp://buildkit:8125) @@ -272,6 +286,7 @@ jobs: endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }} - name: Extract metadata (tags) for Docker + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} id: meta uses: docker/metadata-action@v5 with: @@ -289,6 +304,7 @@ jobs: DOCKER_METADATA_ANNOTATIONS_LEVELS: index - name: Create manifest list and push + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} working-directory: /tmp/digests env: IMAGES: ${{needs.define-variables.outputs.images}} @@ -306,6 +322,7 @@ jobs: done - name: Inspect image + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} env: IMAGES: ${{needs.define-variables.outputs.images}} shell: bash diff --git a/.vscode/settings.json b/.vscode/settings.json index a4fad964..82162ff7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -7,5 +7,6 @@ "continuwuity", "homeserver", "homeservers" - ] + ], + "rust-analyzer.cargo.features": ["full"] } diff --git a/docker/Dockerfile b/docker/Dockerfile index 55150902..17e1c6a1 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -199,32 +199,57 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ EOF # Extract dynamically linked dependencies -RUN </dev/null) && [ -n "$lddtree_output" ]; then + echo "$lddtree_output" | awk '{print $(NF-0) " " $1}' | sort -u -k 1,1 | \ + awk '{dest = ($2 ~ /^\//) ? "/out/libs-root" $2 : "/out/libs/" $2; print "install -D " $1 " " dest}' | \ + while read cmd; do eval "$cmd"; done + fi done -EOF + + # Show what will be copied to runtime + echo "=== Libraries being copied to runtime image:" + find /out/libs* -type f 2>/dev/null | sort || echo "No libraries found" +DEPS_EOF + +FROM ubuntu:latest AS prepper + +# Create layer structure +RUN mkdir -p /layer1/etc/ssl/certs \ + /layer2/usr/lib \ + /layer3/sbin /layer3/sbom + +# Copy SSL certs and root-path libraries to layer1 (ultra-stable) +COPY --from=base /etc/ssl/certs /layer1/etc/ssl/certs +COPY --from=builder /out/libs-root/ /layer1/ + +# Copy application libraries to layer2 (semi-stable) +COPY --from=builder /out/libs/ /layer2/usr/lib/ + +# Copy binaries and SBOM to layer3 (volatile) +COPY --from=builder /out/sbin/ /layer3/sbin/ +COPY --from=builder /out/sbom/ /layer3/sbom/ + +# Fix permissions after copying +RUN chmod -R 755 /layer1 /layer2 /layer3 FROM scratch WORKDIR / -# Copy root certs for tls into image -# You can also mount the certs from the host -# --volume /etc/ssl/certs:/etc/ssl/certs:ro -COPY --from=base /etc/ssl/certs /etc/ssl/certs +# Copy ultra-stable layer (SSL certs, system libraries) +COPY --from=prepper /layer1/ / -# Copy our build -COPY --from=builder /out/sbin/ /sbin/ -# Copy SBOM -COPY --from=builder /out/sbom/ /sbom/ +# Copy semi-stable layer (application libraries) +COPY --from=prepper /layer2/ / -# Copy dynamic libraries to root -COPY --from=builder /out/libs-root/ / -COPY --from=builder /out/libs/ /usr/lib/ +# Copy volatile layer (binaries, SBOM) +COPY --from=prepper /layer3/ / # Inform linker where to find libraries ENV LD_LIBRARY_PATH=/usr/lib diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 81b0e9da..64f68330 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -281,15 +281,8 @@ pub(super) async fn get_remote_pdu( vec![(event_id, value, room_id)] }; - info!("Attempting to handle event ID {event_id} as backfilled PDU"); - self.services - .rooms - .timeline - .backfill_pdu(&server, response.pdu) - .await?; - let text = serde_json::to_string_pretty(&json)?; - let msg = "Got PDU from specified server and handled as backfilled"; + let msg = "Got PDU from specified server:"; write!(self, "{msg}. Event body:\n```json\n{text}\n```") }, } diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index afcfec34..0f72b58c 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -57,5 +57,5 @@ pub(super) async fn pdus( .try_collect() .await?; - self.write_str(&format!("{result:#?}")).await + self.write_str(&format!("```\n{result:#?}\n```")).await } diff --git a/src/api/client/message.rs b/src/api/client/message.rs index f5a951f4..842036f5 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -35,7 +35,6 @@ use ruma::{ }; use tracing::warn; -use super::utils::{count_to_token, parse_pagination_token as parse_token}; use crate::Ruma; /// list of safe and common non-state events to ignore if the user is ignored @@ -85,14 +84,14 @@ pub(crate) async fn get_message_events_route( let from: PduCount = body .from .as_deref() - .map(parse_token) + .map(str::parse) .transpose()? .unwrap_or_else(|| match body.dir { | Direction::Forward => PduCount::min(), | Direction::Backward => PduCount::max(), }); - let to: Option = body.to.as_deref().map(parse_token).transpose()?; + let to: Option = body.to.as_deref().map(str::parse).transpose()?; let limit: usize = body .limit @@ -181,8 +180,8 @@ pub(crate) async fn get_message_events_route( .collect(); Ok(get_message_events::v3::Response { - start: count_to_token(from), - end: next_token.map(count_to_token), + start: from.to_string(), + end: next_token.as_ref().map(PduCount::to_string), chunk, state, }) diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index c8ca7757..0014282c 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -37,7 +37,6 @@ pub(super) mod typing; pub(super) mod unstable; pub(super) mod unversioned; pub(super) mod user_directory; -pub(super) mod utils; pub(super) mod voip; pub(super) mod well_known; diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index f6d8fe9e..79fa1459 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -18,7 +18,6 @@ use ruma::{ events::{TimelineEventType, relation::RelationType}, }; -use super::utils::{count_to_token, parse_pagination_token as parse_token}; use crate::Ruma; /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}` @@ -111,14 +110,14 @@ async fn paginate_relations_with_filter( dir: Direction, ) -> Result { let start: PduCount = from - .map(parse_token) + .map(str::parse) .transpose()? .unwrap_or_else(|| match dir { | Direction::Forward => PduCount::min(), | Direction::Backward => PduCount::max(), }); - let to: Option = to.map(parse_token).transpose()?; + let to: Option = to.map(str::parse).transpose()?; // Use limit or else 30, with maximum 100 let limit: usize = limit @@ -193,7 +192,7 @@ async fn paginate_relations_with_filter( | Direction::Forward => events.last(), | Direction::Backward => events.first(), } - .map(|(count, _)| count_to_token(*count)) + .map(|(count, _)| count.to_string()) } else { None }; diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 47228d67..75ae0758 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route( let event = services .rooms .timeline - .get_pdu(event_id) + .get_remote_pdu(room_id, event_id) .map_err(|_| err!(Request(NotFound("Event {} not found.", event_id)))); let visible = services @@ -33,11 +33,6 @@ pub(crate) async fn get_room_event_route( return Err!(Request(Forbidden("You don't have permission to view this event."))); } - debug_assert!( - event.event_id() == event_id && event.room_id() == room_id, - "Fetched PDU must match requested" - ); - event.add_age().ok(); Ok(get_room_event::v3::Response { event: event.into_format() }) diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs deleted file mode 100644 index cc941b95..00000000 --- a/src/api/client/utils.rs +++ /dev/null @@ -1,28 +0,0 @@ -use conduwuit::{ - Result, err, - matrix::pdu::{PduCount, ShortEventId}, -}; - -/// Parse a pagination token, trying ShortEventId first, then falling back to -/// PduCount -pub(crate) fn parse_pagination_token(token: &str) -> Result { - // Try parsing as ShortEventId first - if let Ok(shorteventid) = token.parse::() { - // ShortEventId maps directly to a PduCount in our database - Ok(PduCount::Normal(shorteventid)) - } else if let Ok(count) = token.parse::() { - // Fallback to PduCount for backwards compatibility - Ok(PduCount::Normal(count)) - } else if let Ok(count) = token.parse::() { - // Also handle negative counts for backfilled events - Ok(PduCount::from_signed(count)) - } else { - Err(err!(Request(InvalidParam("Invalid pagination token")))) - } -} - -/// Convert a PduCount to a token string (using the underlying ShortEventId) -pub(crate) fn count_to_token(count: PduCount) -> String { - // The PduCount's unsigned value IS the ShortEventId - count.into_unsigned().to_string() -} diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 652451c7..d92581a1 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -1,12 +1,13 @@ #![allow(deprecated)] -use std::borrow::Borrow; +use std::{borrow::Borrow, time::Instant, vec}; use axum::extract::State; use conduwuit::{ - Err, Result, at, err, + Err, Event, Result, at, debug, err, info, matrix::event::gen_event_id_canonical_json, - utils::stream::{IterStream, TryBroadbandExt}, + trace, + utils::stream::{BroadbandExt, IterStream, TryBroadbandExt}, warn, }; use conduwuit_service::Services; @@ -25,12 +26,14 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value}; use crate::Ruma; /// helper method for /send_join v1 and v2 +#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))] async fn create_join_event( services: &Services, origin: &ServerName, room_id: &RoomId, pdu: &RawJsonValue, -) -> Result { + omit_members: bool, +) -> Result { if !services.rooms.metadata.exists(room_id).await { return Err!(Request(NotFound("Room is unknown to this server."))); } @@ -53,8 +56,10 @@ async fn create_join_event( // We do not add the event_id field to the pdu here because of signature and // hashes checks + trace!("Getting room version"); let room_version_id = services.rooms.state.get_room_version(room_id).await?; + trace!("Generating event ID and converting to canonical json"); let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else { // Event could not be converted to canonical json return Err!(Request(BadJson("Could not convert event to canonical json."))); @@ -103,7 +108,6 @@ async fn create_join_event( ))); } - // ACL check sender user server name let sender: OwnedUserId = serde_json::from_value( value .get("sender") @@ -113,12 +117,6 @@ async fn create_join_event( ) .map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?; - services - .rooms - .event_handler - .acl_check(sender.server_name(), room_id) - .await?; - // check if origin server is trying to send for another server if sender.server_name() != origin { return Err!(Request(Forbidden("Not allowed to join on behalf of another server."))); @@ -180,11 +178,6 @@ async fn create_join_event( } } - services - .server_keys - .hash_and_sign_event(&mut value, &room_version_id) - .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?; - let origin: OwnedServerName = serde_json::from_value( value .get("origin") @@ -194,6 +187,12 @@ async fn create_join_event( ) .map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?; + trace!("Signing send_join event"); + services + .server_keys + .hash_and_sign_event(&mut value, &room_version_id) + .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?; + let mutex_lock = services .rooms .event_handler @@ -201,6 +200,7 @@ async fn create_join_event( .lock(room_id) .await; + trace!("Acquired send_join mutex, persisting join event"); let pdu_id = services .rooms .event_handler @@ -210,7 +210,7 @@ async fn create_join_event( .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; drop(mutex_lock); - + trace!("Fetching current state IDs"); let state_ids: Vec = services .rooms .state_accessor @@ -219,9 +219,23 @@ async fn create_join_event( .collect() .await; + trace!(%omit_members, "Constructing current state"); let state = state_ids .iter() .try_stream() + .broad_filter_map(|event_id| async move { + if omit_members { + if let Ok(e) = event_id.as_ref() { + let pdu = services.rooms.timeline.get_pdu(e).await; + if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") { + trace!("omitting member event {e:?} from returned state"); + // skip members + return None; + } + } + } + Some(event_id) + }) .broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id)) .broad_and_then(|pdu| { services @@ -234,6 +248,7 @@ async fn create_join_event( .await?; let starting_events = state_ids.iter().map(Borrow::borrow); + trace!("Constructing auth chain"); let auth_chain = services .rooms .auth_chain @@ -250,13 +265,37 @@ async fn create_join_event( .try_collect() .boxed() .await?; - + info!(fast_join = %omit_members, "Sending join event to other servers"); services.sending.send_pdu_room(room_id, &pdu_id).await?; - - Ok(create_join_event::v1::RoomState { + debug!("Finished sending join event"); + let servers_in_room: Option> = if !omit_members { + None + } else { + trace!("Fetching list of servers in room"); + let servers: Vec = services + .rooms + .state_cache + .room_servers(room_id) + .map(|sn| sn.as_str().to_owned()) + .collect() + .await; + // If there's no servers, just add us + let servers = if servers.is_empty() { + warn!("Failed to find any servers, adding our own server name as a last resort"); + vec![services.globals.server_name().to_string()] + } else { + trace!("Found {} servers in room", servers.len()); + servers + }; + Some(servers) + }; + debug!("Returning send_join data"); + Ok(create_join_event::v2::RoomState { auth_chain, state, event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(), + members_omitted: omit_members, + servers_in_room, }) } @@ -294,11 +333,23 @@ pub(crate) async fn create_join_event_v1_route( } } - let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu) + let now = Instant::now(); + let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false) .boxed() .await?; + let transformed = create_join_event::v1::RoomState { + auth_chain: room_state.auth_chain, + state: room_state.state, + event: room_state.event, + }; + info!( + "Finished sending a join for {} in {} in {:?}", + body.origin(), + &body.room_id, + now.elapsed() + ); - Ok(create_join_event::v1::Response { room_state }) + Ok(create_join_event::v1::Response { room_state: transformed }) } /// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}` @@ -329,17 +380,17 @@ pub(crate) async fn create_join_event_v2_route( } } - let create_join_event::v1::RoomState { auth_chain, state, event } = - create_join_event(&services, body.origin(), &body.room_id, &body.pdu) + let now = Instant::now(); + let room_state = + create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members) .boxed() .await?; - let room_state = create_join_event::v2::RoomState { - members_omitted: false, - auth_chain, - state, - event, - servers_in_room: None, - }; + info!( + "Finished sending a join for {} in {} in {:?}", + body.origin(), + &body.room_id, + now.elapsed() + ); Ok(create_join_event::v2::Response { room_state }) } diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs index b880278f..7fb12574 100644 --- a/src/core/matrix/pdu/count.rs +++ b/src/core/matrix/pdu/count.rs @@ -1,6 +1,10 @@ #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)] -use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr}; +use std::{ + cmp::Ordering, + fmt::{self, Display}, + str::FromStr, +}; use ruma::api::Direction; diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index f7f7d043..028c3e51 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result Re .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); + info!("Asking {backfill_server} for backfill in {room_id}"); let response = self .services .sending @@ -128,10 +129,126 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re } } - info!("No servers could backfill, but backfill was needed in room {room_id}"); + warn!("No servers could backfill, but backfill was needed in room {room_id}"); Ok(()) } +#[implement(super::Service)] +#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))] +pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result { + let local = self.get_pdu(event_id).await; + if local.is_ok() { + // We already have this PDU, no need to backfill + debug!("We already have {event_id} in {room_id}, no need to backfill."); + return local; + } + debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers."); + // Similar to backfill_if_required, but only for a single PDU + // Fetch a list of servers to try + if self + .services + .state_cache + .room_joined_count(room_id) + .await + .is_ok_and(|count| count <= 1) + && !self + .services + .state_accessor + .is_world_readable(room_id) + .await + { + // Room is empty (1 user or none), there is no one that can backfill + return Err!(Request(NotFound("No one can backfill this PDU, room is empty."))); + } + + let power_levels: RoomPowerLevelsEventContent = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") + .await + .unwrap_or_default(); + + let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { + if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) { + Some(user_id.server_name()) + } else { + None + } + }); + + let canonical_room_alias_server = once( + self.services + .state_accessor + .get_canonical_alias(room_id) + .await, + ) + .filter_map(Result::ok) + .map(|alias| alias.server_name().to_owned()) + .stream(); + let mut servers = room_mods + .stream() + .map(ToOwned::to_owned) + .chain(canonical_room_alias_server) + .chain( + self.services + .server + .config + .trusted_servers + .iter() + .map(ToOwned::to_owned) + .stream(), + ) + .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) + .filter_map(|server_name| async move { + self.services + .state_cache + .server_in_room(&server_name, room_id) + .await + .then_some(server_name) + }) + .boxed(); + + while let Some(ref backfill_server) = servers.next().await { + info!("Asking {backfill_server} for event {}", event_id); + let value = self + .services + .sending + .send_federation_request(backfill_server, federation::event::get_event::v1::Request { + event_id: event_id.to_owned(), + include_unredacted_content: Some(false), + }) + .await + .and_then(|response| { + serde_json::from_str::(response.pdu.get()).map_err(|e| { + err!(BadServerResponse(debug_warn!( + "Error parsing incoming event {e:?} from {backfill_server}" + ))) + }) + }); + let pdu = match value { + | Ok(value) => { + self.services + .event_handler + .handle_incoming_pdu(backfill_server, room_id, event_id, value, false) + .boxed() + .await?; + debug!("Successfully backfilled {event_id} from {backfill_server}"); + Some(self.get_pdu(event_id).await) + }, + | Err(e) => { + warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); + None + }, + }; + if let Some(pdu) = pdu { + debug!("Fetched {event_id} from {backfill_server}"); + return pdu; + } + } + + Err!("No servers could be used to fetch {} in {}.", room_id, event_id) +} + #[implement(super::Service)] #[tracing::instrument(skip(self, pdu), level = "debug")] pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index fa10a5c0..9064df6c 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -3,8 +3,7 @@ use std::{borrow::Borrow, sync::Arc}; use conduwuit::{ Err, PduCount, PduEvent, Result, at, err, result::{LogErr, NotFound}, - utils, - utils::stream::TryReadyExt, + utils::{self, stream::TryReadyExt}, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};