Compare commits

...

47 commits

Author SHA1 Message Date
Jacob Taylor
eb37e0de77 collect room lock timing ??? 2025-09-07 18:36:30 -07:00
Jacob Taylor
cd97d77ebc process edus before pdus here, too 2025-09-07 18:36:30 -07:00
Jacob Taylor
066be5b38f pass and use transaction id to collect timing info 2025-09-07 18:36:30 -07:00
Jacob Taylor
641184dfc5 turns out we need debug_warn 2025-09-07 18:36:30 -07:00
Jacob Taylor
f48c27e34f add event_id to log entry 2025-09-07 18:36:30 -07:00
Jacob Taylor
753f8aabae reduce log volume (keeps 2 infos)
adjust log volume

demote a bunch of logs
2025-09-07 18:36:30 -07:00
Jacob Taylor
25b0d43b9c fix warn by removing unused debug imports
delete more imports to quiet cargo
2025-09-07 18:36:30 -07:00
Jacob Taylor
78ade5aa68 fix warn by removing unused debug imports 2025-09-07 18:36:30 -07:00
Jacob Taylor
abe234537f exponential backoff is now just bees. did you want bees? no? well you have them now. congrats 2025-09-07 18:36:30 -07:00
Jacob Taylor
f02e890292 fix too many infos 2025-09-07 18:36:30 -07:00
Jacob Taylor
64fb278133 more funny settings (part 3 of 12) 2025-09-07 18:36:30 -07:00
Jacob Taylor
785ba4eaad sender_workers scaling. this time, with feeling! 2025-09-07 18:36:30 -07:00
Jacob Taylor
dc9776bc1b vehicle loan documentation now available at window 7 2025-09-07 18:36:30 -07:00
nexy7574
f9af07fb9d log which room struggled to get mainline depth 2025-09-07 18:23:07 -07:00
nexy7574
182a2d5ed4 more logs 2025-09-07 18:23:07 -07:00
nexy7574
e3b632529b Fix room ID check 2025-09-07 18:23:07 -07:00
nexy7574
5dcd825132 Kick up a fuss when m.room.create is unfindable 2025-09-07 18:23:07 -07:00
nexy7574
b157c02fec Note about ruma#2064 in TODO 2025-09-07 18:23:07 -07:00
nexy7574
ffac6d8155 fix an auth rule not applying correctly 2025-09-07 18:23:07 -07:00
Jacob Taylor
60d403f3d7 upgrade some settings to enable 5g in continuwuity
enable converged 6g at the edge in continuwuity

better stateinfo_cache_capacity default

better roomid_spacehierarchy_cache_capacity

make sender workers default better and clamp value to core count

update sender workers documentation

add more parallelism_scaled and make them public

update 1 document
2025-09-07 18:23:07 -07:00
Jacob Taylor
bcca10b504 bump the number of allowed immutable memtables by 1, to allow for greater flood protection
this should probably not be applied if you have rocksdb_atomic_flush = false (the default)
2025-09-07 18:23:07 -07:00
nexy7574
6996985a06 feat(fed): Handle EDUs before PDUs
Aranje needs his crypto keys
2025-09-07 18:23:07 -07:00
nexy7574
8584116555 feat(fed): Handle EDUs before PDUs
Aranje needs his crypto keys
2025-09-07 18:23:07 -07:00
nexy7574
b45b630af7 feat(fed): Something about nicer fed errors 2025-09-07 18:23:07 -07:00
nexy7574
e991a10de2 fix(fed): Alter log levels to be less noisy 2025-09-07 18:23:07 -07:00
nexy7574
0abfc192d4 fix(fed): Improve transaction flushing 2025-09-07 18:23:07 -07:00
Ginger
5d3e10a048
fix: Make RA use the full feature 2025-09-07 18:07:03 -04:00
Ginger
1e541875ad
fix: Nuke src/api/client/utils.rs 2025-09-07 18:06:11 -04:00
nexy7574
90fd92977e style: Run clippy 2025-09-07 21:20:26 +00:00
Ginger
e27ef7f5ec feat: Do not persist remote PDUs fetched with admin commands 2025-09-07 21:20:26 +00:00
Ginger
16f4efa708 fix: Fix pagination tokens being corrupted for backfilled PDUs 2025-09-07 21:20:26 +00:00
Ginger
e38dec5864 fix: Put the output of !admin query room-timeline pdus in a codeblock 2025-09-07 21:20:26 +00:00
Ginger
f3824ffc3d fix: Use handle_incoming_pdu directly to keep remote PDUs as outliers 2025-09-07 21:20:26 +00:00
nexy7574
e3fbf7a143 feat: Ask remote servers for individual unknown events 2025-09-07 21:20:26 +00:00
nexy7574
09de586dc7
feat(PR977): Log more things in the join process 2025-09-07 22:01:07 +01:00
nexy7574
d1fff1d09f
perf(pr977): Remove redundant ACL check in send_join 2025-09-07 22:01:07 +01:00
nexy7574
f47474d12a
fix(PR977): Adjust some log levels 2025-09-07 22:01:07 +01:00
nexy7574
53da294e53
fix(PR977): Omitting redundant entries from the auth_chain caused problems 2025-09-07 22:01:07 +01:00
nexy7574
2cdccbf2fe
feat(PR977): Support omitting members in the send_join response 2025-09-07 22:01:07 +01:00
Tom Foster
6cf3c839e4 ci(release-image): Skip digest upload when not pushing images
After #992, builds without registry credentials skip Docker image output
but still extract binary artifacts. However, we were still trying to
upload digests for images that weren't created. Add conditional check
to only upload digests when actually pushing to registry.
2025-09-07 21:27:56 +01:00
Tom Foster
4a1091dd06 ci(release-image): Unify binary extraction using BuildKit local output
Fork PRs currently fail binary extraction with 'invalid reference format'
and 'must specify at least one container source' errors. This replaces the
registry-specific docker create/copy method with BuildKit's local output
feature for all builds.

Uses multiple outputs in single build: image export plus local binary
extraction from /sbin. Speeds up extracting binary artifacts and saves a
couple of extra workflow steps in the process.
2025-09-07 20:46:11 +01:00
Tom Foster
1e9701f379 ci(release-image): Skip setup steps when using persistent BuildKit
When BUILDKIT_ENDPOINT is set, builds run on a persistent BuildKit instance,
making runner setup steps unnecessary. Skip Rust toolchain installation,
QEMU setup, caching steps, and timelord to eliminate ~7 operations per job.

Also adds output to git SHA and timestamp steps for visibility.

Cuts at least a minute off average build time through fewer installs,
cache restores, and cache saves.
2025-09-07 18:59:05 +01:00
Tom Foster
2cedf0d2e1 fix(ci): Use image output instead of docker for fork PRs
Docker exporter doesn't support manifest lists (multi-platform builds).
For fork PRs without registry credentials, use 'type=image,push=false'
instead of 'type=docker' to build multi-platform images locally without pushing.
2025-09-07 18:32:38 +01:00
Tom Foster
84fdcd326a fix(ci): Resolve registry push failures for fork PRs
Fork PRs now fail during Docker image build with 'tag is needed when
pushing to registry' because BUILTIN_REGISTRY_ENABLED evaluates to false
without proper credentials, leaving the images list empty. This appears
to be due to recent Forgejo permission changes affecting fork access to
repository secrets.

Add fallback to official registry when credentials unavailable, skip
registry login and push operations for forks, and make merge job
conditional since no digests exist without push. This allows forks to
test Docker builds whilst avoiding authentication failures.
2025-09-07 17:39:18 +01:00
Tom Foster
d640853f9d ci(docs): Optimise build performance with caching and conditional Node.js
Skip installing Node.js entirely if v20+ is already available, otherwise
install v22. Add npm dependency caching with OS-specific cache keys using
the custom detect-runner-os action for proper cache isolation between
runners. Dependencies normally take just under 10s, so this should more
than halve the doc build time to free up runner slots.
2025-09-07 14:51:10 +01:00
Tom Foster
fff9629b0f fix(docker): Resolve liburing.so.2 loading error for non-root users
Container failed to start when running as non-root (user 1000:1000) because
copied directories had restrictive 770 permissions, likely due to different
umask in persistent BuildKit. Non-root users couldn't access /usr/lib to
load required dynamic libraries.

Introduces prepper stage using Ubuntu to organize files into layered structure
with explicit 755 directory permissions before copying to scratch image.
Also fixes workflow syntax error and removes docker/** from paths-ignore to
ensure Docker changes trigger CI builds.
2025-09-07 14:13:14 +01:00
Tom Foster
1a3107c20a fix(ci): Replace Mozilla sccache action with token-free alternative
Replace mozilla-actions/sccache-action with a custom Forgejo-specific
implementation that eliminates GitHub token dependencies and rate limiting
issues for all contributors regardless of repository permissions.

The new action mirrors sccache binaries to the Forgejo package registry
and queries that instead of GitHub releases, maintaining identical functionality
including hostedtoolcache support.
2025-09-07 09:29:32 +01:00
34 changed files with 515 additions and 246 deletions

View file

@ -13,6 +13,12 @@ outputs:
slug:
description: 'Combined OS slug (e.g. Ubuntu-22.04)'
value: ${{ steps.detect.outputs.slug }}
node_major:
description: 'Major version of Node.js if available (e.g. 22)'
value: ${{ steps.detect.outputs.node_major }}
node_version:
description: 'Full Node.js version if available (e.g. 22.19.0)'
value: ${{ steps.detect.outputs.node_version }}
runs:
using: composite
@ -30,7 +36,20 @@ runs:
# Create combined slug
OS_SLUG="${OS_NAME}-${OS_VERSION}"
# Set outputs
# Detect Node.js version if available
if command -v node >/dev/null 2>&1; then
NODE_VERSION=$(node --version | sed 's/v//')
NODE_MAJOR=$(echo $NODE_VERSION | cut -d. -f1)
echo "node_version=${NODE_VERSION}" >> $GITHUB_OUTPUT
echo "node_major=${NODE_MAJOR}" >> $GITHUB_OUTPUT
echo "🔍 Detected Node.js: v${NODE_VERSION}"
else
echo "node_version=" >> $GITHUB_OUTPUT
echo "node_major=" >> $GITHUB_OUTPUT
echo "🔍 Node.js not found"
fi
# Set OS outputs
echo "name=${OS_NAME}" >> $GITHUB_OUTPUT
echo "version=${OS_VERSION}" >> $GITHUB_OUTPUT
echo "slug=${OS_SLUG}" >> $GITHUB_OUTPUT

View file

@ -2,18 +2,12 @@ name: sccache
description: |
Install sccache for caching builds in GitHub Actions.
inputs:
token:
description: 'A Github PAT'
required: false
runs:
using: composite
steps:
- name: Install sccache
uses: https://github.com/mozilla-actions/sccache-action@v0.0.9
with:
token: ${{ inputs.token }}
uses: https://git.tomfos.tr/tom/sccache-action@v1
- name: Configure sccache
uses: https://github.com/actions/github-script@v7
with:

View file

@ -88,19 +88,9 @@ runs:
# Shared toolchain cache across all Rust versions
key: toolchain-${{ steps.runner-os.outputs.slug }}
- name: Debug GitHub token availability
shell: bash
run: |
if [ -z "${{ inputs.github-token }}" ]; then
echo "⚠️ No GitHub token provided - sccache will use fallback download method"
else
echo "✅ GitHub token provided for sccache"
fi
- name: Setup sccache
uses: https://github.com/mozilla-actions/sccache-action@v0.0.9
with:
token: ${{ inputs.github-token }}
uses: https://git.tomfos.tr/tom/sccache-action@v1
- name: Cache build artifacts
id: build-cache

View file

@ -49,10 +49,23 @@ jobs:
cp ./docs/static/_headers ./public/_headers
echo "Copied .well-known files and _headers to ./public"
- name: Detect runner environment
id: runner-env
uses: ./.forgejo/actions/detect-runner-os
- name: Setup Node.js
if: steps.runner-env.outputs.node_major == '' || steps.runner-env.outputs.node_major < '20'
uses: https://github.com/actions/setup-node@v4
with:
node-version: 20
node-version: 22
- name: Cache npm dependencies
uses: actions/cache@v3
with:
path: ~/.npm
key: ${{ steps.runner-env.outputs.slug }}-node-${{ hashFiles('**/package-lock.json') }}
restore-keys: |
${{ steps.runner-env.outputs.slug }}-node-
- name: Install dependencies
run: npm install --save-dev wrangler@latest

View file

@ -11,7 +11,6 @@ on:
- ".gitignore"
- "renovate.json"
- "pkg/**"
- "docker/**"
- "docs/**"
push:
branches:
@ -23,7 +22,6 @@ on:
- ".gitignore"
- "renovate.json"
- "pkg/**"
- "docker/**"
- "docs/**"
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
@ -55,6 +53,9 @@ jobs:
let images = []
if (process.env.BUILTIN_REGISTRY_ENABLED === "true") {
images.push(builtinImage)
} else {
// Fallback to official registry for forks/PRs without credentials
images.push('forgejo.ellis.link/continuwuation/continuwuity')
}
core.setOutput('images', images.join("\n"))
core.setOutput('images_list', images.join(","))
@ -100,6 +101,7 @@ jobs:
with:
persist-credentials: false
- name: Install rust
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
id: rust-toolchain
uses: ./.forgejo/actions/rust-toolchain
@ -110,9 +112,11 @@ jobs:
driver: ${{ env.BUILDKIT_ENDPOINT != '' && 'remote' || 'docker-container' }}
endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }}
- name: Set up QEMU
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
uses: docker/setup-qemu-action@v3
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Login to builtin registry
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: docker/login-action@v3
with:
registry: ${{ env.BUILTIN_REGISTRY }}
@ -138,15 +142,21 @@ jobs:
run: |
calculatedSha=$(git rev-parse --short ${{ github.sha }})
echo "COMMIT_SHORT_SHA=$calculatedSha" >> $GITHUB_ENV
echo "Short SHA: $calculatedSha"
- name: Get Git commit timestamps
run: echo "TIMESTAMP=$(git log -1 --pretty=%ct)" >> $GITHUB_ENV
run: |
timestamp=$(git log -1 --pretty=%ct)
echo "TIMESTAMP=$timestamp" >> $GITHUB_ENV
echo "Commit timestamp: $timestamp"
- uses: ./.forgejo/actions/timelord
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
with:
key: timelord-v0
path: .
- name: Cache Rust registry
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
uses: actions/cache@v3
with:
path: |
@ -156,6 +166,7 @@ jobs:
.cargo/registry/src
key: rust-registry-image-${{hashFiles('**/Cargo.lock') }}
- name: Cache cargo target
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
id: cache-cargo-target
uses: actions/cache@v3
with:
@ -163,6 +174,7 @@ jobs:
cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
key: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}-${{hashFiles('**/Cargo.lock') }}-${{steps.rust-toolchain.outputs.rustc_version}}
- name: Cache apt cache
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
id: cache-apt
uses: actions/cache@v3
with:
@ -170,6 +182,7 @@ jobs:
var-cache-apt-${{ matrix.slug }}
key: var-cache-apt-${{ matrix.slug }}
- name: Cache apt lib
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
id: cache-apt-lib
uses: actions/cache@v3
with:
@ -177,6 +190,7 @@ jobs:
var-lib-apt-${{ matrix.slug }}
key: var-lib-apt-${{ matrix.slug }}
- name: inject cache into docker
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
uses: https://github.com/reproducible-containers/buildkit-cache-dance@v3.3.0
with:
cache-map: |
@ -199,7 +213,7 @@ jobs:
context: .
file: "docker/Dockerfile"
build-args: |
GIT_COMMIT_HASH=${{ github.sha }})
GIT_COMMIT_HASH=${{ github.sha }}
GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }}
GIT_REMOTE_URL=${{github.event.repository.html_url }}
GIT_REMOTE_COMMIT_URL=${{github.event.head_commit.url }}
@ -209,27 +223,23 @@ jobs:
cache-from: type=gha
# cache-to: type=gha,mode=max
sbom: true
outputs: type=image,"name=${{ needs.define-variables.outputs.images_list }}",push-by-digest=true,name-canonical=true,push=true
outputs: |
${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || format('type=image,"name={0}",push=false', needs.define-variables.outputs.images_list) }}
type=local,dest=/tmp/binaries
env:
SOURCE_DATE_EPOCH: ${{ env.TIMESTAMP }}
# For publishing multi-platform manifests
- name: Export digest
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
run: |
mkdir -p /tmp/digests
digest="${{ steps.build.outputs.digest }}"
touch "/tmp/digests/${digest#sha256:}"
- name: Extract binary from container (image)
id: extract-binary-image
run: |
mkdir -p /tmp/binaries
digest="${{ steps.build.outputs.digest }}"
echo "container_id=$(docker create --platform ${{ matrix.platform }} ${{ needs.define-variables.outputs.images_list }}@$digest)" >> $GITHUB_OUTPUT
- name: Extract binary from container (copy)
run: docker cp ${{ steps.extract-binary-image.outputs.container_id }}:/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
- name: Extract binary from container (cleanup)
run: docker rm ${{ steps.extract-binary-image.outputs.container_id }}
# Binary extracted via local output for all builds
- name: Rename extracted binary
run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
- name: Upload binary artifact
uses: forgejo/upload-artifact@v4
@ -239,6 +249,7 @@ jobs:
if-no-files-found: error
- name: Upload digest
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: forgejo/upload-artifact@v4
with:
name: digests-${{ matrix.slug }}
@ -251,6 +262,7 @@ jobs:
needs: [define-variables, build-image]
steps:
- name: Download digests
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: forgejo/download-artifact@v4
with:
path: /tmp/digests
@ -258,6 +270,7 @@ jobs:
merge-multiple: true
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Login to builtin registry
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: docker/login-action@v3
with:
registry: ${{ env.BUILTIN_REGISTRY }}
@ -265,6 +278,7 @@ jobs:
password: ${{ secrets.BUILTIN_REGISTRY_PASSWORD || secrets.GITHUB_TOKEN }}
- name: Set up Docker Buildx
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: docker/setup-buildx-action@v3
with:
# Use persistent BuildKit if BUILDKIT_ENDPOINT is set (e.g. tcp://buildkit:8125)
@ -272,6 +286,7 @@ jobs:
endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }}
- name: Extract metadata (tags) for Docker
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
id: meta
uses: docker/metadata-action@v5
with:
@ -289,6 +304,7 @@ jobs:
DOCKER_METADATA_ANNOTATIONS_LEVELS: index
- name: Create manifest list and push
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
working-directory: /tmp/digests
env:
IMAGES: ${{needs.define-variables.outputs.images}}
@ -306,6 +322,7 @@ jobs:
done
- name: Inspect image
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
env:
IMAGES: ${{needs.define-variables.outputs.images}}
shell: bash

View file

@ -7,5 +7,6 @@
"continuwuity",
"homeserver",
"homeservers"
]
],
"rust-analyzer.cargo.features": ["full"]
}

View file

@ -1688,11 +1688,9 @@
#stream_amplification = 1024
# Number of sender task workers; determines sender parallelism. Default is
# '0' which means the value is determined internally, likely matching the
# number of tokio worker-threads or number of cores, etc. Override by
# setting a non-zero value.
# number of CPU cores. Override by setting a different value.
#
#sender_workers = 0
#sender_workers = 4
# Enables listener sockets; can be set to false to disable listening. This
# option is intended for developer/diagnostic purposes only.

View file

@ -199,32 +199,57 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
EOF
# Extract dynamically linked dependencies
RUN <<EOF
RUN <<'DEPS_EOF'
set -o xtrace
mkdir /out/libs
mkdir /out/libs-root
mkdir /out/libs /out/libs-root
# Process each binary
for BINARY in /out/sbin/*; do
lddtree "$BINARY" | awk '{print $(NF-0) " " $1}' | sort -u -k 1,1 | awk '{print "install", "-D", $1, (($2 ~ /^\//) ? "/out/libs-root" $2 : "/out/libs/" $2)}' | xargs -I {} sh -c {}
if lddtree_output=$(lddtree "$BINARY" 2>/dev/null) && [ -n "$lddtree_output" ]; then
echo "$lddtree_output" | awk '{print $(NF-0) " " $1}' | sort -u -k 1,1 | \
awk '{dest = ($2 ~ /^\//) ? "/out/libs-root" $2 : "/out/libs/" $2; print "install -D " $1 " " dest}' | \
while read cmd; do eval "$cmd"; done
fi
done
EOF
# Show what will be copied to runtime
echo "=== Libraries being copied to runtime image:"
find /out/libs* -type f 2>/dev/null | sort || echo "No libraries found"
DEPS_EOF
FROM ubuntu:latest AS prepper
# Create layer structure
RUN mkdir -p /layer1/etc/ssl/certs \
/layer2/usr/lib \
/layer3/sbin /layer3/sbom
# Copy SSL certs and root-path libraries to layer1 (ultra-stable)
COPY --from=base /etc/ssl/certs /layer1/etc/ssl/certs
COPY --from=builder /out/libs-root/ /layer1/
# Copy application libraries to layer2 (semi-stable)
COPY --from=builder /out/libs/ /layer2/usr/lib/
# Copy binaries and SBOM to layer3 (volatile)
COPY --from=builder /out/sbin/ /layer3/sbin/
COPY --from=builder /out/sbom/ /layer3/sbom/
# Fix permissions after copying
RUN chmod -R 755 /layer1 /layer2 /layer3
FROM scratch
WORKDIR /
# Copy root certs for tls into image
# You can also mount the certs from the host
# --volume /etc/ssl/certs:/etc/ssl/certs:ro
COPY --from=base /etc/ssl/certs /etc/ssl/certs
# Copy ultra-stable layer (SSL certs, system libraries)
COPY --from=prepper /layer1/ /
# Copy our build
COPY --from=builder /out/sbin/ /sbin/
# Copy SBOM
COPY --from=builder /out/sbom/ /sbom/
# Copy semi-stable layer (application libraries)
COPY --from=prepper /layer2/ /
# Copy dynamic libraries to root
COPY --from=builder /out/libs-root/ /
COPY --from=builder /out/libs/ /usr/lib/
# Copy volatile layer (binaries, SBOM)
COPY --from=prepper /layer3/ /
# Inform linker where to find libraries
ENV LD_LIBRARY_PATH=/usr/lib

View file

@ -281,15 +281,8 @@ pub(super) async fn get_remote_pdu(
vec![(event_id, value, room_id)]
};
info!("Attempting to handle event ID {event_id} as backfilled PDU");
self.services
.rooms
.timeline
.backfill_pdu(&server, response.pdu)
.await?;
let text = serde_json::to_string_pretty(&json)?;
let msg = "Got PDU from specified server and handled as backfilled";
let msg = "Got PDU from specified server:";
write!(self, "{msg}. Event body:\n```json\n{text}\n```")
},
}

View file

@ -57,5 +57,5 @@ pub(super) async fn pdus(
.try_collect()
.await?;
self.write_str(&format!("{result:#?}")).await
self.write_str(&format!("```\n{result:#?}\n```")).await
}

View file

@ -35,7 +35,6 @@ use ruma::{
};
use tracing::warn;
use super::utils::{count_to_token, parse_pagination_token as parse_token};
use crate::Ruma;
/// list of safe and common non-state events to ignore if the user is ignored
@ -85,14 +84,14 @@ pub(crate) async fn get_message_events_route(
let from: PduCount = body
.from
.as_deref()
.map(parse_token)
.map(str::parse)
.transpose()?
.unwrap_or_else(|| match body.dir {
| Direction::Forward => PduCount::min(),
| Direction::Backward => PduCount::max(),
});
let to: Option<PduCount> = body.to.as_deref().map(parse_token).transpose()?;
let to: Option<PduCount> = body.to.as_deref().map(str::parse).transpose()?;
let limit: usize = body
.limit
@ -181,8 +180,8 @@ pub(crate) async fn get_message_events_route(
.collect();
Ok(get_message_events::v3::Response {
start: count_to_token(from),
end: next_token.map(count_to_token),
start: from.to_string(),
end: next_token.as_ref().map(PduCount::to_string),
chunk,
state,
})

View file

@ -37,7 +37,6 @@ pub(super) mod typing;
pub(super) mod unstable;
pub(super) mod unversioned;
pub(super) mod user_directory;
pub(super) mod utils;
pub(super) mod voip;
pub(super) mod well_known;

View file

@ -18,7 +18,6 @@ use ruma::{
events::{TimelineEventType, relation::RelationType},
};
use super::utils::{count_to_token, parse_pagination_token as parse_token};
use crate::Ruma;
/// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
@ -111,14 +110,14 @@ async fn paginate_relations_with_filter(
dir: Direction,
) -> Result<get_relating_events::v1::Response> {
let start: PduCount = from
.map(parse_token)
.map(str::parse)
.transpose()?
.unwrap_or_else(|| match dir {
| Direction::Forward => PduCount::min(),
| Direction::Backward => PduCount::max(),
});
let to: Option<PduCount> = to.map(parse_token).transpose()?;
let to: Option<PduCount> = to.map(str::parse).transpose()?;
// Use limit or else 30, with maximum 100
let limit: usize = limit
@ -193,7 +192,7 @@ async fn paginate_relations_with_filter(
| Direction::Forward => events.last(),
| Direction::Backward => events.first(),
}
.map(|(count, _)| count_to_token(*count))
.map(|(count, _)| count.to_string())
} else {
None
};

View file

@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route(
let event = services
.rooms
.timeline
.get_pdu(event_id)
.get_remote_pdu(room_id, event_id)
.map_err(|_| err!(Request(NotFound("Event {} not found.", event_id))));
let visible = services
@ -33,11 +33,6 @@ pub(crate) async fn get_room_event_route(
return Err!(Request(Forbidden("You don't have permission to view this event.")));
}
debug_assert!(
event.event_id() == event_id && event.room_id() == room_id,
"Fetched PDU must match requested"
);
event.add_age().ok();
Ok(get_room_event::v3::Response { event: event.into_format() })

View file

@ -1,28 +0,0 @@
use conduwuit::{
Result, err,
matrix::pdu::{PduCount, ShortEventId},
};
/// Parse a pagination token, trying ShortEventId first, then falling back to
/// PduCount
pub(crate) fn parse_pagination_token(token: &str) -> Result<PduCount> {
// Try parsing as ShortEventId first
if let Ok(shorteventid) = token.parse::<ShortEventId>() {
// ShortEventId maps directly to a PduCount in our database
Ok(PduCount::Normal(shorteventid))
} else if let Ok(count) = token.parse::<u64>() {
// Fallback to PduCount for backwards compatibility
Ok(PduCount::Normal(count))
} else if let Ok(count) = token.parse::<i64>() {
// Also handle negative counts for backfilled events
Ok(PduCount::from_signed(count))
} else {
Err(err!(Request(InvalidParam("Invalid pagination token"))))
}
}
/// Convert a PduCount to a token string (using the underlying ShortEventId)
pub(crate) fn count_to_token(count: PduCount) -> String {
// The PduCount's unsigned value IS the ShortEventId
count.into_unsigned().to_string()
}

View file

@ -3,9 +3,9 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, debug,
Err, Error, Result,
debug::INFO_SPAN_LEVEL,
debug_warn, err, error,
debug_warn, err, error, info,
result::LogErr,
trace,
utils::{
@ -79,23 +79,14 @@ pub(crate) async fn send_transaction_message_route(
}
let txn_start_time = Instant::now();
let transaction_id = body.transaction_id.to_string();
trace!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id,
origin =?body.origin(),
"Starting txn",
id = transaction_id,
"Processing transaction",
);
let pdus = body
.pdus
.iter()
.stream()
.broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu))
.inspect_err(|e| debug_warn!("Could not parse PDU: {e}"))
.ready_filter_map(Result::ok);
let edus = body
.edus
.iter()
@ -104,14 +95,29 @@ pub(crate) async fn send_transaction_message_route(
.filter_map(Result::ok)
.stream();
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
let pdus = body
.pdus
.iter()
.stream()
.broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu))
.inspect_err(|e| debug_warn!("Could not parse PDU: {e}"))
.ready_filter_map(Result::ok);
debug!(
trace!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id,
origin =?body.origin(),
id = transaction_id,
"Validated transaction",
);
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus, &*transaction_id).await?;
info!(
pdus = body.pdus.len(),
edus = body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = transaction_id,
"Finished txn",
);
for (id, result) in &results {
@ -137,7 +143,13 @@ async fn handle(
started: Instant,
pdus: impl Stream<Item = Pdu> + Send,
edus: impl Stream<Item = Edu> + Send,
transaction_id: &str,
) -> Result<ResolvedMap> {
let handle_start = Instant::now();
edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu))
.boxed()
.await;
let pdu_start = Instant::now();
// group pdus by room
let pdus = pdus
.collect()
@ -148,13 +160,14 @@ async fn handle(
.collect()
})
.await;
let results_start = Instant::now();
// we can evaluate rooms concurrently
let results: ResolvedMap = pdus
.into_iter()
.try_stream()
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
handle_room(services, client, origin, started, room_id, pdus.into_iter())
let count = pdus.len();
handle_room(services, client, origin, started, room_id, pdus.into_iter(), count, transaction_id)
.map_ok(Vec::into_iter)
.map_ok(IterStream::try_stream)
})
@ -162,12 +175,14 @@ async fn handle(
.try_collect()
.boxed()
.await?;
// evaluate edus after pdus, at least for now.
edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu))
.boxed()
.await;
let handle_stop = Instant::now();
info!(
edus = pdu_start.saturating_duration_since(handle_start).as_micros(),
pdus = results_start.saturating_duration_since(pdu_start).as_micros(),
handle_room = handle_stop.saturating_duration_since(results_start).as_micros(),
transaction_id = ?transaction_id,
"handled incoming transaction",
);
Ok(results)
}
@ -178,19 +193,35 @@ async fn handle_room(
txn_start_time: Instant,
room_id: OwnedRoomId,
pdus: impl Iterator<Item = Pdu> + Send,
count: usize,
transaction_id: &str,
) -> Result<Vec<(OwnedEventId, Result)>> {
let room_lock_start = Instant::now();
let _room_lock = services
.rooms
.event_handler
.mutex_federation
.lock(&room_id)
.await;
let room_lock_end = Instant::now();
let room_id = &room_id;
let mut n = 0;
pdus.try_stream()
.and_then(|(_, event_id, value)| async move {
services.server.check_running()?;
let pdu_start_time = Instant::now();
trace!(
%room_id,
%event_id,
transaction_id = ?transaction_id,
pdu = n + 1,
total = count,
room_lock_time = ?room_lock_end.saturating_duration_since(room_lock_start).as_micros(),
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Handling PDU",
);
let result = services
.rooms
.event_handler
@ -198,11 +229,18 @@ async fn handle_room(
.await
.map(|_| ());
debug!(
info!(
%room_id,
%event_id,
transaction_id = ?transaction_id,
pdu = n + 1,
total = count,
room_lock_time = ?room_lock_end.saturating_duration_since(room_lock_start).as_micros(),
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}",
"Finished handling PDU",
);
n += 1;
Ok((event_id, result))
})

View file

@ -1,12 +1,13 @@
#![allow(deprecated)]
use std::borrow::Borrow;
use std::{borrow::Borrow, time::Instant, vec};
use axum::extract::State;
use conduwuit::{
Err, Result, at, err,
Err, Event, Result, at, debug, err, info,
matrix::event::gen_event_id_canonical_json,
utils::stream::{IterStream, TryBroadbandExt},
trace,
utils::stream::{BroadbandExt, IterStream, TryBroadbandExt},
warn,
};
use conduwuit_service::Services;
@ -25,12 +26,14 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use crate::Ruma;
/// helper method for /send_join v1 and v2
#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))]
async fn create_join_event(
services: &Services,
origin: &ServerName,
room_id: &RoomId,
pdu: &RawJsonValue,
) -> Result<create_join_event::v1::RoomState> {
omit_members: bool,
) -> Result<create_join_event::v2::RoomState> {
if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(NotFound("Room is unknown to this server.")));
}
@ -53,8 +56,10 @@ async fn create_join_event(
// We do not add the event_id field to the pdu here because of signature and
// hashes checks
trace!("Getting room version");
let room_version_id = services.rooms.state.get_room_version(room_id).await?;
trace!("Generating event ID and converting to canonical json");
let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
// Event could not be converted to canonical json
return Err!(Request(BadJson("Could not convert event to canonical json.")));
@ -103,7 +108,6 @@ async fn create_join_event(
)));
}
// ACL check sender user server name
let sender: OwnedUserId = serde_json::from_value(
value
.get("sender")
@ -113,12 +117,6 @@ async fn create_join_event(
)
.map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?;
services
.rooms
.event_handler
.acl_check(sender.server_name(), room_id)
.await?;
// check if origin server is trying to send for another server
if sender.server_name() != origin {
return Err!(Request(Forbidden("Not allowed to join on behalf of another server.")));
@ -180,11 +178,6 @@ async fn create_join_event(
}
}
services
.server_keys
.hash_and_sign_event(&mut value, &room_version_id)
.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
let origin: OwnedServerName = serde_json::from_value(
value
.get("origin")
@ -194,6 +187,12 @@ async fn create_join_event(
)
.map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?;
trace!("Signing send_join event");
services
.server_keys
.hash_and_sign_event(&mut value, &room_version_id)
.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
let mutex_lock = services
.rooms
.event_handler
@ -201,6 +200,7 @@ async fn create_join_event(
.lock(room_id)
.await;
trace!("Acquired send_join mutex, persisting join event");
let pdu_id = services
.rooms
.event_handler
@ -210,7 +210,7 @@ async fn create_join_event(
.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
drop(mutex_lock);
trace!("Fetching current state IDs");
let state_ids: Vec<OwnedEventId> = services
.rooms
.state_accessor
@ -219,9 +219,23 @@ async fn create_join_event(
.collect()
.await;
trace!(%omit_members, "Constructing current state");
let state = state_ids
.iter()
.try_stream()
.broad_filter_map(|event_id| async move {
if omit_members {
if let Ok(e) = event_id.as_ref() {
let pdu = services.rooms.timeline.get_pdu(e).await;
if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") {
trace!("omitting member event {e:?} from returned state");
// skip members
return None;
}
}
}
Some(event_id)
})
.broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id))
.broad_and_then(|pdu| {
services
@ -234,6 +248,7 @@ async fn create_join_event(
.await?;
let starting_events = state_ids.iter().map(Borrow::borrow);
trace!("Constructing auth chain");
let auth_chain = services
.rooms
.auth_chain
@ -250,13 +265,37 @@ async fn create_join_event(
.try_collect()
.boxed()
.await?;
info!(fast_join = %omit_members, "Sending join event to other servers");
services.sending.send_pdu_room(room_id, &pdu_id).await?;
Ok(create_join_event::v1::RoomState {
debug!("Finished sending join event");
let servers_in_room: Option<Vec<_>> = if !omit_members {
None
} else {
trace!("Fetching list of servers in room");
let servers: Vec<String> = services
.rooms
.state_cache
.room_servers(room_id)
.map(|sn| sn.as_str().to_owned())
.collect()
.await;
// If there's no servers, just add us
let servers = if servers.is_empty() {
warn!("Failed to find any servers, adding our own server name as a last resort");
vec![services.globals.server_name().to_string()]
} else {
trace!("Found {} servers in room", servers.len());
servers
};
Some(servers)
};
debug!("Returning send_join data");
Ok(create_join_event::v2::RoomState {
auth_chain,
state,
event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(),
members_omitted: omit_members,
servers_in_room,
})
}
@ -294,11 +333,23 @@ pub(crate) async fn create_join_event_v1_route(
}
}
let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu)
let now = Instant::now();
let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false)
.boxed()
.await?;
let transformed = create_join_event::v1::RoomState {
auth_chain: room_state.auth_chain,
state: room_state.state,
event: room_state.event,
};
info!(
"Finished sending a join for {} in {} in {:?}",
body.origin(),
&body.room_id,
now.elapsed()
);
Ok(create_join_event::v1::Response { room_state })
Ok(create_join_event::v1::Response { room_state: transformed })
}
/// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}`
@ -329,17 +380,17 @@ pub(crate) async fn create_join_event_v2_route(
}
}
let create_join_event::v1::RoomState { auth_chain, state, event } =
create_join_event(&services, body.origin(), &body.room_id, &body.pdu)
let now = Instant::now();
let room_state =
create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members)
.boxed()
.await?;
let room_state = create_join_event::v2::RoomState {
members_omitted: false,
auth_chain,
state,
event,
servers_in_room: None,
};
info!(
"Finished sending a join for {} in {} in {:?}",
body.origin(),
&body.room_id,
now.elapsed()
);
Ok(create_join_event::v2::Response { room_state })
}

View file

@ -1929,12 +1929,10 @@ pub struct Config {
pub stream_amplification: usize,
/// Number of sender task workers; determines sender parallelism. Default is
/// '0' which means the value is determined internally, likely matching the
/// number of tokio worker-threads or number of cores, etc. Override by
/// setting a non-zero value.
/// core count. Override by setting a different value.
///
/// default: 0
#[serde(default)]
/// default: core count
#[serde(default = "default_sender_workers")]
pub sender_workers: usize,
/// Enables listener sockets; can be set to false to disable listening. This
@ -2277,45 +2275,48 @@ fn default_database_backups_to_keep() -> i16 { 1 }
fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) }
fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) }
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) }
fn default_cache_capacity_modifier() -> f64 { 1.0 }
fn default_auth_chain_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
}
fn default_shorteventid_cache_capacity() -> u32 {
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_shorteventid_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(100_000)
}
fn default_eventidshort_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_eventid_pdu_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_shortstatekey_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_statekeyshort_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_servernameevent_data_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(500_000)
parallelism_scaled_u32(100_000).saturating_add(100_000)
}
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) }
fn default_stateinfo_cache_capacity() -> u32 {
parallelism_scaled_u32(500).clamp(100, 12000)
}
fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) }
fn default_roomid_spacehierarchy_cache_capacity() -> u32 {
parallelism_scaled_u32(500).clamp(100, 12000) }
fn default_dns_cache_entries() -> u32 { 32768 }
fn default_dns_cache_entries() -> u32 { 327680 }
fn default_dns_min_ttl() -> u64 { 60 * 180 }
@ -2508,14 +2509,13 @@ fn default_admin_log_capture() -> String {
fn default_admin_room_tag() -> String { "m.server_notice".to_owned() }
#[allow(clippy::as_conversions, clippy::cast_precision_loss)]
fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
fn parallelism_scaled_u32(val: u32) -> u32 {
let val = val.try_into().expect("failed to cast u32 to usize");
parallelism_scaled(val).try_into().unwrap_or(u32::MAX)
}
pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) }
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) }
pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
fn default_trusted_server_batch_size() -> usize { 256 }
@ -2535,6 +2535,8 @@ fn default_stream_width_scale() -> f32 { 1.0 }
fn default_stream_amplification() -> usize { 1024 }
fn default_sender_workers() -> usize { parallelism_scaled(1) }
fn default_client_receive_timeout() -> u64 { 75 }
fn default_client_request_timeout() -> u64 { 180 }

View file

@ -1,6 +1,10 @@
#![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)]
use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr};
use std::{
cmp::Ordering,
fmt::{self, Display},
str::FromStr,
};
use ruma::api::Direction;

View file

@ -13,6 +13,7 @@ use ruma::{
power_levels::RoomPowerLevelsEventContent,
third_party_invite::RoomThirdPartyInviteEventContent,
},
EventId,
int,
serde::{Base64, Raw},
};
@ -21,7 +22,6 @@ use serde::{
de::{Error as _, IgnoredAny},
};
use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue};
use super::{
Error, Event, Result, StateEventType, StateKey, TimelineEventType,
power_levels::{
@ -149,8 +149,8 @@ where
for<'a> &'a E: Event + Send,
{
debug!(
event_id = %incoming_event.event_id(),
event_type = ?incoming_event.event_type(),
event_id = format!("{}", incoming_event.event_id()),
event_type = format!("{}", incoming_event.event_type()),
"auth_check beginning"
);
@ -219,7 +219,7 @@ where
/*
// TODO: In the past this code was commented as it caused problems with Synapse. This is no
// longer the case. This needs to be implemented.
// See also: https://github.com/ruma/ruma/pull/2064
// See also: https://github.com/ruma/ruma/pull/2064
//
// 2. Reject if auth_events
// a. auth_events cannot have duplicate keys since it's a BTree
@ -251,11 +251,31 @@ where
let room_create_event = match room_create_event {
| None => {
warn!("no m.room.create event in auth chain");
error!(
create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
"no m.room.create event found for {} ({})!",
incoming_event.event_id().as_str(),
incoming_event.room_id().as_str()
);
return Ok(false);
},
| Some(e) => e,
};
// just re-check 1.2 to work around a bug
let Some(room_id_server_name) = incoming_event.room_id().server_name() else {
warn!("room ID has no servername");
return Ok(false);
};
if room_id_server_name != room_create_event.sender().server_name() {
warn!(
"servername of room ID origin ({}) does not match servername of m.room.create sender ({})",
room_id_server_name,
room_create_event.sender().server_name());
return Ok(false);
}
if incoming_event.room_id() != room_create_event.room_id() {
warn!("room_id of incoming event does not match room_id of m.room.create event");

View file

@ -733,8 +733,12 @@ where
Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync,
{
let mut room_id = None;
while let Some(sort_ev) = event {
debug!(event_id = sort_ev.event_id().as_str(), "mainline");
if room_id.is_none() {
room_id = Some(sort_ev.room_id().to_owned());
}
let id = sort_ev.event_id();
if let Some(depth) = mainline_map.get(id) {
@ -753,7 +757,7 @@ where
}
}
}
// Did not find a power level event so we default to zero
warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth");
Ok(0)
}

View file

@ -29,7 +29,7 @@ fn descriptor_cf_options(
set_table_options(&mut opts, &desc, cache)?;
opts.set_min_write_buffer_number(1);
opts.set_max_write_buffer_number(2);
opts.set_max_write_buffer_number(3);
opts.set_write_buffer_size(desc.write_size);
opts.set_target_file_size_base(desc.file_size);

View file

@ -1,10 +1,11 @@
use std::{fmt::Debug, mem};
use std::{
error::Error as _,
fmt::{Debug, Write},
mem,
};
use bytes::Bytes;
use conduwuit::{
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err,
error::inspect_debug_log, implement, trace, utils::string::EMPTY,
};
use conduwuit::{Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, info};
use http::{HeaderValue, header::AUTHORIZATION};
use ipaddress::IPAddress;
use reqwest::{Client, Method, Request, Response, Url};
@ -193,9 +194,9 @@ fn handle_error(
) -> Result {
if e.is_timeout() || e.is_connect() {
e = e.without_url();
debug_warn!("{e:?}");
trace!(?url, "network error while sending federation request: {e:?}");
} else if e.is_redirect() {
debug_error!(
trace!(
method = ?method,
url = ?url,
final_url = ?e.url(),
@ -204,9 +205,17 @@ fn handle_error(
e,
);
} else {
debug_error!("{e:?}");
trace!(?url, "failed to send federation request: {e:?}");
}
let mut nice_error = "Request failed".to_owned();
let mut src = e.source();
while let Some(source) = src {
write!(nice_error, ": {source:?}").expect("writing to string should not fail");
src = source.source();
}
info!(nice_error, "Federation request error");
Err(e.into())
}

View file

@ -100,7 +100,7 @@ impl Service {
/// Pings the presence of the given user in the given room, setting the
/// specified state.
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
const REFRESH_TIMEOUT: u64 = 60 * 1000;
const REFRESH_TIMEOUT: u64 = 60 * 1000 * 4;
let last_presence = self.db.get_presence(user_id).await;
let state_changed = match last_presence {

View file

@ -53,9 +53,9 @@ impl Resolver {
opts.cache_size = config.dns_cache_entries as usize;
opts.preserve_intermediates = true;
opts.negative_min_ttl = Some(Duration::from_secs(config.dns_min_ttl_nxdomain));
opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 30));
opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24));
opts.positive_min_ttl = Some(Duration::from_secs(config.dns_min_ttl));
opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 7));
opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24));
opts.timeout = Duration::from_secs(config.dns_timeout);
opts.attempts = config.dns_attempts as usize;
opts.try_tcp_on_error = config.dns_tcp_fallback;

View file

@ -3,11 +3,7 @@ use std::{
time::Instant,
};
use conduwuit::{
Event, PduEvent, debug, debug_error, debug_warn, implement,
matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs,
warn,
};
use conduwuit::{Event, PduEvent, debug, debug_error, implement, matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, warn, debug_warn};
use ruma::{
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::event::get_event,
@ -79,7 +75,7 @@ where
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
const MAX_DURATION: u64 = 60 * 60;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,

View file

@ -122,11 +122,8 @@ where
}
// The original create event must be in the auth events
if !matches!(
auth_events.get(&(StateEventType::RoomCreate, String::new().into())),
Some(_) | None
) {
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) {
return Err!(Request(InvalidParam("Incoming event refers to wrong create event. event_id={event_id}")));
}
let state_fetch = |ty: &StateEventType, sk: &str| {

View file

@ -46,7 +46,7 @@ where
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
const MAX_DURATION: u64 = 60 * 60;
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
debug!(
?tries,

View file

@ -5,7 +5,7 @@ use conduwuit::{
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
trace,
utils::stream::{BroadbandExt, ReadyExt},
warn,
warn
};
use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
@ -193,6 +193,8 @@ where
.collect()
.await;
if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events, event id: {}", incoming_pdu.prev_events.len(), incoming_pdu.event_id) }
debug!(
"Retained {} extremities checked against {} prev_events",
extremities.len(),

View file

@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDif
.ok()
.take_if(|parent| *parent != 0);
debug_assert!(value.len() % STRIDE == 0, "value not aligned to stride");
debug_assert!(value.len().is_multiple_of(STRIDE), "value not aligned to stride");
let _num_values = value.len() / STRIDE;
let mut add_mode = true;

View file

@ -1,7 +1,8 @@
use std::iter::once;
use conduwuit::{Err, PduEvent};
use conduwuit_core::{
Result, debug, debug_warn, implement, info,
Result, debug, debug_warn, err, implement, info,
matrix::{
event::Event,
pdu::{PduCount, PduId, RawPduId},
@ -11,7 +12,7 @@ use conduwuit_core::{
};
use futures::{FutureExt, StreamExt};
use ruma::{
RoomId, ServerName,
CanonicalJsonObject, EventId, RoomId, ServerName,
api::federation,
events::{
StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent,
@ -100,7 +101,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
.boxed();
while let Some(ref backfill_server) = servers.next().await {
info!("Asking {backfill_server} for backfill");
info!("Asking {backfill_server} for backfill in {room_id}");
let response = self
.services
.sending
@ -128,10 +129,126 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
}
}
info!("No servers could backfill, but backfill was needed in room {room_id}");
warn!("No servers could backfill, but backfill was needed in room {room_id}");
Ok(())
}
#[implement(super::Service)]
#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))]
pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result<PduEvent> {
let local = self.get_pdu(event_id).await;
if local.is_ok() {
// We already have this PDU, no need to backfill
debug!("We already have {event_id} in {room_id}, no need to backfill.");
return local;
}
debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers.");
// Similar to backfill_if_required, but only for a single PDU
// Fetch a list of servers to try
if self
.services
.state_cache
.room_joined_count(room_id)
.await
.is_ok_and(|count| count <= 1)
&& !self
.services
.state_accessor
.is_world_readable(room_id)
.await
{
// Room is empty (1 user or none), there is no one that can backfill
return Err!(Request(NotFound("No one can backfill this PDU, room is empty.")));
}
let power_levels: RoomPowerLevelsEventContent = self
.services
.state_accessor
.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
.await
.unwrap_or_default();
let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| {
if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) {
Some(user_id.server_name())
} else {
None
}
});
let canonical_room_alias_server = once(
self.services
.state_accessor
.get_canonical_alias(room_id)
.await,
)
.filter_map(Result::ok)
.map(|alias| alias.server_name().to_owned())
.stream();
let mut servers = room_mods
.stream()
.map(ToOwned::to_owned)
.chain(canonical_room_alias_server)
.chain(
self.services
.server
.config
.trusted_servers
.iter()
.map(ToOwned::to_owned)
.stream(),
)
.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
.filter_map(|server_name| async move {
self.services
.state_cache
.server_in_room(&server_name, room_id)
.await
.then_some(server_name)
})
.boxed();
while let Some(ref backfill_server) = servers.next().await {
info!("Asking {backfill_server} for event {}", event_id);
let value = self
.services
.sending
.send_federation_request(backfill_server, federation::event::get_event::v1::Request {
event_id: event_id.to_owned(),
include_unredacted_content: Some(false),
})
.await
.and_then(|response| {
serde_json::from_str::<CanonicalJsonObject>(response.pdu.get()).map_err(|e| {
err!(BadServerResponse(debug_warn!(
"Error parsing incoming event {e:?} from {backfill_server}"
)))
})
});
let pdu = match value {
| Ok(value) => {
self.services
.event_handler
.handle_incoming_pdu(backfill_server, room_id, event_id, value, false)
.boxed()
.await?;
debug!("Successfully backfilled {event_id} from {backfill_server}");
Some(self.get_pdu(event_id).await)
},
| Err(e) => {
warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
None
},
};
if let Some(pdu) = pdu {
debug!("Fetched {event_id} from {backfill_server}");
return pdu;
}
}
Err!("No servers could be used to fetch {} in {}.", room_id, event_id)
}
#[implement(super::Service)]
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> {

View file

@ -3,8 +3,7 @@ use std::{borrow::Borrow, sync::Arc};
use conduwuit::{
Err, PduCount, PduEvent, Result, at, err,
result::{LogErr, NotFound},
utils,
utils::stream::TryReadyExt,
utils::{self, stream::TryReadyExt},
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};

View file

@ -401,16 +401,10 @@ impl Service {
fn num_senders(args: &crate::Args<'_>) -> usize {
const MIN_SENDERS: usize = 1;
// Limit the number of senders to the number of workers threads or number of
// cores, conservatively.
let max_senders = args
.server
.metrics
.num_workers()
.min(available_parallelism());
// Limit the maximum number of senders to the number of cores.
let max_senders = available_parallelism();
// If the user doesn't override the default 0, this is intended to then default
// to 1 for now as multiple senders is experimental.
// default is 4 senders. clamp between 1 and core count.
args.server
.config
.sender_workers

View file

@ -10,7 +10,7 @@ use std::{
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use conduwuit_core::{
Error, Event, Result, debug, err, error,
Error, Event, Result, debug, err, error, info,
result::LogErr,
trace,
utils::{
@ -142,7 +142,7 @@ impl Service {
}
fn handle_response_err(dest: Destination, statuses: &mut CurTransactionStatus, e: &Error) {
debug!(dest = ?dest, "{e:?}");
debug!(dest = ?dest, "error response: {e:?}");
statuses.entry(dest).and_modify(|e| {
*e = match e {
| TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()),
@ -177,7 +177,21 @@ impl Service {
if !new_events.is_empty() {
self.db.mark_as_active(new_events.iter());
let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect();
let new_events_vec: Vec<SendingEvent> =
new_events.into_iter().map(|(_, event)| event).collect();
if let Some(status) = statuses.get(&dest.clone()) {
if matches!(status, TransactionStatus::Running) {
// If the server is in backoff, clear it
info!(
?dest,
"Catching up previously failed destination with {}+ new events",
new_events_vec.len()
);
statuses.insert(dest.clone(), TransactionStatus::Running);
}
}
futures.push(self.send_events(dest.clone(), new_events_vec));
} else {
statuses.remove(dest);
@ -859,16 +873,24 @@ impl Service {
pdus,
edus,
};
let pdu_count = request.pdus.len();
let edu_count = request.edus.len();
let result = self
.services
.federation
.execute_on(&self.services.client.sender, &server, request)
.await;
.await
.inspect(|_| {
trace!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count);
})
.inspect_err(|e| {
info!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count);
});
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
if let Err(e) = result {
warn!(
trace!(
%txn_id, %server,
"error sending PDU {event_id} to remote server: {e:?}"
);