refactor: Promote handling unsigned data out of timeline

Also fixes:
- Transaction IDs leaking in event route
- Age not being set for event relations or threads
- Both of the above for search results

Notes down concern with relations table
This commit is contained in:
Jade Ellis 2025-06-03 21:31:02 +01:00 committed by Jacob Taylor
parent add5c7052c
commit 1b08e0e33d
17 changed files with 104 additions and 72 deletions

View file

@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
.services
.rooms
.timeline
.last_timeline_count(None, &room_id)
.last_timeline_count(&room_id)
.await?;
self.write_str(&format!("{result:#?}")).await
@ -52,7 +52,7 @@ pub(super) async fn pdus(
.services
.rooms
.timeline
.pdus_rev(None, &room_id, from)
.pdus_rev(&room_id, from)
.try_take(limit.unwrap_or(3))
.try_collect()
.await?;

View file

@ -84,11 +84,18 @@ pub(crate) async fn get_context_route(
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
// PDUs are used to get seen user IDs and then returned in response.
let events_before = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(base_count))
.pdus_rev(room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
// TODO: bundled aggregations
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
@ -98,8 +105,13 @@ pub(crate) async fn get_context_route(
let events_after = services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(base_count))
.pdus(room_id, Some(base_count))
.ignore_err()
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
// TODO: bundled aggregations
pdu
})
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))

View file

@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route(
| Direction::Forward => services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(from))
.pdus(room_id, Some(from))
.ignore_err()
.boxed(),
| Direction::Backward => services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(from))
.pdus_rev(room_id, Some(from))
.ignore_err()
.boxed(),
};
@ -132,6 +132,11 @@ pub(crate) async fn get_message_events_route(
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
.take(limit)
.then(async |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
// TODO: bundled aggregations
pdu
})
.collect()
.await;

View file

@ -172,6 +172,10 @@ async fn paginate_relations_with_filter(
})
}
// TODO: Can we move the visibility filter lower down, to avoid checking events
// that won't be sent? At the moment this also results in getting events that
// appear to have no relation because intermediaries are not visible to the
// user.
async fn visibility_filter(
services: &Services,
sender_user: &UserId,

View file

@ -38,7 +38,7 @@ pub(crate) async fn get_room_event_route(
"Fetched PDU must match requested"
);
event.add_age().ok();
event.set_unsigned(body.sender_user.as_deref());
Ok(get_room_event::v3::Response { event: event.into_room_event() })
}

View file

@ -25,12 +25,19 @@ pub(crate) async fn room_initial_sync_route(
return Err!(Request(Forbidden("No room preview available.")));
}
// Events are returned in body
let limit = LIMIT_MAX;
let events: Vec<_> = services
.rooms
.timeline
.pdus_rev(None, room_id, None)
.pdus_rev(room_id, None)
.try_take(limit)
.and_then(async |mut pdu| {
pdu.1.set_unsigned(body.sender_user.as_deref());
// TODO: bundled aggregations
Ok(pdu)
})
.try_collect()
.await?;

View file

@ -31,11 +31,7 @@ async fn load_timeline(
next_batch: Option<PduCount>,
limit: usize,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
let last_timeline_count = services
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await?;
let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?;
if last_timeline_count <= roomsincecount {
return Ok((Vec::new(), false));
@ -44,8 +40,13 @@ async fn load_timeline(
let non_timeline_pdus = services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, None)
.pdus_rev(room_id, None)
.ignore_err()
.map(move |mut pdu| {
pdu.1.set_unsigned(Some(sender_user));
// TODO: bundled aggregations
pdu
})
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
.ready_take_while(|&(pducount, _)| pducount > roomsincecount);

View file

@ -1191,7 +1191,7 @@ async fn calculate_heroes(
services
.rooms
.timeline
.all_pdus(sender_user, room_id)
.all_pdus(room_id)
.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
.fold_default(|heroes: Vec<_>, (_, pdu)| {
fold_hero(heroes, services, room_id, sender_user, pdu)

View file

@ -28,6 +28,9 @@ pub(crate) async fn get_threads_route(
.transpose()?
.unwrap_or_else(PduCount::max);
// TODO: bundled aggregation
// TODO: user_can_see_event and set_unsigned should be at the same level /
// function, so unsigned is only set for seen events.
let threads: Vec<(PduCount, PduEvent)> = services
.rooms
.threads

View file

@ -3,6 +3,7 @@ use std::cmp;
use axum::extract::State;
use conduwuit::{
PduCount, Result,
result::LogErr,
utils::{IterStream, ReadyExt, stream::TryTools},
};
use futures::{FutureExt, StreamExt, TryStreamExt};
@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route(
pdus: services
.rooms
.timeline
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
.try_take(limit)
.try_filter_map(|(_, pdu)| async move {
Ok(services
@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route(
.await
.then_some(pdu))
})
.and_then(async |mut pdu| {
// Strip the transaction ID, as that is private
pdu.remove_transaction_id().log_err().ok();
// Add age, as this is specified
pdu.add_age().log_err().ok();
// It's not clear if we should strip or add any more data, leave as is.
// In particular: Redaction?
Ok(pdu)
})
.try_filter_map(|pdu| async move {
Ok(services
.rooms

View file

@ -1,11 +1,24 @@
use std::collections::BTreeMap;
use std::{borrow::Borrow, collections::BTreeMap};
use ruma::MilliSecondsSinceUnixEpoch;
use serde::Deserialize;
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
use super::Pdu;
use crate::{Result, err, implement, is_true};
use crate::{Result, err, implement, is_true, result::LogErr};
/// Set the `unsigned` field of the PDU using only information in the PDU.
/// Some unsigned data is already set within the database (eg. prev events,
/// threads). Once this is done, other data must be calculated from the database
/// (eg. relations) This is for server-to-client events.
/// Backfill handles this itself.
#[implement(Pdu)]
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
if Some(self.sender.borrow()) != user_id {
self.remove_transaction_id().log_err().ok();
}
self.add_age().log_err().ok();
}
#[implement(Pdu)]
pub fn remove_transaction_id(&mut self) -> Result {

View file

@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc};
use conduwuit::{
PduCount, PduEvent,
arrayvec::ArrayVec,
result::LogErr,
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
@ -80,9 +79,7 @@ impl Data {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
if pdu.sender != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.set_unsigned(Some(user_id));
Some((shorteventid, pdu))
})

View file

@ -127,7 +127,12 @@ pub async fn search_pdus<'a>(
.then_some(pdu)
})
.skip(query.skip)
.take(query.limit);
.take(query.limit)
.map(move |mut pdu| {
pdu.set_unsigned(query.user_id);
// TODO: bundled aggregation
pdu
});
Ok((count, pdus))
}

View file

@ -160,9 +160,7 @@ impl Service {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender != user_id {
pdu.remove_transaction_id().ok();
}
pdu.set_unsigned(Some(user_id));
Some((pdu_id.shorteventid, pdu))
});

View file

@ -1,14 +1,11 @@
use std::{borrow::Borrow, sync::Arc};
use std::sync::Arc;
use conduwuit::{
Err, PduCount, PduEvent, Result, at, err,
result::{LogErr, NotFound},
utils,
utils::stream::TryReadyExt,
Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt,
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
use super::{PduId, RawPduId};
use crate::{Dep, rooms, rooms::short::ShortRoomId};
@ -46,12 +43,8 @@ impl Data {
}
#[inline]
pub(super) async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
let last_count = pdus_rev
@ -65,12 +58,8 @@ impl Data {
}
#[inline]
pub(super) async fn latest_pdu_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
pdus_rev
@ -223,7 +212,6 @@ impl Data {
/// order.
pub(super) fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@ -233,14 +221,13 @@ impl Data {
self.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
pub(super) fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@ -250,21 +237,15 @@ impl Data {
self.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.add_age().log_err().ok();
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
Ok((pdu_id.pdu_count(), pdu))
}

View file

@ -165,7 +165,7 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
let pdus = self.pdus(None, room_id, None);
let pdus = self.pdus(room_id, None);
pin_mut!(pdus);
pdus.try_next()
@ -175,16 +175,12 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
self.db.latest_pdu_in_room(None, room_id).await
self.db.latest_pdu_in_room(room_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
self.db.last_timeline_count(sender_user, room_id).await
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
self.db.last_timeline_count(room_id).await
}
/// Returns the `count` of this pdu's id.
@ -545,6 +541,10 @@ impl Service {
| _ => {},
}
// CONCERN: If we receive events with a relation out-of-order, we never write
// their relation / thread. We need some kind of way to trigger when we receive
// this event, and potentially a way to rebuild the table entirely.
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
self.services
@ -996,34 +996,30 @@ impl Service {
#[inline]
pub fn all_pdus<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(Some(user_id), room_id, None).ignore_err()
self.pdus(room_id, None).ignore_err()
}
/// Reverse iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
}
/// Forward iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
}
/// Replace a PDU with the redacted form.

View file

@ -781,7 +781,7 @@ impl Service {
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
if pdu.is_redacted() {
continue;
}