diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index f47399f1..04f6c0c3 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -691,7 +691,8 @@ async fn load_joined_room( drop(insert_lock); }; - let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10, Some(filter))?; + let (timeline_pdus, oldest_timeline_event, limited) = + load_timeline(sender_user, room_id, sincecount, 10, Some(filter))?; let send_notification_counts = !timeline_pdus.is_empty() || services() @@ -1084,17 +1085,14 @@ async fn load_joined_room( None }; - let prev_batch = timeline_pdus - .first() - .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { - Ok(Some(match pdu_count { - PduCount::Backfilled(_) => { - error!("timeline in backfill state?!"); - "0".to_owned() - }, - PduCount::Normal(c) => c.to_string(), - })) - })?; + let prev_batch = match oldest_timeline_event { + Some(PduCount::Backfilled(_)) => { + error!("timeline in backfill state?!"); + Some("0".to_owned()) + }, + Some(PduCount::Normal(c)) => Some(c.to_string()), + None => None, + }; let room_events: Vec<_> = timeline_pdus .iter() @@ -1172,25 +1170,38 @@ async fn load_joined_room( }) } +/// Returns `(events, oldest_event_count, limited)` +/// +/// These roughly match the fields of [`ruma::api::client::sync::v3::Limited`]. +/// +/// - `events` is a list of up to `limit` events newer than `roomsincecount` +/// that are allowed by the filter +/// - `oldest_event_count` is the inclusive lower bound of the range of events +/// that were considered for inclusion in `events`. It is none when no range +/// of events was considered (because the entire room was rejected by the +/// filter). +/// - `limited` is `true` if there may be some allowed events between +/// `oldest_event_count` and `romsincecount`. These are events that were not +/// returned because of `limit` or `load_limit`. +#[allow(clippy::type_complexity)] fn load_timeline( sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64, filter: Option<&CompiledFilterDefinition<'_>>, -) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { - let timeline_pdus; - let limited; - - if let Some(filter) = filter { - if !filter.room.timeline.room_allowed(room_id) { - return Ok((vec![], false)); - } - } - - if services() +) -> Result<(Vec<(PduCount, PduEvent)>, Option, bool), Error> { + if filter.map_or(false, |filter| !filter.room.timeline.room_allowed(room_id)) { + Ok((vec![], None, false)) + } else if services() .rooms .timeline .last_timeline_count(sender_user, room_id)? > roomsincecount { + // Examine up to 2x `limit` events before stopping, even if we haven't + // found `limit` events that pass the filter yet. Without `load_limit`, the + // server could examine a very large number of events if a filter matches a + // small fraction. + let load_limit = limit.saturating_mul(2); + let mut non_timeline_pdus = services() .rooms .timeline @@ -1205,22 +1216,37 @@ fn load_timeline( .take_while(|(pducount, _)| pducount > &roomsincecount); // Take the last events for the timeline - timeline_pdus = non_timeline_pdus + let mut oldest_event_count = None; + let mut timeline_pdus = non_timeline_pdus .by_ref() + .take(usize::try_from(load_limit).unwrap_or(usize::MAX)) + .inspect(|&(pducount, _)| oldest_event_count = Some(pducount)) + .filter(|(_, pdu)| filter.map_or(true, |filter| filter.room.timeline.pdu_event_allowed(pdu))) .take(limit as usize) - .collect::>() - .into_iter() - .rev() .collect::>(); + timeline_pdus.reverse(); // They /sync response doesn't always return all messages, so we say the output // is limited unless there are events in non_timeline_pdus - limited = non_timeline_pdus.next().is_some(); + let limited = non_timeline_pdus.next().is_some(); + + Ok((timeline_pdus, oldest_event_count, limited)) } else { - timeline_pdus = Vec::new(); - limited = false; + // There are no events newer than `roomsincecount`, so oldest_event_count + // is the next PduCount directly after `roomsincecount`. This may be an + // event that doesn't exist yet. + let oldest_event_count = match roomsincecount { + PduCount::Normal(c) => PduCount::Normal(c.saturating_add(1)), + PduCount::Backfilled(c) => { + if let Some(c) = c.checked_sub(1) { + PduCount::Backfilled(c) + } else { + PduCount::Normal(0) + } + }, + }; + Ok((vec![], Some(oldest_event_count), false)) } - Ok((timeline_pdus, limited)) } fn share_encrypted_room(sender_user: &UserId, user_id: &UserId, ignore_room: &RoomId) -> Result { @@ -1577,30 +1603,21 @@ pub(crate) async fn sync_events_v4_route( for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms { let roomsincecount = PduCount::Normal(*roomsince); - let (timeline_pdus, limited) = load_timeline(&sender_user, room_id, roomsincecount, *timeline_limit, None)?; + let (timeline_pdus, oldest_event_count, limited) = + load_timeline(&sender_user, room_id, roomsincecount, *timeline_limit, None)?; if roomsince != &0 && timeline_pdus.is_empty() { continue; } - let prev_batch = timeline_pdus - .first() - .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { - Ok(Some(match pdu_count { - PduCount::Backfilled(_) => { - error!("timeline in backfill state?!"); - "0".to_owned() - }, - PduCount::Normal(c) => c.to_string(), - })) - })? - .or_else(|| { - if roomsince != &0 { - Some(roomsince.to_string()) - } else { - None - } - }); + let prev_batch = match oldest_event_count { + Some(PduCount::Backfilled(_)) => { + error!("timeline in backfill state?!"); + Some("0".to_owned()) + }, + Some(PduCount::Normal(c)) => Some(c.to_string()), + None => None, + }; let room_events: Vec<_> = timeline_pdus .iter()