Compare commits

...

52 commits

Author SHA1 Message Date
Jacob Taylor
cda40e8f9f more funny settings (part 3 of 12)
Some checks failed
Checks / Prefligit / prefligit (push) Failing after 1s
Release Docker Image / define-variables (push) Failing after 1s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Checks / Rust / Format (push) Failing after 3s
Checks / Rust / Clippy (push) Failing after 17s
Checks / Rust / Cargo Test (push) Failing after 15s
2025-06-30 16:59:48 -07:00
Jacob Taylor
4d49133504 sender_workers scaling. this time, with feeling! 2025-06-30 16:59:48 -07:00
Jacob Taylor
66c2fe8315 vehicle loan documentation now available at window 7 2025-06-30 16:59:48 -07:00
Jacob Taylor
8111b89d47 lock the getter instead ??? c/o M 2025-06-30 16:59:48 -07:00
Jacob Taylor
6958bc150b make fetching key room events less smart 2025-06-30 16:59:48 -07:00
Jacob Taylor
05b0f2e365 change rocksdb stats level to 3
scale rocksdb background jobs and subcompactions

change rocksdb default error level to info from error

delete unused num_threads function

fix warns from cargo
2025-06-30 16:59:48 -07:00
nexy7574
513b057e9d modify more log strings so they're more useful than not 2025-06-30 16:59:48 -07:00
nexy7574
5c4a9942f4 When in doubt, log all the things 2025-06-30 16:59:48 -07:00
nexy7574
09b5c3420e log which room struggled to get mainline depth 2025-06-30 16:59:48 -07:00
nexy7574
3a634ff1b2 more logs 2025-06-30 16:59:48 -07:00
nexy7574
70ba586ffd Unsafe, untested, and potentially overeager PDU sanity checks 2025-06-30 16:59:48 -07:00
nexy7574
f0c6391eaa Fix room ID check 2025-06-30 16:59:48 -07:00
nexy7574
cfc5fab813 Kick up a fuss when m.room.create is unfindable 2025-06-30 16:59:48 -07:00
nexy7574
4ff1553b56 Note about ruma#2064 in TODO 2025-06-30 16:59:48 -07:00
nexy7574
6e9e252391 fix an auth rule not applying correctly 2025-06-30 16:59:48 -07:00
Jacob Taylor
c93228b8b7 upgrade some settings to enable 5g in continuwuity
enable converged 6g at the edge in continuwuity

better stateinfo_cache_capacity default

better roomid_spacehierarchy_cache_capacity

make sender workers default better and clamp value to core count

update sender workers documentation

add more parallelism_scaled and make them public

update 1 document
2025-06-30 16:59:48 -07:00
Jacob Taylor
0c2d8aa7bf add futures::FutureExt to make cb15ac3c01 work 2025-06-30 16:59:48 -07:00
Jason Volk
7da679328b Mitigate large futures
Signed-off-by: Jason Volk <jason@zemos.net>
2025-06-30 16:59:48 -07:00
Jacob Taylor
cc1b841eb3 bump the number of allowed immutable memtables by 1, to allow for greater flood protection
this should probably not be applied if you have rocksdb_atomic_flush = false (the default)
2025-06-30 16:59:48 -07:00
Jacob Taylor
bc901aece7 probably incorrectly delete support for non-standardized matrix srv record 2025-06-30 16:59:48 -07:00
Jade Ellis
9dfcf3538f fix: Filter out invalid replacements from bundled aggregations 2025-06-30 16:59:48 -07:00
Jade Ellis
e1127c5d70 feat: Add bundled aggregations support
Add support for the m.replace and m.reference bundled
aggregations.
This should fix plenty of subtle client issues.
Threads are not included in the new code as they have
historically been written to the database. Replacing the
old system would result in issues when switching away from
continuwuity, so saved for later.
Some TODOs have been left re event visibility and ignored users.
These should be OK for now, though.
2025-06-30 16:59:48 -07:00
Jade Ellis
57ad8c7fef refactor: Promote handling unsigned data out of timeline
Also fixes:
- Transaction IDs leaking in event route
- Age not being set for event relations or threads
- Both of the above for search results

Notes down concern with relations table
2025-06-30 16:59:48 -07:00
Jade Ellis
17930708d8
chore: Add second ko-fi as custom link
Some checks failed
Documentation / Build and Deploy Documentation (push) Has been skipped
Checks / Prefligit / prefligit (push) Failing after 2s
Release Docker Image / define-variables (push) Failing after 12s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Checks / Rust / Format (push) Failing after 6s
Checks / Rust / Clippy (push) Failing after 14s
Checks / Rust / Cargo Test (push) Failing after 11s
2025-06-29 23:06:26 +01:00
Jade Ellis
ec9d3d613e
chore: Add funding
Some checks failed
Documentation / Build and Deploy Documentation (push) Has been skipped
Checks / Prefligit / prefligit (push) Failing after 3s
Release Docker Image / define-variables (push) Failing after 1s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Checks / Rust / Format (push) Failing after 3s
Checks / Rust / Clippy (push) Failing after 14s
Checks / Rust / Cargo Test (push) Failing after 15s
2025-06-29 23:02:15 +01:00
nexy7574
d4862b8ead style: Remove redundant, unused functions
Some checks failed
Documentation / Build and Deploy Documentation (push) Has been skipped
Checks / Prefligit / prefligit (push) Failing after 13s
Release Docker Image / define-variables (push) Failing after 5s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Checks / Rust / Format (push) Failing after 5s
Checks / Rust / Clippy (push) Failing after 11s
Checks / Rust / Cargo Test (push) Failing after 12s
2025-06-29 15:38:01 +00:00
Jade Ellis
acb74faa07 feat: Pass sender through admin commands 2025-06-29 15:38:01 +00:00
Jade Ellis
ecc6fda98b feat: Record metadata about user suspensions 2025-06-29 15:38:01 +00:00
nexy7574
13e17d52e0 style: Remove unnecessary imports (clippy) 2025-06-29 15:38:01 +00:00
nexy7574
d8a27eeb54 fix: Failing open on database errors
oops
2025-06-29 15:38:01 +00:00
nexy7574
eb2e3b3bb7 fix: Missing suspensions shouldn't error
Turns out copying and pasting the function
above verbatim actually introduces more
problems than it solves!
2025-06-29 15:38:01 +00:00
nexy7574
72f8cb3038 feat: Do not allow suspended users to send typing statuses 2025-06-29 15:38:01 +00:00
nexy7574
1124097bd1 feat: Only allow private read receipts when suspended 2025-06-29 15:38:01 +00:00
nexy7574
08527a2880 feat: Prevent suspended users upgrading rooms 2025-06-29 15:38:01 +00:00
nexy7574
8e06571e7c feat: Prevent suspended users uploading media 2025-06-29 15:38:01 +00:00
nexy7574
90180916eb feat: Prevent suspended users performing room changes
Prevents kicks, bans, unbans, and alias modification
2025-06-29 15:38:01 +00:00
nexy7574
d0548ec064 feat: Forbid suspended users from sending state events 2025-06-29 15:38:01 +00:00
nexy7574
1ff8af8e9e style: Remove unneeded statements (clippy) 2025-06-29 15:38:01 +00:00
nexy7574
cc864dc8bb feat: Do not allow suspending admin users 2025-06-29 15:38:01 +00:00
nexy7574
8791a9b851 fix: Inappropriate empty check
I once again, assumed `true` is actually `false`.
2025-06-29 15:38:01 +00:00
nexy7574
968c0e236c fix: Create the column appropriately 2025-06-29 15:38:01 +00:00
nexy7574
5d5350a9fe feat: Prevent suspended users creating new rooms 2025-06-29 15:38:01 +00:00
nexy7574
e127c4e5a2 feat: Add un/suspend admin commands 2025-06-29 15:38:01 +00:00
nexy7574
a94128e698 feat: Prevent suspended users joining/knocking on rooms 2025-06-29 15:38:01 +00:00
nexy7574
a6ba9e3045 feat: Prevent suspended users changing their profile 2025-06-29 15:38:01 +00:00
nexy7574
286974cb9a feat: Prevent suspended users redacting events 2025-06-29 15:38:01 +00:00
nexy7574
accfda2586 feat: Prevent suspended users sending events 2025-06-29 15:38:01 +00:00
nexy7574
fac9e090cd feat: Add suspension helper to user service 2025-06-29 15:38:01 +00:00
nexy7574
b4bdd1ee65
chore: Update ruwuma
Some checks failed
Documentation / Build and Deploy Documentation (push) Has been skipped
Checks / Prefligit / prefligit (push) Failing after 11s
Release Docker Image / define-variables (push) Failing after 3s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Checks / Rust / Format (push) Failing after 3s
Checks / Rust / Clippy (push) Failing after 20s
Checks / Rust / Cargo Test (push) Failing after 19s
Fixes the wrong field name being serialised
2025-06-29 13:43:27 +01:00
nexy7574
4b5e8df95c
fix: Add missing init fields 2025-06-29 13:29:27 +01:00
nexy7574
d63c8b9fca
feat: Support passing through MSC4293 redact_events 2025-06-29 13:16:31 +01:00
nexy7574
9b6ac6c45f fix: Ignore existing membership when room is disconnected
Some checks failed
Documentation / Build and Deploy Documentation (push) Has been skipped
Checks / Prefligit / prefligit (push) Failing after 7s
Release Docker Image / define-variables (push) Failing after 3s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Checks / Rust / Format (push) Failing after 7s
Checks / Rust / Clippy (push) Failing after 20s
Checks / Rust / Cargo Test (push) Failing after 16s
2025-06-29 12:14:20 +00:00
55 changed files with 1450 additions and 298 deletions

5
.github/FUNDING.yml vendored Normal file
View file

@ -0,0 +1,5 @@
github: [JadedBlueEyes]
# Doesn't support an array, so we can only list nex
ko_fi: nexy7574
custom:
- https://ko-fi.com/JadedBlueEyes

22
Cargo.lock generated
View file

@ -3798,7 +3798,7 @@ dependencies = [
[[package]]
name = "ruma"
version = "0.10.1"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"assign",
"js_int",
@ -3818,7 +3818,7 @@ dependencies = [
[[package]]
name = "ruma-appservice-api"
version = "0.10.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"js_int",
"ruma-common",
@ -3830,7 +3830,7 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.18.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"as_variant",
"assign",
@ -3853,7 +3853,7 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.13.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"as_variant",
"base64 0.22.1",
@ -3885,7 +3885,7 @@ dependencies = [
[[package]]
name = "ruma-events"
version = "0.28.1"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"as_variant",
"indexmap 2.9.0",
@ -3910,7 +3910,7 @@ dependencies = [
[[package]]
name = "ruma-federation-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"bytes",
"headers",
@ -3932,7 +3932,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-validation"
version = "0.9.5"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"js_int",
"thiserror 2.0.12",
@ -3941,7 +3941,7 @@ dependencies = [
[[package]]
name = "ruma-identity-service-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"js_int",
"ruma-common",
@ -3951,7 +3951,7 @@ dependencies = [
[[package]]
name = "ruma-macros"
version = "0.13.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"cfg-if",
"proc-macro-crate",
@ -3966,7 +3966,7 @@ dependencies = [
[[package]]
name = "ruma-push-gateway-api"
version = "0.9.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"js_int",
"ruma-common",
@ -3978,7 +3978,7 @@ dependencies = [
[[package]]
name = "ruma-signatures"
version = "0.15.0"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=d6870a7fb7f6cccff63f7fd0ff6c581bad80e983#d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=9b65f83981f6f53d185ce77da37aaef9dfd764a9#9b65f83981f6f53d185ce77da37aaef9dfd764a9"
dependencies = [
"base64 0.22.1",
"ed25519-dalek",

View file

@ -350,7 +350,7 @@ version = "0.1.2"
[workspace.dependencies.ruma]
git = "https://forgejo.ellis.link/continuwuation/ruwuma"
#branch = "conduwuit-changes"
rev = "d6870a7fb7f6cccff63f7fd0ff6c581bad80e983"
rev = "9b65f83981f6f53d185ce77da37aaef9dfd764a9"
features = [
"compat",
"rand",

View file

@ -990,7 +990,7 @@
# 3 to 5 = Statistics with possible performance impact.
# 6 = All statistics.
#
#rocksdb_stats_level = 1
#rocksdb_stats_level = 3
# This is a password that can be configured that will let you login to the
# server bot account (currently `@conduit`) for emergency troubleshooting
@ -1590,11 +1590,9 @@
#stream_amplification = 1024
# Number of sender task workers; determines sender parallelism. Default is
# '0' which means the value is determined internally, likely matching the
# number of tokio worker-threads or number of cores, etc. Override by
# setting a non-zero value.
# number of CPU cores. Override by setting a different value.
#
#sender_workers = 0
#sender_workers = 4
# Enables listener sockets; can be set to false to disable listening. This
# option is intended for developer/diagnostic purposes only.

View file

@ -7,13 +7,14 @@ use futures::{
io::{AsyncWriteExt, BufWriter},
lock::Mutex,
};
use ruma::EventId;
use ruma::{EventId, UserId};
pub(crate) struct Context<'a> {
pub(crate) services: &'a Services,
pub(crate) body: &'a [&'a str],
pub(crate) timer: SystemTime,
pub(crate) reply_id: Option<&'a EventId>,
pub(crate) sender: Option<&'a UserId>,
pub(crate) output: Mutex<BufWriter<Vec<u8>>>,
}
@ -36,4 +37,10 @@ impl Context<'_> {
output.write_all(s.as_bytes()).map_err(Into::into).await
})
}
/// Get the sender as a string, or service user ID if not available
pub(crate) fn sender_or_service_user(&self) -> &UserId {
self.sender
.unwrap_or_else(|| self.services.globals.server_user.as_ref())
}
}

View file

@ -63,6 +63,7 @@ async fn process_command(services: Arc<Services>, input: &CommandInput) -> Proce
body: &body,
timer: SystemTime::now(),
reply_id: input.reply_id.as_deref(),
sender: input.sender.as_deref(),
output: BufWriter::new(Vec::new()).into(),
};

View file

@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
.services
.rooms
.timeline
.last_timeline_count(None, &room_id)
.last_timeline_count(&room_id)
.await?;
self.write_str(&format!("{result:#?}")).await
@ -52,7 +52,7 @@ pub(super) async fn pdus(
.services
.rooms
.timeline
.pdus_rev(None, &room_id, from)
.pdus_rev(&room_id, from)
.try_take(limit.unwrap_or(3))
.try_collect()
.await?;

View file

@ -6,6 +6,7 @@ use conduwuit::{
warn,
};
use futures::StreamExt;
use futures::FutureExt;
use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId};
use crate::{admin_command, admin_command_dispatch, get_room_info};
@ -155,7 +156,10 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
evicting admins too)",
);
if let Err(e) = leave_room(self.services, user_id, &room_id, None).await {
if let Err(e) = leave_room(self.services, user_id, &room_id, None)
.boxed()
.await
{
warn!("Failed to leave room: {e}");
}
@ -323,7 +327,10 @@ async fn ban_list_of_rooms(&self) -> Result {
evicting admins too)",
);
if let Err(e) = leave_room(self.services, user_id, &room_id, None).await {
if let Err(e) = leave_room(self.services, user_id, &room_id, None)
.boxed()
.await
{
warn!("Failed to leave room: {e}");
}

View file

@ -9,6 +9,7 @@ use conduwuit::{
};
use conduwuit_api::client::{leave_all_rooms, update_avatar_url, update_displayname};
use futures::StreamExt;
use futures::FutureExt;
use ruma::{
OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId,
events::{
@ -224,6 +225,47 @@ pub(super) async fn deactivate(&self, no_leave_rooms: bool, user_id: String) ->
.await
}
#[admin_command]
pub(super) async fn suspend(&self, user_id: String) -> Result {
let user_id = parse_local_user_id(self.services, &user_id)?;
if user_id == self.services.globals.server_user {
return Err!("Not allowed to suspend the server service account.",);
}
if !self.services.users.exists(&user_id).await {
return Err!("User {user_id} does not exist.");
}
if self.services.users.is_admin(&user_id).await {
return Err!("Admin users cannot be suspended.");
}
// TODO: Record the actual user that sent the suspension where possible
self.services
.users
.suspend_account(&user_id, self.sender_or_service_user())
.await;
self.write_str(&format!("User {user_id} has been suspended."))
.await
}
#[admin_command]
pub(super) async fn unsuspend(&self, user_id: String) -> Result {
let user_id = parse_local_user_id(self.services, &user_id)?;
if user_id == self.services.globals.server_user {
return Err!("Not allowed to unsuspend the server service account.",);
}
if !self.services.users.exists(&user_id).await {
return Err!("User {user_id} does not exist.");
}
self.services.users.unsuspend_account(&user_id).await;
self.write_str(&format!("User {user_id} has been unsuspended."))
.await
}
#[admin_command]
pub(super) async fn reset_password(&self, username: String, password: Option<String>) -> Result {
let user_id = parse_local_user_id(self.services, &username)?;
@ -655,7 +697,9 @@ pub(super) async fn force_leave_room(
return Err!("{user_id} is not joined in the room");
}
leave_room(self.services, &user_id, &room_id, None).await?;
leave_room(self.services, &user_id, &room_id, None)
.boxed()
.await?;
self.write_str(&format!("{user_id} has left {room_id}.",))
.await

View file

@ -59,6 +59,28 @@ pub(super) enum UserCommand {
force: bool,
},
/// - Suspend a user
///
/// Suspended users are able to log in, sync, and read messages, but are not
/// able to send events nor redact them, cannot change their profile, and
/// are unable to join, invite to, or knock on rooms.
///
/// Suspended users can still leave rooms and deactivate their account.
/// Suspending them effectively makes them read-only.
Suspend {
/// Username of the user to suspend
user_id: String,
},
/// - Unsuspend a user
///
/// Reverses the effects of the `suspend` command, allowing the user to send
/// messages, change their profile, create room invites, etc.
Unsuspend {
/// Username of the user to unsuspend
user_id: String,
},
/// - List local users in the database
#[clap(alias = "list")]
ListUsers,

View file

@ -763,7 +763,9 @@ pub(crate) async fn deactivate_route(
super::update_displayname(&services, sender_user, None, &all_joined_rooms).await;
super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await;
full_user_deactivate(&services, sender_user, &all_joined_rooms).await?;
full_user_deactivate(&services, sender_user, &all_joined_rooms)
.boxed()
.await?;
info!("User {sender_user} deactivated their account.");
@ -915,7 +917,9 @@ pub async fn full_user_deactivate(
}
}
super::leave_all_rooms(services, user_id).await;
super::leave_all_rooms(services, user_id)
.boxed()
.await;
Ok(())
}

View file

@ -18,6 +18,9 @@ pub(crate) async fn create_alias_route(
body: Ruma<create_alias::v3::Request>,
) -> Result<create_alias::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
services
.rooms
@ -63,6 +66,9 @@ pub(crate) async fn delete_alias_route(
body: Ruma<delete_alias::v3::Request>,
) -> Result<delete_alias::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
services
.rooms

View file

@ -84,11 +84,25 @@ pub(crate) async fn get_context_route(
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
// PDUs are used to get seen user IDs and then returned in response.
let events_before = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(base_count))
.pdus_rev(room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
@ -98,8 +112,20 @@ pub(crate) async fn get_context_route(
let events_after = services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(base_count))
.pdus(room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))

View file

@ -128,6 +128,9 @@ pub(crate) async fn set_room_visibility_route(
// Return 404 if the room doesn't exist
return Err!(Request(NotFound("Room not found")));
}
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if services
.users

View file

@ -52,6 +52,9 @@ pub(crate) async fn create_content_route(
body: Ruma<create_content::v3::Request>,
) -> Result<create_content::v3::Response> {
let user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let filename = body.filename.as_deref();
let content_type = body.content_type.as_deref();

View file

@ -114,7 +114,9 @@ async fn banned_room_check(
.collect()
.await;
full_user_deactivate(services, user_id, &all_joined_rooms).await?;
full_user_deactivate(services, user_id, &all_joined_rooms)
.boxed()
.await?;
}
return Err!(Request(Forbidden("This room is banned on this homeserver.")));
@ -153,7 +155,9 @@ async fn banned_room_check(
.collect()
.await;
full_user_deactivate(services, user_id, &all_joined_rooms).await?;
full_user_deactivate(services, user_id, &all_joined_rooms)
.boxed()
.await?;
}
return Err!(Request(Forbidden("This remote server is banned on this homeserver.")));
@ -178,6 +182,9 @@ pub(crate) async fn join_room_by_id_route(
body: Ruma<join_room_by_id::v3::Request>,
) -> Result<join_room_by_id::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
banned_room_check(
&services,
@ -249,6 +256,9 @@ pub(crate) async fn join_room_by_id_or_alias_route(
let sender_user = body.sender_user.as_deref().expect("user is authenticated");
let appservice_info = &body.appservice_info;
let body = body.body;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias) {
| Ok(room_id) => {
@ -259,6 +269,7 @@ pub(crate) async fn join_room_by_id_or_alias_route(
room_id.server_name(),
client,
)
.boxed()
.await?;
let mut servers = body.via.clone();
@ -369,6 +380,9 @@ pub(crate) async fn knock_room_route(
) -> Result<knock_room::v3::Response> {
let sender_user = body.sender_user();
let body = &body.body;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias.clone()) {
| Ok(room_id) => {
@ -478,6 +492,7 @@ pub(crate) async fn leave_room_route(
body: Ruma<leave_room::v3::Request>,
) -> Result<leave_room::v3::Response> {
leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone())
.boxed()
.await
.map(|()| leave_room::v3::Response::new())
}
@ -492,6 +507,9 @@ pub(crate) async fn invite_user_route(
body: Ruma<invite_user::v3::Request>,
) -> Result<invite_user::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites {
debug_error!(
@ -566,6 +584,10 @@ pub(crate) async fn kick_user_route(
State(services): State<crate::State>,
body: Ruma<kick_user::v3::Request>,
) -> Result<kick_user::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let state_lock = services.rooms.state.mutex.lock(&body.room_id).await;
let Ok(event) = services
@ -601,7 +623,7 @@ pub(crate) async fn kick_user_route(
third_party_invite: None,
..event
}),
body.sender_user(),
sender_user,
&body.room_id,
&state_lock,
)
@ -625,6 +647,10 @@ pub(crate) async fn ban_user_route(
return Err!(Request(Forbidden("You cannot ban yourself.")));
}
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let state_lock = services.rooms.state.mutex.lock(&body.room_id).await;
let current_member_content = services
@ -646,6 +672,7 @@ pub(crate) async fn ban_user_route(
is_direct: None,
join_authorized_via_users_server: None,
third_party_invite: None,
redact_events: body.redact_events,
..current_member_content
}),
sender_user,
@ -666,6 +693,10 @@ pub(crate) async fn unban_user_route(
State(services): State<crate::State>,
body: Ruma<unban_user::v3::Request>,
) -> Result<unban_user::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let state_lock = services.rooms.state.mutex.lock(&body.room_id).await;
let current_member_content = services
@ -694,7 +725,7 @@ pub(crate) async fn unban_user_route(
is_direct: None,
..current_member_content
}),
body.sender_user(),
sender_user,
&body.room_id,
&state_lock,
)
@ -925,24 +956,32 @@ pub async fn join_room_by_id_helper(
return Ok(join_room_by_id::v3::Response { room_id: room_id.into() });
}
if let Ok(membership) = services
.rooms
.state_accessor
.get_member(room_id, sender_user)
.await
{
if membership.membership == MembershipState::Ban {
debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
return Err!(Request(Forbidden("You are banned from the room.")));
}
}
let server_in_room = services
.rooms
.state_cache
.server_in_room(services.globals.server_name(), room_id)
.await;
// Only check our known membership if we're already in the room.
// See: https://forgejo.ellis.link/continuwuation/continuwuity/issues/855
let membership = if server_in_room {
services
.rooms
.state_accessor
.get_member(room_id, sender_user)
.await
} else {
debug!("Ignoring local state for join {room_id}, we aren't in the room yet.");
Ok(RoomMemberEventContent::new(MembershipState::Leave))
};
if let Ok(m) = membership {
if m.membership == MembershipState::Ban {
debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
// TODO: return reason
return Err!(Request(Forbidden("You are banned from the room.")));
}
}
let local_join = server_in_room
|| servers.is_empty()
|| (servers.len() == 1 && services.globals.server_is_ours(&servers[0]));
@ -1243,6 +1282,7 @@ async fn join_room_by_id_helper_remote(
services.rooms.timeline.get_pdu(event_id).await.ok()
};
debug!("running stateres check on send_join parsed PDU");
let auth_check = state_res::event_auth::auth_check(
&state_res::RoomVersion::new(&room_version_id)?,
&parsed_join_pdu,
@ -1792,7 +1832,10 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
for room_id in all_rooms {
// ignore errors
if let Err(e) = leave_room(services, user_id, &room_id, None).await {
if let Err(e) = leave_room(services, user_id, &room_id, None)
.boxed()
.await
{
warn!(%user_id, "Failed to leave {room_id} remotely: {e}");
}
@ -1815,6 +1858,7 @@ pub async fn leave_room(
displayname: None,
third_party_invite: None,
blurhash: None,
redact_events: None,
};
let is_banned = services.rooms.metadata.is_banned(room_id);

View file

@ -2,7 +2,7 @@ use core::panic;
use axum::extract::State;
use conduwuit::{
Err, Result, at,
Err, Result, at, debug_warn,
matrix::{
Event,
pdu::{PduCount, PduEvent},
@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route(
| Direction::Forward => services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(from))
.pdus(room_id, Some(from))
.ignore_err()
.boxed(),
| Direction::Backward => services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(from))
.pdus_rev(room_id, Some(from))
.ignore_err()
.boxed(),
};
@ -132,6 +132,18 @@ pub(crate) async fn get_message_events_route(
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
.take(limit)
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.collect()
.await;

View file

@ -36,6 +36,9 @@ pub(crate) async fn set_displayname_route(
body: Ruma<set_display_name::v3::Request>,
) -> Result<set_display_name::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if *sender_user != body.user_id && body.appservice_info.is_none() {
return Err!(Request(Forbidden("You cannot update the profile of another user")));
@ -125,6 +128,9 @@ pub(crate) async fn set_avatar_url_route(
body: Ruma<set_avatar_url::v3::Request>,
) -> Result<set_avatar_url::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if *sender_user != body.user_id && body.appservice_info.is_none() {
return Err!(Request(Forbidden("You cannot update the profile of another user")));
@ -343,6 +349,7 @@ pub async fn update_displayname(
reason: None,
is_direct: None,
third_party_invite: None,
redact_events: None,
});
Ok((pdu, room_id))
@ -396,6 +403,7 @@ pub async fn update_avatar_url(
reason: None,
is_direct: None,
third_party_invite: None,
redact_events: None,
});
Ok((pdu, room_id))

View file

@ -58,29 +58,34 @@ pub(crate) async fn set_read_marker_route(
}
if let Some(event) = &body.read_receipt {
let receipt_content = BTreeMap::from_iter([(
event.to_owned(),
BTreeMap::from_iter([(
ReceiptType::Read,
BTreeMap::from_iter([(sender_user.to_owned(), ruma::events::receipt::Receipt {
ts: Some(MilliSecondsSinceUnixEpoch::now()),
thread: ReceiptThread::Unthreaded,
})]),
)]),
)]);
if !services.users.is_suspended(sender_user).await? {
let receipt_content = BTreeMap::from_iter([(
event.to_owned(),
BTreeMap::from_iter([(
ReceiptType::Read,
BTreeMap::from_iter([(
sender_user.to_owned(),
ruma::events::receipt::Receipt {
ts: Some(MilliSecondsSinceUnixEpoch::now()),
thread: ReceiptThread::Unthreaded,
},
)]),
)]),
)]);
services
.rooms
.read_receipt
.readreceipt_update(
sender_user,
&body.room_id,
&ruma::events::receipt::ReceiptEvent {
content: ruma::events::receipt::ReceiptEventContent(receipt_content),
room_id: body.room_id.clone(),
},
)
.await;
services
.rooms
.read_receipt
.readreceipt_update(
sender_user,
&body.room_id,
&ruma::events::receipt::ReceiptEvent {
content: ruma::events::receipt::ReceiptEventContent(receipt_content),
room_id: body.room_id.clone(),
},
)
.await;
}
}
if let Some(event) = &body.private_read_receipt {

View file

@ -1,5 +1,5 @@
use axum::extract::State;
use conduwuit::{Result, matrix::pdu::PduBuilder};
use conduwuit::{Err, Result, matrix::pdu::PduBuilder};
use ruma::{
api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent,
};
@ -17,6 +17,10 @@ pub(crate) async fn redact_event_route(
) -> Result<redact_event::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let body = body.body;
if services.users.is_suspended(sender_user).await? {
// TODO: Users can redact their own messages while suspended
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let state_lock = services.rooms.state.mutex.lock(&body.room_id).await;

View file

@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Result, at,
Result, at, debug_warn,
matrix::pdu::PduCount,
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
};
@ -149,6 +149,17 @@ async fn paginate_relations_with_filter(
.ready_take_while(|(count, _)| Some(*count) != to)
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
.take(limit)
.then(async |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations to relation: {e}");
}
pdu
})
.collect()
.await;
@ -172,6 +183,10 @@ async fn paginate_relations_with_filter(
})
}
// TODO: Can we move the visibility filter lower down, to avoid checking events
// that won't be sent? At the moment this also results in getting events that
// appear to have no relation because intermediaries are not visible to the
// user.
async fn visibility_filter(
services: &Services,
sender_user: &UserId,

View file

@ -70,6 +70,10 @@ pub(crate) async fn create_room_route(
));
}
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let room_id: OwnedRoomId = match &body.room_id {
| Some(custom_room_id) => custom_room_id_check(&services, custom_room_id)?,
| _ => RoomId::new(&services.server.name),

View file

@ -1,5 +1,5 @@
use axum::extract::State;
use conduwuit::{Err, Event, Result, err};
use conduwuit::{Err, Event, Result, debug_warn, err};
use futures::{FutureExt, TryFutureExt, future::try_join};
use ruma::api::client::room::get_room_event;
@ -38,7 +38,16 @@ pub(crate) async fn get_room_event_route(
"Fetched PDU must match requested"
);
event.add_age().ok();
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut event)
.await
{
debug_warn!("Failed to add bundled aggregations to event: {e}");
}
event.set_unsigned(body.sender_user.as_deref());
Ok(get_room_event::v3::Response { event: event.into_room_event() })
}

View file

@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Err, PduEvent, Result, at,
Err, PduEvent, Result, at, debug_warn,
utils::{BoolExt, stream::TryTools},
};
use futures::TryStreamExt;
@ -25,12 +25,28 @@ pub(crate) async fn room_initial_sync_route(
return Err!(Request(Forbidden("No room preview available.")));
}
// Events are returned in body
let limit = LIMIT_MAX;
let events: Vec<_> = services
.rooms
.timeline
.pdus_rev(None, room_id, None)
.pdus_rev(room_id, None)
.try_take(limit)
.and_then(async |mut pdu| {
pdu.1.set_unsigned(body.sender_user.as_deref());
if let Some(sender_user) = body.sender_user.as_deref() {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
}
Ok(pdu)
})
.try_collect()
.await?;

View file

@ -2,7 +2,7 @@ use std::cmp::max;
use axum::extract::State;
use conduwuit::{
Error, Result, err, info,
Err, Error, Result, err, info,
matrix::{StateKey, pdu::PduBuilder},
};
use futures::StreamExt;
@ -63,6 +63,10 @@ pub(crate) async fn upgrade_room_route(
));
}
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
// Create a replacement room
let replacement_room = RoomId::new(services.globals.server_name());
@ -189,6 +193,7 @@ pub(crate) async fn upgrade_room_route(
blurhash: services.users.blurhash(sender_user).await.ok(),
reason: None,
join_authorized_via_users_server: None,
redact_events: None,
})
.expect("event is valid, we just created it"),
unsigned: None,

View file

@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{
Err, Result, at, is_true,
Err, Result, at, debug_warn, is_true,
matrix::pdu::PduEvent,
result::FlatOk,
utils::{IterStream, stream::ReadyExt},
@ -144,6 +144,17 @@ async fn category_room_events(
.map(at!(2))
.flatten()
.stream()
.then(|mut pdu| async {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to search result: {e}");
}
pdu
})
.map(PduEvent::into_room_event)
.map(|result| SearchResult {
rank: None,

View file

@ -23,6 +23,9 @@ pub(crate) async fn send_message_event_route(
let sender_user = body.sender_user();
let sender_device = body.sender_device.as_deref();
let appservice_info = body.appservice_info.as_ref();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
// Forbid m.room.encrypted if encryption is disabled
if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption

View file

@ -6,6 +6,7 @@ use conduwuit::{
};
use conduwuit_service::Services;
use futures::TryStreamExt;
use futures::FutureExt;
use ruma::{
OwnedEventId, RoomId, UserId,
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
@ -33,6 +34,10 @@ pub(crate) async fn send_state_event_for_key_route(
) -> Result<send_state_event::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
Ok(send_state_event::v3::Response {
event_id: send_state_event_for_key_helper(
&services,
@ -59,6 +64,7 @@ pub(crate) async fn send_state_event_for_empty_key_route(
body: Ruma<send_state_event::v3::Request>,
) -> Result<RumaResponse<send_state_event::v3::Response>> {
send_state_event_for_key_route(State(services), body)
.boxed()
.await
.map(RumaResponse)
}

View file

@ -3,7 +3,7 @@ mod v4;
mod v5;
use conduwuit::{
Error, PduCount, Result,
Error, PduCount, Result, debug_warn,
matrix::pdu::PduEvent,
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
};
@ -31,11 +31,7 @@ async fn load_timeline(
next_batch: Option<PduCount>,
limit: usize,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await?;
let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?;
if last_timeline_count <= roomsincecount {
return Ok((Vec::new(), false));
@ -44,10 +40,25 @@ async fn load_timeline(
let non_timeline_pdus = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, None)
.pdus_rev(room_id, None)
.ignore_err()
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
.ready_take_while(|&(pducount, _)| pducount > roomsincecount);
.ready_take_while(|&(pducount, _)| pducount > roomsincecount)
.map(move |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
pdu
})
.then(async move |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
});
// Take the last events for the timeline
pin_mut!(non_timeline_pdus);

View file

@ -1189,7 +1189,7 @@ async fn calculate_heroes(
services
.rooms
.timeline
.all_pdus(sender_user, room_id)
.all_pdus(room_id)
.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
.fold_default(|heroes: Vec<_>, (_, pdu)| {
fold_hero(heroes, services, room_id, sender_user, pdu)

View file

@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Result, at,
Result, at, debug_warn,
matrix::pdu::{PduCount, PduEvent},
};
use futures::StreamExt;
@ -28,6 +28,8 @@ pub(crate) async fn get_threads_route(
.transpose()?
.unwrap_or_else(PduCount::max);
// TODO: user_can_see_event and set_unsigned should be at the same level /
// function, so unsigned is only set for seen events.
let threads: Vec<(PduCount, PduEvent)> = services
.rooms
.threads
@ -42,6 +44,17 @@ pub(crate) async fn get_threads_route(
.await
.then_some((count, pdu))
})
.then(|(count, mut pdu)| async move {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to thread: {e}");
}
(count, pdu)
})
.collect()
.await;

View file

@ -26,41 +26,42 @@ pub(crate) async fn create_typing_event_route(
{
return Err!(Request(Forbidden("You are not in this room.")));
}
match body.state {
| Typing::Yes(duration) => {
let duration = utils::clamp(
duration.as_millis().try_into().unwrap_or(u64::MAX),
if !services.users.is_suspended(sender_user).await? {
match body.state {
| Typing::Yes(duration) => {
let duration = utils::clamp(
duration.as_millis().try_into().unwrap_or(u64::MAX),
services
.server
.config
.typing_client_timeout_min_s
.try_mul(1000)?,
services
.server
.config
.typing_client_timeout_max_s
.try_mul(1000)?,
);
services
.server
.config
.typing_client_timeout_min_s
.try_mul(1000)?,
.rooms
.typing
.typing_add(
sender_user,
&body.room_id,
utils::millis_since_unix_epoch()
.checked_add(duration)
.expect("user typing timeout should not get this high"),
)
.await?;
},
| _ => {
services
.server
.config
.typing_client_timeout_max_s
.try_mul(1000)?,
);
services
.rooms
.typing
.typing_add(
sender_user,
&body.room_id,
utils::millis_since_unix_epoch()
.checked_add(duration)
.expect("user typing timeout should not get this high"),
)
.await?;
},
| _ => {
services
.rooms
.typing
.typing_remove(sender_user, &body.room_id)
.await?;
},
.rooms
.typing
.typing_remove(sender_user, &body.room_id)
.await?;
},
}
}
// ping presence

View file

@ -3,6 +3,7 @@ use std::cmp;
use axum::extract::State;
use conduwuit::{
PduCount, Result,
result::LogErr,
utils::{IterStream, ReadyExt, stream::TryTools},
};
use futures::{FutureExt, StreamExt, TryStreamExt};
@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route(
pdus: services
.rooms
.timeline
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
.try_take(limit)
.try_filter_map(|(_, pdu)| async move {
Ok(services
@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route(
.await
.then_some(pdu))
})
.and_then(async |mut pdu| {
// Strip the transaction ID, as that is private
pdu.remove_transaction_id().log_err().ok();
// Add age, as this is specified
pdu.add_age().log_err().ok();
// It's not clear if we should strip or add any more data, leave as is.
// In particular: Redaction?
Ok(pdu)
})
.try_filter_map(|pdu| async move {
Ok(services
.rooms

View file

@ -1155,7 +1155,7 @@ pub struct Config {
/// 3 to 5 = Statistics with possible performance impact.
/// 6 = All statistics.
///
/// default: 1
/// default: 3
#[serde(default = "default_rocksdb_stats_level")]
pub rocksdb_stats_level: u8,
@ -1823,12 +1823,10 @@ pub struct Config {
pub stream_amplification: usize,
/// Number of sender task workers; determines sender parallelism. Default is
/// '0' which means the value is determined internally, likely matching the
/// number of tokio worker-threads or number of cores, etc. Override by
/// setting a non-zero value.
/// core count. Override by setting a different value.
///
/// default: 0
#[serde(default)]
/// default: core count
#[serde(default = "default_sender_workers")]
pub sender_workers: usize,
/// Enables listener sockets; can be set to false to disable listening. This
@ -2059,45 +2057,48 @@ fn default_database_backups_to_keep() -> i16 { 1 }
fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) }
fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) }
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) }
fn default_cache_capacity_modifier() -> f64 { 1.0 }
fn default_auth_chain_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
}
fn default_shorteventid_cache_capacity() -> u32 {
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_shorteventid_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(100_000)
}
fn default_eventidshort_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_eventid_pdu_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_shortstatekey_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_statekeyshort_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_servernameevent_data_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(500_000)
parallelism_scaled_u32(100_000).saturating_add(100_000)
}
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) }
fn default_stateinfo_cache_capacity() -> u32 {
parallelism_scaled_u32(500).clamp(100, 12000)
}
fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) }
fn default_roomid_spacehierarchy_cache_capacity() -> u32 {
parallelism_scaled_u32(500).clamp(100, 12000) }
fn default_dns_cache_entries() -> u32 { 32768 }
fn default_dns_cache_entries() -> u32 { 327680 }
fn default_dns_min_ttl() -> u64 { 60 * 180 }
@ -2199,7 +2200,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 }
fn default_rocksdb_recovery_mode() -> u8 { 1 }
fn default_rocksdb_log_level() -> String { "error".to_owned() }
fn default_rocksdb_log_level() -> String { "info".to_owned() }
fn default_rocksdb_log_time_to_roll() -> usize { 0 }
@ -2231,7 +2232,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 }
#[allow(clippy::doc_markdown)]
fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 }
fn default_rocksdb_stats_level() -> u8 { 1 }
fn default_rocksdb_stats_level() -> u8 { 3 }
// I know, it's a great name
#[must_use]
@ -2286,14 +2287,13 @@ fn default_admin_log_capture() -> String {
fn default_admin_room_tag() -> String { "m.server_notice".to_owned() }
#[allow(clippy::as_conversions, clippy::cast_precision_loss)]
fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
fn parallelism_scaled_u32(val: u32) -> u32 {
let val = val.try_into().expect("failed to cast u32 to usize");
parallelism_scaled(val).try_into().unwrap_or(u32::MAX)
}
pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) }
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) }
pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
fn default_trusted_server_batch_size() -> usize { 256 }
@ -2313,6 +2313,8 @@ fn default_stream_width_scale() -> f32 { 1.0 }
fn default_stream_amplification() -> usize { 1024 }
fn default_sender_workers() -> usize { parallelism_scaled(1) }
fn default_client_receive_timeout() -> u64 { 75 }
fn default_client_request_timeout() -> u64 { 180 }

View file

@ -1,11 +1,24 @@
use std::collections::BTreeMap;
use std::{borrow::Borrow, collections::BTreeMap};
use ruma::MilliSecondsSinceUnixEpoch;
use serde::Deserialize;
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
use super::Pdu;
use crate::{Result, err, implement, is_true};
use crate::{Result, err, implement, is_true, result::LogErr};
/// Set the `unsigned` field of the PDU using only information in the PDU.
/// Some unsigned data is already set within the database (eg. prev events,
/// threads). Once this is done, other data must be calculated from the database
/// (eg. relations) This is for server-to-client events.
/// Backfill handles this itself.
#[implement(Pdu)]
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
if Some(self.sender.borrow()) != user_id {
self.remove_transaction_id().log_err().ok();
}
self.add_age().log_err().ok();
}
#[implement(Pdu)]
pub fn remove_transaction_id(&mut self) -> Result {

View file

@ -13,6 +13,7 @@ use ruma::{
power_levels::RoomPowerLevelsEventContent,
third_party_invite::RoomThirdPartyInviteEventContent,
},
EventId,
int,
serde::{Base64, Raw},
};
@ -21,7 +22,6 @@ use serde::{
de::{Error as _, IgnoredAny},
};
use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue};
use super::{
Error, Event, Result, StateEventType, StateKey, TimelineEventType,
power_levels::{
@ -217,8 +217,9 @@ where
}
/*
// TODO: In the past this code caused problems federating with synapse, maybe this has been
// resolved already. Needs testing.
// TODO: In the past this code was commented as it caused problems with Synapse. This is no
// longer the case. This needs to be implemented.
// See also: https://github.com/ruma/ruma/pull/2064
//
// 2. Reject if auth_events
// a. auth_events cannot have duplicate keys since it's a BTree
@ -241,20 +242,46 @@ where
}
*/
let (room_create_event, power_levels_event, sender_member_event) = join3(
fetch_state(&StateEventType::RoomCreate, ""),
fetch_state(&StateEventType::RoomPowerLevels, ""),
fetch_state(&StateEventType::RoomMember, sender.as_str()),
)
.await;
// let (room_create_event, power_levels_event, sender_member_event) = join3(
// fetch_state(&StateEventType::RoomCreate, ""),
// fetch_state(&StateEventType::RoomPowerLevels, ""),
// fetch_state(&StateEventType::RoomMember, sender.as_str()),
// )
// .await;
let room_create_event = fetch_state(&StateEventType::RoomCreate, "").await;
let power_levels_event = fetch_state(&StateEventType::RoomPowerLevels, "").await;
let sender_member_event = fetch_state(&StateEventType::RoomMember, sender.as_str()).await;
let room_create_event = match room_create_event {
| None => {
warn!("no m.room.create event in auth chain");
error!(
create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
"no m.room.create event found for {} ({})!",
incoming_event.event_id().as_str(),
incoming_event.room_id().as_str()
);
return Ok(false);
},
| Some(e) => e,
};
// just re-check 1.2 to work around a bug
let Some(room_id_server_name) = incoming_event.room_id().server_name() else {
warn!("room ID has no servername");
return Ok(false);
};
if room_id_server_name != room_create_event.sender().server_name() {
warn!(
"servername of room ID origin ({}) does not match servername of m.room.create \
sender ({})",
room_id_server_name,
room_create_event.sender().server_name()
);
return Ok(false);
}
// 3. If event does not have m.room.create in auth_events reject
if !incoming_event

View file

@ -609,7 +609,7 @@ where
let fetch_state = |ty: &StateEventType, key: &str| {
future::ready(auth_state.get(&ty.with_state_key(key)))
};
debug!("running auth check on {:?}", event.event_id());
let auth_result =
auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await;
@ -726,8 +726,12 @@ where
Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync,
{
let mut room_id = None;
while let Some(sort_ev) = event {
debug!(event_id = sort_ev.event_id().as_str(), "mainline");
trace!(event_id = sort_ev.event_id().as_str(), "mainline");
if room_id.is_none() {
room_id = Some(sort_ev.room_id().to_owned());
}
let id = sort_ev.event_id();
if let Some(depth) = mainline_map.get(id) {
@ -746,7 +750,7 @@ where
}
}
}
// Did not find a power level event so we default to zero
warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth");
Ok(0)
}

View file

@ -29,7 +29,7 @@ fn descriptor_cf_options(
set_table_options(&mut opts, &desc, cache)?;
opts.set_min_write_buffer_number(1);
opts.set_max_write_buffer_number(2);
opts.set_max_write_buffer_number(3);
opts.set_write_buffer_size(desc.write_size);
opts.set_target_file_size_base(desc.file_size);

View file

@ -1,8 +1,6 @@
use std::{cmp, convert::TryFrom};
use conduwuit::{Config, Result, utils};
use conduwuit::{Config, Result};
use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel};
use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32};
use super::{cf_opts::cache_size_f64, logger::handle as handle_log};
/// Create database-wide options suitable for opening the database. This also
@ -23,8 +21,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul
set_logging_defaults(&mut opts, config);
// Processing
opts.set_max_background_jobs(num_threads::<i32>(config)?);
opts.set_max_subcompactions(num_threads::<u32>(config)?);
opts.set_max_background_jobs(parallelism_scaled_i32(1));
opts.set_max_subcompactions(parallelism_scaled_u32(1));
opts.set_avoid_unnecessary_blocking_io(true);
opts.set_max_file_opening_threads(0);
@ -126,15 +124,3 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) {
opts.set_callback_logger(rocksdb_log_level, &handle_log);
}
}
fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
const MIN_PARALLELISM: usize = 2;
let requested = if config.rocksdb_parallelism_threads != 0 {
config.rocksdb_parallelism_threads
} else {
utils::available_parallelism()
};
utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
}

View file

@ -378,6 +378,10 @@ pub(super) static MAPS: &[Descriptor] = &[
name: "userid_password",
..descriptor::RANDOM
},
Descriptor {
name: "userid_suspension",
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "userid_presenceid",
..descriptor::RANDOM_SMALL

View file

@ -4,7 +4,6 @@ mod execute;
mod grant;
use std::{
future::Future,
pin::Pin,
sync::{Arc, RwLock as StdRwLock, Weak},
};
@ -14,7 +13,7 @@ use conduwuit::{
Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
};
pub use create::create_admin_room;
use futures::{FutureExt, TryFutureExt};
use futures::{Future, FutureExt, TryFutureExt};
use loole::{Receiver, Sender};
use ruma::{
OwnedEventId, OwnedRoomId, RoomId, UserId,
@ -45,11 +44,13 @@ struct Services {
services: StdRwLock<Option<Weak<crate::Services>>>,
}
/// Inputs to a command are a multi-line string and optional reply_id.
/// Inputs to a command are a multi-line string, optional reply_id, and optional
/// sender.
#[derive(Debug)]
pub struct CommandInput {
pub command: String,
pub reply_id: Option<OwnedEventId>,
pub sender: Option<Box<UserId>>,
}
/// Prototype of the tab-completer. The input is buffered text when tab
@ -162,7 +163,22 @@ impl Service {
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> {
self.channel
.0
.send(CommandInput { command, reply_id })
.send(CommandInput { command, reply_id, sender: None })
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
}
/// Posts a command to the command processor queue with sender information
/// and returns. Processing will take place on the service worker's task
/// asynchronously. Errors if the queue is full.
pub fn command_with_sender(
&self,
command: String,
reply_id: Option<OwnedEventId>,
sender: Box<UserId>,
) -> Result<()> {
self.channel
.0
.send(CommandInput { command, reply_id, sender: Some(sender) })
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
}
@ -173,7 +189,7 @@ impl Service {
command: String,
reply_id: Option<OwnedEventId>,
) -> ProcessorResult {
self.process_command(CommandInput { command, reply_id })
self.process_command(CommandInput { command, reply_id, sender: None })
.await
}

View file

@ -306,28 +306,25 @@ impl super::Service {
#[tracing::instrument(name = "srv", level = "debug", skip(self))]
async fn query_srv_record(&self, hostname: &'_ str) -> Result<Option<FedDest>> {
let hostnames =
[format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")];
self.services.server.check_running()?;
for hostname in hostnames {
self.services.server.check_running()?;
debug!("querying SRV for {hostname:?}");
debug!("querying SRV for {hostname:?}");
let hostname = hostname.trim_end_matches('.');
match self.resolver.resolver.srv_lookup(hostname).await {
| Err(e) => Self::handle_resolve_error(&e, hostname)?,
| Ok(result) => {
return Ok(result.iter().next().map(|result| {
FedDest::Named(
result.target().to_string().trim_end_matches('.').to_owned(),
format!(":{}", result.port())
.as_str()
.try_into()
.unwrap_or_else(|_| FedDest::default_port()),
)
}));
},
}
let hostname_suffix = format!("_matrix-fed._tcp.{hostname}.");
let hostname = hostname_suffix.trim_end_matches('.');
match self.resolver.resolver.srv_lookup(hostname).await {
| Err(e) => Self::handle_resolve_error(&e, hostname)?,
| Ok(result) => {
return Ok(result.iter().next().map(|result| {
FedDest::Named(
result.target().to_string().trim_end_matches('.').to_owned(),
format!(":{}", result.port())
.as_str()
.try_into()
.unwrap_or_else(|_| FedDest::default_port()),
)
}));
},
}
Ok(None)

View file

@ -76,7 +76,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
// 5. Reject "due to auth events" if can't get all the auth events or some of
// the auth events are also rejected "due to auth events"
// NOTE: Step 5 is not applied anymore because it failed too often
debug!("Fetching auth events");
debug!("Fetching auth events for {}", incoming_pdu.event_id);
Box::pin(self.fetch_and_handle_outliers(
origin,
&incoming_pdu.auth_events,
@ -88,12 +88,12 @@ pub(super) async fn handle_outlier_pdu<'a>(
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
// auth events
debug!("Checking based on auth events");
debug!("Checking {} based on auth events", incoming_pdu.event_id);
// Build map of auth events
let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len());
for id in &incoming_pdu.auth_events {
let Ok(auth_event) = self.services.timeline.get_pdu(id).await else {
warn!("Could not find auth event {id}");
warn!("Could not find auth event {id} for {}", incoming_pdu.event_id);
continue;
};
@ -119,10 +119,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
}
// The original create event must be in the auth events
if !matches!(
auth_events.get(&(StateEventType::RoomCreate, String::new().into())),
Some(_) | None
) {
if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) {
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
}
@ -131,6 +128,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
ready(auth_events.get(&key))
};
debug!("running auth check to handle outlier pdu {:?}", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check(
&to_room_version(&room_version_id),
&incoming_pdu,

View file

@ -1,12 +1,6 @@
use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
use conduwuit::{
Err, Result, debug, debug_info, err, implement,
matrix::{EventTypeExt, PduEvent, StateKey, state_res},
trace,
utils::stream::{BroadbandExt, ReadyExt},
warn,
};
use conduwuit::{Err, Result, debug, debug_info, err, implement, matrix::{EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, warn, info};
use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
@ -44,7 +38,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
return Err!(Request(InvalidParam("Event has been soft failed")));
}
debug!("Upgrading to timeline pdu");
debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id);
let timer = Instant::now();
let room_version_id = get_room_version_id(create_event)?;
@ -52,7 +46,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// backwards extremities doing all the checks in this list starting at 1.
// These are not timeline events.
debug!("Resolving state at event");
debug!("Resolving state at event {}", incoming_pdu.event_id);
let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 {
self.state_at_incoming_degree_one(&incoming_pdu).await?
} else {
@ -70,7 +64,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
state_at_incoming_event.expect("we always set this to some above");
let room_version = to_room_version(&room_version_id);
debug!("Performing auth check");
debug!("Performing auth check to upgrade {}", incoming_pdu.event_id);
// 11. Check the auth of the event passes based on the state of the event
let state_fetch_state = &state_at_incoming_event;
let state_fetch = |k: StateEventType, s: StateKey| async move {
@ -80,6 +74,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
self.services.timeline.get_pdu(event_id).await.ok()
};
debug!("running auth check on {}", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check(
&room_version,
&incoming_pdu,
@ -93,7 +88,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
return Err!(Request(Forbidden("Event has failed auth check with state at the event.")));
}
debug!("Gathering auth events");
debug!("Gathering auth events for {}", incoming_pdu.event_id);
let auth_events = self
.services
.state
@ -111,6 +106,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
ready(auth_events.get(&key).cloned())
};
debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check(
&room_version,
&incoming_pdu,
@ -121,7 +117,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
// Soft fail check before doing state res
debug!("Performing soft-fail check");
debug!("Performing soft-fail check on {}", incoming_pdu.event_id);
let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) {
| (false, _) => true,
| (true, None) => false,
@ -145,7 +141,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
let extremities: Vec<_> = self
.services
.state
.get_forward_extremities(room_id)
.get_forward_extremities(room_id, &state_lock)
.map(ToOwned::to_owned)
.ready_filter(|event_id| {
// Remove any that are referenced by this incoming event's prev_events
@ -163,6 +159,8 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.collect()
.await;
if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events", incoming_pdu.prev_events.len()) }
debug!(
"Retained {} extremities checked against {} prev_events",
extremities.len(),
@ -218,7 +216,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// 14. Check if the event passes auth based on the "current state" of the room,
// if not soft fail it
if soft_fail {
debug!("Soft failing event");
info!("Soft failing event {}", incoming_pdu.event_id);
let extremities = extremities.iter().map(Borrow::borrow);
self.services

View file

@ -0,0 +1,765 @@
use conduwuit::{Event, PduEvent, Result, err};
use ruma::{
EventId, RoomId, UserId,
api::Direction,
events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk},
};
use super::PdusIterItem;
const MAX_BUNDLED_RELATIONS: usize = 50;
impl super::Service {
/// Gets bundled aggregations for an event according to the Matrix
/// specification.
/// - m.replace relations are bundled to include the most recent replacement
/// event.
/// - m.reference relations are bundled to include a chunk of event IDs.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_bundled_aggregations(
&self,
user_id: &UserId,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<BundledMessageLikeRelations<Box<serde_json::value::RawValue>>>> {
let relations = self
.get_relations(
user_id,
room_id,
event_id,
conduwuit::PduCount::max(),
MAX_BUNDLED_RELATIONS,
0,
Direction::Backward,
)
.await;
// The relations database code still handles the basic unsigned data
// We don't want to recursively fetch relations
// TODO: Event visibility check
// TODO: ignored users?
if relations.is_empty() {
return Ok(None);
}
// Get the original event for validation of replacement events
let original_event = self.services.timeline.get_pdu(event_id).await?;
let mut replace_events = Vec::with_capacity(relations.len());
let mut reference_events = Vec::with_capacity(relations.len());
for relation in &relations {
let pdu = &relation.1;
let content = pdu.get_content_as_value();
if let Some(relates_to) = content.get("m.relates_to") {
// We don't check that the event relates back, because we assume the database is
// good.
if let Some(rel_type) = relates_to.get("rel_type") {
match rel_type.as_str() {
| Some("m.replace") => {
// Only consider valid replacements
if Self::is_valid_replacement_event(&original_event, pdu).await? {
replace_events.push(relation);
}
},
| Some("m.reference") => {
reference_events.push(relation);
},
| _ => {
// Ignore other relation types for now
// Threads are in the database but not handled here
// Other types are not specified AFAICT.
},
}
}
}
}
// If no relations to bundle, return None
if replace_events.is_empty() && reference_events.is_empty() {
return Ok(None);
}
let mut bundled = BundledMessageLikeRelations::new();
// Handle m.replace relations - find the most recent one
if !replace_events.is_empty() {
let most_recent_replacement = Self::find_most_recent_replacement(&replace_events)?;
// Convert the replacement event to the bundled format
if let Some(replacement_pdu) = most_recent_replacement {
// According to the Matrix spec, we should include the full event as raw JSON
let replacement_json = serde_json::to_string(replacement_pdu)
.map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?;
let raw_value = serde_json::value::RawValue::from_string(replacement_json)
.map_err(|e| err!(Database("Failed to create RawValue: {e}")))?;
bundled.replace = Some(Box::new(raw_value));
}
}
// Handle m.reference relations - collect event IDs
if !reference_events.is_empty() {
let reference_chunk = Self::build_reference_chunk(&reference_events)?;
if !reference_chunk.is_empty() {
bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk)));
}
}
// TODO: Handle other relation types (m.annotation, etc.) when specified
Ok(Some(bundled))
}
/// Build reference chunk for m.reference bundled aggregations
fn build_reference_chunk(
reference_events: &[&PdusIterItem],
) -> Result<Vec<BundledReference>> {
let mut chunk = Vec::with_capacity(reference_events.len());
for relation in reference_events {
let pdu = &relation.1;
let reference_entry = BundledReference::new(pdu.event_id().to_owned());
chunk.push(reference_entry);
}
// Don't sort, order is unspecified
Ok(chunk)
}
/// Find the most recent replacement event based on origin_server_ts and
/// lexicographic event_id ordering
fn find_most_recent_replacement<'a>(
replacement_events: &'a [&'a PdusIterItem],
) -> Result<Option<&'a PduEvent>> {
if replacement_events.is_empty() {
return Ok(None);
}
let mut most_recent: Option<&PduEvent> = None;
// Jank, is there a better way to do this?
for relation in replacement_events {
let pdu = &relation.1;
match most_recent {
| None => {
most_recent = Some(pdu);
},
| Some(current_most_recent) => {
// Compare by origin_server_ts first
match pdu
.origin_server_ts()
.cmp(&current_most_recent.origin_server_ts())
{
| std::cmp::Ordering::Greater => {
most_recent = Some(pdu);
},
| std::cmp::Ordering::Equal => {
// If timestamps are equal, use lexicographic ordering of event_id
if pdu.event_id() > current_most_recent.event_id() {
most_recent = Some(pdu);
}
},
| std::cmp::Ordering::Less => {
// Keep current most recent
},
}
},
}
}
Ok(most_recent)
}
/// Adds bundled aggregations to a PDU's unsigned field
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn add_bundled_aggregations_to_pdu(
&self,
user_id: &UserId,
pdu: &mut PduEvent,
) -> Result<()> {
if pdu.is_redacted() {
return Ok(());
}
let bundled_aggregations = self
.get_bundled_aggregations(user_id, pdu.room_id(), pdu.event_id())
.await?;
if let Some(aggregations) = bundled_aggregations {
let aggregations_json = serde_json::to_value(aggregations)
.map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?;
Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?;
}
Ok(())
}
/// Helper method to add bundled aggregations to a PDU's unsigned
/// field
fn add_bundled_aggregations_to_unsigned(
pdu: &mut PduEvent,
aggregations_json: serde_json::Value,
) -> Result<()> {
use serde_json::{
Map, Value as JsonValue,
value::{RawValue as RawJsonValue, to_raw_value},
};
let mut unsigned: Map<String, JsonValue> = pdu
.unsigned
.as_deref()
.map(RawJsonValue::get)
.map_or_else(|| Ok(Map::new()), serde_json::from_str)
.map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?;
let relations = unsigned
.entry("m.relations")
.or_insert_with(|| JsonValue::Object(Map::new()))
.as_object_mut()
.ok_or_else(|| err!(Database("m.relations is not an object")))?;
if let JsonValue::Object(aggregations_map) = aggregations_json {
for (rel_type, aggregation) in aggregations_map {
relations.insert(rel_type, aggregation);
}
}
pdu.unsigned = Some(to_raw_value(&unsigned)?);
Ok(())
}
/// Validates that an event is acceptable as a replacement for another event
/// See C/S spec "Validity of replacement events"
#[tracing::instrument(level = "debug")]
async fn is_valid_replacement_event(
original_event: &PduEvent,
replacement_event: &PduEvent,
) -> Result<bool> {
// 1. Same room_id
if original_event.room_id() != replacement_event.room_id() {
return Ok(false);
}
// 2. Same sender
if original_event.sender() != replacement_event.sender() {
return Ok(false);
}
// 3. Same type
if original_event.event_type() != replacement_event.event_type() {
return Ok(false);
}
// 4. Neither event should have a state_key property
if original_event.state_key().is_some() || replacement_event.state_key().is_some() {
return Ok(false);
}
// 5. Original event must not have rel_type of m.replace
let original_content = original_event.get_content_as_value();
if let Some(relates_to) = original_content.get("m.relates_to") {
if let Some(rel_type) = relates_to.get("rel_type") {
if rel_type.as_str() == Some("m.replace") {
return Ok(false);
}
}
}
// 6. Replacement event must have m.new_content property
// Skip this check for encrypted events, as m.new_content would be inside the
// encrypted payload
if replacement_event.event_type() != &ruma::events::TimelineEventType::RoomEncrypted {
let replacement_content = replacement_event.get_content_as_value();
if replacement_content.get("m.new_content").is_none() {
return Ok(false);
}
}
Ok(true)
}
}
#[cfg(test)]
mod tests {
use conduwuit_core::pdu::{EventHash, PduEvent};
use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id};
use serde_json::{Value as JsonValue, json, value::to_raw_value};
fn create_test_pdu(unsigned_content: Option<JsonValue>) -> PduEvent {
PduEvent {
event_id: owned_event_id!("$test:example.com"),
room_id: owned_room_id!("!test:example.com"),
sender: owned_user_id!("@test:example.com"),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: TimelineEventType::RoomMessage,
content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(),
state_key: None,
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()),
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
fn create_bundled_aggregations() -> JsonValue {
json!({
"m.replace": {
"event_id": "$replace:example.com",
"origin_server_ts": 1_234_567_890,
"sender": "@replacer:example.com"
},
"m.reference": {
"count": 5,
"chunk": [
"$ref1:example.com",
"$ref2:example.com"
]
}
})
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() {
let mut pdu = create_test_pdu(None);
let aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when no unsigned field exists");
assert!(pdu.unsigned.is_some(), "Unsigned field should be created");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
assert!(unsigned.get("m.relations").is_some(), "m.relations should exist");
assert_eq!(
unsigned["m.relations"], aggregations,
"Relations should match the aggregations"
);
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() {
let existing_unsigned = json!({
"m.relations": {
"m.replace": {
"event_id": "$old_replace:example.com",
"origin_server_ts": 1_111_111_111,
"sender": "@old_replacer:example.com"
}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when overwriting same relation type");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
let relations = &unsigned["m.relations"];
assert_eq!(
relations["m.replace"], new_aggregations["m.replace"],
"m.replace should be updated"
);
assert_eq!(
relations["m.replace"]["event_id"], "$replace:example.com",
"Should have new event_id"
);
assert!(relations.get("m.reference").is_some(), "New m.reference should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() {
// Test case: Other unsigned fields should be preserved
let existing_unsigned = json!({
"age": 98765,
"prev_content": {"msgtype": "m.text", "body": "old message"},
"redacted_because": {"event_id": "$redaction:example.com"},
"m.relations": {
"m.annotation": {"count": 1}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = json!({
"m.replace": {"event_id": "$new:example.com"}
});
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations,
);
assert!(result.is_ok(), "Should succeed while preserving other fields");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
// Verify all existing fields are preserved
assert_eq!(unsigned["age"], 98765, "age should be preserved");
assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved");
assert!(
unsigned.get("redacted_because").is_some(),
"redacted_because should be preserved"
);
// Verify relations were merged correctly
let relations = &unsigned["m.relations"];
assert!(
relations.get("m.annotation").is_some(),
"Existing m.annotation should be preserved"
);
assert!(relations.get("m.replace").is_some(), "New m.replace should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() {
// Test case: Invalid JSON in existing unsigned should result in error
let mut pdu = create_test_pdu(None);
// Manually set invalid unsigned data
pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap());
let aggregations = create_bundled_aggregations();
let result =
super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations);
assert!(result.is_err(), "fails when existing unsigned is invalid");
// Should we ignore the error and overwrite anyway?
}
// Test helper function to create test PDU events
fn create_test_event(
event_id: &str,
room_id: &str,
sender: &str,
event_type: TimelineEventType,
content: &JsonValue,
state_key: Option<&str>,
) -> PduEvent {
PduEvent {
event_id: event_id.try_into().unwrap(),
room_id: room_id.try_into().unwrap(),
sender: sender.try_into().unwrap(),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: event_type,
content: to_raw_value(&content).unwrap(),
state_key: state_key.map(Into::into),
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: None,
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
/// Test that a valid replacement event passes validation
#[tokio::test]
async fn test_valid_replacement_event() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
},
"m.relates_to": {
"rel_type": "m.replace",
"event_id": "$original:example.com"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(result.unwrap(), "Valid replacement event should be accepted");
}
/// Test replacement event with different room ID is rejected
#[tokio::test]
async fn test_replacement_event_different_room() {
let original = create_test_event(
"$original:example.com",
"!room1:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room2:example.com", // Different room
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different room ID should be rejected");
}
/// Test replacement event with different sender is rejected
#[tokio::test]
async fn test_replacement_event_different_sender() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user1:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user2:example.com", // Different sender
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different sender should be rejected");
}
/// Test replacement event with different type is rejected
#[tokio::test]
async fn test_replacement_event_different_type() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomTopic, // Different event type
&json!({
"topic": "new topic",
"m.new_content": {
"topic": "new topic"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different event type should be rejected");
}
/// Test replacement event with state key is rejected
#[tokio::test]
async fn test_replacement_event_with_state_key() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomName,
&json!({"name": "room name"}),
Some(""), // Has state key
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomName,
&json!({
"name": "new room name",
"m.new_content": {
"name": "new room name"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Event with state key should be rejected");
}
/// Test replacement of an event that is already a replacement is rejected
#[tokio::test]
async fn test_replacement_event_original_is_replacement() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.relates_to": {
"rel_type": "m.replace", // Original is already a replacement
"event_id": "$some_other:example.com"
}
}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited again",
"m.new_content": {
"msgtype": "m.text",
"body": "edited again"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Replacement of replacement should be rejected");
}
/// Test replacement event missing m.new_content is rejected
#[tokio::test]
async fn test_replacement_event_missing_new_content() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message"
// Missing m.new_content
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Missing m.new_content should be rejected");
}
/// Test encrypted replacement event without m.new_content is accepted
#[tokio::test]
async fn test_replacement_event_encrypted_missing_new_content_is_valid() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomEncrypted,
&json!({
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "encrypted_payload_base64",
"sender_key": "sender_key",
"session_id": "session_id"
}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomEncrypted,
&json!({
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "encrypted_replacement_payload_base64",
"sender_key": "sender_key",
"session_id": "session_id",
"m.relates_to": {
"rel_type": "m.replace",
"event_id": "$original:example.com"
}
// No m.new_content in cleartext - this is valid for encrypted events
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(
result.unwrap(),
"Encrypted replacement without cleartext m.new_content should be accepted"
);
}
}

View file

@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc};
use conduwuit::{
PduCount, PduEvent,
arrayvec::ArrayVec,
result::LogErr,
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
@ -80,9 +79,7 @@ impl Data {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
if pdu.sender != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.set_unsigned(Some(user_id));
Some((shorteventid, pdu))
})

View file

@ -1,3 +1,4 @@
mod bundled_aggregations;
mod data;
use std::sync::Arc;

View file

@ -127,7 +127,12 @@ pub async fn search_pdus<'a>(
.then_some(pdu)
})
.skip(query.skip)
.take(query.limit);
.take(query.limit)
.map(move |mut pdu| {
pdu.set_unsigned(query.user_id);
// TODO: bundled aggregation
pdu
});
Ok((count, pdus))
}

View file

@ -384,6 +384,7 @@ impl Service {
pub fn get_forward_extremities<'a>(
&'a self,
room_id: &'a RoomId,
_state_lock: &'a RoomMutexGuard,
) -> impl Stream<Item = &EventId> + Send + '_ {
let prefix = (room_id, Interfix);

View file

@ -160,9 +160,7 @@ impl Service {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender != user_id {
pdu.remove_transaction_id().ok();
}
pdu.set_unsigned(Some(user_id));
Some((pdu_id.shorteventid, pdu))
});

View file

@ -1,14 +1,11 @@
use std::{borrow::Borrow, sync::Arc};
use std::sync::Arc;
use conduwuit::{
Err, PduCount, PduEvent, Result, at, err,
result::{LogErr, NotFound},
utils,
utils::stream::TryReadyExt,
Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt,
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
use super::{PduId, RawPduId};
use crate::{Dep, rooms, rooms::short::ShortRoomId};
@ -46,12 +43,8 @@ impl Data {
}
#[inline]
pub(super) async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
let last_count = pdus_rev
@ -65,12 +58,8 @@ impl Data {
}
#[inline]
pub(super) async fn latest_pdu_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
pdus_rev
@ -223,7 +212,6 @@ impl Data {
/// order.
pub(super) fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@ -233,14 +221,13 @@ impl Data {
self.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
pub(super) fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@ -250,21 +237,15 @@ impl Data {
self.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.add_age().log_err().ok();
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
Ok((pdu_id.pdu_count(), pdu))
}

View file

@ -165,7 +165,7 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
let pdus = self.pdus(None, room_id, None);
let pdus = self.pdus(room_id, None);
pin_mut!(pdus);
pdus.try_next()
@ -175,16 +175,12 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
self.db.latest_pdu_in_room(None, room_id).await
self.db.latest_pdu_in_room(room_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
self.db.last_timeline_count(sender_user, room_id).await
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
self.db.last_timeline_count(room_id).await
}
/// Returns the `count` of this pdu's id.
@ -536,15 +532,21 @@ impl Service {
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
if self.services.admin.is_admin_command(pdu, &body).await {
self.services
.admin
.command(body, Some((*pdu.event_id).into()))?;
self.services.admin.command_with_sender(
body,
Some((*pdu.event_id).into()),
pdu.sender.clone().into(),
)?;
}
}
},
| _ => {},
}
// CONCERN: If we receive events with a relation out-of-order, we never write
// their relation / thread. We need some kind of way to trigger when we receive
// this event, and potentially a way to rebuild the table entirely.
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
self.services
@ -654,7 +656,7 @@ impl Service {
let prev_events: Vec<OwnedEventId> = self
.services
.state
.get_forward_extremities(room_id)
.get_forward_extremities(room_id, _mutex_lock)
.take(20)
.map(Into::into)
.collect()
@ -698,6 +700,20 @@ impl Service {
.await
.saturating_add(uint!(1));
if state_key.is_none() {
if prev_events.is_empty() {
warn!("Timeline event had zero prev_events, something broke.");
return Err!(Request(Unknown("Timeline event had zero prev_events.")));
}
if depth.le(&uint!(2)) {
warn!(
"Had unsafe depth of {depth} in {room_id} when creating non-state event. \
Bad!"
);
return Err!(Request(Unknown("Unsafe depth for non-state event.")));
}
};
let mut unsigned = unsigned.unwrap_or_default();
if let Some(state_key) = &state_key {
@ -769,6 +785,7 @@ impl Service {
ready(auth_events.get(&key))
};
debug!("running auth check on new {} event by {} in {}", pdu.kind, pdu.sender, pdu.room_id);
let auth_check = state_res::auth_check(
&room_version,
&pdu,
@ -1008,34 +1025,30 @@ impl Service {
#[inline]
pub fn all_pdus<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(Some(user_id), room_id, None).ignore_err()
self.pdus(room_id, None).ignore_err()
}
/// Reverse iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
}
/// Forward iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
}
/// Replace a PDU with the redacted form.
@ -1154,7 +1167,7 @@ impl Service {
.boxed();
while let Some(ref backfill_server) = servers.next().await {
info!("Asking {backfill_server} for backfill");
info!("Asking {backfill_server} for backfill in {:?}", room_id.to_owned());
let response = self
.services
.sending
@ -1182,7 +1195,7 @@ impl Service {
}
}
info!("No servers could backfill, but backfill was needed in room {room_id}");
warn!("No servers could backfill, but backfill was needed in room {room_id}");
Ok(())
}

View file

@ -401,16 +401,10 @@ impl Service {
fn num_senders(args: &crate::Args<'_>) -> usize {
const MIN_SENDERS: usize = 1;
// Limit the number of senders to the number of workers threads or number of
// cores, conservatively.
let max_senders = args
.server
.metrics
.num_workers()
.min(available_parallelism());
// Limit the maximum number of senders to the number of cores.
let max_senders = available_parallelism();
// If the user doesn't override the default 0, this is intended to then default
// to 1 for now as multiple senders is experimental.
// default is 4 senders. clamp between 1 and core count.
args.server
.config
.sender_workers

View file

@ -781,7 +781,7 @@ impl Service {
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
if pdu.is_redacted() {
continue;
}

View file

@ -16,10 +16,21 @@ use ruma::{
},
serde::Raw,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::{Dep, account_data, admin, globals, rooms};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserSuspension {
/// Whether the user is currently suspended
pub suspended: bool,
/// When the user was suspended (Unix timestamp in milliseconds)
pub suspended_at: u64,
/// User ID of who suspended this user
pub suspended_by: String,
}
pub struct Service {
services: Services,
db: Data,
@ -52,6 +63,7 @@ struct Data {
userid_lastonetimekeyupdate: Arc<Map>,
userid_masterkeyid: Arc<Map>,
userid_password: Arc<Map>,
userid_suspension: Arc<Map>,
userid_selfsigningkeyid: Arc<Map>,
userid_usersigningkeyid: Arc<Map>,
useridprofilekey_value: Arc<Map>,
@ -87,6 +99,7 @@ impl crate::Service for Service {
userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
userid_masterkeyid: args.db["userid_masterkeyid"].clone(),
userid_password: args.db["userid_password"].clone(),
userid_suspension: args.db["userid_suspension"].clone(),
userid_selfsigningkeyid: args.db["userid_selfsigningkeyid"].clone(),
userid_usersigningkeyid: args.db["userid_usersigningkeyid"].clone(),
useridprofilekey_value: args.db["useridprofilekey_value"].clone(),
@ -143,6 +156,23 @@ impl Service {
Ok(())
}
/// Suspend account, placing it in a read-only state
pub async fn suspend_account(&self, user_id: &UserId, suspending_user: &UserId) {
self.db.userid_suspension.raw_put(
user_id,
Json(UserSuspension {
suspended: true,
suspended_at: MilliSecondsSinceUnixEpoch::now().get().into(),
suspended_by: suspending_user.to_string(),
}),
);
}
/// Unsuspend account, placing it in a read-write state
pub async fn unsuspend_account(&self, user_id: &UserId) {
self.db.userid_suspension.remove(user_id);
}
/// Check if a user has an account on this homeserver.
#[inline]
pub async fn exists(&self, user_id: &UserId) -> bool {
@ -159,6 +189,25 @@ impl Service {
.await
}
/// Check if account is suspended
pub async fn is_suspended(&self, user_id: &UserId) -> Result<bool> {
match self
.db
.userid_suspension
.get(user_id)
.await
.deserialized::<UserSuspension>()
{
| Ok(s) => Ok(s.suspended),
| Err(e) =>
if e.is_not_found() {
Ok(false)
} else {
Err(e)
},
}
}
/// Check if account is active, infallible
pub async fn is_active(&self, user_id: &UserId) -> bool {
!self.is_deactivated(user_id).await.unwrap_or(true)