diff --git a/.forgejo/actions/detect-runner-os/action.yml b/.forgejo/actions/detect-runner-os/action.yml
index be8f9b0a..6ada1d5d 100644
--- a/.forgejo/actions/detect-runner-os/action.yml
+++ b/.forgejo/actions/detect-runner-os/action.yml
@@ -13,12 +13,6 @@ outputs:
   slug:
     description: 'Combined OS slug (e.g. Ubuntu-22.04)'
     value: ${{ steps.detect.outputs.slug }}
-  node_major:
-    description: 'Major version of Node.js if available (e.g. 22)'
-    value: ${{ steps.detect.outputs.node_major }}
-  node_version:
-    description: 'Full Node.js version if available (e.g. 22.19.0)'
-    value: ${{ steps.detect.outputs.node_version }}

 runs:
   using: composite
@@ -36,20 +30,7 @@ runs:
         # Create combined slug
         OS_SLUG="${OS_NAME}-${OS_VERSION}"

-        # Detect Node.js version if available
-        if command -v node >/dev/null 2>&1; then
-          NODE_VERSION=$(node --version | sed 's/v//')
-          NODE_MAJOR=$(echo $NODE_VERSION | cut -d. -f1)
-          echo "node_version=${NODE_VERSION}" >> $GITHUB_OUTPUT
-          echo "node_major=${NODE_MAJOR}" >> $GITHUB_OUTPUT
-          echo "🔍 Detected Node.js: v${NODE_VERSION}"
-        else
-          echo "node_version=" >> $GITHUB_OUTPUT
-          echo "node_major=" >> $GITHUB_OUTPUT
-          echo "🔍 Node.js not found"
-        fi
-
-        # Set OS outputs
+        # Set outputs
         echo "name=${OS_NAME}" >> $GITHUB_OUTPUT
         echo "version=${OS_VERSION}" >> $GITHUB_OUTPUT
         echo "slug=${OS_SLUG}" >> $GITHUB_OUTPUT
diff --git a/.forgejo/actions/sccache/action.yml b/.forgejo/actions/sccache/action.yml
index b2441109..b5e5dcf4 100644
--- a/.forgejo/actions/sccache/action.yml
+++ b/.forgejo/actions/sccache/action.yml
@@ -2,12 +2,18 @@ name: sccache
 description: |
   Install sccache for caching builds in GitHub Actions.

+inputs:
+  token:
+    description: 'A GitHub PAT'
+    required: false
 runs:
   using: composite
   steps:
     - name: Install sccache
-      uses: https://git.tomfos.tr/tom/sccache-action@v1
+      uses: https://github.com/mozilla-actions/sccache-action@v0.0.9
+      with:
+        token: ${{ inputs.token }}
     - name: Configure sccache
       uses: https://github.com/actions/github-script@v7
       with:
diff --git a/.forgejo/actions/setup-rust/action.yml b/.forgejo/actions/setup-rust/action.yml
index a8736a75..091da8c2 100644
--- a/.forgejo/actions/setup-rust/action.yml
+++ b/.forgejo/actions/setup-rust/action.yml
@@ -88,9 +88,19 @@ runs:
         # Shared toolchain cache across all Rust versions
         key: toolchain-${{ steps.runner-os.outputs.slug }}

+    - name: Debug GitHub token availability
+      shell: bash
+      run: |
+        if [ -z "${{ inputs.github-token }}" ]; then
+          echo "⚠️ No GitHub token provided - sccache will use fallback download method"
+        else
+          echo "✅ GitHub token provided for sccache"
+        fi
     - name: Setup sccache
-      uses: https://git.tomfos.tr/tom/sccache-action@v1
+      uses: https://github.com/mozilla-actions/sccache-action@v0.0.9
+      with:
+        token: ${{ inputs.github-token }}

     - name: Cache build artifacts
       id: build-cache
diff --git a/.forgejo/workflows/documentation.yml b/.forgejo/workflows/documentation.yml
index dc0dd4c1..caa0e1e2 100644
--- a/.forgejo/workflows/documentation.yml
+++ b/.forgejo/workflows/documentation.yml
@@ -49,23 +49,10 @@ jobs:
           cp ./docs/static/_headers ./public/_headers
           echo "Copied .well-known files and _headers to ./public"

-      - name: Detect runner environment
-        id: runner-env
-        uses: ./.forgejo/actions/detect-runner-os
-
       - name: Setup Node.js
-        if: steps.runner-env.outputs.node_major == '' || steps.runner-env.outputs.node_major < '20'
         uses: https://github.com/actions/setup-node@v5
         with:
-          node-version: 22
-
-      - name: Cache npm dependencies
-        uses: actions/cache@v3
-        with:
-          path: ~/.npm
-          key: ${{ steps.runner-env.outputs.slug }}-node-${{ hashFiles('**/package-lock.json') }}
-          restore-keys: |
-            ${{ steps.runner-env.outputs.slug }}-node-
+          node-version: 20

       - name: Install dependencies
         run: npm install --save-dev wrangler@latest
diff --git a/.forgejo/workflows/release-image.yml b/.forgejo/workflows/release-image.yml
index b7423567..7b29b7ca 100644
--- a/.forgejo/workflows/release-image.yml
+++ b/.forgejo/workflows/release-image.yml
@@ -11,6 +11,7 @@ on:
       - ".gitignore"
       - "renovate.json"
       - "pkg/**"
+      - "docker/**"
       - "docs/**"
   push:
     branches:
       - main
@@ -22,6 +23,7 @@ on:
       - ".gitignore"
       - "renovate.json"
       - "pkg/**"
+      - "docker/**"
       - "docs/**"
   # Allows you to run this workflow manually from the Actions tab
   workflow_dispatch:
@@ -53,9 +55,6 @@ jobs:
             let images = []
             if (process.env.BUILTIN_REGISTRY_ENABLED === "true") {
               images.push(builtinImage)
-            } else {
-              // Fallback to official registry for forks/PRs without credentials
-              images.push('forgejo.ellis.link/continuwuation/continuwuity')
             }
             core.setOutput('images', images.join("\n"))
             core.setOutput('images_list', images.join(","))
@@ -101,7 +100,6 @@ jobs:
         with:
           persist-credentials: false
       - name: Install rust
-        if: ${{ env.BUILDKIT_ENDPOINT == '' }}
         id: rust-toolchain
         uses: ./.forgejo/actions/rust-toolchain
@@ -112,11 +110,9 @@ jobs:
           driver: ${{ env.BUILDKIT_ENDPOINT != '' && 'remote' || 'docker-container' }}
           endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }}
       - name: Set up QEMU
-        if: ${{ env.BUILDKIT_ENDPOINT == '' }}
         uses: docker/setup-qemu-action@v3
       # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
       - name: Login to builtin registry
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
         uses: docker/login-action@v3
         with:
           registry: ${{ env.BUILTIN_REGISTRY }}
@@ -142,21 +138,15 @@ jobs:
         run: |
           calculatedSha=$(git rev-parse --short ${{ github.sha }})
           echo "COMMIT_SHORT_SHA=$calculatedSha" >> $GITHUB_ENV
-          echo "Short SHA: $calculatedSha"
       - name: Get Git commit timestamps
-        run: |
-          timestamp=$(git log -1 --pretty=%ct)
-          echo "TIMESTAMP=$timestamp" >> $GITHUB_ENV
-          echo "Commit timestamp: $timestamp"
+        run: echo "TIMESTAMP=$(git log -1 --pretty=%ct)" >> $GITHUB_ENV

       - uses: ./.forgejo/actions/timelord
-        if: ${{ env.BUILDKIT_ENDPOINT == '' }}
         with:
           key: timelord-v0
           path: .
      - name: Cache Rust registry
-        if: ${{ env.BUILDKIT_ENDPOINT == '' }}
        uses: actions/cache@v3
        with:
          path: |
            ~/.cargo/git
            ~/.cargo/git/checkouts
            ~/.cargo/registry
            ~/.cargo/registry/src
            .cargo/registry/src
          key: rust-registry-image-${{hashFiles('**/Cargo.lock') }}
      - name: Cache cargo target
-        if: ${{ env.BUILDKIT_ENDPOINT == '' }}
        id: cache-cargo-target
        uses: actions/cache@v3
        with:
          path: |
            cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
          key: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}-${{hashFiles('**/Cargo.lock') }}-${{steps.rust-toolchain.outputs.rustc_version}}
      - name: Cache apt cache
-        if: ${{ env.BUILDKIT_ENDPOINT == '' }}
        id: cache-apt
        uses: actions/cache@v3
        with:
          path: |
            var-cache-apt-${{ matrix.slug }}
          key: var-cache-apt-${{ matrix.slug }}
      - name: Cache apt lib
-        if: ${{ env.BUILDKIT_ENDPOINT == '' }}
        id: cache-apt-lib
        uses: actions/cache@v3
        with:
          path: |
            var-lib-apt-${{ matrix.slug }}
          key: var-lib-apt-${{ matrix.slug }}
      - name: inject cache into docker
-        if: ${{ env.BUILDKIT_ENDPOINT == '' }}
        uses: https://github.com/reproducible-containers/buildkit-cache-dance@v3.3.0
        with:
          cache-map: |
@@ -213,7 +199,7 @@ jobs:
          context: .
          file: "docker/Dockerfile"
          build-args: |
-            GIT_COMMIT_HASH=${{ github.sha }}
+            GIT_COMMIT_HASH=${{ github.sha }}
            GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }}
            GIT_REMOTE_URL=${{github.event.repository.html_url }}
            GIT_REMOTE_COMMIT_URL=${{github.event.head_commit.url }}
@@ -223,23 +209,27 @@ jobs:
          cache-from: type=gha
          # cache-to: type=gha,mode=max
          sbom: true
-          outputs: |
-            ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || format('type=image,"name={0}",push=false', needs.define-variables.outputs.images_list) }}
-            type=local,dest=/tmp/binaries
+          outputs: type=image,"name=${{ needs.define-variables.outputs.images_list }}",push-by-digest=true,name-canonical=true,push=true
        env:
          SOURCE_DATE_EPOCH: ${{ env.TIMESTAMP }}

      # For publishing multi-platform manifests
      - name: Export digest
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
        run: |
          mkdir -p /tmp/digests
          digest="${{ steps.build.outputs.digest }}"
          touch "/tmp/digests/${digest#sha256:}"

-      # Binary extracted via local output for all builds
-      - name: Rename extracted binary
-        run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
+      - name: Extract binary from container (image)
+        id: extract-binary-image
+        run: |
+          mkdir -p /tmp/binaries
+          digest="${{ steps.build.outputs.digest }}"
+          echo "container_id=$(docker create --platform ${{ matrix.platform }} ${{ needs.define-variables.outputs.images_list }}@$digest)" >> $GITHUB_OUTPUT
+      - name: Extract binary from container (copy)
+        run: docker cp ${{ steps.extract-binary-image.outputs.container_id }}:/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
+      - name: Extract binary from container (cleanup)
+        run: docker rm ${{ steps.extract-binary-image.outputs.container_id }}

      - name: Upload binary artifact
        uses: forgejo/upload-artifact@v4
        with:
@@ -249,7 +239,6 @@ jobs:
          if-no-files-found: error

      - name: Upload digest
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
        uses: forgejo/upload-artifact@v4
        with:
          name: digests-${{ matrix.slug }}
          path: /tmp/digests/*
@@ -262,7 +251,6 @@ jobs:
    needs: [define-variables, build-image]
    steps:
      - name: Download digests
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
        uses: forgejo/download-artifact@v4
        with:
          path: /tmp/digests
          pattern: digests-*
          merge-multiple: true
      # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
      - name: Login to builtin registry
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
        uses: docker/login-action@v3
        with:
          registry: ${{ env.BUILTIN_REGISTRY }}
          username: ${{ vars.BUILTIN_REGISTRY_USER || github.actor }}
          password: ${{ secrets.BUILTIN_REGISTRY_PASSWORD || secrets.GITHUB_TOKEN }}

      - name: Set up Docker Buildx
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
        uses: docker/setup-buildx-action@v3
        with:
          # Use persistent BuildKit if BUILDKIT_ENDPOINT is set (e.g. tcp://buildkit:8125)
          driver: ${{ env.BUILDKIT_ENDPOINT != '' && 'remote' || 'docker-container' }}
          endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }}

      - name: Extract metadata (tags) for Docker
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
        id: meta
        uses: docker/metadata-action@v5
        with:
@@ -304,7 +289,6 @@ jobs:
          DOCKER_METADATA_ANNOTATIONS_LEVELS: index

      - name: Create manifest list and push
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
        working-directory: /tmp/digests
        env:
          IMAGES: ${{needs.define-variables.outputs.images}}
@@ -322,7 +306,6 @@ jobs:
          done

      - name: Inspect image
-        if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
        env:
          IMAGES: ${{needs.define-variables.outputs.images}}
        shell: bash
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 82162ff7..a4fad964 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -7,6 +7,5 @@
        "continuwuity",
        "homeserver",
        "homeservers"
-    ],
-    "rust-analyzer.cargo.features": ["full"]
+    ]
 }
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 17e1c6a1..55150902 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -199,57 +199,32 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
 EOF

 # Extract dynamically linked dependencies
-RUN <<'DEPS_EOF'
+RUN <<EOF
     set -o xtrace
    mkdir /out/libs /out/libs-root
    for BINARY in /out/sbin/*; do
-        if lddtree_output=$(lddtree "$BINARY" 2>/dev/null) && [ -n "$lddtree_output" ]; then
-            echo "$lddtree_output" | awk '{print $(NF-0) " " $1}' | sort -u -k 1,1 | \
-                awk '{dest = ($2 ~ /^\//) ? "/out/libs-root" $2 : "/out/libs/" $2; print "install -D " $1 " " dest}' | \
-                while read cmd; do eval "$cmd"; done
-        fi
+        lddtree "$BINARY" | awk '{print $(NF-0) " " $1}' | sort -u -k 1,1 | awk '{print "install", "-D", $1, (($2 ~ /^\//) ? "/out/libs-root" $2 : "/out/libs/" $2)}' | xargs -I {} sh -c {}
    done
-
-    # Show what will be copied to runtime
-    echo "=== Libraries being copied to runtime image:"
-    find /out/libs* -type f 2>/dev/null | sort || echo "No libraries found"
-DEPS_EOF
-
-FROM ubuntu:latest AS prepper
-
-# Create layer structure
-RUN mkdir -p /layer1/etc/ssl/certs \
-             /layer2/usr/lib \
-             /layer3/sbin /layer3/sbom
-
-# Copy SSL certs and root-path libraries to layer1 (ultra-stable)
-COPY --from=base /etc/ssl/certs /layer1/etc/ssl/certs
-COPY --from=builder /out/libs-root/ /layer1/
-
-# Copy application libraries to layer2 (semi-stable)
-COPY --from=builder /out/libs/ /layer2/usr/lib/
-
-# Copy binaries and SBOM to layer3 (volatile)
-COPY --from=builder /out/sbin/ /layer3/sbin/
-COPY --from=builder /out/sbom/ /layer3/sbom/
-
-# Fix permissions after copying
-RUN chmod -R 755 /layer1 /layer2 /layer3
+EOF

 FROM scratch

 WORKDIR /

-# Copy ultra-stable layer (SSL certs, system libraries)
-COPY --from=prepper /layer1/ /
+# Copy root certs for tls into image
+# You can also mount the certs from the host
+# --volume /etc/ssl/certs:/etc/ssl/certs:ro
+COPY --from=base /etc/ssl/certs /etc/ssl/certs

-# Copy semi-stable layer (application libraries)
-COPY --from=prepper /layer2/ /
+# Copy our build
+COPY --from=builder /out/sbin/ /sbin/
+# Copy SBOM
+COPY --from=builder /out/sbom/ /sbom/

-# Copy volatile layer (binaries, SBOM)
-COPY --from=prepper /layer3/ /
+# Copy dynamic libraries to root
+COPY --from=builder /out/libs-root/ /
+COPY --from=builder /out/libs/ /usr/lib/

 # Inform linker where to find libraries
 ENV LD_LIBRARY_PATH=/usr/lib
diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs
index 64f68330..81b0e9da 100644
--- a/src/admin/debug/commands.rs
+++ b/src/admin/debug/commands.rs
@@ -281,8 +281,15 @@ pub(super) async fn get_remote_pdu(
			vec![(event_id, value, room_id)]
		};

+		info!("Attempting to handle event ID {event_id} as backfilled PDU");
+		self.services
+			.rooms
+			.timeline
+			.backfill_pdu(&server, response.pdu)
+			.await?;
+
		let text = serde_json::to_string_pretty(&json)?;
-		let msg = "Got PDU from specified server:";
+		let msg = "Got PDU from specified server and handled as backfilled";
		write!(self, "{msg}. Event body:\n```json\n{text}\n```")
	},
 }
diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs
index 0f72b58c..afcfec34 100644
--- a/src/admin/query/room_timeline.rs
+++ b/src/admin/query/room_timeline.rs
@@ -57,5 +57,5 @@ pub(super) async fn pdus(
		.try_collect()
		.await?;

-	self.write_str(&format!("```\n{result:#?}\n```")).await
+	self.write_str(&format!("{result:#?}")).await
 }
diff --git a/src/api/client/message.rs b/src/api/client/message.rs
index 842036f5..f5a951f4 100644
--- a/src/api/client/message.rs
+++ b/src/api/client/message.rs
@@ -35,6 +35,7 @@ use ruma::{
 };
 use tracing::warn;

+use super::utils::{count_to_token, parse_pagination_token as parse_token};
 use crate::Ruma;

 /// list of safe and common non-state events to ignore if the user is ignored
@@ -84,14 +85,14 @@ pub(crate) async fn get_message_events_route(
	let from: PduCount = body
		.from
		.as_deref()
-		.map(str::parse)
+		.map(parse_token)
		.transpose()?
		.unwrap_or_else(|| match body.dir {
			| Direction::Forward => PduCount::min(),
			| Direction::Backward => PduCount::max(),
		});

-	let to: Option<PduCount> = body.to.as_deref().map(str::parse).transpose()?;
+	let to: Option<PduCount> = body.to.as_deref().map(parse_token).transpose()?;

	let limit: usize = body
		.limit
@@ -180,8 +181,8 @@ pub(crate) async fn get_message_events_route(
		.collect();

	Ok(get_message_events::v3::Response {
-		start: from.to_string(),
-		end: next_token.as_ref().map(PduCount::to_string),
+		start: count_to_token(from),
+		end: next_token.map(count_to_token),
		chunk,
		state,
	})
diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs
index 0014282c..c8ca7757 100644
--- a/src/api/client/mod.rs
+++ b/src/api/client/mod.rs
@@ -37,6 +37,7 @@ pub(super) mod typing;
 pub(super) mod unstable;
 pub(super) mod unversioned;
 pub(super) mod user_directory;
+pub(super) mod utils;
 pub(super) mod voip;
 pub(super) mod well_known;

diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs
index 79fa1459..f6d8fe9e 100644
--- a/src/api/client/relations.rs
+++ b/src/api/client/relations.rs
@@ -18,6 +18,7 @@ use ruma::{
	events::{TimelineEventType, relation::RelationType},
 };

+use super::utils::{count_to_token, parse_pagination_token as parse_token};
 use crate::Ruma;

 /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
@@ -110,14 +111,14 @@ async fn paginate_relations_with_filter(
	dir: Direction,
 ) -> Result<get_relating_events::v1::Response> {
	let start: PduCount = from
-		.map(str::parse)
+		.map(parse_token)
		.transpose()?
		.unwrap_or_else(|| match dir {
			| Direction::Forward => PduCount::min(),
			| Direction::Backward => PduCount::max(),
		});

-	let to: Option<PduCount> = to.map(str::parse).transpose()?;
+	let to: Option<PduCount> = to.map(parse_token).transpose()?;

	// Use limit or else 30, with maximum 100
	let limit: usize = limit
@@ -192,7 +193,7 @@ async fn paginate_relations_with_filter(
		| Direction::Forward => events.last(),
		| Direction::Backward => events.first(),
	}
-	.map(|(count, _)| count.to_string())
+	.map(|(count, _)| count_to_token(*count))
	} else {
		None
	};
diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs
index 75ae0758..47228d67 100644
--- a/src/api/client/room/event.rs
+++ b/src/api/client/room/event.rs
@@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route(
	let event = services
		.rooms
		.timeline
-		.get_remote_pdu(room_id, event_id)
+		.get_pdu(event_id)
		.map_err(|_| err!(Request(NotFound("Event {} not found.", event_id))));

	let visible = services
@@ -33,6 +33,11 @@ pub(crate) async fn get_room_event_route(
		return Err!(Request(Forbidden("You don't have permission to view this event.")));
	}

+	debug_assert!(
+		event.event_id() == event_id && event.room_id() == room_id,
+		"Fetched PDU must match requested"
+	);
+
	event.add_age().ok();

	Ok(get_room_event::v3::Response { event: event.into_format() })
diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs
new file mode 100644
index 00000000..cc941b95
--- /dev/null
+++ b/src/api/client/utils.rs
@@ -0,0 +1,28 @@
+use conduwuit::{
+	Result, err,
+	matrix::pdu::{PduCount, ShortEventId},
+};
+
+/// Parse a pagination token, trying ShortEventId first, then falling back to
+/// PduCount
+pub(crate) fn parse_pagination_token(token: &str) -> Result<PduCount> {
+	// Try parsing as ShortEventId first
+	if let Ok(shorteventid) = token.parse::<ShortEventId>() {
+		// ShortEventId maps directly to a PduCount in our database
+		Ok(PduCount::Normal(shorteventid))
+	} else if let Ok(count) = token.parse::<u64>() {
+		// Fallback to PduCount for backwards compatibility
+		Ok(PduCount::Normal(count))
+	} else if let Ok(count) = token.parse::<i64>() {
+		// Also handle negative counts for backfilled events
+		Ok(PduCount::from_signed(count))
+	} else {
+		Err(err!(Request(InvalidParam("Invalid pagination token"))))
+	}
+}
+
+/// Convert a PduCount to a token string (using the underlying ShortEventId)
+pub(crate) fn count_to_token(count: PduCount) -> String {
+	// The PduCount's unsigned value IS the ShortEventId
+	count.into_unsigned().to_string()
+}
diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs
index d92581a1..652451c7 100644
--- a/src/api/server/send_join.rs
+++ b/src/api/server/send_join.rs
@@ -1,13 +1,12 @@
 #![allow(deprecated)]

-use std::{borrow::Borrow, time::Instant, vec};
+use std::borrow::Borrow;

 use axum::extract::State;
 use conduwuit::{
-	Err, Event, Result, at, debug, err, info,
+	Err, Result, at, err,
	matrix::event::gen_event_id_canonical_json,
-	trace,
-	utils::stream::{BroadbandExt, IterStream, TryBroadbandExt},
+	utils::stream::{IterStream, TryBroadbandExt},
	warn,
 };
 use conduwuit_service::Services;
@@ -26,14 +25,12 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
 use crate::Ruma;

 /// helper method for /send_join v1 and v2
-#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))]
 async fn create_join_event(
	services: &Services,
	origin: &ServerName,
	room_id: &RoomId,
	pdu: &RawJsonValue,
-	omit_members: bool,
-) -> Result<create_join_event::v2::RoomState> {
+) -> Result<create_join_event::v1::RoomState> {
	if !services.rooms.metadata.exists(room_id).await {
		return Err!(Request(NotFound("Room is unknown to this server.")));
	}
@@ -56,10 +53,8 @@ async fn create_join_event(
	// We do not add the event_id field to the pdu here because of signature and
	// hashes checks
-	trace!("Getting room version");
	let room_version_id = services.rooms.state.get_room_version(room_id).await?;

-	trace!("Generating event ID and converting to canonical json");
	let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
		// Event could not be converted to canonical json
		return Err!(Request(BadJson("Could not convert event to canonical json.")));
@@ -108,6 +103,7 @@ async fn create_join_event(
		)));
	}

+	// ACL check sender user server name
	let sender: OwnedUserId = serde_json::from_value(
		value
			.get("sender")
@@ -117,6 +113,12 @@ async fn create_join_event(
	)
	.map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?;

+	services
+		.rooms
+		.event_handler
+		.acl_check(sender.server_name(), room_id)
+		.await?;
+
	// check if origin server is trying to send for another server
	if sender.server_name() != origin {
		return Err!(Request(Forbidden("Not allowed to join on behalf of another server.")));
@@ -178,6 +180,11 @@ async fn create_join_event(
		}
	}

+	services
+		.server_keys
+		.hash_and_sign_event(&mut value, &room_version_id)
+		.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
+
	let origin: OwnedServerName = serde_json::from_value(
		value
			.get("origin")
@@ -187,12 +194,6 @@ async fn create_join_event(
	)
	.map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?;

-	trace!("Signing send_join event");
-	services
-		.server_keys
-		.hash_and_sign_event(&mut value, &room_version_id)
-		.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
-
	let mutex_lock = services
		.rooms
		.event_handler
@@ -200,7 +201,6 @@ async fn create_join_event(
		.lock(room_id)
		.await;

-	trace!("Acquired send_join mutex, persisting join event");
	let pdu_id = services
		.rooms
		.event_handler
@@ -210,7 +210,7 @@ async fn create_join_event(
		.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
	drop(mutex_lock);

-	trace!("Fetching current state IDs");
+
	let state_ids: Vec<OwnedEventId> = services
		.rooms
		.state_accessor
@@ -219,23 +219,9 @@ async fn create_join_event(
		.collect()
		.await;

-	trace!(%omit_members, "Constructing current state");
	let state = state_ids
		.iter()
		.try_stream()
-		.broad_filter_map(|event_id| async move {
-			if omit_members {
-				if let Ok(e) = event_id.as_ref() {
-					let pdu = services.rooms.timeline.get_pdu(e).await;
-					if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") {
-						trace!("omitting member event {e:?} from returned state");
-						// skip members
-						return None;
-					}
-				}
-			}
-			Some(event_id)
-		})
		.broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id))
		.broad_and_then(|pdu| {
			services
@@ -248,7 +234,6 @@ async fn create_join_event(
		.await?;

	let starting_events = state_ids.iter().map(Borrow::borrow);
-	trace!("Constructing auth chain");
	let auth_chain = services
		.rooms
		.auth_chain
@@ -265,37 +250,13 @@ async fn create_join_event(
		.try_collect()
		.boxed()
		.await?;
-	info!(fast_join = %omit_members, "Sending join event to other servers");
+
	services.sending.send_pdu_room(room_id, &pdu_id).await?;
-	debug!("Finished sending join event");
-	let servers_in_room: Option<Vec<String>> = if !omit_members {
-		None
-	} else {
-		trace!("Fetching list of servers in room");
-		let servers: Vec<String> = services
-			.rooms
-			.state_cache
-			.room_servers(room_id)
-			.map(|sn| sn.as_str().to_owned())
-			.collect()
-			.await;
-		// If there's no servers, just add us
-		let servers = if servers.is_empty() {
-			warn!("Failed to find any servers, adding our own server name as a last resort");
-			vec![services.globals.server_name().to_string()]
-		} else {
-			trace!("Found {} servers in room", servers.len());
-			servers
-		};
-		Some(servers)
-	};
-	debug!("Returning send_join data");
-	Ok(create_join_event::v2::RoomState {
+
+	Ok(create_join_event::v1::RoomState {
		auth_chain,
		state,
		event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(),
-		members_omitted: omit_members,
-		servers_in_room,
	})
 }

@@ -333,23 +294,11 @@ pub(crate) async fn create_join_event_v1_route(
		}
	}

-	let now = Instant::now();
-	let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false)
+	let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu)
		.boxed()
		.await?;
-	let transformed = create_join_event::v1::RoomState {
-		auth_chain: room_state.auth_chain,
-		state: room_state.state,
-		event: room_state.event,
-	};
-	info!(
-		"Finished sending a join for {} in {} in {:?}",
-		body.origin(),
-		&body.room_id,
-		now.elapsed()
-	);
-	Ok(create_join_event::v1::Response { room_state: transformed })
+
+	Ok(create_join_event::v1::Response { room_state })
 }

 /// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}`
@@ -380,17 +329,17 @@ pub(crate) async fn create_join_event_v2_route(
		}
	}

-	let now = Instant::now();
-	let room_state =
-		create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members)
+	let create_join_event::v1::RoomState { auth_chain, state, event } =
+		create_join_event(&services, body.origin(), &body.room_id, &body.pdu)
			.boxed()
			.await?;
-	info!(
-		"Finished sending a join for {} in {} in {:?}",
-		body.origin(),
-		&body.room_id,
-		now.elapsed()
-	);
+	let room_state = create_join_event::v2::RoomState {
+		members_omitted: false,
+		auth_chain,
+		state,
+		event,
+		servers_in_room: None,
+	};

	Ok(create_join_event::v2::Response { room_state })
 }
diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs
index 7fb12574..b880278f 100644
--- a/src/core/matrix/pdu/count.rs
+++ b/src/core/matrix/pdu/count.rs
@@ -1,10 +1,6 @@
 #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)]

-use std::{
-	cmp::Ordering,
-	fmt::{self, Display},
-	str::FromStr,
-};
+use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr};

 use ruma::api::Direction;
diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs
index 028c3e51..f7f7d043 100644
--- a/src/service/rooms/state_compressor/mod.rs
+++ b/src/service/rooms/state_compressor/mod.rs
@@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDiff>
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -117,7 +116,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
		.boxed();

	while let Some(ref backfill_server) = servers.next().await {
-		info!("Asking {backfill_server} for backfill in {room_id}");
+		info!("Asking {backfill_server} for backfill");
		let response = self
			.services
			.sending
@@ -129,126 +128,10 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
		}
	}

-	warn!("No servers could backfill, but backfill was needed in room {room_id}");
+	info!("No servers could backfill, but backfill was needed in room {room_id}");
	Ok(())
 }

-#[implement(super::Service)]
-#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))]
-pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result<PduEvent> {
-	let local = self.get_pdu(event_id).await;
-	if local.is_ok() {
-		// We already have this PDU, no need to backfill
-		debug!("We already have {event_id} in {room_id}, no need to backfill.");
-		return local;
-	}
-	debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers.");
-	// Similar to backfill_if_required, but only for a single PDU
-	// Fetch a list of servers to try
-	if self
-		.services
-		.state_cache
-		.room_joined_count(room_id)
-		.await
-		.is_ok_and(|count| count <= 1)
-		&& !self
-			.services
-			.state_accessor
-			.is_world_readable(room_id)
-			.await
-	{
-		// Room is empty (1 user or none), there is no one that can backfill
-		return Err!(Request(NotFound("No one can backfill this PDU, room is empty.")));
-	}
-
-	let power_levels: RoomPowerLevelsEventContent = self
-		.services
-		.state_accessor
-		.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
-		.await
-		.unwrap_or_default();
-
-	let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| {
-		if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) {
-			Some(user_id.server_name())
-		} else {
-			None
-		}
-	});
-
-	let canonical_room_alias_server = once(
-		self.services
-			.state_accessor
-			.get_canonical_alias(room_id)
-			.await,
-	)
-	.filter_map(Result::ok)
-	.map(|alias| alias.server_name().to_owned())
-	.stream();
-	let mut servers = room_mods
-		.stream()
-		.map(ToOwned::to_owned)
-		.chain(canonical_room_alias_server)
-		.chain(
-			self.services
-				.server
-				.config
-				.trusted_servers
-				.iter()
-				.map(ToOwned::to_owned)
-				.stream(),
-		)
-		.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
-		.filter_map(|server_name| async move {
-			self.services
-				.state_cache
-				.server_in_room(&server_name, room_id)
-				.await
-				.then_some(server_name)
-		})
-		.boxed();
-
-	while let Some(ref backfill_server) = servers.next().await {
-		info!("Asking {backfill_server} for event {}", event_id);
-		let value = self
-			.services
-			.sending
-			.send_federation_request(backfill_server, federation::event::get_event::v1::Request {
-				event_id: event_id.to_owned(),
-				include_unredacted_content: Some(false),
-			})
-			.await
-			.and_then(|response| {
-				serde_json::from_str::<CanonicalJsonObject>(response.pdu.get()).map_err(|e| {
-					err!(BadServerResponse(debug_warn!(
-						"Error parsing incoming event {e:?} from {backfill_server}"
-					)))
-				})
-			});
-		let pdu = match value {
-			| Ok(value) => {
-				self.services
-					.event_handler
-					.handle_incoming_pdu(backfill_server, room_id, event_id, value, false)
-					.boxed()
-					.await?;
-				debug!("Successfully backfilled {event_id} from {backfill_server}");
-				Some(self.get_pdu(event_id).await)
-			},
-			| Err(e) => {
-				warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
-				None
-			},
-		};
-		if let Some(pdu) = pdu {
-			debug!("Fetched {event_id} from {backfill_server}");
-			return pdu;
-		}
-	}
-
-	Err!("No servers could be used to fetch {} in {}.", room_id, event_id)
-}
-
 #[implement(super::Service)]
 #[tracing::instrument(skip(self, pdu), level = "debug")]
 pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> {
diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs
index 9064df6c..fa10a5c0 100644
--- a/src/service/rooms/timeline/data.rs
+++ b/src/service/rooms/timeline/data.rs
@@ -3,7 +3,8 @@ use std::{borrow::Borrow, sync::Arc};
 use conduwuit::{
	Err, PduCount, PduEvent, Result, at, err,
	result::{LogErr, NotFound},
-	utils::{self, stream::TryReadyExt},
+	utils,
+	utils::stream::TryReadyExt,
 };
 use database::{Database, Deserialized, Json, KeyVal, Map};
 use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};