diff --git a/conduwuit-example.toml b/conduwuit-example.toml index d85281aa..f744a07d 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -1001,7 +1001,7 @@ # 3 to 5 = Statistics with possible performance impact. # 6 = All statistics. # -#rocksdb_stats_level = 1 +#rocksdb_stats_level = 3 # This is a password that can be configured that will let you login to the # server bot account (currently `@conduit`) for emergency troubleshooting diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 390a253e..8ef580f1 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1171,7 +1171,7 @@ pub struct Config { /// 3 to 5 = Statistics with possible performance impact. /// 6 = All statistics. /// - /// default: 1 + /// default: 3 #[serde(default = "default_rocksdb_stats_level")] pub rocksdb_stats_level: u8, @@ -1839,9 +1839,9 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '4'. Override by setting a different value. Values clamped 1 to core count. + /// core count. Override by setting a different value. /// - /// default: 4 + /// default: core count #[serde(default = "default_sender_workers")] pub sender_workers: usize, @@ -2075,40 +2075,41 @@ fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64( fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000) } fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_servernameevent_data_cache_capacity() -> u32 { - parallelism_scaled_u32(200_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000) } fn default_stateinfo_cache_capacity() -> u32 { - parallelism_scaled_u32(500).clamp(100, 12000) } + parallelism_scaled_u32(500).clamp(100, 12000) +} fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(500).clamp(100, 12000) } @@ -2215,7 +2216,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 } fn default_rocksdb_recovery_mode() -> u8 { 1 } -fn default_rocksdb_log_level() -> String { "error".to_owned() } +fn default_rocksdb_log_level() -> String { "info".to_owned() } fn default_rocksdb_log_time_to_roll() -> usize { 0 } @@ -2247,7 +2248,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 } #[allow(clippy::doc_markdown)] fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 } -fn default_rocksdb_stats_level() -> u8 { 1 } +fn default_rocksdb_stats_level() -> u8 { 3 } // I know, it's a great name #[must_use] @@ -2328,7 +2329,7 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } -fn default_sender_workers() -> usize { 4 } +fn default_sender_workers() -> usize { parallelism_scaled(1) } fn default_client_receive_timeout() -> u64 { 75 } diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index bff3d4c8..be45896f 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -13,6 +13,7 @@ use ruma::{ power_levels::RoomPowerLevelsEventContent, third_party_invite::RoomThirdPartyInviteEventContent, }, + EventId, int, serde::{Base64, Raw}, }; @@ -21,7 +22,6 @@ use serde::{ de::{Error as _, IgnoredAny}, }; use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue}; - use super::{ Error, Event, Result, StateEventType, StateKey, TimelineEventType, power_levels::{ @@ -242,16 +242,27 @@ where } */ - let (room_create_event, power_levels_event, sender_member_event) = join3( - fetch_state(&StateEventType::RoomCreate, ""), - fetch_state(&StateEventType::RoomPowerLevels, ""), - fetch_state(&StateEventType::RoomMember, sender.as_str()), - ) - .await; + // let (room_create_event, power_levels_event, sender_member_event) = join3( + // fetch_state(&StateEventType::RoomCreate, ""), + // fetch_state(&StateEventType::RoomPowerLevels, ""), + // fetch_state(&StateEventType::RoomMember, sender.as_str()), + // ) + // .await; + + let room_create_event = fetch_state(&StateEventType::RoomCreate, "").await; + let power_levels_event = fetch_state(&StateEventType::RoomPowerLevels, "").await; + let sender_member_event = fetch_state(&StateEventType::RoomMember, sender.as_str()).await; let room_create_event = match room_create_event { | None => { - error!("no m.room.create event found for {}!", incoming_event.event_id()); + error!( + create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + "no m.room.create event found for {} ({})!", + incoming_event.event_id().as_str(), + incoming_event.room_id().as_str() + ); return Ok(false); }, | Some(e) => e, diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index e721e14c..ba9c013d 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -733,8 +733,12 @@ where Fut: Future> + Send, E: Event + Send + Sync, { + let mut room_id = None; while let Some(sort_ev) = event { debug!(event_id = sort_ev.event_id().as_str(), "mainline"); + if room_id.is_none() { + room_id = Some(sort_ev.room_id().to_owned()); + } let id = sort_ev.event_id(); if let Some(depth) = mainline_map.get(id) { @@ -753,7 +757,7 @@ where } } } - warn!("could not find a power event in the mainline map, defaulting to zero depth"); + warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth"); Ok(0) } diff --git a/src/database/engine/db_opts.rs b/src/database/engine/db_opts.rs index 18cec742..1299443d 100644 --- a/src/database/engine/db_opts.rs +++ b/src/database/engine/db_opts.rs @@ -1,8 +1,6 @@ -use std::{cmp, convert::TryFrom}; - -use conduwuit::{Config, Result, utils}; +use conduwuit::{Config, Result}; use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel}; - +use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32}; use super::{cf_opts::cache_size_f64, logger::handle as handle_log}; /// Create database-wide options suitable for opening the database. This also @@ -23,8 +21,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul set_logging_defaults(&mut opts, config); // Processing - opts.set_max_background_jobs(num_threads::(config)?); - opts.set_max_subcompactions(num_threads::(config)?); + opts.set_max_background_jobs(parallelism_scaled_i32(1)); + opts.set_max_subcompactions(parallelism_scaled_u32(1)); opts.set_avoid_unnecessary_blocking_io(true); opts.set_max_file_opening_threads(0); @@ -126,15 +124,3 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) { opts.set_callback_logger(rocksdb_log_level, &handle_log); } } - -fn num_threads>(config: &Config) -> Result { - const MIN_PARALLELISM: usize = 2; - - let requested = if config.rocksdb_parallelism_threads != 0 { - config.rocksdb_parallelism_threads - } else { - utils::available_parallelism() - }; - - utils::math::try_into::(cmp::max(MIN_PARALLELISM, requested)) -} diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 4093cb05..f84c21ce 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -149,7 +149,7 @@ where let extremities: Vec<_> = self .services .state - .get_forward_extremities(room_id) + .get_forward_extremities(room_id, &state_lock) .map(ToOwned::to_owned) .ready_filter(|event_id| { // Remove any that are referenced by this incoming event's prev_events @@ -167,6 +167,8 @@ where .collect() .await; + if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events, event id: {}", incoming_pdu.prev_events.len(), incoming_pdu.event_id) } + debug!( "Retained {} extremities checked against {} prev_events", extremities.len(), diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 641aa6a9..92881126 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -388,6 +388,7 @@ impl Service { pub fn get_forward_extremities<'a>( &'a self, room_id: &'a RoomId, + _state_lock: &'a RoomMutexGuard, ) -> impl Stream + Send + '_ { let prefix = (room_id, Interfix);