diff --git a/src/admin/room/moderation.rs b/src/admin/room/moderation.rs index ee429fc6..4c19da5c 100644 --- a/src/admin/room/moderation.rs +++ b/src/admin/room/moderation.rs @@ -6,6 +6,7 @@ use conduwuit::{ warn, }; use futures::StreamExt; +use futures::FutureExt; use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId}; use crate::{admin_command, admin_command_dispatch, get_room_info}; @@ -155,7 +156,10 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result { evicting admins too)", ); - if let Err(e) = leave_room(self.services, user_id, &room_id, None).await { + if let Err(e) = leave_room(self.services, user_id, &room_id, None) + .boxed() + .await + { warn!("Failed to leave room: {e}"); } @@ -323,7 +327,10 @@ async fn ban_list_of_rooms(&self) -> Result { evicting admins too)", ); - if let Err(e) = leave_room(self.services, user_id, &room_id, None).await { + if let Err(e) = leave_room(self.services, user_id, &room_id, None) + .boxed() + .await + { warn!("Failed to leave room: {e}"); } diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index e5e481e5..acd21bb3 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -9,6 +9,7 @@ use conduwuit::{ }; use conduwuit_api::client::{leave_all_rooms, update_avatar_url, update_displayname}; use futures::StreamExt; +use futures::FutureExt; use ruma::{ OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId, events::{ @@ -655,7 +656,9 @@ pub(super) async fn force_leave_room( return Err!("{user_id} is not joined in the room"); } - leave_room(self.services, &user_id, &room_id, None).await?; + leave_room(self.services, &user_id, &room_id, None) + .boxed() + .await?; self.write_str(&format!("{user_id} has left {room_id}.",)) .await diff --git a/src/api/client/account.rs b/src/api/client/account.rs index 32f2530c..05dfa8b7 100644 --- a/src/api/client/account.rs +++ b/src/api/client/account.rs @@ -763,7 +763,9 @@ pub(crate) async fn deactivate_route( super::update_displayname(&services, sender_user, None, &all_joined_rooms).await; super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await; - full_user_deactivate(&services, sender_user, &all_joined_rooms).await?; + full_user_deactivate(&services, sender_user, &all_joined_rooms) + .boxed() + .await?; info!("User {sender_user} deactivated their account."); @@ -915,7 +917,9 @@ pub async fn full_user_deactivate( } } - super::leave_all_rooms(services, user_id).await; + super::leave_all_rooms(services, user_id) + .boxed() + .await; Ok(()) } diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index e587d806..584f955d 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -114,7 +114,9 @@ async fn banned_room_check( .collect() .await; - full_user_deactivate(services, user_id, &all_joined_rooms).await?; + full_user_deactivate(services, user_id, &all_joined_rooms) + .boxed() + .await?; } return Err!(Request(Forbidden("This room is banned on this homeserver."))); @@ -153,7 +155,9 @@ async fn banned_room_check( .collect() .await; - full_user_deactivate(services, user_id, &all_joined_rooms).await?; + full_user_deactivate(services, user_id, &all_joined_rooms) + .boxed() + .await?; } return Err!(Request(Forbidden("This remote server is banned on this homeserver."))); @@ -259,6 +263,7 @@ pub(crate) async fn join_room_by_id_or_alias_route( room_id.server_name(), client, ) + .boxed() .await?; let mut servers = body.via.clone(); @@ -478,6 +483,7 @@ pub(crate) async fn leave_room_route( body: Ruma, ) -> Result { leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone()) + .boxed() .await .map(|()| leave_room::v3::Response::new()) } @@ -1792,7 +1798,10 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) { for room_id in all_rooms { // ignore errors - if let Err(e) = leave_room(services, user_id, &room_id, None).await { + if let Err(e) = leave_room(services, user_id, &room_id, None) + .boxed() + .await + { warn!(%user_id, "Failed to leave {room_id} remotely: {e}"); } diff --git a/src/api/client/space.rs b/src/api/client/space.rs index 92768926..23b1e80f 100644 --- a/src/api/client/space.rs +++ b/src/api/client/space.rs @@ -121,7 +121,9 @@ where .map(|(key, val)| (key, val.collect())) .collect(); - if !populate { + if populate { + rooms.push(summary_to_chunk(summary.clone())); + } else { children = children .iter() .rev() @@ -144,10 +146,8 @@ where .collect(); } - if populate { - rooms.push(summary_to_chunk(summary.clone())); - } else if queue.is_empty() && children.is_empty() { - return Err!(Request(InvalidParam("Room IDs in token were not found."))); + if !populate && queue.is_empty() && children.is_empty() { + break; } parents.insert(current_room.clone()); diff --git a/src/api/client/state.rs b/src/api/client/state.rs index 2ddc8f14..96b7e7aa 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -6,6 +6,7 @@ use conduwuit::{ }; use conduwuit_service::Services; use futures::TryStreamExt; +use futures::FutureExt; use ruma::{ OwnedEventId, RoomId, UserId, api::client::state::{get_state_events, get_state_events_for_key, send_state_event}, @@ -59,6 +60,7 @@ pub(crate) async fn send_state_event_for_empty_key_route( body: Ruma, ) -> Result> { send_state_event_for_key_route(State(services), body) + .boxed() .await .map(RumaResponse) } diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index da82a61c..7eb8c7e1 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -1009,8 +1009,6 @@ async fn calculate_state_incremental<'a>( ) -> Result { let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash); - let state_changed = since_shortstatehash != current_shortstatehash; - let encrypted_room = services .rooms .state_accessor @@ -1042,7 +1040,7 @@ async fn calculate_state_incremental<'a>( }) .into(); - let state_diff_ids: OptionFuture<_> = (!full_state && state_changed) + let state_diff_ids: OptionFuture<_> = (!full_state) .then(|| { StreamExt::into_future( services diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index d4a10345..9ebfe74c 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2059,45 +2059,45 @@ fn default_database_backups_to_keep() -> i16 { 1 } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } -fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) } +fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(100_000) -} - -fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) -} - -fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) -} - -fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_servernameevent_data_cache_capacity() -> u32 { parallelism_scaled_u32(100_000).saturating_add(500_000) } -fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) } +fn default_eventidshort_cache_capacity() -> u32 { + parallelism_scaled_u32(100_000).saturating_add(500_000) +} + +fn default_eventid_pdu_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_shortstatekey_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_statekeyshort_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_servernameevent_data_cache_capacity() -> u32 { + parallelism_scaled_u32(200_000).saturating_add(500_000) +} + +fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(5000) } fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } -fn default_dns_cache_entries() -> u32 { 32768 } +fn default_dns_cache_entries() -> u32 { 327680 } fn default_dns_min_ttl() -> u64 { 60 * 180 } diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs index cbbd1012..666f9f9e 100644 --- a/src/database/engine/cf_opts.rs +++ b/src/database/engine/cf_opts.rs @@ -29,7 +29,7 @@ fn descriptor_cf_options( set_table_options(&mut opts, &desc, cache)?; opts.set_min_write_buffer_number(1); - opts.set_max_write_buffer_number(2); + opts.set_max_write_buffer_number(3); opts.set_write_buffer_size(desc.write_size); opts.set_target_file_size_base(desc.file_size); diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 683f5400..6b064424 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -4,7 +4,6 @@ mod execute; mod grant; use std::{ - future::Future, pin::Pin, sync::{Arc, RwLock as StdRwLock, Weak}, }; @@ -14,7 +13,7 @@ use conduwuit::{ Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, }; pub use create::create_admin_room; -use futures::{FutureExt, TryFutureExt}; +use futures::{Future, FutureExt, TryFutureExt}; use loole::{Receiver, Sender}; use ruma::{ OwnedEventId, OwnedRoomId, RoomId, UserId, diff --git a/src/service/resolver/actual.rs b/src/service/resolver/actual.rs index d23ef95a..52cd5d7d 100644 --- a/src/service/resolver/actual.rs +++ b/src/service/resolver/actual.rs @@ -306,28 +306,25 @@ impl super::Service { #[tracing::instrument(name = "srv", level = "debug", skip(self))] async fn query_srv_record(&self, hostname: &'_ str) -> Result> { - let hostnames = - [format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")]; + self.services.server.check_running()?; - for hostname in hostnames { - self.services.server.check_running()?; + debug!("querying SRV for {hostname:?}"); - debug!("querying SRV for {hostname:?}"); - let hostname = hostname.trim_end_matches('.'); - match self.resolver.resolver.srv_lookup(hostname).await { - | Err(e) => Self::handle_resolve_error(&e, hostname)?, - | Ok(result) => { - return Ok(result.iter().next().map(|result| { - FedDest::Named( - result.target().to_string().trim_end_matches('.').to_owned(), - format!(":{}", result.port()) - .as_str() - .try_into() - .unwrap_or_else(|_| FedDest::default_port()), - ) - })); - }, - } + let hostname_suffix = format!("_matrix-fed._tcp.{hostname}."); + let hostname = hostname_suffix.trim_end_matches('.'); + match self.resolver.resolver.srv_lookup(hostname).await { + | Err(e) => Self::handle_resolve_error(&e, hostname)?, + | Ok(result) => { + return Ok(result.iter().next().map(|result| { + FedDest::Named( + result.target().to_string().trim_end_matches('.').to_owned(), + format!(":{}", result.port()) + .as_str() + .try_into() + .unwrap_or_else(|_| FedDest::default_port()), + ) + })); + }, } Ok(None)