mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-06-26 12:15:22 +02:00
refactor: Promote handling unsigned data out of timeline
Also fixes: - Transaction IDs leaking in event route - Age not being set for event relations or threads - Both of the above for search results Notes down concern with relations table
This commit is contained in:
parent
70df8364b3
commit
7579d0acf8
17 changed files with 104 additions and 72 deletions
|
@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
|
|||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.last_timeline_count(None, &room_id)
|
||||
.last_timeline_count(&room_id)
|
||||
.await?;
|
||||
|
||||
self.write_str(&format!("{result:#?}")).await
|
||||
|
@ -52,7 +52,7 @@ pub(super) async fn pdus(
|
|||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, &room_id, from)
|
||||
.pdus_rev(&room_id, from)
|
||||
.try_take(limit.unwrap_or(3))
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
|
@ -84,11 +84,18 @@ pub(crate) async fn get_context_route(
|
|||
|
||||
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
|
||||
|
||||
// PDUs are used to get seen user IDs and then returned in response.
|
||||
|
||||
let events_before = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, Some(base_count))
|
||||
.pdus_rev(room_id, Some(base_count))
|
||||
.ignore_err()
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.ready_filter_map(|item| event_filter(item, filter))
|
||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
|
@ -98,8 +105,13 @@ pub(crate) async fn get_context_route(
|
|||
let events_after = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus(Some(sender_user), room_id, Some(base_count))
|
||||
.pdus(room_id, Some(base_count))
|
||||
.ignore_err()
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.ready_filter_map(|item| event_filter(item, filter))
|
||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
|
|
|
@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route(
|
|||
| Direction::Forward => services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus(Some(sender_user), room_id, Some(from))
|
||||
.pdus(room_id, Some(from))
|
||||
.ignore_err()
|
||||
.boxed(),
|
||||
|
||||
| Direction::Backward => services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, Some(from))
|
||||
.pdus_rev(room_id, Some(from))
|
||||
.ignore_err()
|
||||
.boxed(),
|
||||
};
|
||||
|
@ -132,6 +132,11 @@ pub(crate) async fn get_message_events_route(
|
|||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
.take(limit)
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
|
|
|
@ -172,6 +172,10 @@ async fn paginate_relations_with_filter(
|
|||
})
|
||||
}
|
||||
|
||||
// TODO: Can we move the visibility filter lower down, to avoid checking events
|
||||
// that won't be sent? At the moment this also results in getting events that
|
||||
// appear to have no relation because intermediaries are not visible to the
|
||||
// user.
|
||||
async fn visibility_filter(
|
||||
services: &Services,
|
||||
sender_user: &UserId,
|
||||
|
|
|
@ -38,7 +38,7 @@ pub(crate) async fn get_room_event_route(
|
|||
"Fetched PDU must match requested"
|
||||
);
|
||||
|
||||
event.add_age().ok();
|
||||
event.set_unsigned(body.sender_user.as_deref());
|
||||
|
||||
Ok(get_room_event::v3::Response { event: event.into_room_event() })
|
||||
}
|
||||
|
|
|
@ -25,12 +25,19 @@ pub(crate) async fn room_initial_sync_route(
|
|||
return Err!(Request(Forbidden("No room preview available.")));
|
||||
}
|
||||
|
||||
// Events are returned in body
|
||||
|
||||
let limit = LIMIT_MAX;
|
||||
let events: Vec<_> = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, room_id, None)
|
||||
.pdus_rev(room_id, None)
|
||||
.try_take(limit)
|
||||
.and_then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(body.sender_user.as_deref());
|
||||
// TODO: bundled aggregations
|
||||
Ok(pdu)
|
||||
})
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
|
|
|
@ -31,11 +31,7 @@ async fn load_timeline(
|
|||
next_batch: Option<PduCount>,
|
||||
limit: usize,
|
||||
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
|
||||
let last_timeline_count = services
|
||||
.rooms
|
||||
.timeline
|
||||
.last_timeline_count(Some(sender_user), room_id)
|
||||
.await?;
|
||||
let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?;
|
||||
|
||||
if last_timeline_count <= roomsincecount {
|
||||
return Ok((Vec::new(), false));
|
||||
|
@ -44,8 +40,13 @@ async fn load_timeline(
|
|||
let non_timeline_pdus = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, None)
|
||||
.pdus_rev(room_id, None)
|
||||
.ignore_err()
|
||||
.map(move |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
|
||||
.ready_take_while(|&(pducount, _)| pducount > roomsincecount);
|
||||
|
||||
|
|
|
@ -1191,7 +1191,7 @@ async fn calculate_heroes(
|
|||
services
|
||||
.rooms
|
||||
.timeline
|
||||
.all_pdus(sender_user, room_id)
|
||||
.all_pdus(room_id)
|
||||
.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
|
||||
.fold_default(|heroes: Vec<_>, (_, pdu)| {
|
||||
fold_hero(heroes, services, room_id, sender_user, pdu)
|
||||
|
|
|
@ -28,6 +28,9 @@ pub(crate) async fn get_threads_route(
|
|||
.transpose()?
|
||||
.unwrap_or_else(PduCount::max);
|
||||
|
||||
// TODO: bundled aggregation
|
||||
// TODO: user_can_see_event and set_unsigned should be at the same level /
|
||||
// function, so unsigned is only set for seen events.
|
||||
let threads: Vec<(PduCount, PduEvent)> = services
|
||||
.rooms
|
||||
.threads
|
||||
|
|
|
@ -3,6 +3,7 @@ use std::cmp;
|
|||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
PduCount, Result,
|
||||
result::LogErr,
|
||||
utils::{IterStream, ReadyExt, stream::TryTools},
|
||||
};
|
||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
|
@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route(
|
|||
pdus: services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
|
||||
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
|
||||
.try_take(limit)
|
||||
.try_filter_map(|(_, pdu)| async move {
|
||||
Ok(services
|
||||
|
@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route(
|
|||
.await
|
||||
.then_some(pdu))
|
||||
})
|
||||
.and_then(async |mut pdu| {
|
||||
// Strip the transaction ID, as that is private
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
// Add age, as this is specified
|
||||
pdu.add_age().log_err().ok();
|
||||
// It's not clear if we should strip or add any more data, leave as is.
|
||||
// In particular: Redaction?
|
||||
Ok(pdu)
|
||||
})
|
||||
.try_filter_map(|pdu| async move {
|
||||
Ok(services
|
||||
.rooms
|
||||
|
|
|
@ -1,11 +1,24 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::{borrow::Borrow, collections::BTreeMap};
|
||||
|
||||
use ruma::MilliSecondsSinceUnixEpoch;
|
||||
use serde::Deserialize;
|
||||
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
|
||||
|
||||
use super::Pdu;
|
||||
use crate::{Result, err, implement, is_true};
|
||||
use crate::{Result, err, implement, is_true, result::LogErr};
|
||||
|
||||
/// Set the `unsigned` field of the PDU using only information in the PDU.
|
||||
/// Some unsigned data is already set within the database (eg. prev events,
|
||||
/// threads). Once this is done, other data must be calculated from the database
|
||||
/// (eg. relations) This is for server-to-client events.
|
||||
/// Backfill handles this itself.
|
||||
#[implement(Pdu)]
|
||||
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
|
||||
if Some(self.sender.borrow()) != user_id {
|
||||
self.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
self.add_age().log_err().ok();
|
||||
}
|
||||
|
||||
#[implement(Pdu)]
|
||||
pub fn remove_transaction_id(&mut self) -> Result {
|
||||
|
|
|
@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc};
|
|||
use conduwuit::{
|
||||
PduCount, PduEvent,
|
||||
arrayvec::ArrayVec,
|
||||
result::LogErr,
|
||||
utils::{
|
||||
ReadyExt,
|
||||
stream::{TryIgnore, WidebandExt},
|
||||
|
@ -80,9 +79,7 @@ impl Data {
|
|||
|
||||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||
|
||||
if pdu.sender != user_id {
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
pdu.set_unsigned(Some(user_id));
|
||||
|
||||
Some((shorteventid, pdu))
|
||||
})
|
||||
|
|
|
@ -127,7 +127,12 @@ pub async fn search_pdus<'a>(
|
|||
.then_some(pdu)
|
||||
})
|
||||
.skip(query.skip)
|
||||
.take(query.limit);
|
||||
.take(query.limit)
|
||||
.map(move |mut pdu| {
|
||||
pdu.set_unsigned(query.user_id);
|
||||
// TODO: bundled aggregation
|
||||
pdu
|
||||
});
|
||||
|
||||
Ok((count, pdus))
|
||||
}
|
||||
|
|
|
@ -160,9 +160,7 @@ impl Service {
|
|||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||
let pdu_id: PduId = pdu_id.into();
|
||||
|
||||
if pdu.sender != user_id {
|
||||
pdu.remove_transaction_id().ok();
|
||||
}
|
||||
pdu.set_unsigned(Some(user_id));
|
||||
|
||||
Some((pdu_id.shorteventid, pdu))
|
||||
});
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
use std::{borrow::Borrow, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{
|
||||
Err, PduCount, PduEvent, Result, at, err,
|
||||
result::{LogErr, NotFound},
|
||||
utils,
|
||||
utils::stream::TryReadyExt,
|
||||
Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt,
|
||||
};
|
||||
use database::{Database, Deserialized, Json, KeyVal, Map};
|
||||
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
|
||||
|
||||
use super::{PduId, RawPduId};
|
||||
use crate::{Dep, rooms, rooms::short::ShortRoomId};
|
||||
|
@ -46,12 +43,8 @@ impl Data {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) async fn last_timeline_count(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduCount> {
|
||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
||||
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||
|
||||
pin_mut!(pdus_rev);
|
||||
let last_count = pdus_rev
|
||||
|
@ -65,12 +58,8 @@ impl Data {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) async fn latest_pdu_in_room(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduEvent> {
|
||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
||||
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
||||
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||
|
||||
pin_mut!(pdus_rev);
|
||||
pdus_rev
|
||||
|
@ -223,7 +212,6 @@ impl Data {
|
|||
/// order.
|
||||
pub(super) fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
until: PduCount,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
|
@ -233,14 +221,13 @@ impl Data {
|
|||
self.pduid_pdu
|
||||
.rev_raw_stream_from(¤t)
|
||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
||||
.ready_and_then(Self::from_json_slice)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
}
|
||||
|
||||
pub(super) fn pdus<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
from: PduCount,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
|
@ -250,21 +237,15 @@ impl Data {
|
|||
self.pduid_pdu
|
||||
.raw_stream_from(¤t)
|
||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
||||
.ready_and_then(Self::from_json_slice)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
}
|
||||
|
||||
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
|
||||
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
|
||||
let pdu_id: RawPduId = pdu_id.into();
|
||||
|
||||
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||
|
||||
if Some(pdu.sender.borrow()) != user_id {
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
|
||||
pdu.add_age().log_err().ok();
|
||||
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||
|
||||
Ok((pdu_id.pdu_count(), pdu))
|
||||
}
|
||||
|
|
|
@ -165,7 +165,7 @@ impl Service {
|
|||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
|
||||
let pdus = self.pdus(None, room_id, None);
|
||||
let pdus = self.pdus(room_id, None);
|
||||
|
||||
pin_mut!(pdus);
|
||||
pdus.try_next()
|
||||
|
@ -175,16 +175,12 @@ impl Service {
|
|||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
||||
self.db.latest_pdu_in_room(None, room_id).await
|
||||
self.db.latest_pdu_in_room(room_id).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn last_timeline_count(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduCount> {
|
||||
self.db.last_timeline_count(sender_user, room_id).await
|
||||
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||
self.db.last_timeline_count(room_id).await
|
||||
}
|
||||
|
||||
/// Returns the `count` of this pdu's id.
|
||||
|
@ -545,6 +541,10 @@ impl Service {
|
|||
| _ => {},
|
||||
}
|
||||
|
||||
// CONCERN: If we receive events with a relation out-of-order, we never write
|
||||
// their relation / thread. We need some kind of way to trigger when we receive
|
||||
// this event, and potentially a way to rebuild the table entirely.
|
||||
|
||||
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
|
||||
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
|
||||
self.services
|
||||
|
@ -996,34 +996,30 @@ impl Service {
|
|||
#[inline]
|
||||
pub fn all_pdus<'a>(
|
||||
&'a self,
|
||||
user_id: &'a UserId,
|
||||
room_id: &'a RoomId,
|
||||
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
|
||||
self.pdus(Some(user_id), room_id, None).ignore_err()
|
||||
self.pdus(room_id, None).ignore_err()
|
||||
}
|
||||
|
||||
/// Reverse iteration starting at from.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
until: Option<PduCount>,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
self.db
|
||||
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
|
||||
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
|
||||
}
|
||||
|
||||
/// Forward iteration starting at from.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
from: Option<PduCount>,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
self.db
|
||||
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
|
||||
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
|
||||
}
|
||||
|
||||
/// Replace a PDU with the redacted form.
|
||||
|
|
|
@ -781,7 +781,7 @@ impl Service {
|
|||
|
||||
for pdu in pdus {
|
||||
// Redacted events are not notification targets (we don't send push for them)
|
||||
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
|
||||
if pdu.is_redacted() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue