diff --git a/conduwuit-example.toml b/conduwuit-example.toml index 07374aae..c2120ec0 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -1074,7 +1074,7 @@ # 3 to 5 = Statistics with possible performance impact. # 6 = All statistics. # -#rocksdb_stats_level = 1 +#rocksdb_stats_level = 3 # This is a password that can be configured that will let you login to the # server bot account (currently `@conduit`) for emergency troubleshooting @@ -1688,11 +1688,9 @@ #stream_amplification = 1024 # Number of sender task workers; determines sender parallelism. Default is -# '0' which means the value is determined internally, likely matching the -# number of tokio worker-threads or number of cores, etc. Override by -# setting a non-zero value. +# number of CPU cores. Override by setting a different value. # -#sender_workers = 0 +#sender_workers = 4 # Enables listener sockets; can be set to false to disable listening. This # option is intended for developer/diagnostic purposes only. diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 81b0e9da..64f68330 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -281,15 +281,8 @@ pub(super) async fn get_remote_pdu( vec![(event_id, value, room_id)] }; - info!("Attempting to handle event ID {event_id} as backfilled PDU"); - self.services - .rooms - .timeline - .backfill_pdu(&server, response.pdu) - .await?; - let text = serde_json::to_string_pretty(&json)?; - let msg = "Got PDU from specified server and handled as backfilled"; + let msg = "Got PDU from specified server:"; write!(self, "{msg}. Event body:\n```json\n{text}\n```") }, } diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index afcfec34..0f72b58c 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -57,5 +57,5 @@ pub(super) async fn pdus( .try_collect() .await?; - self.write_str(&format!("{result:#?}")).await + self.write_str(&format!("```\n{result:#?}\n```")).await } diff --git a/src/api/client/message.rs b/src/api/client/message.rs index f5a951f4..0145b7fe 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -35,7 +35,7 @@ use ruma::{ }; use tracing::warn; -use super::utils::{count_to_token, parse_pagination_token as parse_token}; +use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token}; use crate::Ruma; /// list of safe and common non-state events to ignore if the user is ignored @@ -181,8 +181,8 @@ pub(crate) async fn get_message_events_route( .collect(); Ok(get_message_events::v3::Response { - start: count_to_token(from), - end: next_token.map(count_to_token), + start: count_to_pagination_token(from), + end: next_token.map(count_to_pagination_token), chunk, state, }) diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index f6d8fe9e..f2ec3f23 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -18,7 +18,7 @@ use ruma::{ events::{TimelineEventType, relation::RelationType}, }; -use super::utils::{count_to_token, parse_pagination_token as parse_token}; +use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token}; use crate::Ruma; /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}` @@ -193,7 +193,7 @@ async fn paginate_relations_with_filter( | Direction::Forward => events.last(), | Direction::Backward => events.first(), } - .map(|(count, _)| count_to_token(*count)) + .map(|(count, _)| count_to_pagination_token(*count)) } else { None }; diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 47228d67..75ae0758 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route( let event = services .rooms .timeline - .get_pdu(event_id) + .get_remote_pdu(room_id, event_id) .map_err(|_| err!(Request(NotFound("Event {} not found.", event_id)))); let visible = services @@ -33,11 +33,6 @@ pub(crate) async fn get_room_event_route( return Err!(Request(Forbidden("You don't have permission to view this event."))); } - debug_assert!( - event.event_id() == event_id && event.room_id() == room_id, - "Fetched PDU must match requested" - ); - event.add_age().ok(); Ok(get_room_event::v3::Response { event: event.into_format() }) diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index ca176eda..f0fb4a64 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -9,7 +9,7 @@ use conduwuit::{ use futures::StreamExt; use ruma::{api::client::threads::get_threads, uint}; -use crate::Ruma; +use crate::{Ruma, client::utils::pagination_token_to_count}; /// # `GET /_matrix/client/r0/rooms/{roomId}/threads` pub(crate) async fn get_threads_route( @@ -27,7 +27,7 @@ pub(crate) async fn get_threads_route( let from: PduCount = body .from .as_deref() - .map(str::parse) + .map(pagination_token_to_count) .transpose()? .unwrap_or_else(PduCount::max); diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs index cc941b95..ec69388a 100644 --- a/src/api/client/utils.rs +++ b/src/api/client/utils.rs @@ -1,28 +1,7 @@ -use conduwuit::{ - Result, err, - matrix::pdu::{PduCount, ShortEventId}, -}; +use conduwuit::{Result, matrix::pdu::PduCount}; -/// Parse a pagination token, trying ShortEventId first, then falling back to -/// PduCount -pub(crate) fn parse_pagination_token(token: &str) -> Result { - // Try parsing as ShortEventId first - if let Ok(shorteventid) = token.parse::() { - // ShortEventId maps directly to a PduCount in our database - Ok(PduCount::Normal(shorteventid)) - } else if let Ok(count) = token.parse::() { - // Fallback to PduCount for backwards compatibility - Ok(PduCount::Normal(count)) - } else if let Ok(count) = token.parse::() { - // Also handle negative counts for backfilled events - Ok(PduCount::from_signed(count)) - } else { - Err(err!(Request(InvalidParam("Invalid pagination token")))) - } -} +/// Parse a pagination token +pub(crate) fn pagination_token_to_count(token: &str) -> Result { token.parse() } -/// Convert a PduCount to a token string (using the underlying ShortEventId) -pub(crate) fn count_to_token(count: PduCount) -> String { - // The PduCount's unsigned value IS the ShortEventId - count.into_unsigned().to_string() -} +/// Convert a PduCount to a token string +pub(crate) fn count_to_pagination_token(count: PduCount) -> String { count.to_string() } diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 9c5bfd2b..94991455 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -3,9 +3,9 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant}; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Error, Result, debug, + Err, Error, Result, debug::INFO_SPAN_LEVEL, - debug_warn, err, error, + debug_warn, err, error, info, result::LogErr, trace, utils::{ @@ -79,23 +79,14 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); + let transaction_id = body.transaction_id.to_string(); trace!( pdus = body.pdus.len(), edus = body.edus.len(), - elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, - origin =?body.origin(), - "Starting txn", + id = transaction_id, + "Processing transaction", ); - let pdus = body - .pdus - .iter() - .stream() - .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) - .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) - .ready_filter_map(Result::ok); - let edus = body .edus .iter() @@ -104,14 +95,29 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); - let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; + let pdus = body + .pdus + .iter() + .stream() + .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) + .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) + .ready_filter_map(Result::ok); - debug!( + trace!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, - origin =?body.origin(), + id = transaction_id, + "Validated transaction", + ); + + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus, &*transaction_id).await?; + + info!( + pdus = body.pdus.len(), + edus = body.edus.len(), + elapsed = ?txn_start_time.elapsed(), + id = transaction_id, "Finished txn", ); for (id, result) in &results { @@ -137,7 +143,13 @@ async fn handle( started: Instant, pdus: impl Stream + Send, edus: impl Stream + Send, + transaction_id: &str, ) -> Result { + let handle_start = Instant::now(); + edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) + .boxed() + .await; + let pdu_start = Instant::now(); // group pdus by room let pdus = pdus .collect() @@ -148,13 +160,14 @@ async fn handle( .collect() }) .await; - + let results_start = Instant::now(); // we can evaluate rooms concurrently let results: ResolvedMap = pdus .into_iter() .try_stream() .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { - handle_room(services, client, origin, started, room_id, pdus.into_iter()) + let count = pdus.len(); + handle_room(services, client, origin, started, room_id, pdus.into_iter(), count, transaction_id) .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) }) @@ -162,12 +175,14 @@ async fn handle( .try_collect() .boxed() .await?; - - // evaluate edus after pdus, at least for now. - edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) - .boxed() - .await; - + let handle_stop = Instant::now(); + info!( + edus = pdu_start.saturating_duration_since(handle_start).as_micros(), + pdus = results_start.saturating_duration_since(pdu_start).as_micros(), + handle_room = handle_stop.saturating_duration_since(results_start).as_micros(), + transaction_id = ?transaction_id, + "handled incoming transaction", + ); Ok(results) } @@ -178,19 +193,35 @@ async fn handle_room( txn_start_time: Instant, room_id: OwnedRoomId, pdus: impl Iterator + Send, + count: usize, + transaction_id: &str, ) -> Result> { + let room_lock_start = Instant::now(); let _room_lock = services .rooms .event_handler .mutex_federation .lock(&room_id) .await; + let room_lock_end = Instant::now(); let room_id = &room_id; + let mut n = 0; pdus.try_stream() .and_then(|(_, event_id, value)| async move { services.server.check_running()?; let pdu_start_time = Instant::now(); + trace!( + %room_id, + %event_id, + transaction_id = ?transaction_id, + pdu = n + 1, + total = count, + room_lock_time = ?room_lock_end.saturating_duration_since(room_lock_start).as_micros(), + pdu_elapsed = ?pdu_start_time.elapsed(), + txn_elapsed = ?txn_start_time.elapsed(), + "Handling PDU", + ); let result = services .rooms .event_handler @@ -198,11 +229,18 @@ async fn handle_room( .await .map(|_| ()); - debug!( + info!( + %room_id, + %event_id, + transaction_id = ?transaction_id, + pdu = n + 1, + total = count, + room_lock_time = ?room_lock_end.saturating_duration_since(room_lock_start).as_micros(), pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), - "Finished PDU {event_id}", + "Finished handling PDU", ); + n += 1; Ok((event_id, result)) }) diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index b6f6ab53..f29b823d 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1247,7 +1247,7 @@ pub struct Config { /// 3 to 5 = Statistics with possible performance impact. /// 6 = All statistics. /// - /// default: 1 + /// default: 3 #[serde(default = "default_rocksdb_stats_level")] pub rocksdb_stats_level: u8, @@ -1929,12 +1929,10 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '0' which means the value is determined internally, likely matching the - /// number of tokio worker-threads or number of cores, etc. Override by - /// setting a non-zero value. + /// core count. Override by setting a different value. /// - /// default: 0 - #[serde(default)] + /// default: core count + #[serde(default = "default_sender_workers")] pub sender_workers: usize, /// Enables listener sockets; can be set to false to disable listening. This @@ -2277,45 +2275,48 @@ fn default_database_backups_to_keep() -> i16 { 1 } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } -fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) } +fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_shorteventid_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) } +fn default_shorteventid_cache_capacity() -> u32 { + parallelism_scaled_u32(100_000).saturating_add(100_000) +} + fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_servernameevent_data_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000) } -fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) } +fn default_stateinfo_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) +} -fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } +fn default_roomid_spacehierarchy_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } -fn default_dns_cache_entries() -> u32 { 32768 } +fn default_dns_cache_entries() -> u32 { 327680 } fn default_dns_min_ttl() -> u64 { 60 * 180 } @@ -2421,7 +2422,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 } fn default_rocksdb_recovery_mode() -> u8 { 1 } -fn default_rocksdb_log_level() -> String { "error".to_owned() } +fn default_rocksdb_log_level() -> String { "info".to_owned() } fn default_rocksdb_log_time_to_roll() -> usize { 0 } @@ -2453,7 +2454,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 } #[allow(clippy::doc_markdown)] fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 } -fn default_rocksdb_stats_level() -> u8 { 1 } +fn default_rocksdb_stats_level() -> u8 { 3 } // I know, it's a great name #[must_use] @@ -2508,14 +2509,13 @@ fn default_admin_log_capture() -> String { fn default_admin_room_tag() -> String { "m.server_notice".to_owned() } #[allow(clippy::as_conversions, clippy::cast_precision_loss)] -fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } +pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } -fn parallelism_scaled_u32(val: u32) -> u32 { - let val = val.try_into().expect("failed to cast u32 to usize"); - parallelism_scaled(val).try_into().unwrap_or(u32::MAX) -} +pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) } -fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } +pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) } + +pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } fn default_trusted_server_batch_size() -> usize { 256 } @@ -2535,6 +2535,8 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } +fn default_sender_workers() -> usize { parallelism_scaled(1) } + fn default_client_receive_timeout() -> u64 { 75 } fn default_client_request_timeout() -> u64 { 180 } diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs index b880278f..7fb12574 100644 --- a/src/core/matrix/pdu/count.rs +++ b/src/core/matrix/pdu/count.rs @@ -1,6 +1,10 @@ #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)] -use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr}; +use std::{ + cmp::Ordering, + fmt::{self, Display}, + str::FromStr, +}; use ruma::api::Direction; diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 77a4a95c..ec70d684 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -13,6 +13,7 @@ use ruma::{ power_levels::RoomPowerLevelsEventContent, third_party_invite::RoomThirdPartyInviteEventContent, }, + EventId, int, serde::{Base64, Raw}, }; @@ -21,7 +22,6 @@ use serde::{ de::{Error as _, IgnoredAny}, }; use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue}; - use super::{ Error, Event, Result, StateEventType, StateKey, TimelineEventType, power_levels::{ @@ -149,8 +149,8 @@ where for<'a> &'a E: Event + Send, { debug!( - event_id = %incoming_event.event_id(), - event_type = ?incoming_event.event_type(), + event_id = format!("{}", incoming_event.event_id()), + event_type = format!("{}", incoming_event.event_type()), "auth_check beginning" ); @@ -219,7 +219,7 @@ where /* // TODO: In the past this code was commented as it caused problems with Synapse. This is no // longer the case. This needs to be implemented. - // See also: https://github.com/ruma/ruma/pull/2064 + // See also: https://github.com/ruma/ruma/pull/2064 // // 2. Reject if auth_events // a. auth_events cannot have duplicate keys since it's a BTree @@ -242,20 +242,44 @@ where } */ - let (room_create_event, power_levels_event, sender_member_event) = join3( - fetch_state(&StateEventType::RoomCreate, ""), - fetch_state(&StateEventType::RoomPowerLevels, ""), - fetch_state(&StateEventType::RoomMember, sender.as_str()), - ) - .await; + // let (room_create_event, power_levels_event, sender_member_event) = join3( + // fetch_state(&StateEventType::RoomCreate, ""), + // fetch_state(&StateEventType::RoomPowerLevels, ""), + // fetch_state(&StateEventType::RoomMember, sender.as_str()), + // ) + // .await; + + let room_create_event = fetch_state(&StateEventType::RoomCreate, "").await; + let power_levels_event = fetch_state(&StateEventType::RoomPowerLevels, "").await; + let sender_member_event = fetch_state(&StateEventType::RoomMember, sender.as_str()).await; let room_create_event = match room_create_event { | None => { - warn!("no m.room.create event in auth chain"); + error!( + create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + "no m.room.create event found for {} ({})!", + incoming_event.event_id().as_str(), + incoming_event.room_id().as_str() + ); return Ok(false); }, | Some(e) => e, }; + // just re-check 1.2 to work around a bug + let Some(room_id_server_name) = incoming_event.room_id().server_name() else { + warn!("room ID has no servername"); + return Ok(false); + }; + + if room_id_server_name != room_create_event.sender().server_name() { + warn!( + "servername of room ID origin ({}) does not match servername of m.room.create sender ({})", + room_id_server_name, + room_create_event.sender().server_name()); + return Ok(false); + } if incoming_event.room_id() != room_create_event.room_id() { warn!("room_id of incoming event does not match room_id of m.room.create event"); diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index ce9d9276..ba9c013d 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -733,8 +733,12 @@ where Fut: Future> + Send, E: Event + Send + Sync, { + let mut room_id = None; while let Some(sort_ev) = event { debug!(event_id = sort_ev.event_id().as_str(), "mainline"); + if room_id.is_none() { + room_id = Some(sort_ev.room_id().to_owned()); + } let id = sort_ev.event_id(); if let Some(depth) = mainline_map.get(id) { @@ -753,7 +757,7 @@ where } } } - // Did not find a power level event so we default to zero + warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth"); Ok(0) } diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs index 58358f02..b8d0b307 100644 --- a/src/database/engine/cf_opts.rs +++ b/src/database/engine/cf_opts.rs @@ -29,7 +29,7 @@ fn descriptor_cf_options( set_table_options(&mut opts, &desc, cache)?; opts.set_min_write_buffer_number(1); - opts.set_max_write_buffer_number(2); + opts.set_max_write_buffer_number(3); opts.set_write_buffer_size(desc.write_size); opts.set_target_file_size_base(desc.file_size); diff --git a/src/database/engine/db_opts.rs b/src/database/engine/db_opts.rs index 18cec742..1299443d 100644 --- a/src/database/engine/db_opts.rs +++ b/src/database/engine/db_opts.rs @@ -1,8 +1,6 @@ -use std::{cmp, convert::TryFrom}; - -use conduwuit::{Config, Result, utils}; +use conduwuit::{Config, Result}; use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel}; - +use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32}; use super::{cf_opts::cache_size_f64, logger::handle as handle_log}; /// Create database-wide options suitable for opening the database. This also @@ -23,8 +21,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul set_logging_defaults(&mut opts, config); // Processing - opts.set_max_background_jobs(num_threads::(config)?); - opts.set_max_subcompactions(num_threads::(config)?); + opts.set_max_background_jobs(parallelism_scaled_i32(1)); + opts.set_max_subcompactions(parallelism_scaled_u32(1)); opts.set_avoid_unnecessary_blocking_io(true); opts.set_max_file_opening_threads(0); @@ -126,15 +124,3 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) { opts.set_callback_logger(rocksdb_log_level, &handle_log); } } - -fn num_threads>(config: &Config) -> Result { - const MIN_PARALLELISM: usize = 2; - - let requested = if config.rocksdb_parallelism_threads != 0 { - config.rocksdb_parallelism_threads - } else { - utils::available_parallelism() - }; - - utils::math::try_into::(cmp::max(MIN_PARALLELISM, requested)) -} diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 1d1d1154..5fe10ec9 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -1,10 +1,11 @@ -use std::{fmt::Debug, mem}; +use std::{ + error::Error as _, + fmt::{Debug, Write}, + mem, +}; use bytes::Bytes; -use conduwuit::{ - Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, - error::inspect_debug_log, implement, trace, utils::string::EMPTY, -}; +use conduwuit::{Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, info}; use http::{HeaderValue, header::AUTHORIZATION}; use ipaddress::IPAddress; use reqwest::{Client, Method, Request, Response, Url}; @@ -193,9 +194,9 @@ fn handle_error( ) -> Result { if e.is_timeout() || e.is_connect() { e = e.without_url(); - debug_warn!("{e:?}"); + trace!(?url, "network error while sending federation request: {e:?}"); } else if e.is_redirect() { - debug_error!( + trace!( method = ?method, url = ?url, final_url = ?e.url(), @@ -204,9 +205,17 @@ fn handle_error( e, ); } else { - debug_error!("{e:?}"); + trace!(?url, "failed to send federation request: {e:?}"); } + let mut nice_error = "Request failed".to_owned(); + let mut src = e.source(); + while let Some(source) = src { + write!(nice_error, ": {source:?}").expect("writing to string should not fail"); + src = source.source(); + } + info!(nice_error, "Federation request error"); + Err(e.into()) } diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index e7ce64bc..1de99541 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -100,7 +100,7 @@ impl Service { /// Pings the presence of the given user in the given room, setting the /// specified state. pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> { - const REFRESH_TIMEOUT: u64 = 60 * 1000; + const REFRESH_TIMEOUT: u64 = 60 * 1000 * 4; let last_presence = self.db.get_presence(user_id).await; let state_changed = match last_presence { diff --git a/src/service/resolver/dns.rs b/src/service/resolver/dns.rs index 3a0b2551..6856853e 100644 --- a/src/service/resolver/dns.rs +++ b/src/service/resolver/dns.rs @@ -53,9 +53,9 @@ impl Resolver { opts.cache_size = config.dns_cache_entries as usize; opts.preserve_intermediates = true; opts.negative_min_ttl = Some(Duration::from_secs(config.dns_min_ttl_nxdomain)); - opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 30)); + opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24)); opts.positive_min_ttl = Some(Duration::from_secs(config.dns_min_ttl)); - opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 7)); + opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24)); opts.timeout = Duration::from_secs(config.dns_timeout); opts.attempts = config.dns_attempts as usize; opts.try_tcp_on_error = config.dns_tcp_fallback; diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 59b768f2..51d10d71 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -3,11 +3,7 @@ use std::{ time::Instant, }; -use conduwuit::{ - Event, PduEvent, debug, debug_error, debug_warn, implement, - matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, - warn, -}; +use conduwuit::{Event, PduEvent, debug, debug_error, implement, matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, warn, debug_warn}; use ruma::{ CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_event, @@ -79,7 +75,7 @@ where { // Exponential backoff const MIN_DURATION: u64 = 60 * 2; - const MAX_DURATION: u64 = 60 * 60 * 8; + const MAX_DURATION: u64 = 60 * 60; if continue_exponential_backoff_secs( MIN_DURATION, MAX_DURATION, diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index d79eed77..3b8653e7 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -122,11 +122,8 @@ where } // The original create event must be in the auth events - if !matches!( - auth_events.get(&(StateEventType::RoomCreate, String::new().into())), - Some(_) | None - ) { - return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); + if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) { + return Err!(Request(InvalidParam("Incoming event refers to wrong create event. event_id={event_id}"))); } let state_fetch = |ty: &StateEventType, sk: &str| { diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index cb4978d9..c786599a 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -46,7 +46,7 @@ where { // Exponential backoff const MIN_DURATION: u64 = 5 * 60; - const MAX_DURATION: u64 = 60 * 60 * 24; + const MAX_DURATION: u64 = 60 * 60; if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { debug!( ?tries, diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index d2e0623c..4a64c017 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -5,7 +5,7 @@ use conduwuit::{ matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, - warn, + warn }; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; @@ -193,6 +193,8 @@ where .collect() .await; + if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events, event id: {}", incoming_pdu.prev_events.len(), incoming_pdu.event_id) } + debug!( "Retained {} extremities checked against {} prev_events", extremities.len(), diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index e976981e..c0171691 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -1,7 +1,8 @@ use std::iter::once; +use conduwuit::{Err, PduEvent}; use conduwuit_core::{ - Result, debug, debug_warn, implement, info, + Result, debug, debug_warn, err, implement, info, matrix::{ event::Event, pdu::{PduCount, PduId, RawPduId}, @@ -11,7 +12,7 @@ use conduwuit_core::{ }; use futures::{FutureExt, StreamExt}; use ruma::{ - RoomId, ServerName, + CanonicalJsonObject, EventId, RoomId, ServerName, api::federation, events::{ StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, @@ -100,7 +101,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); + info!("Asking {backfill_server} for backfill in {room_id}"); let response = self .services .sending @@ -128,10 +129,126 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re } } - info!("No servers could backfill, but backfill was needed in room {room_id}"); + warn!("No servers could backfill, but backfill was needed in room {room_id}"); Ok(()) } +#[implement(super::Service)] +#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))] +pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result { + let local = self.get_pdu(event_id).await; + if local.is_ok() { + // We already have this PDU, no need to backfill + debug!("We already have {event_id} in {room_id}, no need to backfill."); + return local; + } + debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers."); + // Similar to backfill_if_required, but only for a single PDU + // Fetch a list of servers to try + if self + .services + .state_cache + .room_joined_count(room_id) + .await + .is_ok_and(|count| count <= 1) + && !self + .services + .state_accessor + .is_world_readable(room_id) + .await + { + // Room is empty (1 user or none), there is no one that can backfill + return Err!(Request(NotFound("No one can backfill this PDU, room is empty."))); + } + + let power_levels: RoomPowerLevelsEventContent = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") + .await + .unwrap_or_default(); + + let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { + if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) { + Some(user_id.server_name()) + } else { + None + } + }); + + let canonical_room_alias_server = once( + self.services + .state_accessor + .get_canonical_alias(room_id) + .await, + ) + .filter_map(Result::ok) + .map(|alias| alias.server_name().to_owned()) + .stream(); + let mut servers = room_mods + .stream() + .map(ToOwned::to_owned) + .chain(canonical_room_alias_server) + .chain( + self.services + .server + .config + .trusted_servers + .iter() + .map(ToOwned::to_owned) + .stream(), + ) + .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) + .filter_map(|server_name| async move { + self.services + .state_cache + .server_in_room(&server_name, room_id) + .await + .then_some(server_name) + }) + .boxed(); + + while let Some(ref backfill_server) = servers.next().await { + info!("Asking {backfill_server} for event {}", event_id); + let value = self + .services + .sending + .send_federation_request(backfill_server, federation::event::get_event::v1::Request { + event_id: event_id.to_owned(), + include_unredacted_content: Some(false), + }) + .await + .and_then(|response| { + serde_json::from_str::(response.pdu.get()).map_err(|e| { + err!(BadServerResponse(debug_warn!( + "Error parsing incoming event {e:?} from {backfill_server}" + ))) + }) + }); + let pdu = match value { + | Ok(value) => { + self.services + .event_handler + .handle_incoming_pdu(backfill_server, &room_id, &event_id, value, false) + .boxed() + .await?; + debug!("Successfully backfilled {event_id} from {backfill_server}"); + Some(self.get_pdu(event_id).await) + }, + | Err(e) => { + warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); + None + }, + }; + if let Some(pdu) = pdu { + debug!("Fetched {event_id} from {backfill_server}"); + return pdu; + } + } + + Err!("No servers could be used to fetch {} in {}.", room_id, event_id) +} + #[implement(super::Service)] #[tracing::instrument(skip(self, pdu), level = "debug")] pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index fa10a5c0..9064df6c 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -3,8 +3,7 @@ use std::{borrow::Borrow, sync::Arc}; use conduwuit::{ Err, PduCount, PduEvent, Result, at, err, result::{LogErr, NotFound}, - utils, - utils::stream::TryReadyExt, + utils::{self, stream::TryReadyExt}, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut}; diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 08ca7010..ce687551 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -401,16 +401,10 @@ impl Service { fn num_senders(args: &crate::Args<'_>) -> usize { const MIN_SENDERS: usize = 1; - // Limit the number of senders to the number of workers threads or number of - // cores, conservatively. - let max_senders = args - .server - .metrics - .num_workers() - .min(available_parallelism()); + // Limit the maximum number of senders to the number of cores. + let max_senders = available_parallelism(); - // If the user doesn't override the default 0, this is intended to then default - // to 1 for now as multiple senders is experimental. + // default is 4 senders. clamp between 1 and core count. args.server .config .sender_workers diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index a708f746..ac82669f 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -10,7 +10,7 @@ use std::{ use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use conduwuit_core::{ - Error, Event, Result, debug, err, error, + Error, Event, Result, debug, err, error, info, result::LogErr, trace, utils::{ @@ -142,7 +142,7 @@ impl Service { } fn handle_response_err(dest: Destination, statuses: &mut CurTransactionStatus, e: &Error) { - debug!(dest = ?dest, "{e:?}"); + debug!(dest = ?dest, "error response: {e:?}"); statuses.entry(dest).and_modify(|e| { *e = match e { | TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), @@ -177,7 +177,21 @@ impl Service { if !new_events.is_empty() { self.db.mark_as_active(new_events.iter()); - let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect(); + let new_events_vec: Vec = + new_events.into_iter().map(|(_, event)| event).collect(); + + if let Some(status) = statuses.get(&dest.clone()) { + if matches!(status, TransactionStatus::Running) { + // If the server is in backoff, clear it + info!( + ?dest, + "Catching up previously failed destination with {}+ new events", + new_events_vec.len() + ); + statuses.insert(dest.clone(), TransactionStatus::Running); + } + } + futures.push(self.send_events(dest.clone(), new_events_vec)); } else { statuses.remove(dest); @@ -859,16 +873,24 @@ impl Service { pdus, edus, }; + let pdu_count = request.pdus.len(); + let edu_count = request.edus.len(); let result = self .services .federation .execute_on(&self.services.client.sender, &server, request) - .await; + .await + .inspect(|_| { + trace!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); + }) + .inspect_err(|e| { + info!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); + }); for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) { if let Err(e) = result { - warn!( + trace!( %txn_id, %server, "error sending PDU {event_id} to remote server: {e:?}" );