diff --git a/Cargo.lock b/Cargo.lock index 160be0c7..ec6e848d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -771,7 +771,7 @@ dependencies = [ [[package]] name = "conduwuit" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "clap", "conduwuit_admin", @@ -800,7 +800,7 @@ dependencies = [ [[package]] name = "conduwuit_admin" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "clap", "conduwuit_api", @@ -821,7 +821,7 @@ dependencies = [ [[package]] name = "conduwuit_api" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "async-trait", "axum", @@ -853,14 +853,14 @@ dependencies = [ [[package]] name = "conduwuit_build_metadata" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "built 0.8.0", ] [[package]] name = "conduwuit_core" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "argon2", "arrayvec", @@ -919,7 +919,7 @@ dependencies = [ [[package]] name = "conduwuit_database" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "async-channel", "conduwuit_core", @@ -937,7 +937,7 @@ dependencies = [ [[package]] name = "conduwuit_macros" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "itertools 0.14.0", "proc-macro2", @@ -947,7 +947,7 @@ dependencies = [ [[package]] name = "conduwuit_router" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "axum", "axum-client-ip", @@ -981,7 +981,7 @@ dependencies = [ [[package]] name = "conduwuit_service" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "async-trait", "base64 0.22.1", @@ -1018,7 +1018,7 @@ dependencies = [ [[package]] name = "conduwuit_web" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" dependencies = [ "askama", "axum", diff --git a/Cargo.toml b/Cargo.toml index 1abff107..af904447 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ license = "Apache-2.0" readme = "README.md" repository = "https://forgejo.ellis.link/continuwuation/continuwuity" rust-version = "1.86.0" -version = "0.5.0-rc.5" +version = "0.5.0-rc.6" [workspace.metadata.crane] name = "conduwuit" diff --git a/conduwuit-example.toml b/conduwuit-example.toml index 1a8be2aa..287bf65f 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -990,7 +990,7 @@ # 3 to 5 = Statistics with possible performance impact. # 6 = All statistics. # -#rocksdb_stats_level = 1 +#rocksdb_stats_level = 3 # This is a password that can be configured that will let you login to the # server bot account (currently `@conduit`) for emergency troubleshooting @@ -1590,11 +1590,9 @@ #stream_amplification = 1024 # Number of sender task workers; determines sender parallelism. Default is -# '0' which means the value is determined internally, likely matching the -# number of tokio worker-threads or number of cores, etc. Override by -# setting a non-zero value. +# number of CPU cores. Override by setting a different value. # -#sender_workers = 0 +#sender_workers = 4 # Enables listener sockets; can be set to false to disable listening. This # option is intended for developer/diagnostic purposes only. diff --git a/docs/static/announcements.json b/docs/static/announcements.json index 9b97d091..7dd2fb72 100644 --- a/docs/static/announcements.json +++ b/docs/static/announcements.json @@ -4,6 +4,10 @@ { "id": 1, "message": "Welcome to Continuwuity! Important announcements about the project will appear here." + }, + { + "id": 2, + "message": "🎉 Continuwuity v0.5.0-rc.6 is now available! This release includes improved knock-restricted room handling, automatic support contact configuration, and a new HTML landing page. Check [the release notes for full details](https://forgejo.ellis.link/continuwuation/continuwuity/releases/tag/v0.5.0-rc.6) and upgrade instructions." } ] -} \ No newline at end of file +} diff --git a/docs/static/announcements.schema.json b/docs/static/announcements.schema.json index 95b1d153..cacd10c9 100644 --- a/docs/static/announcements.schema.json +++ b/docs/static/announcements.schema.json @@ -3,7 +3,7 @@ "$id": "https://continwuity.org/schema/announcements.schema.json", "type": "object", "properties": { - "updates": { + "announcements": { "type": "array", "items": { "type": "object", @@ -16,6 +16,10 @@ }, "date": { "type": "string" + }, + "mention_room": { + "type": "boolean", + "description": "Whether to mention the room (@room) when posting this announcement" } }, "required": [ @@ -26,6 +30,6 @@ } }, "required": [ - "updates" + "announcements" ] } \ No newline at end of file diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index 0fd22ca7..58f75cb9 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result { .services .rooms .timeline - .last_timeline_count(None, &room_id) + .last_timeline_count(&room_id) .await?; self.write_str(&format!("{result:#?}")).await @@ -52,7 +52,7 @@ pub(super) async fn pdus( .services .rooms .timeline - .pdus_rev(None, &room_id, from) + .pdus_rev(&room_id, from) .try_take(limit.unwrap_or(3)) .try_collect() .await?; diff --git a/src/admin/room/moderation.rs b/src/admin/room/moderation.rs index ee429fc6..4c19da5c 100644 --- a/src/admin/room/moderation.rs +++ b/src/admin/room/moderation.rs @@ -6,6 +6,7 @@ use conduwuit::{ warn, }; use futures::StreamExt; +use futures::FutureExt; use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId}; use crate::{admin_command, admin_command_dispatch, get_room_info}; @@ -155,7 +156,10 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result { evicting admins too)", ); - if let Err(e) = leave_room(self.services, user_id, &room_id, None).await { + if let Err(e) = leave_room(self.services, user_id, &room_id, None) + .boxed() + .await + { warn!("Failed to leave room: {e}"); } @@ -323,7 +327,10 @@ async fn ban_list_of_rooms(&self) -> Result { evicting admins too)", ); - if let Err(e) = leave_room(self.services, user_id, &room_id, None).await { + if let Err(e) = leave_room(self.services, user_id, &room_id, None) + .boxed() + .await + { warn!("Failed to leave room: {e}"); } diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index e5e481e5..acd21bb3 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -9,6 +9,7 @@ use conduwuit::{ }; use conduwuit_api::client::{leave_all_rooms, update_avatar_url, update_displayname}; use futures::StreamExt; +use futures::FutureExt; use ruma::{ OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId, events::{ @@ -655,7 +656,9 @@ pub(super) async fn force_leave_room( return Err!("{user_id} is not joined in the room"); } - leave_room(self.services, &user_id, &room_id, None).await?; + leave_room(self.services, &user_id, &room_id, None) + .boxed() + .await?; self.write_str(&format!("{user_id} has left {room_id}.",)) .await diff --git a/src/api/client/account.rs b/src/api/client/account.rs index 32f2530c..05dfa8b7 100644 --- a/src/api/client/account.rs +++ b/src/api/client/account.rs @@ -763,7 +763,9 @@ pub(crate) async fn deactivate_route( super::update_displayname(&services, sender_user, None, &all_joined_rooms).await; super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await; - full_user_deactivate(&services, sender_user, &all_joined_rooms).await?; + full_user_deactivate(&services, sender_user, &all_joined_rooms) + .boxed() + .await?; info!("User {sender_user} deactivated their account."); @@ -915,7 +917,9 @@ pub async fn full_user_deactivate( } } - super::leave_all_rooms(services, user_id).await; + super::leave_all_rooms(services, user_id) + .boxed() + .await; Ok(()) } diff --git a/src/api/client/context.rs b/src/api/client/context.rs index dbc2a22f..ee3a458c 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -84,11 +84,25 @@ pub(crate) async fn get_context_route( let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user); + // PDUs are used to get seen user IDs and then returned in response. + let events_before = services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, Some(base_count)) + .pdus_rev(room_id, Some(base_count)) .ignore_err() + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } + pdu + }) .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) @@ -98,8 +112,20 @@ pub(crate) async fn get_context_route( let events_after = services .rooms .timeline - .pdus(Some(sender_user), room_id, Some(base_count)) + .pdus(room_id, Some(base_count)) .ignore_err() + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } + pdu + }) .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index e587d806..13ff1b92 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -114,7 +114,9 @@ async fn banned_room_check( .collect() .await; - full_user_deactivate(services, user_id, &all_joined_rooms).await?; + full_user_deactivate(services, user_id, &all_joined_rooms) + .boxed() + .await?; } return Err!(Request(Forbidden("This room is banned on this homeserver."))); @@ -153,7 +155,9 @@ async fn banned_room_check( .collect() .await; - full_user_deactivate(services, user_id, &all_joined_rooms).await?; + full_user_deactivate(services, user_id, &all_joined_rooms) + .boxed() + .await?; } return Err!(Request(Forbidden("This remote server is banned on this homeserver."))); @@ -259,6 +263,7 @@ pub(crate) async fn join_room_by_id_or_alias_route( room_id.server_name(), client, ) + .boxed() .await?; let mut servers = body.via.clone(); @@ -478,6 +483,7 @@ pub(crate) async fn leave_room_route( body: Ruma, ) -> Result { leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone()) + .boxed() .await .map(|()| leave_room::v3::Response::new()) } @@ -1243,6 +1249,7 @@ async fn join_room_by_id_helper_remote( services.rooms.timeline.get_pdu(event_id).await.ok() }; + debug!("running stateres check on send_join parsed PDU"); let auth_check = state_res::event_auth::auth_check( &state_res::RoomVersion::new(&room_version_id)?, &parsed_join_pdu, @@ -1792,7 +1799,10 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) { for room_id in all_rooms { // ignore errors - if let Err(e) = leave_room(services, user_id, &room_id, None).await { + if let Err(e) = leave_room(services, user_id, &room_id, None) + .boxed() + .await + { warn!(%user_id, "Failed to leave {room_id} remotely: {e}"); } diff --git a/src/api/client/message.rs b/src/api/client/message.rs index e442850b..6087478c 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -2,7 +2,7 @@ use core::panic; use axum::extract::State; use conduwuit::{ - Err, Result, at, + Err, Result, at, debug_warn, matrix::{ Event, pdu::{PduCount, PduEvent}, @@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route( | Direction::Forward => services .rooms .timeline - .pdus(Some(sender_user), room_id, Some(from)) + .pdus(room_id, Some(from)) .ignore_err() .boxed(), | Direction::Backward => services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, Some(from)) + .pdus_rev(room_id, Some(from)) .ignore_err() .boxed(), }; @@ -132,6 +132,18 @@ pub(crate) async fn get_message_events_route( .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit) + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } + pdu + }) .collect() .await; diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index b8c2dd4d..377f0c71 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Result, at, + Result, at, debug_warn, matrix::pdu::PduCount, utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt}, }; @@ -149,6 +149,17 @@ async fn paginate_relations_with_filter( .ready_take_while(|(count, _)| Some(*count) != to) .wide_filter_map(|item| visibility_filter(services, sender_user, item)) .take(limit) + .then(async |mut pdu| { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations to relation: {e}"); + } + pdu + }) .collect() .await; @@ -172,6 +183,10 @@ async fn paginate_relations_with_filter( }) } +// TODO: Can we move the visibility filter lower down, to avoid checking events +// that won't be sent? At the moment this also results in getting events that +// appear to have no relation because intermediaries are not visible to the +// user. async fn visibility_filter( services: &Services, sender_user: &UserId, diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 2b115b5c..e7b0bb37 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -1,5 +1,5 @@ use axum::extract::State; -use conduwuit::{Err, Event, Result, err}; +use conduwuit::{Err, Event, Result, debug_warn, err}; use futures::{FutureExt, TryFutureExt, future::try_join}; use ruma::api::client::room::get_room_event; @@ -38,7 +38,16 @@ pub(crate) async fn get_room_event_route( "Fetched PDU must match requested" ); - event.add_age().ok(); + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(body.sender_user(), &mut event) + .await + { + debug_warn!("Failed to add bundled aggregations to event: {e}"); + } + + event.set_unsigned(body.sender_user.as_deref()); Ok(get_room_event::v3::Response { event: event.into_room_event() }) } diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index ca63610b..2f965245 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Err, PduEvent, Result, at, + Err, PduEvent, Result, at, debug_warn, utils::{BoolExt, stream::TryTools}, }; use futures::TryStreamExt; @@ -25,12 +25,28 @@ pub(crate) async fn room_initial_sync_route( return Err!(Request(Forbidden("No room preview available."))); } + // Events are returned in body + let limit = LIMIT_MAX; let events: Vec<_> = services .rooms .timeline - .pdus_rev(None, room_id, None) + .pdus_rev(room_id, None) .try_take(limit) + .and_then(async |mut pdu| { + pdu.1.set_unsigned(body.sender_user.as_deref()); + if let Some(sender_user) = body.sender_user.as_deref() { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } + } + Ok(pdu) + }) .try_collect() .await?; diff --git a/src/api/client/search.rs b/src/api/client/search.rs index d4dcde57..af5fccec 100644 --- a/src/api/client/search.rs +++ b/src/api/client/search.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use axum::extract::State; use conduwuit::{ - Err, Result, at, is_true, + Err, Result, at, debug_warn, is_true, matrix::pdu::PduEvent, result::FlatOk, utils::{IterStream, stream::ReadyExt}, @@ -144,6 +144,17 @@ async fn category_room_events( .map(at!(2)) .flatten() .stream() + .then(|mut pdu| async { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu) + .await + { + debug_warn!("Failed to add bundled aggregations to search result: {e}"); + } + pdu + }) .map(PduEvent::into_room_event) .map(|result| SearchResult { rank: None, diff --git a/src/api/client/space.rs b/src/api/client/space.rs index 92768926..23b1e80f 100644 --- a/src/api/client/space.rs +++ b/src/api/client/space.rs @@ -121,7 +121,9 @@ where .map(|(key, val)| (key, val.collect())) .collect(); - if !populate { + if populate { + rooms.push(summary_to_chunk(summary.clone())); + } else { children = children .iter() .rev() @@ -144,10 +146,8 @@ where .collect(); } - if populate { - rooms.push(summary_to_chunk(summary.clone())); - } else if queue.is_empty() && children.is_empty() { - return Err!(Request(InvalidParam("Room IDs in token were not found."))); + if !populate && queue.is_empty() && children.is_empty() { + break; } parents.insert(current_room.clone()); diff --git a/src/api/client/state.rs b/src/api/client/state.rs index 2ddc8f14..96b7e7aa 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -6,6 +6,7 @@ use conduwuit::{ }; use conduwuit_service::Services; use futures::TryStreamExt; +use futures::FutureExt; use ruma::{ OwnedEventId, RoomId, UserId, api::client::state::{get_state_events, get_state_events_for_key, send_state_event}, @@ -59,6 +60,7 @@ pub(crate) async fn send_state_event_for_empty_key_route( body: Ruma, ) -> Result> { send_state_event_for_key_route(State(services), body) + .boxed() .await .map(RumaResponse) } diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 40370160..1ea62883 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -3,7 +3,7 @@ mod v4; mod v5; use conduwuit::{ - Error, PduCount, Result, + Error, PduCount, Result, debug_warn, matrix::pdu::PduEvent, utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, }; @@ -31,11 +31,7 @@ async fn load_timeline( next_batch: Option, limit: usize, ) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { - let last_timeline_count = services - .rooms - .timeline - .last_timeline_count(Some(sender_user), room_id) - .await?; + let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?; if last_timeline_count <= roomsincecount { return Ok((Vec::new(), false)); @@ -44,10 +40,25 @@ async fn load_timeline( let non_timeline_pdus = services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, None) + .pdus_rev(room_id, None) .ignore_err() .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) - .ready_take_while(|&(pducount, _)| pducount > roomsincecount); + .ready_take_while(|&(pducount, _)| pducount > roomsincecount) + .map(move |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + pdu + }) + .then(async move |mut pdu| { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) + .await + { + debug_warn!("Failed to add bundled aggregations: {e}"); + } + pdu + }); // Take the last events for the timeline pin_mut!(non_timeline_pdus); diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 8eac6b66..7eb8c7e1 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -1009,8 +1009,6 @@ async fn calculate_state_incremental<'a>( ) -> Result { let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash); - let state_changed = since_shortstatehash != current_shortstatehash; - let encrypted_room = services .rooms .state_accessor @@ -1042,7 +1040,7 @@ async fn calculate_state_incremental<'a>( }) .into(); - let state_diff_ids: OptionFuture<_> = (!full_state && state_changed) + let state_diff_ids: OptionFuture<_> = (!full_state) .then(|| { StreamExt::into_future( services @@ -1191,7 +1189,7 @@ async fn calculate_heroes( services .rooms .timeline - .all_pdus(sender_user, room_id) + .all_pdus(room_id) .ready_filter(|(_, pdu)| pdu.kind == RoomMember) .fold_default(|heroes: Vec<_>, (_, pdu)| { fold_hero(heroes, services, room_id, sender_user, pdu) diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index 5b838bef..09fb75d6 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Result, at, + Result, at, debug_warn, matrix::pdu::{PduCount, PduEvent}, }; use futures::StreamExt; @@ -28,6 +28,8 @@ pub(crate) async fn get_threads_route( .transpose()? .unwrap_or_else(PduCount::max); + // TODO: user_can_see_event and set_unsigned should be at the same level / + // function, so unsigned is only set for seen events. let threads: Vec<(PduCount, PduEvent)> = services .rooms .threads @@ -42,6 +44,17 @@ pub(crate) async fn get_threads_route( .await .then_some((count, pdu)) }) + .then(|(count, mut pdu)| async move { + if let Err(e) = services + .rooms + .pdu_metadata + .add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu) + .await + { + debug_warn!("Failed to add bundled aggregations to thread: {e}"); + } + (count, pdu) + }) .collect() .await; diff --git a/src/api/server/backfill.rs b/src/api/server/backfill.rs index 3cfbcedc..058fc273 100644 --- a/src/api/server/backfill.rs +++ b/src/api/server/backfill.rs @@ -3,6 +3,7 @@ use std::cmp; use axum::extract::State; use conduwuit::{ PduCount, Result, + result::LogErr, utils::{IterStream, ReadyExt, stream::TryTools}, }; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route( pdus: services .rooms .timeline - .pdus_rev(None, &body.room_id, Some(from.saturating_add(1))) + .pdus_rev(&body.room_id, Some(from.saturating_add(1))) .try_take(limit) .try_filter_map(|(_, pdu)| async move { Ok(services @@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route( .await .then_some(pdu)) }) + .and_then(async |mut pdu| { + // Strip the transaction ID, as that is private + pdu.remove_transaction_id().log_err().ok(); + // Add age, as this is specified + pdu.add_age().log_err().ok(); + // It's not clear if we should strip or add any more data, leave as is. + // In particular: Redaction? + Ok(pdu) + }) .try_filter_map(|pdu| async move { Ok(services .rooms diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index d4a10345..14e8fa29 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1155,7 +1155,7 @@ pub struct Config { /// 3 to 5 = Statistics with possible performance impact. /// 6 = All statistics. /// - /// default: 1 + /// default: 3 #[serde(default = "default_rocksdb_stats_level")] pub rocksdb_stats_level: u8, @@ -1823,12 +1823,10 @@ pub struct Config { pub stream_amplification: usize, /// Number of sender task workers; determines sender parallelism. Default is - /// '0' which means the value is determined internally, likely matching the - /// number of tokio worker-threads or number of cores, etc. Override by - /// setting a non-zero value. + /// '4'. Override by setting a different value. Values clamped 1 to core count. /// - /// default: 0 - #[serde(default)] + /// default: 4 + #[serde(default = "default_sender_workers")] pub sender_workers: usize, /// Enables listener sockets; can be set to false to disable listening. This @@ -2059,45 +2057,47 @@ fn default_database_backups_to_keep() -> i16 { 1 } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } -fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) } +fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) } -fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) } +fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_auth_chain_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) + parallelism_scaled_u32(50_000).saturating_add(500_000) } fn default_shorteventid_cache_capacity() -> u32 { - parallelism_scaled_u32(50_000).saturating_add(100_000) -} - -fn default_eventidshort_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) -} - -fn default_eventid_pdu_cache_capacity() -> u32 { - parallelism_scaled_u32(25_000).saturating_add(100_000) -} - -fn default_shortstatekey_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_statekeyshort_cache_capacity() -> u32 { - parallelism_scaled_u32(10_000).saturating_add(100_000) -} - -fn default_servernameevent_data_cache_capacity() -> u32 { parallelism_scaled_u32(100_000).saturating_add(500_000) } -fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) } +fn default_eventidshort_cache_capacity() -> u32 { + parallelism_scaled_u32(100_000).saturating_add(500_000) +} -fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } +fn default_eventid_pdu_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} -fn default_dns_cache_entries() -> u32 { 32768 } +fn default_shortstatekey_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_statekeyshort_cache_capacity() -> u32 { + parallelism_scaled_u32(50_000).saturating_add(500_000) +} + +fn default_servernameevent_data_cache_capacity() -> u32 { + parallelism_scaled_u32(200_000).saturating_add(500_000) +} + +fn default_stateinfo_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } + +fn default_roomid_spacehierarchy_cache_capacity() -> u32 { + parallelism_scaled_u32(500).clamp(100, 12000) } + +fn default_dns_cache_entries() -> u32 { 327680 } fn default_dns_min_ttl() -> u64 { 60 * 180 } @@ -2199,7 +2199,7 @@ fn default_typing_client_timeout_max_s() -> u64 { 45 } fn default_rocksdb_recovery_mode() -> u8 { 1 } -fn default_rocksdb_log_level() -> String { "error".to_owned() } +fn default_rocksdb_log_level() -> String { "info".to_owned() } fn default_rocksdb_log_time_to_roll() -> usize { 0 } @@ -2231,7 +2231,7 @@ fn default_rocksdb_compression_level() -> i32 { 32767 } #[allow(clippy::doc_markdown)] fn default_rocksdb_bottommost_compression_level() -> i32 { 32767 } -fn default_rocksdb_stats_level() -> u8 { 1 } +fn default_rocksdb_stats_level() -> u8 { 3 } // I know, it's a great name #[must_use] @@ -2286,14 +2286,13 @@ fn default_admin_log_capture() -> String { fn default_admin_room_tag() -> String { "m.server_notice".to_owned() } #[allow(clippy::as_conversions, clippy::cast_precision_loss)] -fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } +pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } -fn parallelism_scaled_u32(val: u32) -> u32 { - let val = val.try_into().expect("failed to cast u32 to usize"); - parallelism_scaled(val).try_into().unwrap_or(u32::MAX) -} +pub fn parallelism_scaled_u32(val: u32) -> u32 { val.saturating_mul(sys::available_parallelism() as u32) } -fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } +pub fn parallelism_scaled_i32(val: i32) -> i32 { val.saturating_mul(sys::available_parallelism() as i32) } + +pub fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } fn default_trusted_server_batch_size() -> usize { 256 } @@ -2313,6 +2312,8 @@ fn default_stream_width_scale() -> f32 { 1.0 } fn default_stream_amplification() -> usize { 1024 } +fn default_sender_workers() -> usize { 4 } + fn default_client_receive_timeout() -> u64 { 75 } fn default_client_request_timeout() -> u64 { 180 } diff --git a/src/core/matrix/pdu/unsigned.rs b/src/core/matrix/pdu/unsigned.rs index 23897519..2726a292 100644 --- a/src/core/matrix/pdu/unsigned.rs +++ b/src/core/matrix/pdu/unsigned.rs @@ -1,11 +1,24 @@ -use std::collections::BTreeMap; +use std::{borrow::Borrow, collections::BTreeMap}; use ruma::MilliSecondsSinceUnixEpoch; use serde::Deserialize; use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value}; use super::Pdu; -use crate::{Result, err, implement, is_true}; +use crate::{Result, err, implement, is_true, result::LogErr}; + +/// Set the `unsigned` field of the PDU using only information in the PDU. +/// Some unsigned data is already set within the database (eg. prev events, +/// threads). Once this is done, other data must be calculated from the database +/// (eg. relations) This is for server-to-client events. +/// Backfill handles this itself. +#[implement(Pdu)] +pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) { + if Some(self.sender.borrow()) != user_id { + self.remove_transaction_id().log_err().ok(); + } + self.add_age().log_err().ok(); +} #[implement(Pdu)] pub fn remove_transaction_id(&mut self) -> Result { diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 759ab5cb..67283b6a 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -13,6 +13,7 @@ use ruma::{ power_levels::RoomPowerLevelsEventContent, third_party_invite::RoomThirdPartyInviteEventContent, }, + EventId, int, serde::{Base64, Raw}, }; @@ -21,7 +22,6 @@ use serde::{ de::{Error as _, IgnoredAny}, }; use serde_json::{from_str as from_json_str, value::RawValue as RawJsonValue}; - use super::{ Error, Event, Result, StateEventType, StateKey, TimelineEventType, power_levels::{ @@ -217,8 +217,9 @@ where } /* - // TODO: In the past this code caused problems federating with synapse, maybe this has been - // resolved already. Needs testing. + // TODO: In the past this code was commented as it caused problems with Synapse. This is no + // longer the case. This needs to be implemented. + // See also: https://github.com/ruma/ruma/pull/2064 // // 2. Reject if auth_events // a. auth_events cannot have duplicate keys since it's a BTree @@ -241,20 +242,46 @@ where } */ - let (room_create_event, power_levels_event, sender_member_event) = join3( - fetch_state(&StateEventType::RoomCreate, ""), - fetch_state(&StateEventType::RoomPowerLevels, ""), - fetch_state(&StateEventType::RoomMember, sender.as_str()), - ) - .await; + // let (room_create_event, power_levels_event, sender_member_event) = join3( + // fetch_state(&StateEventType::RoomCreate, ""), + // fetch_state(&StateEventType::RoomPowerLevels, ""), + // fetch_state(&StateEventType::RoomMember, sender.as_str()), + // ) + // .await; + + let room_create_event = fetch_state(&StateEventType::RoomCreate, "").await; + let power_levels_event = fetch_state(&StateEventType::RoomPowerLevels, "").await; + let sender_member_event = fetch_state(&StateEventType::RoomMember, sender.as_str()).await; let room_create_event = match room_create_event { | None => { - warn!("no m.room.create event in auth chain"); + error!( + create_event = room_create_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + power_levels = power_levels_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + member_event = sender_member_event.as_ref().map(Event::event_id).unwrap_or(<&EventId>::try_from("$unknown").unwrap()).as_str(), + "no m.room.create event found for {} ({})!", + incoming_event.event_id().as_str(), + incoming_event.room_id().as_str() + ); return Ok(false); }, | Some(e) => e, }; + // just re-check 1.2 to work around a bug + let Some(room_id_server_name) = incoming_event.room_id().server_name() else { + warn!("room ID has no servername"); + return Ok(false); + }; + + if room_id_server_name != room_create_event.sender().server_name() { + warn!( + "servername of room ID origin ({}) does not match servername of m.room.create \ + sender ({})", + room_id_server_name, + room_create_event.sender().server_name() + ); + return Ok(false); + } // 3. If event does not have m.room.create in auth_events reject if !incoming_event diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index 651f6130..ea49b29e 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -609,7 +609,7 @@ where let fetch_state = |ty: &StateEventType, key: &str| { future::ready(auth_state.get(&ty.with_state_key(key))) }; - + debug!("running auth check on {:?}", event.event_id()); let auth_result = auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await; @@ -726,8 +726,12 @@ where Fut: Future> + Send, E: Event + Send + Sync, { + let mut room_id = None; while let Some(sort_ev) = event { - debug!(event_id = sort_ev.event_id().as_str(), "mainline"); + trace!(event_id = sort_ev.event_id().as_str(), "mainline"); + if room_id.is_none() { + room_id = Some(sort_ev.room_id().to_owned()); + } let id = sort_ev.event_id(); if let Some(depth) = mainline_map.get(id) { @@ -746,7 +750,7 @@ where } } } - // Did not find a power level event so we default to zero + warn!("could not find a power event in the mainline map for {room_id:?}, defaulting to zero depth"); Ok(0) } diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs index cbbd1012..666f9f9e 100644 --- a/src/database/engine/cf_opts.rs +++ b/src/database/engine/cf_opts.rs @@ -29,7 +29,7 @@ fn descriptor_cf_options( set_table_options(&mut opts, &desc, cache)?; opts.set_min_write_buffer_number(1); - opts.set_max_write_buffer_number(2); + opts.set_max_write_buffer_number(3); opts.set_write_buffer_size(desc.write_size); opts.set_target_file_size_base(desc.file_size); diff --git a/src/database/engine/db_opts.rs b/src/database/engine/db_opts.rs index 18cec742..1299443d 100644 --- a/src/database/engine/db_opts.rs +++ b/src/database/engine/db_opts.rs @@ -1,8 +1,6 @@ -use std::{cmp, convert::TryFrom}; - -use conduwuit::{Config, Result, utils}; +use conduwuit::{Config, Result}; use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel}; - +use conduwuit::config::{parallelism_scaled_i32, parallelism_scaled_u32}; use super::{cf_opts::cache_size_f64, logger::handle as handle_log}; /// Create database-wide options suitable for opening the database. This also @@ -23,8 +21,8 @@ pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Resul set_logging_defaults(&mut opts, config); // Processing - opts.set_max_background_jobs(num_threads::(config)?); - opts.set_max_subcompactions(num_threads::(config)?); + opts.set_max_background_jobs(parallelism_scaled_i32(1)); + opts.set_max_subcompactions(parallelism_scaled_u32(1)); opts.set_avoid_unnecessary_blocking_io(true); opts.set_max_file_opening_threads(0); @@ -126,15 +124,3 @@ fn set_logging_defaults(opts: &mut Options, config: &Config) { opts.set_callback_logger(rocksdb_log_level, &handle_log); } } - -fn num_threads>(config: &Config) -> Result { - const MIN_PARALLELISM: usize = 2; - - let requested = if config.rocksdb_parallelism_threads != 0 { - config.rocksdb_parallelism_threads - } else { - utils::available_parallelism() - }; - - utils::math::try_into::(cmp::max(MIN_PARALLELISM, requested)) -} diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 683f5400..6b064424 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -4,7 +4,6 @@ mod execute; mod grant; use std::{ - future::Future, pin::Pin, sync::{Arc, RwLock as StdRwLock, Weak}, }; @@ -14,7 +13,7 @@ use conduwuit::{ Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, }; pub use create::create_admin_room; -use futures::{FutureExt, TryFutureExt}; +use futures::{Future, FutureExt, TryFutureExt}; use loole::{Receiver, Sender}; use ruma::{ OwnedEventId, OwnedRoomId, RoomId, UserId, diff --git a/src/service/announcements/mod.rs b/src/service/announcements/mod.rs index 4df8971b..2a70344d 100644 --- a/src/service/announcements/mod.rs +++ b/src/service/announcements/mod.rs @@ -20,7 +20,7 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use conduwuit::{Result, Server, debug, info, warn}; use database::{Deserialized, Map}; -use ruma::events::room::message::RoomMessageEventContent; +use ruma::events::{Mentions, room::message::RoomMessageEventContent}; use serde::Deserialize; use tokio::{ sync::Notify, @@ -53,6 +53,8 @@ struct CheckForAnnouncementsResponseEntry { id: u64, date: Option, message: String, + #[serde(default, skip_serializing_if = "bool::not")] + mention_room: bool, } const CHECK_FOR_ANNOUNCEMENTS_URL: &str = @@ -139,19 +141,20 @@ impl Service { } else { info!("[announcements] {:#}", announcement.message); } + let mut message = RoomMessageEventContent::text_markdown(format!( + "### New announcement{}\n\n{}", + announcement + .date + .as_ref() + .map_or_else(String::new, |date| format!(" - `{date}`")), + announcement.message + )); - self.services - .admin - .send_message(RoomMessageEventContent::text_markdown(format!( - "### New announcement{}\n\n{}", - announcement - .date - .as_ref() - .map_or_else(String::new, |date| format!(" - `{date}`")), - announcement.message - ))) - .await - .ok(); + if announcement.mention_room { + message = message.add_mentions(Mentions::with_room_mention()); + } + + self.services.admin.send_message(message).await.ok(); } #[inline] diff --git a/src/service/resolver/actual.rs b/src/service/resolver/actual.rs index d23ef95a..52cd5d7d 100644 --- a/src/service/resolver/actual.rs +++ b/src/service/resolver/actual.rs @@ -306,28 +306,25 @@ impl super::Service { #[tracing::instrument(name = "srv", level = "debug", skip(self))] async fn query_srv_record(&self, hostname: &'_ str) -> Result> { - let hostnames = - [format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")]; + self.services.server.check_running()?; - for hostname in hostnames { - self.services.server.check_running()?; + debug!("querying SRV for {hostname:?}"); - debug!("querying SRV for {hostname:?}"); - let hostname = hostname.trim_end_matches('.'); - match self.resolver.resolver.srv_lookup(hostname).await { - | Err(e) => Self::handle_resolve_error(&e, hostname)?, - | Ok(result) => { - return Ok(result.iter().next().map(|result| { - FedDest::Named( - result.target().to_string().trim_end_matches('.').to_owned(), - format!(":{}", result.port()) - .as_str() - .try_into() - .unwrap_or_else(|_| FedDest::default_port()), - ) - })); - }, - } + let hostname_suffix = format!("_matrix-fed._tcp.{hostname}."); + let hostname = hostname_suffix.trim_end_matches('.'); + match self.resolver.resolver.srv_lookup(hostname).await { + | Err(e) => Self::handle_resolve_error(&e, hostname)?, + | Ok(result) => { + return Ok(result.iter().next().map(|result| { + FedDest::Named( + result.target().to_string().trim_end_matches('.').to_owned(), + format!(":{}", result.port()) + .as_str() + .try_into() + .unwrap_or_else(|_| FedDest::default_port()), + ) + })); + }, } Ok(None) diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 5339249d..87b76222 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -76,7 +76,7 @@ pub(super) async fn handle_outlier_pdu<'a>( // 5. Reject "due to auth events" if can't get all the auth events or some of // the auth events are also rejected "due to auth events" // NOTE: Step 5 is not applied anymore because it failed too often - debug!("Fetching auth events"); + debug!("Fetching auth events for {}", incoming_pdu.event_id); Box::pin(self.fetch_and_handle_outliers( origin, &incoming_pdu.auth_events, @@ -88,12 +88,12 @@ pub(super) async fn handle_outlier_pdu<'a>( // 6. Reject "due to auth events" if the event doesn't pass auth based on the // auth events - debug!("Checking based on auth events"); + debug!("Checking {} based on auth events", incoming_pdu.event_id); // Build map of auth events let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len()); for id in &incoming_pdu.auth_events { let Ok(auth_event) = self.services.timeline.get_pdu(id).await else { - warn!("Could not find auth event {id}"); + warn!("Could not find auth event {id} for {}", incoming_pdu.event_id); continue; }; @@ -119,10 +119,7 @@ pub(super) async fn handle_outlier_pdu<'a>( } // The original create event must be in the auth events - if !matches!( - auth_events.get(&(StateEventType::RoomCreate, String::new().into())), - Some(_) | None - ) { + if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) { return Err!(Request(InvalidParam("Incoming event refers to wrong create event."))); } @@ -131,6 +128,7 @@ pub(super) async fn handle_outlier_pdu<'a>( ready(auth_events.get(&key)) }; + debug!("running auth check to handle outlier pdu {:?}", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &to_room_version(&room_version_id), &incoming_pdu, diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 97d3df97..0dca2d70 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,12 +1,6 @@ use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; -use conduwuit::{ - Err, Result, debug, debug_info, err, implement, - matrix::{EventTypeExt, PduEvent, StateKey, state_res}, - trace, - utils::stream::{BroadbandExt, ReadyExt}, - warn, -}; +use conduwuit::{Err, Result, debug, debug_info, err, implement, matrix::{EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, warn, info}; use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; @@ -44,7 +38,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( return Err!(Request(InvalidParam("Event has been soft failed"))); } - debug!("Upgrading to timeline pdu"); + debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id); let timer = Instant::now(); let room_version_id = get_room_version_id(create_event)?; @@ -52,7 +46,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // backwards extremities doing all the checks in this list starting at 1. // These are not timeline events. - debug!("Resolving state at event"); + debug!("Resolving state at event {}", incoming_pdu.event_id); let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 { self.state_at_incoming_degree_one(&incoming_pdu).await? } else { @@ -70,7 +64,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( state_at_incoming_event.expect("we always set this to some above"); let room_version = to_room_version(&room_version_id); - debug!("Performing auth check"); + debug!("Performing auth check to upgrade {}", incoming_pdu.event_id); // 11. Check the auth of the event passes based on the state of the event let state_fetch_state = &state_at_incoming_event; let state_fetch = |k: StateEventType, s: StateKey| async move { @@ -80,6 +74,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( self.services.timeline.get_pdu(event_id).await.ok() }; + debug!("running auth check on {}", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -93,7 +88,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( return Err!(Request(Forbidden("Event has failed auth check with state at the event."))); } - debug!("Gathering auth events"); + debug!("Gathering auth events for {}", incoming_pdu.event_id); let auth_events = self .services .state @@ -111,6 +106,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( ready(auth_events.get(&key).cloned()) }; + debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -121,7 +117,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; // Soft fail check before doing state res - debug!("Performing soft-fail check"); + debug!("Performing soft-fail check on {}", incoming_pdu.event_id); let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { | (false, _) => true, | (true, None) => false, @@ -218,7 +214,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // 14. Check if the event passes auth based on the "current state" of the room, // if not soft fail it if soft_fail { - debug!("Soft failing event"); + info!("Soft failing event {}", incoming_pdu.event_id); let extremities = extremities.iter().map(Borrow::borrow); self.services diff --git a/src/service/rooms/pdu_metadata/bundled_aggregations.rs b/src/service/rooms/pdu_metadata/bundled_aggregations.rs new file mode 100644 index 00000000..c47f637a --- /dev/null +++ b/src/service/rooms/pdu_metadata/bundled_aggregations.rs @@ -0,0 +1,765 @@ +use conduwuit::{Event, PduEvent, Result, err}; +use ruma::{ + EventId, RoomId, UserId, + api::Direction, + events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk}, +}; + +use super::PdusIterItem; + +const MAX_BUNDLED_RELATIONS: usize = 50; + +impl super::Service { + /// Gets bundled aggregations for an event according to the Matrix + /// specification. + /// - m.replace relations are bundled to include the most recent replacement + /// event. + /// - m.reference relations are bundled to include a chunk of event IDs. + #[tracing::instrument(skip(self), level = "debug")] + pub async fn get_bundled_aggregations( + &self, + user_id: &UserId, + room_id: &RoomId, + event_id: &EventId, + ) -> Result>>> { + let relations = self + .get_relations( + user_id, + room_id, + event_id, + conduwuit::PduCount::max(), + MAX_BUNDLED_RELATIONS, + 0, + Direction::Backward, + ) + .await; + // The relations database code still handles the basic unsigned data + // We don't want to recursively fetch relations + + // TODO: Event visibility check + // TODO: ignored users? + + if relations.is_empty() { + return Ok(None); + } + + // Get the original event for validation of replacement events + let original_event = self.services.timeline.get_pdu(event_id).await?; + + let mut replace_events = Vec::with_capacity(relations.len()); + let mut reference_events = Vec::with_capacity(relations.len()); + + for relation in &relations { + let pdu = &relation.1; + + let content = pdu.get_content_as_value(); + if let Some(relates_to) = content.get("m.relates_to") { + // We don't check that the event relates back, because we assume the database is + // good. + if let Some(rel_type) = relates_to.get("rel_type") { + match rel_type.as_str() { + | Some("m.replace") => { + // Only consider valid replacements + if Self::is_valid_replacement_event(&original_event, pdu).await? { + replace_events.push(relation); + } + }, + | Some("m.reference") => { + reference_events.push(relation); + }, + | _ => { + // Ignore other relation types for now + // Threads are in the database but not handled here + // Other types are not specified AFAICT. + }, + } + } + } + } + + // If no relations to bundle, return None + if replace_events.is_empty() && reference_events.is_empty() { + return Ok(None); + } + + let mut bundled = BundledMessageLikeRelations::new(); + + // Handle m.replace relations - find the most recent one + if !replace_events.is_empty() { + let most_recent_replacement = Self::find_most_recent_replacement(&replace_events)?; + + // Convert the replacement event to the bundled format + if let Some(replacement_pdu) = most_recent_replacement { + // According to the Matrix spec, we should include the full event as raw JSON + let replacement_json = serde_json::to_string(replacement_pdu) + .map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?; + let raw_value = serde_json::value::RawValue::from_string(replacement_json) + .map_err(|e| err!(Database("Failed to create RawValue: {e}")))?; + bundled.replace = Some(Box::new(raw_value)); + } + } + + // Handle m.reference relations - collect event IDs + if !reference_events.is_empty() { + let reference_chunk = Self::build_reference_chunk(&reference_events)?; + if !reference_chunk.is_empty() { + bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk))); + } + } + + // TODO: Handle other relation types (m.annotation, etc.) when specified + + Ok(Some(bundled)) + } + + /// Build reference chunk for m.reference bundled aggregations + fn build_reference_chunk( + reference_events: &[&PdusIterItem], + ) -> Result> { + let mut chunk = Vec::with_capacity(reference_events.len()); + + for relation in reference_events { + let pdu = &relation.1; + + let reference_entry = BundledReference::new(pdu.event_id().to_owned()); + chunk.push(reference_entry); + } + + // Don't sort, order is unspecified + + Ok(chunk) + } + + /// Find the most recent replacement event based on origin_server_ts and + /// lexicographic event_id ordering + fn find_most_recent_replacement<'a>( + replacement_events: &'a [&'a PdusIterItem], + ) -> Result> { + if replacement_events.is_empty() { + return Ok(None); + } + + let mut most_recent: Option<&PduEvent> = None; + + // Jank, is there a better way to do this? + for relation in replacement_events { + let pdu = &relation.1; + + match most_recent { + | None => { + most_recent = Some(pdu); + }, + | Some(current_most_recent) => { + // Compare by origin_server_ts first + match pdu + .origin_server_ts() + .cmp(¤t_most_recent.origin_server_ts()) + { + | std::cmp::Ordering::Greater => { + most_recent = Some(pdu); + }, + | std::cmp::Ordering::Equal => { + // If timestamps are equal, use lexicographic ordering of event_id + if pdu.event_id() > current_most_recent.event_id() { + most_recent = Some(pdu); + } + }, + | std::cmp::Ordering::Less => { + // Keep current most recent + }, + } + }, + } + } + + Ok(most_recent) + } + + /// Adds bundled aggregations to a PDU's unsigned field + #[tracing::instrument(skip(self, pdu), level = "debug")] + pub async fn add_bundled_aggregations_to_pdu( + &self, + user_id: &UserId, + pdu: &mut PduEvent, + ) -> Result<()> { + if pdu.is_redacted() { + return Ok(()); + } + + let bundled_aggregations = self + .get_bundled_aggregations(user_id, pdu.room_id(), pdu.event_id()) + .await?; + + if let Some(aggregations) = bundled_aggregations { + let aggregations_json = serde_json::to_value(aggregations) + .map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?; + + Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?; + } + + Ok(()) + } + + /// Helper method to add bundled aggregations to a PDU's unsigned + /// field + fn add_bundled_aggregations_to_unsigned( + pdu: &mut PduEvent, + aggregations_json: serde_json::Value, + ) -> Result<()> { + use serde_json::{ + Map, Value as JsonValue, + value::{RawValue as RawJsonValue, to_raw_value}, + }; + + let mut unsigned: Map = pdu + .unsigned + .as_deref() + .map(RawJsonValue::get) + .map_or_else(|| Ok(Map::new()), serde_json::from_str) + .map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?; + + let relations = unsigned + .entry("m.relations") + .or_insert_with(|| JsonValue::Object(Map::new())) + .as_object_mut() + .ok_or_else(|| err!(Database("m.relations is not an object")))?; + + if let JsonValue::Object(aggregations_map) = aggregations_json { + for (rel_type, aggregation) in aggregations_map { + relations.insert(rel_type, aggregation); + } + } + + pdu.unsigned = Some(to_raw_value(&unsigned)?); + + Ok(()) + } + + /// Validates that an event is acceptable as a replacement for another event + /// See C/S spec "Validity of replacement events" + #[tracing::instrument(level = "debug")] + async fn is_valid_replacement_event( + original_event: &PduEvent, + replacement_event: &PduEvent, + ) -> Result { + // 1. Same room_id + if original_event.room_id() != replacement_event.room_id() { + return Ok(false); + } + + // 2. Same sender + if original_event.sender() != replacement_event.sender() { + return Ok(false); + } + + // 3. Same type + if original_event.event_type() != replacement_event.event_type() { + return Ok(false); + } + + // 4. Neither event should have a state_key property + if original_event.state_key().is_some() || replacement_event.state_key().is_some() { + return Ok(false); + } + + // 5. Original event must not have rel_type of m.replace + let original_content = original_event.get_content_as_value(); + if let Some(relates_to) = original_content.get("m.relates_to") { + if let Some(rel_type) = relates_to.get("rel_type") { + if rel_type.as_str() == Some("m.replace") { + return Ok(false); + } + } + } + + // 6. Replacement event must have m.new_content property + // Skip this check for encrypted events, as m.new_content would be inside the + // encrypted payload + if replacement_event.event_type() != &ruma::events::TimelineEventType::RoomEncrypted { + let replacement_content = replacement_event.get_content_as_value(); + if replacement_content.get("m.new_content").is_none() { + return Ok(false); + } + } + + Ok(true) + } +} + +#[cfg(test)] +mod tests { + use conduwuit_core::pdu::{EventHash, PduEvent}; + use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id}; + use serde_json::{Value as JsonValue, json, value::to_raw_value}; + + fn create_test_pdu(unsigned_content: Option) -> PduEvent { + PduEvent { + event_id: owned_event_id!("$test:example.com"), + room_id: owned_room_id!("!test:example.com"), + sender: owned_user_id!("@test:example.com"), + origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(), + kind: TimelineEventType::RoomMessage, + content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(), + state_key: None, + prev_events: vec![], + depth: UInt::from(1_u32), + auth_events: vec![], + redacts: None, + unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()), + hashes: EventHash { sha256: "test_hash".to_owned() }, + signatures: None, + origin: None, + } + } + + fn create_bundled_aggregations() -> JsonValue { + json!({ + "m.replace": { + "event_id": "$replace:example.com", + "origin_server_ts": 1_234_567_890, + "sender": "@replacer:example.com" + }, + "m.reference": { + "count": 5, + "chunk": [ + "$ref1:example.com", + "$ref2:example.com" + ] + } + }) + } + + #[test] + fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() { + let mut pdu = create_test_pdu(None); + let aggregations = create_bundled_aggregations(); + + let result = super::super::Service::add_bundled_aggregations_to_unsigned( + &mut pdu, + aggregations.clone(), + ); + assert!(result.is_ok(), "Should succeed when no unsigned field exists"); + + assert!(pdu.unsigned.is_some(), "Unsigned field should be created"); + + let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); + let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); + + assert!(unsigned.get("m.relations").is_some(), "m.relations should exist"); + assert_eq!( + unsigned["m.relations"], aggregations, + "Relations should match the aggregations" + ); + } + + #[test] + fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() { + let existing_unsigned = json!({ + "m.relations": { + "m.replace": { + "event_id": "$old_replace:example.com", + "origin_server_ts": 1_111_111_111, + "sender": "@old_replacer:example.com" + } + } + }); + + let mut pdu = create_test_pdu(Some(existing_unsigned)); + let new_aggregations = create_bundled_aggregations(); + + let result = super::super::Service::add_bundled_aggregations_to_unsigned( + &mut pdu, + new_aggregations.clone(), + ); + assert!(result.is_ok(), "Should succeed when overwriting same relation type"); + + let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); + let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); + + let relations = &unsigned["m.relations"]; + + assert_eq!( + relations["m.replace"], new_aggregations["m.replace"], + "m.replace should be updated" + ); + assert_eq!( + relations["m.replace"]["event_id"], "$replace:example.com", + "Should have new event_id" + ); + + assert!(relations.get("m.reference").is_some(), "New m.reference should be added"); + } + + #[test] + fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() { + // Test case: Other unsigned fields should be preserved + let existing_unsigned = json!({ + "age": 98765, + "prev_content": {"msgtype": "m.text", "body": "old message"}, + "redacted_because": {"event_id": "$redaction:example.com"}, + "m.relations": { + "m.annotation": {"count": 1} + } + }); + + let mut pdu = create_test_pdu(Some(existing_unsigned)); + let new_aggregations = json!({ + "m.replace": {"event_id": "$new:example.com"} + }); + + let result = super::super::Service::add_bundled_aggregations_to_unsigned( + &mut pdu, + new_aggregations, + ); + assert!(result.is_ok(), "Should succeed while preserving other fields"); + + let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); + let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); + + // Verify all existing fields are preserved + assert_eq!(unsigned["age"], 98765, "age should be preserved"); + assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved"); + assert!( + unsigned.get("redacted_because").is_some(), + "redacted_because should be preserved" + ); + + // Verify relations were merged correctly + let relations = &unsigned["m.relations"]; + assert!( + relations.get("m.annotation").is_some(), + "Existing m.annotation should be preserved" + ); + assert!(relations.get("m.replace").is_some(), "New m.replace should be added"); + } + + #[test] + fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() { + // Test case: Invalid JSON in existing unsigned should result in error + let mut pdu = create_test_pdu(None); + // Manually set invalid unsigned data + pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap()); + + let aggregations = create_bundled_aggregations(); + let result = + super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations); + + assert!(result.is_err(), "fails when existing unsigned is invalid"); + // Should we ignore the error and overwrite anyway? + } + + // Test helper function to create test PDU events + fn create_test_event( + event_id: &str, + room_id: &str, + sender: &str, + event_type: TimelineEventType, + content: &JsonValue, + state_key: Option<&str>, + ) -> PduEvent { + PduEvent { + event_id: event_id.try_into().unwrap(), + room_id: room_id.try_into().unwrap(), + sender: sender.try_into().unwrap(), + origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(), + kind: event_type, + content: to_raw_value(&content).unwrap(), + state_key: state_key.map(Into::into), + prev_events: vec![], + depth: UInt::from(1_u32), + auth_events: vec![], + redacts: None, + unsigned: None, + hashes: EventHash { sha256: "test_hash".to_owned() }, + signatures: None, + origin: None, + } + } + + /// Test that a valid replacement event passes validation + #[tokio::test] + async fn test_valid_replacement_event() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message", + "m.new_content": { + "msgtype": "m.text", + "body": "edited message" + }, + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$original:example.com" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(result.unwrap(), "Valid replacement event should be accepted"); + } + + /// Test replacement event with different room ID is rejected + #[tokio::test] + async fn test_replacement_event_different_room() { + let original = create_test_event( + "$original:example.com", + "!room1:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room2:example.com", // Different room + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message", + "m.new_content": { + "msgtype": "m.text", + "body": "edited message" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Different room ID should be rejected"); + } + + /// Test replacement event with different sender is rejected + #[tokio::test] + async fn test_replacement_event_different_sender() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user1:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user2:example.com", // Different sender + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message", + "m.new_content": { + "msgtype": "m.text", + "body": "edited message" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Different sender should be rejected"); + } + + /// Test replacement event with different type is rejected + #[tokio::test] + async fn test_replacement_event_different_type() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomTopic, // Different event type + &json!({ + "topic": "new topic", + "m.new_content": { + "topic": "new topic" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Different event type should be rejected"); + } + + /// Test replacement event with state key is rejected + #[tokio::test] + async fn test_replacement_event_with_state_key() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomName, + &json!({"name": "room name"}), + Some(""), // Has state key + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomName, + &json!({ + "name": "new room name", + "m.new_content": { + "name": "new room name" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Event with state key should be rejected"); + } + + /// Test replacement of an event that is already a replacement is rejected + #[tokio::test] + async fn test_replacement_event_original_is_replacement() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message", + "m.relates_to": { + "rel_type": "m.replace", // Original is already a replacement + "event_id": "$some_other:example.com" + } + }), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited again", + "m.new_content": { + "msgtype": "m.text", + "body": "edited again" + } + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Replacement of replacement should be rejected"); + } + + /// Test replacement event missing m.new_content is rejected + #[tokio::test] + async fn test_replacement_event_missing_new_content() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({"msgtype": "m.text", "body": "original message"}), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomMessage, + &json!({ + "msgtype": "m.text", + "body": "* edited message" + // Missing m.new_content + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!(!result.unwrap(), "Missing m.new_content should be rejected"); + } + + /// Test encrypted replacement event without m.new_content is accepted + #[tokio::test] + async fn test_replacement_event_encrypted_missing_new_content_is_valid() { + let original = create_test_event( + "$original:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomEncrypted, + &json!({ + "algorithm": "m.megolm.v1.aes-sha2", + "ciphertext": "encrypted_payload_base64", + "sender_key": "sender_key", + "session_id": "session_id" + }), + None, + ); + + let replacement = create_test_event( + "$replacement:example.com", + "!room:example.com", + "@user:example.com", + TimelineEventType::RoomEncrypted, + &json!({ + "algorithm": "m.megolm.v1.aes-sha2", + "ciphertext": "encrypted_replacement_payload_base64", + "sender_key": "sender_key", + "session_id": "session_id", + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$original:example.com" + } + // No m.new_content in cleartext - this is valid for encrypted events + }), + None, + ); + + let result = + super::super::Service::is_valid_replacement_event(&original, &replacement).await; + assert!(result.is_ok(), "Validation should succeed"); + assert!( + result.unwrap(), + "Encrypted replacement without cleartext m.new_content should be accepted" + ); + } +} diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index f0beab5a..c4b37b99 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc}; use conduwuit::{ PduCount, PduEvent, arrayvec::ArrayVec, - result::LogErr, utils::{ ReadyExt, stream::{TryIgnore, WidebandExt}, @@ -80,9 +79,7 @@ impl Data { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; - if pdu.sender != user_id { - pdu.remove_transaction_id().log_err().ok(); - } + pdu.set_unsigned(Some(user_id)); Some((shorteventid, pdu)) }) diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index 18221c2d..2dff54d8 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -1,3 +1,4 @@ +mod bundled_aggregations; mod data; use std::sync::Arc; diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index 4100dd75..7cef5dbf 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -127,7 +127,12 @@ pub async fn search_pdus<'a>( .then_some(pdu) }) .skip(query.skip) - .take(query.limit); + .take(query.limit) + .map(move |mut pdu| { + pdu.set_unsigned(query.user_id); + // TODO: bundled aggregation + pdu + }); Ok((count, pdus)) } diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index a680df55..a24183e6 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -160,9 +160,7 @@ impl Service { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; let pdu_id: PduId = pdu_id.into(); - if pdu.sender != user_id { - pdu.remove_transaction_id().ok(); - } + pdu.set_unsigned(Some(user_id)); Some((pdu_id.shorteventid, pdu)) }); diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 94c78bb0..5f7b8c81 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -1,14 +1,11 @@ -use std::{borrow::Borrow, sync::Arc}; +use std::sync::Arc; use conduwuit::{ - Err, PduCount, PduEvent, Result, at, err, - result::{LogErr, NotFound}, - utils, - utils::stream::TryReadyExt, + Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut}; -use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction}; +use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction}; use super::{PduId, RawPduId}; use crate::{Dep, rooms, rooms::short::ShortRoomId}; @@ -46,12 +43,8 @@ impl Data { } #[inline] - pub(super) async fn last_timeline_count( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); + pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result { + let pdus_rev = self.pdus_rev(room_id, PduCount::max()); pin_mut!(pdus_rev); let last_count = pdus_rev @@ -65,12 +58,8 @@ impl Data { } #[inline] - pub(super) async fn latest_pdu_in_room( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); + pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { + let pdus_rev = self.pdus_rev(room_id, PduCount::max()); pin_mut!(pdus_rev); pdus_rev @@ -223,7 +212,6 @@ impl Data { /// order. pub(super) fn pdus_rev<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, until: PduCount, ) -> impl Stream> + Send + 'a { @@ -233,14 +221,13 @@ impl Data { self.pduid_pdu .rev_raw_stream_from(¤t) .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(move |item| Self::each_pdu(item, user_id)) + .ready_and_then(Self::from_json_slice) }) .try_flatten_stream() } pub(super) fn pdus<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount, ) -> impl Stream> + Send + 'a { @@ -250,21 +237,15 @@ impl Data { self.pduid_pdu .raw_stream_from(¤t) .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(move |item| Self::each_pdu(item, user_id)) + .ready_and_then(Self::from_json_slice) }) .try_flatten_stream() } - fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result { + fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result { let pdu_id: RawPduId = pdu_id.into(); - let mut pdu = serde_json::from_slice::(pdu)?; - - if Some(pdu.sender.borrow()) != user_id { - pdu.remove_transaction_id().log_err().ok(); - } - - pdu.add_age().log_err().ok(); + let pdu = serde_json::from_slice::(pdu)?; Ok((pdu_id.pdu_count(), pdu)) } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 4b2f3cb2..f7925994 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -165,7 +165,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> { - let pdus = self.pdus(None, room_id, None); + let pdus = self.pdus(room_id, None); pin_mut!(pdus); pdus.try_next() @@ -175,16 +175,12 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { - self.db.latest_pdu_in_room(None, room_id).await + self.db.latest_pdu_in_room(room_id).await } #[tracing::instrument(skip(self), level = "debug")] - pub async fn last_timeline_count( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - self.db.last_timeline_count(sender_user, room_id).await + pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result { + self.db.last_timeline_count(room_id).await } /// Returns the `count` of this pdu's id. @@ -545,6 +541,10 @@ impl Service { | _ => {}, } + // CONCERN: If we receive events with a relation out-of-order, we never write + // their relation / thread. We need some kind of way to trigger when we receive + // this event, and potentially a way to rebuild the table entirely. + if let Ok(content) = pdu.get_content::() { if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await { self.services @@ -698,6 +698,20 @@ impl Service { .await .saturating_add(uint!(1)); + if state_key.is_none() { + if prev_events.is_empty() { + warn!("Timeline event had zero prev_events, something broke."); + return Err!(Request(Unknown("Timeline event had zero prev_events."))); + } + if depth.le(&uint!(2)) { + warn!( + "Had unsafe depth of {depth} in {room_id} when creating non-state event. \ + Bad!" + ); + return Err!(Request(Unknown("Unsafe depth for non-state event."))); + } + }; + let mut unsigned = unsigned.unwrap_or_default(); if let Some(state_key) = &state_key { @@ -757,6 +771,7 @@ impl Service { ready(auth_events.get(&key)) }; + debug!("running auth check on new {} event by {} in {}", pdu.kind, pdu.sender, pdu.room_id); let auth_check = state_res::auth_check( &room_version, &pdu, @@ -996,34 +1011,30 @@ impl Service { #[inline] pub fn all_pdus<'a>( &'a self, - user_id: &'a UserId, room_id: &'a RoomId, ) -> impl Stream + Send + 'a { - self.pdus(Some(user_id), room_id, None).ignore_err() + self.pdus(room_id, None).ignore_err() } /// Reverse iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus_rev<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, until: Option, ) -> impl Stream> + Send + 'a { self.db - .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) + .pdus_rev(room_id, until.unwrap_or_else(PduCount::max)) } /// Forward iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, from: Option, ) -> impl Stream> + Send + 'a { - self.db - .pdus(user_id, room_id, from.unwrap_or_else(PduCount::min)) + self.db.pdus(room_id, from.unwrap_or_else(PduCount::min)) } /// Replace a PDU with the redacted form. @@ -1142,7 +1153,7 @@ impl Service { .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); + info!("Asking {backfill_server} for backfill in {:?}", room_id.to_owned()); let response = self .services .sending @@ -1170,7 +1181,7 @@ impl Service { } } - info!("No servers could backfill, but backfill was needed in room {room_id}"); + warn!("No servers could backfill, but backfill was needed in room {room_id}"); Ok(()) } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 08ca7010..ce687551 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -401,16 +401,10 @@ impl Service { fn num_senders(args: &crate::Args<'_>) -> usize { const MIN_SENDERS: usize = 1; - // Limit the number of senders to the number of workers threads or number of - // cores, conservatively. - let max_senders = args - .server - .metrics - .num_workers() - .min(available_parallelism()); + // Limit the maximum number of senders to the number of cores. + let max_senders = available_parallelism(); - // If the user doesn't override the default 0, this is intended to then default - // to 1 for now as multiple senders is experimental. + // default is 4 senders. clamp between 1 and core count. args.server .config .sender_workers diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index fab02f6b..cd84f7e7 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -781,7 +781,7 @@ impl Service { for pdu in pdus { // Redacted events are not notification targets (we don't send push for them) - if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) { + if pdu.is_redacted() { continue; }