From 969d7cbb66bb56709e325f9cdcf8d9a4af11269c Mon Sep 17 00:00:00 2001 From: aviac Date: Tue, 26 Aug 2025 16:00:06 +0200 Subject: [PATCH 1/6] feat(nix): remove rocksdb from flake.nix inputs Consuming this flake is pretty annoying since the rocksdb input is fetched on every build which takes ~ 10 - 20 sec. By removing it and replacing it with a `pkgs.fetchFromGitea`, we create an intermediate derivation which is better for caching reasons. --- flake.lock | 20 +------------------- flake.nix | 12 +++++++----- 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/flake.lock b/flake.lock index 4c2bf9fb..f1859b57 100644 --- a/flake.lock +++ b/flake.lock @@ -513,23 +513,6 @@ "type": "github" } }, - "rocksdb": { - "flake": false, - "locked": { - "lastModified": 1753385396, - "narHash": "sha256-/Hvy1yTH/0D5aa7bc+/uqFugCQq4InTdwlRw88vA5IY=", - "ref": "10.4.fb", - "rev": "28d4b7276c16ed3e28af1bd96162d6442ce25923", - "revCount": 13318, - "type": "git", - "url": "https://forgejo.ellis.link/continuwuation/rocksdb" - }, - "original": { - "ref": "10.4.fb", - "type": "git", - "url": "https://forgejo.ellis.link/continuwuation/rocksdb" - } - }, "root": { "inputs": { "attic": "attic", @@ -539,8 +522,7 @@ "flake-compat": "flake-compat_3", "flake-utils": "flake-utils", "nix-filter": "nix-filter", - "nixpkgs": "nixpkgs_5", - "rocksdb": "rocksdb" + "nixpkgs": "nixpkgs_5" } }, "rust-analyzer-src": { diff --git a/flake.nix b/flake.nix index e65fcbda..b8ac6029 100644 --- a/flake.nix +++ b/flake.nix @@ -16,10 +16,6 @@ flake-utils.url = "github:numtide/flake-utils?ref=main"; nix-filter.url = "github:numtide/nix-filter?ref=main"; nixpkgs.url = "github:NixOS/nixpkgs?ref=nixpkgs-unstable"; - rocksdb = { - url = "git+https://forgejo.ellis.link/continuwuation/rocksdb?ref=10.4.fb"; - flake = false; - }; }; outputs = @@ -65,7 +61,13 @@ inherit (self) liburing; }).overrideAttrs (old: { - src = inputs.rocksdb; + src = pkgsHost.fetchFromGitea { + domain = "forgejo.ellis.link"; + owner = "continuwuation"; + repo = "rocksdb"; + rev = "10.4.fb"; + sha256 = "sha256-/Hvy1yTH/0D5aa7bc+/uqFugCQq4InTdwlRw88vA5IY="; + }; version = "v10.4.fb"; cmakeFlags = pkgs.lib.subtractLists [ From 9bff1f6c06bb0d2c37926f95a6f88a6c1d19a8fa Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 8 Jul 2025 17:07:50 +0100 Subject: [PATCH 2/6] feat: Ask remote servers for individual unknown events --- src/api/client/room/event.rs | 7 +- src/service/rooms/timeline/backfill.rs | 114 ++++++++++++++++++++++++- 2 files changed, 112 insertions(+), 9 deletions(-) diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 47228d67..75ae0758 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route( let event = services .rooms .timeline - .get_pdu(event_id) + .get_remote_pdu(room_id, event_id) .map_err(|_| err!(Request(NotFound("Event {} not found.", event_id)))); let visible = services @@ -33,11 +33,6 @@ pub(crate) async fn get_room_event_route( return Err!(Request(Forbidden("You don't have permission to view this event."))); } - debug_assert!( - event.event_id() == event_id && event.room_id() == room_id, - "Fetched PDU must match requested" - ); - event.add_age().ok(); Ok(get_room_event::v3::Response { event: event.into_format() }) diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index e976981e..dd613afd 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -1,5 +1,6 @@ use std::iter::once; +use conduwuit::{Err, PduEvent}; use conduwuit_core::{ Result, debug, debug_warn, implement, info, matrix::{ @@ -11,7 +12,7 @@ use conduwuit_core::{ }; use futures::{FutureExt, StreamExt}; use ruma::{ - RoomId, ServerName, + EventId, RoomId, ServerName, api::federation, events::{ StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, @@ -100,7 +101,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); + info!("Asking {backfill_server} for backfill in {room_id}"); let response = self .services .sending @@ -128,10 +129,117 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re } } - info!("No servers could backfill, but backfill was needed in room {room_id}"); + warn!("No servers could backfill, but backfill was needed in room {room_id}"); Ok(()) } +#[implement(super::Service)] +#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))] +pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result { + let local = self.get_pdu(event_id).await; + if local.is_ok() { + // We already have this PDU, no need to backfill + debug!("We already have {event_id} in {room_id}, no need to backfill."); + return local; + } + debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers."); + // Similar to backfill_if_required, but only for a single PDU + // Fetch a list of servers to try + if self + .services + .state_cache + .room_joined_count(room_id) + .await + .is_ok_and(|count| count <= 1) + && !self + .services + .state_accessor + .is_world_readable(room_id) + .await + { + // Room is empty (1 user or none), there is no one that can backfill + return Err!(Request(NotFound("No one can backfill this PDU, room is empty."))); + } + + let power_levels: RoomPowerLevelsEventContent = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") + .await + .unwrap_or_default(); + + let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { + if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) { + Some(user_id.server_name()) + } else { + None + } + }); + + let canonical_room_alias_server = once( + self.services + .state_accessor + .get_canonical_alias(room_id) + .await, + ) + .filter_map(Result::ok) + .map(|alias| alias.server_name().to_owned()) + .stream(); + let mut servers = room_mods + .stream() + .map(ToOwned::to_owned) + .chain(canonical_room_alias_server) + .chain( + self.services + .server + .config + .trusted_servers + .iter() + .map(ToOwned::to_owned) + .stream(), + ) + .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) + .filter_map(|server_name| async move { + self.services + .state_cache + .server_in_room(&server_name, room_id) + .await + .then_some(server_name) + }) + .boxed(); + + while let Some(ref backfill_server) = servers.next().await { + info!("Asking {backfill_server} for event {}", event_id); + let response = self + .services + .sending + .send_federation_request(backfill_server, federation::event::get_event::v1::Request { + event_id: event_id.to_owned(), + include_unredacted_content: Some(false), + }) + .await; + let pdu = match response { + | Ok(response) => { + self.backfill_pdu(backfill_server, response.pdu) + .boxed() + .await?; + debug!("Successfully backfilled {event_id} from {backfill_server}"); + Some(self.get_pdu(event_id).await) + }, + | Err(e) => { + warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); + None + }, + }; + if let Some(pdu) = pdu { + debug!("Fetched {event_id} from {backfill_server}"); + return pdu; + } + } + + Err!("No servers could be used to fetch {} in {}.", room_id, event_id) +} + #[implement(super::Service)] #[tracing::instrument(skip(self, pdu), level = "debug")] pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { From 1ad26798a0b663d32967586a0965e71582fb8d94 Mon Sep 17 00:00:00 2001 From: Ginger Date: Tue, 2 Sep 2025 15:12:03 -0400 Subject: [PATCH 3/6] fix: Use `handle_incoming_pdu` directly to keep remote PDUs as outliers --- src/service/rooms/timeline/backfill.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index dd613afd..c0171691 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -2,7 +2,7 @@ use std::iter::once; use conduwuit::{Err, PduEvent}; use conduwuit_core::{ - Result, debug, debug_warn, implement, info, + Result, debug, debug_warn, err, implement, info, matrix::{ event::Event, pdu::{PduCount, PduId, RawPduId}, @@ -12,7 +12,7 @@ use conduwuit_core::{ }; use futures::{FutureExt, StreamExt}; use ruma::{ - EventId, RoomId, ServerName, + CanonicalJsonObject, EventId, RoomId, ServerName, api::federation, events::{ StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, @@ -210,17 +210,26 @@ pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Resu while let Some(ref backfill_server) = servers.next().await { info!("Asking {backfill_server} for event {}", event_id); - let response = self + let value = self .services .sending .send_federation_request(backfill_server, federation::event::get_event::v1::Request { event_id: event_id.to_owned(), include_unredacted_content: Some(false), }) - .await; - let pdu = match response { - | Ok(response) => { - self.backfill_pdu(backfill_server, response.pdu) + .await + .and_then(|response| { + serde_json::from_str::(response.pdu.get()).map_err(|e| { + err!(BadServerResponse(debug_warn!( + "Error parsing incoming event {e:?} from {backfill_server}" + ))) + }) + }); + let pdu = match value { + | Ok(value) => { + self.services + .event_handler + .handle_incoming_pdu(backfill_server, &room_id, &event_id, value, false) .boxed() .await?; debug!("Successfully backfilled {event_id} from {backfill_server}"); From ac552b8829fd8f67dc42579ee3b4a01f309e015f Mon Sep 17 00:00:00 2001 From: Ginger Date: Thu, 4 Sep 2025 10:32:35 -0400 Subject: [PATCH 4/6] fix: Put the output of `!admin query room-timeline pdus` in a codeblock --- src/admin/query/room_timeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index afcfec34..0f72b58c 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -57,5 +57,5 @@ pub(super) async fn pdus( .try_collect() .await?; - self.write_str(&format!("{result:#?}")).await + self.write_str(&format!("```\n{result:#?}\n```")).await } From 11c6b3ea0ae449b7a48220c0f9bbc91545dd8ad5 Mon Sep 17 00:00:00 2001 From: Ginger Date: Thu, 4 Sep 2025 10:33:43 -0400 Subject: [PATCH 5/6] fix: Fix pagination tokens being corrupted for backfilled PDUs --- src/api/client/message.rs | 6 +++--- src/api/client/relations.rs | 4 ++-- src/api/client/threads.rs | 4 ++-- src/api/client/utils.rs | 31 +++++------------------------- src/core/matrix/pdu/count.rs | 6 +++++- src/service/rooms/timeline/data.rs | 3 +-- 6 files changed, 18 insertions(+), 36 deletions(-) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index f5a951f4..0145b7fe 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -35,7 +35,7 @@ use ruma::{ }; use tracing::warn; -use super::utils::{count_to_token, parse_pagination_token as parse_token}; +use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token}; use crate::Ruma; /// list of safe and common non-state events to ignore if the user is ignored @@ -181,8 +181,8 @@ pub(crate) async fn get_message_events_route( .collect(); Ok(get_message_events::v3::Response { - start: count_to_token(from), - end: next_token.map(count_to_token), + start: count_to_pagination_token(from), + end: next_token.map(count_to_pagination_token), chunk, state, }) diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index f6d8fe9e..f2ec3f23 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -18,7 +18,7 @@ use ruma::{ events::{TimelineEventType, relation::RelationType}, }; -use super::utils::{count_to_token, parse_pagination_token as parse_token}; +use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token}; use crate::Ruma; /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}` @@ -193,7 +193,7 @@ async fn paginate_relations_with_filter( | Direction::Forward => events.last(), | Direction::Backward => events.first(), } - .map(|(count, _)| count_to_token(*count)) + .map(|(count, _)| count_to_pagination_token(*count)) } else { None }; diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index ca176eda..f0fb4a64 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -9,7 +9,7 @@ use conduwuit::{ use futures::StreamExt; use ruma::{api::client::threads::get_threads, uint}; -use crate::Ruma; +use crate::{Ruma, client::utils::pagination_token_to_count}; /// # `GET /_matrix/client/r0/rooms/{roomId}/threads` pub(crate) async fn get_threads_route( @@ -27,7 +27,7 @@ pub(crate) async fn get_threads_route( let from: PduCount = body .from .as_deref() - .map(str::parse) + .map(pagination_token_to_count) .transpose()? .unwrap_or_else(PduCount::max); diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs index cc941b95..ec69388a 100644 --- a/src/api/client/utils.rs +++ b/src/api/client/utils.rs @@ -1,28 +1,7 @@ -use conduwuit::{ - Result, err, - matrix::pdu::{PduCount, ShortEventId}, -}; +use conduwuit::{Result, matrix::pdu::PduCount}; -/// Parse a pagination token, trying ShortEventId first, then falling back to -/// PduCount -pub(crate) fn parse_pagination_token(token: &str) -> Result { - // Try parsing as ShortEventId first - if let Ok(shorteventid) = token.parse::() { - // ShortEventId maps directly to a PduCount in our database - Ok(PduCount::Normal(shorteventid)) - } else if let Ok(count) = token.parse::() { - // Fallback to PduCount for backwards compatibility - Ok(PduCount::Normal(count)) - } else if let Ok(count) = token.parse::() { - // Also handle negative counts for backfilled events - Ok(PduCount::from_signed(count)) - } else { - Err(err!(Request(InvalidParam("Invalid pagination token")))) - } -} +/// Parse a pagination token +pub(crate) fn pagination_token_to_count(token: &str) -> Result { token.parse() } -/// Convert a PduCount to a token string (using the underlying ShortEventId) -pub(crate) fn count_to_token(count: PduCount) -> String { - // The PduCount's unsigned value IS the ShortEventId - count.into_unsigned().to_string() -} +/// Convert a PduCount to a token string +pub(crate) fn count_to_pagination_token(count: PduCount) -> String { count.to_string() } diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs index b880278f..7fb12574 100644 --- a/src/core/matrix/pdu/count.rs +++ b/src/core/matrix/pdu/count.rs @@ -1,6 +1,10 @@ #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)] -use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr}; +use std::{ + cmp::Ordering, + fmt::{self, Display}, + str::FromStr, +}; use ruma::api::Direction; diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index fa10a5c0..9064df6c 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -3,8 +3,7 @@ use std::{borrow::Borrow, sync::Arc}; use conduwuit::{ Err, PduCount, PduEvent, Result, at, err, result::{LogErr, NotFound}, - utils, - utils::stream::TryReadyExt, + utils::{self, stream::TryReadyExt}, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut}; From 8e7ee1e2100852ec9570616b3321a14151513630 Mon Sep 17 00:00:00 2001 From: Ginger Date: Thu, 4 Sep 2025 10:46:08 -0400 Subject: [PATCH 6/6] feat: Do not persist remote PDUs fetched with admin commands --- src/admin/debug/commands.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 81b0e9da..64f68330 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -281,15 +281,8 @@ pub(super) async fn get_remote_pdu( vec![(event_id, value, room_id)] }; - info!("Attempting to handle event ID {event_id} as backfilled PDU"); - self.services - .rooms - .timeline - .backfill_pdu(&server, response.pdu) - .await?; - let text = serde_json::to_string_pretty(&json)?; - let msg = "Got PDU from specified server and handled as backfilled"; + let msg = "Got PDU from specified server:"; write!(self, "{msg}. Event body:\n```json\n{text}\n```") }, }