diff --git a/flake.lock b/flake.lock
index f1859b57..4c2bf9fb 100644
--- a/flake.lock
+++ b/flake.lock
@@ -513,6 +513,23 @@
         "type": "github"
       }
     },
+    "rocksdb": {
+      "flake": false,
+      "locked": {
+        "lastModified": 1753385396,
+        "narHash": "sha256-/Hvy1yTH/0D5aa7bc+/uqFugCQq4InTdwlRw88vA5IY=",
+        "ref": "10.4.fb",
+        "rev": "28d4b7276c16ed3e28af1bd96162d6442ce25923",
+        "revCount": 13318,
+        "type": "git",
+        "url": "https://forgejo.ellis.link/continuwuation/rocksdb"
+      },
+      "original": {
+        "ref": "10.4.fb",
+        "type": "git",
+        "url": "https://forgejo.ellis.link/continuwuation/rocksdb"
+      }
+    },
     "root": {
       "inputs": {
         "attic": "attic",
@@ -522,7 +539,8 @@
         "flake-compat": "flake-compat_3",
         "flake-utils": "flake-utils",
         "nix-filter": "nix-filter",
-        "nixpkgs": "nixpkgs_5"
+        "nixpkgs": "nixpkgs_5",
+        "rocksdb": "rocksdb"
       }
     },
     "rust-analyzer-src": {
diff --git a/flake.nix b/flake.nix
index b8ac6029..e65fcbda 100644
--- a/flake.nix
+++ b/flake.nix
@@ -16,6 +16,10 @@
     flake-utils.url = "github:numtide/flake-utils?ref=main";
     nix-filter.url = "github:numtide/nix-filter?ref=main";
     nixpkgs.url = "github:NixOS/nixpkgs?ref=nixpkgs-unstable";
+    rocksdb = {
+      url = "git+https://forgejo.ellis.link/continuwuation/rocksdb?ref=10.4.fb";
+      flake = false;
+    };
   };
 
   outputs =
@@ -61,13 +65,7 @@
           inherit (self) liburing;
         }).overrideAttrs (old: {
-          src = pkgsHost.fetchFromGitea {
-            domain = "forgejo.ellis.link";
-            owner = "continuwuation";
-            repo = "rocksdb";
-            rev = "10.4.fb";
-            sha256 = "sha256-/Hvy1yTH/0D5aa7bc+/uqFugCQq4InTdwlRw88vA5IY=";
-          };
+          src = inputs.rocksdb;
           version = "v10.4.fb";
           cmakeFlags = pkgs.lib.subtractLists
             [
diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs
index 64f68330..81b0e9da 100644
--- a/src/admin/debug/commands.rs
+++ b/src/admin/debug/commands.rs
@@ -281,8 +281,15 @@ pub(super) async fn get_remote_pdu(
 				vec![(event_id, value, room_id)]
 			};
 
+			info!("Attempting to handle event ID {event_id} as backfilled PDU");
+			self.services
+				.rooms
+				.timeline
+				.backfill_pdu(&server, response.pdu)
+				.await?;
+
 			let text = serde_json::to_string_pretty(&json)?;
-			let msg = "Got PDU from specified server:";
+			let msg = "Got PDU from specified server and handled as backfilled";
 			write!(self, "{msg}. Event body:\n```json\n{text}\n```")
 		},
 	}
diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs
index 0f72b58c..afcfec34 100644
--- a/src/admin/query/room_timeline.rs
+++ b/src/admin/query/room_timeline.rs
@@ -57,5 +57,5 @@ pub(super) async fn pdus(
 		.try_collect()
 		.await?;
 
-	self.write_str(&format!("```\n{result:#?}\n```")).await
+	self.write_str(&format!("{result:#?}")).await
 }
diff --git a/src/api/client/message.rs b/src/api/client/message.rs
index 0145b7fe..f5a951f4 100644
--- a/src/api/client/message.rs
+++ b/src/api/client/message.rs
@@ -35,7 +35,7 @@ use ruma::{
 };
 use tracing::warn;
 
-use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token};
+use super::utils::{count_to_token, parse_pagination_token as parse_token};
 use crate::Ruma;
 
 /// list of safe and common non-state events to ignore if the user is ignored
@@ -181,8 +181,8 @@ pub(crate) async fn get_message_events_route(
 		.collect();
 
 	Ok(get_message_events::v3::Response {
-		start: count_to_pagination_token(from),
-		end: next_token.map(count_to_pagination_token),
+		start: count_to_token(from),
+		end: next_token.map(count_to_token),
 		chunk,
 		state,
 	})
diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs
index f2ec3f23..f6d8fe9e 100644
--- a/src/api/client/relations.rs
+++ b/src/api/client/relations.rs
@@ -18,7 +18,7 @@ use ruma::{
 	events::{TimelineEventType, relation::RelationType},
 };
 
-use super::utils::{count_to_pagination_token, pagination_token_to_count as parse_token};
+use super::utils::{count_to_token, parse_pagination_token as parse_token};
 use crate::Ruma;
 
 /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
@@ -193,7 +193,7 @@ async fn paginate_relations_with_filter(
 			| Direction::Forward => events.last(),
 			| Direction::Backward => events.first(),
 		}
-		.map(|(count, _)| count_to_pagination_token(*count))
+		.map(|(count, _)| count_to_token(*count))
 	} else {
 		None
 	};
diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs
index 75ae0758..47228d67 100644
--- a/src/api/client/room/event.rs
+++ b/src/api/client/room/event.rs
@@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route(
 	let event = services
 		.rooms
 		.timeline
-		.get_remote_pdu(room_id, event_id)
+		.get_pdu(event_id)
 		.map_err(|_| err!(Request(NotFound("Event {} not found.", event_id))));
 
 	let visible = services
@@ -33,6 +33,11 @@ pub(crate) async fn get_room_event_route(
 		return Err!(Request(Forbidden("You don't have permission to view this event.")));
 	}
 
+	debug_assert!(
+		event.event_id() == event_id && event.room_id() == room_id,
+		"Fetched PDU must match requested"
+	);
+
 	event.add_age().ok();
 
 	Ok(get_room_event::v3::Response { event: event.into_format() })
diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs
index f0fb4a64..ca176eda 100644
--- a/src/api/client/threads.rs
+++ b/src/api/client/threads.rs
@@ -9,7 +9,7 @@ use conduwuit::{
 use futures::StreamExt;
 use ruma::{api::client::threads::get_threads, uint};
 
-use crate::{Ruma, client::utils::pagination_token_to_count};
+use crate::Ruma;
 
 /// # `GET /_matrix/client/r0/rooms/{roomId}/threads`
 pub(crate) async fn get_threads_route(
@@ -27,7 +27,7 @@ pub(crate) async fn get_threads_route(
 	let from: PduCount = body
 		.from
 		.as_deref()
-		.map(pagination_token_to_count)
+		.map(str::parse)
 		.transpose()?
 		.unwrap_or_else(PduCount::max);
diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs
index ec69388a..cc941b95 100644
--- a/src/api/client/utils.rs
+++ b/src/api/client/utils.rs
@@ -1,7 +1,28 @@
-use conduwuit::{Result, matrix::pdu::PduCount};
+use conduwuit::{
+	Result, err,
+	matrix::pdu::{PduCount, ShortEventId},
+};
 
-/// Parse a pagination token
-pub(crate) fn pagination_token_to_count(token: &str) -> Result<PduCount> { token.parse() }
+/// Parse a pagination token, trying ShortEventId first, then falling back to
+/// PduCount
+pub(crate) fn parse_pagination_token(token: &str) -> Result<PduCount> {
+	// Try parsing as ShortEventId first
+	if let Ok(shorteventid) = token.parse::<ShortEventId>() {
+		// ShortEventId maps directly to a PduCount in our database
+		Ok(PduCount::Normal(shorteventid))
+	} else if let Ok(count) = token.parse::<u64>() {
+		// Fallback to PduCount for backwards compatibility
+		Ok(PduCount::Normal(count))
+	} else if let Ok(count) = token.parse::<i64>() {
+		// Also handle negative counts for backfilled events
+		Ok(PduCount::from_signed(count))
+	} else {
+		Err(err!(Request(InvalidParam("Invalid pagination token"))))
+	}
+}
 
-/// Convert a PduCount to a token string
-pub(crate) fn count_to_pagination_token(count: PduCount) -> String { count.to_string() }
+/// Convert a PduCount to a token string (using the underlying ShortEventId)
+pub(crate) fn count_to_token(count: PduCount) -> String {
+	// The PduCount's unsigned value IS the ShortEventId
+	count.into_unsigned().to_string()
+}
diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs
index 7fb12574..b880278f 100644
--- a/src/core/matrix/pdu/count.rs
+++ b/src/core/matrix/pdu/count.rs
@@ -1,10 +1,6 @@
 #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)]
 
-use std::{
-	cmp::Ordering,
-	fmt::{self, Display},
-	str::FromStr,
-};
+use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr};
 
 use ruma::api::Direction;
diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs
index c0171691..d91075e1 100644
--- a/src/service/rooms/timeline/backfill.rs
+++ b/src/service/rooms/timeline/backfill.rs
@@ -1,24 +1,21 @@
 use std::iter::once;
 
-use conduwuit::{Err, PduEvent};
 use conduwuit_core::{
-	Result, debug, debug_warn, err, implement, info,
+	Result, debug, implement, info,
 	matrix::{
 		event::Event,
 		pdu::{PduCount, PduId, RawPduId},
 	},
 	utils::{IterStream, ReadyExt},
-	validated, warn,
+	validated,
 };
 use futures::{FutureExt, StreamExt};
 use ruma::{
-	CanonicalJsonObject, EventId, RoomId, ServerName,
+	RoomId, ServerName,
 	api::federation,
 	events::{
 		StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent,
-	},
-	uint,
-};
+	}, UInt};
 use serde_json::value::RawValue as RawJsonValue;
 
 use super::ExtractBody;
@@ -101,7 +98,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
 		.boxed();
 
 	while let Some(ref backfill_server) = servers.next().await {
-		info!("Asking {backfill_server} for backfill in {room_id}");
+		info!("Asking {backfill_server} for backfill of room {room_id}");
 		let response = self
 			.services
 			.sending
@@ -110,7 +107,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
 				federation::backfill::get_backfill::v1::Request {
 					room_id: room_id.to_owned(),
 					v: vec![first_pdu.1.event_id().to_owned()],
-					limit: uint!(100),
+					limit: UInt::from(self.services.server.config.max_fetch_prev_events),
 				},
 			)
 			.await;
@@ -118,137 +115,21 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
 			| Ok(response) => {
 				for pdu in response.pdus {
 					if let Err(e) = self.backfill_pdu(backfill_server, pdu).boxed().await {
-						debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}");
+						info!("Failed to add backfilled pdu in room {room_id}: {e}");
 					}
 				}
 				return Ok(());
 			},
 			| Err(e) => {
-				warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
+				info!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
 			},
 		}
 	}
 
-	warn!("No servers could backfill, but backfill was needed in room {room_id}");
+	info!("No servers could backfill, but backfill was needed in room {room_id}");
 	Ok(())
 }
 
-#[implement(super::Service)]
-#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))]
-pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result<PduEvent> {
-	let local = self.get_pdu(event_id).await;
-	if local.is_ok() {
-		// We already have this PDU, no need to backfill
-		debug!("We already have {event_id} in {room_id}, no need to backfill.");
-		return local;
-	}
-	debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers.");
-	// Similar to backfill_if_required, but only for a single PDU
-	// Fetch a list of servers to try
-	if self
-		.services
-		.state_cache
-		.room_joined_count(room_id)
-		.await
-		.is_ok_and(|count| count <= 1)
-		&& !self
-			.services
-			.state_accessor
-			.is_world_readable(room_id)
-			.await
-	{
-		// Room is empty (1 user or none), there is no one that can backfill
-		return Err!(Request(NotFound("No one can backfill this PDU, room is empty.")));
-	}
-
-	let power_levels: RoomPowerLevelsEventContent = self
-		.services
-		.state_accessor
-		.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
-		.await
-		.unwrap_or_default();
-
-	let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| {
-		if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) {
-			Some(user_id.server_name())
-		} else {
-			None
-		}
-	});
-
-	let canonical_room_alias_server = once(
-		self.services
-			.state_accessor
-			.get_canonical_alias(room_id)
-			.await,
-	)
-	.filter_map(Result::ok)
-	.map(|alias| alias.server_name().to_owned())
-	.stream();
-
-	let mut servers = room_mods
-		.stream()
-		.map(ToOwned::to_owned)
-		.chain(canonical_room_alias_server)
-		.chain(
-			self.services
-				.server
-				.config
-				.trusted_servers
-				.iter()
-				.map(ToOwned::to_owned)
-				.stream(),
-		)
-		.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
-		.filter_map(|server_name| async move {
-			self.services
-				.state_cache
-				.server_in_room(&server_name, room_id)
-				.await
-				.then_some(server_name)
-		})
-		.boxed();
-
-	while let Some(ref backfill_server) = servers.next().await {
-		info!("Asking {backfill_server} for event {}", event_id);
-		let value = self
-			.services
-			.sending
-			.send_federation_request(backfill_server, federation::event::get_event::v1::Request {
-				event_id: event_id.to_owned(),
-				include_unredacted_content: Some(false),
-			})
-			.await
-			.and_then(|response| {
-				serde_json::from_str::<CanonicalJsonObject>(response.pdu.get()).map_err(|e| {
-					err!(BadServerResponse(debug_warn!(
-						"Error parsing incoming event {e:?} from {backfill_server}"
-					)))
-				})
-			});
-
-		let pdu = match value {
-			| Ok(value) => {
-				self.services
-					.event_handler
-					.handle_incoming_pdu(backfill_server, &room_id, &event_id, value, false)
-					.boxed()
-					.await?;
-				debug!("Successfully backfilled {event_id} from {backfill_server}");
-				Some(self.get_pdu(event_id).await)
-			},
-			| Err(e) => {
-				warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); - None - }, - }; - if let Some(pdu) = pdu { - debug!("Fetched {event_id} from {backfill_server}"); - return pdu; - } - } - - Err!("No servers could be used to fetch {} in {}.", room_id, event_id) -} - #[implement(super::Service)] #[tracing::instrument(skip(self, pdu), level = "debug")] pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 9064df6c..fa10a5c0 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -3,7 +3,8 @@ use std::{borrow::Borrow, sync::Arc}; use conduwuit::{ Err, PduCount, PduEvent, Result, at, err, result::{LogErr, NotFound}, - utils::{self, stream::TryReadyExt}, + utils, + utils::stream::TryReadyExt, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};