diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index 13ff1b92..584f955d 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -1249,7 +1249,6 @@ async fn join_room_by_id_helper_remote( services.rooms.timeline.get_pdu(event_id).await.ok() }; - debug!("running stateres check on send_join parsed PDU"); let auth_check = state_res::event_auth::auth_check( &state_res::RoomVersion::new(&room_version_id)?, &parsed_join_pdu, diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index bfe0c450..a58bcfe5 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1828,7 +1828,7 @@ pub struct Config { /// setting a non-zero value. /// /// default: 0 - #[serde(default = "default_sender_workers")] + #[serde(default)] pub sender_workers: usize, /// Enables listener sockets; can be set to false to disable listening. This @@ -2315,8 +2315,6 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } -fn default_sender_workers() -> usize { 4 } - fn default_client_receive_timeout() -> u64 { 75 } fn default_client_request_timeout() -> u64 { 180 } diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index ea49b29e..a2052b92 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -609,7 +609,7 @@ where let fetch_state = |ty: &StateEventType, key: &str| { future::ready(auth_state.get(&ty.with_state_key(key))) }; - debug!("running auth check on {:?}", event.event_id()); + let auth_result = auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await; @@ -728,7 +728,7 @@ where { let mut room_id = None; while let Some(sort_ev) = event { - trace!(event_id = sort_ev.event_id().as_str(), "mainline"); + debug!(event_id = sort_ev.event_id().as_str(), "mainline"); if room_id.is_none() { room_id = Some(sort_ev.room_id().to_owned()); } diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 87b76222..f9889d58 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -76,7 +76,7 @@ pub(super) async fn handle_outlier_pdu<'a>( // 5. Reject "due to auth events" if can't get all the auth events or some of // the auth events are also rejected "due to auth events" // NOTE: Step 5 is not applied anymore because it failed too often - debug!("Fetching auth events for {}", incoming_pdu.event_id); + debug!("Fetching auth events"); Box::pin(self.fetch_and_handle_outliers( origin, &incoming_pdu.auth_events, @@ -88,12 +88,12 @@ pub(super) async fn handle_outlier_pdu<'a>( // 6. Reject "due to auth events" if the event doesn't pass auth based on the // auth events - debug!("Checking {} based on auth events", incoming_pdu.event_id); + debug!("Checking based on auth events"); // Build map of auth events let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len()); for id in &incoming_pdu.auth_events { let Ok(auth_event) = self.services.timeline.get_pdu(id).await else { - warn!("Could not find auth event {id} for {}", incoming_pdu.event_id); + warn!("Could not find auth event {id}"); continue; }; @@ -128,7 +128,6 @@ pub(super) async fn handle_outlier_pdu<'a>( ready(auth_events.get(&key)) }; - debug!("running auth check to handle outlier pdu {:?}", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &to_room_version(&room_version_id), &incoming_pdu, diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 0dca2d70..97d3df97 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,6 +1,12 @@ use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; -use conduwuit::{Err, Result, debug, debug_info, err, implement, matrix::{EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, warn, info}; +use conduwuit::{ + Err, Result, debug, debug_info, err, implement, + matrix::{EventTypeExt, PduEvent, StateKey, state_res}, + trace, + utils::stream::{BroadbandExt, ReadyExt}, + warn, +}; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; @@ -38,7 +44,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( return Err!(Request(InvalidParam("Event has been soft failed"))); } - debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id); + debug!("Upgrading to timeline pdu"); let timer = Instant::now(); let room_version_id = get_room_version_id(create_event)?; @@ -46,7 +52,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // backwards extremities doing all the checks in this list starting at 1. // These are not timeline events. - debug!("Resolving state at event {}", incoming_pdu.event_id); + debug!("Resolving state at event"); let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 { self.state_at_incoming_degree_one(&incoming_pdu).await? } else { @@ -64,7 +70,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( state_at_incoming_event.expect("we always set this to some above"); let room_version = to_room_version(&room_version_id); - debug!("Performing auth check to upgrade {}", incoming_pdu.event_id); + debug!("Performing auth check"); // 11. Check the auth of the event passes based on the state of the event let state_fetch_state = &state_at_incoming_event; let state_fetch = |k: StateEventType, s: StateKey| async move { @@ -74,7 +80,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( self.services.timeline.get_pdu(event_id).await.ok() }; - debug!("running auth check on {}", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -88,7 +93,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( return Err!(Request(Forbidden("Event has failed auth check with state at the event."))); } - debug!("Gathering auth events for {}", incoming_pdu.event_id); + debug!("Gathering auth events"); let auth_events = self .services .state @@ -106,7 +111,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( ready(auth_events.get(&key).cloned()) }; - debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -117,7 +121,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; // Soft fail check before doing state res - debug!("Performing soft-fail check on {}", incoming_pdu.event_id); + debug!("Performing soft-fail check"); let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { | (false, _) => true, | (true, None) => false, @@ -214,7 +218,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // 14. Check if the event passes auth based on the "current state" of the room, // if not soft fail it if soft_fail { - info!("Soft failing event {}", incoming_pdu.event_id); + debug!("Soft failing event"); let extremities = extremities.iter().map(Borrow::borrow); self.services diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index f7925994..74c7348d 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -771,7 +771,6 @@ impl Service { ready(auth_events.get(&key)) }; - debug!("running auth check on new {} event by {} in {}", pdu.kind, pdu.sender, pdu.room_id); let auth_check = state_res::auth_check( &room_version, &pdu, @@ -1153,7 +1152,7 @@ impl Service { .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill in {:?}", room_id.to_owned()); + info!("Asking {backfill_server} for backfill"); let response = self .services .sending @@ -1181,7 +1180,7 @@ impl Service { } } - warn!("No servers could backfill, but backfill was needed in room {room_id}"); + info!("No servers could backfill, but backfill was needed in room {room_id}"); Ok(()) } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index ce687551..08ca7010 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -401,10 +401,16 @@ impl Service { fn num_senders(args: &crate::Args<'_>) -> usize { const MIN_SENDERS: usize = 1; - // Limit the maximum number of senders to the number of cores. - let max_senders = available_parallelism(); + // Limit the number of senders to the number of workers threads or number of + // cores, conservatively. + let max_senders = args + .server + .metrics + .num_workers() + .min(available_parallelism()); - // default is 4 senders. clamp between 1 and core count. + // If the user doesn't override the default 0, this is intended to then default + // to 1 for now as multiple senders is experimental. args.server .config .sender_workers