mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-11 04:53:01 +02:00
refactor: Promote handling unsigned data out of timeline
Also fixes: - Transaction IDs leaking in event route - Age not being set for event relations or threads - Both of the above for search results Notes down concern with relations table
This commit is contained in:
parent
5d44653e3a
commit
6c11e59c4a
17 changed files with 104 additions and 72 deletions
|
@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc};
|
|||
use conduwuit::{
|
||||
PduCount, PduEvent,
|
||||
arrayvec::ArrayVec,
|
||||
result::LogErr,
|
||||
utils::{
|
||||
ReadyExt,
|
||||
stream::{TryIgnore, WidebandExt},
|
||||
|
@ -80,9 +79,7 @@ impl Data {
|
|||
|
||||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||
|
||||
if pdu.sender != user_id {
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
pdu.set_unsigned(Some(user_id));
|
||||
|
||||
Some((shorteventid, pdu))
|
||||
})
|
||||
|
|
|
@ -127,7 +127,12 @@ pub async fn search_pdus<'a>(
|
|||
.then_some(pdu)
|
||||
})
|
||||
.skip(query.skip)
|
||||
.take(query.limit);
|
||||
.take(query.limit)
|
||||
.map(move |mut pdu| {
|
||||
pdu.set_unsigned(query.user_id);
|
||||
// TODO: bundled aggregation
|
||||
pdu
|
||||
});
|
||||
|
||||
Ok((count, pdus))
|
||||
}
|
||||
|
|
|
@ -160,9 +160,7 @@ impl Service {
|
|||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||
let pdu_id: PduId = pdu_id.into();
|
||||
|
||||
if pdu.sender != user_id {
|
||||
pdu.remove_transaction_id().ok();
|
||||
}
|
||||
pdu.set_unsigned(Some(user_id));
|
||||
|
||||
Some((pdu_id.shorteventid, pdu))
|
||||
});
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
use std::{borrow::Borrow, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{
|
||||
Err, PduCount, PduEvent, Result, at, err,
|
||||
result::{LogErr, NotFound},
|
||||
utils,
|
||||
utils::stream::TryReadyExt,
|
||||
Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt,
|
||||
};
|
||||
use database::{Database, Deserialized, Json, KeyVal, Map};
|
||||
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
|
||||
|
||||
use super::{PduId, RawPduId};
|
||||
use crate::{Dep, rooms, rooms::short::ShortRoomId};
|
||||
|
@ -46,12 +43,8 @@ impl Data {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) async fn last_timeline_count(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduCount> {
|
||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
||||
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||
|
||||
pin_mut!(pdus_rev);
|
||||
let last_count = pdus_rev
|
||||
|
@ -65,12 +58,8 @@ impl Data {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) async fn latest_pdu_in_room(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduEvent> {
|
||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
||||
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
||||
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||
|
||||
pin_mut!(pdus_rev);
|
||||
pdus_rev
|
||||
|
@ -223,7 +212,6 @@ impl Data {
|
|||
/// order.
|
||||
pub(super) fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
until: PduCount,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
|
@ -233,14 +221,13 @@ impl Data {
|
|||
self.pduid_pdu
|
||||
.rev_raw_stream_from(¤t)
|
||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
||||
.ready_and_then(Self::from_json_slice)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
}
|
||||
|
||||
pub(super) fn pdus<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
from: PduCount,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
|
@ -250,21 +237,15 @@ impl Data {
|
|||
self.pduid_pdu
|
||||
.raw_stream_from(¤t)
|
||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
||||
.ready_and_then(Self::from_json_slice)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
}
|
||||
|
||||
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
|
||||
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
|
||||
let pdu_id: RawPduId = pdu_id.into();
|
||||
|
||||
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||
|
||||
if Some(pdu.sender.borrow()) != user_id {
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
|
||||
pdu.add_age().log_err().ok();
|
||||
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||
|
||||
Ok((pdu_id.pdu_count(), pdu))
|
||||
}
|
||||
|
|
|
@ -165,7 +165,7 @@ impl Service {
|
|||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
|
||||
let pdus = self.pdus(None, room_id, None);
|
||||
let pdus = self.pdus(room_id, None);
|
||||
|
||||
pin_mut!(pdus);
|
||||
pdus.try_next()
|
||||
|
@ -175,16 +175,12 @@ impl Service {
|
|||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
||||
self.db.latest_pdu_in_room(None, room_id).await
|
||||
self.db.latest_pdu_in_room(room_id).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn last_timeline_count(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduCount> {
|
||||
self.db.last_timeline_count(sender_user, room_id).await
|
||||
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||
self.db.last_timeline_count(room_id).await
|
||||
}
|
||||
|
||||
/// Returns the `count` of this pdu's id.
|
||||
|
@ -545,6 +541,10 @@ impl Service {
|
|||
| _ => {},
|
||||
}
|
||||
|
||||
// CONCERN: If we receive events with a relation out-of-order, we never write
|
||||
// their relation / thread. We need some kind of way to trigger when we receive
|
||||
// this event, and potentially a way to rebuild the table entirely.
|
||||
|
||||
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
|
||||
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
|
||||
self.services
|
||||
|
@ -996,34 +996,30 @@ impl Service {
|
|||
#[inline]
|
||||
pub fn all_pdus<'a>(
|
||||
&'a self,
|
||||
user_id: &'a UserId,
|
||||
room_id: &'a RoomId,
|
||||
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
|
||||
self.pdus(Some(user_id), room_id, None).ignore_err()
|
||||
self.pdus(room_id, None).ignore_err()
|
||||
}
|
||||
|
||||
/// Reverse iteration starting at from.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
until: Option<PduCount>,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
self.db
|
||||
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
|
||||
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
|
||||
}
|
||||
|
||||
/// Forward iteration starting at from.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
from: Option<PduCount>,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
self.db
|
||||
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
|
||||
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
|
||||
}
|
||||
|
||||
/// Replace a PDU with the redacted form.
|
||||
|
|
|
@ -781,7 +781,7 @@ impl Service {
|
|||
|
||||
for pdu in pdus {
|
||||
// Redacted events are not notification targets (we don't send push for them)
|
||||
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
|
||||
if pdu.is_redacted() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue