From 04130dcdd8d8ede10d900b76f26edb2a2f89fc3a Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Fri, 15 Aug 2025 04:10:40 +0100 Subject: [PATCH 01/37] fix(fed): Improve transaction flushing --- src/service/federation/execute.rs | 6 +++--- src/service/sending/sender.rs | 30 ++++++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 1d1d1154..8be1befa 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, mem}; use bytes::Bytes; use conduwuit::{ Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, - error::inspect_debug_log, implement, trace, utils::string::EMPTY, + error::inspect_debug_log, implement, trace, utils::string::EMPTY, warn, }; use http::{HeaderValue, header::AUTHORIZATION}; use ipaddress::IPAddress; @@ -193,7 +193,7 @@ fn handle_error( ) -> Result { if e.is_timeout() || e.is_connect() { e = e.without_url(); - debug_warn!("{e:?}"); + debug_warn!(?url, "network error while sending request: {e:?}"); } else if e.is_redirect() { debug_error!( method = ?method, @@ -204,7 +204,7 @@ fn handle_error( e, ); } else { - debug_error!("{e:?}"); + warn!(?url, "failed to send federation request: {e:?}"); } Err(e.into()) diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index a708f746..7af4b533 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -10,7 +10,7 @@ use std::{ use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use conduwuit_core::{ - Error, Event, Result, debug, err, error, + Error, Event, Result, debug, err, error, info, result::LogErr, trace, utils::{ @@ -142,7 +142,7 @@ impl Service { } fn handle_response_err(dest: Destination, statuses: &mut CurTransactionStatus, e: &Error) { - debug!(dest = ?dest, "{e:?}"); + debug!(dest = ?dest, "error response: {e:?}"); statuses.entry(dest).and_modify(|e| { *e = match e { | TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), @@ -177,7 +177,21 @@ impl Service { if !new_events.is_empty() { self.db.mark_as_active(new_events.iter()); - let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect(); + let new_events_vec: Vec = + new_events.into_iter().map(|(_, event)| event).collect(); + + if let Some(status) = statuses.get(&dest.clone()) { + if matches!(status, TransactionStatus::Running) { + // If the server is in backoff, clear it + warn!( + ?dest, + "Catching up destination with {} new events", + new_events_vec.len() + ); + statuses.insert(dest.clone(), TransactionStatus::Running); + } + } + futures.push(self.send_events(dest.clone(), new_events_vec)); } else { statuses.remove(dest); @@ -859,12 +873,20 @@ impl Service { pdus, edus, }; + let pdu_count = request.pdus.len(); + let edu_count = request.edus.len(); let result = self .services .federation .execute_on(&self.services.client.sender, &server, request) - .await; + .await + .inspect(|_| { + info!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); + }) + .inspect_err(|e| { + error!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); + }); for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) { if let Err(e) = result { From 255aa44ecccbe46ddc178029a0a01be26f1547c9 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Fri, 15 Aug 2025 04:20:03 +0100 Subject: [PATCH 02/37] fix(fed): Alter log levels to be less 
noisy --- src/service/sending/sender.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 7af4b533..5498afc0 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -183,9 +183,9 @@ impl Service { if let Some(status) = statuses.get(&dest.clone()) { if matches!(status, TransactionStatus::Running) { // If the server is in backoff, clear it - warn!( + info!( ?dest, - "Catching up destination with {} new events", + "Catching up previously failed destination with {}+ new events", new_events_vec.len() ); statuses.insert(dest.clone(), TransactionStatus::Running); @@ -885,7 +885,7 @@ impl Service { info!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); }) .inspect_err(|e| { - error!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); + info!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); }); for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) { From 4085a90c1f5ddda14411a22457bee6f003a296a1 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 20 Aug 2025 00:56:55 +0100 Subject: [PATCH 03/37] feat(fed): Something about nicer fed errors --- src/service/federation/execute.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 8be1befa..ace29c1c 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -1,4 +1,8 @@ -use std::{fmt::Debug, mem}; +use std::{ + error::Error as _, + fmt::{Debug, Write}, + mem, +}; use bytes::Bytes; use conduwuit::{ @@ -193,9 +197,9 @@ fn handle_error( ) -> Result { if e.is_timeout() || e.is_connect() { e = e.without_url(); - debug_warn!(?url, "network error while sending request: {e:?}"); + warn!(?url, "network error while sending federation request: {e:?}"); } else if e.is_redirect() { - debug_error!( + warn!( method = ?method, url = ?url, final_url = ?e.url(), @@ -207,6 +211,14 @@ fn handle_error( warn!(?url, "failed to send federation request: {e:?}"); } + let mut nice_error = "Request failed".to_owned(); + let mut src = e.source(); + while let Some(source) = src { + write!(nice_error, ": {source:?}").expect("writing to string should not fail"); + src = source.source(); + } + warn!(nice_error, "Federation request error"); + Err(e.into()) } From e42e309797521194e80d870e4b97386bd503142a Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Mon, 25 Aug 2025 21:26:28 +0100 Subject: [PATCH 04/37] feat(fed): Handle EDUs before PDUs Aranje needs his crypto keys --- src/api/server/send.rs | 46 +++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 9c5bfd2b..86832bc6 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -5,7 +5,7 @@ use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, - debug_warn, err, error, + debug_warn, err, error, info, result::LogErr, trace, utils::{ @@ -79,13 +79,11 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); - trace!( + info!( pdus = body.pdus.len(), edus = body.edus.len(), - elapsed = ?txn_start_time.elapsed(), id = ?body.transaction_id, - origin =?body.origin(), - "Starting txn", + "Processing transaction", ); let pdus = body @@ -104,14 +102,21 @@ pub(crate) 
async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); - let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; - - debug!( + info!( + pdus = body.pdus.len(), + edus = body.edus.len(), + elapsed = ?txn_start_time.elapsed(), + id = ?body.transaction_id, + "Validated transaction", + ); + + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; + + info!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), id = ?body.transaction_id, - origin =?body.origin(), "Finished txn", ); for (id, result) in &results { @@ -154,7 +159,8 @@ async fn handle( .into_iter() .try_stream() .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { - handle_room(services, client, origin, started, room_id, pdus.into_iter()) + let count = pdus.len(); + handle_room(services, client, origin, started, room_id, pdus.into_iter(), count) .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) }) @@ -178,6 +184,7 @@ async fn handle_room( txn_start_time: Instant, room_id: OwnedRoomId, pdus: impl Iterator + Send, + count: usize, ) -> Result> { let _room_lock = services .rooms @@ -187,10 +194,20 @@ async fn handle_room( .await; let room_id = &room_id; + let mut n = 0; pdus.try_stream() .and_then(|(_, event_id, value)| async move { services.server.check_running()?; let pdu_start_time = Instant::now(); + info!( + %room_id, + %event_id, + pdu = n + 1, + total = count, + pdu_elapsed = ?pdu_start_time.elapsed(), + txn_elapsed = ?txn_start_time.elapsed(), + "Handling PDU", + ); let result = services .rooms .event_handler @@ -198,11 +215,16 @@ async fn handle_room( .await .map(|_| ()); - debug!( + info!( + %room_id, + %event_id, + pdu = n + 1, + total = count, pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), - "Finished PDU {event_id}", + "Finished handling PDU {event_id}", ); + n += 1; Ok((event_id, result)) }) From 7d9ec7a0e5dff80a53e0dd9eb0b83d443968bd2d Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Mon, 25 Aug 2025 21:26:56 +0100 Subject: [PATCH 05/37] feat(fed): Handle EDUs before PDUs Aranje needs his crypto keys --- src/api/server/send.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 86832bc6..e7e4e9fd 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -143,6 +143,10 @@ async fn handle( pdus: impl Stream + Send, edus: impl Stream + Send, ) -> Result { + edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) + .boxed() + .await; + // group pdus by room let pdus = pdus .collect() @@ -169,11 +173,6 @@ async fn handle( .boxed() .await?; - // evaluate edus after pdus, at least for now. 
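
The error-flattening added in PATCH 03 above walks the std::error::Error source() chain by hand so the whole cause chain lands on one log line. A minimal standalone sketch of the same technique (flatten_error is a hypothetical name; the patch accumulates into a local called nice_error):

    use std::{error::Error, fmt::Write};

    fn flatten_error(e: &(dyn Error + 'static)) -> String {
        let mut nice_error = String::from("Request failed");
        let mut src = e.source();
        while let Some(source) = src {
            // Append each underlying cause; writing into a String cannot fail.
            write!(nice_error, ": {source:?}").expect("writing to a String cannot fail");
            src = source.source();
        }
        nice_error
    }

Each layer (e.g. a reqwest error wrapping a hyper error wrapping an io::Error) contributes one ": ..." segment, which is what makes the resulting federation log line readable.
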
- edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) - .boxed() - .await; - Ok(results) } From 45301d4e419302f5b559ef7dddf9dd2d6f146f7b Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Wed, 14 May 2025 06:53:00 -0700 Subject: [PATCH 06/37] bump the number of allowed immutable memtables by 1, to allow for greater flood protection this should probably not be applied if you have rocksdb_atomic_flush = false (the default) --- src/database/engine/cf_opts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs index 58358f02..b8d0b307 100644 --- a/src/database/engine/cf_opts.rs +++ b/src/database/engine/cf_opts.rs @@ -29,7 +29,7 @@ fn descriptor_cf_options( set_table_options(&mut opts, &desc, cache)?; opts.set_min_write_buffer_number(1); - opts.set_max_write_buffer_number(2); + opts.set_max_write_buffer_number(3); opts.set_write_buffer_size(desc.write_size); opts.set_target_file_size_base(desc.file_size); From a57df9af37fedce9eab04dc6bf7459fc3a9bee48 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 21 Jun 2025 08:02:05 -0700 Subject: [PATCH 07/37] upgrade some settings to enable 5g in continuwuity enable converged 6g at the edge in continuwuity better stateinfo_cache_capacity default better roomid_spacehierarchy_cache_capacity make sender workers default better and clamp value to core count update sender workers documentation add more parallelism_scaled and make them public update 1 document --- conduwuit-example.toml | 6 +-- src/core/config/mod.rs | 75 +++++++++++++++++++------------------- src/service/sending/mod.rs | 12 ++---- 3 files changed, 43 insertions(+), 50 deletions(-) diff --git a/conduwuit-example.toml b/conduwuit-example.toml index 541050b1..3bd08ed2 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -1680,11 +1680,9 @@ #stream_amplification = 1024 # Number of sender task workers; determines sender parallelism. Default is -# '0' which means the value is determined internally, likely matching the -# number of tokio worker-threads or number of cores, etc. Override by -# setting a non-zero value. +# number of CPU cores. Override by setting a different value. # -#sender_workers = 0 +#sender_workers = 4 # Enables listener sockets; can be set to false to disable listening. This # option is intended for developer/diagnostic purposes only. diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index aa021be7..336e6e48 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1917,12 +1917,10 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '0' which means the value is determined internally, likely matching the - /// number of tokio worker-threads or number of cores, etc. Override by - /// setting a non-zero value. + /// '4'. Override by setting a different value. Values clamped 1 to core count. /// - /// default: 0 - #[serde(default)] + /// default: 4 + #[serde(default = "default_sender_workers")] pub sender_workers: usize, /// Enables listener sockets; can be set to false to disable listening. 
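
PATCH 05 above moves EDU handling ahead of PDU handling so that, for example, cross-signing key EDUs are applied before events that depend on them. A minimal sketch of that ordering, assuming hypothetical handle_edu/handle_pdu stand-ins and the futures crate (the patch uses automatic_width() as the concurrency limit):

    use futures::{executor::block_on, stream, StreamExt};

    async fn handle_edu(edu: u32) { println!("edu {edu}"); }
    async fn handle_pdu(pdu: u32) { println!("pdu {pdu}"); }

    async fn handle_txn(edus: Vec<u32>, pdus: Vec<u32>) {
        // EDUs first, concurrently, so key/device updates are visible
        // before any PDU that might need them is evaluated.
        stream::iter(edus).for_each_concurrent(4, handle_edu).await;
        // Then PDUs, preserving transaction order.
        for pdu in pdus {
            handle_pdu(pdu).await;
        }
    }

    fn main() { block_on(handle_txn(vec![1, 2], vec![10, 11])); }
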
This @@ -2153,45 +2151,47 @@ fn default_database_backups_to_keep() -> i16 { 1 } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } -fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) } +fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(100_000) -} - -fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) -} - -fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) -} - -fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_servernameevent_data_cache_capacity() -> u32 { parallelism_scaled_u32(100_000).saturating_add(500_000) } -fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) } +fn default_eventidshort_cache_capacity() -> u32 { + parallelism_scaled_u32(100_000).saturating_add(500_000) +} -fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } +fn default_eventid_pdu_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} -fn default_dns_cache_entries() -> u32 { 32768 } +fn default_shortstatekey_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_statekeyshort_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_servernameevent_data_cache_capacity() -> u32 { + parallelism_scaled_u32(200_000).saturating_add(500_000) +} + +fn default_stateinfo_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } + +fn default_roomid_spacehierarchy_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } + +fn default_dns_cache_entries() -> u32 { 327680 } fn default_dns_min_ttl() -> u64 { 60 * 180 } @@ -2384,14 +2384,13 @@ fn default_admin_log_capture() -> String { fn default_admin_room_tag() -> String { "m.server_notice".to_owned() } #[allow(clippy::as_conversions, clippy::cast_precision_loss)] -fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } +pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } -fn parallelism_scaled_u32(val: u32) -> u32 { - let val = val.try_into().expect("failed to cast u32 to usize"); - parallelism_scaled(val).try_into().unwrap_or(u32::MAX) -} +pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) } -fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } +pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) } + +pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } fn default_trusted_server_batch_size() -> usize { 
256 } @@ -2411,6 +2410,8 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } +fn default_sender_workers() -> usize { 4 } + fn default_client_receive_timeout() -> u64 { 75 } fn default_client_request_timeout() -> u64 { 180 } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 08ca7010..ce687551 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -401,16 +401,10 @@ impl Service { fn num_senders(args: &crate::Args<'_>) -> usize { const MIN_SENDERS: usize = 1; - // Limit the number of senders to the number of workers threads or number of - // cores, conservatively. - let max_senders = args - .server - .metrics - .num_workers() - .min(available_parallelism()); + // Limit the maximum number of senders to the number of cores. + let max_senders = available_parallelism(); - // If the user doesn't override the default 0, this is intended to then default - // to 1 for now as multiple senders is experimental. + // default is 4 senders. clamp between 1 and core count. args.server .config .sender_workers From e8bba3ba37a493c196041f4fefea958ef737afaa Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 7 Jun 2025 00:46:55 +0100 Subject: [PATCH 08/37] fix an auth rule not applying correctly --- src/core/matrix/state_res/event_auth.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 77a4a95c..ca266c5c 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -256,6 +256,16 @@ where }, | Some(e) => e, }; + // just re-check 1.2 to work around a bug + let Some(room_id_server_name) = incoming_event.room_id().server_name() else { + warn!("room ID has no servername"); + return Ok(false); + }; + + if room_id_server_name != sender.server_name() { + warn!("servername of room ID does not match servername of m.room.create sender"); + return Ok(false); + } if incoming_event.room_id() != room_create_event.room_id() { warn!("room_id of incoming event does not match room_id of m.room.create event"); From 1d9786133200988f2bf627e538ed1865c8a1150a Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 7 Jun 2025 00:55:03 +0100 Subject: [PATCH 09/37] Note about ruma#2064 in TODO --- src/core/matrix/state_res/event_auth.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index ca266c5c..40c32e03 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -149,8 +149,8 @@ where for<'a> &'a E: Event + Send, { debug!( - event_id = %incoming_event.event_id(), - event_type = ?incoming_event.event_type(), + event_id = format!("{}", incoming_event.event_id()), + event_type = format!("{}", incoming_event.event_type()), "auth_check beginning" ); @@ -219,7 +219,7 @@ where /* // TODO: In the past this code was commented as it caused problems with Synapse. This is no // longer the case. This needs to be implemented. - // See also: https://github.com/ruma/ruma/pull/2064 + // See also: https://github.com/ruma/ruma/pull/2064 // // 2. Reject if auth_events // a. 
auth_events cannot have duplicate keys since it's a BTree From e7399409b4229a8e3822c9c4fbc4516ec365cbdd Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 10 Jun 2025 22:33:31 +0100 Subject: [PATCH 10/37] Kick up a fuss when m.room.create is unfindable --- src/core/matrix/state_res/event_auth.rs | 4 ++-- src/core/matrix/state_res/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 40c32e03..31c660ed 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -30,7 +30,7 @@ use super::{ }, room_version::RoomVersion, }; -use crate::{debug, error, trace, warn}; +use crate::{debug, err_log, error, trace, warn}; // FIXME: field extracting could be bundled for `content` #[derive(Deserialize)] @@ -251,7 +251,7 @@ where let room_create_event = match room_create_event { | None => { - warn!("no m.room.create event in auth chain"); + error!("no m.room.create event in auth chain for {}!", incoming_event.event_id()); return Ok(false); }, | Some(e) => e, diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index ce9d9276..e721e14c 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -753,7 +753,7 @@ where } } } - // Did not find a power level event so we default to zero + warn!("could not find a power event in the mainline map, defaulting to zero depth"); Ok(0) } From 43574118aa6c90057e445394d9e7346c04cf26b8 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 10 Jun 2025 23:00:09 +0100 Subject: [PATCH 11/37] Fix room ID check --- src/core/matrix/state_res/event_auth.rs | 11 +++++++---- src/service/rooms/event_handler/handle_outlier_pdu.rs | 5 +---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 31c660ed..de4d20e1 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -30,7 +30,7 @@ use super::{ }, room_version::RoomVersion, }; -use crate::{debug, err_log, error, trace, warn}; +use crate::{debug, error, trace, warn}; // FIXME: field extracting could be bundled for `content` #[derive(Deserialize)] @@ -251,7 +251,7 @@ where let room_create_event = match room_create_event { | None => { - error!("no m.room.create event in auth chain for {}!", incoming_event.event_id()); + error!("no m.room.create event found for {}!", incoming_event.event_id()); return Ok(false); }, | Some(e) => e, @@ -262,8 +262,11 @@ where return Ok(false); }; - if room_id_server_name != sender.server_name() { - warn!("servername of room ID does not match servername of m.room.create sender"); + if room_id_server_name != room_create_event.sender().server_name() { + warn!( + "servername of room ID origin ({}) does not match servername of m.room.create sender ({})", + room_id_server_name, + room_create_event.sender().server_name()); return Ok(false); } diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index d79eed77..fad9ac74 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -122,10 +122,7 @@ where } // The original create event must be in the auth events - if !matches!( - auth_events.get(&(StateEventType::RoomCreate, String::new().into())), - Some(_) | None - ) { + if !auth_events.contains_key(&(StateEventType::RoomCreate, 
String::new().into())) { return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); } From ecdce68ae33a80290c07192ae4379ca453672740 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 11 Jun 2025 01:27:25 +0100 Subject: [PATCH 12/37] more logs --- src/core/matrix/state_res/event_auth.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index de4d20e1..fc1119de 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -13,6 +13,7 @@ use ruma::{ power_levels::RoomPowerLevelsEventContent, third_party_invite::RoomThirdPartyInviteEventContent, }, + EventId, int, serde::{Base64, Raw}, }; @@ -21,7 +22,6 @@ use serde::{ de::{Error as _, IgnoredAny}, }; use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue}; - use super::{ Error, Event, Result, StateEventType, StateKey, TimelineEventType, power_levels::{ @@ -251,7 +251,14 @@ where let room_create_event = match room_create_event { | None => { - error!("no m.room.create event found for {}!", incoming_event.event_id()); + error!( + create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + "no m.room.create event found for {} ({})!", + incoming_event.event_id().as_str(), + incoming_event.room_id().as_str() + ); return Ok(false); }, | Some(e) => e, From 0a74dfe5a5b1e37cf1665a38ee1e7fffd9618628 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 11 Jun 2025 01:42:19 +0100 Subject: [PATCH 13/37] log which room struggled to get mainline depth --- src/core/matrix/state_res/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index e721e14c..ba9c013d 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -733,8 +733,12 @@ where Fut: Future> + Send, E: Event + Send + Sync, { + let mut room_id = None; while let Some(sort_ev) = event { debug!(event_id = sort_ev.event_id().as_str(), "mainline"); + if room_id.is_none() { + room_id = Some(sort_ev.room_id().to_owned()); + } let id = sort_ev.event_id(); if let Some(depth) = mainline_map.get(id) { @@ -753,7 +757,7 @@ where } } } - warn!("could not find a power event in the mainline map, defaulting to zero depth"); + warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth"); Ok(0) } From 54eab4775acf188f3643b204f99f6290070e544b Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 21 Jun 2025 08:02:49 -0700 Subject: [PATCH 14/37] change rocksdb stats level to 3 scale rocksdb background jobs and subcompactions change rocksdb default error level to info from error delete unused num_threads function fix warns from cargo --- conduwuit-example.toml | 2 +- src/core/config/mod.rs | 6 +++--- src/database/engine/db_opts.rs | 22 ++++------------------ 3 files changed, 8 insertions(+), 22 deletions(-) diff --git a/conduwuit-example.toml b/conduwuit-example.toml index 3bd08ed2..5b66044a 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -1066,7 +1066,7 @@ # 3 to 5 = Statistics with possible performance impact. 
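
PATCH 13 above touches the mainline-depth walk in state resolution: starting from an event, follow its power-levels auth ancestor until one appears in the mainline map, otherwise sort at depth zero. A simplified sketch of that loop, with a hypothetical Ev type standing in for the Event trait:

    use std::collections::HashMap;

    struct Ev {
        id: String,
        // id of the m.room.power_levels event cited in auth_events, if any
        power_levels_auth: Option<String>,
    }

    fn mainline_depth<'a>(
        mut event: Option<&'a Ev>,
        events: &'a HashMap<String, Ev>,
        mainline_map: &HashMap<String, usize>,
    ) -> usize {
        while let Some(ev) = event {
            if let Some(depth) = mainline_map.get(&ev.id) {
                return *depth;
            }
            // Step back to the previous power-levels event in the auth chain.
            event = ev.power_levels_auth.as_ref().and_then(|id| events.get(id));
        }
        0 // no power event found on the mainline; the patch now warns here
    }
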
# 6 = All statistics. # -#rocksdb_stats_level = 1 +#rocksdb_stats_level = 3 # This is a password that can be configured that will let you login to the # server bot account (currently `@conduit`) for emergency troubleshooting diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 336e6e48..a0d58274 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1235,7 +1235,7 @@ pub struct Config { /// 3 to 5 = Statistics with possible performance impact. /// 6 = All statistics. /// - /// default: 1 + /// default: 3 #[serde(default = "default_rocksdb_stats_level")] pub rocksdb_stats_level: u8, @@ -2297,7 +2297,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 } fn default_rocksdb_recovery_mode() -> u8 { 1 } -fn default_rocksdb_log_level() -> String { "error".to_owned() } +fn default_rocksdb_log_level() -> String { "info".to_owned() } fn default_rocksdb_log_time_to_roll() -> usize { 0 } @@ -2329,7 +2329,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 } #[allow(clippy::doc_markdown)] fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 } -fn default_rocksdb_stats_level() -> u8 { 1 } +fn default_rocksdb_stats_level() -> u8 { 3 } // I know, it's a great name #[must_use] diff --git a/src/database/engine/db_opts.rs b/src/database/engine/db_opts.rs index 18cec742..1299443d 100644 --- a/src/database/engine/db_opts.rs +++ b/src/database/engine/db_opts.rs @@ -1,8 +1,6 @@ -use std::{cmp, convert::TryFrom}; - -use conduwuit::{Config, Result, utils}; +use conduwuit::{Config, Result}; use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel}; - +use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32}; use super::{cf_opts::cache_size_f64, logger::handle as handle_log}; /// Create database-wide options suitable for opening the database. 
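
Patches 06 and 14 above tune RocksDB for write-flood absorption and parallel compaction. A sketch of the same knobs against the rust-rocksdb Options API (tune() is a hypothetical wrapper; conduwuit applies these in cf_opts.rs and db_opts.rs, and the commit notes the memtable change may not suit rocksdb_atomic_flush = false):

    use std::thread::available_parallelism;

    use rocksdb::Options;

    fn tune(opts: &mut Options) {
        let cores = available_parallelism().map(|n| n.get()).unwrap_or(1);

        // One extra immutable memtable (2 -> 3) buys headroom during
        // write floods before RocksDB starts stalling writers.
        opts.set_min_write_buffer_number(1);
        opts.set_max_write_buffer_number(3);

        // Scale flush/compaction parallelism with the core count instead
        // of the removed num_threads() helper.
        opts.set_max_background_jobs(i32::try_from(cores).unwrap_or(i32::MAX));
        opts.set_max_subcompactions(u32::try_from(cores).unwrap_or(u32::MAX));
    }
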
This also @@ -23,8 +21,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul set_logging_defaults(&mut opts, config); // Processing - opts.set_max_background_jobs(num_threads::(config)?); - opts.set_max_subcompactions(num_threads::(config)?); + opts.set_max_background_jobs(parallelism_scaled_i32(1)); + opts.set_max_subcompactions(parallelism_scaled_u32(1)); opts.set_avoid_unnecessary_blocking_io(true); opts.set_max_file_opening_threads(0); @@ -126,15 +124,3 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) { opts.set_callback_logger(rocksdb_log_level, &handle_log); } } - -fn num_threads>(config: &Config) -> Result { - const MIN_PARALLELISM: usize = 2; - - let requested = if config.rocksdb_parallelism_threads != 0 { - config.rocksdb_parallelism_threads - } else { - utils::available_parallelism() - }; - - utils::math::try_into::(cmp::max(MIN_PARALLELISM, requested)) -} From 008d90b11810500b1d270219023e09e1426b70b3 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Wed, 18 Jun 2025 12:48:27 -0700 Subject: [PATCH 15/37] make fetching key room events less smart --- src/core/matrix/state_res/event_auth.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index fc1119de..ec70d684 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -242,12 +242,16 @@ where } */ - let (room_create_event, power_levels_event, sender_member_event) = join3( - fetch_state(&StateEventType::RoomCreate, ""), - fetch_state(&StateEventType::RoomPowerLevels, ""), - fetch_state(&StateEventType::RoomMember, sender.as_str()), - ) - .await; + // let (room_create_event, power_levels_event, sender_member_event) = join3( + // fetch_state(&StateEventType::RoomCreate, ""), + // fetch_state(&StateEventType::RoomPowerLevels, ""), + // fetch_state(&StateEventType::RoomMember, sender.as_str()), + // ) + // .await; + + let room_create_event = fetch_state(&StateEventType::RoomCreate, "").await; + let power_levels_event = fetch_state(&StateEventType::RoomPowerLevels, "").await; + let sender_member_event = fetch_state(&StateEventType::RoomMember, sender.as_str()).await; let room_create_event = match room_create_event { | None => { From 1e4cf59ab8fdf7f229178c76c4cdb851b3f80deb Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Thu, 3 Jul 2025 14:39:10 -0700 Subject: [PATCH 16/37] lock the getter instead ??? 
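
PATCH 16 here changes get_forward_extremities() to take the room's state-lock guard as a parameter, turning "caller must hold the lock" from a convention into a compile-time obligation. A minimal sketch of the pattern with std types (State and the `()` mutex are stand-ins for conduwuit's state service and RoomMutexGuard):

    use std::sync::{Mutex, MutexGuard};

    struct State {
        extremities: Vec<String>,
    }

    // The unused guard parameter proves the caller holds the lock.
    fn get_forward_extremities<'a>(
        state: &'a State,
        _state_lock: &MutexGuard<'_, ()>,
    ) -> &'a [String] {
        &state.extremities
    }

    fn main() {
        let lock = Mutex::new(());
        let state = State { extremities: vec!["$event_a".into()] };
        let guard = lock.lock().unwrap();
        println!("{:?}", get_forward_extremities(&state, &guard));
    }
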
c/o M --- src/service/rooms/event_handler/upgrade_outlier_pdu.rs | 2 +- src/service/rooms/state/mod.rs | 1 + src/service/rooms/timeline/create.rs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index d2e0623c..c92d7ac7 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -175,7 +175,7 @@ where let extremities: Vec<_> = self .services .state - .get_forward_extremities(room_id) + .get_forward_extremities(room_id, &state_lock) .map(ToOwned::to_owned) .ready_filter(|event_id| { // Remove any that are referenced by this incoming event's prev_events diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 641aa6a9..92881126 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -388,6 +388,7 @@ impl Service { pub fn get_forward_extremities<'a>( &'a self, room_id: &'a RoomId, + _state_lock: &'a RoomMutexGuard, ) -> impl Stream + Send + '_ { let prefix = (room_id, Interfix); diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs index 6732cd8e..188e71a4 100644 --- a/src/service/rooms/timeline/create.rs +++ b/src/service/rooms/timeline/create.rs @@ -42,7 +42,7 @@ pub async fn create_hash_and_sign_event( let prev_events: Vec = self .services .state - .get_forward_extremities(room_id) + .get_forward_extremities(room_id, _mutex_lock) .take(20) .map(Into::into) .collect() From 3e9cf3f4942f317c550f25e0e9a59f50ec1dbe76 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Thu, 3 Jul 2025 14:44:27 -0700 Subject: [PATCH 17/37] vehicle loan documentation now available at window 7 --- src/service/rooms/event_handler/upgrade_outlier_pdu.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index c92d7ac7..717fe711 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -6,6 +6,7 @@ use conduwuit::{ trace, utils::stream::{BroadbandExt, ReadyExt}, warn, + info }; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; @@ -193,6 +194,8 @@ where .collect() .await; + if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events, event id: {}", incoming_pdu.prev_events.len(), incoming_pdu.event_id) } + debug!( "Retained {} extremities checked against {} prev_events", extremities.len(), From 081487a413f24000f8df173e8e0295d656ad09a1 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 21 Jun 2025 08:13:30 -0700 Subject: [PATCH 18/37] sender_workers scaling. this time, with feeling! --- src/core/config/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index a0d58274..d8dd5fcf 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1917,9 +1917,9 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '4'. Override by setting a different value. Values clamped 1 to core count. + /// core count. Override by setting a different value. 
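
PATCH 18 here moves the sender_workers default from a fixed 4 to one worker per core, with explicit values clamped between 1 and the core count. A sketch of that policy, assuming std's available_parallelism in place of conduwuit's sys wrapper:

    use std::thread::available_parallelism;

    fn parallelism() -> usize {
        available_parallelism().map(|n| n.get()).unwrap_or(1)
    }

    // Mirrors the patched num_senders(): floor of 1, ceiling of one
    // task per core, configured value wins in between.
    fn num_senders(configured: usize) -> usize {
        const MIN_SENDERS: usize = 1;
        configured.clamp(MIN_SENDERS, parallelism())
    }

    fn main() {
        println!("senders = {}", num_senders(4));
    }
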
/// - /// default: 4 + /// default: core count #[serde(default = "default_sender_workers")] pub sender_workers: usize, @@ -2410,7 +2410,7 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } -fn default_sender_workers() -> usize { 4 } +fn default_sender_workers() -> usize { parallelism_scaled(1) } fn default_client_receive_timeout() -> u64 { 75 } From 697e7aa2cdc71f3bbae480c6c3da8869daf24a3c Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Mon, 30 Jun 2025 15:25:11 -0700 Subject: [PATCH 19/37] more funny settings (part 3 of 12) --- src/core/config/mod.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index d8dd5fcf..6cbda97e 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2153,40 +2153,41 @@ fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64( fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000) } fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_servernameevent_data_cache_capacity() -> u32 { - parallelism_scaled_u32(200_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000) } fn default_stateinfo_cache_capacity() -> u32 { - parallelism_scaled_u32(500).clamp(100, 12000) } + parallelism_scaled_u32(500).clamp(100, 12000) +} fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(500).clamp(100, 12000) } From 6483e984ced76472ccb85f263b8cafdf6bf64a0f Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Mon, 21 Jul 2025 17:22:19 -0700 Subject: [PATCH 20/37] fix too many infos --- src/service/rooms/event_handler/upgrade_outlier_pdu.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 717fe711..1b6b8af5 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -5,8 +5,7 @@ use conduwuit::{ matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, - warn, - info + warn }; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, 
events::StateEventType}; From 4baf48e2147370650e3a3228f2f0e69726351e31 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 2 Aug 2025 20:59:02 -0700 Subject: [PATCH 21/37] exponential backoff is now just bees. did you want bees? no? well you have them now. congrats --- src/service/presence/mod.rs | 2 +- src/service/resolver/dns.rs | 4 ++-- src/service/rooms/event_handler/fetch_and_handle_outliers.rs | 2 +- src/service/rooms/event_handler/handle_prev_pdu.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 8f646be6..876d8bee 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -100,7 +100,7 @@ impl Service { /// Pings the presence of the given user in the given room, setting the /// specified state. pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> { - const REFRESH_TIMEOUT: u64 = 60 * 1000; + const REFRESH_TIMEOUT: u64 = 60 * 1000 * 4; let last_presence = self.db.get_presence(user_id).await; let state_changed = match last_presence { diff --git a/src/service/resolver/dns.rs b/src/service/resolver/dns.rs index 3a0b2551..6856853e 100644 --- a/src/service/resolver/dns.rs +++ b/src/service/resolver/dns.rs @@ -53,9 +53,9 @@ impl Resolver { opts.cache_size = config.dns_cache_entries as usize; opts.preserve_intermediates = true; opts.negative_min_ttl = Some(Duration::from_secs(config.dns_min_ttl_nxdomain)); - opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 30)); + opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24)); opts.positive_min_ttl = Some(Duration::from_secs(config.dns_min_ttl)); - opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 7)); + opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24)); opts.timeout = Duration::from_secs(config.dns_timeout); opts.attempts = config.dns_attempts as usize; opts.try_tcp_on_error = config.dns_tcp_fallback; diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 59b768f2..c72cce87 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -79,7 +79,7 @@ where { // Exponential backoff const MIN_DURATION: u64 = 60 * 2; - const MAX_DURATION: u64 = 60 * 60 * 8; + const MAX_DURATION: u64 = 60 * 60; if continue_exponential_backoff_secs( MIN_DURATION, MAX_DURATION, diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index cb4978d9..c786599a 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -46,7 +46,7 @@ where { // Exponential backoff const MIN_DURATION: u64 = 5 * 60; - const MAX_DURATION: u64 = 60 * 60 * 24; + const MAX_DURATION: u64 = 60 * 60; if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { debug!( ?tries, From 7d9b5146969ec32133943659d0475df8cb5743da Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Thu, 21 Aug 2025 12:19:53 -0700 Subject: [PATCH 22/37] fix warn by removing unused debug imports --- src/service/federation/execute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index ace29c1c..9a3cd828 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -6,7 +6,7 @@ use std::{ use 
bytes::Bytes; use conduwuit::{ - Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, + Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, warn, }; use http::{HeaderValue, header::AUTHORIZATION}; From a7929c19310a7d03ed5c819ddc5b05cb35f81a90 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 18:36:39 -0700 Subject: [PATCH 23/37] fix warn by removing unused debug imports delete more imports to quiet cargo --- src/api/server/send.rs | 2 +- src/service/federation/execute.rs | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index e7e4e9fd..385477d1 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant}; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Error, Result, debug, + Err, Error, Result, debug::INFO_SPAN_LEVEL, debug_warn, err, error, info, result::LogErr, diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 9a3cd828..5fe10ec9 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -5,10 +5,7 @@ use std::{ }; use bytes::Bytes; -use conduwuit::{ - Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, - error::inspect_debug_log, implement, trace, utils::string::EMPTY, warn, -}; +use conduwuit::{Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, info}; use http::{HeaderValue, header::AUTHORIZATION}; use ipaddress::IPAddress; use reqwest::{Client, Method, Request, Response, Url}; @@ -197,9 +194,9 @@ fn handle_error( ) -> Result { if e.is_timeout() || e.is_connect() { e = e.without_url(); - warn!(?url, "network error while sending federation request: {e:?}"); + trace!(?url, "network error while sending federation request: {e:?}"); } else if e.is_redirect() { - warn!( + trace!( method = ?method, url = ?url, final_url = ?e.url(), @@ -208,7 +205,7 @@ fn handle_error( e, ); } else { - warn!(?url, "failed to send federation request: {e:?}"); + trace!(?url, "failed to send federation request: {e:?}"); } let mut nice_error = "Request failed".to_owned(); @@ -217,7 +214,7 @@ fn handle_error( write!(nice_error, ": {source:?}").expect("writing to string should not fail"); src = source.source(); } - warn!(nice_error, "Federation request error"); + info!(nice_error, "Federation request error"); Err(e.into()) } From e15d71d2303387e55c5f679ba4753ffa930527e3 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Fri, 22 Aug 2025 10:44:31 -0700 Subject: [PATCH 24/37] log which room is being backfilled move more backfill log to info, clean up imports --- src/service/rooms/timeline/backfill.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index e976981e..d91075e1 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -1,13 +1,13 @@ use std::iter::once; use conduwuit_core::{ - Result, debug, debug_warn, implement, info, + Result, debug, implement, info, matrix::{ event::Event, pdu::{PduCount, PduId, RawPduId}, }, utils::{IterStream, ReadyExt}, - validated, warn, + validated, }; use futures::{FutureExt, StreamExt}; use ruma::{ @@ -15,9 +15,7 @@ use ruma::{ api::federation, events::{ StateEventType, 
TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, - }, - uint, -}; + }, UInt}; use serde_json::value::RawValue as RawJsonValue; use super::ExtractBody; @@ -100,7 +98,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); + info!("Asking {backfill_server} for backfill of room {room_id}"); let response = self .services .sending @@ -109,7 +107,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re federation::backfill::get_backfill::v1::Request { room_id: room_id.to_owned(), v: vec![first_pdu.1.event_id().to_owned()], - limit: uint!(100), + limit: UInt::from(self.services.server.config.max_fetch_prev_events), }, ) .await; @@ -117,13 +115,13 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re | Ok(response) => { for pdu in response.pdus { if let Err(e) = self.backfill_pdu(backfill_server, pdu).boxed().await { - debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}"); + info!("Failed to add backfilled pdu in room {room_id}: {e}"); } } return Ok(()); }, | Err(e) => { - warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); + info!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); }, } } From a75eacd54474db2dc5e30c642e3a044ce3d4ef95 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Mon, 25 Aug 2025 20:51:55 -0700 Subject: [PATCH 25/37] reduce log volume (keeps 2 infos) adjust log volume demote a bunch of logs --- src/api/server/send.rs | 6 +++--- .../rooms/event_handler/fetch_and_handle_outliers.rs | 2 +- src/service/sending/sender.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 385477d1..601da06c 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -79,7 +79,7 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); - info!( + trace!( pdus = body.pdus.len(), edus = body.edus.len(), id = ?body.transaction_id, @@ -102,7 +102,7 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); - info!( + trace!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), @@ -198,7 +198,7 @@ async fn handle_room( .and_then(|(_, event_id, value)| async move { services.server.check_running()?; let pdu_start_time = Instant::now(); - info!( + trace!( %room_id, %event_id, pdu = n + 1, diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index c72cce87..d273a41a 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -4,7 +4,7 @@ use std::{ }; use conduwuit::{ - Event, PduEvent, debug, debug_error, debug_warn, implement, + Event, PduEvent, debug, debug_error, implement, matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, warn, }; diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 5498afc0..ac82669f 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -882,7 +882,7 @@ impl Service { .execute_on(&self.services.client.sender, &server, request) .await .inspect(|_| { - info!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); + trace!(%txn_id, %server, "Sent {} PDUs, 
{} EDUs", pdu_count, edu_count); }) .inspect_err(|e| { info!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); @@ -890,7 +890,7 @@ impl Service { for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) { if let Err(e) = result { - warn!( + trace!( %txn_id, %server, "error sending PDU {event_id} to remote server: {e:?}" ); From c83f1ddb7164ddd98e130671d784ca1ed83ec964 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 18:08:19 -0700 Subject: [PATCH 26/37] add event_id to log entry --- src/service/rooms/event_handler/handle_outlier_pdu.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index fad9ac74..3b8653e7 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -123,7 +123,7 @@ where // The original create event must be in the auth events if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) { - return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); + return Err!(Request(InvalidParam("Incoming event refers to wrong create event. event_id={event_id}"))); } let state_fetch = |ty: &StateEventType, sk: &str| { From 60b8c9f5f2b92af8997997aa57e870eea9335425 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 18:42:40 -0700 Subject: [PATCH 27/37] turns out we need debug_warn --- .../rooms/event_handler/fetch_and_handle_outliers.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index d273a41a..51d10d71 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -3,11 +3,7 @@ use std::{ time::Instant, }; -use conduwuit::{ - Event, PduEvent, debug, debug_error, implement, - matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, - warn, -}; +use conduwuit::{Event, PduEvent, debug, debug_error, implement, matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, warn, debug_warn}; use ruma::{ CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_event, From 9723753b5c598aae5eb4157eacd68346fbc4945d Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sun, 31 Aug 2025 09:16:52 -0700 Subject: [PATCH 28/37] pass and use transaction id to collect timing info --- src/api/server/send.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 601da06c..f5727cf6 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -79,10 +79,11 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); + let transaction_id = body.transaction_id.to_string(); trace!( pdus = body.pdus.len(), edus = body.edus.len(), - id = ?body.transaction_id, + id = transaction_id, "Processing transaction", ); @@ -106,17 +107,17 @@ pub(crate) async fn send_transaction_message_route( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, + id = transaction_id, "Validated transaction", ); - let results = handle(&services, &client, body.origin(), txn_start_time, 
pdus, edus).await?; + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus, &*transaction_id).await?; info!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, + id = transaction_id, "Finished txn", ); for (id, result) in &results { @@ -142,11 +143,13 @@ async fn handle( started: Instant, pdus: impl Stream + Send, edus: impl Stream + Send, + transaction_id: &str, ) -> Result { + let handle_start = Instant::now(); edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) .boxed() .await; - + let pdu_start = Instant::now(); // group pdus by room let pdus = pdus .collect() @@ -157,14 +160,14 @@ async fn handle( .collect() }) .await; - + let results_start = Instant::now(); // we can evaluate rooms concurrently let results: ResolvedMap = pdus .into_iter() .try_stream() .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { let count = pdus.len(); - handle_room(services, client, origin, started, room_id, pdus.into_iter(), count) + handle_room(services, client, origin, started, room_id, pdus.into_iter(), count, transaction_id) .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) }) @@ -172,7 +175,14 @@ async fn handle( .try_collect() .boxed() .await?; - + let handle_stop = Instant::now(); + info!( + edus = pdu_start.saturating_duration_since(handle_start).as_micros(), + pdus = results_start.saturating_duration_since(pdu_start).as_micros(), + handle_room = handle_stop.saturating_duration_since(results_start).as_micros(), + transaction_id = ?transaction_id, + "handled incoming transaction", + ); Ok(results) } @@ -184,6 +194,7 @@ async fn handle_room( room_id: OwnedRoomId, pdus: impl Iterator + Send, count: usize, + transaction_id: &str, ) -> Result> { let _room_lock = services .rooms @@ -201,6 +212,7 @@ async fn handle_room( trace!( %room_id, %event_id, + transaction_id = ?transaction_id, pdu = n + 1, total = count, pdu_elapsed = ?pdu_start_time.elapsed(), @@ -217,11 +229,12 @@ async fn handle_room( info!( %room_id, %event_id, + transaction_id = ?transaction_id, pdu = n + 1, total = count, pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), - "Finished handling PDU {event_id}", + "Finished handling PDU", ); n += 1; From 8eb5f8466fc2d9a35a2a59298a36a84132ef5e69 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 22:25:16 -0700 Subject: [PATCH 29/37] process edus before pdus here, too --- src/api/server/send.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index f5727cf6..b2794a6e 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -87,14 +87,6 @@ pub(crate) async fn send_transaction_message_route( "Processing transaction", ); - let pdus = body - .pdus - .iter() - .stream() - .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) - .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) - .ready_filter_map(Result::ok); - let edus = body .edus .iter() @@ -103,6 +95,14 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); + let pdus = body + .pdus + .iter() + .stream() + .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) + .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) + .ready_filter_map(Result::ok); + trace!( pdus = body.pdus.len(), edus = body.edus.len(), From f09e1c075da43948dbfd08654e9c919724f57e23 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: 
Sat, 30 Aug 2025 16:00:46 +0100 Subject: [PATCH 30/37] fix(sync/v2): Room leaves being omitted incorrectly Partially borrowed from https://github.com/matrix-construct/tuwunel/commit/85a84f93c7ef7184a8eee1bb17116e5f0f0faf5a --- src/api/client/sync/v3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 01428c08..298a6e4b 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -430,7 +430,7 @@ async fn handle_left_room( .ok(); // Left before last sync - if Some(since) >= left_count { + if (Some(since) >= left_count && !include_leave) || Some(next_batch) < left_count { return Ok(None); } From 1f75b0a247e08c77837aec288a7563916ac38057 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sun, 31 Aug 2025 10:26:59 -0700 Subject: [PATCH 31/37] collect room lock timing ??? --- src/api/server/send.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index b2794a6e..94991455 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -196,12 +196,14 @@ async fn handle_room( count: usize, transaction_id: &str, ) -> Result> { + let room_lock_start = Instant::now(); let _room_lock = services .rooms .event_handler .mutex_federation .lock(&room_id) .await; + let room_lock_end = Instant::now(); let room_id = &room_id; let mut n = 0; @@ -215,6 +217,7 @@ async fn handle_room( transaction_id = ?transaction_id, pdu = n + 1, total = count, + room_lock_time = ?room_lock_end.saturating_duration_since(room_lock_start).as_micros(), pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), "Handling PDU", @@ -232,6 +235,7 @@ async fn handle_room( transaction_id = ?transaction_id, pdu = n + 1, total = count, + room_lock_time = ?room_lock_end.saturating_duration_since(room_lock_start).as_micros(), pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), "Finished handling PDU", From 879c8e527badc2bf83ae497ce6bec6f8d7f08a18 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 6 Sep 2025 10:36:19 -0700 Subject: [PATCH 32/37] revert get extremity locking --- src/service/rooms/event_handler/upgrade_outlier_pdu.rs | 2 +- src/service/rooms/timeline/create.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 1b6b8af5..4a64c017 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -175,7 +175,7 @@ where let extremities: Vec<_> = self .services .state - .get_forward_extremities(room_id, &state_lock) + .get_forward_extremities(room_id) .map(ToOwned::to_owned) .ready_filter(|event_id| { // Remove any that are referenced by this incoming event's prev_events diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs index 188e71a4..6732cd8e 100644 --- a/src/service/rooms/timeline/create.rs +++ b/src/service/rooms/timeline/create.rs @@ -42,7 +42,7 @@ pub async fn create_hash_and_sign_event( let prev_events: Vec = self .services .state - .get_forward_extremities(room_id, _mutex_lock) + .get_forward_extremities(room_id) .take(20) .map(Into::into) .collect() From 9bff1f6c06bb0d2c37926f95a6f88a6c1d19a8fa Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 8 Jul 2025 17:07:50 +0100 Subject: [PATCH 33/37] feat: Ask remote servers for individual unknown events --- src/api/client/room/event.rs | 7 +- 
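
Patches 28 and 31 above thread the transaction id through handle() and record per-phase timings with Instant. A standalone sketch of the measurement pattern (phase names follow the patches; the actual work is elided):

    use std::time::Instant;

    fn main() {
        let handle_start = Instant::now();
        // ... handle EDUs ...
        let pdu_start = Instant::now();
        // ... group PDUs by room ...
        let results_start = Instant::now();
        // ... evaluate rooms concurrently ...
        let handle_stop = Instant::now();

        // saturating_duration_since never panics on out-of-order stamps.
        println!(
            "edus={}us pdus={}us handle_room={}us",
            pdu_start.saturating_duration_since(handle_start).as_micros(),
            results_start.saturating_duration_since(pdu_start).as_micros(),
            handle_stop.saturating_duration_since(results_start).as_micros(),
        );
    }
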
From 9bff1f6c06bb0d2c37926f95a6f88a6c1d19a8fa Mon Sep 17 00:00:00 2001
From: nexy7574
Date: Tue, 8 Jul 2025 17:07:50 +0100
Subject: [PATCH 33/37] feat: Ask remote servers for individual unknown events

---
 src/api/client/room/event.rs           |   7 +-
 src/service/rooms/timeline/backfill.rs | 114 ++++++++++++++++++++++++-
 2 files changed, 112 insertions(+), 9 deletions(-)

diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs
index 47228d67..75ae0758 100644
--- a/src/api/client/room/event.rs
+++ b/src/api/client/room/event.rs
@@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route(
 	let event = services
 		.rooms
 		.timeline
-		.get_pdu(event_id)
+		.get_remote_pdu(room_id, event_id)
 		.map_err(|_| err!(Request(NotFound("Event {} not found.", event_id))));

 	let visible = services
@@ -33,11 +33,6 @@ pub(crate) async fn get_room_event_route(
 		return Err!(Request(Forbidden("You don't have permission to view this event.")));
 	}

-	debug_assert!(
-		event.event_id() == event_id && event.room_id() == room_id,
-		"Fetched PDU must match requested"
-	);
-
 	event.add_age().ok();

 	Ok(get_room_event::v3::Response { event: event.into_format() })
diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs
index e976981e..dd613afd 100644
--- a/src/service/rooms/timeline/backfill.rs
+++ b/src/service/rooms/timeline/backfill.rs
@@ -1,5 +1,6 @@
 use std::iter::once;

+use conduwuit::{Err, PduEvent};
 use conduwuit_core::{
-	Result, debug, debug_warn, implement, info,
+	Result, debug, debug_warn, implement, info, warn,
 	matrix::{
 		event::Event,
 		pdu::{PduCount, PduId, RawPduId},
@@ -11,7 +12,7 @@ use conduwuit_core::{
 };
 use futures::{FutureExt, StreamExt};
 use ruma::{
-	RoomId, ServerName,
+	EventId, RoomId, ServerName,
 	api::federation,
 	events::{
 		StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent,
@@ -100,7 +101,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> {
 		.boxed();

 	while let Some(ref backfill_server) = servers.next().await {
-		info!("Asking {backfill_server} for backfill");
+		info!("Asking {backfill_server} for backfill in {room_id}");
 		let response = self
 			.services
 			.sending
@@ -128,10 +129,117 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> {
 		}
 	}

-	info!("No servers could backfill, but backfill was needed in room {room_id}");
+	warn!("No servers could backfill, but backfill was needed in room {room_id}");
 	Ok(())
 }

+#[implement(super::Service)]
+#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))]
+pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result<PduEvent> {
+	let local = self.get_pdu(event_id).await;
+	if local.is_ok() {
+		// We already have this PDU, no need to backfill
+		debug!("We already have {event_id} in {room_id}, no need to backfill.");
+		return local;
+	}
+	debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers.");
+	// Similar to backfill_if_required, but only for a single PDU
+	// Fetch a list of servers to try
+	if self
+		.services
+		.state_cache
+		.room_joined_count(room_id)
+		.await
+		.is_ok_and(|count| count <= 1)
+		&& !self
+			.services
+			.state_accessor
+			.is_world_readable(room_id)
+			.await
+	{
+		// Room is empty (1 user or none), there is no one that can backfill
+		return Err!(Request(NotFound("No one can backfill this PDU, room is empty.")));
+	}
+
+	let power_levels: RoomPowerLevelsEventContent = self
+		.services
+		.state_accessor
+		.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
+		.await
+		.unwrap_or_default();
+
+	let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| {
+		if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) {
+			Some(user_id.server_name())
+		} else {
+			None
+		}
+	});
+
+	let canonical_room_alias_server = once(
+		self.services
+			.state_accessor
+			.get_canonical_alias(room_id)
+			.await,
+	)
+	.filter_map(Result::ok)
+	.map(|alias| alias.server_name().to_owned())
+	.stream();
+
+	let mut servers = room_mods
+		.stream()
+		.map(ToOwned::to_owned)
+		.chain(canonical_room_alias_server)
+		.chain(
+			self.services
+				.server
+				.config
+				.trusted_servers
+				.iter()
+				.map(ToOwned::to_owned)
+				.stream(),
+		)
+		.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
+		.filter_map(|server_name| async move {
+			self.services
+				.state_cache
+				.server_in_room(&server_name, room_id)
+				.await
+				.then_some(server_name)
+		})
+		.boxed();
+
+	while let Some(ref backfill_server) = servers.next().await {
+		info!("Asking {backfill_server} for event {}", event_id);
+		let response = self
+			.services
+			.sending
+			.send_federation_request(backfill_server, federation::event::get_event::v1::Request {
+				event_id: event_id.to_owned(),
+				include_unredacted_content: Some(false),
+			})
+			.await;
+		let pdu = match response {
+			| Ok(response) => {
+				self.backfill_pdu(backfill_server, response.pdu)
+					.boxed()
+					.await?;
+				debug!("Successfully backfilled {event_id} from {backfill_server}");
+				Some(self.get_pdu(event_id).await)
+			},
+			| Err(e) => {
+				warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
+				None
+			},
+		};
+		if let Some(pdu) = pdu {
+			debug!("Fetched {event_id} from {backfill_server}");
+			return pdu;
+		}
+	}
+
+	Err!("No servers could be used to fetch {} in {}.", event_id, room_id)
+}
+
 #[implement(super::Service)]
 #[tracing::instrument(skip(self, pdu), level = "debug")]
 pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> {
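Stripped of the Matrix plumbing, `get_remote_pdu` is a local-first lookup with a ranked remote fallback: moderators' servers, then the canonical alias server, then configured trusted servers. A condensed sketch of that shape, with hypothetical names and a plain `String` server list:

```rust
use std::future::Future;

/// Hypothetical, simplified shape of the lookup; `fetch` stands in for the
/// federation /event request, and `local` for the database hit.
async fn fetch_with_fallback<T, E, F, Fut>(
    local: Option<T>,
    servers: Vec<String>,
    mut fetch: F,
) -> Option<T>
where
    F: FnMut(String) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    if let Some(found) = local {
        return Some(found); // cheap path: the event is already stored
    }
    for server in servers {
        // Candidates are tried in rank order; the first success wins.
        if let Ok(found) = fetch(server).await {
            return Some(found);
        }
    }
    None // no candidate server could provide the event
}
```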
From 1ad26798a0b663d32967586a0965e71582fb8d94 Mon Sep 17 00:00:00 2001
From: Ginger
Date: Tue, 2 Sep 2025 15:12:03 -0400
Subject: [PATCH 34/37] fix: Use `handle_incoming_pdu` directly to keep remote
 PDUs as outliers

---
 src/service/rooms/timeline/backfill.rs | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs
index dd613afd..c0171691 100644
--- a/src/service/rooms/timeline/backfill.rs
+++ b/src/service/rooms/timeline/backfill.rs
@@ -2,7 +2,7 @@ use std::iter::once;

 use conduwuit::{Err, PduEvent};
 use conduwuit_core::{
-	Result, debug, debug_warn, implement, info, warn,
+	Result, debug, debug_warn, err, implement, info, warn,
 	matrix::{
 		event::Event,
 		pdu::{PduCount, PduId, RawPduId},
@@ -12,7 +12,7 @@ use conduwuit_core::{
 };
 use futures::{FutureExt, StreamExt};
 use ruma::{
-	EventId, RoomId, ServerName,
+	CanonicalJsonObject, EventId, RoomId, ServerName,
 	api::federation,
 	events::{
 		StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent,
@@ -210,17 +210,26 @@ pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result<PduEvent> {

 	while let Some(ref backfill_server) = servers.next().await {
 		info!("Asking {backfill_server} for event {}", event_id);
-		let response = self
+		let value = self
 			.services
 			.sending
 			.send_federation_request(backfill_server, federation::event::get_event::v1::Request {
 				event_id: event_id.to_owned(),
 				include_unredacted_content: Some(false),
 			})
-			.await;
-		let pdu = match response {
-			| Ok(response) => {
-				self.backfill_pdu(backfill_server, response.pdu)
+			.await
+			.and_then(|response| {
+				serde_json::from_str::<CanonicalJsonObject>(response.pdu.get()).map_err(|e| {
+					err!(BadServerResponse(debug_warn!(
+						"Error parsing incoming event {e:?} from {backfill_server}"
+					)))
+				})
+			});
+		let pdu = match value {
+			| Ok(value) => {
+				self.services
+					.event_handler
+					.handle_incoming_pdu(backfill_server, &room_id, &event_id, value, false)
 					.boxed()
 					.await?;
 				debug!("Successfully backfilled {event_id} from {backfill_server}");
 				Some(self.get_pdu(event_id).await)
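The new `and_then` step is ordinary serde_json deserialization: the federation response carries the PDU as raw JSON, which must become an object map before the event handler can treat it as an outlier candidate. A minimal standalone sketch, using plain `serde_json` types in place of ruma's `CanonicalJsonObject`:

```rust
use serde_json::{Map, Value};

/// Parse a raw PDU payload into a JSON object map, rejecting non-objects.
fn parse_pdu(raw: &str) -> serde_json::Result<Map<String, Value>> {
    // Deserializing directly into Map<String, Value> fails with a type
    // error when the payload is valid JSON but not an object.
    serde_json::from_str(raw)
}

fn main() {
    assert!(parse_pdu(r#"{"type":"m.room.message","depth":7}"#).is_ok());
    assert!(parse_pdu("[1,2,3]").is_err()); // an array is not a PDU
}
```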
From ac552b8829fd8f67dc42579ee3b4a01f309e015f Mon Sep 17 00:00:00 2001
From: Ginger
Date: Thu, 4 Sep 2025 10:32:35 -0400
Subject: [PATCH 35/37] fix: Put the output of `!admin query room-timeline
 pdus` in a codeblock

---
 src/admin/query/room_timeline.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs
index afcfec34..0f72b58c 100644
--- a/src/admin/query/room_timeline.rs
+++ b/src/admin/query/room_timeline.rs
@@ -57,5 +57,5 @@ pub(super) async fn pdus(
 		.try_collect()
 		.await?;

-	self.write_str(&format!("{result:#?}")).await
+	self.write_str(&format!("```\n{result:#?}\n```")).await
 }

From 11c6b3ea0ae449b7a48220c0f9bbc91545dd8ad5 Mon Sep 17 00:00:00 2001
From: Ginger
Date: Thu, 4 Sep 2025 10:33:43 -0400
Subject: [PATCH 36/37] fix: Fix pagination tokens being corrupted for
 backfilled PDUs

---
 src/api/client/message.rs          |  6 +++---
 src/api/client/relations.rs        |  4 ++--
 src/api/client/threads.rs          |  4 ++--
 src/api/client/utils.rs            | 31 +++++------------------------
 src/core/matrix/pdu/count.rs       |  6 +++++-
 src/service/rooms/timeline/data.rs |  3 +--
 6 files changed, 18 insertions(+), 36 deletions(-)

diff --git a/src/api/client/message.rs b/src/api/client/message.rs
index f5a951f4..0145b7fe 100644
--- a/src/api/client/message.rs
+++ b/src/api/client/message.rs
@@ -35,7 +35,7 @@ use ruma::{
 };
 use tracing::warn;

-use super::utils::{count_to_token, parse_pagination_token as parse_token};
+use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token};
 use crate::Ruma;

 /// list of safe and common non-state events to ignore if the user is ignored
@@ -181,8 +181,8 @@ pub(crate) async fn get_message_events_route(
 		.collect();

 	Ok(get_message_events::v3::Response {
-		start: count_to_token(from),
-		end: next_token.map(count_to_token),
+		start: count_to_pagination_token(from),
+		end: next_token.map(count_to_pagination_token),
 		chunk,
 		state,
 	})
diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs
index f6d8fe9e..f2ec3f23 100644
--- a/src/api/client/relations.rs
+++ b/src/api/client/relations.rs
@@ -18,7 +18,7 @@ use ruma::{
 	events::{TimelineEventType, relation::RelationType},
 };

-use super::utils::{count_to_token, parse_pagination_token as parse_token};
+use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token};
 use crate::Ruma;

 /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
@@ -193,7 +193,7 @@ async fn paginate_relations_with_filter(
 			| Direction::Forward => events.last(),
 			| Direction::Backward => events.first(),
 		}
-		.map(|(count, _)| count_to_token(*count))
+		.map(|(count, _)| count_to_pagination_token(*count))
 	} else {
 		None
 	};
diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs
index ca176eda..f0fb4a64 100644
--- a/src/api/client/threads.rs
+++ b/src/api/client/threads.rs
@@ -9,7 +9,7 @@ use conduwuit::{
 use futures::StreamExt;
 use ruma::{api::client::threads::get_threads, uint};

-use crate::Ruma;
+use crate::{Ruma, client::utils::pagination_token_to_count};

 /// # `GET /_matrix/client/r0/rooms/{roomId}/threads`
 pub(crate) async fn get_threads_route(
@@ -27,7 +27,7 @@ pub(crate) async fn get_threads_route(
 	let from: PduCount = body
 		.from
 		.as_deref()
-		.map(str::parse)
+		.map(pagination_token_to_count)
 		.transpose()?
 		.unwrap_or_else(PduCount::max);

diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs
index cc941b95..ec69388a 100644
--- a/src/api/client/utils.rs
+++ b/src/api/client/utils.rs
@@ -1,28 +1,7 @@
-use conduwuit::{
-	Result, err,
-	matrix::pdu::{PduCount, ShortEventId},
-};
+use conduwuit::{Result, matrix::pdu::PduCount};

-/// Parse a pagination token, trying ShortEventId first, then falling back to
-/// PduCount
-pub(crate) fn parse_pagination_token(token: &str) -> Result<PduCount> {
-	// Try parsing as ShortEventId first
-	if let Ok(shorteventid) = token.parse::<ShortEventId>() {
-		// ShortEventId maps directly to a PduCount in our database
-		Ok(PduCount::Normal(shorteventid))
-	} else if let Ok(count) = token.parse::<u64>() {
-		// Fallback to PduCount for backwards compatibility
-		Ok(PduCount::Normal(count))
-	} else if let Ok(count) = token.parse::<i64>() {
-		// Also handle negative counts for backfilled events
-		Ok(PduCount::from_signed(count))
-	} else {
-		Err(err!(Request(InvalidParam("Invalid pagination token"))))
-	}
-}
+/// Parse a pagination token
+pub(crate) fn pagination_token_to_count(token: &str) -> Result<PduCount> { token.parse() }

-/// Convert a PduCount to a token string (using the underlying ShortEventId)
-pub(crate) fn count_to_token(count: PduCount) -> String {
-	// The PduCount's unsigned value IS the ShortEventId
-	count.into_unsigned().to_string()
-}
+/// Convert a PduCount to a token string
+pub(crate) fn count_to_pagination_token(count: PduCount) -> String { count.to_string() }
diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs
index b880278f..7fb12574 100644
--- a/src/core/matrix/pdu/count.rs
+++ b/src/core/matrix/pdu/count.rs
@@ -1,6 +1,10 @@
 #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)]

-use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr};
+use std::{
+	cmp::Ordering,
+	fmt::{self, Display},
+	str::FromStr,
+};

 use ruma::api::Direction;

diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs
index fa10a5c0..9064df6c 100644
--- a/src/service/rooms/timeline/data.rs
+++ b/src/service/rooms/timeline/data.rs
@@ -3,8 +3,7 @@ use std::{borrow::Borrow, sync::Arc};
 use conduwuit::{
 	Err, PduCount, PduEvent, Result, at, err,
 	result::{LogErr, NotFound},
-	utils,
-	utils::stream::TryReadyExt,
+	utils::{self, stream::TryReadyExt},
 };
 use database::{Database, Deserialized, Json, KeyVal, Map};
 use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
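The simplification makes a pagination token nothing more than the `Display`/`FromStr` form of the count, so minting and parsing are exact inverses. A round-trip sketch using `i64` as a stand-in for `PduCount`, with negative values modeling backfilled positions:

```rust
use std::num::ParseIntError;

// Stand-ins for the real helpers in src/api/client/utils.rs; PduCount's
// actual FromStr/Display implementations do the equivalent work.
fn count_to_pagination_token(count: i64) -> String { count.to_string() }

fn pagination_token_to_count(token: &str) -> Result<i64, ParseIntError> { token.parse() }

fn main() {
    for count in [0_i64, 42, -7] {
        let token = count_to_pagination_token(count);
        // Every token must parse back to exactly the count it was minted
        // from; the old ShortEventId fallback broke this for backfilled
        // (negative) counts.
        assert_eq!(pagination_token_to_count(&token), Ok(count));
    }
}
```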
From 8e7ee1e2100852ec9570616b3321a14151513630 Mon Sep 17 00:00:00 2001
From: Ginger
Date: Thu, 4 Sep 2025 10:46:08 -0400
Subject: [PATCH 37/37] feat: Do not persist remote PDUs fetched with admin
 commands

---
 src/admin/debug/commands.rs | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs
index 81b0e9da..64f68330 100644
--- a/src/admin/debug/commands.rs
+++ b/src/admin/debug/commands.rs
@@ -281,15 +281,8 @@ pub(super) async fn get_remote_pdu(
 				vec![(event_id, value, room_id)]
 			};

-			info!("Attempting to handle event ID {event_id} as backfilled PDU");
-			self.services
-				.rooms
-				.timeline
-				.backfill_pdu(&server, response.pdu)
-				.await?;
-
 			let text = serde_json::to_string_pretty(&json)?;
-			let msg = "Got PDU from specified server and handled as backfilled";
+			let msg = "Got PDU from specified server";
 			write!(self, "{msg}. Event body:\n```json\n{text}\n```")
 		},
 	}
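With persistence removed, the admin command's job ends at rendering what it fetched. A small sketch of that rendering path, with a hypothetical helper name and plain `serde_json` types:

```rust
/// Hypothetical helper mirroring the command's reply formatting: the PDU is
/// pretty-printed and wrapped in a fenced codeblock, never written back to
/// the timeline.
fn render_pdu_reply(json: &serde_json::Value) -> serde_json::Result<String> {
    let text = serde_json::to_string_pretty(json)?;
    let msg = "Got PDU from specified server";
    Ok(format!("{msg}. Event body:\n```json\n{text}\n```"))
}
```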