From 490a5d087e754474d44b6563508eeead11d6d289 Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Tue, 3 Jun 2025 21:31:02 +0100 Subject: [PATCH 01/28] refactor: Promote handling unsigned data out of timeline Also fixes: - Transaction IDs leaking in event route - Age not being set for event relations or threads - Both of the above for search results Notes down concern with relations table --- src/admin/query/room_timeline.rs | 4 +-- src/api/client/context.rs | 16 ++++++++-- src/api/client/message.rs | 9 ++++-- src/api/client/relations.rs | 4 +++ src/api/client/room/event.rs | 2 +- src/api/client/room/initial_sync.rs | 9 +++++- src/api/client/sync/mod.rs | 13 ++++---- src/api/client/sync/v3.rs | 2 +- src/api/client/threads.rs | 3 ++ src/api/server/backfill.rs | 12 +++++++- src/core/matrix/pdu/unsigned.rs | 17 +++++++++-- src/service/rooms/pdu_metadata/data.rs | 5 +--- src/service/rooms/search/mod.rs | 7 ++++- src/service/rooms/threads/mod.rs | 4 +-- src/service/rooms/timeline/data.rs | 41 +++++++------------------- src/service/rooms/timeline/mod.rs | 26 +++++++--------- src/service/sending/sender.rs | 2 +- 17 files changed, 104 insertions(+), 72 deletions(-) diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index 0fd22ca7..58f75cb9 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result { .services .rooms .timeline - .last_timeline_count(None, &room_id) + .last_timeline_count(&room_id) .await?; self.write_str(&format!("{result:#?}")).await @@ -52,7 +52,7 @@ pub(super) async fn pdus( .services .rooms .timeline - .pdus_rev(None, &room_id, from) + .pdus_rev(&room_id, from) .try_take(limit.unwrap_or(3)) .try_collect() .await?; diff --git a/src/api/client/context.rs b/src/api/client/context.rs index dbc2a22f..8c9ba64c 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -84,11 +84,18 @@ pub(crate) async fn get_context_route( let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user); + // PDUs are used to get seen user IDs and then returned in response. + let events_before = services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, Some(base_count)) + .pdus_rev(room_id, Some(base_count)) .ignore_err() + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) @@ -98,8 +105,13 @@ pub(crate) async fn get_context_route( let events_after = services .rooms .timeline - .pdus(Some(sender_user), room_id, Some(base_count)) + .pdus(room_id, Some(base_count)) .ignore_err() + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index e442850b..cd6b0a60 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route( | Direction::Forward => services .rooms .timeline - .pdus(Some(sender_user), room_id, Some(from)) + .pdus(room_id, Some(from)) .ignore_err() .boxed(), | Direction::Backward => services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, Some(from)) + .pdus_rev(room_id, Some(from)) .ignore_err() .boxed(), }; @@ -132,6 +132,11 @@ pub(crate) async fn get_message_events_route( .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit) + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .collect() .await; diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index b8c2dd4d..5e323756 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -172,6 +172,10 @@ async fn paginate_relations_with_filter( }) } +// TODO: Can we move the visibility filter lower down, to avoid checking events +// that won't be sent? At the moment this also results in getting events that +// appear to have no relation because intermediaries are not visible to the +// user. async fn visibility_filter( services: &Services, sender_user: &UserId, diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 2b115b5c..ec673d75 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -38,7 +38,7 @@ pub(crate) async fn get_room_event_route( "Fetched PDU must match requested" ); - event.add_age().ok(); + event.set_unsigned(body.sender_user.as_deref()); Ok(get_room_event::v3::Response { event: event.into_room_event() }) } diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index ca63610b..765d2a39 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -25,12 +25,19 @@ pub(crate) async fn room_initial_sync_route( return Err!(Request(Forbidden("No room preview available."))); } + // Events are returned in body + let limit = LIMIT_MAX; let events: Vec<_> = services .rooms .timeline - .pdus_rev(None, room_id, None) + .pdus_rev(room_id, None) .try_take(limit) + .and_then(async |mut pdu| { + pdu.1.set_unsigned(body.sender_user.as_deref()); + // TODO: bundled aggregations + Ok(pdu) + }) .try_collect() .await?; diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 40370160..db581bd6 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -31,11 +31,7 @@ async fn load_timeline( next_batch: Option, limit: usize, ) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { - let last_timeline_count = services - .rooms - .timeline - .last_timeline_count(Some(sender_user), room_id) - .await?; + let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?; if last_timeline_count <= roomsincecount { return Ok((Vec::new(), false)); @@ -44,8 +40,13 @@ async fn load_timeline( let non_timeline_pdus = services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, None) + .pdus_rev(room_id, None) .ignore_err() + .map(move |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) .ready_take_while(|&(pducount, _)| pducount > roomsincecount); diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 8eac6b66..da82a61c 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -1191,7 +1191,7 @@ async fn calculate_heroes( services .rooms .timeline - .all_pdus(sender_user, room_id) + .all_pdus(room_id) .ready_filter(|(_, pdu)| pdu.kind == RoomMember) .fold_default(|heroes: Vec<_>, (_, pdu)| { fold_hero(heroes, services, room_id, sender_user, pdu) diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index 5b838bef..404da4f8 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -28,6 +28,9 @@ pub(crate) async fn get_threads_route( .transpose()? .unwrap_or_else(PduCount::max); + // TODO: bundled aggregation + // TODO: user_can_see_event and set_unsigned should be at the same level / + // function, so unsigned is only set for seen events. let threads: Vec<(PduCount, PduEvent)> = services .rooms .threads diff --git a/src/api/server/backfill.rs b/src/api/server/backfill.rs index 3cfbcedc..058fc273 100644 --- a/src/api/server/backfill.rs +++ b/src/api/server/backfill.rs @@ -3,6 +3,7 @@ use std::cmp; use axum::extract::State; use conduwuit::{ PduCount, Result, + result::LogErr, utils::{IterStream, ReadyExt, stream::TryTools}, }; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route( pdus: services .rooms .timeline - .pdus_rev(None, &body.room_id, Some(from.saturating_add(1))) + .pdus_rev(&body.room_id, Some(from.saturating_add(1))) .try_take(limit) .try_filter_map(|(_, pdu)| async move { Ok(services @@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route( .await .then_some(pdu)) }) + .and_then(async |mut pdu| { + // Strip the transaction ID, as that is private + pdu.remove_transaction_id().log_err().ok(); + // Add age, as this is specified + pdu.add_age().log_err().ok(); + // It's not clear if we should strip or add any more data, leave as is. + // In particular: Redaction? + Ok(pdu) + }) .try_filter_map(|pdu| async move { Ok(services .rooms diff --git a/src/core/matrix/pdu/unsigned.rs b/src/core/matrix/pdu/unsigned.rs index 23897519..2726a292 100644 --- a/src/core/matrix/pdu/unsigned.rs +++ b/src/core/matrix/pdu/unsigned.rs @@ -1,11 +1,24 @@ -use std::collections::BTreeMap; +use std::{borrow::Borrow, collections::BTreeMap}; use ruma::MilliSecondsSinceUnixEpoch; use serde::Deserialize; use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value}; use super::Pdu; -use crate::{Result, err, implement, is_true}; +use crate::{Result, err, implement, is_true, result::LogErr}; + +/// Set the `unsigned` field of the PDU using only information in the PDU. +/// Some unsigned data is already set within the database (eg. prev events, +/// threads). Once this is done, other data must be calculated from the database +/// (eg. relations) This is for server-to-client events. +/// Backfill handles this itself. +#[implement(Pdu)] +pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) { + if Some(self.sender.borrow()) != user_id { + self.remove_transaction_id().log_err().ok(); + } + self.add_age().log_err().ok(); +} #[implement(Pdu)] pub fn remove_transaction_id(&mut self) -> Result { diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index f0beab5a..c4b37b99 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc}; use conduwuit::{ PduCount, PduEvent, arrayvec::ArrayVec, - result::LogErr, utils::{ ReadyExt, stream::{TryIgnore, WidebandExt}, @@ -80,9 +79,7 @@ impl Data { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; - if pdu.sender != user_id { - pdu.remove_transaction_id().log_err().ok(); - } + pdu.set_unsigned(Some(user_id)); Some((shorteventid, pdu)) }) diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index 4100dd75..7cef5dbf 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -127,7 +127,12 @@ pub async fn search_pdus<'a>( .then_some(pdu) }) .skip(query.skip) - .take(query.limit); + .take(query.limit) + .map(move |mut pdu| { + pdu.set_unsigned(query.user_id); + // TODO: bundled aggregation + pdu + }); Ok((count, pdus)) } diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index a680df55..a24183e6 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -160,9 +160,7 @@ impl Service { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; let pdu_id: PduId = pdu_id.into(); - if pdu.sender != user_id { - pdu.remove_transaction_id().ok(); - } + pdu.set_unsigned(Some(user_id)); Some((pdu_id.shorteventid, pdu)) }); diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 94c78bb0..5f7b8c81 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -1,14 +1,11 @@ -use std::{borrow::Borrow, sync::Arc}; +use std::sync::Arc; use conduwuit::{ - Err, PduCount, PduEvent, Result, at, err, - result::{LogErr, NotFound}, - utils, - utils::stream::TryReadyExt, + Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut}; -use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction}; +use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction}; use super::{PduId, RawPduId}; use crate::{Dep, rooms, rooms::short::ShortRoomId}; @@ -46,12 +43,8 @@ impl Data { } #[inline] - pub(super) async fn last_timeline_count( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); + pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result { + let pdus_rev = self.pdus_rev(room_id, PduCount::max()); pin_mut!(pdus_rev); let last_count = pdus_rev @@ -65,12 +58,8 @@ impl Data { } #[inline] - pub(super) async fn latest_pdu_in_room( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); + pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { + let pdus_rev = self.pdus_rev(room_id, PduCount::max()); pin_mut!(pdus_rev); pdus_rev @@ -223,7 +212,6 @@ impl Data { /// order. pub(super) fn pdus_rev<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, until: PduCount, ) -> impl Stream> + Send + 'a { @@ -233,14 +221,13 @@ impl Data { self.pduid_pdu .rev_raw_stream_from(¤t) .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(move |item| Self::each_pdu(item, user_id)) + .ready_and_then(Self::from_json_slice) }) .try_flatten_stream() } pub(super) fn pdus<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount, ) -> impl Stream> + Send + 'a { @@ -250,21 +237,15 @@ impl Data { self.pduid_pdu .raw_stream_from(¤t) .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(move |item| Self::each_pdu(item, user_id)) + .ready_and_then(Self::from_json_slice) }) .try_flatten_stream() } - fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result { + fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result { let pdu_id: RawPduId = pdu_id.into(); - let mut pdu = serde_json::from_slice::(pdu)?; - - if Some(pdu.sender.borrow()) != user_id { - pdu.remove_transaction_id().log_err().ok(); - } - - pdu.add_age().log_err().ok(); + let pdu = serde_json::from_slice::(pdu)?; Ok((pdu_id.pdu_count(), pdu)) } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 4b2f3cb2..e65ae2c7 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -165,7 +165,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> { - let pdus = self.pdus(None, room_id, None); + let pdus = self.pdus(room_id, None); pin_mut!(pdus); pdus.try_next() @@ -175,16 +175,12 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { - self.db.latest_pdu_in_room(None, room_id).await + self.db.latest_pdu_in_room(room_id).await } #[tracing::instrument(skip(self), level = "debug")] - pub async fn last_timeline_count( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - self.db.last_timeline_count(sender_user, room_id).await + pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result { + self.db.last_timeline_count(room_id).await } /// Returns the `count` of this pdu's id. @@ -545,6 +541,10 @@ impl Service { | _ => {}, } + // CONCERN: If we receive events with a relation out-of-order, we never write + // their relation / thread. We need some kind of way to trigger when we receive + // this event, and potentially a way to rebuild the table entirely. + if let Ok(content) = pdu.get_content::() { if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await { self.services @@ -996,34 +996,30 @@ impl Service { #[inline] pub fn all_pdus<'a>( &'a self, - user_id: &'a UserId, room_id: &'a RoomId, ) -> impl Stream + Send + 'a { - self.pdus(Some(user_id), room_id, None).ignore_err() + self.pdus(room_id, None).ignore_err() } /// Reverse iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus_rev<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, until: Option, ) -> impl Stream> + Send + 'a { self.db - .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) + .pdus_rev(room_id, until.unwrap_or_else(PduCount::max)) } /// Forward iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, from: Option, ) -> impl Stream> + Send + 'a { - self.db - .pdus(user_id, room_id, from.unwrap_or_else(PduCount::min)) + self.db.pdus(room_id, from.unwrap_or_else(PduCount::min)) } /// Replace a PDU with the redacted form. diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index fab02f6b..cd84f7e7 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -781,7 +781,7 @@ impl Service { for pdu in pdus { // Redacted events are not notification targets (we don't send push for them) - if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) { + if pdu.is_redacted() { continue; } From 980e2bb8d531b859fb669a3694ef5583aac5fedf Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Wed, 4 Jun 2025 00:11:09 +0100 Subject: [PATCH 02/28] feat: Add bundled aggregations support Add support for the m.replace and m.reference bundled aggregations. This should fix plenty of subtle client issues. Threads are not included in the new code as they have historically been written to the database. Replacing the old system would result in issues when switching away from continuwuity, so saved for later. Some TODOs have been left re event visibility and ignored users. These should be OK for now, though. --- src/api/client/context.rs | 18 +- src/api/client/message.rs | 11 +- src/api/client/relations.rs | 13 +- src/api/client/room/event.rs | 11 +- src/api/client/room/initial_sync.rs | 13 +- src/api/client/search.rs | 13 +- src/api/client/sync/mod.rs | 18 +- src/api/client/threads.rs | 14 +- .../pdu_metadata/bundled_aggregations.rs | 394 ++++++++++++++++++ src/service/rooms/pdu_metadata/mod.rs | 1 + 10 files changed, 491 insertions(+), 15 deletions(-) create mode 100644 src/service/rooms/pdu_metadata/bundled_aggregations.rs diff --git a/src/api/client/context.rs b/src/api/client/context.rs index 8c9ba64c..ee3a458c 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -93,7 +93,14 @@ pub(crate) async fn get_context_route( .ignore_err() .then(async |mut pdu| { pdu.1.set_unsigned(Some(sender_user)); - // TODO: bundled aggregations + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } pdu }) .ready_filter_map(|item| event_filter(item, filter)) @@ -109,7 +116,14 @@ pub(crate) async fn get_context_route( .ignore_err() .then(async |mut pdu| { pdu.1.set_unsigned(Some(sender_user)); - // TODO: bundled aggregations + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } pdu }) .ready_filter_map(|item| event_filter(item, filter)) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index cd6b0a60..6087478c 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -2,7 +2,7 @@ use core::panic; use axum::extract::State; use conduwuit::{ - Err, Result, at, + Err, Result, at, debug_warn, matrix::{ Event, pdu::{PduCount, PduEvent}, @@ -134,7 +134,14 @@ pub(crate) async fn get_message_events_route( .take(limit) .then(async |mut pdu| { pdu.1.set_unsigned(Some(sender_user)); - // TODO: bundled aggregations + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } pdu }) .collect() diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index 5e323756..377f0c71 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Result, at, + Result, at, debug_warn, matrix::pdu::PduCount, utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt}, }; @@ -149,6 +149,17 @@ async fn paginate_relations_with_filter( .ready_take_while(|(count, _)| Some(*count) != to) .wide_filter_map(|item| visibility_filter(services, sender_user, item)) .take(limit) + .then(async |mut pdu| { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations to relation: {e}"); + } + pdu + }) .collect() .await; diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index ec673d75..e7b0bb37 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -1,5 +1,5 @@ use axum::extract::State; -use conduwuit::{Err, Event, Result, err}; +use conduwuit::{Err, Event, Result, debug_warn, err}; use futures::{FutureExt, TryFutureExt, future::try_join}; use ruma::api::client::room::get_room_event; @@ -38,6 +38,15 @@ pub(crate) async fn get_room_event_route( "Fetched PDU must match requested" ); + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(body.sender_user(), &mut event) + .await + { + debug_warn!("Failed to add bundled aggregations to event: {e}"); + } + event.set_unsigned(body.sender_user.as_deref()); Ok(get_room_event::v3::Response { event: event.into_room_event() }) diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index 765d2a39..2f965245 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Err, PduEvent, Result, at, + Err, PduEvent, Result, at, debug_warn, utils::{BoolExt, stream::TryTools}, }; use futures::TryStreamExt; @@ -35,7 +35,16 @@ pub(crate) async fn room_initial_sync_route( .try_take(limit) .and_then(async |mut pdu| { pdu.1.set_unsigned(body.sender_user.as_deref()); - // TODO: bundled aggregations + if let Some(sender_user) = body.sender_user.as_deref() { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } + } Ok(pdu) }) .try_collect() diff --git a/src/api/client/search.rs b/src/api/client/search.rs index d4dcde57..af5fccec 100644 --- a/src/api/client/search.rs +++ b/src/api/client/search.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use axum::extract::State; use conduwuit::{ - Err, Result, at, is_true, + Err, Result, at, debug_warn, is_true, matrix::pdu::PduEvent, result::FlatOk, utils::{IterStream, stream::ReadyExt}, @@ -144,6 +144,17 @@ async fn category_room_events( .map(at!(2)) .flatten() .stream() + .then(|mut pdu| async { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu) + .await + { + debug_warn!("Failed to add bundled aggregations to search result: {e}"); + } + pdu + }) .map(PduEvent::into_room_event) .map(|result| SearchResult { rank: None, diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index db581bd6..1ea62883 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -3,7 +3,7 @@ mod v4; mod v5; use conduwuit::{ - Error, PduCount, Result, + Error, PduCount, Result, debug_warn, matrix::pdu::PduEvent, utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, }; @@ -42,13 +42,23 @@ async fn load_timeline( .timeline .pdus_rev(room_id, None) .ignore_err() + .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) + .ready_take_while(|&(pducount, _)| pducount > roomsincecount) .map(move |mut pdu| { pdu.1.set_unsigned(Some(sender_user)); - // TODO: bundled aggregations pdu }) - .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) - .ready_take_while(|&(pducount, _)| pducount > roomsincecount); + .then(async move |mut pdu| { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } + pdu + }); // Take the last events for the timeline pin_mut!(non_timeline_pdus); diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index 404da4f8..09fb75d6 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Result, at, + Result, at, debug_warn, matrix::pdu::{PduCount, PduEvent}, }; use futures::StreamExt; @@ -28,7 +28,6 @@ pub(crate) async fn get_threads_route( .transpose()? .unwrap_or_else(PduCount::max); - // TODO: bundled aggregation // TODO: user_can_see_event and set_unsigned should be at the same level / // function, so unsigned is only set for seen events. let threads: Vec<(PduCount, PduEvent)> = services @@ -45,6 +44,17 @@ pub(crate) async fn get_threads_route( .await .then_some((count, pdu)) }) + .then(|(count, mut pdu)| async move { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu) + .await + { + debug_warn!("Failed to add bundled aggregations to thread: {e}"); + } + (count, pdu) + }) .collect() .await; diff --git a/src/service/rooms/pdu_metadata/bundled_aggregations.rs b/src/service/rooms/pdu_metadata/bundled_aggregations.rs new file mode 100644 index 00000000..4ef8efc1 --- /dev/null +++ b/src/service/rooms/pdu_metadata/bundled_aggregations.rs @@ -0,0 +1,394 @@ +use conduwuit::{Event, PduEvent, Result, err}; +use ruma::{ + EventId, RoomId, UserId, + api::Direction, + events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk}, +}; + +use super::PdusIterItem; + +const MAX_BUNDLED_RELATIONS: usize = 50; + +impl super::Service { + /// Gets bundled aggregations for an event according to the Matrix + /// specification. + /// - m.replace relations are bundled to include the most recent replacement + /// event. + /// - m.reference relations are bundled to include a chunk of event IDs. + #[tracing::instrument(skip(self), level = "debug")] + pub async fn get_bundled_aggregations( + &self, + user_id: &UserId, + room_id: &RoomId, + event_id: &EventId, + ) -> Result>>> { + let relations = self + .get_relations( + user_id, + room_id, + event_id, + conduwuit::PduCount::max(), + MAX_BUNDLED_RELATIONS, + 0, + Direction::Backward, + ) + .await; + // The relations database code still handles the basic unsigned data + // We don't want to recursively fetch relations + + // TODO: Event visibility check + // TODO: ignored users? + + if relations.is_empty() { + return Ok(None); + } + + let mut replace_events = Vec::with_capacity(relations.len().min(10)); // Most events have few replacements + let mut reference_events = Vec::with_capacity(relations.len()); + + for relation in &relations { + let pdu = &relation.1; + + let content = pdu.get_content_as_value(); + if let Some(relates_to) = content.get("m.relates_to") { + // We don't check that the event relates back, because we assume the database is + // good. + if let Some(rel_type) = relates_to.get("rel_type") { + match rel_type.as_str() { + | Some("m.replace") => { + replace_events.push(relation); + }, + | Some("m.reference") => { + reference_events.push(relation); + }, + | _ => { + // Ignore other relation types for now + // Threads are in the database but not handled here + // Other types are not specified AFAICT. + }, + } + } + } + } + + // If no relations to bundle, return None + if replace_events.is_empty() && reference_events.is_empty() { + return Ok(None); + } + + let mut bundled = BundledMessageLikeRelations::new(); + + // Handle m.replace relations - find the most recent one + if !replace_events.is_empty() { + let most_recent_replacement = Self::find_most_recent_replacement(&replace_events)?; + + // Convert the replacement event to the bundled format + if let Some(replacement_pdu) = most_recent_replacement { + // According to the Matrix spec, we should include the full event as raw JSON + let replacement_json = serde_json::to_string(replacement_pdu) + .map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?; + let raw_value = serde_json::value::RawValue::from_string(replacement_json) + .map_err(|e| err!(Database("Failed to create RawValue: {e}")))?; + bundled.replace = Some(Box::new(raw_value)); + } + } + + // Handle m.reference relations - collect event IDs + if !reference_events.is_empty() { + let reference_chunk = Self::build_reference_chunk(&reference_events)?; + if !reference_chunk.is_empty() { + bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk))); + } + } + + // TODO: Handle other relation types (m.annotation, etc.) when specified + + Ok(Some(bundled)) + } + + /// Build reference chunk for m.reference bundled aggregations + fn build_reference_chunk( + reference_events: &[&PdusIterItem], + ) -> Result> { + let mut chunk = Vec::with_capacity(reference_events.len()); + + for relation in reference_events { + let pdu = &relation.1; + + let reference_entry = BundledReference::new(pdu.event_id().to_owned()); + chunk.push(reference_entry); + } + + // Don't sort, order is unspecified + + Ok(chunk) + } + + /// Find the most recent replacement event based on origin_server_ts and + /// lexicographic event_id ordering + fn find_most_recent_replacement<'a>( + replacement_events: &'a [&'a PdusIterItem], + ) -> Result> { + if replacement_events.is_empty() { + return Ok(None); + } + + let mut most_recent: Option<&PduEvent> = None; + + // Jank, is there a better way to do this? + for relation in replacement_events { + let pdu = &relation.1; + + match most_recent { + | None => { + most_recent = Some(pdu); + }, + | Some(current_most_recent) => { + // Compare by origin_server_ts first + match pdu + .origin_server_ts() + .cmp(¤t_most_recent.origin_server_ts()) + { + | std::cmp::Ordering::Greater => { + most_recent = Some(pdu); + }, + | std::cmp::Ordering::Equal => { + // If timestamps are equal, use lexicographic ordering of event_id + if pdu.event_id() > current_most_recent.event_id() { + most_recent = Some(pdu); + } + }, + | std::cmp::Ordering::Less => { + // Keep current most recent + }, + } + }, + } + } + + Ok(most_recent) + } + + /// Adds bundled aggregations to a PDU's unsigned field + #[tracing::instrument(skip(self, pdu), level = "debug")] + pub async fn add_bundled_aggregations_to_pdu( + &self, + user_id: &UserId, + pdu: &mut PduEvent, + ) -> Result<()> { + if pdu.is_redacted() { + return Ok(()); + } + + let bundled_aggregations = self + .get_bundled_aggregations(user_id, pdu.room_id(), pdu.event_id()) + .await?; + + if let Some(aggregations) = bundled_aggregations { + let aggregations_json = serde_json::to_value(aggregations) + .map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?; + + Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?; + } + + Ok(()) + } + + /// Helper method to add bundled aggregations to a PDU's unsigned + /// field + fn add_bundled_aggregations_to_unsigned( + pdu: &mut PduEvent, + aggregations_json: serde_json::Value, + ) -> Result<()> { + use serde_json::{ + Map, Value as JsonValue, + value::{RawValue as RawJsonValue, to_raw_value}, + }; + + let mut unsigned: Map = pdu + .unsigned + .as_deref() + .map(RawJsonValue::get) + .map_or_else(|| Ok(Map::new()), serde_json::from_str) + .map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?; + + let relations = unsigned + .entry("m.relations") + .or_insert_with(|| JsonValue::Object(Map::new())) + .as_object_mut() + .ok_or_else(|| err!(Database("m.relations is not an object")))?; + + if let JsonValue::Object(aggregations_map) = aggregations_json { + for (rel_type, aggregation) in aggregations_map { + relations.insert(rel_type, aggregation); + } + } + + pdu.unsigned = Some(to_raw_value(&unsigned)?); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use conduwuit_core::pdu::{EventHash, PduEvent}; + use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id}; + use serde_json::{Value as JsonValue, json, value::to_raw_value}; + + fn create_test_pdu(unsigned_content: Option) -> PduEvent { + PduEvent { + event_id: owned_event_id!("$test:example.com"), + room_id: owned_room_id!("!test:example.com"), + sender: owned_user_id!("@test:example.com"), + origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(), + kind: TimelineEventType::RoomMessage, + content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(), + state_key: None, + prev_events: vec![], + depth: UInt::from(1_u32), + auth_events: vec![], + redacts: None, + unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()), + hashes: EventHash { sha256: "test_hash".to_owned() }, + signatures: None, + origin: None, + } + } + + fn create_bundled_aggregations() -> JsonValue { + json!({ + "m.replace": { + "event_id": "$replace:example.com", + "origin_server_ts": 1_234_567_890, + "sender": "@replacer:example.com" + }, + "m.reference": { + "count": 5, + "chunk": [ + "$ref1:example.com", + "$ref2:example.com" + ] + } + }) + } + + #[test] + fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() { + let mut pdu = create_test_pdu(None); + let aggregations = create_bundled_aggregations(); + + let result = super::super::Service::add_bundled_aggregations_to_unsigned( + &mut pdu, + aggregations.clone(), + ); + assert!(result.is_ok(), "Should succeed when no unsigned field exists"); + + assert!(pdu.unsigned.is_some(), "Unsigned field should be created"); + + let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); + let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); + + assert!(unsigned.get("m.relations").is_some(), "m.relations should exist"); + assert_eq!( + unsigned["m.relations"], aggregations, + "Relations should match the aggregations" + ); + } + + #[test] + fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() { + let existing_unsigned = json!({ + "m.relations": { + "m.replace": { + "event_id": "$old_replace:example.com", + "origin_server_ts": 1_111_111_111, + "sender": "@old_replacer:example.com" + } + } + }); + + let mut pdu = create_test_pdu(Some(existing_unsigned)); + let new_aggregations = create_bundled_aggregations(); + + let result = super::super::Service::add_bundled_aggregations_to_unsigned( + &mut pdu, + new_aggregations.clone(), + ); + assert!(result.is_ok(), "Should succeed when overwriting same relation type"); + + let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); + let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); + + let relations = &unsigned["m.relations"]; + + assert_eq!( + relations["m.replace"], new_aggregations["m.replace"], + "m.replace should be updated" + ); + assert_eq!( + relations["m.replace"]["event_id"], "$replace:example.com", + "Should have new event_id" + ); + + assert!(relations.get("m.reference").is_some(), "New m.reference should be added"); + } + + #[test] + fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() { + // Test case: Other unsigned fields should be preserved + let existing_unsigned = json!({ + "age": 98765, + "prev_content": {"msgtype": "m.text", "body": "old message"}, + "redacted_because": {"event_id": "$redaction:example.com"}, + "m.relations": { + "m.annotation": {"count": 1} + } + }); + + let mut pdu = create_test_pdu(Some(existing_unsigned)); + let new_aggregations = json!({ + "m.replace": {"event_id": "$new:example.com"} + }); + + let result = super::super::Service::add_bundled_aggregations_to_unsigned( + &mut pdu, + new_aggregations, + ); + assert!(result.is_ok(), "Should succeed while preserving other fields"); + + let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); + let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); + + // Verify all existing fields are preserved + assert_eq!(unsigned["age"], 98765, "age should be preserved"); + assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved"); + assert!( + unsigned.get("redacted_because").is_some(), + "redacted_because should be preserved" + ); + + // Verify relations were merged correctly + let relations = &unsigned["m.relations"]; + assert!( + relations.get("m.annotation").is_some(), + "Existing m.annotation should be preserved" + ); + assert!(relations.get("m.replace").is_some(), "New m.replace should be added"); + } + + #[test] + fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() { + // Test case: Invalid JSON in existing unsigned should result in error + let mut pdu = create_test_pdu(None); + // Manually set invalid unsigned data + pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap()); + + let aggregations = create_bundled_aggregations(); + let result = + super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations); + + assert!(result.is_err(), "fails when existing unsigned is invalid"); + // Should we ignore the error and overwrite anyway? + } +} diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index 18221c2d..2dff54d8 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -1,3 +1,4 @@ +mod bundled_aggregations; mod data; use std::sync::Arc; From 74cf5445cc2aee970ea51f70aeb113aeba66ff8b Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Wed, 4 Jun 2025 22:50:17 +0100 Subject: [PATCH 03/28] fix: Filter out invalid replacements from bundled aggregations --- .../pdu_metadata/bundled_aggregations.rs | 375 +++++++++++++++++- 1 file changed, 373 insertions(+), 2 deletions(-) diff --git a/src/service/rooms/pdu_metadata/bundled_aggregations.rs b/src/service/rooms/pdu_metadata/bundled_aggregations.rs index 4ef8efc1..c47f637a 100644 --- a/src/service/rooms/pdu_metadata/bundled_aggregations.rs +++ b/src/service/rooms/pdu_metadata/bundled_aggregations.rs @@ -43,7 +43,10 @@ impl super::Service { return Ok(None); } - let mut replace_events = Vec::with_capacity(relations.len().min(10)); // Most events have few replacements + // Get the original event for validation of replacement events + let original_event = self.services.timeline.get_pdu(event_id).await?; + + let mut replace_events = Vec::with_capacity(relations.len()); let mut reference_events = Vec::with_capacity(relations.len()); for relation in &relations { @@ -56,7 +59,10 @@ impl super::Service { if let Some(rel_type) = relates_to.get("rel_type") { match rel_type.as_str() { | Some("m.replace") => { - replace_events.push(relation); + // Only consider valid replacements + if Self::is_valid_replacement_event(&original_event, pdu).await? { + replace_events.push(relation); + } }, | Some("m.reference") => { reference_events.push(relation); @@ -228,6 +234,56 @@ impl super::Service { Ok(()) } + + /// Validates that an event is acceptable as a replacement for another event + /// See C/S spec "Validity of replacement events" + #[tracing::instrument(level = "debug")] + async fn is_valid_replacement_event( + original_event: &PduEvent, + replacement_event: &PduEvent, + ) -> Result { + // 1. Same room_id + if original_event.room_id() != replacement_event.room_id() { + return Ok(false); + } + + // 2. Same sender + if original_event.sender() != replacement_event.sender() { + return Ok(false); + } + + // 3. Same type + if original_event.event_type() != replacement_event.event_type() { + return Ok(false); + } + + // 4. Neither event should have a state_key property + if original_event.state_key().is_some() || replacement_event.state_key().is_some() { + return Ok(false); + } + + // 5. Original event must not have rel_type of m.replace + let original_content = original_event.get_content_as_value(); + if let Some(relates_to) = original_content.get("m.relates_to") { + if let Some(rel_type) = relates_to.get("rel_type") { + if rel_type.as_str() == Some("m.replace") { + return Ok(false); + } + } + } + + // 6. Replacement event must have m.new_content property + // Skip this check for encrypted events, as m.new_content would be inside the + // encrypted payload + if replacement_event.event_type() != &ruma::events::TimelineEventType::RoomEncrypted { + let replacement_content = replacement_event.get_content_as_value(); + if replacement_content.get("m.new_content").is_none() { + return Ok(false); + } + } + + Ok(true) + } } #[cfg(test)] @@ -391,4 +447,319 @@ mod tests { assert!(result.is_err(), "fails when existing unsigned is invalid"); // Should we ignore the error and overwrite anyway? } + + // Test helper function to create test PDU events + fn create_test_event( + event_id: &str, + room_id: &str, + sender: &str, + event_type: TimelineEventType, + content: &JsonValue, + state_key: Option<&str>, + ) -> PduEvent { + PduEvent { + event_id: event_id.try_into().unwrap(), + room_id: room_id.try_into().unwrap(), + sender: sender.try_into().unwrap(), + origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(), + kind: event_type, + content: to_raw_value(&content).unwrap(), + state_key: state_key.map(Into::into), + prev_events: vec![], + depth: UInt::from(1_u32), + auth_events: vec![], + redacts: None, + unsigned: None, + hashes: EventHash { sha256: "test_hash".to_owned() }, + signatures: None, + origin: None, + } + } + + /// Test that a valid replacement event passes validation + #[tokio::test] + async fn test_valid_replacement_event() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message", + "m.new_content": { + "msgtype": "m.text", + "body": "edited message" + }, + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$original:example.com" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(result.unwrap(), "Valid replacement event should be accepted"); + } + + /// Test replacement event with different room ID is rejected + #[tokio::test] + async fn test_replacement_event_different_room() { + let original = create_test_event( + "$original:example.com", + "!room1:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room2:example.com", // Different room + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message", + "m.new_content": { + "msgtype": "m.text", + "body": "edited message" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Different room ID should be rejected"); + } + + /// Test replacement event with different sender is rejected + #[tokio::test] + async fn test_replacement_event_different_sender() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user1:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user2:example.com", // Different sender + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message", + "m.new_content": { + "msgtype": "m.text", + "body": "edited message" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Different sender should be rejected"); + } + + /// Test replacement event with different type is rejected + #[tokio::test] + async fn test_replacement_event_different_type() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomTopic, // Different event type + &json!({ + "topic": "new topic", + "m.new_content": { + "topic": "new topic" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Different event type should be rejected"); + } + + /// Test replacement event with state key is rejected + #[tokio::test] + async fn test_replacement_event_with_state_key() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomName, + &json!({"name": "room name"}), + Some(""), // Has state key + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomName, + &json!({ + "name": "new room name", + "m.new_content": { + "name": "new room name" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Event with state key should be rejected"); + } + + /// Test replacement of an event that is already a replacement is rejected + #[tokio::test] + async fn test_replacement_event_original_is_replacement() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message", + "m.relates_to": { + "rel_type": "m.replace", // Original is already a replacement + "event_id": "$some_other:example.com" + } + }), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited again", + "m.new_content": { + "msgtype": "m.text", + "body": "edited again" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Replacement of replacement should be rejected"); + } + + /// Test replacement event missing m.new_content is rejected + #[tokio::test] + async fn test_replacement_event_missing_new_content() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message" + // Missing m.new_content + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Missing m.new_content should be rejected"); + } + + /// Test encrypted replacement event without m.new_content is accepted + #[tokio::test] + async fn test_replacement_event_encrypted_missing_new_content_is_valid() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomEncrypted, + &json!({ + "algorithm": "m.megolm.v1.aes-sha2", + "ciphertext": "encrypted_payload_base64", + "sender_key": "sender_key", + "session_id": "session_id" + }), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomEncrypted, + &json!({ + "algorithm": "m.megolm.v1.aes-sha2", + "ciphertext": "encrypted_replacement_payload_base64", + "sender_key": "sender_key", + "session_id": "session_id", + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$original:example.com" + } + // No m.new_content in cleartext - this is valid for encrypted events + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!( + result.unwrap(), + "Encrypted replacement without cleartext m.new_content should be accepted" + ); + } } From eda20ac4f5934642025701bd15347c8b20140f3e Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 7 Jun 2025 00:46:55 +0100 Subject: [PATCH 04/28] fix an auth rule not applying correctly --- src/core/matrix/state_res/event_auth.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 715e5156..fcdce2e6 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -255,6 +255,16 @@ where }, | Some(e) => e, }; + // just re-check 1.2 to work around a bug + let Some(room_id_server_name) = incoming_event.room_id().server_name() else { + warn!("room ID has no servername"); + return Ok(false); + }; + + if room_id_server_name != sender.server_name() { + warn!("servername of room ID does not match servername of m.room.create sender"); + return Ok(false); + } // 3. If event does not have m.room.create in auth_events reject if !incoming_event From 24cd34ee98a6657c7e60d24bc9aec52327e5cf88 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 7 Jun 2025 00:55:03 +0100 Subject: [PATCH 05/28] Note about ruma#2064 in TODO --- src/core/matrix/state_res/event_auth.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index fcdce2e6..11cdff40 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -217,8 +217,9 @@ where } /* - // TODO: In the past this code caused problems federating with synapse, maybe this has been - // resolved already. Needs testing. + // TODO: In the past this code was commented as it caused problems with Synapse. This is no + // longer the case. This needs to be implemented. + // See also: https://github.com/ruma/ruma/pull/2064 // // 2. Reject if auth_events // a. auth_events cannot have duplicate keys since it's a BTree From 64519ac672a48efdb6a06ed240c915438b540ea6 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 25 Apr 2025 20:59:52 -0700 Subject: [PATCH 06/28] Fix spaces rooms list load error. rev2 --- src/api/client/space.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/api/client/space.rs b/src/api/client/space.rs index 92768926..23b1e80f 100644 --- a/src/api/client/space.rs +++ b/src/api/client/space.rs @@ -121,7 +121,9 @@ where .map(|(key, val)| (key, val.collect())) .collect(); - if !populate { + if populate { + rooms.push(summary_to_chunk(summary.clone())); + } else { children = children .iter() .rev() @@ -144,10 +146,8 @@ where .collect(); } - if populate { - rooms.push(summary_to_chunk(summary.clone())); - } else if queue.is_empty() && children.is_empty() { - return Err!(Request(InvalidParam("Room IDs in token were not found."))); + if !populate && queue.is_empty() && children.is_empty() { + break; } parents.insert(current_room.clone()); From 3bb571035690fece7214df84ea88228c2424452c Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 25 Apr 2025 21:06:00 -0700 Subject: [PATCH 07/28] probably incorrectly delete support for non-standardized matrix srv record --- src/service/resolver/actual.rs | 37 ++++++++++++++++------------------ 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/src/service/resolver/actual.rs b/src/service/resolver/actual.rs index d23ef95a..52cd5d7d 100644 --- a/src/service/resolver/actual.rs +++ b/src/service/resolver/actual.rs @@ -306,28 +306,25 @@ impl super::Service { #[tracing::instrument(name = "srv", level = "debug", skip(self))] async fn query_srv_record(&self, hostname: &'_ str) -> Result> { - let hostnames = - [format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")]; + self.services.server.check_running()?; - for hostname in hostnames { - self.services.server.check_running()?; + debug!("querying SRV for {hostname:?}"); - debug!("querying SRV for {hostname:?}"); - let hostname = hostname.trim_end_matches('.'); - match self.resolver.resolver.srv_lookup(hostname).await { - | Err(e) => Self::handle_resolve_error(&e, hostname)?, - | Ok(result) => { - return Ok(result.iter().next().map(|result| { - FedDest::Named( - result.target().to_string().trim_end_matches('.').to_owned(), - format!(":{}", result.port()) - .as_str() - .try_into() - .unwrap_or_else(|_| FedDest::default_port()), - ) - })); - }, - } + let hostname_suffix = format!("_matrix-fed._tcp.{hostname}."); + let hostname = hostname_suffix.trim_end_matches('.'); + match self.resolver.resolver.srv_lookup(hostname).await { + | Err(e) => Self::handle_resolve_error(&e, hostname)?, + | Ok(result) => { + return Ok(result.iter().next().map(|result| { + FedDest::Named( + result.target().to_string().trim_end_matches('.').to_owned(), + format!(":{}", result.port()) + .as_str() + .try_into() + .unwrap_or_else(|_| FedDest::default_port()), + ) + })); + }, } Ok(None) From bad37eaf0de364a0e232dac33761df6bde9ed787 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Wed, 14 May 2025 06:53:00 -0700 Subject: [PATCH 08/28] bump the number of allowed immutable memtables by 1, to allow for greater flood protection this should probably not be applied if you have rocksdb_atomic_flush = false (the default) --- src/database/engine/cf_opts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs index cbbd1012..666f9f9e 100644 --- a/src/database/engine/cf_opts.rs +++ b/src/database/engine/cf_opts.rs @@ -29,7 +29,7 @@ fn descriptor_cf_options( set_table_options(&mut opts, &desc, cache)?; opts.set_min_write_buffer_number(1); - opts.set_max_write_buffer_number(2); + opts.set_max_write_buffer_number(3); opts.set_write_buffer_size(desc.write_size); opts.set_target_file_size_base(desc.file_size); From cb15ac3c0169cc57be584698ec03fbbe1ee9ffce Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 26 Apr 2025 08:23:57 +0000 Subject: [PATCH 09/28] Mitigate large futures Signed-off-by: Jason Volk --- src/admin/room/moderation.rs | 10 ++++++++-- src/admin/user/commands.rs | 4 +++- src/api/client/account.rs | 8 ++++++-- src/api/client/membership.rs | 15 ++++++++++++--- src/api/client/state.rs | 1 + src/service/admin/mod.rs | 3 +-- 6 files changed, 31 insertions(+), 10 deletions(-) diff --git a/src/admin/room/moderation.rs b/src/admin/room/moderation.rs index ee429fc6..af6cf928 100644 --- a/src/admin/room/moderation.rs +++ b/src/admin/room/moderation.rs @@ -155,7 +155,10 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result { evicting admins too)", ); - if let Err(e) = leave_room(self.services, user_id, &room_id, None).await { + if let Err(e) = leave_room(self.services, user_id, &room_id, None) + .boxed() + .await + { warn!("Failed to leave room: {e}"); } @@ -323,7 +326,10 @@ async fn ban_list_of_rooms(&self) -> Result { evicting admins too)", ); - if let Err(e) = leave_room(self.services, user_id, &room_id, None).await { + if let Err(e) = leave_room(self.services, user_id, &room_id, None) + .boxed() + .await + { warn!("Failed to leave room: {e}"); } diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index e5e481e5..062534c3 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -655,7 +655,9 @@ pub(super) async fn force_leave_room( return Err!("{user_id} is not joined in the room"); } - leave_room(self.services, &user_id, &room_id, None).await?; + leave_room(self.services, &user_id, &room_id, None) + .boxed() + .await?; self.write_str(&format!("{user_id} has left {room_id}.",)) .await diff --git a/src/api/client/account.rs b/src/api/client/account.rs index 32f2530c..05dfa8b7 100644 --- a/src/api/client/account.rs +++ b/src/api/client/account.rs @@ -763,7 +763,9 @@ pub(crate) async fn deactivate_route( super::update_displayname(&services, sender_user, None, &all_joined_rooms).await; super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await; - full_user_deactivate(&services, sender_user, &all_joined_rooms).await?; + full_user_deactivate(&services, sender_user, &all_joined_rooms) + .boxed() + .await?; info!("User {sender_user} deactivated their account."); @@ -915,7 +917,9 @@ pub async fn full_user_deactivate( } } - super::leave_all_rooms(services, user_id).await; + super::leave_all_rooms(services, user_id) + .boxed() + .await; Ok(()) } diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index e587d806..584f955d 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -114,7 +114,9 @@ async fn banned_room_check( .collect() .await; - full_user_deactivate(services, user_id, &all_joined_rooms).await?; + full_user_deactivate(services, user_id, &all_joined_rooms) + .boxed() + .await?; } return Err!(Request(Forbidden("This room is banned on this homeserver."))); @@ -153,7 +155,9 @@ async fn banned_room_check( .collect() .await; - full_user_deactivate(services, user_id, &all_joined_rooms).await?; + full_user_deactivate(services, user_id, &all_joined_rooms) + .boxed() + .await?; } return Err!(Request(Forbidden("This remote server is banned on this homeserver."))); @@ -259,6 +263,7 @@ pub(crate) async fn join_room_by_id_or_alias_route( room_id.server_name(), client, ) + .boxed() .await?; let mut servers = body.via.clone(); @@ -478,6 +483,7 @@ pub(crate) async fn leave_room_route( body: Ruma, ) -> Result { leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone()) + .boxed() .await .map(|()| leave_room::v3::Response::new()) } @@ -1792,7 +1798,10 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) { for room_id in all_rooms { // ignore errors - if let Err(e) = leave_room(services, user_id, &room_id, None).await { + if let Err(e) = leave_room(services, user_id, &room_id, None) + .boxed() + .await + { warn!(%user_id, "Failed to leave {room_id} remotely: {e}"); } diff --git a/src/api/client/state.rs b/src/api/client/state.rs index 2ddc8f14..c915c10c 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -59,6 +59,7 @@ pub(crate) async fn send_state_event_for_empty_key_route( body: Ruma, ) -> Result> { send_state_event_for_key_route(State(services), body) + .boxed() .await .map(RumaResponse) } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 683f5400..6b064424 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -4,7 +4,6 @@ mod execute; mod grant; use std::{ - future::Future, pin::Pin, sync::{Arc, RwLock as StdRwLock, Weak}, }; @@ -14,7 +13,7 @@ use conduwuit::{ Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, }; pub use create::create_admin_room; -use futures::{FutureExt, TryFutureExt}; +use futures::{Future, FutureExt, TryFutureExt}; use loole::{Receiver, Sender}; use ruma::{ OwnedEventId, OwnedRoomId, RoomId, UserId, From 463e367d7bfcad1997d28e78fc4fdb8323dd1170 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Thu, 22 May 2025 16:54:15 -0700 Subject: [PATCH 10/28] add futures::FutureExt to make cb15ac3c0169cc57be584698ec03fbbe1ee9ffce work --- src/admin/room/moderation.rs | 1 + src/admin/user/commands.rs | 1 + src/api/client/state.rs | 1 + 3 files changed, 3 insertions(+) diff --git a/src/admin/room/moderation.rs b/src/admin/room/moderation.rs index af6cf928..4c19da5c 100644 --- a/src/admin/room/moderation.rs +++ b/src/admin/room/moderation.rs @@ -6,6 +6,7 @@ use conduwuit::{ warn, }; use futures::StreamExt; +use futures::FutureExt; use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId}; use crate::{admin_command, admin_command_dispatch, get_room_info}; diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index 062534c3..acd21bb3 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -9,6 +9,7 @@ use conduwuit::{ }; use conduwuit_api::client::{leave_all_rooms, update_avatar_url, update_displayname}; use futures::StreamExt; +use futures::FutureExt; use ruma::{ OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId, events::{ diff --git a/src/api/client/state.rs b/src/api/client/state.rs index c915c10c..96b7e7aa 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -6,6 +6,7 @@ use conduwuit::{ }; use conduwuit_service::Services; use futures::TryStreamExt; +use futures::FutureExt; use ruma::{ OwnedEventId, RoomId, UserId, api::client::state::{get_state_events, get_state_events_for_key, send_state_event}, From 911345fd2cda6d6c28e1957782a1269b1dc36326 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 24 May 2025 07:27:57 -0700 Subject: [PATCH 11/28] upgrade some settings to enable 5g in continuwuity --- src/core/config/mod.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index d4a10345..94115340 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2059,41 +2059,41 @@ fn default_database_backups_to_keep() -> i16 { 1 } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } -fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) } +fn default_db_cache_capacity_mb() -> f64 { 256.0 + parallelism_scaled_f64(256.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(25_000).saturating_add(200_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(25_000).saturating_add(200_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(200_000) } fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) + parallelism_scaled_u32(25_000).saturating_add(200_000) } fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) + parallelism_scaled_u32(25_000).saturating_add(200_000) } fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(25_000).saturating_add(200_000) } fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(25_000).saturating_add(200_000) } fn default_servernameevent_data_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(200_000).saturating_add(500_000) } -fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) } +fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(2000) } fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } From e9d103e3de2858fefb6889f1255bb783ca5f6d3f Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Mon, 26 May 2025 01:22:19 +0100 Subject: [PATCH 12/28] Always calculate state diff IDs in syncv3 seemingly fixes #779 --- src/api/client/sync/v3.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index da82a61c..7eb8c7e1 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -1009,8 +1009,6 @@ async fn calculate_state_incremental<'a>( ) -> Result { let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash); - let state_changed = since_shortstatehash != current_shortstatehash; - let encrypted_room = services .rooms .state_accessor @@ -1042,7 +1040,7 @@ async fn calculate_state_incremental<'a>( }) .into(); - let state_diff_ids: OptionFuture<_> = (!full_state && state_changed) + let state_diff_ids: OptionFuture<_> = (!full_state) .then(|| { StreamExt::into_future( services From f7bdfdb5e5bf749691802e3a3b73dd36c9c4b83a Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 27 May 2025 18:56:37 -0700 Subject: [PATCH 13/28] enable converged 6g at the edge in continuwuity --- src/core/config/mod.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 94115340..9ebfe74c 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2059,45 +2059,45 @@ fn default_database_backups_to_keep() -> i16 { 1 } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } -fn default_db_cache_capacity_mb() -> f64 { 256.0 + parallelism_scaled_f64(256.0) } +fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(25_000).saturating_add(200_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(200_000) + parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(200_000) + parallelism_scaled_u32(100_000).saturating_add(500_000) } fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(200_000) + parallelism_scaled_u32(100_000).saturating_add(500_000) } fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(200_000) + parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(200_000) + parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(200_000) + parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_servernameevent_data_cache_capacity() -> u32 { parallelism_scaled_u32(200_000).saturating_add(500_000) } -fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(2000) } +fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(5000) } fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } -fn default_dns_cache_entries() -> u32 { 32768 } +fn default_dns_cache_entries() -> u32 { 327680 } fn default_dns_min_ttl() -> u64 { 60 * 180 } From b44211c03ed9cb485bd04977e883cc272ee24985 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 10 Jun 2025 22:33:31 +0100 Subject: [PATCH 14/28] Kick up a fuss when m.room.create is unfindable --- src/core/matrix/state_res/event_auth.rs | 4 ++-- src/core/matrix/state_res/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 11cdff40..7f35a7d7 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -30,7 +30,7 @@ use super::{ }, room_version::RoomVersion, }; -use crate::{debug, error, trace, warn}; +use crate::{debug, err_log, error, trace, warn}; // FIXME: field extracting could be bundled for `content` #[derive(Deserialize)] @@ -251,7 +251,7 @@ where let room_create_event = match room_create_event { | None => { - warn!("no m.room.create event in auth chain"); + error!("no m.room.create event in auth chain for {}!", incoming_event.event_id()); return Ok(false); }, | Some(e) => e, diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index 651f6130..38c7c259 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -746,7 +746,7 @@ where } } } - // Did not find a power level event so we default to zero + warn!("could not find a power event in the mainline map, defaulting to zero depth"); Ok(0) } From 3ebac172918d61607003bc9806352d0c727427a4 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 10 Jun 2025 23:00:09 +0100 Subject: [PATCH 15/28] Fix room ID check --- src/core/matrix/state_res/event_auth.rs | 11 +++++++---- src/service/rooms/event_handler/handle_outlier_pdu.rs | 5 +---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 7f35a7d7..96bacfde 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -30,7 +30,7 @@ use super::{ }, room_version::RoomVersion, }; -use crate::{debug, err_log, error, trace, warn}; +use crate::{debug, error, trace, warn}; // FIXME: field extracting could be bundled for `content` #[derive(Deserialize)] @@ -251,7 +251,7 @@ where let room_create_event = match room_create_event { | None => { - error!("no m.room.create event in auth chain for {}!", incoming_event.event_id()); + error!("no m.room.create event found for {}!", incoming_event.event_id()); return Ok(false); }, | Some(e) => e, @@ -262,8 +262,11 @@ where return Ok(false); }; - if room_id_server_name != sender.server_name() { - warn!("servername of room ID does not match servername of m.room.create sender"); + if room_id_server_name != room_create_event.sender().server_name() { + warn!( + "servername of room ID origin ({}) does not match servername of m.room.create sender ({})", + room_id_server_name, + room_create_event.sender().server_name()); return Ok(false); } diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 5339249d..f9889d58 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -119,10 +119,7 @@ pub(super) async fn handle_outlier_pdu<'a>( } // The original create event must be in the auth events - if !matches!( - auth_events.get(&(StateEventType::RoomCreate, String::new().into())), - Some(_) | None - ) { + if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) { return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); } From 5ea42418f7b97f8ca7180937114921e30964e2cc Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 11 Jun 2025 00:45:57 +0100 Subject: [PATCH 16/28] Unsafe, untested, and potentially overeager PDU sanity checks --- src/core/matrix/state_res/event_auth.rs | 8 +++++--- src/service/rooms/timeline/mod.rs | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 96bacfde..777edbb9 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -219,7 +219,7 @@ where /* // TODO: In the past this code was commented as it caused problems with Synapse. This is no // longer the case. This needs to be implemented. - // See also: https://github.com/ruma/ruma/pull/2064 + // See also: https://github.com/ruma/ruma/pull/2064 // // 2. Reject if auth_events // a. auth_events cannot have duplicate keys since it's a BTree @@ -264,9 +264,11 @@ where if room_id_server_name != room_create_event.sender().server_name() { warn!( - "servername of room ID origin ({}) does not match servername of m.room.create sender ({})", + "servername of room ID origin ({}) does not match servername of m.room.create \ + sender ({})", room_id_server_name, - room_create_event.sender().server_name()); + room_create_event.sender().server_name() + ); return Ok(false); } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 4b2f3cb2..43643c5a 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -698,6 +698,20 @@ impl Service { .await .saturating_add(uint!(1)); + if state_key.is_none() { + if prev_events.is_empty() { + warn!("Timeline event had zero prev_events, something broke."); + return Err!(Request(Unknown("Timeline event had zero prev_events."))); + } + if depth.le(&uint!(2)) { + warn!( + "Had unsafe depth of {depth} in {room_id} when creating non-state event. \ + Bad!" + ); + return Err!(Request(Unknown("Unsafe depth for non-state event."))); + } + }; + let mut unsigned = unsigned.unwrap_or_default(); if let Some(state_key) = &state_key { From cfff12190e592ffa2d2dfde65d3acfec67c99c97 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 11 Jun 2025 01:27:25 +0100 Subject: [PATCH 17/28] more logs --- src/core/matrix/state_res/event_auth.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 777edbb9..1eed098b 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -13,6 +13,7 @@ use ruma::{ power_levels::RoomPowerLevelsEventContent, third_party_invite::RoomThirdPartyInviteEventContent, }, + EventId, int, serde::{Base64, Raw}, }; @@ -21,7 +22,6 @@ use serde::{ de::{Error as _, IgnoredAny}, }; use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue}; - use super::{ Error, Event, Result, StateEventType, StateKey, TimelineEventType, power_levels::{ @@ -251,7 +251,14 @@ where let room_create_event = match room_create_event { | None => { - error!("no m.room.create event found for {}!", incoming_event.event_id()); + error!( + create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + "no m.room.create event found for {} ({})!", + incoming_event.event_id().as_str(), + incoming_event.room_id().as_str() + ); return Ok(false); }, | Some(e) => e, From e6aae8a99497e9cf253b07e82196a71889396dbc Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 11 Jun 2025 01:42:19 +0100 Subject: [PATCH 18/28] log which room struggled to get mainline depth --- src/core/matrix/state_res/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index 38c7c259..a2052b92 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -726,8 +726,12 @@ where Fut: Future> + Send, E: Event + Send + Sync, { + let mut room_id = None; while let Some(sort_ev) = event { debug!(event_id = sort_ev.event_id().as_str(), "mainline"); + if room_id.is_none() { + room_id = Some(sort_ev.room_id().to_owned()); + } let id = sort_ev.event_id(); if let Some(depth) = mainline_map.get(id) { @@ -746,7 +750,7 @@ where } } } - warn!("could not find a power event in the mainline map, defaulting to zero depth"); + warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth"); Ok(0) } From afbb1b52ede0d0227991afcf4afd59a7c614fbd8 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 10 Jun 2025 18:49:22 -0700 Subject: [PATCH 19/28] better stateinfo_cache_capacity default --- src/core/config/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 9ebfe74c..00623f44 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2093,7 +2093,8 @@ fn default_servernameevent_data_cache_capacity() -> u32 { parallelism_scaled_u32(200_000).saturating_add(500_000) } -fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(5000) } +fn default_stateinfo_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } From 2a66f8104066fa2f79bccd4734f86858666578a1 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 10 Jun 2025 18:57:22 -0700 Subject: [PATCH 20/28] better roomid_spacehierarchy_cache_capacity --- src/core/config/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 00623f44..a58bcfe5 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2096,7 +2096,8 @@ fn default_servernameevent_data_cache_capacity() -> u32 { fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(500).clamp(100, 12000) } -fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } +fn default_roomid_spacehierarchy_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } fn default_dns_cache_entries() -> u32 { 327680 } From 6e438c8448d12fc6b2aec2bc080398d914cc22b6 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 11 Jun 2025 19:53:46 +0100 Subject: [PATCH 21/28] When in doubt, log all the things --- src/api/client/membership.rs | 1 + src/core/matrix/state_res/mod.rs | 2 +- .../rooms/event_handler/handle_outlier_pdu.rs | 7 +++--- .../event_handler/upgrade_outlier_pdu.rs | 22 ++++++++----------- src/service/rooms/timeline/mod.rs | 1 + 5 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index 2847d668..d9306404 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -1243,6 +1243,7 @@ async fn join_room_by_id_helper_remote( services.rooms.timeline.get_pdu(event_id).await.ok() }; + debug!("running stateres check on send_join parsed PDU"); let auth_check = state_res::event_auth::auth_check( &state_res::RoomVersion::new(&room_version_id)?, &parsed_join_pdu, diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index a2052b92..516115d9 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -609,7 +609,7 @@ where let fetch_state = |ty: &StateEventType, key: &str| { future::ready(auth_state.get(&ty.with_state_key(key))) }; - + debug!("running auth check on {:?}", event.event_id()); let auth_result = auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await; diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index f9889d58..87b76222 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -76,7 +76,7 @@ pub(super) async fn handle_outlier_pdu<'a>( // 5. Reject "due to auth events" if can't get all the auth events or some of // the auth events are also rejected "due to auth events" // NOTE: Step 5 is not applied anymore because it failed too often - debug!("Fetching auth events"); + debug!("Fetching auth events for {}", incoming_pdu.event_id); Box::pin(self.fetch_and_handle_outliers( origin, &incoming_pdu.auth_events, @@ -88,12 +88,12 @@ pub(super) async fn handle_outlier_pdu<'a>( // 6. Reject "due to auth events" if the event doesn't pass auth based on the // auth events - debug!("Checking based on auth events"); + debug!("Checking {} based on auth events", incoming_pdu.event_id); // Build map of auth events let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len()); for id in &incoming_pdu.auth_events { let Ok(auth_event) = self.services.timeline.get_pdu(id).await else { - warn!("Could not find auth event {id}"); + warn!("Could not find auth event {id} for {}", incoming_pdu.event_id); continue; }; @@ -128,6 +128,7 @@ pub(super) async fn handle_outlier_pdu<'a>( ready(auth_events.get(&key)) }; + debug!("running auth check to handle outlier pdu {:?}", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &to_room_version(&room_version_id), &incoming_pdu, diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 97d3df97..0dca2d70 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,12 +1,6 @@ use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; -use conduwuit::{ - Err, Result, debug, debug_info, err, implement, - matrix::{EventTypeExt, PduEvent, StateKey, state_res}, - trace, - utils::stream::{BroadbandExt, ReadyExt}, - warn, -}; +use conduwuit::{Err, Result, debug, debug_info, err, implement, matrix::{EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, warn, info}; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; @@ -44,7 +38,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( return Err!(Request(InvalidParam("Event has been soft failed"))); } - debug!("Upgrading to timeline pdu"); + debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id); let timer = Instant::now(); let room_version_id = get_room_version_id(create_event)?; @@ -52,7 +46,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // backwards extremities doing all the checks in this list starting at 1. // These are not timeline events. - debug!("Resolving state at event"); + debug!("Resolving state at event {}", incoming_pdu.event_id); let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 { self.state_at_incoming_degree_one(&incoming_pdu).await? } else { @@ -70,7 +64,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( state_at_incoming_event.expect("we always set this to some above"); let room_version = to_room_version(&room_version_id); - debug!("Performing auth check"); + debug!("Performing auth check to upgrade {}", incoming_pdu.event_id); // 11. Check the auth of the event passes based on the state of the event let state_fetch_state = &state_at_incoming_event; let state_fetch = |k: StateEventType, s: StateKey| async move { @@ -80,6 +74,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( self.services.timeline.get_pdu(event_id).await.ok() }; + debug!("running auth check on {}", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -93,7 +88,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( return Err!(Request(Forbidden("Event has failed auth check with state at the event."))); } - debug!("Gathering auth events"); + debug!("Gathering auth events for {}", incoming_pdu.event_id); let auth_events = self .services .state @@ -111,6 +106,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( ready(auth_events.get(&key).cloned()) }; + debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -121,7 +117,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; // Soft fail check before doing state res - debug!("Performing soft-fail check"); + debug!("Performing soft-fail check on {}", incoming_pdu.event_id); let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { | (false, _) => true, | (true, None) => false, @@ -218,7 +214,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // 14. Check if the event passes auth based on the "current state" of the room, // if not soft fail it if soft_fail { - debug!("Soft failing event"); + info!("Soft failing event {}", incoming_pdu.event_id); let extremities = extremities.iter().map(Borrow::borrow); self.services diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 43643c5a..d3abd722 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -771,6 +771,7 @@ impl Service { ready(auth_events.get(&key)) }; + debug!("running auth check on new {} event by {} in {}", pdu.kind, pdu.sender, pdu.room_id); let auth_check = state_res::auth_check( &room_version, &pdu, From 51185daca26e0e9202496664a54090a01db2bc48 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Thu, 12 Jun 2025 01:05:45 +0100 Subject: [PATCH 22/28] modify more log strings so they're more useful than not --- src/core/matrix/state_res/mod.rs | 2 +- src/service/rooms/timeline/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index 516115d9..ea49b29e 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -728,7 +728,7 @@ where { let mut room_id = None; while let Some(sort_ev) = event { - debug!(event_id = sort_ev.event_id().as_str(), "mainline"); + trace!(event_id = sort_ev.event_id().as_str(), "mainline"); if room_id.is_none() { room_id = Some(sort_ev.room_id().to_owned()); } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index d3abd722..6479d3b4 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -1157,7 +1157,7 @@ impl Service { .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); + info!("Asking {backfill_server} for backfill in {:?}", room_id.to_owned()); let response = self .services .sending @@ -1185,7 +1185,7 @@ impl Service { } } - info!("No servers could backfill, but backfill was needed in room {room_id}"); + warn!("No servers could backfill, but backfill was needed in room {room_id}"); Ok(()) } From 3fe99fbfe71e04055a5e5cfa6b235359f68de719 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 13 Jun 2025 20:45:35 -0700 Subject: [PATCH 23/28] make sender workers default better and clamp value to core count --- src/core/config/mod.rs | 4 +++- src/service/sending/mod.rs | 12 +++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index a58bcfe5..bfe0c450 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1828,7 +1828,7 @@ pub struct Config { /// setting a non-zero value. /// /// default: 0 - #[serde(default)] + #[serde(default = "default_sender_workers")] pub sender_workers: usize, /// Enables listener sockets; can be set to false to disable listening. This @@ -2315,6 +2315,8 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } +fn default_sender_workers() -> usize { 4 } + fn default_client_receive_timeout() -> u64 { 75 } fn default_client_request_timeout() -> u64 { 180 } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 08ca7010..ce687551 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -401,16 +401,10 @@ impl Service { fn num_senders(args: &crate::Args<'_>) -> usize { const MIN_SENDERS: usize = 1; - // Limit the number of senders to the number of workers threads or number of - // cores, conservatively. - let max_senders = args - .server - .metrics - .num_workers() - .min(available_parallelism()); + // Limit the maximum number of senders to the number of cores. + let max_senders = available_parallelism(); - // If the user doesn't override the default 0, this is intended to then default - // to 1 for now as multiple senders is experimental. + // default is 4 senders. clamp between 1 and core count. args.server .config .sender_workers From b66154b05d925ddbc170a6d288f731461e36404a Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 13 Jun 2025 21:36:05 -0700 Subject: [PATCH 24/28] update sender workers documentation --- src/core/config/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index bfe0c450..afb4167d 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1823,11 +1823,9 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '0' which means the value is determined internally, likely matching the - /// number of tokio worker-threads or number of cores, etc. Override by - /// setting a non-zero value. + /// '4'. Override by setting a different value. Values clamped 1 to core count. /// - /// default: 0 + /// default: 4 #[serde(default = "default_sender_workers")] pub sender_workers: usize, From c013d06e7bc63e1833c9abaf1259520212433ac9 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 13 Jun 2025 22:25:18 -0700 Subject: [PATCH 25/28] add more parallelism_scaled and make them public --- src/core/config/mod.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index afb4167d..4ff87a74 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2286,14 +2286,13 @@ fn default_admin_log_capture() -> String { fn default_admin_room_tag() -> String { "m.server_notice".to_owned() } #[allow(clippy::as_conversions, clippy::cast_precision_loss)] -fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } +pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } -fn parallelism_scaled_u32(val: u32) -> u32 { - let val = val.try_into().expect("failed to cast u32 to usize"); - parallelism_scaled(val).try_into().unwrap_or(u32::MAX) -} +pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) } -fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } +pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) } + +pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } fn default_trusted_server_batch_size() -> usize { 256 } From 88d68d148f081466c03562b05d59abb45827bc1c Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 13 Jun 2025 22:26:07 -0700 Subject: [PATCH 26/28] change rocksdb stats level to 3 --- src/core/config/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 4ff87a74..5afb3f1f 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1155,7 +1155,7 @@ pub struct Config { /// 3 to 5 = Statistics with possible performance impact. /// 6 = All statistics. /// - /// default: 1 + /// default: 3 #[serde(default = "default_rocksdb_stats_level")] pub rocksdb_stats_level: u8, @@ -2231,7 +2231,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 } #[allow(clippy::doc_markdown)] fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 } -fn default_rocksdb_stats_level() -> u8 { 1 } +fn default_rocksdb_stats_level() -> u8 { 3 } // I know, it's a great name #[must_use] From 8b5cdb147137b88b57a593824950eb8c046c4ba0 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 13 Jun 2025 22:33:58 -0700 Subject: [PATCH 27/28] scale rocksdb background jobs and subcompactions --- src/database/engine/db_opts.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/database/engine/db_opts.rs b/src/database/engine/db_opts.rs index 18cec742..4a717b4d 100644 --- a/src/database/engine/db_opts.rs +++ b/src/database/engine/db_opts.rs @@ -2,7 +2,7 @@ use std::{cmp, convert::TryFrom}; use conduwuit::{Config, Result, utils}; use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel}; - +use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32}; use super::{cf_opts::cache_size_f64, logger::handle as handle_log}; /// Create database-wide options suitable for opening the database. This also @@ -23,8 +23,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul set_logging_defaults(&mut opts, config); // Processing - opts.set_max_background_jobs(num_threads::(config)?); - opts.set_max_subcompactions(num_threads::(config)?); + opts.set_max_background_jobs(parallelism_scaled_i32(1)); + opts.set_max_subcompactions(parallelism_scaled_u32(1)); opts.set_avoid_unnecessary_blocking_io(true); opts.set_max_file_opening_threads(0); @@ -128,7 +128,7 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) { } fn num_threads>(config: &Config) -> Result { - const MIN_PARALLELISM: usize = 2; + const MIN_PARALLELISM: usize = 4; let requested = if config.rocksdb_parallelism_threads != 0 { config.rocksdb_parallelism_threads From f5655ea0d5646205c0b2961c2b367a7bba9dc7f0 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 13 Jun 2025 22:35:15 -0700 Subject: [PATCH 28/28] change rocksdb default error level to info from error --- src/core/config/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 5afb3f1f..14e8fa29 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2199,7 +2199,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 } fn default_rocksdb_recovery_mode() -> u8 { 1 } -fn default_rocksdb_log_level() -> String { "error".to_owned() } +fn default_rocksdb_log_level() -> String { "info".to_owned() } fn default_rocksdb_log_time_to_roll() -> usize { 0 }