diff --git a/conduwuit-example.toml b/conduwuit-example.toml
index bdc2f570..a04819df 100644
--- a/conduwuit-example.toml
+++ b/conduwuit-example.toml
@@ -1041,7 +1041,7 @@
 # 3 to 5 = Statistics with possible performance impact.
 # 6 = All statistics.
 #
-#rocksdb_stats_level = 1
+#rocksdb_stats_level = 3
 
 # This is a password that can be configured that will let you login to the
 # server bot account (currently `@conduit`) for emergency troubleshooting
@@ -1655,11 +1655,9 @@
 #stream_amplification = 1024
 
 # Number of sender task workers; determines sender parallelism. Default is
-# '0' which means the value is determined internally, likely matching the
-# number of tokio worker-threads or number of cores, etc. Override by
-# setting a non-zero value.
+# number of CPU cores. Override by setting a different value.
 #
-#sender_workers = 0
+#sender_workers = 4
 
 # Enables listener sockets; can be set to false to disable listening. This
 # option is intended for developer/diagnostic purposes only.
diff --git a/src/api/client/account.rs b/src/api/client/account.rs
index 11414abf..0cea7bd9 100644
--- a/src/api/client/account.rs
+++ b/src/api/client/account.rs
@@ -298,7 +298,9 @@ pub(crate) async fn register_route(
         session: None,
         auth_error: None,
     };
-    let mut skip_auth = body.appservice_info.is_some();
+    let skip_auth = body.appservice_info.is_some() || is_guest;
+
+    // Populate required UIAA flows
     if services.globals.registration_token.is_some() {
         // Registration token required
         uiaainfo.flows.push(AuthFlow {
@@ -317,9 +319,10 @@
             },
         }))
         .expect("Failed to serialize recaptcha params");
-        skip_auth = skip_auth || is_guest;
         }
-    } else {
+    }
+
+    if uiaainfo.flows.is_empty() && !skip_auth {
         // No registration token necessary, but clients must still go through the flow
         uiaainfo = UiaaInfo {
             flows: vec![AuthFlow { stages: vec![AuthType::Dummy] }],
@@ -328,7 +331,6 @@
             session: None,
             auth_error: None,
         };
-        skip_auth = skip_auth || is_guest;
     }
 
     if !skip_auth {
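Note on the registration change above: the rework replaces the mutable `skip_auth` bookkeeping with a single up-front decision (appservices and guests skip UIAA) plus a dummy-stage fallback when no other flow was required. A minimal standalone sketch of the resulting flow selection; names are illustrative, and the recaptcha branch elided by the hunk is assumed to behave like the registration-token one:

```rust
#[derive(Debug, PartialEq)]
enum Flow {
    RegistrationToken,
    Recaptcha,
    Dummy,
}

// Hypothetical helper, not the actual handler: which UIAA flows a
// registration request would be asked to complete.
fn required_flows(
    is_appservice: bool,
    is_guest: bool,
    has_registration_token: bool,
    has_recaptcha: bool,
) -> Vec<Flow> {
    // Appservices and guests skip UIAA entirely.
    if is_appservice || is_guest {
        return Vec::new();
    }

    let mut flows = Vec::new();
    if has_registration_token {
        flows.push(Flow::RegistrationToken);
    }
    if has_recaptcha {
        flows.push(Flow::Recaptcha);
    }
    // Nothing configured: clients still complete a dummy stage.
    if flows.is_empty() {
        flows.push(Flow::Dummy);
    }
    flows
}

fn main() {
    assert_eq!(required_flows(false, false, false, false), vec![Flow::Dummy]);
    assert!(required_flows(false, true, true, false).is_empty());
}
```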
diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs
index d93acd9b..bd44f9ff 100644
--- a/src/core/config/mod.rs
+++ b/src/core/config/mod.rs
@@ -1207,7 +1207,7 @@ pub struct Config {
     /// 3 to 5 = Statistics with possible performance impact.
     /// 6 = All statistics.
     ///
-    /// default: 1
+    /// default: 3
     #[serde(default = "default_rocksdb_stats_level")]
     pub rocksdb_stats_level: u8,
 
@@ -1889,12 +1889,10 @@ pub struct Config {
     pub stream_amplification: usize,
 
     /// Number of sender task workers; determines sender parallelism. Default is
-    /// '0' which means the value is determined internally, likely matching the
-    /// number of tokio worker-threads or number of cores, etc. Override by
-    /// setting a non-zero value.
+    /// core count. Override by setting a different value.
     ///
-    /// default: 0
-    #[serde(default)]
+    /// default: core count
+    #[serde(default = "default_sender_workers")]
     pub sender_workers: usize,
 
     /// Enables listener sockets; can be set to false to disable listening. This
@@ -2125,45 +2123,48 @@ fn default_database_backups_to_keep() -> i16 { 1 }
 
 fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
 
-fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) }
+fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
 
-fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) }
+fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) }
 
 fn default_cache_capacity_modifier() -> f64 { 1.0 }
 
 fn default_auth_chain_cache_capacity() -> u32 {
-    parallelism_scaled_u32(10_000).saturating_add(100_000)
-}
-
-fn default_shorteventid_cache_capacity() -> u32 {
     parallelism_scaled_u32(50_000).saturating_add(100_000)
 }
 
+fn default_shorteventid_cache_capacity() -> u32 {
+    parallelism_scaled_u32(100_000).saturating_add(100_000)
+}
+
 fn default_eventidshort_cache_capacity() -> u32 {
-    parallelism_scaled_u32(25_000).saturating_add(100_000)
+    parallelism_scaled_u32(50_000).saturating_add(100_000)
 }
 
 fn default_eventid_pdu_cache_capacity() -> u32 {
-    parallelism_scaled_u32(25_000).saturating_add(100_000)
+    parallelism_scaled_u32(50_000).saturating_add(100_000)
 }
 
 fn default_shortstatekey_cache_capacity() -> u32 {
-    parallelism_scaled_u32(10_000).saturating_add(100_000)
+    parallelism_scaled_u32(50_000).saturating_add(100_000)
 }
 
 fn default_statekeyshort_cache_capacity() -> u32 {
-    parallelism_scaled_u32(10_000).saturating_add(100_000)
+    parallelism_scaled_u32(50_000).saturating_add(100_000)
 }
 
 fn default_servernameevent_data_cache_capacity() -> u32 {
-    parallelism_scaled_u32(100_000).saturating_add(500_000)
+    parallelism_scaled_u32(100_000).saturating_add(100_000)
 }
 
-fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) }
+fn default_stateinfo_cache_capacity() -> u32 {
+    parallelism_scaled_u32(500).clamp(100, 12000)
+}
 
-fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) }
+fn default_roomid_spacehierarchy_cache_capacity() -> u32 {
+    parallelism_scaled_u32(500).clamp(100, 12000)
+}
 
-fn default_dns_cache_entries() -> u32 { 32768 }
+fn default_dns_cache_entries() -> u32 { 327680 }
 
 fn default_dns_min_ttl() -> u64 { 60 * 180 }
@@ -2265,7 +2266,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 }
 
 fn default_rocksdb_recovery_mode() -> u8 { 1 }
 
-fn default_rocksdb_log_level() -> String { "error".to_owned() }
+fn default_rocksdb_log_level() -> String { "info".to_owned() }
 
 fn default_rocksdb_log_time_to_roll() -> usize { 0 }
@@ -2297,7 +2298,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 }
 #[allow(clippy::doc_markdown)]
 fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 }
 
-fn default_rocksdb_stats_level() -> u8 { 1 }
+fn default_rocksdb_stats_level() -> u8 { 3 }
 
 // I know, it's a great name
 #[must_use]
@@ -2352,14 +2353,13 @@ fn default_admin_log_capture() -> String {
 fn default_admin_room_tag() -> String { "m.server_notice".to_owned() }
 
 #[allow(clippy::as_conversions, clippy::cast_precision_loss)]
-fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
+pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
 
-fn parallelism_scaled_u32(val: u32) -> u32 {
-    let val = val.try_into().expect("failed to cast u32 to usize");
-    parallelism_scaled(val).try_into().unwrap_or(u32::MAX)
-}
+pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) }
 
-fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
+pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) }
+
+pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
 
 fn default_trusted_server_batch_size() -> usize { 256 }
@@ -2379,6 +2379,8 @@ fn default_stream_width_scale() -> f32 { 1.0 }
 
 fn default_stream_amplification() -> usize { 1024 }
 
+fn default_sender_workers() -> usize { parallelism_scaled(1) }
+
 fn default_client_receive_timeout() -> u64 { 75 }
 
 fn default_client_request_timeout() -> u64 { 180 }
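For a rough sense of what the retuned defaults above mean in practice, here is a small illustration assuming an 8-core host; `available_parallelism()` below is a stand-in for the real `sys::available_parallelism()`:

```rust
// Assumed core count for the example only.
fn available_parallelism() -> u32 { 8 }

// Mirrors the new helper: linear scaling with core count, saturating on overflow.
fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(available_parallelism()) }

fn main() {
    // default_pdu_cache_capacity: 50_000 * cores + 100_000
    println!("pdu cache:        {}", parallelism_scaled_u32(50_000).saturating_add(100_000)); // 500000
    // default_stateinfo_cache_capacity: 500 * cores, clamped to 100..=12000
    println!("stateinfo cache:  {}", parallelism_scaled_u32(500).clamp(100, 12_000)); // 4000
    // default_sender_workers: one sender task per core
    println!("sender workers:   {}", available_parallelism()); // 8
}
```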
diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs
index 5c36ce03..ec70d684 100644
--- a/src/core/matrix/state_res/event_auth.rs
+++ b/src/core/matrix/state_res/event_auth.rs
@@ -13,6 +13,7 @@ use ruma::{
         power_levels::RoomPowerLevelsEventContent,
         third_party_invite::RoomThirdPartyInviteEventContent,
     },
+    EventId,
     int,
     serde::{Base64, Raw},
 };
@@ -21,7 +22,6 @@ use serde::{
     de::{Error as _, IgnoredAny},
 };
 use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue};
-
 use super::{
     Error, Event, Result, StateEventType, StateKey, TimelineEventType,
     power_levels::{
@@ -217,8 +217,9 @@ where
     }
 
     /*
-    // TODO: In the past this code caused problems federating with synapse, maybe this has been
-    // resolved already. Needs testing.
+    // TODO: In the past this code was commented out because it caused problems with Synapse.
+    // This is no longer the case. This needs to be implemented.
+    // See also: https://github.com/ruma/ruma/pull/2064
     //
     // 2. Reject if auth_events
     // a. auth_events cannot have duplicate keys since it's a BTree
@@ -241,20 +242,44 @@
     }
     */
 
-    let (room_create_event, power_levels_event, sender_member_event) = join3(
-        fetch_state(&StateEventType::RoomCreate, ""),
-        fetch_state(&StateEventType::RoomPowerLevels, ""),
-        fetch_state(&StateEventType::RoomMember, sender.as_str()),
-    )
-    .await;
+    // let (room_create_event, power_levels_event, sender_member_event) = join3(
+    //     fetch_state(&StateEventType::RoomCreate, ""),
+    //     fetch_state(&StateEventType::RoomPowerLevels, ""),
+    //     fetch_state(&StateEventType::RoomMember, sender.as_str()),
+    // )
+    // .await;
+
+    let room_create_event = fetch_state(&StateEventType::RoomCreate, "").await;
+    let power_levels_event = fetch_state(&StateEventType::RoomPowerLevels, "").await;
+    let sender_member_event = fetch_state(&StateEventType::RoomMember, sender.as_str()).await;
 
     let room_create_event = match room_create_event {
         | None => {
-            warn!("no m.room.create event in auth chain");
+            error!(
+                create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
+                power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
+                member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
+                "no m.room.create event found for {} ({})!",
+                incoming_event.event_id().as_str(),
+                incoming_event.room_id().as_str()
+            );
             return Ok(false);
         },
         | Some(e) => e,
     };
+
+    // Just re-check rule 1.2 to work around a bug
+    let Some(room_id_server_name) = incoming_event.room_id().server_name() else {
+        warn!("room ID has no servername");
+        return Ok(false);
+    };
+
+    if room_id_server_name != room_create_event.sender().server_name() {
+        warn!(
+            "servername of room ID origin ({}) does not match servername of m.room.create sender ({})",
+            room_id_server_name,
+            room_create_event.sender().server_name()
+        );
+        return Ok(false);
+    }
 
     if incoming_event.room_id() != room_create_event.room_id() {
         warn!("room_id of incoming event does not match room_id of m.room.create event");
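The re-added "rule 1.2" check above compares the room ID's origin server against the server of the `m.room.create` sender. A standalone sketch of just that comparison (not the patched function; it assumes the ruma version used by this tree, where `RoomId::server_name()` returns an `Option`):

```rust
use ruma::{RoomId, UserId};

// Returns true when the room ID's server part matches the create sender's server.
fn room_origin_matches_creator(room_id: &RoomId, create_sender: &UserId) -> bool {
    match room_id.server_name() {
        // A room ID without a server part cannot pass the check.
        | None => false,
        | Some(origin) => origin == create_sender.server_name(),
    }
}

fn main() {
    let room_id: &RoomId = "!abc:example.org".try_into().expect("valid room ID");
    let alice: &UserId = "@alice:example.org".try_into().expect("valid user ID");
    let mallory: &UserId = "@mallory:evil.example".try_into().expect("valid user ID");

    assert!(room_origin_matches_creator(room_id, alice));
    assert!(!room_origin_matches_creator(room_id, mallory));
}
```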
diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs
index ce9d9276..ba9c013d 100644
--- a/src/core/matrix/state_res/mod.rs
+++ b/src/core/matrix/state_res/mod.rs
@@ -733,8 +733,12 @@ where
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send + Sync,
 {
+    let mut room_id = None;
     while let Some(sort_ev) = event {
         debug!(event_id = sort_ev.event_id().as_str(), "mainline");
+        if room_id.is_none() {
+            room_id = Some(sort_ev.room_id().to_owned());
+        }
 
         let id = sort_ev.event_id();
         if let Some(depth) = mainline_map.get(id) {
@@ -753,7 +757,7 @@ where
             }
         }
     }
 
-    // Did not find a power level event so we default to zero
+    warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth");
     Ok(0)
 }
diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs
index cbbd1012..666f9f9e 100644
--- a/src/database/engine/cf_opts.rs
+++ b/src/database/engine/cf_opts.rs
@@ -29,7 +29,7 @@ fn descriptor_cf_options(
     set_table_options(&mut opts, &desc, cache)?;
 
     opts.set_min_write_buffer_number(1);
-    opts.set_max_write_buffer_number(2);
+    opts.set_max_write_buffer_number(3);
     opts.set_write_buffer_size(desc.write_size);
 
     opts.set_target_file_size_base(desc.file_size);
diff --git a/src/database/engine/db_opts.rs b/src/database/engine/db_opts.rs
index 18cec742..1299443d 100644
--- a/src/database/engine/db_opts.rs
+++ b/src/database/engine/db_opts.rs
@@ -1,8 +1,6 @@
-use std::{cmp, convert::TryFrom};
-
-use conduwuit::{Config, Result, utils};
+use conduwuit::{Config, Result};
 use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel};
-
+use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32};
 use super::{cf_opts::cache_size_f64, logger::handle as handle_log};
 
 /// Create database-wide options suitable for opening the database. This also
@@ -23,8 +21,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Result<Options>
     set_logging_defaults(&mut opts, config);
 
     // Processing
-    opts.set_max_background_jobs(num_threads::<i32>(config)?);
-    opts.set_max_subcompactions(num_threads::<u32>(config)?);
+    opts.set_max_background_jobs(parallelism_scaled_i32(1));
+    opts.set_max_subcompactions(parallelism_scaled_u32(1));
     opts.set_avoid_unnecessary_blocking_io(true);
     opts.set_max_file_opening_threads(0);
 
@@ -126,15 +124,3 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) {
         opts.set_callback_logger(rocksdb_log_level, &handle_log);
     }
 }
-
-fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
-    const MIN_PARALLELISM: usize = 2;
-
-    let requested = if config.rocksdb_parallelism_threads != 0 {
-        config.rocksdb_parallelism_threads
-    } else {
-        utils::available_parallelism()
-    };
-
-    utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
-}
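The db_opts change above drops the `num_threads` helper in favour of the new `parallelism_scaled_*` functions. A rough illustration of the behavioural difference, assuming an 8-core host; note that with this change the `rocksdb_parallelism_threads` config override is no longer consulted:

```rust
// Assumed core count for the example only.
fn available_parallelism() -> usize { 8 }

// Old behaviour: honor a non-zero config override, otherwise use the core
// count, and never go below 2.
fn old_num_threads(rocksdb_parallelism_threads: usize) -> usize {
    const MIN_PARALLELISM: usize = 2;
    let requested = if rocksdb_parallelism_threads != 0 {
        rocksdb_parallelism_threads
    } else {
        available_parallelism()
    };
    std::cmp::max(MIN_PARALLELISM, requested)
}

// New behaviour: parallelism_scaled_i32(1) == one background job per core.
fn new_max_background_jobs() -> i32 {
    1_i32.saturating_mul(available_parallelism() as i32)
}

fn main() {
    assert_eq!(old_num_threads(0), 8);
    assert_eq!(new_max_background_jobs(), 8);
    // The config override used to take effect; it no longer does.
    assert_eq!(old_num_threads(4), 4);
}
```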
diff --git a/src/service/admin/grant.rs b/src/service/admin/grant.rs
index 0d0e3fc1..172187cb 100644
--- a/src/service/admin/grant.rs
+++ b/src/service/admin/grant.rs
@@ -192,8 +192,9 @@ pub async fn revoke_admin(&self, user_id: &UserId) -> Result {
         | Err(e) => return Err!(error!(?e, "Failure occurred while attempting revoke.")),
 
-        | Ok(event) if !matches!(event.membership, Invite | Knock | Join) =>
-            return Err!("Cannot revoke {user_id} in membership state {:?}.", event.membership),
+        | Ok(event) if !matches!(event.membership, Invite | Knock | Join) => {
+            return Err!("Cannot revoke {user_id} in membership state {:?}.", event.membership);
+        },
 
         | Ok(event) => {
             assert!(
diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs
index d971ce95..f496c414 100644
--- a/src/service/admin/mod.rs
+++ b/src/service/admin/mod.rs
@@ -9,6 +9,7 @@ use std::{
 };
 
 use async_trait::async_trait;
+use conduwuit::{Err, utils};
 use conduwuit_core::{
     Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
 };
@@ -16,15 +17,20 @@ pub use create::create_admin_room;
 use futures::{Future, FutureExt, TryFutureExt};
 use loole::{Receiver, Sender};
 use ruma::{
-    OwnedEventId, OwnedRoomId, RoomId, UserId,
+    Mxc, OwnedEventId, OwnedMxcUri, OwnedRoomId, RoomId, UInt, UserId,
     events::{
         Mentions,
-        room::message::{Relation, RoomMessageEventContent},
+        room::{
+            MediaSource,
+            message::{
+                FileInfo, FileMessageEventContent, MessageType, Relation, RoomMessageEventContent,
+            },
+        },
     },
 };
 use tokio::sync::RwLock;
 
-use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard};
+use crate::{Dep, account_data, globals, media::MXC_LENGTH, rooms, rooms::state::RoomMutexGuard};
 
 pub struct Service {
     services: Services,
@@ -45,6 +51,7 @@ struct Services {
     state_accessor: Dep<rooms::state_accessor::Service>,
     account_data: Dep<account_data::Service>,
     services: StdRwLock<Option<Weak<crate::Services>>>,
+    media: Dep<media::Service>,
 }
 
 /// Inputs to a command are a multi-line string, optional reply_id, and optional
@@ -94,6 +101,7 @@ impl crate::Service for Service {
                 .depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
             account_data: args.depend::<account_data::Service>("account_data"),
             services: None.into(),
+            media: args.depend::<media::Service>("media"),
         },
         channel: loole::bounded(COMMAND_QUEUE_LIMIT),
         handle: RwLock::new(None),
@@ -157,8 +165,65 @@ impl Service {
             .ok();
     }
 
-    /// Sends a message to the admin room as the admin user (see send_text() for
-    /// convenience).
+    /// Either returns a small-enough message, or converts a large message into
+    /// a file
+    pub async fn text_or_file(
+        &self,
+        message_content: RoomMessageEventContent,
+    ) -> RoomMessageEventContent {
+        let body_len = Self::collate_msg_size(&message_content);
+        if body_len > 60000 {
+            // Intercept and send as file
+            let file = self
+                .text_to_file(message_content.body())
+                .await
+                .expect("failed to create text file");
+            let size_u64: u64 = message_content.body().len().try_into().map_or(0, |n| n);
+            let metadata = FileInfo {
+                mimetype: Some("text/markdown".to_owned()),
+                size: Some(UInt::new_saturating(size_u64)),
+                thumbnail_info: None,
+                thumbnail_source: None,
+            };
+            let content = FileMessageEventContent {
+                body: "Output was too large to send as text.".to_owned(),
+                formatted: None,
+                filename: Some("output.md".to_owned()),
+                source: MediaSource::Plain(file),
+                info: Some(Box::new(metadata)),
+            };
+            RoomMessageEventContent::new(MessageType::File(content))
+        } else {
+            message_content
+        }
+    }
+
+    #[must_use]
+    pub fn collate_msg_size(content: &RoomMessageEventContent) -> u64 {
+        content
+            .body()
+            .len()
+            .saturating_add(match &content.msgtype {
+                | MessageType::Text(t) =>
+                    if t.formatted.is_some() {
+                        t.formatted.as_ref().map_or(0, |f| f.body.len())
+                    } else {
+                        0
+                    },
+                | MessageType::Notice(n) =>
+                    if n.formatted.is_some() {
+                        n.formatted.as_ref().map_or(0, |f| f.body.len())
+                    } else {
+                        0
+                    },
+                | _ => 0,
+            })
+            .try_into()
+            .expect("size too large")
+    }
+
+    /// Sends a message to the admin room as the admin user (see send_text()
+    /// for convenience).
     pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result<()> {
         let user_id = &self.services.globals.server_user;
         let room_id = self.get_admin_room().await?;
@@ -178,6 +243,36 @@ impl Service {
         self.send_message(message_content).await
     }
 
+    /// Casts a text body into a file and creates a file for it.
+    pub async fn text_to_file(&self, body: &str) -> Result<OwnedMxcUri> {
+        let mxc = Mxc {
+            server_name: self.services.globals.server_name(),
+            media_id: &utils::random_string(MXC_LENGTH),
+        };
+        match self
+            .services
+            .media
+            .create(
+                &mxc,
+                Some(self.services.globals.server_user.as_ref()),
+                Some(&utils::content_disposition::make_content_disposition(
+                    None,
+                    Some("text/markdown"),
+                    Some("output.md"),
+                )),
+                Some("text/markdown"),
+                body.as_bytes(),
+            )
+            .await
+        {
+            | Ok(()) => Ok(mxc.to_string().into()),
+            | Err(e) => {
+                error!("Failed to upload text to file: {e}");
+                Err!(Request(Unknown("Failed to upload text to file")))
+            },
+        }
+    }
+
     /// Posts a command to the command processor queue and returns. Processing
     /// will take place on the service worker's task asynchronously. Errors if
     /// the queue is full.
@@ -325,11 +420,15 @@ impl Service {
         assert!(self.user_is_admin(user_id).await, "sender is not admin");
 
         let state_lock = self.services.state.mutex.lock(room_id).await;
-
         if let Err(e) = self
             .services
             .timeline
-            .build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, &state_lock)
+            .build_and_append_pdu(
+                PduBuilder::timeline(&self.text_or_file(content).await),
+                user_id,
+                room_id,
+                &state_lock,
+            )
             .await
         {
             self.handle_response_error(e, room_id, user_id, &state_lock)
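The `text_or_file` helper above keys off a collated body size of 60 000 bytes (plain body plus any formatted body). A minimal standalone sketch of that size check; the names are illustrative, not the service API:

```rust
// Plain body length plus the formatted body length, if any, as a u64.
fn collated_size(body: &str, formatted_body: Option<&str>) -> u64 {
    body.len()
        .saturating_add(formatted_body.map_or(0, str::len))
        .try_into()
        .expect("size too large")
}

fn main() {
    // Small output stays a normal text/notice message.
    assert!(collated_size("hello", None) <= 60_000);
    // Oversized output would be uploaded as output.md and sent as m.file.
    assert!(collated_size(&"x".repeat(70_000), None) > 60_000);
}
```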
diff --git a/src/service/resolver/actual.rs b/src/service/resolver/actual.rs
index d23ef95a..52cd5d7d 100644
--- a/src/service/resolver/actual.rs
+++ b/src/service/resolver/actual.rs
@@ -306,28 +306,25 @@ impl super::Service {
     #[tracing::instrument(name = "srv", level = "debug", skip(self))]
     async fn query_srv_record(&self, hostname: &'_ str) -> Result<Option<FedDest>> {
-        let hostnames =
-            [format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")];
+        self.services.server.check_running()?;
 
-        for hostname in hostnames {
-            self.services.server.check_running()?;
+        debug!("querying SRV for {hostname:?}");
 
-            debug!("querying SRV for {hostname:?}");
-            let hostname = hostname.trim_end_matches('.');
-            match self.resolver.resolver.srv_lookup(hostname).await {
-                | Err(e) => Self::handle_resolve_error(&e, hostname)?,
-                | Ok(result) => {
-                    return Ok(result.iter().next().map(|result| {
-                        FedDest::Named(
-                            result.target().to_string().trim_end_matches('.').to_owned(),
-                            format!(":{}", result.port())
-                                .as_str()
-                                .try_into()
-                                .unwrap_or_else(|_| FedDest::default_port()),
-                        )
-                    }));
-                },
-            }
+        let hostname_suffix = format!("_matrix-fed._tcp.{hostname}.");
+        let hostname = hostname_suffix.trim_end_matches('.');
+        match self.resolver.resolver.srv_lookup(hostname).await {
+            | Err(e) => Self::handle_resolve_error(&e, hostname)?,
+            | Ok(result) => {
+                return Ok(result.iter().next().map(|result| {
+                    FedDest::Named(
+                        result.target().to_string().trim_end_matches('.').to_owned(),
+                        format!(":{}", result.port())
+                            .as_str()
+                            .try_into()
+                            .unwrap_or_else(|_| FedDest::default_port()),
+                    )
+                }));
+            },
         }
 
         Ok(None)
diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs
index d79eed77..fad9ac74 100644
--- a/src/service/rooms/event_handler/handle_outlier_pdu.rs
+++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs
@@ -122,10 +122,7 @@ where
     }
 
     // The original create event must be in the auth events
-    if !matches!(
-        auth_events.get(&(StateEventType::RoomCreate, String::new().into())),
-        Some(_) | None
-    ) {
+    if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) {
         return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
     }
 
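On the handle_outlier_pdu fix above: `matches!(_, Some(_) | None)` is true for every `Option`, so the old negated check could never fire and the error below it was unreachable; `contains_key` restores the intended guard. A quick self-contained demonstration:

```rust
use std::collections::HashMap;

fn main() {
    // Empty map standing in for auth_events without an m.room.create entry.
    let auth_events: HashMap<&str, &str> = HashMap::new();

    // Old form: the pattern covers Some(_) and None, so the negation is always false.
    let old_check = !matches!(auth_events.get("m.room.create"), Some(_) | None);
    assert!(!old_check); // never rejects anything

    // New form: actually detects that m.room.create is missing.
    assert!(!auth_events.contains_key("m.room.create"));
}
```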
diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs
index 4093cb05..bc2408df 100644
--- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs
+++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs
@@ -6,6 +6,7 @@ use conduwuit::{
     trace,
     utils::stream::{BroadbandExt, ReadyExt},
     warn,
+    info,
 };
 use futures::{FutureExt, StreamExt, future::ready};
 use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
@@ -149,7 +150,7 @@ where
     let extremities: Vec<_> = self
         .services
         .state
-        .get_forward_extremities(room_id)
+        .get_forward_extremities(room_id, &state_lock)
         .map(ToOwned::to_owned)
         .ready_filter(|event_id| {
             // Remove any that are referenced by this incoming event's prev_events
@@ -167,6 +168,8 @@ where
         .collect()
         .await;
 
+    if extremities.is_empty() { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events, event id: {}", incoming_pdu.prev_events.len(), incoming_pdu.event_id); }
+
     debug!(
         "Retained {} extremities checked against {} prev_events",
         extremities.len(),
diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs
index 641aa6a9..92881126 100644
--- a/src/service/rooms/state/mod.rs
+++ b/src/service/rooms/state/mod.rs
@@ -388,6 +388,7 @@ impl Service {
     pub fn get_forward_extremities<'a>(
         &'a self,
         room_id: &'a RoomId,
+        _state_lock: &'a RoomMutexGuard,
     ) -> impl Stream<Item = &EventId> + Send + '_ {
         let prefix = (room_id, Interfix);
 
diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs
index 20ccaf56..1be2f58b 100644
--- a/src/service/rooms/timeline/create.rs
+++ b/src/service/rooms/timeline/create.rs
@@ -42,7 +42,7 @@ pub async fn create_hash_and_sign_event(
     let prev_events: Vec<OwnedEventId> = self
         .services
         .state
-        .get_forward_extremities(room_id)
+        .get_forward_extremities(room_id, _mutex_lock)
         .take(20)
         .map(Into::into)
         .collect()
diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs
index 08ca7010..ce687551 100644
--- a/src/service/sending/mod.rs
+++ b/src/service/sending/mod.rs
@@ -401,16 +401,10 @@ impl Service {
 fn num_senders(args: &crate::Args<'_>) -> usize {
     const MIN_SENDERS: usize = 1;
 
-    // Limit the number of senders to the number of worker threads or number of
-    // cores, conservatively.
-    let max_senders = args
-        .server
-        .metrics
-        .num_workers()
-        .min(available_parallelism());
+    // Limit the maximum number of senders to the number of cores.
+    let max_senders = available_parallelism();
 
-    // If the user doesn't override the default 0, this is intended to then default
-    // to 1 for now as multiple senders is experimental.
+    // Default is one sender per core; clamp between MIN_SENDERS and max_senders.
     args.server
         .config
         .sender_workers