Compare commits

...

24 commits

Author SHA1 Message Date
Jacob Taylor
f6f9730e19 sender_workers scaling. this time, with feeling!
Some checks failed
Release Docker Image / define-variables (push) Failing after 1s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Rust Checks / Format (push) Failing after 3s
Rust Checks / Clippy (push) Failing after 8s
Rust Checks / Cargo Test (push) Failing after 16s
2025-06-21 08:13:30 -07:00
Jacob Taylor
3fae827e2b vehicle loan documentation now available at window 7 2025-06-21 08:06:15 -07:00
Jacob Taylor
d157ff25a4 lock the getter instead ??? c/o M 2025-06-21 08:04:00 -07:00
Jacob Taylor
d88dd042f4 make fetching key room events less smart 2025-06-21 08:02:52 -07:00
Jacob Taylor
af5330f642 change rocksdb stats level to 3
scale rocksdb background jobs and subcompactions

change rocksdb default error level to info from error

delete unused num_threads function

fix warns from cargo
2025-06-21 08:02:49 -07:00
nexy7574
da659f17ab modify more log strings so they're more useful than not 2025-06-21 08:02:28 -07:00
nexy7574
d3321390df When in doubt, log all the things 2025-06-21 08:02:28 -07:00
nexy7574
a02a82f316 log which room struggled to get mainline depth 2025-06-21 08:02:28 -07:00
nexy7574
4e92962694 more logs 2025-06-21 08:02:28 -07:00
nexy7574
86f369d8d6 Unsafe, untested, and potentially overeager PDU sanity checks 2025-06-21 08:02:28 -07:00
nexy7574
01d1224d1c Fix room ID check 2025-06-21 08:02:28 -07:00
nexy7574
65e2061447 Kick up a fuss when m.room.create is unfindable 2025-06-21 08:02:28 -07:00
nexy7574
9246f3be44 Note about ruma#2064 in TODO 2025-06-21 08:02:28 -07:00
nexy7574
c23912fd42 fix an auth rule not applying correctly 2025-06-21 08:02:28 -07:00
nexy7574
94c4d85716 Always calculate state diff IDs in syncv3
seemingly fixes #779
2025-06-21 08:02:28 -07:00
Jacob Taylor
6ef00341ea upgrade some settings to enable 5g in continuwuity
enable converged 6g at the edge in continuwuity

better stateinfo_cache_capacity default

better roomid_spacehierarchy_cache_capacity

make sender workers default better and clamp value to core count

update sender workers documentation

add more parallelism_scaled and make them public

update 1 document
2025-06-21 08:02:05 -07:00
Jacob Taylor
45ddec699a add futures::FutureExt to make cb15ac3c01 work 2025-06-21 07:52:20 -07:00
Jason Volk
ec8abacbf3 Mitigate large futures
Signed-off-by: Jason Volk <jason@zemos.net>
2025-06-21 07:52:20 -07:00
Jacob Taylor
43c5f2572e bump the number of allowed immutable memtables by 1, to allow for greater flood protection
this should probably not be applied if you have rocksdb_atomic_flush = false (the default)
2025-06-21 07:52:20 -07:00
Jacob Taylor
d263c52ddc probably incorrectly delete support for non-standardized matrix srv record 2025-06-21 07:52:20 -07:00
Jacob Taylor
4b0f2c3dfa Fix spaces rooms list load error. rev2 2025-06-21 07:52:20 -07:00
Jade Ellis
b5a6b56d53 fix: Filter out invalid replacements from bundled aggregations 2025-06-21 07:52:20 -07:00
Jade Ellis
5528911e2a feat: Add bundled aggregations support
Add support for the m.replace and m.reference bundled
aggregations.
This should fix plenty of subtle client issues.
Threads are not included in the new code as they have
historically been written to the database. Replacing the
old system would result in issues when switching away from
continuwuity, so saved for later.
Some TODOs have been left re event visibility and ignored users.
These should be OK for now, though.
2025-06-21 07:52:20 -07:00
Jade Ellis
1b08e0e33d refactor: Promote handling unsigned data out of timeline
Also fixes:
- Transaction IDs leaking in event route
- Age not being set for event relations or threads
- Both of the above for search results

Notes down concern with relations table
2025-06-21 07:52:20 -07:00
38 changed files with 1149 additions and 228 deletions

View file

@ -990,7 +990,7 @@
# 3 to 5 = Statistics with possible performance impact.
# 6 = All statistics.
#
#rocksdb_stats_level = 1
#rocksdb_stats_level = 3
# This is a password that can be configured that will let you login to the
# server bot account (currently `@conduit`) for emergency troubleshooting
@ -1590,11 +1590,9 @@
#stream_amplification = 1024
# Number of sender task workers; determines sender parallelism. Default is
# '0' which means the value is determined internally, likely matching the
# number of tokio worker-threads or number of cores, etc. Override by
# setting a non-zero value.
# number of CPU cores. Override by setting a different value.
#
#sender_workers = 0
#sender_workers = 4
# Enables listener sockets; can be set to false to disable listening. This
# option is intended for developer/diagnostic purposes only.

View file

@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
.services
.rooms
.timeline
.last_timeline_count(None, &room_id)
.last_timeline_count(&room_id)
.await?;
self.write_str(&format!("{result:#?}")).await
@ -52,7 +52,7 @@ pub(super) async fn pdus(
.services
.rooms
.timeline
.pdus_rev(None, &room_id, from)
.pdus_rev(&room_id, from)
.try_take(limit.unwrap_or(3))
.try_collect()
.await?;

View file

@ -6,6 +6,7 @@ use conduwuit::{
warn,
};
use futures::StreamExt;
use futures::FutureExt;
use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId};
use crate::{admin_command, admin_command_dispatch, get_room_info};
@ -155,7 +156,10 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
evicting admins too)",
);
if let Err(e) = leave_room(self.services, user_id, &room_id, None).await {
if let Err(e) = leave_room(self.services, user_id, &room_id, None)
.boxed()
.await
{
warn!("Failed to leave room: {e}");
}
@ -323,7 +327,10 @@ async fn ban_list_of_rooms(&self) -> Result {
evicting admins too)",
);
if let Err(e) = leave_room(self.services, user_id, &room_id, None).await {
if let Err(e) = leave_room(self.services, user_id, &room_id, None)
.boxed()
.await
{
warn!("Failed to leave room: {e}");
}

View file

@ -9,6 +9,7 @@ use conduwuit::{
};
use conduwuit_api::client::{leave_all_rooms, update_avatar_url, update_displayname};
use futures::StreamExt;
use futures::FutureExt;
use ruma::{
OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId,
events::{
@ -655,7 +656,9 @@ pub(super) async fn force_leave_room(
return Err!("{user_id} is not joined in the room");
}
leave_room(self.services, &user_id, &room_id, None).await?;
leave_room(self.services, &user_id, &room_id, None)
.boxed()
.await?;
self.write_str(&format!("{user_id} has left {room_id}.",))
.await

View file

@ -763,7 +763,9 @@ pub(crate) async fn deactivate_route(
super::update_displayname(&services, sender_user, None, &all_joined_rooms).await;
super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await;
full_user_deactivate(&services, sender_user, &all_joined_rooms).await?;
full_user_deactivate(&services, sender_user, &all_joined_rooms)
.boxed()
.await?;
info!("User {sender_user} deactivated their account.");
@ -915,7 +917,9 @@ pub async fn full_user_deactivate(
}
}
super::leave_all_rooms(services, user_id).await;
super::leave_all_rooms(services, user_id)
.boxed()
.await;
Ok(())
}

View file

@ -84,11 +84,25 @@ pub(crate) async fn get_context_route(
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
// PDUs are used to get seen user IDs and then returned in response.
let events_before = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(base_count))
.pdus_rev(room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
@ -98,8 +112,20 @@ pub(crate) async fn get_context_route(
let events_after = services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(base_count))
.pdus(room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))

View file

@ -114,7 +114,9 @@ async fn banned_room_check(
.collect()
.await;
full_user_deactivate(services, user_id, &all_joined_rooms).await?;
full_user_deactivate(services, user_id, &all_joined_rooms)
.boxed()
.await?;
}
return Err!(Request(Forbidden("This room is banned on this homeserver.")));
@ -153,7 +155,9 @@ async fn banned_room_check(
.collect()
.await;
full_user_deactivate(services, user_id, &all_joined_rooms).await?;
full_user_deactivate(services, user_id, &all_joined_rooms)
.boxed()
.await?;
}
return Err!(Request(Forbidden("This remote server is banned on this homeserver.")));
@ -259,6 +263,7 @@ pub(crate) async fn join_room_by_id_or_alias_route(
room_id.server_name(),
client,
)
.boxed()
.await?;
let mut servers = body.via.clone();
@ -478,6 +483,7 @@ pub(crate) async fn leave_room_route(
body: Ruma<leave_room::v3::Request>,
) -> Result<leave_room::v3::Response> {
leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone())
.boxed()
.await
.map(|()| leave_room::v3::Response::new())
}
@ -1243,6 +1249,7 @@ async fn join_room_by_id_helper_remote(
services.rooms.timeline.get_pdu(event_id).await.ok()
};
debug!("running stateres check on send_join parsed PDU");
let auth_check = state_res::event_auth::auth_check(
&state_res::RoomVersion::new(&room_version_id)?,
&parsed_join_pdu,
@ -1792,7 +1799,10 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
for room_id in all_rooms {
// ignore errors
if let Err(e) = leave_room(services, user_id, &room_id, None).await {
if let Err(e) = leave_room(services, user_id, &room_id, None)
.boxed()
.await
{
warn!(%user_id, "Failed to leave {room_id} remotely: {e}");
}

View file

@ -2,7 +2,7 @@ use core::panic;
use axum::extract::State;
use conduwuit::{
Err, Result, at,
Err, Result, at, debug_warn,
matrix::{
Event,
pdu::{PduCount, PduEvent},
@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route(
| Direction::Forward => services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(from))
.pdus(room_id, Some(from))
.ignore_err()
.boxed(),
| Direction::Backward => services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(from))
.pdus_rev(room_id, Some(from))
.ignore_err()
.boxed(),
};
@ -132,6 +132,18 @@ pub(crate) async fn get_message_events_route(
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
.take(limit)
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
})
.collect()
.await;

View file

@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Result, at,
Result, at, debug_warn,
matrix::pdu::PduCount,
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
};
@ -149,6 +149,17 @@ async fn paginate_relations_with_filter(
.ready_take_while(|(count, _)| Some(*count) != to)
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
.take(limit)
.then(async |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations to relation: {e}");
}
pdu
})
.collect()
.await;
@ -172,6 +183,10 @@ async fn paginate_relations_with_filter(
})
}
// TODO: Can we move the visibility filter lower down, to avoid checking events
// that won't be sent? At the moment this also results in getting events that
// appear to have no relation because intermediaries are not visible to the
// user.
async fn visibility_filter(
services: &Services,
sender_user: &UserId,

View file

@ -1,5 +1,5 @@
use axum::extract::State;
use conduwuit::{Err, Event, Result, err};
use conduwuit::{Err, Event, Result, debug_warn, err};
use futures::{FutureExt, TryFutureExt, future::try_join};
use ruma::api::client::room::get_room_event;
@ -38,7 +38,16 @@ pub(crate) async fn get_room_event_route(
"Fetched PDU must match requested"
);
event.add_age().ok();
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut event)
.await
{
debug_warn!("Failed to add bundled aggregations to event: {e}");
}
event.set_unsigned(body.sender_user.as_deref());
Ok(get_room_event::v3::Response { event: event.into_room_event() })
}

View file

@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Err, PduEvent, Result, at,
Err, PduEvent, Result, at, debug_warn,
utils::{BoolExt, stream::TryTools},
};
use futures::TryStreamExt;
@ -25,12 +25,28 @@ pub(crate) async fn room_initial_sync_route(
return Err!(Request(Forbidden("No room preview available.")));
}
// Events are returned in body
let limit = LIMIT_MAX;
let events: Vec<_> = services
.rooms
.timeline
.pdus_rev(None, room_id, None)
.pdus_rev(room_id, None)
.try_take(limit)
.and_then(async |mut pdu| {
pdu.1.set_unsigned(body.sender_user.as_deref());
if let Some(sender_user) = body.sender_user.as_deref() {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
}
Ok(pdu)
})
.try_collect()
.await?;

View file

@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{
Err, Result, at, is_true,
Err, Result, at, debug_warn, is_true,
matrix::pdu::PduEvent,
result::FlatOk,
utils::{IterStream, stream::ReadyExt},
@ -144,6 +144,17 @@ async fn category_room_events(
.map(at!(2))
.flatten()
.stream()
.then(|mut pdu| async {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to search result: {e}");
}
pdu
})
.map(PduEvent::into_room_event)
.map(|result| SearchResult {
rank: None,

View file

@ -121,7 +121,9 @@ where
.map(|(key, val)| (key, val.collect()))
.collect();
if !populate {
if populate {
rooms.push(summary_to_chunk(summary.clone()));
} else {
children = children
.iter()
.rev()
@ -144,10 +146,8 @@ where
.collect();
}
if populate {
rooms.push(summary_to_chunk(summary.clone()));
} else if queue.is_empty() && children.is_empty() {
return Err!(Request(InvalidParam("Room IDs in token were not found.")));
if !populate && queue.is_empty() && children.is_empty() {
break;
}
parents.insert(current_room.clone());

View file

@ -6,6 +6,7 @@ use conduwuit::{
};
use conduwuit_service::Services;
use futures::TryStreamExt;
use futures::FutureExt;
use ruma::{
OwnedEventId, RoomId, UserId,
api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
@ -59,6 +60,7 @@ pub(crate) async fn send_state_event_for_empty_key_route(
body: Ruma<send_state_event::v3::Request>,
) -> Result<RumaResponse<send_state_event::v3::Response>> {
send_state_event_for_key_route(State(services), body)
.boxed()
.await
.map(RumaResponse)
}

View file

@ -3,7 +3,7 @@ mod v4;
mod v5;
use conduwuit::{
Error, PduCount, Result,
Error, PduCount, Result, debug_warn,
matrix::pdu::PduEvent,
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
};
@ -31,11 +31,7 @@ async fn load_timeline(
next_batch: Option<PduCount>,
limit: usize,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await?;
let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?;
if last_timeline_count <= roomsincecount {
return Ok((Vec::new(), false));
@ -44,10 +40,25 @@ async fn load_timeline(
let non_timeline_pdus = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, None)
.pdus_rev(room_id, None)
.ignore_err()
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
.ready_take_while(|&(pducount, _)| pducount > roomsincecount);
.ready_take_while(|&(pducount, _)| pducount > roomsincecount)
.map(move |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
pdu
})
.then(async move |mut pdu| {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
.await
{
debug_warn!("Failed to add bundled aggregations: {e}");
}
pdu
});
// Take the last events for the timeline
pin_mut!(non_timeline_pdus);

View file

@ -1009,8 +1009,6 @@ async fn calculate_state_incremental<'a>(
) -> Result<StateChanges> {
let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);
let state_changed = since_shortstatehash != current_shortstatehash;
let encrypted_room = services
.rooms
.state_accessor
@ -1042,7 +1040,7 @@ async fn calculate_state_incremental<'a>(
})
.into();
let state_diff_ids: OptionFuture<_> = (!full_state && state_changed)
let state_diff_ids: OptionFuture<_> = (!full_state)
.then(|| {
StreamExt::into_future(
services
@ -1191,7 +1189,7 @@ async fn calculate_heroes(
services
.rooms
.timeline
.all_pdus(sender_user, room_id)
.all_pdus(room_id)
.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
.fold_default(|heroes: Vec<_>, (_, pdu)| {
fold_hero(heroes, services, room_id, sender_user, pdu)

View file

@ -1,6 +1,6 @@
use axum::extract::State;
use conduwuit::{
Result, at,
Result, at, debug_warn,
matrix::pdu::{PduCount, PduEvent},
};
use futures::StreamExt;
@ -28,6 +28,8 @@ pub(crate) async fn get_threads_route(
.transpose()?
.unwrap_or_else(PduCount::max);
// TODO: user_can_see_event and set_unsigned should be at the same level /
// function, so unsigned is only set for seen events.
let threads: Vec<(PduCount, PduEvent)> = services
.rooms
.threads
@ -42,6 +44,17 @@ pub(crate) async fn get_threads_route(
.await
.then_some((count, pdu))
})
.then(|(count, mut pdu)| async move {
if let Err(e) = services
.rooms
.pdu_metadata
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu)
.await
{
debug_warn!("Failed to add bundled aggregations to thread: {e}");
}
(count, pdu)
})
.collect()
.await;

View file

@ -3,6 +3,7 @@ use std::cmp;
use axum::extract::State;
use conduwuit::{
PduCount, Result,
result::LogErr,
utils::{IterStream, ReadyExt, stream::TryTools},
};
use futures::{FutureExt, StreamExt, TryStreamExt};
@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route(
pdus: services
.rooms
.timeline
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
.try_take(limit)
.try_filter_map(|(_, pdu)| async move {
Ok(services
@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route(
.await
.then_some(pdu))
})
.and_then(async |mut pdu| {
// Strip the transaction ID, as that is private
pdu.remove_transaction_id().log_err().ok();
// Add age, as this is specified
pdu.add_age().log_err().ok();
// It's not clear if we should strip or add any more data, leave as is.
// In particular: Redaction?
Ok(pdu)
})
.try_filter_map(|pdu| async move {
Ok(services
.rooms

View file

@ -1155,7 +1155,7 @@ pub struct Config {
/// 3 to 5 = Statistics with possible performance impact.
/// 6 = All statistics.
///
/// default: 1
/// default: 3
#[serde(default = "default_rocksdb_stats_level")]
pub rocksdb_stats_level: u8,
@ -1823,12 +1823,10 @@ pub struct Config {
pub stream_amplification: usize,
/// Number of sender task workers; determines sender parallelism. Default is
/// '0' which means the value is determined internally, likely matching the
/// number of tokio worker-threads or number of cores, etc. Override by
/// setting a non-zero value.
/// core count. Override by setting a different value.
///
/// default: 0
#[serde(default)]
/// default: core count
#[serde(default = "default_sender_workers")]
pub sender_workers: usize,
/// Enables listener sockets; can be set to false to disable listening. This
@ -2059,45 +2057,47 @@ fn default_database_backups_to_keep() -> i16 { 1 }
fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) }
fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) }
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) }
fn default_cache_capacity_modifier() -> f64 { 1.0 }
fn default_auth_chain_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
parallelism_scaled_u32(50_000).saturating_add(500_000)
}
fn default_shorteventid_cache_capacity() -> u32 {
parallelism_scaled_u32(50_000).saturating_add(100_000)
}
fn default_eventidshort_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000)
}
fn default_eventid_pdu_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000)
}
fn default_shortstatekey_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
}
fn default_statekeyshort_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
}
fn default_servernameevent_data_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(500_000)
}
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) }
fn default_eventidshort_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(500_000)
}
fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) }
fn default_eventid_pdu_cache_capacity() -> u32 {
parallelism_scaled_u32(50_000).saturating_add(500_000)
}
fn default_dns_cache_entries() -> u32 { 32768 }
fn default_shortstatekey_cache_capacity() -> u32 {
parallelism_scaled_u32(50_000).saturating_add(500_000)
}
fn default_statekeyshort_cache_capacity() -> u32 {
parallelism_scaled_u32(50_000).saturating_add(500_000)
}
fn default_servernameevent_data_cache_capacity() -> u32 {
parallelism_scaled_u32(200_000).saturating_add(500_000)
}
fn default_stateinfo_cache_capacity() -> u32 {
parallelism_scaled_u32(500).clamp(100, 12000) }
fn default_roomid_spacehierarchy_cache_capacity() -> u32 {
parallelism_scaled_u32(500).clamp(100, 12000) }
fn default_dns_cache_entries() -> u32 { 327680 }
fn default_dns_min_ttl() -> u64 { 60 * 180 }
@ -2199,7 +2199,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 }
fn default_rocksdb_recovery_mode() -> u8 { 1 }
fn default_rocksdb_log_level() -> String { "error".to_owned() }
fn default_rocksdb_log_level() -> String { "info".to_owned() }
fn default_rocksdb_log_time_to_roll() -> usize { 0 }
@ -2231,7 +2231,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 }
#[allow(clippy::doc_markdown)]
fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 }
fn default_rocksdb_stats_level() -> u8 { 1 }
fn default_rocksdb_stats_level() -> u8 { 3 }
// I know, it's a great name
#[must_use]
@ -2286,14 +2286,13 @@ fn default_admin_log_capture() -> String {
fn default_admin_room_tag() -> String { "m.server_notice".to_owned() }
#[allow(clippy::as_conversions, clippy::cast_precision_loss)]
fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
fn parallelism_scaled_u32(val: u32) -> u32 {
let val = val.try_into().expect("failed to cast u32 to usize");
parallelism_scaled(val).try_into().unwrap_or(u32::MAX)
}
pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) }
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) }
pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
fn default_trusted_server_batch_size() -> usize { 256 }
@ -2313,6 +2312,8 @@ fn default_stream_width_scale() -> f32 { 1.0 }
fn default_stream_amplification() -> usize { 1024 }
fn default_sender_workers() -> usize { parallelism_scaled(1) }
fn default_client_receive_timeout() -> u64 { 75 }
fn default_client_request_timeout() -> u64 { 180 }

View file

@ -1,11 +1,24 @@
use std::collections::BTreeMap;
use std::{borrow::Borrow, collections::BTreeMap};
use ruma::MilliSecondsSinceUnixEpoch;
use serde::Deserialize;
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
use super::Pdu;
use crate::{Result, err, implement, is_true};
use crate::{Result, err, implement, is_true, result::LogErr};
/// Set the `unsigned` field of the PDU using only information in the PDU.
/// Some unsigned data is already set within the database (eg. prev events,
/// threads). Once this is done, other data must be calculated from the database
/// (eg. relations) This is for server-to-client events.
/// Backfill handles this itself.
#[implement(Pdu)]
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
if Some(self.sender.borrow()) != user_id {
self.remove_transaction_id().log_err().ok();
}
self.add_age().log_err().ok();
}
#[implement(Pdu)]
pub fn remove_transaction_id(&mut self) -> Result {

View file

@ -13,6 +13,7 @@ use ruma::{
power_levels::RoomPowerLevelsEventContent,
third_party_invite::RoomThirdPartyInviteEventContent,
},
EventId,
int,
serde::{Base64, Raw},
};
@ -21,7 +22,6 @@ use serde::{
de::{Error as _, IgnoredAny},
};
use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue};
use super::{
Error, Event, Result, StateEventType, StateKey, TimelineEventType,
power_levels::{
@ -217,8 +217,9 @@ where
}
/*
// TODO: In the past this code caused problems federating with synapse, maybe this has been
// resolved already. Needs testing.
// TODO: In the past this code was commented as it caused problems with Synapse. This is no
// longer the case. This needs to be implemented.
// See also: https://github.com/ruma/ruma/pull/2064
//
// 2. Reject if auth_events
// a. auth_events cannot have duplicate keys since it's a BTree
@ -241,20 +242,46 @@ where
}
*/
let (room_create_event, power_levels_event, sender_member_event) = join3(
fetch_state(&StateEventType::RoomCreate, ""),
fetch_state(&StateEventType::RoomPowerLevels, ""),
fetch_state(&StateEventType::RoomMember, sender.as_str()),
)
.await;
// let (room_create_event, power_levels_event, sender_member_event) = join3(
// fetch_state(&StateEventType::RoomCreate, ""),
// fetch_state(&StateEventType::RoomPowerLevels, ""),
// fetch_state(&StateEventType::RoomMember, sender.as_str()),
// )
// .await;
let room_create_event = fetch_state(&StateEventType::RoomCreate, "").await;
let power_levels_event = fetch_state(&StateEventType::RoomPowerLevels, "").await;
let sender_member_event = fetch_state(&StateEventType::RoomMember, sender.as_str()).await;
let room_create_event = match room_create_event {
| None => {
warn!("no m.room.create event in auth chain");
error!(
create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(),
"no m.room.create event found for {} ({})!",
incoming_event.event_id().as_str(),
incoming_event.room_id().as_str()
);
return Ok(false);
},
| Some(e) => e,
};
// just re-check 1.2 to work around a bug
let Some(room_id_server_name) = incoming_event.room_id().server_name() else {
warn!("room ID has no servername");
return Ok(false);
};
if room_id_server_name != room_create_event.sender().server_name() {
warn!(
"servername of room ID origin ({}) does not match servername of m.room.create \
sender ({})",
room_id_server_name,
room_create_event.sender().server_name()
);
return Ok(false);
}
// 3. If event does not have m.room.create in auth_events reject
if !incoming_event

View file

@ -609,7 +609,7 @@ where
let fetch_state = |ty: &StateEventType, key: &str| {
future::ready(auth_state.get(&ty.with_state_key(key)))
};
debug!("running auth check on {:?}", event.event_id());
let auth_result =
auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await;
@ -726,8 +726,12 @@ where
Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync,
{
let mut room_id = None;
while let Some(sort_ev) = event {
debug!(event_id = sort_ev.event_id().as_str(), "mainline");
trace!(event_id = sort_ev.event_id().as_str(), "mainline");
if room_id.is_none() {
room_id = Some(sort_ev.room_id().to_owned());
}
let id = sort_ev.event_id();
if let Some(depth) = mainline_map.get(id) {
@ -746,7 +750,7 @@ where
}
}
}
// Did not find a power level event so we default to zero
warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth");
Ok(0)
}

View file

@ -29,7 +29,7 @@ fn descriptor_cf_options(
set_table_options(&mut opts, &desc, cache)?;
opts.set_min_write_buffer_number(1);
opts.set_max_write_buffer_number(2);
opts.set_max_write_buffer_number(3);
opts.set_write_buffer_size(desc.write_size);
opts.set_target_file_size_base(desc.file_size);

View file

@ -1,8 +1,6 @@
use std::{cmp, convert::TryFrom};
use conduwuit::{Config, Result, utils};
use conduwuit::{Config, Result};
use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel};
use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32};
use super::{cf_opts::cache_size_f64, logger::handle as handle_log};
/// Create database-wide options suitable for opening the database. This also
@ -23,8 +21,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul
set_logging_defaults(&mut opts, config);
// Processing
opts.set_max_background_jobs(num_threads::<i32>(config)?);
opts.set_max_subcompactions(num_threads::<u32>(config)?);
opts.set_max_background_jobs(parallelism_scaled_i32(1));
opts.set_max_subcompactions(parallelism_scaled_u32(1));
opts.set_avoid_unnecessary_blocking_io(true);
opts.set_max_file_opening_threads(0);
@ -126,15 +124,3 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) {
opts.set_callback_logger(rocksdb_log_level, &handle_log);
}
}
fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
const MIN_PARALLELISM: usize = 2;
let requested = if config.rocksdb_parallelism_threads != 0 {
config.rocksdb_parallelism_threads
} else {
utils::available_parallelism()
};
utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
}

View file

@ -4,7 +4,6 @@ mod execute;
mod grant;
use std::{
future::Future,
pin::Pin,
sync::{Arc, RwLock as StdRwLock, Weak},
};
@ -14,7 +13,7 @@ use conduwuit::{
Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
};
pub use create::create_admin_room;
use futures::{FutureExt, TryFutureExt};
use futures::{Future, FutureExt, TryFutureExt};
use loole::{Receiver, Sender};
use ruma::{
OwnedEventId, OwnedRoomId, RoomId, UserId,

View file

@ -306,14 +306,12 @@ impl super::Service {
#[tracing::instrument(name = "srv", level = "debug", skip(self))]
async fn query_srv_record(&self, hostname: &'_ str) -> Result<Option<FedDest>> {
let hostnames =
[format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")];
for hostname in hostnames {
self.services.server.check_running()?;
debug!("querying SRV for {hostname:?}");
let hostname = hostname.trim_end_matches('.');
let hostname_suffix = format!("_matrix-fed._tcp.{hostname}.");
let hostname = hostname_suffix.trim_end_matches('.');
match self.resolver.resolver.srv_lookup(hostname).await {
| Err(e) => Self::handle_resolve_error(&e, hostname)?,
| Ok(result) => {
@ -328,7 +326,6 @@ impl super::Service {
}));
},
}
}
Ok(None)
}

View file

@ -76,7 +76,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
// 5. Reject "due to auth events" if can't get all the auth events or some of
// the auth events are also rejected "due to auth events"
// NOTE: Step 5 is not applied anymore because it failed too often
debug!("Fetching auth events");
debug!("Fetching auth events for {}", incoming_pdu.event_id);
Box::pin(self.fetch_and_handle_outliers(
origin,
&incoming_pdu.auth_events,
@ -88,12 +88,12 @@ pub(super) async fn handle_outlier_pdu<'a>(
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
// auth events
debug!("Checking based on auth events");
debug!("Checking {} based on auth events", incoming_pdu.event_id);
// Build map of auth events
let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len());
for id in &incoming_pdu.auth_events {
let Ok(auth_event) = self.services.timeline.get_pdu(id).await else {
warn!("Could not find auth event {id}");
warn!("Could not find auth event {id} for {}", incoming_pdu.event_id);
continue;
};
@ -119,10 +119,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
}
// The original create event must be in the auth events
if !matches!(
auth_events.get(&(StateEventType::RoomCreate, String::new().into())),
Some(_) | None
) {
if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) {
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
}
@ -131,6 +128,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
ready(auth_events.get(&key))
};
debug!("running auth check to handle outlier pdu {:?}", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check(
&to_room_version(&room_version_id),
&incoming_pdu,

View file

@ -1,12 +1,6 @@
use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
use conduwuit::{
Err, Result, debug, debug_info, err, implement,
matrix::{EventTypeExt, PduEvent, StateKey, state_res},
trace,
utils::stream::{BroadbandExt, ReadyExt},
warn,
};
use conduwuit::{Err, Result, debug, debug_info, err, implement, matrix::{EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, warn, info};
use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
@ -44,7 +38,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
return Err!(Request(InvalidParam("Event has been soft failed")));
}
debug!("Upgrading to timeline pdu");
debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id);
let timer = Instant::now();
let room_version_id = get_room_version_id(create_event)?;
@ -52,7 +46,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// backwards extremities doing all the checks in this list starting at 1.
// These are not timeline events.
debug!("Resolving state at event");
debug!("Resolving state at event {}", incoming_pdu.event_id);
let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 {
self.state_at_incoming_degree_one(&incoming_pdu).await?
} else {
@ -70,7 +64,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
state_at_incoming_event.expect("we always set this to some above");
let room_version = to_room_version(&room_version_id);
debug!("Performing auth check");
debug!("Performing auth check to upgrade {}", incoming_pdu.event_id);
// 11. Check the auth of the event passes based on the state of the event
let state_fetch_state = &state_at_incoming_event;
let state_fetch = |k: StateEventType, s: StateKey| async move {
@ -80,6 +74,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
self.services.timeline.get_pdu(event_id).await.ok()
};
debug!("running auth check on {}", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check(
&room_version,
&incoming_pdu,
@ -93,7 +88,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
return Err!(Request(Forbidden("Event has failed auth check with state at the event.")));
}
debug!("Gathering auth events");
debug!("Gathering auth events for {}", incoming_pdu.event_id);
let auth_events = self
.services
.state
@ -111,6 +106,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
ready(auth_events.get(&key).cloned())
};
debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id);
let auth_check = state_res::event_auth::auth_check(
&room_version,
&incoming_pdu,
@ -121,7 +117,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
// Soft fail check before doing state res
debug!("Performing soft-fail check");
debug!("Performing soft-fail check on {}", incoming_pdu.event_id);
let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) {
| (false, _) => true,
| (true, None) => false,
@ -145,7 +141,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
let extremities: Vec<_> = self
.services
.state
.get_forward_extremities(room_id)
.get_forward_extremities(room_id, &state_lock)
.map(ToOwned::to_owned)
.ready_filter(|event_id| {
// Remove any that are referenced by this incoming event's prev_events
@ -163,6 +159,8 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.collect()
.await;
if extremities.len() == 0 { info!("Retained zero extremities when upgrading outlier PDU to timeline PDU with {} previous events", incoming_pdu.prev_events.len()) }
debug!(
"Retained {} extremities checked against {} prev_events",
extremities.len(),
@ -218,7 +216,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// 14. Check if the event passes auth based on the "current state" of the room,
// if not soft fail it
if soft_fail {
debug!("Soft failing event");
info!("Soft failing event {}", incoming_pdu.event_id);
let extremities = extremities.iter().map(Borrow::borrow);
self.services

View file

@ -0,0 +1,765 @@
use conduwuit::{Event, PduEvent, Result, err};
use ruma::{
EventId, RoomId, UserId,
api::Direction,
events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk},
};
use super::PdusIterItem;
const MAX_BUNDLED_RELATIONS: usize = 50;
impl super::Service {
/// Gets bundled aggregations for an event according to the Matrix
/// specification.
/// - m.replace relations are bundled to include the most recent replacement
/// event.
/// - m.reference relations are bundled to include a chunk of event IDs.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_bundled_aggregations(
&self,
user_id: &UserId,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<BundledMessageLikeRelations<Box<serde_json::value::RawValue>>>> {
let relations = self
.get_relations(
user_id,
room_id,
event_id,
conduwuit::PduCount::max(),
MAX_BUNDLED_RELATIONS,
0,
Direction::Backward,
)
.await;
// The relations database code still handles the basic unsigned data
// We don't want to recursively fetch relations
// TODO: Event visibility check
// TODO: ignored users?
if relations.is_empty() {
return Ok(None);
}
// Get the original event for validation of replacement events
let original_event = self.services.timeline.get_pdu(event_id).await?;
let mut replace_events = Vec::with_capacity(relations.len());
let mut reference_events = Vec::with_capacity(relations.len());
for relation in &relations {
let pdu = &relation.1;
let content = pdu.get_content_as_value();
if let Some(relates_to) = content.get("m.relates_to") {
// We don't check that the event relates back, because we assume the database is
// good.
if let Some(rel_type) = relates_to.get("rel_type") {
match rel_type.as_str() {
| Some("m.replace") => {
// Only consider valid replacements
if Self::is_valid_replacement_event(&original_event, pdu).await? {
replace_events.push(relation);
}
},
| Some("m.reference") => {
reference_events.push(relation);
},
| _ => {
// Ignore other relation types for now
// Threads are in the database but not handled here
// Other types are not specified AFAICT.
},
}
}
}
}
// If no relations to bundle, return None
if replace_events.is_empty() && reference_events.is_empty() {
return Ok(None);
}
let mut bundled = BundledMessageLikeRelations::new();
// Handle m.replace relations - find the most recent one
if !replace_events.is_empty() {
let most_recent_replacement = Self::find_most_recent_replacement(&replace_events)?;
// Convert the replacement event to the bundled format
if let Some(replacement_pdu) = most_recent_replacement {
// According to the Matrix spec, we should include the full event as raw JSON
let replacement_json = serde_json::to_string(replacement_pdu)
.map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?;
let raw_value = serde_json::value::RawValue::from_string(replacement_json)
.map_err(|e| err!(Database("Failed to create RawValue: {e}")))?;
bundled.replace = Some(Box::new(raw_value));
}
}
// Handle m.reference relations - collect event IDs
if !reference_events.is_empty() {
let reference_chunk = Self::build_reference_chunk(&reference_events)?;
if !reference_chunk.is_empty() {
bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk)));
}
}
// TODO: Handle other relation types (m.annotation, etc.) when specified
Ok(Some(bundled))
}
/// Build reference chunk for m.reference bundled aggregations
fn build_reference_chunk(
reference_events: &[&PdusIterItem],
) -> Result<Vec<BundledReference>> {
let mut chunk = Vec::with_capacity(reference_events.len());
for relation in reference_events {
let pdu = &relation.1;
let reference_entry = BundledReference::new(pdu.event_id().to_owned());
chunk.push(reference_entry);
}
// Don't sort, order is unspecified
Ok(chunk)
}
/// Find the most recent replacement event based on origin_server_ts and
/// lexicographic event_id ordering
fn find_most_recent_replacement<'a>(
replacement_events: &'a [&'a PdusIterItem],
) -> Result<Option<&'a PduEvent>> {
if replacement_events.is_empty() {
return Ok(None);
}
let mut most_recent: Option<&PduEvent> = None;
// Jank, is there a better way to do this?
for relation in replacement_events {
let pdu = &relation.1;
match most_recent {
| None => {
most_recent = Some(pdu);
},
| Some(current_most_recent) => {
// Compare by origin_server_ts first
match pdu
.origin_server_ts()
.cmp(&current_most_recent.origin_server_ts())
{
| std::cmp::Ordering::Greater => {
most_recent = Some(pdu);
},
| std::cmp::Ordering::Equal => {
// If timestamps are equal, use lexicographic ordering of event_id
if pdu.event_id() > current_most_recent.event_id() {
most_recent = Some(pdu);
}
},
| std::cmp::Ordering::Less => {
// Keep current most recent
},
}
},
}
}
Ok(most_recent)
}
/// Adds bundled aggregations to a PDU's unsigned field
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn add_bundled_aggregations_to_pdu(
&self,
user_id: &UserId,
pdu: &mut PduEvent,
) -> Result<()> {
if pdu.is_redacted() {
return Ok(());
}
let bundled_aggregations = self
.get_bundled_aggregations(user_id, pdu.room_id(), pdu.event_id())
.await?;
if let Some(aggregations) = bundled_aggregations {
let aggregations_json = serde_json::to_value(aggregations)
.map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?;
Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?;
}
Ok(())
}
/// Helper method to add bundled aggregations to a PDU's unsigned
/// field
fn add_bundled_aggregations_to_unsigned(
pdu: &mut PduEvent,
aggregations_json: serde_json::Value,
) -> Result<()> {
use serde_json::{
Map, Value as JsonValue,
value::{RawValue as RawJsonValue, to_raw_value},
};
let mut unsigned: Map<String, JsonValue> = pdu
.unsigned
.as_deref()
.map(RawJsonValue::get)
.map_or_else(|| Ok(Map::new()), serde_json::from_str)
.map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?;
let relations = unsigned
.entry("m.relations")
.or_insert_with(|| JsonValue::Object(Map::new()))
.as_object_mut()
.ok_or_else(|| err!(Database("m.relations is not an object")))?;
if let JsonValue::Object(aggregations_map) = aggregations_json {
for (rel_type, aggregation) in aggregations_map {
relations.insert(rel_type, aggregation);
}
}
pdu.unsigned = Some(to_raw_value(&unsigned)?);
Ok(())
}
/// Validates that an event is acceptable as a replacement for another event
/// See C/S spec "Validity of replacement events"
#[tracing::instrument(level = "debug")]
async fn is_valid_replacement_event(
original_event: &PduEvent,
replacement_event: &PduEvent,
) -> Result<bool> {
// 1. Same room_id
if original_event.room_id() != replacement_event.room_id() {
return Ok(false);
}
// 2. Same sender
if original_event.sender() != replacement_event.sender() {
return Ok(false);
}
// 3. Same type
if original_event.event_type() != replacement_event.event_type() {
return Ok(false);
}
// 4. Neither event should have a state_key property
if original_event.state_key().is_some() || replacement_event.state_key().is_some() {
return Ok(false);
}
// 5. Original event must not have rel_type of m.replace
let original_content = original_event.get_content_as_value();
if let Some(relates_to) = original_content.get("m.relates_to") {
if let Some(rel_type) = relates_to.get("rel_type") {
if rel_type.as_str() == Some("m.replace") {
return Ok(false);
}
}
}
// 6. Replacement event must have m.new_content property
// Skip this check for encrypted events, as m.new_content would be inside the
// encrypted payload
if replacement_event.event_type() != &ruma::events::TimelineEventType::RoomEncrypted {
let replacement_content = replacement_event.get_content_as_value();
if replacement_content.get("m.new_content").is_none() {
return Ok(false);
}
}
Ok(true)
}
}
#[cfg(test)]
mod tests {
use conduwuit_core::pdu::{EventHash, PduEvent};
use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id};
use serde_json::{Value as JsonValue, json, value::to_raw_value};
fn create_test_pdu(unsigned_content: Option<JsonValue>) -> PduEvent {
PduEvent {
event_id: owned_event_id!("$test:example.com"),
room_id: owned_room_id!("!test:example.com"),
sender: owned_user_id!("@test:example.com"),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: TimelineEventType::RoomMessage,
content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(),
state_key: None,
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()),
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
fn create_bundled_aggregations() -> JsonValue {
json!({
"m.replace": {
"event_id": "$replace:example.com",
"origin_server_ts": 1_234_567_890,
"sender": "@replacer:example.com"
},
"m.reference": {
"count": 5,
"chunk": [
"$ref1:example.com",
"$ref2:example.com"
]
}
})
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() {
let mut pdu = create_test_pdu(None);
let aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when no unsigned field exists");
assert!(pdu.unsigned.is_some(), "Unsigned field should be created");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
assert!(unsigned.get("m.relations").is_some(), "m.relations should exist");
assert_eq!(
unsigned["m.relations"], aggregations,
"Relations should match the aggregations"
);
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() {
let existing_unsigned = json!({
"m.relations": {
"m.replace": {
"event_id": "$old_replace:example.com",
"origin_server_ts": 1_111_111_111,
"sender": "@old_replacer:example.com"
}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when overwriting same relation type");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
let relations = &unsigned["m.relations"];
assert_eq!(
relations["m.replace"], new_aggregations["m.replace"],
"m.replace should be updated"
);
assert_eq!(
relations["m.replace"]["event_id"], "$replace:example.com",
"Should have new event_id"
);
assert!(relations.get("m.reference").is_some(), "New m.reference should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() {
// Test case: Other unsigned fields should be preserved
let existing_unsigned = json!({
"age": 98765,
"prev_content": {"msgtype": "m.text", "body": "old message"},
"redacted_because": {"event_id": "$redaction:example.com"},
"m.relations": {
"m.annotation": {"count": 1}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = json!({
"m.replace": {"event_id": "$new:example.com"}
});
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations,
);
assert!(result.is_ok(), "Should succeed while preserving other fields");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
// Verify all existing fields are preserved
assert_eq!(unsigned["age"], 98765, "age should be preserved");
assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved");
assert!(
unsigned.get("redacted_because").is_some(),
"redacted_because should be preserved"
);
// Verify relations were merged correctly
let relations = &unsigned["m.relations"];
assert!(
relations.get("m.annotation").is_some(),
"Existing m.annotation should be preserved"
);
assert!(relations.get("m.replace").is_some(), "New m.replace should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() {
// Test case: Invalid JSON in existing unsigned should result in error
let mut pdu = create_test_pdu(None);
// Manually set invalid unsigned data
pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap());
let aggregations = create_bundled_aggregations();
let result =
super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations);
assert!(result.is_err(), "fails when existing unsigned is invalid");
// Should we ignore the error and overwrite anyway?
}
// Test helper function to create test PDU events
fn create_test_event(
event_id: &str,
room_id: &str,
sender: &str,
event_type: TimelineEventType,
content: &JsonValue,
state_key: Option<&str>,
) -> PduEvent {
PduEvent {
event_id: event_id.try_into().unwrap(),
room_id: room_id.try_into().unwrap(),
sender: sender.try_into().unwrap(),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: event_type,
content: to_raw_value(&content).unwrap(),
state_key: state_key.map(Into::into),
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: None,
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
/// Test that a valid replacement event passes validation
#[tokio::test]
async fn test_valid_replacement_event() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
},
"m.relates_to": {
"rel_type": "m.replace",
"event_id": "$original:example.com"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(result.unwrap(), "Valid replacement event should be accepted");
}
/// Test replacement event with different room ID is rejected
#[tokio::test]
async fn test_replacement_event_different_room() {
let original = create_test_event(
"$original:example.com",
"!room1:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room2:example.com", // Different room
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different room ID should be rejected");
}
/// Test replacement event with different sender is rejected
#[tokio::test]
async fn test_replacement_event_different_sender() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user1:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user2:example.com", // Different sender
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.new_content": {
"msgtype": "m.text",
"body": "edited message"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different sender should be rejected");
}
/// Test replacement event with different type is rejected
#[tokio::test]
async fn test_replacement_event_different_type() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomTopic, // Different event type
&json!({
"topic": "new topic",
"m.new_content": {
"topic": "new topic"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Different event type should be rejected");
}
/// Test replacement event with state key is rejected
#[tokio::test]
async fn test_replacement_event_with_state_key() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomName,
&json!({"name": "room name"}),
Some(""), // Has state key
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomName,
&json!({
"name": "new room name",
"m.new_content": {
"name": "new room name"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Event with state key should be rejected");
}
/// Test replacement of an event that is already a replacement is rejected
#[tokio::test]
async fn test_replacement_event_original_is_replacement() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message",
"m.relates_to": {
"rel_type": "m.replace", // Original is already a replacement
"event_id": "$some_other:example.com"
}
}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited again",
"m.new_content": {
"msgtype": "m.text",
"body": "edited again"
}
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Replacement of replacement should be rejected");
}
/// Test replacement event missing m.new_content is rejected
#[tokio::test]
async fn test_replacement_event_missing_new_content() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({"msgtype": "m.text", "body": "original message"}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomMessage,
&json!({
"msgtype": "m.text",
"body": "* edited message"
// Missing m.new_content
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(!result.unwrap(), "Missing m.new_content should be rejected");
}
/// Test encrypted replacement event without m.new_content is accepted
#[tokio::test]
async fn test_replacement_event_encrypted_missing_new_content_is_valid() {
let original = create_test_event(
"$original:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomEncrypted,
&json!({
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "encrypted_payload_base64",
"sender_key": "sender_key",
"session_id": "session_id"
}),
None,
);
let replacement = create_test_event(
"$replacement:example.com",
"!room:example.com",
"@user:example.com",
TimelineEventType::RoomEncrypted,
&json!({
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "encrypted_replacement_payload_base64",
"sender_key": "sender_key",
"session_id": "session_id",
"m.relates_to": {
"rel_type": "m.replace",
"event_id": "$original:example.com"
}
// No m.new_content in cleartext - this is valid for encrypted events
}),
None,
);
let result =
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
assert!(result.is_ok(), "Validation should succeed");
assert!(
result.unwrap(),
"Encrypted replacement without cleartext m.new_content should be accepted"
);
}
}

View file

@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc};
use conduwuit::{
PduCount, PduEvent,
arrayvec::ArrayVec,
result::LogErr,
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
@ -80,9 +79,7 @@ impl Data {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
if pdu.sender != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.set_unsigned(Some(user_id));
Some((shorteventid, pdu))
})

View file

@ -1,3 +1,4 @@
mod bundled_aggregations;
mod data;
use std::sync::Arc;

View file

@ -127,7 +127,12 @@ pub async fn search_pdus<'a>(
.then_some(pdu)
})
.skip(query.skip)
.take(query.limit);
.take(query.limit)
.map(move |mut pdu| {
pdu.set_unsigned(query.user_id);
// TODO: bundled aggregation
pdu
});
Ok((count, pdus))
}

View file

@ -384,6 +384,7 @@ impl Service {
pub fn get_forward_extremities<'a>(
&'a self,
room_id: &'a RoomId,
_state_lock: &'a RoomMutexGuard,
) -> impl Stream<Item = &EventId> + Send + '_ {
let prefix = (room_id, Interfix);

View file

@ -160,9 +160,7 @@ impl Service {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender != user_id {
pdu.remove_transaction_id().ok();
}
pdu.set_unsigned(Some(user_id));
Some((pdu_id.shorteventid, pdu))
});

View file

@ -1,14 +1,11 @@
use std::{borrow::Borrow, sync::Arc};
use std::sync::Arc;
use conduwuit::{
Err, PduCount, PduEvent, Result, at, err,
result::{LogErr, NotFound},
utils,
utils::stream::TryReadyExt,
Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt,
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
use super::{PduId, RawPduId};
use crate::{Dep, rooms, rooms::short::ShortRoomId};
@ -46,12 +43,8 @@ impl Data {
}
#[inline]
pub(super) async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
let last_count = pdus_rev
@ -65,12 +58,8 @@ impl Data {
}
#[inline]
pub(super) async fn latest_pdu_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
pdus_rev
@ -223,7 +212,6 @@ impl Data {
/// order.
pub(super) fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@ -233,14 +221,13 @@ impl Data {
self.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
pub(super) fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@ -250,21 +237,15 @@ impl Data {
self.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.add_age().log_err().ok();
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
Ok((pdu_id.pdu_count(), pdu))
}

View file

@ -165,7 +165,7 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
let pdus = self.pdus(None, room_id, None);
let pdus = self.pdus(room_id, None);
pin_mut!(pdus);
pdus.try_next()
@ -175,16 +175,12 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
self.db.latest_pdu_in_room(None, room_id).await
self.db.latest_pdu_in_room(room_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
self.db.last_timeline_count(sender_user, room_id).await
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
self.db.last_timeline_count(room_id).await
}
/// Returns the `count` of this pdu's id.
@ -545,6 +541,10 @@ impl Service {
| _ => {},
}
// CONCERN: If we receive events with a relation out-of-order, we never write
// their relation / thread. We need some kind of way to trigger when we receive
// this event, and potentially a way to rebuild the table entirely.
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
self.services
@ -654,7 +654,7 @@ impl Service {
let prev_events: Vec<OwnedEventId> = self
.services
.state
.get_forward_extremities(room_id)
.get_forward_extremities(room_id, _mutex_lock)
.take(20)
.map(Into::into)
.collect()
@ -698,6 +698,20 @@ impl Service {
.await
.saturating_add(uint!(1));
if state_key.is_none() {
if prev_events.is_empty() {
warn!("Timeline event had zero prev_events, something broke.");
return Err!(Request(Unknown("Timeline event had zero prev_events.")));
}
if depth.le(&uint!(2)) {
warn!(
"Had unsafe depth of {depth} in {room_id} when creating non-state event. \
Bad!"
);
return Err!(Request(Unknown("Unsafe depth for non-state event.")));
}
};
let mut unsigned = unsigned.unwrap_or_default();
if let Some(state_key) = &state_key {
@ -757,6 +771,7 @@ impl Service {
ready(auth_events.get(&key))
};
debug!("running auth check on new {} event by {} in {}", pdu.kind, pdu.sender, pdu.room_id);
let auth_check = state_res::auth_check(
&room_version,
&pdu,
@ -996,34 +1011,30 @@ impl Service {
#[inline]
pub fn all_pdus<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(Some(user_id), room_id, None).ignore_err()
self.pdus(room_id, None).ignore_err()
}
/// Reverse iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
}
/// Forward iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
}
/// Replace a PDU with the redacted form.
@ -1142,7 +1153,7 @@ impl Service {
.boxed();
while let Some(ref backfill_server) = servers.next().await {
info!("Asking {backfill_server} for backfill");
info!("Asking {backfill_server} for backfill in {:?}", room_id.to_owned());
let response = self
.services
.sending
@ -1170,7 +1181,7 @@ impl Service {
}
}
info!("No servers could backfill, but backfill was needed in room {room_id}");
warn!("No servers could backfill, but backfill was needed in room {room_id}");
Ok(())
}

View file

@ -401,16 +401,10 @@ impl Service {
fn num_senders(args: &crate::Args<'_>) -> usize {
const MIN_SENDERS: usize = 1;
// Limit the number of senders to the number of workers threads or number of
// cores, conservatively.
let max_senders = args
.server
.metrics
.num_workers()
.min(available_parallelism());
// Limit the maximum number of senders to the number of cores.
let max_senders = available_parallelism();
// If the user doesn't override the default 0, this is intended to then default
// to 1 for now as multiple senders is experimental.
// default is 4 senders. clamp between 1 and core count.
args.server
.config
.sender_workers

View file

@ -781,7 +781,7 @@ impl Service {
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
if pdu.is_redacted() {
continue;
}