implement per-event timeline filtering on /sync

This is the filter.room.timeline.{senders,types,contains_url} fields, and
their associated not_* pairs.

This change incidentally fixes a couple bugs in prev_batch handling. The
first is that '/sync' would omit prev_batch on a room's timeline response
if there were no events returned within the sync window. The second bug
is that sliding-sync was returing the count passed as 'since' as
prev_batch when there are no timeline events, instead of the count value
one higher than 'since'. This is wrong because both 'since' and
'prev_batch' are exclusive bounds. Both of these bugs were masked by a
ruma bug[1] that causes it to skip the entire timeline object when there
are no events.

[1]: https://github.com/ruma/ruma/pull/1796
This commit is contained in:
Benjamin Lee 2024-05-04 00:51:25 -07:00
parent 0c3d1e5745
commit 126c9b8bd2
No known key found for this signature in database
GPG key ID: FB9624E2885D55A4

View file

@ -691,7 +691,8 @@ async fn load_joined_room(
drop(insert_lock); drop(insert_lock);
}; };
let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10, Some(filter))?; let (timeline_pdus, oldest_timeline_event, limited) =
load_timeline(sender_user, room_id, sincecount, 10, Some(filter))?;
let send_notification_counts = !timeline_pdus.is_empty() let send_notification_counts = !timeline_pdus.is_empty()
|| services() || services()
@ -1084,17 +1085,14 @@ async fn load_joined_room(
None None
}; };
let prev_batch = timeline_pdus let prev_batch = match oldest_timeline_event {
.first() Some(PduCount::Backfilled(_)) => {
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| { error!("timeline in backfill state?!");
Ok(Some(match pdu_count { Some("0".to_owned())
PduCount::Backfilled(_) => { },
error!("timeline in backfill state?!"); Some(PduCount::Normal(c)) => Some(c.to_string()),
"0".to_owned() None => None,
}, };
PduCount::Normal(c) => c.to_string(),
}))
})?;
let room_events: Vec<_> = timeline_pdus let room_events: Vec<_> = timeline_pdus
.iter() .iter()
@ -1172,25 +1170,38 @@ async fn load_joined_room(
}) })
} }
/// Returns `(events, oldest_event_count, limited)`
///
/// These roughly match the fields of [`ruma::api::client::sync::v3::Limited`].
///
/// - `events` is a list of up to `limit` events newer than `roomsincecount`
/// that are allowed by the filter
/// - `oldest_event_count` is the inclusive lower bound of the range of events
/// that were considered for inclusion in `events`. It is none when no range
/// of events was considered (because the entire room was rejected by the
/// filter).
/// - `limited` is `true` if there may be some allowed events between
/// `oldest_event_count` and `romsincecount`. These are events that were not
/// returned because of `limit` or `load_limit`.
#[allow(clippy::type_complexity)]
fn load_timeline( fn load_timeline(
sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64, sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64,
filter: Option<&CompiledFilterDefinition<'_>>, filter: Option<&CompiledFilterDefinition<'_>>,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { ) -> Result<(Vec<(PduCount, PduEvent)>, Option<PduCount>, bool), Error> {
let timeline_pdus; if filter.map_or(false, |filter| !filter.room.timeline.room_allowed(room_id)) {
let limited; Ok((vec![], None, false))
} else if services()
if let Some(filter) = filter {
if !filter.room.timeline.room_allowed(room_id) {
return Ok((vec![], false));
}
}
if services()
.rooms .rooms
.timeline .timeline
.last_timeline_count(sender_user, room_id)? .last_timeline_count(sender_user, room_id)?
> roomsincecount > roomsincecount
{ {
// Examine up to 2x `limit` events before stopping, even if we haven't
// found `limit` events that pass the filter yet. Without `load_limit`, the
// server could examine a very large number of events if a filter matches a
// small fraction.
let load_limit = limit.saturating_mul(2);
let mut non_timeline_pdus = services() let mut non_timeline_pdus = services()
.rooms .rooms
.timeline .timeline
@ -1205,22 +1216,37 @@ fn load_timeline(
.take_while(|(pducount, _)| pducount > &roomsincecount); .take_while(|(pducount, _)| pducount > &roomsincecount);
// Take the last events for the timeline // Take the last events for the timeline
timeline_pdus = non_timeline_pdus let mut oldest_event_count = None;
let mut timeline_pdus = non_timeline_pdus
.by_ref() .by_ref()
.take(usize::try_from(load_limit).unwrap_or(usize::MAX))
.inspect(|&(pducount, _)| oldest_event_count = Some(pducount))
.filter(|(_, pdu)| filter.map_or(true, |filter| filter.room.timeline.pdu_event_allowed(pdu)))
.take(limit as usize) .take(limit as usize)
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
timeline_pdus.reverse();
// They /sync response doesn't always return all messages, so we say the output // They /sync response doesn't always return all messages, so we say the output
// is limited unless there are events in non_timeline_pdus // is limited unless there are events in non_timeline_pdus
limited = non_timeline_pdus.next().is_some(); let limited = non_timeline_pdus.next().is_some();
Ok((timeline_pdus, oldest_event_count, limited))
} else { } else {
timeline_pdus = Vec::new(); // There are no events newer than `roomsincecount`, so oldest_event_count
limited = false; // is the next PduCount directly after `roomsincecount`. This may be an
// event that doesn't exist yet.
let oldest_event_count = match roomsincecount {
PduCount::Normal(c) => PduCount::Normal(c.saturating_add(1)),
PduCount::Backfilled(c) => {
if let Some(c) = c.checked_sub(1) {
PduCount::Backfilled(c)
} else {
PduCount::Normal(0)
}
},
};
Ok((vec![], Some(oldest_event_count), false))
} }
Ok((timeline_pdus, limited))
} }
fn share_encrypted_room(sender_user: &UserId, user_id: &UserId, ignore_room: &RoomId) -> Result<bool> { fn share_encrypted_room(sender_user: &UserId, user_id: &UserId, ignore_room: &RoomId) -> Result<bool> {
@ -1577,30 +1603,21 @@ pub(crate) async fn sync_events_v4_route(
for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms { for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms {
let roomsincecount = PduCount::Normal(*roomsince); let roomsincecount = PduCount::Normal(*roomsince);
let (timeline_pdus, limited) = load_timeline(&sender_user, room_id, roomsincecount, *timeline_limit, None)?; let (timeline_pdus, oldest_event_count, limited) =
load_timeline(&sender_user, room_id, roomsincecount, *timeline_limit, None)?;
if roomsince != &0 && timeline_pdus.is_empty() { if roomsince != &0 && timeline_pdus.is_empty() {
continue; continue;
} }
let prev_batch = timeline_pdus let prev_batch = match oldest_event_count {
.first() Some(PduCount::Backfilled(_)) => {
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| { error!("timeline in backfill state?!");
Ok(Some(match pdu_count { Some("0".to_owned())
PduCount::Backfilled(_) => { },
error!("timeline in backfill state?!"); Some(PduCount::Normal(c)) => Some(c.to_string()),
"0".to_owned() None => None,
}, };
PduCount::Normal(c) => c.to_string(),
}))
})?
.or_else(|| {
if roomsince != &0 {
Some(roomsince.to_string())
} else {
None
}
});
let room_events: Vec<_> = timeline_pdus let room_events: Vec<_> = timeline_pdus
.iter() .iter()