From 1a3107c20ad473e4f5b2cc7ac702dd1a59ce7a70 Mon Sep 17 00:00:00 2001 From: Tom Foster Date: Sun, 7 Sep 2025 09:28:59 +0100 Subject: [PATCH 01/47] fix(ci): Replace Mozilla sccache action with token-free alternative Replace mozilla-actions/sccache-action with a custom Forgejo-specific implementation that eliminates GitHub token dependencies and rate limiting issues for all contributors regardless of repository permissions. The new action mirrors sccache binaries to the Forgejo package registry and queries that instead of GitHub releases, maintaining identical functionality including hostedtoolcache support. --- .forgejo/actions/sccache/action.yml | 8 +------- .forgejo/actions/setup-rust/action.yml | 12 +----------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/.forgejo/actions/sccache/action.yml b/.forgejo/actions/sccache/action.yml index b5e5dcf4..b2441109 100644 --- a/.forgejo/actions/sccache/action.yml +++ b/.forgejo/actions/sccache/action.yml @@ -2,18 +2,12 @@ name: sccache description: | Install sccache for caching builds in GitHub Actions. -inputs: - token: - description: 'A Github PAT' - required: false runs: using: composite steps: - name: Install sccache - uses: https://github.com/mozilla-actions/sccache-action@v0.0.9 - with: - token: ${{ inputs.token }} + uses: https://git.tomfos.tr/tom/sccache-action@v1 - name: Configure sccache uses: https://github.com/actions/github-script@v7 with: diff --git a/.forgejo/actions/setup-rust/action.yml b/.forgejo/actions/setup-rust/action.yml index 091da8c2..a8736a75 100644 --- a/.forgejo/actions/setup-rust/action.yml +++ b/.forgejo/actions/setup-rust/action.yml @@ -88,19 +88,9 @@ runs: # Shared toolchain cache across all Rust versions key: toolchain-${{ steps.runner-os.outputs.slug }} - - name: Debug GitHub token availability - shell: bash - run: | - if [ -z "${{ inputs.github-token }}" ]; then - echo "⚠️ No GitHub token provided - sccache will use fallback download method" - else - echo "✅ GitHub token provided for sccache" - fi - name: Setup sccache - uses: https://github.com/mozilla-actions/sccache-action@v0.0.9 - with: - token: ${{ inputs.github-token }} + uses: https://git.tomfos.tr/tom/sccache-action@v1 - name: Cache build artifacts id: build-cache From fff9629b0f4d28148007f180f8c52807894b7c2b Mon Sep 17 00:00:00 2001 From: Tom Foster Date: Sun, 7 Sep 2025 13:21:58 +0100 Subject: [PATCH 02/47] fix(docker): Resolve liburing.so.2 loading error for non-root users Container failed to start when running as non-root (user 1000:1000) because copied directories had restrictive 770 permissions, likely due to different umask in persistent BuildKit. Non-root users couldn't access /usr/lib to load required dynamic libraries. Introduces prepper stage using Ubuntu to organize files into layered structure with explicit 755 directory permissions before copying to scratch image. Also fixes workflow syntax error and removes docker/** from paths-ignore to ensure Docker changes trigger CI builds. 
--- .forgejo/workflows/release-image.yml | 4 +- docker/Dockerfile | 57 ++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml index 7b29b7ca..834b5602 100644 --- a/.forgejo/workflows/release-image.yml +++ b/.forgejo/workflows/release-image.yml @@ -11,7 +11,6 @@ on: - ".gitignore" - "renovate.json" - "pkg/**" - - "docker/**" - "docs/**" push: branches: @@ -23,7 +22,6 @@ on: - ".gitignore" - "renovate.json" - "pkg/**" - - "docker/**" - "docs/**" # Allows you to run this workflow manually from the Actions tab workflow_dispatch: @@ -199,7 +197,7 @@ jobs: context: . file: "docker/Dockerfile" build-args: | - GIT_COMMIT_HASH=${{ github.sha }}) + GIT_COMMIT_HASH=${{ github.sha }} GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }} GIT_REMOTE_URL=${{github.event.repository.html_url }} GIT_REMOTE_COMMIT_URL=${{github.event.head_commit.url }} diff --git a/docker/Dockerfile b/docker/Dockerfile index 55150902..17e1c6a1 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -199,32 +199,57 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ EOF # Extract dynamically linked dependencies -RUN </dev/null) && [ -n "$lddtree_output" ]; then + echo "$lddtree_output" | awk '{print $(NF-0) " " $1}' | sort -u -k 1,1 | \ + awk '{dest = ($2 ~ /^\//) ? "/out/libs-root" $2 : "/out/libs/" $2; print "install -D " $1 " " dest}' | \ + while read cmd; do eval "$cmd"; done + fi done -EOF + + # Show what will be copied to runtime + echo "=== Libraries being copied to runtime image:" + find /out/libs* -type f 2>/dev/null | sort || echo "No libraries found" +DEPS_EOF + +FROM ubuntu:latest AS prepper + +# Create layer structure +RUN mkdir -p /layer1/etc/ssl/certs \ + /layer2/usr/lib \ + /layer3/sbin /layer3/sbom + +# Copy SSL certs and root-path libraries to layer1 (ultra-stable) +COPY --from=base /etc/ssl/certs /layer1/etc/ssl/certs +COPY --from=builder /out/libs-root/ /layer1/ + +# Copy application libraries to layer2 (semi-stable) +COPY --from=builder /out/libs/ /layer2/usr/lib/ + +# Copy binaries and SBOM to layer3 (volatile) +COPY --from=builder /out/sbin/ /layer3/sbin/ +COPY --from=builder /out/sbom/ /layer3/sbom/ + +# Fix permissions after copying +RUN chmod -R 755 /layer1 /layer2 /layer3 FROM scratch WORKDIR / -# Copy root certs for tls into image -# You can also mount the certs from the host -# --volume /etc/ssl/certs:/etc/ssl/certs:ro -COPY --from=base /etc/ssl/certs /etc/ssl/certs +# Copy ultra-stable layer (SSL certs, system libraries) +COPY --from=prepper /layer1/ / -# Copy our build -COPY --from=builder /out/sbin/ /sbin/ -# Copy SBOM -COPY --from=builder /out/sbom/ /sbom/ +# Copy semi-stable layer (application libraries) +COPY --from=prepper /layer2/ / -# Copy dynamic libraries to root -COPY --from=builder /out/libs-root/ / -COPY --from=builder /out/libs/ /usr/lib/ +# Copy volatile layer (binaries, SBOM) +COPY --from=prepper /layer3/ / # Inform linker where to find libraries ENV LD_LIBRARY_PATH=/usr/lib From d640853f9d3c5cd33fc14775090d634747091bdc Mon Sep 17 00:00:00 2001 From: Tom Foster Date: Sun, 7 Sep 2025 14:32:11 +0100 Subject: [PATCH 03/47] ci(docs): Optimise build performance with caching and conditional Node.js Skip installing Node.js entirely if v20+ is already available, otherwise install v22. Add npm dependency caching with OS-specific cache keys using the custom detect-runner-os action for proper cache isolation between runners. 
Dependencies normally take just under 10s, so this should more than halve the doc build time to free up runner slots. --- .forgejo/actions/detect-runner-os/action.yml | 21 +++++++++++++++++++- .forgejo/workflows/documentation.yml | 15 +++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/.forgejo/actions/detect-runner-os/action.yml b/.forgejo/actions/detect-runner-os/action.yml index 6ada1d5d..be8f9b0a 100644 --- a/.forgejo/actions/detect-runner-os/action.yml +++ b/.forgejo/actions/detect-runner-os/action.yml @@ -13,6 +13,12 @@ outputs: slug: description: 'Combined OS slug (e.g. Ubuntu-22.04)' value: ${{ steps.detect.outputs.slug }} + node_major: + description: 'Major version of Node.js if available (e.g. 22)' + value: ${{ steps.detect.outputs.node_major }} + node_version: + description: 'Full Node.js version if available (e.g. 22.19.0)' + value: ${{ steps.detect.outputs.node_version }} runs: using: composite @@ -30,7 +36,20 @@ runs: # Create combined slug OS_SLUG="${OS_NAME}-${OS_VERSION}" - # Set outputs + # Detect Node.js version if available + if command -v node >/dev/null 2>&1; then + NODE_VERSION=$(node --version | sed 's/v//') + NODE_MAJOR=$(echo $NODE_VERSION | cut -d. -f1) + echo "node_version=${NODE_VERSION}" >> $GITHUB_OUTPUT + echo "node_major=${NODE_MAJOR}" >> $GITHUB_OUTPUT + echo "🔍 Detected Node.js: v${NODE_VERSION}" + else + echo "node_version=" >> $GITHUB_OUTPUT + echo "node_major=" >> $GITHUB_OUTPUT + echo "🔍 Node.js not found" + fi + + # Set OS outputs echo "name=${OS_NAME}" >> $GITHUB_OUTPUT echo "version=${OS_VERSION}" >> $GITHUB_OUTPUT echo "slug=${OS_SLUG}" >> $GITHUB_OUTPUT diff --git a/.forgejo/workflows/documentation.yml b/.forgejo/workflows/documentation.yml index 4f3e903c..67c8a30c 100644 --- a/.forgejo/workflows/documentation.yml +++ b/.forgejo/workflows/documentation.yml @@ -49,10 +49,23 @@ jobs: cp ./docs/static/_headers ./public/_headers echo "Copied .well-known files and _headers to ./public" + - name: Detect runner environment + id: runner-env + uses: ./.forgejo/actions/detect-runner-os + - name: Setup Node.js + if: steps.runner-env.outputs.node_major == '' || steps.runner-env.outputs.node_major < '20' uses: https://github.com/actions/setup-node@v4 with: - node-version: 20 + node-version: 22 + + - name: Cache npm dependencies + uses: actions/cache@v3 + with: + path: ~/.npm + key: ${{ steps.runner-env.outputs.slug }}-node-${{ hashFiles('**/package-lock.json') }} + restore-keys: | + ${{ steps.runner-env.outputs.slug }}-node- - name: Install dependencies run: npm install --save-dev wrangler@latest From 84fdcd326a4076bad19dc62e0061a15cafe54158 Mon Sep 17 00:00:00 2001 From: Tom Foster Date: Sun, 7 Sep 2025 17:08:36 +0100 Subject: [PATCH 04/47] fix(ci): Resolve registry push failures for fork PRs Fork PRs now fail during Docker image build with 'tag is needed when pushing to registry' because BUILTIN_REGISTRY_ENABLED evaluates to false without proper credentials, leaving the images list empty. This appears to be due to recent Forgejo permission changes affecting fork access to repository secrets. Add fallback to official registry when credentials unavailable, skip registry login and push operations for forks, and make merge job conditional since no digests exist without push. This allows forks to test Docker builds whilst avoiding authentication failures. 
--- .forgejo/workflows/release-image.yml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml index 834b5602..069f1d34 100644 --- a/.forgejo/workflows/release-image.yml +++ b/.forgejo/workflows/release-image.yml @@ -53,6 +53,9 @@ jobs: let images = [] if (process.env.BUILTIN_REGISTRY_ENABLED === "true") { images.push(builtinImage) + } else { + // Fallback to official registry for forks/PRs without credentials + images.push('forgejo.ellis.link/continuwuation/continuwuity') } core.setOutput('images', images.join("\n")) core.setOutput('images_list', images.join(",")) @@ -111,6 +114,7 @@ jobs: uses: docker/setup-qemu-action@v3 # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. - name: Login to builtin registry + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: docker/login-action@v3 with: registry: ${{ env.BUILTIN_REGISTRY }} @@ -207,7 +211,7 @@ jobs: cache-from: type=gha # cache-to: type=gha,mode=max sbom: true - outputs: type=image,"name=${{ needs.define-variables.outputs.images_list }}",push-by-digest=true,name-canonical=true,push=true + outputs: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || 'type=docker' }} env: SOURCE_DATE_EPOCH: ${{ env.TIMESTAMP }} @@ -249,6 +253,7 @@ jobs: needs: [define-variables, build-image] steps: - name: Download digests + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: forgejo/download-artifact@v4 with: path: /tmp/digests @@ -256,6 +261,7 @@ jobs: merge-multiple: true # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. - name: Login to builtin registry + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: docker/login-action@v3 with: registry: ${{ env.BUILTIN_REGISTRY }} @@ -263,6 +269,7 @@ jobs: password: ${{ secrets.BUILTIN_REGISTRY_PASSWORD || secrets.GITHUB_TOKEN }} - name: Set up Docker Buildx + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: docker/setup-buildx-action@v3 with: # Use persistent BuildKit if BUILDKIT_ENDPOINT is set (e.g. tcp://buildkit:8125) @@ -270,6 +277,7 @@ jobs: endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }} - name: Extract metadata (tags) for Docker + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} id: meta uses: docker/metadata-action@v5 with: @@ -287,6 +295,7 @@ jobs: DOCKER_METADATA_ANNOTATIONS_LEVELS: index - name: Create manifest list and push + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} working-directory: /tmp/digests env: IMAGES: ${{needs.define-variables.outputs.images}} @@ -304,6 +313,7 @@ jobs: done - name: Inspect image + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} env: IMAGES: ${{needs.define-variables.outputs.images}} shell: bash From 2cedf0d2e1346769b023cd6e42ed3279142af5de Mon Sep 17 00:00:00 2001 From: Tom Foster Date: Sun, 7 Sep 2025 18:32:38 +0100 Subject: [PATCH 05/47] fix(ci): Use image output instead of docker for fork PRs Docker exporter doesn't support manifest lists (multi-platform builds). 
For fork PRs without registry credentials, use 'type=image,push=false' instead of 'type=docker' to build multi-platform images locally without pushing. --- .forgejo/workflows/release-image.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml index 069f1d34..52f5e6e0 100644 --- a/.forgejo/workflows/release-image.yml +++ b/.forgejo/workflows/release-image.yml @@ -211,7 +211,7 @@ jobs: cache-from: type=gha # cache-to: type=gha,mode=max sbom: true - outputs: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || 'type=docker' }} + outputs: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || format('type=image,"name={0}",push=false', needs.define-variables.outputs.images_list) }} env: SOURCE_DATE_EPOCH: ${{ env.TIMESTAMP }} From 1e9701f379053b6f20681b48db6ede5026db4681 Mon Sep 17 00:00:00 2001 From: Tom Foster Date: Sun, 7 Sep 2025 18:59:05 +0100 Subject: [PATCH 06/47] ci(release-image): Skip setup steps when using persistent BuildKit When BUILDKIT_ENDPOINT is set, builds run on a persistent BuildKit instance, making runner setup steps unnecessary. Skip Rust toolchain installation, QEMU setup, caching steps, and timelord to eliminate ~7 operations per job. Also adds output to git SHA and timestamp steps for visibility. Cuts at least a minute off average build time through fewer installs, cache restores, and cache saves. --- .forgejo/workflows/release-image.yml | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml index 52f5e6e0..1a0e4f4e 100644 --- a/.forgejo/workflows/release-image.yml +++ b/.forgejo/workflows/release-image.yml @@ -101,6 +101,7 @@ jobs: with: persist-credentials: false - name: Install rust + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: rust-toolchain uses: ./.forgejo/actions/rust-toolchain @@ -111,6 +112,7 @@ jobs: driver: ${{ env.BUILDKIT_ENDPOINT != '' && 'remote' || 'docker-container' }} endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }} - name: Set up QEMU + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: docker/setup-qemu-action@v3 # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. - name: Login to builtin registry @@ -140,15 +142,21 @@ jobs: run: | calculatedSha=$(git rev-parse --short ${{ github.sha }}) echo "COMMIT_SHORT_SHA=$calculatedSha" >> $GITHUB_ENV + echo "Short SHA: $calculatedSha" - name: Get Git commit timestamps - run: echo "TIMESTAMP=$(git log -1 --pretty=%ct)" >> $GITHUB_ENV + run: | + timestamp=$(git log -1 --pretty=%ct) + echo "TIMESTAMP=$timestamp" >> $GITHUB_ENV + echo "Commit timestamp: $timestamp" - uses: ./.forgejo/actions/timelord + if: ${{ env.BUILDKIT_ENDPOINT == '' }} with: key: timelord-v0 path: . 
- name: Cache Rust registry + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: actions/cache@v3 with: path: | @@ -158,6 +166,7 @@ jobs: .cargo/registry/src key: rust-registry-image-${{hashFiles('**/Cargo.lock') }} - name: Cache cargo target + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-cargo-target uses: actions/cache@v3 with: @@ -165,6 +174,7 @@ jobs: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }} key: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}-${{hashFiles('**/Cargo.lock') }}-${{steps.rust-toolchain.outputs.rustc_version}} - name: Cache apt cache + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-apt uses: actions/cache@v3 with: @@ -172,6 +182,7 @@ jobs: var-cache-apt-${{ matrix.slug }} key: var-cache-apt-${{ matrix.slug }} - name: Cache apt lib + if: ${{ env.BUILDKIT_ENDPOINT == '' }} id: cache-apt-lib uses: actions/cache@v3 with: @@ -179,6 +190,7 @@ jobs: var-lib-apt-${{ matrix.slug }} key: var-lib-apt-${{ matrix.slug }} - name: inject cache into docker + if: ${{ env.BUILDKIT_ENDPOINT == '' }} uses: https://github.com/reproducible-containers/buildkit-cache-dance@v3.3.0 with: cache-map: | From 4a1091dd06e3e4518edb4a3df296e7a28fef0da9 Mon Sep 17 00:00:00 2001 From: Tom Foster Date: Sun, 7 Sep 2025 20:06:39 +0100 Subject: [PATCH 07/47] ci(release-image): Unify binary extraction using BuildKit local output Fork PRs currently fail binary extraction with 'invalid reference format' and 'must specify at least one container source' errors. This replaces the registry-specific docker create/copy method with BuildKit's local output feature for all builds. Uses multiple outputs in single build: image export plus local binary extraction from /sbin. Speeds up extracting binary artifacts and saves a couple of extra workflow steps in the process. 
--- .forgejo/workflows/release-image.yml | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml index 1a0e4f4e..2a722edd 100644 --- a/.forgejo/workflows/release-image.yml +++ b/.forgejo/workflows/release-image.yml @@ -223,27 +223,23 @@ jobs: cache-from: type=gha # cache-to: type=gha,mode=max sbom: true - outputs: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || format('type=image,"name={0}",push=false', needs.define-variables.outputs.images_list) }} + outputs: | + ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || format('type=image,"name={0}",push=false', needs.define-variables.outputs.images_list) }} + type=local,dest=/tmp/binaries env: SOURCE_DATE_EPOCH: ${{ env.TIMESTAMP }} # For publishing multi-platform manifests - name: Export digest + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} run: | mkdir -p /tmp/digests digest="${{ steps.build.outputs.digest }}" touch "/tmp/digests/${digest#sha256:}" - - name: Extract binary from container (image) - id: extract-binary-image - run: | - mkdir -p /tmp/binaries - digest="${{ steps.build.outputs.digest }}" - echo "container_id=$(docker create --platform ${{ matrix.platform }} ${{ needs.define-variables.outputs.images_list }}@$digest)" >> $GITHUB_OUTPUT - - name: Extract binary from container (copy) - run: docker cp ${{ steps.extract-binary-image.outputs.container_id }}:/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }} - - name: Extract binary from container (cleanup) - run: docker rm ${{ steps.extract-binary-image.outputs.container_id }} + # Binary extracted via local output for all builds + - name: Rename extracted binary + run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }} - name: Upload binary artifact uses: forgejo/upload-artifact@v4 From 6cf3c839e4b2b6906f4017a3f77591937f37af6e Mon Sep 17 00:00:00 2001 From: Tom Foster Date: Sun, 7 Sep 2025 21:27:56 +0100 Subject: [PATCH 08/47] ci(release-image): Skip digest upload when not pushing images After #992, builds without registry credentials skip Docker image output but still extract binary artifacts. However, we were still trying to upload digests for images that weren't created. Add conditional check to only upload digests when actually pushing to registry. 
--- .forgejo/workflows/release-image.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml index 2a722edd..b7423567 100644 --- a/.forgejo/workflows/release-image.yml +++ b/.forgejo/workflows/release-image.yml @@ -249,6 +249,7 @@ jobs: if-no-files-found: error - name: Upload digest + if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }} uses: forgejo/upload-artifact@v4 with: name: digests-${{ matrix.slug }} From 2cdccbf2fe16fee50c8d93a9816c66c4ec6bf698 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 3 Sep 2025 14:20:50 +0100 Subject: [PATCH 09/47] feat(PR977): Support omitting members in the send_join response --- src/api/server/send_join.rs | 97 ++++++++++++++++++----- src/service/rooms/state_compressor/mod.rs | 2 +- 2 files changed, 80 insertions(+), 19 deletions(-) diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 652451c7..3aebbbe7 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -1,12 +1,13 @@ #![allow(deprecated)] -use std::borrow::Borrow; +use std::{borrow::Borrow, time::Instant}; use axum::extract::State; use conduwuit::{ - Err, Result, at, err, + Err, Event, Result, at, debug, err, info, matrix::event::gen_event_id_canonical_json, - utils::stream::{IterStream, TryBroadbandExt}, + trace, + utils::stream::{BroadbandExt, IterStream, TryBroadbandExt}, warn, }; use conduwuit_service::Services; @@ -25,12 +26,14 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value}; use crate::Ruma; /// helper method for /send_join v1 and v2 +#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))] async fn create_join_event( services: &Services, origin: &ServerName, room_id: &RoomId, pdu: &RawJsonValue, -) -> Result { + omit_members: bool, +) -> Result { if !services.rooms.metadata.exists(room_id).await { return Err!(Request(NotFound("Room is unknown to this server."))); } @@ -201,6 +204,7 @@ async fn create_join_event( .lock(room_id) .await; + debug!("Acquired send_join mutex, persisting join event"); let pdu_id = services .rooms .event_handler @@ -210,7 +214,7 @@ async fn create_join_event( .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; drop(mutex_lock); - + debug!("Fetching current state IDs"); let state_ids: Vec = services .rooms .state_accessor @@ -219,9 +223,25 @@ async fn create_join_event( .collect() .await; + #[allow(clippy::unnecessary_unwrap)] let state = state_ids .iter() .try_stream() + .broad_filter_map(|event_id| async move { + if omit_members && event_id.is_ok() { + let pdu = services + .rooms + .timeline + .get_pdu(event_id.as_ref().unwrap()) + .await; + if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") { + trace!("omitting member event {event_id:?} from returned state"); + // skip members + return None; + } + } + Some(event_id) + }) .broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id)) .broad_and_then(|pdu| { services @@ -238,6 +258,17 @@ async fn create_join_event( .rooms .auth_chain .event_ids_iter(room_id, starting_events) + .broad_filter_map(|event_id| async { + if omit_members && event_id.as_ref().is_ok_and(|e| state_ids.contains(e)) { + // Don't include this event if it's already in the state + trace!( + "omitting member event {event_id:?} from returned auth chain as it is \ + already in state" + ); + return None; + } + Some(event_id) + }) .broad_and_then(|event_id| async move { 
services.rooms.timeline.get_pdu_json(&event_id).await }) @@ -252,11 +283,27 @@ async fn create_join_event( .await?; services.sending.send_pdu_room(room_id, &pdu_id).await?; - - Ok(create_join_event::v1::RoomState { + let servers_in_room: Option> = if omit_members { + None + } else { + debug!("Fetching list of servers in room"); + Some( + services + .rooms + .state_cache + .room_servers(room_id) + .map(|sn| sn.as_str().to_owned()) + .collect() + .await, + ) + }; + debug!("Returning send_join data"); + Ok(create_join_event::v2::RoomState { auth_chain, state, event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(), + members_omitted: omit_members, + servers_in_room, }) } @@ -294,11 +341,24 @@ pub(crate) async fn create_join_event_v1_route( } } - let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu) + info!("Providing send_join for {} in {}", body.origin(), &body.room_id); + let now = Instant::now(); + let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false) .boxed() .await?; + let transformed = create_join_event::v1::RoomState { + auth_chain: room_state.auth_chain, + state: room_state.state, + event: room_state.event, + }; + info!( + "Finished creating the send_join payload for {} in {} in {:?}", + body.origin(), + &body.room_id, + now.elapsed() + ); - Ok(create_join_event::v1::Response { room_state }) + Ok(create_join_event::v1::Response { room_state: transformed }) } /// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}` @@ -329,17 +389,18 @@ pub(crate) async fn create_join_event_v2_route( } } - let create_join_event::v1::RoomState { auth_chain, state, event } = - create_join_event(&services, body.origin(), &body.room_id, &body.pdu) + info!("Providing send_join for {} in {}", body.origin(), &body.room_id); + let now = Instant::now(); + let room_state = + create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members) .boxed() .await?; - let room_state = create_join_event::v2::RoomState { - members_omitted: false, - auth_chain, - state, - event, - servers_in_room: None, - }; + info!( + "Finished creating the send_join payload for {} in {} in {:?}", + body.origin(), + &body.room_id, + now.elapsed() + ); Ok(create_join_event::v2::Response { room_state }) } diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index f7f7d043..028c3e51 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result Date: Wed, 3 Sep 2025 15:08:28 +0100 Subject: [PATCH 10/47] fix(PR977): Omitting redundant entries from the auth_chain caused problems --- src/api/server/send_join.rs | 49 ++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 3aebbbe7..a52b5391 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -1,6 +1,6 @@ #![allow(deprecated)] -use std::{borrow::Borrow, time::Instant}; +use std::{borrow::Borrow, time::Instant, vec}; use axum::extract::State; use conduwuit::{ @@ -258,17 +258,17 @@ async fn create_join_event( .rooms .auth_chain .event_ids_iter(room_id, starting_events) - .broad_filter_map(|event_id| async { - if omit_members && event_id.as_ref().is_ok_and(|e| state_ids.contains(e)) { - // Don't include this event if it's already in the state - trace!( - "omitting member event 
{event_id:?} from returned auth chain as it is \ - already in state" - ); - return None; - } - Some(event_id) - }) + // .broad_filter_map(|event_id| async { + // if omit_members && event_id.as_ref().is_ok_and(|e| state_ids.contains(e)) { + // // Don't include this event if it's already in the state + // trace!( + // "omitting member event {event_id:?} from returned auth chain as it is \ + // already in state" + // ); + // return None; + // } + // Some(event_id) + // }) .broad_and_then(|event_id| async move { services.rooms.timeline.get_pdu_json(&event_id).await }) @@ -283,19 +283,24 @@ async fn create_join_event( .await?; services.sending.send_pdu_room(room_id, &pdu_id).await?; - let servers_in_room: Option> = if omit_members { + let servers_in_room: Option> = if !omit_members { None } else { debug!("Fetching list of servers in room"); - Some( - services - .rooms - .state_cache - .room_servers(room_id) - .map(|sn| sn.as_str().to_owned()) - .collect() - .await, - ) + let servers: Vec = services + .rooms + .state_cache + .room_servers(room_id) + .map(|sn| sn.as_str().to_owned()) + .collect() + .await; + // If there's no servers, just add us + let servers = if servers.is_empty() { + vec![services.globals.server_name().to_string()] + } else { + servers + }; + Some(servers) }; debug!("Returning send_join data"); Ok(create_join_event::v2::RoomState { From f47474d12aa67ffad071a6fd4b6665dc46cf29bf Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sun, 7 Sep 2025 20:23:57 +0100 Subject: [PATCH 11/47] fix(PR977): Adjust some log levels --- src/api/server/send_join.rs | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index a52b5391..3145f9fa 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -204,7 +204,7 @@ async fn create_join_event( .lock(room_id) .await; - debug!("Acquired send_join mutex, persisting join event"); + trace!("Acquired send_join mutex, persisting join event"); let pdu_id = services .rooms .event_handler @@ -214,7 +214,7 @@ async fn create_join_event( .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; drop(mutex_lock); - debug!("Fetching current state IDs"); + trace!("Fetching current state IDs"); let state_ids: Vec = services .rooms .state_accessor @@ -258,17 +258,6 @@ async fn create_join_event( .rooms .auth_chain .event_ids_iter(room_id, starting_events) - // .broad_filter_map(|event_id| async { - // if omit_members && event_id.as_ref().is_ok_and(|e| state_ids.contains(e)) { - // // Don't include this event if it's already in the state - // trace!( - // "omitting member event {event_id:?} from returned auth chain as it is \ - // already in state" - // ); - // return None; - // } - // Some(event_id) - // }) .broad_and_then(|event_id| async move { services.rooms.timeline.get_pdu_json(&event_id).await }) @@ -281,12 +270,12 @@ async fn create_join_event( .try_collect() .boxed() .await?; - + info!(fast_join = %omit_members, "Sending a join for {origin} to {room_id}"); services.sending.send_pdu_room(room_id, &pdu_id).await?; let servers_in_room: Option> = if !omit_members { None } else { - debug!("Fetching list of servers in room"); + trace!("Fetching list of servers in room"); let servers: Vec = services .rooms .state_cache @@ -296,6 +285,10 @@ async fn create_join_event( .await; // If there's no servers, just add us let servers = if servers.is_empty() { + warn!( + "Failed to find any servers in {room_id}, adding our own 
server name as a last \ + resort" + ); vec![services.globals.server_name().to_string()] } else { servers @@ -346,7 +339,6 @@ pub(crate) async fn create_join_event_v1_route( } } - info!("Providing send_join for {} in {}", body.origin(), &body.room_id); let now = Instant::now(); let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false) .boxed() @@ -357,7 +349,7 @@ pub(crate) async fn create_join_event_v1_route( event: room_state.event, }; info!( - "Finished creating the send_join payload for {} in {} in {:?}", + "Finished sending a join for {} in {} in {:?}", body.origin(), &body.room_id, now.elapsed() @@ -394,14 +386,13 @@ pub(crate) async fn create_join_event_v2_route( } } - info!("Providing send_join for {} in {}", body.origin(), &body.room_id); let now = Instant::now(); let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members) .boxed() .await?; info!( - "Finished creating the send_join payload for {} in {} in {:?}", + "Finished sending a join for {} in {} in {:?}", body.origin(), &body.room_id, now.elapsed() From d1fff1d09f7343cafd92df9282ff29b3cf9e3545 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sun, 7 Sep 2025 20:26:26 +0100 Subject: [PATCH 12/47] perf(pr977): Remove redundant ACL check in send_join --- src/api/server/send_join.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 3145f9fa..aa93047b 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -56,8 +56,10 @@ async fn create_join_event( // We do not add the event_id field to the pdu here because of signature and // hashes checks + trace!("Getting room version"); let room_version_id = services.rooms.state.get_room_version(room_id).await?; + trace!("Generating event ID and converting to canonical json"); let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else { // Event could not be converted to canonical json return Err!(Request(BadJson("Could not convert event to canonical json."))); @@ -106,7 +108,6 @@ async fn create_join_event( ))); } - // ACL check sender user server name let sender: OwnedUserId = serde_json::from_value( value .get("sender") @@ -116,12 +117,6 @@ async fn create_join_event( ) .map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?; - services - .rooms - .event_handler - .acl_check(sender.server_name(), room_id) - .await?; - // check if origin server is trying to send for another server if sender.server_name() != origin { return Err!(Request(Forbidden("Not allowed to join on behalf of another server."))); From 09de586dc702b9adcb3298e974470066f198c43b Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sun, 7 Sep 2025 20:32:26 +0100 Subject: [PATCH 13/47] feat(PR977): Log more things in the join process --- src/api/server/send_join.rs | 41 ++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index aa93047b..d92581a1 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -178,11 +178,6 @@ async fn create_join_event( } } - services - .server_keys - .hash_and_sign_event(&mut value, &room_version_id) - .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?; - let origin: OwnedServerName = serde_json::from_value( value .get("origin") @@ -192,6 +187,12 @@ async fn create_join_event( ) .map_err(|e| 
err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?; + trace!("Signing send_join event"); + services + .server_keys + .hash_and_sign_event(&mut value, &room_version_id) + .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?; + let mutex_lock = services .rooms .event_handler @@ -218,21 +219,19 @@ async fn create_join_event( .collect() .await; - #[allow(clippy::unnecessary_unwrap)] + trace!(%omit_members, "Constructing current state"); let state = state_ids .iter() .try_stream() .broad_filter_map(|event_id| async move { - if omit_members && event_id.is_ok() { - let pdu = services - .rooms - .timeline - .get_pdu(event_id.as_ref().unwrap()) - .await; - if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") { - trace!("omitting member event {event_id:?} from returned state"); - // skip members - return None; + if omit_members { + if let Ok(e) = event_id.as_ref() { + let pdu = services.rooms.timeline.get_pdu(e).await; + if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") { + trace!("omitting member event {e:?} from returned state"); + // skip members + return None; + } } } Some(event_id) @@ -249,6 +248,7 @@ async fn create_join_event( .await?; let starting_events = state_ids.iter().map(Borrow::borrow); + trace!("Constructing auth chain"); let auth_chain = services .rooms .auth_chain @@ -265,8 +265,9 @@ async fn create_join_event( .try_collect() .boxed() .await?; - info!(fast_join = %omit_members, "Sending a join for {origin} to {room_id}"); + info!(fast_join = %omit_members, "Sending join event to other servers"); services.sending.send_pdu_room(room_id, &pdu_id).await?; + debug!("Finished sending join event"); let servers_in_room: Option> = if !omit_members { None } else { @@ -280,12 +281,10 @@ async fn create_join_event( .await; // If there's no servers, just add us let servers = if servers.is_empty() { - warn!( - "Failed to find any servers in {room_id}, adding our own server name as a last \ - resort" - ); + warn!("Failed to find any servers, adding our own server name as a last resort"); vec![services.globals.server_name().to_string()] } else { + trace!("Found {} servers in room", servers.len()); servers }; Some(servers) From e3fbf7a143e06528d505a0357e9558944c8bd928 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 8 Jul 2025 17:07:50 +0100 Subject: [PATCH 14/47] feat: Ask remote servers for individual unknown events --- src/api/client/room/event.rs | 7 +- src/service/rooms/timeline/backfill.rs | 114 ++++++++++++++++++++++++- 2 files changed, 112 insertions(+), 9 deletions(-) diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 47228d67..75ae0758 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route( let event = services .rooms .timeline - .get_pdu(event_id) + .get_remote_pdu(room_id, event_id) .map_err(|_| err!(Request(NotFound("Event {} not found.", event_id)))); let visible = services @@ -33,11 +33,6 @@ pub(crate) async fn get_room_event_route( return Err!(Request(Forbidden("You don't have permission to view this event."))); } - debug_assert!( - event.event_id() == event_id && event.room_id() == room_id, - "Fetched PDU must match requested" - ); - event.add_age().ok(); Ok(get_room_event::v3::Response { event: event.into_format() }) diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index e976981e..dd613afd 100644 --- a/src/service/rooms/timeline/backfill.rs +++ 
b/src/service/rooms/timeline/backfill.rs @@ -1,5 +1,6 @@ use std::iter::once; +use conduwuit::{Err, PduEvent}; use conduwuit_core::{ Result, debug, debug_warn, implement, info, matrix::{ @@ -11,7 +12,7 @@ use conduwuit_core::{ }; use futures::{FutureExt, StreamExt}; use ruma::{ - RoomId, ServerName, + EventId, RoomId, ServerName, api::federation, events::{ StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, @@ -100,7 +101,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); + info!("Asking {backfill_server} for backfill in {room_id}"); let response = self .services .sending @@ -128,10 +129,117 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re } } - info!("No servers could backfill, but backfill was needed in room {room_id}"); + warn!("No servers could backfill, but backfill was needed in room {room_id}"); Ok(()) } +#[implement(super::Service)] +#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))] +pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result { + let local = self.get_pdu(event_id).await; + if local.is_ok() { + // We already have this PDU, no need to backfill + debug!("We already have {event_id} in {room_id}, no need to backfill."); + return local; + } + debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers."); + // Similar to backfill_if_required, but only for a single PDU + // Fetch a list of servers to try + if self + .services + .state_cache + .room_joined_count(room_id) + .await + .is_ok_and(|count| count <= 1) + && !self + .services + .state_accessor + .is_world_readable(room_id) + .await + { + // Room is empty (1 user or none), there is no one that can backfill + return Err!(Request(NotFound("No one can backfill this PDU, room is empty."))); + } + + let power_levels: RoomPowerLevelsEventContent = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") + .await + .unwrap_or_default(); + + let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { + if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) { + Some(user_id.server_name()) + } else { + None + } + }); + + let canonical_room_alias_server = once( + self.services + .state_accessor + .get_canonical_alias(room_id) + .await, + ) + .filter_map(Result::ok) + .map(|alias| alias.server_name().to_owned()) + .stream(); + let mut servers = room_mods + .stream() + .map(ToOwned::to_owned) + .chain(canonical_room_alias_server) + .chain( + self.services + .server + .config + .trusted_servers + .iter() + .map(ToOwned::to_owned) + .stream(), + ) + .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) + .filter_map(|server_name| async move { + self.services + .state_cache + .server_in_room(&server_name, room_id) + .await + .then_some(server_name) + }) + .boxed(); + + while let Some(ref backfill_server) = servers.next().await { + info!("Asking {backfill_server} for event {}", event_id); + let response = self + .services + .sending + .send_federation_request(backfill_server, federation::event::get_event::v1::Request { + event_id: event_id.to_owned(), + include_unredacted_content: Some(false), + }) + .await; + let pdu = match response { + | Ok(response) => { + self.backfill_pdu(backfill_server, response.pdu) + .boxed() + 
.await?; + debug!("Successfully backfilled {event_id} from {backfill_server}"); + Some(self.get_pdu(event_id).await) + }, + | Err(e) => { + warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); + None + }, + }; + if let Some(pdu) = pdu { + debug!("Fetched {event_id} from {backfill_server}"); + return pdu; + } + } + + Err!("No servers could be used to fetch {} in {}.", room_id, event_id) +} + #[implement(super::Service)] #[tracing::instrument(skip(self, pdu), level = "debug")] pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { From f3824ffc3d5c2ed6d5067f134850a451564c8f7a Mon Sep 17 00:00:00 2001 From: Ginger Date: Tue, 2 Sep 2025 15:12:03 -0400 Subject: [PATCH 15/47] fix: Use `handle_incoming_pdu` directly to keep remote PDUs as outliers --- src/service/rooms/timeline/backfill.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index dd613afd..c0171691 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -2,7 +2,7 @@ use std::iter::once; use conduwuit::{Err, PduEvent}; use conduwuit_core::{ - Result, debug, debug_warn, implement, info, + Result, debug, debug_warn, err, implement, info, matrix::{ event::Event, pdu::{PduCount, PduId, RawPduId}, @@ -12,7 +12,7 @@ use conduwuit_core::{ }; use futures::{FutureExt, StreamExt}; use ruma::{ - EventId, RoomId, ServerName, + CanonicalJsonObject, EventId, RoomId, ServerName, api::federation, events::{ StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, @@ -210,17 +210,26 @@ pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Resu while let Some(ref backfill_server) = servers.next().await { info!("Asking {backfill_server} for event {}", event_id); - let response = self + let value = self .services .sending .send_federation_request(backfill_server, federation::event::get_event::v1::Request { event_id: event_id.to_owned(), include_unredacted_content: Some(false), }) - .await; - let pdu = match response { - | Ok(response) => { - self.backfill_pdu(backfill_server, response.pdu) + .await + .and_then(|response| { + serde_json::from_str::(response.pdu.get()).map_err(|e| { + err!(BadServerResponse(debug_warn!( + "Error parsing incoming event {e:?} from {backfill_server}" + ))) + }) + }); + let pdu = match value { + | Ok(value) => { + self.services + .event_handler + .handle_incoming_pdu(backfill_server, &room_id, &event_id, value, false) .boxed() .await?; debug!("Successfully backfilled {event_id} from {backfill_server}"); From e38dec58648aefec3a289bbfb5043bd1b02e4187 Mon Sep 17 00:00:00 2001 From: Ginger Date: Thu, 4 Sep 2025 10:32:35 -0400 Subject: [PATCH 16/47] fix: Put the output of `!admin query room-timeline pdus` in a codeblock --- src/admin/query/room_timeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index afcfec34..0f72b58c 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -57,5 +57,5 @@ pub(super) async fn pdus( .try_collect() .await?; - self.write_str(&format!("{result:#?}")).await + self.write_str(&format!("```\n{result:#?}\n```")).await } From 16f4efa7089292d27e27e49fc5231a58d020aed3 Mon Sep 17 00:00:00 2001 From: Ginger Date: Thu, 4 Sep 2025 10:33:43 -0400 Subject: [PATCH 17/47] fix: Fix pagination tokens being corrupted for backfilled PDUs 
--- src/api/client/message.rs | 6 +++--- src/api/client/relations.rs | 4 ++-- src/api/client/threads.rs | 4 ++-- src/api/client/utils.rs | 31 +++++------------------------- src/core/matrix/pdu/count.rs | 6 +++++- src/service/rooms/timeline/data.rs | 3 +-- 6 files changed, 18 insertions(+), 36 deletions(-) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index f5a951f4..0145b7fe 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -35,7 +35,7 @@ use ruma::{ }; use tracing::warn; -use super::utils::{count_to_token, parse_pagination_token as parse_token}; +use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token}; use crate::Ruma; /// list of safe and common non-state events to ignore if the user is ignored @@ -181,8 +181,8 @@ pub(crate) async fn get_message_events_route( .collect(); Ok(get_message_events::v3::Response { - start: count_to_token(from), - end: next_token.map(count_to_token), + start: count_to_pagination_token(from), + end: next_token.map(count_to_pagination_token), chunk, state, }) diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index f6d8fe9e..f2ec3f23 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -18,7 +18,7 @@ use ruma::{ events::{TimelineEventType, relation::RelationType}, }; -use super::utils::{count_to_token, parse_pagination_token as parse_token}; +use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token}; use crate::Ruma; /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}` @@ -193,7 +193,7 @@ async fn paginate_relations_with_filter( | Direction::Forward => events.last(), | Direction::Backward => events.first(), } - .map(|(count, _)| count_to_token(*count)) + .map(|(count, _)| count_to_pagination_token(*count)) } else { None }; diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index ca176eda..f0fb4a64 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -9,7 +9,7 @@ use conduwuit::{ use futures::StreamExt; use ruma::{api::client::threads::get_threads, uint}; -use crate::Ruma; +use crate::{Ruma, client::utils::pagination_token_to_count}; /// # `GET /_matrix/client/r0/rooms/{roomId}/threads` pub(crate) async fn get_threads_route( @@ -27,7 +27,7 @@ pub(crate) async fn get_threads_route( let from: PduCount = body .from .as_deref() - .map(str::parse) + .map(pagination_token_to_count) .transpose()? 
.unwrap_or_else(PduCount::max); diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs index cc941b95..ec69388a 100644 --- a/src/api/client/utils.rs +++ b/src/api/client/utils.rs @@ -1,28 +1,7 @@ -use conduwuit::{ - Result, err, - matrix::pdu::{PduCount, ShortEventId}, -}; +use conduwuit::{Result, matrix::pdu::PduCount}; -/// Parse a pagination token, trying ShortEventId first, then falling back to -/// PduCount -pub(crate) fn parse_pagination_token(token: &str) -> Result { - // Try parsing as ShortEventId first - if let Ok(shorteventid) = token.parse::() { - // ShortEventId maps directly to a PduCount in our database - Ok(PduCount::Normal(shorteventid)) - } else if let Ok(count) = token.parse::() { - // Fallback to PduCount for backwards compatibility - Ok(PduCount::Normal(count)) - } else if let Ok(count) = token.parse::() { - // Also handle negative counts for backfilled events - Ok(PduCount::from_signed(count)) - } else { - Err(err!(Request(InvalidParam("Invalid pagination token")))) - } -} +/// Parse a pagination token +pub(crate) fn pagination_token_to_count(token: &str) -> Result { token.parse() } -/// Convert a PduCount to a token string (using the underlying ShortEventId) -pub(crate) fn count_to_token(count: PduCount) -> String { - // The PduCount's unsigned value IS the ShortEventId - count.into_unsigned().to_string() -} +/// Convert a PduCount to a token string +pub(crate) fn count_to_pagination_token(count: PduCount) -> String { count.to_string() } diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs index b880278f..7fb12574 100644 --- a/src/core/matrix/pdu/count.rs +++ b/src/core/matrix/pdu/count.rs @@ -1,6 +1,10 @@ #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)] -use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr}; +use std::{ + cmp::Ordering, + fmt::{self, Display}, + str::FromStr, +}; use ruma::api::Direction; diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index fa10a5c0..9064df6c 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -3,8 +3,7 @@ use std::{borrow::Borrow, sync::Arc}; use conduwuit::{ Err, PduCount, PduEvent, Result, at, err, result::{LogErr, NotFound}, - utils, - utils::stream::TryReadyExt, + utils::{self, stream::TryReadyExt}, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut}; From e27ef7f5ec5e95c33f06c1f5d06cd16ca266cd07 Mon Sep 17 00:00:00 2001 From: Ginger Date: Thu, 4 Sep 2025 10:46:08 -0400 Subject: [PATCH 18/47] feat: Do not persist remote PDUs fetched with admin commands --- src/admin/debug/commands.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 81b0e9da..64f68330 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -281,15 +281,8 @@ pub(super) async fn get_remote_pdu( vec![(event_id, value, room_id)] }; - info!("Attempting to handle event ID {event_id} as backfilled PDU"); - self.services - .rooms - .timeline - .backfill_pdu(&server, response.pdu) - .await?; - let text = serde_json::to_string_pretty(&json)?; - let msg = "Got PDU from specified server and handled as backfilled"; + let msg = "Got PDU from specified server:"; write!(self, "{msg}. 
Event body:\n```json\n{text}\n```") }, } From 90fd92977e6dca4cad4625d8383eb9a42c0f4b4b Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sun, 7 Sep 2025 22:08:37 +0100 Subject: [PATCH 19/47] style: Run clippy --- src/service/rooms/timeline/backfill.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index c0171691..9dfe8f7a 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -229,7 +229,7 @@ pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Resu | Ok(value) => { self.services .event_handler - .handle_incoming_pdu(backfill_server, &room_id, &event_id, value, false) + .handle_incoming_pdu(backfill_server, room_id, event_id, value, false) .boxed() .await?; debug!("Successfully backfilled {event_id} from {backfill_server}"); From 1e541875ad94fcacca268515569c1cb32879b205 Mon Sep 17 00:00:00 2001 From: Ginger Date: Sun, 7 Sep 2025 18:06:11 -0400 Subject: [PATCH 20/47] fix: Nuke `src/api/client/utils.rs` --- src/api/client/message.rs | 9 ++++----- src/api/client/mod.rs | 1 - src/api/client/relations.rs | 7 +++---- src/api/client/threads.rs | 4 ++-- src/api/client/utils.rs | 7 ------- 5 files changed, 9 insertions(+), 19 deletions(-) delete mode 100644 src/api/client/utils.rs diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 0145b7fe..842036f5 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -35,7 +35,6 @@ use ruma::{ }; use tracing::warn; -use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token}; use crate::Ruma; /// list of safe and common non-state events to ignore if the user is ignored @@ -85,14 +84,14 @@ pub(crate) async fn get_message_events_route( let from: PduCount = body .from .as_deref() - .map(parse_token) + .map(str::parse) .transpose()? .unwrap_or_else(|| match body.dir { | Direction::Forward => PduCount::min(), | Direction::Backward => PduCount::max(), }); - let to: Option = body.to.as_deref().map(parse_token).transpose()?; + let to: Option = body.to.as_deref().map(str::parse).transpose()?; let limit: usize = body .limit @@ -181,8 +180,8 @@ pub(crate) async fn get_message_events_route( .collect(); Ok(get_message_events::v3::Response { - start: count_to_pagination_token(from), - end: next_token.map(count_to_pagination_token), + start: from.to_string(), + end: next_token.as_ref().map(PduCount::to_string), chunk, state, }) diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index c8ca7757..0014282c 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -37,7 +37,6 @@ pub(super) mod typing; pub(super) mod unstable; pub(super) mod unversioned; pub(super) mod user_directory; -pub(super) mod utils; pub(super) mod voip; pub(super) mod well_known; diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index f2ec3f23..79fa1459 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -18,7 +18,6 @@ use ruma::{ events::{TimelineEventType, relation::RelationType}, }; -use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token}; use crate::Ruma; /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}` @@ -111,14 +110,14 @@ async fn paginate_relations_with_filter( dir: Direction, ) -> Result { let start: PduCount = from - .map(parse_token) + .map(str::parse) .transpose()? 
.unwrap_or_else(|| match dir { | Direction::Forward => PduCount::min(), | Direction::Backward => PduCount::max(), }); - let to: Option = to.map(parse_token).transpose()?; + let to: Option = to.map(str::parse).transpose()?; // Use limit or else 30, with maximum 100 let limit: usize = limit @@ -193,7 +192,7 @@ async fn paginate_relations_with_filter( | Direction::Forward => events.last(), | Direction::Backward => events.first(), } - .map(|(count, _)| count_to_pagination_token(*count)) + .map(|(count, _)| count.to_string()) } else { None }; diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index f0fb4a64..ca176eda 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -9,7 +9,7 @@ use conduwuit::{ use futures::StreamExt; use ruma::{api::client::threads::get_threads, uint}; -use crate::{Ruma, client::utils::pagination_token_to_count}; +use crate::Ruma; /// # `GET /_matrix/client/r0/rooms/{roomId}/threads` pub(crate) async fn get_threads_route( @@ -27,7 +27,7 @@ pub(crate) async fn get_threads_route( let from: PduCount = body .from .as_deref() - .map(pagination_token_to_count) + .map(str::parse) .transpose()? .unwrap_or_else(PduCount::max); diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs deleted file mode 100644 index ec69388a..00000000 --- a/src/api/client/utils.rs +++ /dev/null @@ -1,7 +0,0 @@ -use conduwuit::{Result, matrix::pdu::PduCount}; - -/// Parse a pagination token -pub(crate) fn pagination_token_to_count(token: &str) -> Result { token.parse() } - -/// Convert a PduCount to a token string -pub(crate) fn count_to_pagination_token(count: PduCount) -> String { count.to_string() } From 5d3e10a0481f4857cc706068c0d0c04d44ee9105 Mon Sep 17 00:00:00 2001 From: Ginger Date: Sun, 7 Sep 2025 18:07:03 -0400 Subject: [PATCH 21/47] fix: Make RA use the `full` feature --- .vscode/settings.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index a4fad964..82162ff7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -7,5 +7,6 @@ "continuwuity", "homeserver", "homeservers" - ] + ], + "rust-analyzer.cargo.features": ["full"] } From 0abfc192d45dabd1725d27947995e020aa4d500b Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Fri, 15 Aug 2025 04:10:40 +0100 Subject: [PATCH 22/47] fix(fed): Improve transaction flushing --- src/service/federation/execute.rs | 6 +++--- src/service/sending/sender.rs | 30 ++++++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 1d1d1154..8be1befa 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, mem}; use bytes::Bytes; use conduwuit::{ Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, - error::inspect_debug_log, implement, trace, utils::string::EMPTY, + error::inspect_debug_log, implement, trace, utils::string::EMPTY, warn, }; use http::{HeaderValue, header::AUTHORIZATION}; use ipaddress::IPAddress; @@ -193,7 +193,7 @@ fn handle_error( ) -> Result { if e.is_timeout() || e.is_connect() { e = e.without_url(); - debug_warn!("{e:?}"); + debug_warn!(?url, "network error while sending request: {e:?}"); } else if e.is_redirect() { debug_error!( method = ?method, @@ -204,7 +204,7 @@ fn handle_error( e, ); } else { - debug_error!("{e:?}"); + warn!(?url, "failed to send federation request: {e:?}"); } Err(e.into()) diff --git 
a/src/service/sending/sender.rs b/src/service/sending/sender.rs index a708f746..7af4b533 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -10,7 +10,7 @@ use std::{ use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use conduwuit_core::{ - Error, Event, Result, debug, err, error, + Error, Event, Result, debug, err, error, info, result::LogErr, trace, utils::{ @@ -142,7 +142,7 @@ impl Service { } fn handle_response_err(dest: Destination, statuses: &mut CurTransactionStatus, e: &Error) { - debug!(dest = ?dest, "{e:?}"); + debug!(dest = ?dest, "error response: {e:?}"); statuses.entry(dest).and_modify(|e| { *e = match e { | TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), @@ -177,7 +177,21 @@ impl Service { if !new_events.is_empty() { self.db.mark_as_active(new_events.iter()); - let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect(); + let new_events_vec: Vec<SendingEvent> = + new_events.into_iter().map(|(_, event)| event).collect(); + + if let Some(status) = statuses.get(&dest.clone()) { + if matches!(status, TransactionStatus::Running) { + // If the server is in backoff, clear it + warn!( + ?dest, + "Catching up destination with {} new events", + new_events_vec.len() + ); + statuses.insert(dest.clone(), TransactionStatus::Running); + } + } + futures.push(self.send_events(dest.clone(), new_events_vec)); } else { statuses.remove(dest); @@ -859,12 +873,20 @@ impl Service { pdus, edus, }; + let pdu_count = request.pdus.len(); + let edu_count = request.edus.len(); let result = self .services .federation .execute_on(&self.services.client.sender, &server, request) - .await; + .await + .inspect(|_| { + info!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); + }) + .inspect_err(|e| { + error!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); + }); for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) { if let Err(e) = result {
From e991a10de23334714624058bad7df68f9f604115 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Fri, 15 Aug 2025 04:20:03 +0100 Subject: [PATCH 23/47] fix(fed): Alter log levels to be less noisy --- src/service/sending/sender.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 7af4b533..5498afc0 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -183,9 +183,9 @@ impl Service { if let Some(status) = statuses.get(&dest.clone()) { if matches!(status, TransactionStatus::Running) { // If the server is in backoff, clear it - warn!( + info!( ?dest, - "Catching up destination with {} new events", + "Catching up previously failed destination with {}+ new events", new_events_vec.len() ); statuses.insert(dest.clone(), TransactionStatus::Running); @@ -885,7 +885,7 @@ impl Service { info!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); }) .inspect_err(|e| { - error!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); + info!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); }); for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
From b45b630af73241af2d97a6c9163a85e035c47b89 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 20 Aug 2025 00:56:55 +0100 Subject: [PATCH 24/47] feat(fed): Log federation request failures with a readable error chain --- src/service/federation/execute.rs | 18 +++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 8be1befa..ace29c1c 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -1,4 +1,8 @@ -use std::{fmt::Debug, mem}; +use std::{ + error::Error as _, + fmt::{Debug, Write}, + mem, +}; use bytes::Bytes; use conduwuit::{ @@ -193,9 +197,9 @@ fn handle_error( ) -> Result { if e.is_timeout() || e.is_connect() { e = e.without_url(); - debug_warn!(?url, "network error while sending request: {e:?}"); + warn!(?url, "network error while sending federation request: {e:?}"); } else if e.is_redirect() { - debug_error!( + warn!( method = ?method, url = ?url, final_url = ?e.url(), @@ -207,6 +211,14 @@ fn handle_error( warn!(?url, "failed to send federation request: {e:?}"); } + let mut nice_error = "Request failed".to_owned(); + let mut src = e.source(); + while let Some(source) = src { + write!(nice_error, ": {source:?}").expect("writing to string should not fail"); + src = source.source(); + } + warn!(nice_error, "Federation request error"); + Err(e.into()) }
From 8584116555e28e03cd6a86e187fddabcbac0f82b Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Mon, 25 Aug 2025 21:26:28 +0100 Subject: [PATCH 25/47] feat(fed): Log transaction and per-PDU handling progress Groundwork for handling EDUs before PDUs: counts the PDUs per room and logs transaction validation and per-PDU handling progress. --- src/api/server/send.rs | 46 +++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 12 deletions(-)
diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 9c5bfd2b..86832bc6 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -5,7 +5,7 @@ use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, - debug_warn, err, error, + debug_warn, err, error, info, result::LogErr, trace, utils::{ @@ -79,13 +79,11 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); - trace!( + info!( pdus = body.pdus.len(), edus = body.edus.len(), - elapsed = ?txn_start_time.elapsed(), id = ?body.transaction_id, - origin =?body.origin(), - "Starting txn", + "Processing transaction", ); let pdus = body @@ -104,14 +102,21 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); - let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; - - debug!( + info!( + pdus = body.pdus.len(), + edus = body.edus.len(), + elapsed = ?txn_start_time.elapsed(), + id = ?body.transaction_id, + "Validated transaction", + ); + + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; + + info!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), id = ?body.transaction_id, - origin =?body.origin(), "Finished txn", ); for (id, result) in &results { @@ -154,7 +159,8 @@ async fn handle( .into_iter() .try_stream() .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { - handle_room(services, client, origin, started, room_id, pdus.into_iter()) + let count = pdus.len(); + handle_room(services, client, origin, started, room_id, pdus.into_iter(), count) .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) }) @@ -178,6 +184,7 @@ async fn handle_room( txn_start_time: Instant, room_id: OwnedRoomId, pdus: impl Iterator + Send, + count: usize, ) -> Result> { let _room_lock = services .rooms .event_handler .mutex_federation .lock(&room_id) .await; let room_id = &room_id; + let mut n = 0; pdus.try_stream() .and_then(|(_, event_id, value)| async move {
services.server.check_running()?; let pdu_start_time = Instant::now(); + info!( + %room_id, + %event_id, + pdu = n + 1, + total = count, + pdu_elapsed = ?pdu_start_time.elapsed(), + txn_elapsed = ?txn_start_time.elapsed(), + "Handling PDU", + ); let result = services .rooms .event_handler @@ -198,11 +215,16 @@ .await .map(|_| ()); - debug!( + info!( + %room_id, + %event_id, + pdu = n + 1, + total = count, pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), - "Finished PDU {event_id}", + "Finished handling PDU {event_id}", ); + n += 1; Ok((event_id, result)) })
From 6996985a06510cd4c0d4c9fa7feea6471a9cb651 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Mon, 25 Aug 2025 21:26:56 +0100 Subject: [PATCH 26/47] feat(fed): Handle EDUs before PDUs Aranje needs his crypto keys: device-key and to-device EDUs should not be delayed behind slow PDU processing. --- src/api/server/send.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 86832bc6..e7e4e9fd 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -143,6 +143,10 @@ async fn handle( pdus: impl Stream + Send, edus: impl Stream + Send, ) -> Result { + edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) + .boxed() + .await; + // group pdus by room let pdus = pdus .collect() @@ -169,11 +173,6 @@ async fn handle( .boxed() .await?; - // evaluate edus after pdus, at least for now. - edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) - .boxed() - .await; - Ok(results) }
From bcca10b50415b4cc77cad8c11c78e265f44ac40b Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Wed, 14 May 2025 06:53:00 -0700 Subject: [PATCH 27/47] bump the number of allowed immutable memtables by 1, to allow for greater flood protection This should probably not be applied if you have rocksdb_atomic_flush = false (the default). --- src/database/engine/cf_opts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs index 58358f02..b8d0b307 100644 --- a/src/database/engine/cf_opts.rs +++ b/src/database/engine/cf_opts.rs @@ -29,7 +29,7 @@ fn descriptor_cf_options( set_table_options(&mut opts, &desc, cache)?; opts.set_min_write_buffer_number(1); - opts.set_max_write_buffer_number(2); + opts.set_max_write_buffer_number(3); opts.set_write_buffer_size(desc.write_size); opts.set_target_file_size_base(desc.file_size);
From 60d403f3d7ec7d4aa305004da07b0326809be872 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 21 Jun 2025 08:02:05 -0700 Subject: [PATCH 28/47] perf(config): Scale cache and sender-worker defaults with core count Improve the stateinfo_cache_capacity and roomid_spacehierarchy_cache_capacity defaults, make the sender_workers default saner and clamp it to the core count, update the sender_workers documentation, add more parallelism_scaled helpers and make them public, and update one document. --- conduwuit-example.toml | 6 +-- src/core/config/mod.rs | 75 +++++++++++++++++++------------------- src/service/sending/mod.rs | 12 ++---- 3 files changed, 43 insertions(+), 50 deletions(-)
diff --git a/conduwuit-example.toml b/conduwuit-example.toml index 07374aae..b7a3fa07 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -1688,11 +1688,9 @@ #stream_amplification = 1024 # Number of sender task workers; determines sender parallelism.
Default is -# '0' which means the value is determined internally, likely matching the -# number of tokio worker-threads or number of cores, etc. Override by -# setting a non-zero value. +# number of CPU cores. Override by setting a different value. # -#sender_workers = 0 +#sender_workers = 4 # Enables listener sockets; can be set to false to disable listening. This # option is intended for developer/diagnostic purposes only. diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index b6f6ab53..264c7c6b 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1929,12 +1929,10 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '0' which means the value is determined internally, likely matching the - /// number of tokio worker-threads or number of cores, etc. Override by - /// setting a non-zero value. + /// '4'. Override by setting a different value. Values clamped 1 to core count. /// - /// default: 0 - #[serde(default)] + /// default: 4 + #[serde(default = "default_sender_workers")] pub sender_workers: usize, /// Enables listener sockets; can be set to false to disable listening. This @@ -2277,45 +2275,47 @@ fn default_database_backups_to_keep() -> i16 { 1 } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } -fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) } +fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(100_000) -} - -fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) -} - -fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) -} - -fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_servernameevent_data_cache_capacity() -> u32 { parallelism_scaled_u32(100_000).saturating_add(500_000) } -fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) } +fn default_eventidshort_cache_capacity() -> u32 { + parallelism_scaled_u32(100_000).saturating_add(500_000) +} -fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } +fn default_eventid_pdu_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} -fn default_dns_cache_entries() -> u32 { 32768 } +fn default_shortstatekey_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_statekeyshort_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_servernameevent_data_cache_capacity() -> u32 { + parallelism_scaled_u32(200_000).saturating_add(500_000) +} + +fn default_stateinfo_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } + +fn 
default_roomid_spacehierarchy_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } + +fn default_dns_cache_entries() -> u32 { 327680 } fn default_dns_min_ttl() -> u64 { 60 * 180 } @@ -2508,14 +2508,13 @@ fn default_admin_log_capture() -> String { fn default_admin_room_tag() -> String { "m.server_notice".to_owned() } #[allow(clippy::as_conversions, clippy::cast_precision_loss)] -fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } +pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } -fn parallelism_scaled_u32(val: u32) -> u32 { - let val = val.try_into().expect("failed to cast u32 to usize"); - parallelism_scaled(val).try_into().unwrap_or(u32::MAX) -} +pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) } -fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } +pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) } + +pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } fn default_trusted_server_batch_size() -> usize { 256 } @@ -2535,6 +2534,8 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } +fn default_sender_workers() -> usize { 4 } + fn default_client_receive_timeout() -> u64 { 75 } fn default_client_request_timeout() -> u64 { 180 } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 08ca7010..ce687551 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -401,16 +401,10 @@ impl Service { fn num_senders(args: &crate::Args<'_>) -> usize { const MIN_SENDERS: usize = 1; - // Limit the number of senders to the number of workers threads or number of - // cores, conservatively. - let max_senders = args - .server - .metrics - .num_workers() - .min(available_parallelism()); + // Limit the maximum number of senders to the number of cores. + let max_senders = available_parallelism(); - // If the user doesn't override the default 0, this is intended to then default - // to 1 for now as multiple senders is experimental. + // default is 4 senders. clamp between 1 and core count. 
args.server .config .sender_workers From ffac6d81553152f63d0ed8484a312f102dba4abe Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 7 Jun 2025 00:46:55 +0100 Subject: [PATCH 29/47] fix an auth rule not applying correctly --- src/core/matrix/state_res/event_auth.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 77a4a95c..ca266c5c 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -256,6 +256,16 @@ where }, | Some(e) => e, }; + // just re-check 1.2 to work around a bug + let Some(room_id_server_name) = incoming_event.room_id().server_name() else { + warn!("room ID has no servername"); + return Ok(false); + }; + + if room_id_server_name != sender.server_name() { + warn!("servername of room ID does not match servername of m.room.create sender"); + return Ok(false); + } if incoming_event.room_id() != room_create_event.room_id() { warn!("room_id of incoming event does not match room_id of m.room.create event"); From b157c02fec7be1bf56b1ed10650e4dc0ef8ff8c8 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 7 Jun 2025 00:55:03 +0100 Subject: [PATCH 30/47] Note about ruma#2064 in TODO --- src/core/matrix/state_res/event_auth.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index ca266c5c..40c32e03 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -149,8 +149,8 @@ where for<'a> &'a E: Event + Send, { debug!( - event_id = %incoming_event.event_id(), - event_type = ?incoming_event.event_type(), + event_id = format!("{}", incoming_event.event_id()), + event_type = format!("{}", incoming_event.event_type()), "auth_check beginning" ); @@ -219,7 +219,7 @@ where /* // TODO: In the past this code was commented as it caused problems with Synapse. This is no // longer the case. This needs to be implemented. - // See also: https://github.com/ruma/ruma/pull/2064 + // See also: https://github.com/ruma/ruma/pull/2064 // // 2. Reject if auth_events // a. 
auth_events cannot have duplicate keys since it's a BTree From 5dcd825132bda4ea755e263c96dc40293b057d18 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 10 Jun 2025 22:33:31 +0100 Subject: [PATCH 31/47] Kick up a fuss when m.room.create is unfindable --- src/core/matrix/state_res/event_auth.rs | 4 ++-- src/core/matrix/state_res/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 40c32e03..31c660ed 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -30,7 +30,7 @@ use super::{ }, room_version::RoomVersion, }; -use crate::{debug, error, trace, warn}; +use crate::{debug, err_log, error, trace, warn}; // FIXME: field extracting could be bundled for `content` #[derive(Deserialize)] @@ -251,7 +251,7 @@ where let room_create_event = match room_create_event { | None => { - warn!("no m.room.create event in auth chain"); + error!("no m.room.create event in auth chain for {}!", incoming_event.event_id()); return Ok(false); }, | Some(e) => e, diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index ce9d9276..e721e14c 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -753,7 +753,7 @@ where } } } - // Did not find a power level event so we default to zero + warn!("could not find a power event in the mainline map, defaulting to zero depth"); Ok(0) } From e3b632529bfca61507927a62c932b52e464280b0 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 10 Jun 2025 23:00:09 +0100 Subject: [PATCH 32/47] Fix room ID check --- src/core/matrix/state_res/event_auth.rs | 11 +++++++---- src/service/rooms/event_handler/handle_outlier_pdu.rs | 5 +---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 31c660ed..de4d20e1 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -30,7 +30,7 @@ use super::{ }, room_version::RoomVersion, }; -use crate::{debug, err_log, error, trace, warn}; +use crate::{debug, error, trace, warn}; // FIXME: field extracting could be bundled for `content` #[derive(Deserialize)] @@ -251,7 +251,7 @@ where let room_create_event = match room_create_event { | None => { - error!("no m.room.create event in auth chain for {}!", incoming_event.event_id()); + error!("no m.room.create event found for {}!", incoming_event.event_id()); return Ok(false); }, | Some(e) => e, @@ -262,8 +262,11 @@ where return Ok(false); }; - if room_id_server_name != sender.server_name() { - warn!("servername of room ID does not match servername of m.room.create sender"); + if room_id_server_name != room_create_event.sender().server_name() { + warn!( + "servername of room ID origin ({}) does not match servername of m.room.create sender ({})", + room_id_server_name, + room_create_event.sender().server_name()); return Ok(false); } diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index d79eed77..fad9ac74 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -122,10 +122,7 @@ where } // The original create event must be in the auth events - if !matches!( - auth_events.get(&(StateEventType::RoomCreate, String::new().into())), - Some(_) | None - ) { + if !auth_events.contains_key(&(StateEventType::RoomCreate, 
String::new().into())) { return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); }
From 182a2d5ed41e50bb93a3b327963238a3bfc3b01d Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 11 Jun 2025 01:27:25 +0100 Subject: [PATCH 33/47] feat(logs): Add auth-event context to the missing m.room.create error --- src/core/matrix/state_res/event_auth.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index de4d20e1..fc1119de 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -13,6 +13,7 @@ use ruma::{ power_levels::RoomPowerLevelsEventContent, third_party_invite::RoomThirdPartyInviteEventContent, }, + EventId, int, serde::{Base64, Raw}, }; @@ -21,7 +22,6 @@ use serde::{ de::{Error as _, IgnoredAny}, }; use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue}; - use super::{ Error, Event, Result, StateEventType, StateKey, TimelineEventType, power_levels::{ @@ -251,7 +251,14 @@ where let room_create_event = match room_create_event { | None => { - error!("no m.room.create event found for {}!", incoming_event.event_id()); + error!( + create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + "no m.room.create event found for {} ({})!", + incoming_event.event_id().as_str(), + incoming_event.room_id().as_str() + ); return Ok(false); }, | Some(e) => e,
From f9af07fb9d70e34fa1ab50682f1eb43f736fea5b Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 11 Jun 2025 01:42:19 +0100 Subject: [PATCH 34/47] log which room struggled to get mainline depth --- src/core/matrix/state_res/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index e721e14c..ba9c013d 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -733,8 +733,12 @@ where Fut: Future> + Send, E: Event + Send + Sync, { + let mut room_id = None; while let Some(sort_ev) = event { debug!(event_id = sort_ev.event_id().as_str(), "mainline"); + if room_id.is_none() { + room_id = Some(sort_ev.room_id().to_owned()); + } let id = sort_ev.event_id(); if let Some(depth) = mainline_map.get(id) { @@ -753,7 +757,7 @@ where } } } - warn!("could not find a power event in the mainline map, defaulting to zero depth"); + warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth"); Ok(0) }
From dc9776bc1b4f5951850e8730ed8dcb68ea7b95b5 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Thu, 3 Jul 2025 14:44:27 -0700 Subject: [PATCH 35/47] fix(fed): Log when zero extremities are retained while upgrading an outlier PDU --- src/service/rooms/event_handler/upgrade_outlier_pdu.rs | 3 +++ 1 file changed, 3 insertions(+)
diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index d2e0623c..31492e9e 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -6,6 +6,7 @@ use conduwuit::{ trace, utils::stream::{BroadbandExt, ReadyExt}, warn, + info }; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue,
RoomId, ServerName, events::StateEventType}; @@ -193,6 +194,8 @@ where .collect() .await; + if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events, event id: {}", incoming_pdu.prev_events.len(), incoming_pdu.event_id) } + debug!( "Retained {} extremities checked against {} prev_events", extremities.len(),
From 785ba4eaada7bcc20428128803841d8e75682885 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 21 Jun 2025 08:13:30 -0700 Subject: [PATCH 36/47] fix(config): Default sender_workers to the core count --- src/core/config/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 264c7c6b..d9fb462b 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1929,9 +1929,9 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '4'. Override by setting a different value. Values clamped 1 to core count. + /// core count. Override by setting a different value. /// - /// default: 4 + /// default: core count #[serde(default = "default_sender_workers")] pub sender_workers: usize, @@ -2534,7 +2534,7 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } -fn default_sender_workers() -> usize { 4 } +fn default_sender_workers() -> usize { parallelism_scaled(1) }
From 64fb278133662b248ba558330417f21dbc203d89 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Mon, 30 Jun 2025 15:25:11 -0700 Subject: [PATCH 37/47] fix(config): Trim the parallelism-scaled cache base offsets from 500k to 100k --- src/core/config/mod.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index d9fb462b..0d3e80b1 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2277,40 +2277,41 @@ fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64( fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000) } fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_servernameevent_data_cache_capacity() -> u32 { - parallelism_scaled_u32(200_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000)
} fn default_stateinfo_cache_capacity() -> u32 { - parallelism_scaled_u32(500).clamp(100, 12000) } + parallelism_scaled_u32(500).clamp(100, 12000) +} fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(500).clamp(100, 12000) }
From f02e8902924cc44e601d10261da54c5f58d9e45a Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Mon, 21 Jul 2025 17:22:19 -0700 Subject: [PATCH 38/47] fix(style): Drop the separately added info import --- src/service/rooms/event_handler/upgrade_outlier_pdu.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 31492e9e..4a64c017 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -5,8 +5,7 @@ use conduwuit::{ matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, - warn, - info + warn }; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
From abe234537ffd77e40f941553270329ae04b61a30 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 2 Aug 2025 20:59:02 -0700 Subject: [PATCH 39/47] fix(perf): Cap exponential backoff and DNS cache TTLs Raise the presence refresh timeout from one minute to four, cap positive and negative DNS TTLs at one day, and cap outlier and prev-event fetch backoff at one hour. --- src/service/presence/mod.rs | 2 +- src/service/resolver/dns.rs | 4 ++-- src/service/rooms/event_handler/fetch_and_handle_outliers.rs | 2 +- src/service/rooms/event_handler/handle_prev_pdu.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index e7ce64bc..1de99541 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -100,7 +100,7 @@ impl Service { /// Pings the presence of the given user in the given room, setting the /// specified state.
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> { - const REFRESH_TIMEOUT: u64 = 60 * 1000; + const REFRESH_TIMEOUT: u64 = 60 * 1000 * 4; let last_presence = self.db.get_presence(user_id).await; let state_changed = match last_presence { diff --git a/src/service/resolver/dns.rs b/src/service/resolver/dns.rs index 3a0b2551..6856853e 100644 --- a/src/service/resolver/dns.rs +++ b/src/service/resolver/dns.rs @@ -53,9 +53,9 @@ impl Resolver { opts.cache_size = config.dns_cache_entries as usize; opts.preserve_intermediates = true; opts.negative_min_ttl = Some(Duration::from_secs(config.dns_min_ttl_nxdomain)); - opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 30)); + opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24)); opts.positive_min_ttl = Some(Duration::from_secs(config.dns_min_ttl)); - opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 7)); + opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24)); opts.timeout = Duration::from_secs(config.dns_timeout); opts.attempts = config.dns_attempts as usize; opts.try_tcp_on_error = config.dns_tcp_fallback; diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 59b768f2..c72cce87 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -79,7 +79,7 @@ where { // Exponential backoff const MIN_DURATION: u64 = 60 * 2; - const MAX_DURATION: u64 = 60 * 60 * 8; + const MAX_DURATION: u64 = 60 * 60; if continue_exponential_backoff_secs( MIN_DURATION, MAX_DURATION, diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index cb4978d9..c786599a 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -46,7 +46,7 @@ where { // Exponential backoff const MIN_DURATION: u64 = 5 * 60; - const MAX_DURATION: u64 = 60 * 60 * 24; + const MAX_DURATION: u64 = 60 * 60; if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { debug!( ?tries, From 78ade5aa683d8d3d63a512b63a9d4be59fb99c24 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Thu, 21 Aug 2025 12:19:53 -0700 Subject: [PATCH 40/47] fix warn by removing unused debug imports --- src/service/federation/execute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index ace29c1c..9a3cd828 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -6,7 +6,7 @@ use std::{ use bytes::Bytes; use conduwuit::{ - Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, + Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, warn, }; use http::{HeaderValue, header::AUTHORIZATION}; From 25b0d43b9cdc55a6d62ea44ac5ed2d96ab464fff Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 18:36:39 -0700 Subject: [PATCH 41/47] fix warn by removing unused debug imports delete more imports to quiet cargo --- src/api/server/send.rs | 2 +- src/service/federation/execute.rs | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index e7e4e9fd..385477d1 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -3,7 +3,7 @@ 
use std::{collections::BTreeMap, net::IpAddr, time::Instant}; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Error, Result, debug, + Err, Error, Result, debug::INFO_SPAN_LEVEL, debug_warn, err, error, info, result::LogErr, diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 9a3cd828..5fe10ec9 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -5,10 +5,7 @@ use std::{ }; use bytes::Bytes; -use conduwuit::{ - Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, - error::inspect_debug_log, implement, trace, utils::string::EMPTY, warn, -}; +use conduwuit::{Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, info}; use http::{HeaderValue, header::AUTHORIZATION}; use ipaddress::IPAddress; use reqwest::{Client, Method, Request, Response, Url}; @@ -197,9 +194,9 @@ fn handle_error( ) -> Result { if e.is_timeout() || e.is_connect() { e = e.without_url(); - warn!(?url, "network error while sending federation request: {e:?}"); + trace!(?url, "network error while sending federation request: {e:?}"); } else if e.is_redirect() { - warn!( + trace!( method = ?method, url = ?url, final_url = ?e.url(), @@ -208,7 +205,7 @@ fn handle_error( e, ); } else { - warn!(?url, "failed to send federation request: {e:?}"); + trace!(?url, "failed to send federation request: {e:?}"); } let mut nice_error = "Request failed".to_owned(); @@ -217,7 +214,7 @@ fn handle_error( write!(nice_error, ": {source:?}").expect("writing to string should not fail"); src = source.source(); } - warn!(nice_error, "Federation request error"); + info!(nice_error, "Federation request error"); Err(e.into()) } From 753f8aabaeb8f468426d23c1603fa424b6efd5b6 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Mon, 25 Aug 2025 20:51:55 -0700 Subject: [PATCH 42/47] reduce log volume (keeps 2 infos) adjust log volume demote a bunch of logs --- src/api/server/send.rs | 6 +++--- .../rooms/event_handler/fetch_and_handle_outliers.rs | 2 +- src/service/sending/sender.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 385477d1..601da06c 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -79,7 +79,7 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); - info!( + trace!( pdus = body.pdus.len(), edus = body.edus.len(), id = ?body.transaction_id, @@ -102,7 +102,7 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); - info!( + trace!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), @@ -198,7 +198,7 @@ async fn handle_room( .and_then(|(_, event_id, value)| async move { services.server.check_running()?; let pdu_start_time = Instant::now(); - info!( + trace!( %room_id, %event_id, pdu = n + 1, diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index c72cce87..d273a41a 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -4,7 +4,7 @@ use std::{ }; use conduwuit::{ - Event, PduEvent, debug, debug_error, debug_warn, implement, + Event, PduEvent, debug, debug_error, implement, matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, warn, }; diff --git 
a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 5498afc0..ac82669f 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -882,7 +882,7 @@ impl Service { .execute_on(&self.services.client.sender, &server, request) .await .inspect(|_| { - info!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); + trace!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); }) .inspect_err(|e| { info!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); @@ -890,7 +890,7 @@ impl Service { for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) { if let Err(e) = result { - warn!( + trace!( %txn_id, %server, "error sending PDU {event_id} to remote server: {e:?}" ); From f48c27e34f83bd9d55a9b76f4bed4c2675b27e63 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 18:08:19 -0700 Subject: [PATCH 43/47] add event_id to log entry --- src/service/rooms/event_handler/handle_outlier_pdu.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index fad9ac74..3b8653e7 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -123,7 +123,7 @@ where // The original create event must be in the auth events if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) { - return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); + return Err!(Request(InvalidParam("Incoming event refers to wrong create event. event_id={event_id}"))); } let state_fetch = |ty: &StateEventType, sk: &str| { From 641184dfc5275ede909c74b927508040ae2ba242 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 18:42:40 -0700 Subject: [PATCH 44/47] turns out we need debug_warn --- .../rooms/event_handler/fetch_and_handle_outliers.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index d273a41a..51d10d71 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -3,11 +3,7 @@ use std::{ time::Instant, }; -use conduwuit::{ - Event, PduEvent, debug, debug_error, implement, - matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, - warn, -}; +use conduwuit::{Event, PduEvent, debug, debug_error, implement, matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, warn, debug_warn}; use ruma::{ CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_event, From 066be5b38f277e01c8976d41aba776bfbaefd0e0 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sun, 31 Aug 2025 09:16:52 -0700 Subject: [PATCH 45/47] pass and use transaction id to collect timing info --- src/api/server/send.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 601da06c..f5727cf6 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -79,10 +79,11 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); + let transaction_id = body.transaction_id.to_string(); trace!( pdus = body.pdus.len(), edus = 
body.edus.len(), - id = ?body.transaction_id, + id = transaction_id, "Processing transaction", ); @@ -106,17 +107,17 @@ pub(crate) async fn send_transaction_message_route( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, + id = transaction_id, "Validated transaction", ); - let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus, &*transaction_id).await?; info!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, + id = transaction_id, "Finished txn", ); for (id, result) in &results { @@ -142,11 +143,13 @@ async fn handle( started: Instant, pdus: impl Stream + Send, edus: impl Stream + Send, + transaction_id: &str, ) -> Result { + let handle_start = Instant::now(); edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) .boxed() .await; - + let pdu_start = Instant::now(); // group pdus by room let pdus = pdus .collect() @@ -157,14 +160,14 @@ async fn handle( .collect() }) .await; - + let results_start = Instant::now(); // we can evaluate rooms concurrently let results: ResolvedMap = pdus .into_iter() .try_stream() .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { let count = pdus.len(); - handle_room(services, client, origin, started, room_id, pdus.into_iter(), count) + handle_room(services, client, origin, started, room_id, pdus.into_iter(), count, transaction_id) .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) }) @@ -172,7 +175,14 @@ async fn handle( .try_collect() .boxed() .await?; - + let handle_stop = Instant::now(); + info!( + edus = pdu_start.saturating_duration_since(handle_start).as_micros(), + pdus = results_start.saturating_duration_since(pdu_start).as_micros(), + handle_room = handle_stop.saturating_duration_since(results_start).as_micros(), + transaction_id = ?transaction_id, + "handled incoming transaction", + ); Ok(results) } @@ -184,6 +194,7 @@ async fn handle_room( room_id: OwnedRoomId, pdus: impl Iterator + Send, count: usize, + transaction_id: &str, ) -> Result> { let _room_lock = services .rooms @@ -201,6 +212,7 @@ async fn handle_room( trace!( %room_id, %event_id, + transaction_id = ?transaction_id, pdu = n + 1, total = count, pdu_elapsed = ?pdu_start_time.elapsed(), @@ -217,11 +229,12 @@ async fn handle_room( info!( %room_id, %event_id, + transaction_id = ?transaction_id, pdu = n + 1, total = count, pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), - "Finished handling PDU {event_id}", + "Finished handling PDU", ); n += 1; From cd97d77ebce92d1732594d76bfaa9a0dd582f309 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 22:25:16 -0700 Subject: [PATCH 46/47] process edus before pdus here, too --- src/api/server/send.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index f5727cf6..b2794a6e 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -87,14 +87,6 @@ pub(crate) async fn send_transaction_message_route( "Processing transaction", ); - let pdus = body - .pdus - .iter() - .stream() - .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) - .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) - .ready_filter_map(Result::ok); - let edus = body .edus .iter() @@ -103,6 +95,14 @@ pub(crate) async fn 
send_transaction_message_route( - let pdus = body - .pdus - .iter() - .stream() - .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) - .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) - .ready_filter_map(Result::ok); - let edus = body .edus .iter() .map(|edu| edu.json().get()) .map(serde_json::from_str) .filter_map(Result::ok) .stream(); + let pdus = body + .pdus + .iter() + .stream() + .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) + .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) + .ready_filter_map(Result::ok); + trace!( pdus = body.pdus.len(), edus = body.edus.len(),
From eb37e0de77071c4d407b3a64c3f2af8b1cfb278a Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sun, 31 Aug 2025 10:26:59 -0700 Subject: [PATCH 47/47] feat(fed): Record time spent waiting on the room federation mutex --- src/api/server/send.rs | 4 ++++ 1 file changed, 4 insertions(+)
diff --git a/src/api/server/send.rs b/src/api/server/send.rs index b2794a6e..94991455 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -196,12 +196,14 @@ async fn handle_room( count: usize, transaction_id: &str, ) -> Result> { + let room_lock_start = Instant::now(); let _room_lock = services .rooms .event_handler .mutex_federation .lock(&room_id) .await; + let room_lock_end = Instant::now(); let room_id = &room_id; let mut n = 0; @@ -215,6 +217,7 @@ trace!( %room_id, %event_id, transaction_id = ?transaction_id, pdu = n + 1, total = count, + room_lock_time = ?room_lock_end.saturating_duration_since(room_lock_start).as_micros(), pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), "Handling PDU", ); @@ -232,6 +235,7 @@ info!( %room_id, %event_id, transaction_id = ?transaction_id, pdu = n + 1, total = count, + room_lock_time = ?room_lock_end.saturating_duration_since(room_lock_start).as_micros(), pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), "Finished handling PDU",
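
The instrumentation added in patches 45 through 47 repeats one pattern: capture std::time::Instant values around each phase and log the saturating difference in microseconds. A minimal standalone sketch of that pattern, assuming only the tracing and tracing-subscriber crates (the function and field names below are illustrative, not code from these patches):

    use std::time::Instant;

    use tracing::info;

    // Time two phases of a unit of work, mirroring the room_lock_time and
    // pdu_elapsed fields above. saturating_duration_since returns a zero
    // Duration instead of panicking if the readings are ever out of order.
    fn handle_unit_of_work() {
        let lock_start = Instant::now();
        // ... acquire a contended lock here, as handle_room does ...
        let lock_end = Instant::now();

        let work_start = Instant::now();
        // ... process one PDU or similar unit of work here ...

        info!(
            lock_time = ?lock_end.saturating_duration_since(lock_start).as_micros(),
            work_elapsed = ?work_start.elapsed().as_micros(),
            "finished unit of work"
        );
    }

    fn main() {
        tracing_subscriber::fmt::init();
        handle_unit_of_work();
    }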