fix(relations): improve thread pagination and include root event

Replace unreliable PduCount pagination tokens with ShortEventId throughout
the relations and messages endpoints. ShortEventId provides stable, unique
identifiers that persist across server restarts and database operations.

Key improvements:
- Add token parsing helpers that try ShortEventId first, fall back to
  PduCount for backwards compatibility
- Include thread root event when paginating backwards to thread start
- Fix off-by-one error in get_relations that was returning the starting
  event in results
- Only return next_batch/prev_batch tokens when more events are available,
  preventing clients from making unnecessary requests at thread boundaries
- Ensure consistent token format between /relations, /messages, and /sync
  endpoints for interoperability

This fixes duplicate events when scrolling at thread boundaries and ensures
the thread root message is visible when viewing a thread, matching expected
client behaviour.
This commit is contained in:
Tom Foster 2025-08-10 18:22:20 +01:00
commit c368018e5e
3 changed files with 157 additions and 30 deletions

View file

@ -1,7 +1,11 @@
use axum::extract::State;
use conduwuit::{
Result, at,
matrix::{Event, event::RelationTypeEqual, pdu::PduCount},
Result, at, err,
matrix::{
Event,
event::RelationTypeEqual,
pdu::{PduCount, ShortEventId},
},
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
};
use conduwuit_service::Services;
@ -20,6 +24,40 @@ use ruma::{
use crate::Ruma;
/// Parse a pagination token, trying ShortEventId first, then falling back to
/// PduCount
async fn parse_pagination_token(
services: &Services,
room_id: &RoomId,
token: Option<&str>,
default: PduCount,
) -> Result<PduCount> {
let Some(token) = token else {
return Ok(default);
};
// Try parsing as ShortEventId first
if let Ok(shorteventid) = token.parse::<ShortEventId>() {
// ShortEventId maps directly to a PduCount in our database
// The shorteventid IS the count value, just need to wrap it
Ok(PduCount::Normal(shorteventid))
} else if let Ok(count) = token.parse::<u64>() {
// Fallback to PduCount for backwards compatibility
Ok(PduCount::Normal(count))
} else if let Ok(count) = token.parse::<i64>() {
// Also handle negative counts for backfilled events
Ok(PduCount::from_signed(count))
} else {
Err(err!(Request(InvalidParam("Invalid pagination token"))))
}
}
/// Convert a PduCount to a token string (using the underlying ShortEventId)
fn count_to_token(count: PduCount) -> String {
// The PduCount's unsigned value IS the ShortEventId
count.into_unsigned().to_string()
}
/// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
pub(crate) async fn get_relating_events_with_rel_type_and_event_type_route(
State(services): State<crate::State>,
@ -109,15 +147,17 @@ async fn paginate_relations_with_filter(
recurse: bool,
dir: Direction,
) -> Result<get_relating_events::v1::Response> {
let start: PduCount = from
.map(str::parse)
.transpose()?
.unwrap_or_else(|| match dir {
| Direction::Forward => PduCount::min(),
| Direction::Backward => PduCount::max(),
});
let start: PduCount = parse_pagination_token(services, room_id, from, match dir {
| Direction::Forward => PduCount::min(),
| Direction::Backward => PduCount::max(),
})
.await?;
let to: Option<PduCount> = to.map(str::parse).flat_ok();
let to: Option<PduCount> = if let Some(to_str) = to {
Some(parse_pagination_token(services, room_id, Some(to_str), PduCount::min()).await?)
} else {
None
};
// Use limit or else 30, with maximum 100
let limit: usize = limit
@ -129,7 +169,12 @@ async fn paginate_relations_with_filter(
// Spec (v1.10) recommends depth of at least 3
let depth: u8 = if recurse { 3 } else { 1 };
let events: Vec<_> = services
// Check if this is a thread request
let is_thread = filter_rel_type
.as_ref()
.is_some_and(|rel| *rel == RelationType::Thread);
let mut events: Vec<_> = services
.rooms
.pdu_metadata
.get_relations(sender_user, room_id, target, start, limit, depth, dir)
@ -152,13 +197,49 @@ async fn paginate_relations_with_filter(
.collect()
.await;
let next_batch = match dir {
| Direction::Forward => events.last(),
| Direction::Backward => events.first(),
// For threads, check if we should include the root event
let mut at_thread_beginning = false;
if is_thread && dir == Direction::Backward {
// Check if we've reached the beginning of the thread
// (fewer events than requested means we've exhausted the thread)
if events.len() < limit {
// Try to get the thread root event
if let Ok(root_pdu) = services.rooms.timeline.get_pdu(target).await {
// Check visibility
if services
.rooms
.state_accessor
.user_can_see_event(sender_user, room_id, target)
.await
{
// Get the PduCount for the root event
if let Ok(root_count) = services.rooms.timeline.get_pdu_count(target).await {
// Insert the root at the beginning (for backwards pagination)
events.insert(0, (root_count, root_pdu));
at_thread_beginning = true;
}
}
}
}
}
.map(at!(0))
.as_ref()
.map(ToString::to_string);
// Determine if there are more events to fetch
let has_more = if at_thread_beginning {
false // We've included the root, no more events
} else {
// Check if we got a full page of results (might be more)
events.len() >= limit
};
let next_batch = if has_more {
match dir {
| Direction::Forward => events.last(),
| Direction::Backward => events.first(),
}
.map(|(count, _)| count_to_token(*count))
} else {
None
};
Ok(get_relating_events::v1::Response {
next_batch,