Compare commits

...

8 commits

Author SHA1 Message Date
Jacob Taylor
6eea30b77a enable converged 6g at the edge in continuwuity
Some checks failed
Release Docker Image / define-variables (push) Failing after 1s
Release Docker Image / build-image (linux/amd64, linux-amd64) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, linux-arm64) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Rust Checks / Format (push) Failing after 2s
Rust Checks / Clippy (push) Failing after 8s
Rust Checks / Cargo Test (push) Failing after 10s
2025-06-03 18:41:46 -07:00
nexy7574
bea465eedd Always calculate state diff IDs in syncv3
seemingly fixes #779
2025-06-03 18:41:46 -07:00
Jacob Taylor
2dfd94233c upgrade some settings to enable 5g in continuwuity 2025-06-03 18:41:46 -07:00
Jacob Taylor
d461edcef0 add futures::FutureExt to make cba9ee5240 work 2025-06-03 18:41:46 -07:00
Jason Volk
c574eec9de Mitigate large futures
Signed-off-by: Jason Volk <jason@zemos.net>
2025-06-03 18:41:46 -07:00
Jacob Taylor
0ccb61d341 bump the number of allowed immutable memtables by 1, to allow for greater flood protection
this should probably not be applied if you have rocksdb_atomic_flush = false (the default)
2025-06-03 18:41:46 -07:00
Jacob Taylor
b2aacaaf5b probably incorrectly delete support for non-standardized matrix srv record 2025-06-03 18:41:46 -07:00
Jacob Taylor
3c055d9f57 Fix spaces rooms list load error. rev2 2025-06-03 18:41:46 -07:00
11 changed files with 83 additions and 64 deletions

View file

@@ -6,6 +6,7 @@ use conduwuit::{
 	warn,
 };
 use futures::StreamExt;
+use futures::FutureExt;
 use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId};
 use crate::{admin_command, admin_command_dispatch, get_room_info};
@@ -155,7 +156,10 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
 				evicting admins too)",
 			);
-			if let Err(e) = leave_room(self.services, user_id, &room_id, None).await {
+			if let Err(e) = leave_room(self.services, user_id, &room_id, None)
+				.boxed()
+				.await
+			{
 				warn!("Failed to leave room: {e}");
 			}
@@ -323,7 +327,10 @@ async fn ban_list_of_rooms(&self) -> Result {
 				evicting admins too)",
 			);
-			if let Err(e) = leave_room(self.services, user_id, &room_id, None).await {
+			if let Err(e) = leave_room(self.services, user_id, &room_id, None)
+				.boxed()
+				.await
+			{
 				warn!("Failed to leave room: {e}");
 			}
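
Note: several hunks in this comparison wrap calls such as leave_room(...) with .boxed() before awaiting them, in line with the "Mitigate large futures" commit. Boxing moves a future's state machine onto the heap, so only a pointer-sized slot remains in the caller's own future. A minimal sketch of the pattern, with hypothetical function names not taken from this diff:

    use futures::FutureExt; // provides .boxed()
    use futures::future::ready;

    // Hypothetical async fn whose state machine is large because a big
    // local value stays alive across an await point.
    async fn heavy_operation() -> usize {
        let buf = [0u8; 16 * 1024];
        ready(()).await; // placeholder await point
        buf.len()
    }

    async fn caller() -> usize {
        // .boxed() converts the future into Pin<Box<dyn Future + Send + '_>>,
        // keeping caller's own state machine small.
        heavy_operation().boxed().await
    }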

View file

@@ -9,6 +9,7 @@ use conduwuit::{
 };
 use conduwuit_api::client::{leave_all_rooms, update_avatar_url, update_displayname};
 use futures::StreamExt;
+use futures::FutureExt;
 use ruma::{
 	OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId,
 	events::{
@@ -655,7 +656,9 @@ pub(super) async fn force_leave_room(
 		return Err!("{user_id} is not joined in the room");
 	}
-	leave_room(self.services, &user_id, &room_id, None).await?;
+	leave_room(self.services, &user_id, &room_id, None)
+		.boxed()
+		.await?;
 	self.write_str(&format!("{user_id} has left {room_id}.",))
 		.await

View file

@@ -763,7 +763,9 @@ pub(crate) async fn deactivate_route(
 	super::update_displayname(&services, sender_user, None, &all_joined_rooms).await;
 	super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await;
-	full_user_deactivate(&services, sender_user, &all_joined_rooms).await?;
+	full_user_deactivate(&services, sender_user, &all_joined_rooms)
+		.boxed()
+		.await?;
 	info!("User {sender_user} deactivated their account.");
@@ -915,7 +917,9 @@ pub async fn full_user_deactivate(
 		}
 	}
-	super::leave_all_rooms(services, user_id).await;
+	super::leave_all_rooms(services, user_id)
+		.boxed()
+		.await;
 	Ok(())
 }

View file

@@ -114,7 +114,9 @@ async fn banned_room_check(
 				.collect()
 				.await;
-			full_user_deactivate(services, user_id, &all_joined_rooms).await?;
+			full_user_deactivate(services, user_id, &all_joined_rooms)
+				.boxed()
+				.await?;
 		}
 		return Err!(Request(Forbidden("This room is banned on this homeserver.")));
@@ -153,7 +155,9 @@ async fn banned_room_check(
 				.collect()
 				.await;
-			full_user_deactivate(services, user_id, &all_joined_rooms).await?;
+			full_user_deactivate(services, user_id, &all_joined_rooms)
+				.boxed()
+				.await?;
 		}
 		return Err!(Request(Forbidden("This remote server is banned on this homeserver.")));
@@ -259,6 +263,7 @@ pub(crate) async fn join_room_by_id_or_alias_route(
 		room_id.server_name(),
 		client,
 	)
+	.boxed()
 	.await?;
 	let mut servers = body.via.clone();
@@ -478,6 +483,7 @@ pub(crate) async fn leave_room_route(
 	body: Ruma<leave_room::v3::Request>,
 ) -> Result<leave_room::v3::Response> {
 	leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone())
+		.boxed()
 		.await
 		.map(|()| leave_room::v3::Response::new())
 }
@@ -1792,7 +1798,10 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
 	for room_id in all_rooms {
 		// ignore errors
-		if let Err(e) = leave_room(services, user_id, &room_id, None).await {
+		if let Err(e) = leave_room(services, user_id, &room_id, None)
+			.boxed()
+			.await
+		{
 			warn!(%user_id, "Failed to leave {room_id} remotely: {e}");
 		}

View file

@@ -121,7 +121,9 @@ where
 			.map(|(key, val)| (key, val.collect()))
 			.collect();
-		if !populate {
+		if populate {
+			rooms.push(summary_to_chunk(summary.clone()));
+		} else {
 			children = children
 				.iter()
 				.rev()
@@ -144,10 +146,8 @@ where
 				.collect();
 		}
-		if populate {
-			rooms.push(summary_to_chunk(summary.clone()));
-		} else if queue.is_empty() && children.is_empty() {
-			return Err!(Request(InvalidParam("Room IDs in token were not found.")));
+		if !populate && queue.is_empty() && children.is_empty() {
+			break;
 		}
 		parents.insert(current_room.clone());

View file

@@ -6,6 +6,7 @@ use conduwuit::{
 };
 use conduwuit_service::Services;
 use futures::TryStreamExt;
+use futures::FutureExt;
 use ruma::{
 	OwnedEventId, RoomId, UserId,
 	api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
@@ -59,6 +60,7 @@ pub(crate) async fn send_state_event_for_empty_key_route(
 	body: Ruma<send_state_event::v3::Request>,
 ) -> Result<RumaResponse<send_state_event::v3::Response>> {
 	send_state_event_for_key_route(State(services), body)
+		.boxed()
 		.await
 		.map(RumaResponse)
 }

View file

@@ -1009,8 +1009,6 @@ async fn calculate_state_incremental<'a>(
 ) -> Result<StateChanges> {
 	let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);
-	let state_changed = since_shortstatehash != current_shortstatehash;
 	let encrypted_room = services
 		.rooms
 		.state_accessor
@@ -1042,7 +1040,7 @@ async fn calculate_state_incremental<'a>(
 		})
 		.into();
-	let state_diff_ids: OptionFuture<_> = (!full_state && state_changed)
+	let state_diff_ids: OptionFuture<_> = (!full_state)
 		.then(|| {
 			StreamExt::into_future(
 				services

View file

@@ -2059,45 +2059,45 @@ fn default_database_backups_to_keep() -> i16 { 1 }
 fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
-fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) }
+fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
-fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) }
+fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) }
 fn default_cache_capacity_modifier() -> f64 { 1.0 }
 fn default_auth_chain_cache_capacity() -> u32 {
-	parallelism_scaled_u32(10_000).saturating_add(100_000)
+	parallelism_scaled_u32(50_000).saturating_add(500_000)
 }
 fn default_shorteventid_cache_capacity() -> u32 {
-	parallelism_scaled_u32(50_000).saturating_add(100_000)
-}
-fn default_eventidshort_cache_capacity() -> u32 {
-	parallelism_scaled_u32(25_000).saturating_add(100_000)
-}
-fn default_eventid_pdu_cache_capacity() -> u32 {
-	parallelism_scaled_u32(25_000).saturating_add(100_000)
-}
-fn default_shortstatekey_cache_capacity() -> u32 {
-	parallelism_scaled_u32(10_000).saturating_add(100_000)
-}
-fn default_statekeyshort_cache_capacity() -> u32 {
-	parallelism_scaled_u32(10_000).saturating_add(100_000)
-}
-fn default_servernameevent_data_cache_capacity() -> u32 {
-	parallelism_scaled_u32(100_000).saturating_add(500_000)
-}
-fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) }
+	parallelism_scaled_u32(100_000).saturating_add(500_000)
+}
+fn default_eventidshort_cache_capacity() -> u32 {
+	parallelism_scaled_u32(100_000).saturating_add(500_000)
+}
+fn default_eventid_pdu_cache_capacity() -> u32 {
+	parallelism_scaled_u32(50_000).saturating_add(500_000)
+}
+fn default_shortstatekey_cache_capacity() -> u32 {
+	parallelism_scaled_u32(50_000).saturating_add(500_000)
+}
+fn default_statekeyshort_cache_capacity() -> u32 {
+	parallelism_scaled_u32(50_000).saturating_add(500_000)
+}
+fn default_servernameevent_data_cache_capacity() -> u32 {
+	parallelism_scaled_u32(200_000).saturating_add(500_000)
+}
+fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(5000) }
 fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) }
-fn default_dns_cache_entries() -> u32 { 32768 }
+fn default_dns_cache_entries() -> u32 { 327680 }
 fn default_dns_min_ttl() -> u64 { 60 * 180 }
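
Note: the defaults above scale with the host's parallelism before the flat saturating_add term is applied. The helper's definition is not part of this diff; a plausible sketch, assuming parallelism_scaled_u32 and parallelism_scaled_f64 simply multiply by the detected core count:

    use std::thread::available_parallelism;

    // Hypothetical reconstruction of the scaling helpers used above;
    // the real definitions live elsewhere in the config module.
    fn parallelism() -> usize {
        available_parallelism().map(|n| n.get()).unwrap_or(1)
    }

    fn parallelism_scaled_u32(base: u32) -> u32 {
        base.saturating_mul(parallelism() as u32)
    }

    fn parallelism_scaled_f64(base: f64) -> f64 {
        base * parallelism() as f64
    }

    // Example: with 8 cores, the new default_pdu_cache_capacity()
    // would work out to 50_000 * 8 + 500_000 = 900_000 entries.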

View file

@@ -29,7 +29,7 @@ fn descriptor_cf_options(
 	set_table_options(&mut opts, &desc, cache)?;
 	opts.set_min_write_buffer_number(1);
-	opts.set_max_write_buffer_number(2);
+	opts.set_max_write_buffer_number(3);
 	opts.set_write_buffer_size(desc.write_size);
 	opts.set_target_file_size_base(desc.file_size);
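
Note: this is the change described by the "bump the number of allowed immutable memtables by 1" commit: allowing up to three write buffers per column family gives flushes more headroom under write bursts. The commit's caveat about rocksdb_atomic_flush refers to RocksDB's DB-wide atomic-flush mode. A hedged sketch of the relevant rust-rocksdb calls (values illustrative, not the project's full option set):

    use rocksdb::Options;

    fn sketch_cf_options(write_size: usize, atomic_flush: bool) -> Options {
        let mut opts = Options::default();
        // One memtable must always remain writable...
        opts.set_min_write_buffer_number(1);
        // ...and up to two more may sit immutable awaiting flush (2 -> 3 here).
        opts.set_max_write_buffer_number(3);
        opts.set_write_buffer_size(write_size);
        // Atomic flush makes RocksDB flush column families together, which is
        // why the commit suggests the bump only when this mode is enabled.
        opts.set_atomic_flush(atomic_flush);
        opts
    }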

View file

@@ -4,7 +4,6 @@ mod execute;
 mod grant;
 use std::{
-	future::Future,
 	pin::Pin,
 	sync::{Arc, RwLock as StdRwLock, Weak},
 };
@@ -14,7+13,7 @@ use conduwuit::{
 	Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
 };
 pub use create::create_admin_room;
-use futures::{FutureExt, TryFutureExt};
+use futures::{Future, FutureExt, TryFutureExt};
 use loole::{Receiver, Sender};
 use ruma::{
 	OwnedEventId, OwnedRoomId, RoomId, UserId,

View file

@@ -306,28 +306,25 @@ impl super::Service {
 	#[tracing::instrument(name = "srv", level = "debug", skip(self))]
 	async fn query_srv_record(&self, hostname: &'_ str) -> Result<Option<FedDest>> {
-		let hostnames =
-			[format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")];
-		for hostname in hostnames {
-			self.services.server.check_running()?;
-			debug!("querying SRV for {hostname:?}");
-			let hostname = hostname.trim_end_matches('.');
+		self.services.server.check_running()?;
+		debug!("querying SRV for {hostname:?}");
+		let hostname_suffix = format!("_matrix-fed._tcp.{hostname}.");
+		let hostname = hostname_suffix.trim_end_matches('.');
 		match self.resolver.resolver.srv_lookup(hostname).await {
 			| Err(e) => Self::handle_resolve_error(&e, hostname)?,
 			| Ok(result) => {
 				return Ok(result.iter().next().map(|result| {
 					FedDest::Named(
 						result.target().to_string().trim_end_matches('.').to_owned(),
 						format!(":{}", result.port())
 							.as_str()
 							.try_into()
 							.unwrap_or_else(|_| FedDest::default_port()),
 					)
 				}));
 			},
 		}
-		}
 		Ok(None)