diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 881decfa..8ef580f1 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1839,9 +1839,9 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '4'. Override by setting a different value. Values clamped 1 to core count. + /// core count. Override by setting a different value. /// - /// default: 4 + /// default: core count #[serde(default = "default_sender_workers")] pub sender_workers: usize, @@ -2075,40 +2075,41 @@ fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64( fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000) } fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(100_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(500_000) + parallelism_scaled_u32(50_000).saturating_add(100_000) } fn default_servernameevent_data_cache_capacity() -> u32 { - parallelism_scaled_u32(200_000).saturating_add(500_000) + parallelism_scaled_u32(100_000).saturating_add(100_000) } fn default_stateinfo_cache_capacity() -> u32 { - parallelism_scaled_u32(500).clamp(100, 12000) } + parallelism_scaled_u32(500).clamp(100, 12000) +} fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(500).clamp(100, 12000) } @@ -2328,7 +2329,7 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } -fn default_sender_workers() -> usize { 4 } +fn default_sender_workers() -> usize { parallelism_scaled(1) } fn default_client_receive_timeout() -> u64 { 75 } diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 4093cb05..f84c21ce 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -149,7 +149,7 @@ where let extremities: Vec<_> = self .services .state - .get_forward_extremities(room_id) + .get_forward_extremities(room_id, &state_lock) .map(ToOwned::to_owned) .ready_filter(|event_id| { // Remove any that are referenced by this incoming event's prev_events @@ -167,6 +167,8 @@ where .collect() .await; + if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events, event id: {}", incoming_pdu.prev_events.len(), incoming_pdu.event_id) } + debug!( "Retained {} extremities checked against {} prev_events", extremities.len(), diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 641aa6a9..92881126 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -388,6 +388,7 @@ impl Service { pub fn get_forward_extremities<'a>( &'a self, room_id: &'a RoomId, + _state_lock: &'a RoomMutexGuard, ) -> impl Stream + Send + '_ { let prefix = (room_id, Interfix);