Compare commits

..

39 commits

Author SHA1 Message Date
Jacob Taylor
efa758c29b Merge remote-tracking branch 'ginger/single-event-backfill' into illegal-car-mods 2025-09-06 11:55:50 -07:00
Ginger
8e7ee1e210 feat: Do not persist remote PDUs fetched with admin commands 2025-09-06 18:06:06 +00:00
Ginger
11c6b3ea0a fix: Fix pagination tokens being corrupted for backfilled PDUs 2025-09-06 18:06:06 +00:00
Ginger
ac552b8829 fix: Put the output of !admin query room-timeline pdus in a codeblock 2025-09-06 18:06:06 +00:00
Ginger
1ad26798a0 fix: Use handle_incoming_pdu directly to keep remote PDUs as outliers 2025-09-06 18:06:06 +00:00
nexy7574
9bff1f6c06 feat: Ask remote servers for individual unknown events 2025-09-06 18:06:06 +00:00
Jacob Taylor
879c8e527b revert get extremity locking 2025-09-06 10:36:19 -07:00
Jacob Taylor
098eb8abca Merge remote-tracking branch 'origin/main' into illegal-car-mods 2025-09-06 10:19:59 -07:00
Jacob Taylor
1f75b0a247 collect room lock timing ??? 2025-08-31 10:26:59 -07:00
nexy7574
f09e1c075d fix(sync/v2): Room leaves being omitted incorrectly
Partially borrowed from 85a84f93c7
2025-08-31 09:16:53 -07:00
Jacob Taylor
8eb5f8466f process edus before pdus here, too 2025-08-31 09:16:53 -07:00
Jacob Taylor
9723753b5c pass and use transaction id to collect timing info 2025-08-31 09:16:52 -07:00
Jacob Taylor
60b8c9f5f2 turns out we need debug_warn 2025-08-26 18:42:40 -07:00
Jacob Taylor
c83f1ddb71 add event_id to log entry 2025-08-26 18:39:37 -07:00
Jacob Taylor
a75eacd544 reduce log volume (keeps 2 infos)
adjust log volume

demote a bunch of logs
2025-08-26 18:39:37 -07:00
Jacob Taylor
e15d71d230 log which room is being backfilled
move more backfill log to info, clean up imports
2025-08-26 18:38:49 -07:00
Jacob Taylor
a7929c1931 fix warn by removing unused debug imports
delete more imports to quiet cargo
2025-08-26 18:36:39 -07:00
Jacob Taylor
7d9b514696 fix warn by removing unused debug imports 2025-08-26 18:10:08 -07:00
Jacob Taylor
4baf48e214 exponential backoff is now just bees. did you want bees? no? well you have them now. congrats 2025-08-26 18:10:08 -07:00
Jacob Taylor
6483e984ce fix too many infos 2025-08-26 18:10:08 -07:00
Jacob Taylor
697e7aa2cd more funny settings (part 3 of 12) 2025-08-26 18:10:08 -07:00
Jacob Taylor
081487a413 sender_workers scaling. this time, with feeling! 2025-08-26 18:10:08 -07:00
Jacob Taylor
3e9cf3f494 vehicle loan documentation now available at window 7 2025-08-26 18:10:08 -07:00
Jacob Taylor
1e4cf59ab8 lock the getter instead ??? c/o M 2025-08-26 18:10:08 -07:00
Jacob Taylor
008d90b118 make fetching key room events less smart 2025-08-26 18:10:07 -07:00
Jacob Taylor
54eab4775a change rocksdb stats level to 3
scale rocksdb background jobs and subcompactions

change rocksdb default error level to info from error

delete unused num_threads function

fix warns from cargo
2025-08-26 18:10:07 -07:00
nexy7574
0a74dfe5a5 log which room struggled to get mainline depth 2025-08-26 18:10:07 -07:00
nexy7574
ecdce68ae3 more logs 2025-08-26 18:10:07 -07:00
nexy7574
43574118aa Fix room ID check 2025-08-26 18:10:07 -07:00
nexy7574
e7399409b4 Kick up a fuss when m.room.create is unfindable 2025-08-26 18:10:07 -07:00
nexy7574
1d97861332 Note about ruma#2064 in TODO 2025-08-26 18:10:07 -07:00
nexy7574
e8bba3ba37 fix an auth rule not applying correctly 2025-08-26 18:10:07 -07:00
Jacob Taylor
a57df9af37 upgrade some settings to enable 5g in continuwuity
enable converged 6g at the edge in continuwuity

better stateinfo_cache_capacity default

better roomid_spacehierarchy_cache_capacity

make sender workers default better and clamp value to core count

update sender workers documentation

add more parallelism_scaled and make them public

update 1 document
2025-08-26 18:10:07 -07:00
Jacob Taylor
45301d4e41 bump the number of allowed immutable memtables by 1, to allow for greater flood protection
this should probably not be applied if you have rocksdb_atomic_flush = false (the default)
2025-08-26 18:10:07 -07:00
nexy7574
7d9ec7a0e5
feat(fed): Handle EDUs before PDUs
Aranje needs his crypto keys
2025-08-25 21:26:56 +01:00
nexy7574
e42e309797
feat(fed): Handle EDUs before PDUs
Aranje needs his crypto keys
2025-08-25 21:26:28 +01:00
nexy7574
4085a90c1f
feat(fed): Something about nicer fed errors 2025-08-20 00:56:55 +01:00
nexy7574
255aa44ecc
fix(fed): Alter log levels to be less noisy 2025-08-15 04:20:03 +01:00
nexy7574
04130dcdd8
fix(fed): Improve transaction flushing 2025-08-15 04:11:54 +01:00
19 changed files with 122 additions and 232 deletions

View file

@ -13,12 +13,6 @@ outputs:
slug:
description: 'Combined OS slug (e.g. Ubuntu-22.04)'
value: ${{ steps.detect.outputs.slug }}
node_major:
description: 'Major version of Node.js if available (e.g. 22)'
value: ${{ steps.detect.outputs.node_major }}
node_version:
description: 'Full Node.js version if available (e.g. 22.19.0)'
value: ${{ steps.detect.outputs.node_version }}
runs:
using: composite
@ -36,20 +30,7 @@ runs:
# Create combined slug
OS_SLUG="${OS_NAME}-${OS_VERSION}"
# Detect Node.js version if available
if command -v node >/dev/null 2>&1; then
NODE_VERSION=$(node --version | sed 's/v//')
NODE_MAJOR=$(echo $NODE_VERSION | cut -d. -f1)
echo "node_version=${NODE_VERSION}" >> $GITHUB_OUTPUT
echo "node_major=${NODE_MAJOR}" >> $GITHUB_OUTPUT
echo "🔍 Detected Node.js: v${NODE_VERSION}"
else
echo "node_version=" >> $GITHUB_OUTPUT
echo "node_major=" >> $GITHUB_OUTPUT
echo "🔍 Node.js not found"
fi
# Set OS outputs
# Set outputs
echo "name=${OS_NAME}" >> $GITHUB_OUTPUT
echo "version=${OS_VERSION}" >> $GITHUB_OUTPUT
echo "slug=${OS_SLUG}" >> $GITHUB_OUTPUT

View file

@ -2,12 +2,18 @@ name: sccache
description: |
Install sccache for caching builds in GitHub Actions.
inputs:
token:
description: 'A Github PAT'
required: false
runs:
using: composite
steps:
- name: Install sccache
uses: https://git.tomfos.tr/tom/sccache-action@v1
uses: https://github.com/mozilla-actions/sccache-action@v0.0.9
with:
token: ${{ inputs.token }}
- name: Configure sccache
uses: https://github.com/actions/github-script@v7
with:

View file

@ -88,9 +88,19 @@ runs:
# Shared toolchain cache across all Rust versions
key: toolchain-${{ steps.runner-os.outputs.slug }}
- name: Debug GitHub token availability
shell: bash
run: |
if [ -z "${{ inputs.github-token }}" ]; then
echo "⚠️ No GitHub token provided - sccache will use fallback download method"
else
echo "✅ GitHub token provided for sccache"
fi
- name: Setup sccache
uses: https://git.tomfos.tr/tom/sccache-action@v1
uses: https://github.com/mozilla-actions/sccache-action@v0.0.9
with:
token: ${{ inputs.github-token }}
- name: Cache build artifacts
id: build-cache

View file

@ -49,23 +49,10 @@ jobs:
cp ./docs/static/_headers ./public/_headers
echo "Copied .well-known files and _headers to ./public"
- name: Detect runner environment
id: runner-env
uses: ./.forgejo/actions/detect-runner-os
- name: Setup Node.js
if: steps.runner-env.outputs.node_major == '' || steps.runner-env.outputs.node_major < '20'
uses: https://github.com/actions/setup-node@v4
with:
node-version: 22
- name: Cache npm dependencies
uses: actions/cache@v3
with:
path: ~/.npm
key: ${{ steps.runner-env.outputs.slug }}-node-${{ hashFiles('**/package-lock.json') }}
restore-keys: |
${{ steps.runner-env.outputs.slug }}-node-
node-version: 20
- name: Install dependencies
run: npm install --save-dev wrangler@latest

View file

@ -11,6 +11,7 @@ on:
- ".gitignore"
- "renovate.json"
- "pkg/**"
- "docker/**"
- "docs/**"
push:
branches:
@ -22,6 +23,7 @@ on:
- ".gitignore"
- "renovate.json"
- "pkg/**"
- "docker/**"
- "docs/**"
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
@ -53,9 +55,6 @@ jobs:
let images = []
if (process.env.BUILTIN_REGISTRY_ENABLED === "true") {
images.push(builtinImage)
} else {
// Fallback to official registry for forks/PRs without credentials
images.push('forgejo.ellis.link/continuwuation/continuwuity')
}
core.setOutput('images', images.join("\n"))
core.setOutput('images_list', images.join(","))
@ -101,7 +100,6 @@ jobs:
with:
persist-credentials: false
- name: Install rust
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
id: rust-toolchain
uses: ./.forgejo/actions/rust-toolchain
@ -112,11 +110,9 @@ jobs:
driver: ${{ env.BUILDKIT_ENDPOINT != '' && 'remote' || 'docker-container' }}
endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }}
- name: Set up QEMU
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
uses: docker/setup-qemu-action@v3
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Login to builtin registry
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: docker/login-action@v3
with:
registry: ${{ env.BUILTIN_REGISTRY }}
@ -142,21 +138,15 @@ jobs:
run: |
calculatedSha=$(git rev-parse --short ${{ github.sha }})
echo "COMMIT_SHORT_SHA=$calculatedSha" >> $GITHUB_ENV
echo "Short SHA: $calculatedSha"
- name: Get Git commit timestamps
run: |
timestamp=$(git log -1 --pretty=%ct)
echo "TIMESTAMP=$timestamp" >> $GITHUB_ENV
echo "Commit timestamp: $timestamp"
run: echo "TIMESTAMP=$(git log -1 --pretty=%ct)" >> $GITHUB_ENV
- uses: ./.forgejo/actions/timelord
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
with:
key: timelord-v0
path: .
- name: Cache Rust registry
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
uses: actions/cache@v3
with:
path: |
@ -166,7 +156,6 @@ jobs:
.cargo/registry/src
key: rust-registry-image-${{hashFiles('**/Cargo.lock') }}
- name: Cache cargo target
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
id: cache-cargo-target
uses: actions/cache@v3
with:
@ -174,7 +163,6 @@ jobs:
cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
key: cargo-target-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}-${{hashFiles('**/Cargo.lock') }}-${{steps.rust-toolchain.outputs.rustc_version}}
- name: Cache apt cache
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
id: cache-apt
uses: actions/cache@v3
with:
@ -182,7 +170,6 @@ jobs:
var-cache-apt-${{ matrix.slug }}
key: var-cache-apt-${{ matrix.slug }}
- name: Cache apt lib
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
id: cache-apt-lib
uses: actions/cache@v3
with:
@ -190,7 +177,6 @@ jobs:
var-lib-apt-${{ matrix.slug }}
key: var-lib-apt-${{ matrix.slug }}
- name: inject cache into docker
if: ${{ env.BUILDKIT_ENDPOINT == '' }}
uses: https://github.com/reproducible-containers/buildkit-cache-dance@v3.3.0
with:
cache-map: |
@ -213,7 +199,7 @@ jobs:
context: .
file: "docker/Dockerfile"
build-args: |
GIT_COMMIT_HASH=${{ github.sha }}
GIT_COMMIT_HASH=${{ github.sha }})
GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }}
GIT_REMOTE_URL=${{github.event.repository.html_url }}
GIT_REMOTE_COMMIT_URL=${{github.event.head_commit.url }}
@ -223,23 +209,27 @@ jobs:
cache-from: type=gha
# cache-to: type=gha,mode=max
sbom: true
outputs: |
${{ env.BUILTIN_REGISTRY_ENABLED == 'true' && format('type=image,"name={0}",push-by-digest=true,name-canonical=true,push=true', needs.define-variables.outputs.images_list) || format('type=image,"name={0}",push=false', needs.define-variables.outputs.images_list) }}
type=local,dest=/tmp/binaries
outputs: type=image,"name=${{ needs.define-variables.outputs.images_list }}",push-by-digest=true,name-canonical=true,push=true
env:
SOURCE_DATE_EPOCH: ${{ env.TIMESTAMP }}
# For publishing multi-platform manifests
- name: Export digest
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
run: |
mkdir -p /tmp/digests
digest="${{ steps.build.outputs.digest }}"
touch "/tmp/digests/${digest#sha256:}"
# Binary extracted via local output for all builds
- name: Rename extracted binary
run: mv /tmp/binaries/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
- name: Extract binary from container (image)
id: extract-binary-image
run: |
mkdir -p /tmp/binaries
digest="${{ steps.build.outputs.digest }}"
echo "container_id=$(docker create --platform ${{ matrix.platform }} ${{ needs.define-variables.outputs.images_list }}@$digest)" >> $GITHUB_OUTPUT
- name: Extract binary from container (copy)
run: docker cp ${{ steps.extract-binary-image.outputs.container_id }}:/sbin/conduwuit /tmp/binaries/conduwuit-${{ matrix.target_cpu }}-${{ matrix.slug }}-${{ matrix.profile }}
- name: Extract binary from container (cleanup)
run: docker rm ${{ steps.extract-binary-image.outputs.container_id }}
- name: Upload binary artifact
uses: forgejo/upload-artifact@v4
@ -249,7 +239,6 @@ jobs:
if-no-files-found: error
- name: Upload digest
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: forgejo/upload-artifact@v4
with:
name: digests-${{ matrix.slug }}
@ -262,7 +251,6 @@ jobs:
needs: [define-variables, build-image]
steps:
- name: Download digests
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: forgejo/download-artifact@v4
with:
path: /tmp/digests
@ -270,7 +258,6 @@ jobs:
merge-multiple: true
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Login to builtin registry
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: docker/login-action@v3
with:
registry: ${{ env.BUILTIN_REGISTRY }}
@ -278,7 +265,6 @@ jobs:
password: ${{ secrets.BUILTIN_REGISTRY_PASSWORD || secrets.GITHUB_TOKEN }}
- name: Set up Docker Buildx
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
uses: docker/setup-buildx-action@v3
with:
# Use persistent BuildKit if BUILDKIT_ENDPOINT is set (e.g. tcp://buildkit:8125)
@ -286,7 +272,6 @@ jobs:
endpoint: ${{ env.BUILDKIT_ENDPOINT || '' }}
- name: Extract metadata (tags) for Docker
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
id: meta
uses: docker/metadata-action@v5
with:
@ -304,7 +289,6 @@ jobs:
DOCKER_METADATA_ANNOTATIONS_LEVELS: index
- name: Create manifest list and push
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
working-directory: /tmp/digests
env:
IMAGES: ${{needs.define-variables.outputs.images}}
@ -322,7 +306,6 @@ jobs:
done
- name: Inspect image
if: ${{ env.BUILTIN_REGISTRY_ENABLED == 'true' }}
env:
IMAGES: ${{needs.define-variables.outputs.images}}
shell: bash

View file

@ -7,6 +7,5 @@
"continuwuity",
"homeserver",
"homeservers"
],
"rust-analyzer.cargo.features": ["full"]
]
}

View file

@ -1074,7 +1074,7 @@
# 3 to 5 = Statistics with possible performance impact.
# 6 = All statistics.
#
#rocksdb_stats_level = 1
#rocksdb_stats_level = 3
# This is a password that can be configured that will let you login to the
# server bot account (currently `@conduit`) for emergency troubleshooting

View file

@ -199,57 +199,32 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
EOF
# Extract dynamically linked dependencies
RUN <<'DEPS_EOF'
RUN <<EOF
set -o xtrace
mkdir /out/libs /out/libs-root
# Process each binary
mkdir /out/libs
mkdir /out/libs-root
for BINARY in /out/sbin/*; do
if lddtree_output=$(lddtree "$BINARY" 2>/dev/null) && [ -n "$lddtree_output" ]; then
echo "$lddtree_output" | awk '{print $(NF-0) " " $1}' | sort -u -k 1,1 | \
awk '{dest = ($2 ~ /^\//) ? "/out/libs-root" $2 : "/out/libs/" $2; print "install -D " $1 " " dest}' | \
while read cmd; do eval "$cmd"; done
fi
lddtree "$BINARY" | awk '{print $(NF-0) " " $1}' | sort -u -k 1,1 | awk '{print "install", "-D", $1, (($2 ~ /^\//) ? "/out/libs-root" $2 : "/out/libs/" $2)}' | xargs -I {} sh -c {}
done
# Show what will be copied to runtime
echo "=== Libraries being copied to runtime image:"
find /out/libs* -type f 2>/dev/null | sort || echo "No libraries found"
DEPS_EOF
FROM ubuntu:latest AS prepper
# Create layer structure
RUN mkdir -p /layer1/etc/ssl/certs \
/layer2/usr/lib \
/layer3/sbin /layer3/sbom
# Copy SSL certs and root-path libraries to layer1 (ultra-stable)
COPY --from=base /etc/ssl/certs /layer1/etc/ssl/certs
COPY --from=builder /out/libs-root/ /layer1/
# Copy application libraries to layer2 (semi-stable)
COPY --from=builder /out/libs/ /layer2/usr/lib/
# Copy binaries and SBOM to layer3 (volatile)
COPY --from=builder /out/sbin/ /layer3/sbin/
COPY --from=builder /out/sbom/ /layer3/sbom/
# Fix permissions after copying
RUN chmod -R 755 /layer1 /layer2 /layer3
EOF
FROM scratch
WORKDIR /
# Copy ultra-stable layer (SSL certs, system libraries)
COPY --from=prepper /layer1/ /
# Copy root certs for tls into image
# You can also mount the certs from the host
# --volume /etc/ssl/certs:/etc/ssl/certs:ro
COPY --from=base /etc/ssl/certs /etc/ssl/certs
# Copy semi-stable layer (application libraries)
COPY --from=prepper /layer2/ /
# Copy our build
COPY --from=builder /out/sbin/ /sbin/
# Copy SBOM
COPY --from=builder /out/sbom/ /sbom/
# Copy volatile layer (binaries, SBOM)
COPY --from=prepper /layer3/ /
# Copy dynamic libraries to root
COPY --from=builder /out/libs-root/ /
COPY --from=builder /out/libs/ /usr/lib/
# Inform linker where to find libraries
ENV LD_LIBRARY_PATH=/usr/lib

View file

@ -35,6 +35,7 @@ use ruma::{
};
use tracing::warn;
use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token};
use crate::Ruma;
/// list of safe and common non-state events to ignore if the user is ignored
@ -84,14 +85,14 @@ pub(crate) async fn get_message_events_route(
let from: PduCount = body
.from
.as_deref()
.map(str::parse)
.map(parse_token)
.transpose()?
.unwrap_or_else(|| match body.dir {
| Direction::Forward => PduCount::min(),
| Direction::Backward => PduCount::max(),
});
let to: Option<PduCount> = body.to.as_deref().map(str::parse).transpose()?;
let to: Option<PduCount> = body.to.as_deref().map(parse_token).transpose()?;
let limit: usize = body
.limit
@ -180,8 +181,8 @@ pub(crate) async fn get_message_events_route(
.collect();
Ok(get_message_events::v3::Response {
start: from.to_string(),
end: next_token.as_ref().map(PduCount::to_string),
start: count_to_pagination_token(from),
end: next_token.map(count_to_pagination_token),
chunk,
state,
})

View file

@ -37,6 +37,7 @@ pub(super) mod typing;
pub(super) mod unstable;
pub(super) mod unversioned;
pub(super) mod user_directory;
pub(super) mod utils;
pub(super) mod voip;
pub(super) mod well_known;

View file

@ -18,6 +18,7 @@ use ruma::{
events::{TimelineEventType, relation::RelationType},
};
use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token};
use crate::Ruma;
/// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
@ -110,14 +111,14 @@ async fn paginate_relations_with_filter(
dir: Direction,
) -> Result<get_relating_events::v1::Response> {
let start: PduCount = from
.map(str::parse)
.map(parse_token)
.transpose()?
.unwrap_or_else(|| match dir {
| Direction::Forward => PduCount::min(),
| Direction::Backward => PduCount::max(),
});
let to: Option<PduCount> = to.map(str::parse).transpose()?;
let to: Option<PduCount> = to.map(parse_token).transpose()?;
// Use limit or else 30, with maximum 100
let limit: usize = limit
@ -192,7 +193,7 @@ async fn paginate_relations_with_filter(
| Direction::Forward => events.last(),
| Direction::Backward => events.first(),
}
.map(|(count, _)| count.to_string())
.map(|(count, _)| count_to_pagination_token(*count))
} else {
None
};

View file

@ -9,7 +9,7 @@ use conduwuit::{
use futures::StreamExt;
use ruma::{api::client::threads::get_threads, uint};
use crate::Ruma;
use crate::{Ruma, client::utils::pagination_token_to_count};
/// # `GET /_matrix/client/r0/rooms/{roomId}/threads`
pub(crate) async fn get_threads_route(
@ -27,7 +27,7 @@ pub(crate) async fn get_threads_route(
let from: PduCount = body
.from
.as_deref()
.map(str::parse)
.map(pagination_token_to_count)
.transpose()?
.unwrap_or_else(PduCount::max);

7
src/api/client/utils.rs Normal file
View file

@ -0,0 +1,7 @@
use conduwuit::{Result, matrix::pdu::PduCount};
/// Parse a pagination token
pub(crate) fn pagination_token_to_count(token: &str) -> Result<PduCount> { token.parse() }
/// Convert a PduCount to a token string
pub(crate) fn count_to_pagination_token(count: PduCount) -> String { count.to_string() }

View file

@ -1,13 +1,12 @@
#![allow(deprecated)]
use std::{borrow::Borrow, time::Instant, vec};
use std::borrow::Borrow;
use axum::extract::State;
use conduwuit::{
Err, Event, Result, at, debug, err, info,
Err, Result, at, err,
matrix::event::gen_event_id_canonical_json,
trace,
utils::stream::{BroadbandExt, IterStream, TryBroadbandExt},
utils::stream::{IterStream, TryBroadbandExt},
warn,
};
use conduwuit_service::Services;
@ -26,14 +25,12 @@ use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use crate::Ruma;
/// helper method for /send_join v1 and v2
#[tracing::instrument(skip(services, pdu, omit_members), fields(room_id = room_id.as_str(), origin = origin.as_str()))]
async fn create_join_event(
services: &Services,
origin: &ServerName,
room_id: &RoomId,
pdu: &RawJsonValue,
omit_members: bool,
) -> Result<create_join_event::v2::RoomState> {
) -> Result<create_join_event::v1::RoomState> {
if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(NotFound("Room is unknown to this server.")));
}
@ -56,10 +53,8 @@ async fn create_join_event(
// We do not add the event_id field to the pdu here because of signature and
// hashes checks
trace!("Getting room version");
let room_version_id = services.rooms.state.get_room_version(room_id).await?;
trace!("Generating event ID and converting to canonical json");
let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
// Event could not be converted to canonical json
return Err!(Request(BadJson("Could not convert event to canonical json.")));
@ -108,6 +103,7 @@ async fn create_join_event(
)));
}
// ACL check sender user server name
let sender: OwnedUserId = serde_json::from_value(
value
.get("sender")
@ -117,6 +113,12 @@ async fn create_join_event(
)
.map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?;
services
.rooms
.event_handler
.acl_check(sender.server_name(), room_id)
.await?;
// check if origin server is trying to send for another server
if sender.server_name() != origin {
return Err!(Request(Forbidden("Not allowed to join on behalf of another server.")));
@ -178,6 +180,11 @@ async fn create_join_event(
}
}
services
.server_keys
.hash_and_sign_event(&mut value, &room_version_id)
.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
let origin: OwnedServerName = serde_json::from_value(
value
.get("origin")
@ -187,12 +194,6 @@ async fn create_join_event(
)
.map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?;
trace!("Signing send_join event");
services
.server_keys
.hash_and_sign_event(&mut value, &room_version_id)
.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
let mutex_lock = services
.rooms
.event_handler
@ -200,7 +201,6 @@ async fn create_join_event(
.lock(room_id)
.await;
trace!("Acquired send_join mutex, persisting join event");
let pdu_id = services
.rooms
.event_handler
@ -210,7 +210,7 @@ async fn create_join_event(
.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
drop(mutex_lock);
trace!("Fetching current state IDs");
let state_ids: Vec<OwnedEventId> = services
.rooms
.state_accessor
@ -219,23 +219,9 @@ async fn create_join_event(
.collect()
.await;
trace!(%omit_members, "Constructing current state");
let state = state_ids
.iter()
.try_stream()
.broad_filter_map(|event_id| async move {
if omit_members {
if let Ok(e) = event_id.as_ref() {
let pdu = services.rooms.timeline.get_pdu(e).await;
if pdu.is_ok_and(|p| p.kind().to_cow_str() == "m.room.member") {
trace!("omitting member event {e:?} from returned state");
// skip members
return None;
}
}
}
Some(event_id)
})
.broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id))
.broad_and_then(|pdu| {
services
@ -248,7 +234,6 @@ async fn create_join_event(
.await?;
let starting_events = state_ids.iter().map(Borrow::borrow);
trace!("Constructing auth chain");
let auth_chain = services
.rooms
.auth_chain
@ -265,37 +250,13 @@ async fn create_join_event(
.try_collect()
.boxed()
.await?;
info!(fast_join = %omit_members, "Sending join event to other servers");
services.sending.send_pdu_room(room_id, &pdu_id).await?;
debug!("Finished sending join event");
let servers_in_room: Option<Vec<_>> = if !omit_members {
None
} else {
trace!("Fetching list of servers in room");
let servers: Vec<String> = services
.rooms
.state_cache
.room_servers(room_id)
.map(|sn| sn.as_str().to_owned())
.collect()
.await;
// If there's no servers, just add us
let servers = if servers.is_empty() {
warn!("Failed to find any servers, adding our own server name as a last resort");
vec![services.globals.server_name().to_string()]
} else {
trace!("Found {} servers in room", servers.len());
servers
};
Some(servers)
};
debug!("Returning send_join data");
Ok(create_join_event::v2::RoomState {
Ok(create_join_event::v1::RoomState {
auth_chain,
state,
event: to_raw_value(&CanonicalJsonValue::Object(value)).ok(),
members_omitted: omit_members,
servers_in_room,
})
}
@ -333,23 +294,11 @@ pub(crate) async fn create_join_event_v1_route(
}
}
let now = Instant::now();
let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu, false)
let room_state = create_join_event(&services, body.origin(), &body.room_id, &body.pdu)
.boxed()
.await?;
let transformed = create_join_event::v1::RoomState {
auth_chain: room_state.auth_chain,
state: room_state.state,
event: room_state.event,
};
info!(
"Finished sending a join for {} in {} in {:?}",
body.origin(),
&body.room_id,
now.elapsed()
);
Ok(create_join_event::v1::Response { room_state: transformed })
Ok(create_join_event::v1::Response { room_state })
}
/// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}`
@ -380,17 +329,17 @@ pub(crate) async fn create_join_event_v2_route(
}
}
let now = Instant::now();
let room_state =
create_join_event(&services, body.origin(), &body.room_id, &body.pdu, body.omit_members)
let create_join_event::v1::RoomState { auth_chain, state, event } =
create_join_event(&services, body.origin(), &body.room_id, &body.pdu)
.boxed()
.await?;
info!(
"Finished sending a join for {} in {} in {:?}",
body.origin(),
&body.room_id,
now.elapsed()
);
let room_state = create_join_event::v2::RoomState {
members_omitted: false,
auth_chain,
state,
event,
servers_in_room: None,
};
Ok(create_join_event::v2::Response { room_state })
}

View file

@ -1247,7 +1247,7 @@ pub struct Config {
/// 3 to 5 = Statistics with possible performance impact.
/// 6 = All statistics.
///
/// default: 1
/// default: 3
#[serde(default = "default_rocksdb_stats_level")]
pub rocksdb_stats_level: u8,
@ -2422,7 +2422,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 }
fn default_rocksdb_recovery_mode() -> u8 { 1 }
fn default_rocksdb_log_level() -> String { "error".to_owned() }
fn default_rocksdb_log_level() -> String { "info".to_owned() }
fn default_rocksdb_log_time_to_roll() -> usize { 0 }
@ -2454,7 +2454,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 }
#[allow(clippy::doc_markdown)]
fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 }
fn default_rocksdb_stats_level() -> u8 { 1 }
fn default_rocksdb_stats_level() -> u8 { 3 }
// I know, it's a great name
#[must_use]

View file

@ -242,12 +242,16 @@ where
}
*/
let (room_create_event, power_levels_event, sender_member_event) = join3(
fetch_state(&StateEventType::RoomCreate, ""),
fetch_state(&StateEventType::RoomPowerLevels, ""),
fetch_state(&StateEventType::RoomMember, sender.as_str()),
)
.await;
// let (room_create_event, power_levels_event, sender_member_event) = join3(
// fetch_state(&StateEventType::RoomCreate, ""),
// fetch_state(&StateEventType::RoomPowerLevels, ""),
// fetch_state(&StateEventType::RoomMember, sender.as_str()),
// )
// .await;
let room_create_event = fetch_state(&StateEventType::RoomCreate, "").await;
let power_levels_event = fetch_state(&StateEventType::RoomPowerLevels, "").await;
let sender_member_event = fetch_state(&StateEventType::RoomMember, sender.as_str()).await;
let room_create_event = match room_create_event {
| None => {

View file

@ -1,8 +1,6 @@
use std::{cmp, convert::TryFrom};
use conduwuit::{Config, Result, utils};
use conduwuit::{Config, Result};
use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel};
use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32};
use super::{cf_opts::cache_size_f64, logger::handle as handle_log};
/// Create database-wide options suitable for opening the database. This also
@ -23,8 +21,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul
set_logging_defaults(&mut opts, config);
// Processing
opts.set_max_background_jobs(num_threads::<i32>(config)?);
opts.set_max_subcompactions(num_threads::<u32>(config)?);
opts.set_max_background_jobs(parallelism_scaled_i32(1));
opts.set_max_subcompactions(parallelism_scaled_u32(1));
opts.set_avoid_unnecessary_blocking_io(true);
opts.set_max_file_opening_threads(0);
@ -126,15 +124,3 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) {
opts.set_callback_logger(rocksdb_log_level, &handle_log);
}
}
fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
const MIN_PARALLELISM: usize = 2;
let requested = if config.rocksdb_parallelism_threads != 0 {
config.rocksdb_parallelism_threads
} else {
utils::available_parallelism()
};
utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
}

View file

@ -452,7 +452,7 @@ async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDif
.ok()
.take_if(|parent| *parent != 0);
debug_assert!(value.len().is_multiple_of(STRIDE), "value not aligned to stride");
debug_assert!(value.len() % STRIDE == 0, "value not aligned to stride");
let _num_values = value.len() / STRIDE;
let mut add_mode = true;

View file

@ -229,7 +229,7 @@ pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Resu
| Ok(value) => {
self.services
.event_handler
.handle_incoming_pdu(backfill_server, room_id, event_id, value, false)
.handle_incoming_pdu(backfill_server, &room_id, &event_id, value, false)
.boxed()
.await?;
debug!("Successfully backfilled {event_id} from {backfill_server}");