Compare commits

...

4 commits

Author SHA1 Message Date
Jacob Taylor
db2e7690c8 more funny settings (part 3 of 12) 2025-07-03 14:44:28 -07:00
Some checks failed:
Checks / Prefligit / prefligit (push) Failing after 5s
Release Docker Image / define-variables (push) Failing after 2s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Checks / Rust / Format (push) Failing after 3s
Checks / Rust / Clippy (push) Failing after 16s
Checks / Rust / Cargo Test (push) Failing after 15s
Jacob Taylor
1f1cde297d sender_workers scaling. this time, with feeling! 2025-07-03 14:44:28 -07:00
Jacob Taylor
2e80200c64 vehicle loan documentation now available at window 7 2025-07-03 14:44:27 -07:00
Jacob Taylor
9e8c193840 lock the getter instead ??? c/o M 2025-07-03 14:39:10 -07:00
4 changed files with 19 additions and 14 deletions

View file

@@ -1839,9 +1839,9 @@ pub struct Config {
pub stream_amplification: usize,
/// Number of sender task workers; determines sender parallelism. Default is
-/// '4'. Override by setting a different value. Values clamped 1 to core count.
+/// core count. Override by setting a different value.
///
-/// default: 4
+/// default: core count
#[serde(default = "default_sender_workers")]
pub sender_workers: usize,
@@ -2075,40 +2075,41 @@ fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(
fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
-fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) }
+fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) }
fn default_cache_capacity_modifier() -> f64 { 1.0 }
fn default_auth_chain_cache_capacity() -> u32 {
-parallelism_scaled_u32(50_000).saturating_add(500_000)
+parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_shorteventid_cache_capacity() -> u32 {
-parallelism_scaled_u32(100_000).saturating_add(500_000)
+parallelism_scaled_u32(100_000).saturating_add(100_000)
}
fn default_eventidshort_cache_capacity() -> u32 {
-parallelism_scaled_u32(100_000).saturating_add(500_000)
+parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_eventid_pdu_cache_capacity() -> u32 {
-parallelism_scaled_u32(50_000).saturating_add(500_000)
+parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_shortstatekey_cache_capacity() -> u32 {
-parallelism_scaled_u32(50_000).saturating_add(500_000)
+parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_statekeyshort_cache_capacity() -> u32 {
-parallelism_scaled_u32(50_000).saturating_add(500_000)
+parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_servernameevent_data_cache_capacity() -> u32 {
-parallelism_scaled_u32(200_000).saturating_add(500_000)
+parallelism_scaled_u32(100_000).saturating_add(100_000)
}
fn default_stateinfo_cache_capacity() -> u32 {
-parallelism_scaled_u32(500).clamp(100, 12000) }
+parallelism_scaled_u32(500).clamp(100, 12000)
+}
fn default_roomid_spacehierarchy_cache_capacity() -> u32 {
parallelism_scaled_u32(500).clamp(100, 12000) }
@@ -2328,7 +2329,7 @@ fn default_stream_width_scale() -> f32 { 1.0 }
fn default_stream_amplification() -> usize { 1024 }
-fn default_sender_workers() -> usize { 4 }
+fn default_sender_workers() -> usize { parallelism_scaled(1) }
fn default_client_receive_timeout() -> u64 { 75 }
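
For reference, the parallelism_scaled, parallelism_scaled_u32, and parallelism_scaled_f64 helpers that these defaults call are not part of this diff. A minimal sketch of the assumed behaviour, multiplying a base value by the detected core count via std::thread::available_parallelism (the names and exact saturation details are assumptions, not taken from this change):

use std::thread::available_parallelism;

// Assumed shape of the scaling helpers: multiply a base value by the number
// of available cores, saturating rather than overflowing.
fn parallelism() -> usize {
	available_parallelism().map(|n| n.get()).unwrap_or(1)
}

fn parallelism_scaled(base: usize) -> usize {
	base.saturating_mul(parallelism())
}

fn parallelism_scaled_u32(base: u32) -> u32 {
	base.saturating_mul(u32::try_from(parallelism()).unwrap_or(1))
}

fn parallelism_scaled_f64(base: f64) -> f64 {
	base * parallelism() as f64
}

Read this way, default_sender_workers() returning parallelism_scaled(1) matches the new "default: core count" doc comment, and a default such as parallelism_scaled_u32(50_000).saturating_add(100_000) works out to roughly 50k cache entries per core plus a 100k floor.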

View file

@@ -6,6 +6,7 @@ use conduwuit::{
trace,
utils::stream::{BroadbandExt, ReadyExt},
warn,
+info
};
use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
@@ -149,7 +150,7 @@ where
let extremities: Vec<_> = self
.services
.state
-.get_forward_extremities(room_id)
+.get_forward_extremities(room_id, &state_lock)
.map(ToOwned::to_owned)
.ready_filter(|event_id| {
// Remove any that are referenced by this incoming event's prev_events
@@ -167,6 +168,8 @@
.collect()
.await;
+if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events, event id: {}", incoming_pdu.prev_events.len(), incoming_pdu.event_id) }
debug!(
"Retained {} extremities checked against {} prev_events",
extremities.len(),
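
The added guard above only fires when every forward extremity was filtered out while upgrading an outlier PDU, a case that would otherwise be easy to miss in the debug output that follows. As an aside, clippy's len_zero lint normally asks for is_empty() in this position; a sketch of the same log written that way, assuming the surrounding variables and the tracing-style info! macro imported at the top of the file:

if extremities.is_empty() {
	info!(
		prev_events = incoming_pdu.prev_events.len(),
		event_id = %incoming_pdu.event_id,
		"Retained zero extremities when upgrading outlier PDU to timeline PDU"
	);
}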

View file

@@ -388,6 +388,7 @@ impl Service {
pub fn get_forward_extremities<'a>(
&'a self,
room_id: &'a RoomId,
+_state_lock: &'a RoomMutexGuard,
) -> impl Stream<Item = &EventId> + Send + '_ {
let prefix = (room_id, Interfix);
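
The extra parameter above is the point of the "lock the getter instead" commit: by requiring a &RoomMutexGuard (unused in the body, hence the underscore), get_forward_extremities makes every caller prove it already holds the room's state lock, so the extremity set cannot change between reading it and acting on it. A rough sketch of a call site under this signature; the location of the room mutex (services.state.mutex here) is an assumption about the service layout, not something shown in this diff:

// Assumes futures::StreamExt is in scope for map/collect on the stream.
let state_lock = services.state.mutex.lock(room_id).await;
let extremities: Vec<OwnedEventId> = services
	.state
	.get_forward_extremities(room_id, &state_lock)
	.map(ToOwned::to_owned)
	.collect()
	.await;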

View file

@@ -42,7 +42,7 @@ pub async fn create_hash_and_sign_event(
let prev_events: Vec<OwnedEventId> = self
.services
.state
-.get_forward_extremities(room_id)
+.get_forward_extremities(room_id, _mutex_lock)
.take(20)
.map(Into::into)
.collect()