diff --git a/.vscode/settings.json b/.vscode/settings.json index 82162ff7..a4fad964 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -7,6 +7,5 @@ "continuwuity", "homeserver", "homeservers" - ], - "rust-analyzer.cargo.features": ["full"] + ] } diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 64f68330..81b0e9da 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -281,8 +281,15 @@ pub(super) async fn get_remote_pdu( vec![(event_id, value, room_id)] }; + info!("Attempting to handle event ID {event_id} as backfilled PDU"); + self.services + .rooms + .timeline + .backfill_pdu(&server, response.pdu) + .await?; + let text = serde_json::to_string_pretty(&json)?; - let msg = "Got PDU from specified server:"; + let msg = "Got PDU from specified server and handled as backfilled"; write!(self, "{msg}. Event body:\n```json\n{text}\n```") }, } diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index 0f72b58c..afcfec34 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -57,5 +57,5 @@ pub(super) async fn pdus( .try_collect() .await?; - self.write_str(&format!("```\n{result:#?}\n```")).await + self.write_str(&format!("{result:#?}")).await } diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 842036f5..f5a951f4 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -35,6 +35,7 @@ use ruma::{ }; use tracing::warn; +use super::utils::{count_to_token, parse_pagination_token as parse_token}; use crate::Ruma; /// list of safe and common non-state events to ignore if the user is ignored @@ -84,14 +85,14 @@ pub(crate) async fn get_message_events_route( let from: PduCount = body .from .as_deref() - .map(str::parse) + .map(parse_token) .transpose()? .unwrap_or_else(|| match body.dir { | Direction::Forward => PduCount::min(), | Direction::Backward => PduCount::max(), }); - let to: Option = body.to.as_deref().map(str::parse).transpose()?; + let to: Option = body.to.as_deref().map(parse_token).transpose()?; let limit: usize = body .limit @@ -180,8 +181,8 @@ pub(crate) async fn get_message_events_route( .collect(); Ok(get_message_events::v3::Response { - start: from.to_string(), - end: next_token.as_ref().map(PduCount::to_string), + start: count_to_token(from), + end: next_token.map(count_to_token), chunk, state, }) diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 0014282c..c8ca7757 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -37,6 +37,7 @@ pub(super) mod typing; pub(super) mod unstable; pub(super) mod unversioned; pub(super) mod user_directory; +pub(super) mod utils; pub(super) mod voip; pub(super) mod well_known; diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index 79fa1459..f6d8fe9e 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -18,6 +18,7 @@ use ruma::{ events::{TimelineEventType, relation::RelationType}, }; +use super::utils::{count_to_token, parse_pagination_token as parse_token}; use crate::Ruma; /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}` @@ -110,14 +111,14 @@ async fn paginate_relations_with_filter( dir: Direction, ) -> Result { let start: PduCount = from - .map(str::parse) + .map(parse_token) .transpose()? .unwrap_or_else(|| match dir { | Direction::Forward => PduCount::min(), | Direction::Backward => PduCount::max(), }); - let to: Option = to.map(str::parse).transpose()?; + let to: Option = to.map(parse_token).transpose()?; // Use limit or else 30, with maximum 100 let limit: usize = limit @@ -192,7 +193,7 @@ async fn paginate_relations_with_filter( | Direction::Forward => events.last(), | Direction::Backward => events.first(), } - .map(|(count, _)| count.to_string()) + .map(|(count, _)| count_to_token(*count)) } else { None }; diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 75ae0758..47228d67 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route( let event = services .rooms .timeline - .get_remote_pdu(room_id, event_id) + .get_pdu(event_id) .map_err(|_| err!(Request(NotFound("Event {} not found.", event_id)))); let visible = services @@ -33,6 +33,11 @@ pub(crate) async fn get_room_event_route( return Err!(Request(Forbidden("You don't have permission to view this event."))); } + debug_assert!( + event.event_id() == event_id && event.room_id() == room_id, + "Fetched PDU must match requested" + ); + event.add_age().ok(); Ok(get_room_event::v3::Response { event: event.into_format() }) diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs new file mode 100644 index 00000000..cc941b95 --- /dev/null +++ b/src/api/client/utils.rs @@ -0,0 +1,28 @@ +use conduwuit::{ + Result, err, + matrix::pdu::{PduCount, ShortEventId}, +}; + +/// Parse a pagination token, trying ShortEventId first, then falling back to +/// PduCount +pub(crate) fn parse_pagination_token(token: &str) -> Result { + // Try parsing as ShortEventId first + if let Ok(shorteventid) = token.parse::() { + // ShortEventId maps directly to a PduCount in our database + Ok(PduCount::Normal(shorteventid)) + } else if let Ok(count) = token.parse::() { + // Fallback to PduCount for backwards compatibility + Ok(PduCount::Normal(count)) + } else if let Ok(count) = token.parse::() { + // Also handle negative counts for backfilled events + Ok(PduCount::from_signed(count)) + } else { + Err(err!(Request(InvalidParam("Invalid pagination token")))) + } +} + +/// Convert a PduCount to a token string (using the underlying ShortEventId) +pub(crate) fn count_to_token(count: PduCount) -> String { + // The PduCount's unsigned value IS the ShortEventId + count.into_unsigned().to_string() +} diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs index 7fb12574..b880278f 100644 --- a/src/core/matrix/pdu/count.rs +++ b/src/core/matrix/pdu/count.rs @@ -1,10 +1,6 @@ #![allow(clippy::cast_possible_wrap, clippy::cast_sign_loss, clippy::as_conversions)] -use std::{ - cmp::Ordering, - fmt::{self, Display}, - str::FromStr, -}; +use std::{cmp::Ordering, fmt, fmt::Display, str::FromStr}; use ruma::api::Direction; diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index 9dfe8f7a..e976981e 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -1,8 +1,7 @@ use std::iter::once; -use conduwuit::{Err, PduEvent}; use conduwuit_core::{ - Result, debug, debug_warn, err, implement, info, + Result, debug, debug_warn, implement, info, matrix::{ event::Event, pdu::{PduCount, PduId, RawPduId}, @@ -12,7 +11,7 @@ use conduwuit_core::{ }; use futures::{FutureExt, StreamExt}; use ruma::{ - CanonicalJsonObject, EventId, RoomId, ServerName, + RoomId, ServerName, api::federation, events::{ StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, @@ -101,7 +100,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill in {room_id}"); + info!("Asking {backfill_server} for backfill"); let response = self .services .sending @@ -129,126 +128,10 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re } } - warn!("No servers could backfill, but backfill was needed in room {room_id}"); + info!("No servers could backfill, but backfill was needed in room {room_id}"); Ok(()) } -#[implement(super::Service)] -#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))] -pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result { - let local = self.get_pdu(event_id).await; - if local.is_ok() { - // We already have this PDU, no need to backfill - debug!("We already have {event_id} in {room_id}, no need to backfill."); - return local; - } - debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers."); - // Similar to backfill_if_required, but only for a single PDU - // Fetch a list of servers to try - if self - .services - .state_cache - .room_joined_count(room_id) - .await - .is_ok_and(|count| count <= 1) - && !self - .services - .state_accessor - .is_world_readable(room_id) - .await - { - // Room is empty (1 user or none), there is no one that can backfill - return Err!(Request(NotFound("No one can backfill this PDU, room is empty."))); - } - - let power_levels: RoomPowerLevelsEventContent = self - .services - .state_accessor - .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") - .await - .unwrap_or_default(); - - let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { - if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) { - Some(user_id.server_name()) - } else { - None - } - }); - - let canonical_room_alias_server = once( - self.services - .state_accessor - .get_canonical_alias(room_id) - .await, - ) - .filter_map(Result::ok) - .map(|alias| alias.server_name().to_owned()) - .stream(); - let mut servers = room_mods - .stream() - .map(ToOwned::to_owned) - .chain(canonical_room_alias_server) - .chain( - self.services - .server - .config - .trusted_servers - .iter() - .map(ToOwned::to_owned) - .stream(), - ) - .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) - .filter_map(|server_name| async move { - self.services - .state_cache - .server_in_room(&server_name, room_id) - .await - .then_some(server_name) - }) - .boxed(); - - while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for event {}", event_id); - let value = self - .services - .sending - .send_federation_request(backfill_server, federation::event::get_event::v1::Request { - event_id: event_id.to_owned(), - include_unredacted_content: Some(false), - }) - .await - .and_then(|response| { - serde_json::from_str::(response.pdu.get()).map_err(|e| { - err!(BadServerResponse(debug_warn!( - "Error parsing incoming event {e:?} from {backfill_server}" - ))) - }) - }); - let pdu = match value { - | Ok(value) => { - self.services - .event_handler - .handle_incoming_pdu(backfill_server, room_id, event_id, value, false) - .boxed() - .await?; - debug!("Successfully backfilled {event_id} from {backfill_server}"); - Some(self.get_pdu(event_id).await) - }, - | Err(e) => { - warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); - None - }, - }; - if let Some(pdu) = pdu { - debug!("Fetched {event_id} from {backfill_server}"); - return pdu; - } - } - - Err!("No servers could be used to fetch {} in {}.", room_id, event_id) -} - #[implement(super::Service)] #[tracing::instrument(skip(self, pdu), level = "debug")] pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 9064df6c..fa10a5c0 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -3,7 +3,8 @@ use std::{borrow::Borrow, sync::Arc}; use conduwuit::{ Err, PduCount, PduEvent, Result, at, err, result::{LogErr, NotFound}, - utils::{self, stream::TryReadyExt}, + utils, + utils::stream::TryReadyExt, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};