refactor: Promote handling unsigned data out of timeline

Also fixes:
- Transaction IDs leaking in event route
- Age not being set for event relations or threads
- Both of the above for search results

Notes down concern with relations table
This commit is contained in:
Jade Ellis 2025-06-03 21:31:02 +01:00
commit 490a5d087e
No known key found for this signature in database
GPG key ID: 8705A2A3EBF77BD2
17 changed files with 104 additions and 72 deletions

View file

@ -1,14 +1,11 @@
use std::{borrow::Borrow, sync::Arc};
use std::sync::Arc;
use conduwuit::{
Err, PduCount, PduEvent, Result, at, err,
result::{LogErr, NotFound},
utils,
utils::stream::TryReadyExt,
Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt,
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
use super::{PduId, RawPduId};
use crate::{Dep, rooms, rooms::short::ShortRoomId};
@ -46,12 +43,8 @@ impl Data {
}
#[inline]
pub(super) async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
let last_count = pdus_rev
@ -65,12 +58,8 @@ impl Data {
}
#[inline]
pub(super) async fn latest_pdu_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
pin_mut!(pdus_rev);
pdus_rev
@ -223,7 +212,6 @@ impl Data {
/// order.
pub(super) fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@ -233,14 +221,13 @@ impl Data {
self.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
pub(super) fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@ -250,21 +237,15 @@ impl Data {
self.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
.ready_and_then(Self::from_json_slice)
})
.try_flatten_stream()
}
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.add_age().log_err().ok();
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
Ok((pdu_id.pdu_count(), pdu))
}