diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index 58f75cb9..0fd22ca7 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result { .services .rooms .timeline - .last_timeline_count(&room_id) + .last_timeline_count(None, &room_id) .await?; self.write_str(&format!("{result:#?}")).await @@ -52,7 +52,7 @@ pub(super) async fn pdus( .services .rooms .timeline - .pdus_rev(&room_id, from) + .pdus_rev(None, &room_id, from) .try_take(limit.unwrap_or(3)) .try_collect() .await?; diff --git a/src/api/client/context.rs b/src/api/client/context.rs index ee3a458c..dbc2a22f 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -84,25 +84,11 @@ pub(crate) async fn get_context_route( let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user); - // PDUs are used to get seen user IDs and then returned in response. - let events_before = services .rooms .timeline - .pdus_rev(room_id, Some(base_count)) + .pdus_rev(Some(sender_user), room_id, Some(base_count)) .ignore_err() - .then(async |mut pdu| { - pdu.1.set_unsigned(Some(sender_user)); - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) - .await - { - debug_warn!("Failed to add bundled aggregations: {e}"); - } - pdu - }) .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) @@ -112,20 +98,8 @@ pub(crate) async fn get_context_route( let events_after = services .rooms .timeline - .pdus(room_id, Some(base_count)) + .pdus(Some(sender_user), room_id, Some(base_count)) .ignore_err() - .then(async |mut pdu| { - pdu.1.set_unsigned(Some(sender_user)); - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) - .await - { - debug_warn!("Failed to add bundled aggregations: {e}"); - } - pdu - }) .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 6087478c..e442850b 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -2,7 +2,7 @@ use core::panic; use axum::extract::State; use conduwuit::{ - Err, Result, at, debug_warn, + Err, Result, at, matrix::{ Event, pdu::{PduCount, PduEvent}, @@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route( | Direction::Forward => services .rooms .timeline - .pdus(room_id, Some(from)) + .pdus(Some(sender_user), room_id, Some(from)) .ignore_err() .boxed(), | Direction::Backward => services .rooms .timeline - .pdus_rev(room_id, Some(from)) + .pdus_rev(Some(sender_user), room_id, Some(from)) .ignore_err() .boxed(), }; @@ -132,18 +132,6 @@ pub(crate) async fn get_message_events_route( .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit) - .then(async |mut pdu| { - pdu.1.set_unsigned(Some(sender_user)); - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) - .await - { - debug_warn!("Failed to add bundled aggregations: {e}"); - } - pdu - }) .collect() .await; diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index 377f0c71..b8c2dd4d 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Result, at, debug_warn, + Result, at, matrix::pdu::PduCount, utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt}, }; @@ -149,17 +149,6 @@ async fn paginate_relations_with_filter( .ready_take_while(|(count, _)| Some(*count) != to) .wide_filter_map(|item| visibility_filter(services, sender_user, item)) .take(limit) - .then(async |mut pdu| { - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) - .await - { - debug_warn!("Failed to add bundled aggregations to relation: {e}"); - } - pdu - }) .collect() .await; @@ -183,10 +172,6 @@ async fn paginate_relations_with_filter( }) } -// TODO: Can we move the visibility filter lower down, to avoid checking events -// that won't be sent? At the moment this also results in getting events that -// appear to have no relation because intermediaries are not visible to the -// user. async fn visibility_filter( services: &Services, sender_user: &UserId, diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index e7b0bb37..2b115b5c 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -1,5 +1,5 @@ use axum::extract::State; -use conduwuit::{Err, Event, Result, debug_warn, err}; +use conduwuit::{Err, Event, Result, err}; use futures::{FutureExt, TryFutureExt, future::try_join}; use ruma::api::client::room::get_room_event; @@ -38,16 +38,7 @@ pub(crate) async fn get_room_event_route( "Fetched PDU must match requested" ); - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(body.sender_user(), &mut event) - .await - { - debug_warn!("Failed to add bundled aggregations to event: {e}"); - } - - event.set_unsigned(body.sender_user.as_deref()); + event.add_age().ok(); Ok(get_room_event::v3::Response { event: event.into_room_event() }) } diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index 2f965245..ca63610b 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Err, PduEvent, Result, at, debug_warn, + Err, PduEvent, Result, at, utils::{BoolExt, stream::TryTools}, }; use futures::TryStreamExt; @@ -25,28 +25,12 @@ pub(crate) async fn room_initial_sync_route( return Err!(Request(Forbidden("No room preview available."))); } - // Events are returned in body - let limit = LIMIT_MAX; let events: Vec<_> = services .rooms .timeline - .pdus_rev(room_id, None) + .pdus_rev(None, room_id, None) .try_take(limit) - .and_then(async |mut pdu| { - pdu.1.set_unsigned(body.sender_user.as_deref()); - if let Some(sender_user) = body.sender_user.as_deref() { - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) - .await - { - debug_warn!("Failed to add bundled aggregations: {e}"); - } - } - Ok(pdu) - }) .try_collect() .await?; diff --git a/src/api/client/search.rs b/src/api/client/search.rs index af5fccec..d4dcde57 100644 --- a/src/api/client/search.rs +++ b/src/api/client/search.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use axum::extract::State; use conduwuit::{ - Err, Result, at, debug_warn, is_true, + Err, Result, at, is_true, matrix::pdu::PduEvent, result::FlatOk, utils::{IterStream, stream::ReadyExt}, @@ -144,17 +144,6 @@ async fn category_room_events( .map(at!(2)) .flatten() .stream() - .then(|mut pdu| async { - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(sender_user, &mut pdu) - .await - { - debug_warn!("Failed to add bundled aggregations to search result: {e}"); - } - pdu - }) .map(PduEvent::into_room_event) .map(|result| SearchResult { rank: None, diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 1ea62883..40370160 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -3,7 +3,7 @@ mod v4; mod v5; use conduwuit::{ - Error, PduCount, Result, debug_warn, + Error, PduCount, Result, matrix::pdu::PduEvent, utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, }; @@ -31,7 +31,11 @@ async fn load_timeline( next_batch: Option, limit: usize, ) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { - let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?; + let last_timeline_count = services + .rooms + .timeline + .last_timeline_count(Some(sender_user), room_id) + .await?; if last_timeline_count <= roomsincecount { return Ok((Vec::new(), false)); @@ -40,25 +44,10 @@ async fn load_timeline( let non_timeline_pdus = services .rooms .timeline - .pdus_rev(room_id, None) + .pdus_rev(Some(sender_user), room_id, None) .ignore_err() .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) - .ready_take_while(|&(pducount, _)| pducount > roomsincecount) - .map(move |mut pdu| { - pdu.1.set_unsigned(Some(sender_user)); - pdu - }) - .then(async move |mut pdu| { - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1) - .await - { - debug_warn!("Failed to add bundled aggregations: {e}"); - } - pdu - }); + .ready_take_while(|&(pducount, _)| pducount > roomsincecount); // Take the last events for the timeline pin_mut!(non_timeline_pdus); diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 7eb8c7e1..7bc74c95 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -1189,7 +1189,7 @@ async fn calculate_heroes( services .rooms .timeline - .all_pdus(room_id) + .all_pdus(sender_user, room_id) .ready_filter(|(_, pdu)| pdu.kind == RoomMember) .fold_default(|heroes: Vec<_>, (_, pdu)| { fold_hero(heroes, services, room_id, sender_user, pdu) diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index 09fb75d6..5b838bef 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -1,6 +1,6 @@ use axum::extract::State; use conduwuit::{ - Result, at, debug_warn, + Result, at, matrix::pdu::{PduCount, PduEvent}, }; use futures::StreamExt; @@ -28,8 +28,6 @@ pub(crate) async fn get_threads_route( .transpose()? .unwrap_or_else(PduCount::max); - // TODO: user_can_see_event and set_unsigned should be at the same level / - // function, so unsigned is only set for seen events. let threads: Vec<(PduCount, PduEvent)> = services .rooms .threads @@ -44,17 +42,6 @@ pub(crate) async fn get_threads_route( .await .then_some((count, pdu)) }) - .then(|(count, mut pdu)| async move { - if let Err(e) = services - .rooms - .pdu_metadata - .add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu) - .await - { - debug_warn!("Failed to add bundled aggregations to thread: {e}"); - } - (count, pdu) - }) .collect() .await; diff --git a/src/api/server/backfill.rs b/src/api/server/backfill.rs index 058fc273..3cfbcedc 100644 --- a/src/api/server/backfill.rs +++ b/src/api/server/backfill.rs @@ -3,7 +3,6 @@ use std::cmp; use axum::extract::State; use conduwuit::{ PduCount, Result, - result::LogErr, utils::{IterStream, ReadyExt, stream::TryTools}, }; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -63,7 +62,7 @@ pub(crate) async fn get_backfill_route( pdus: services .rooms .timeline - .pdus_rev(&body.room_id, Some(from.saturating_add(1))) + .pdus_rev(None, &body.room_id, Some(from.saturating_add(1))) .try_take(limit) .try_filter_map(|(_, pdu)| async move { Ok(services @@ -73,15 +72,6 @@ pub(crate) async fn get_backfill_route( .await .then_some(pdu)) }) - .and_then(async |mut pdu| { - // Strip the transaction ID, as that is private - pdu.remove_transaction_id().log_err().ok(); - // Add age, as this is specified - pdu.add_age().log_err().ok(); - // It's not clear if we should strip or add any more data, leave as is. - // In particular: Redaction? - Ok(pdu) - }) .try_filter_map(|pdu| async move { Ok(services .rooms diff --git a/src/core/matrix/pdu/unsigned.rs b/src/core/matrix/pdu/unsigned.rs index 2726a292..23897519 100644 --- a/src/core/matrix/pdu/unsigned.rs +++ b/src/core/matrix/pdu/unsigned.rs @@ -1,24 +1,11 @@ -use std::{borrow::Borrow, collections::BTreeMap}; +use std::collections::BTreeMap; use ruma::MilliSecondsSinceUnixEpoch; use serde::Deserialize; use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value}; use super::Pdu; -use crate::{Result, err, implement, is_true, result::LogErr}; - -/// Set the `unsigned` field of the PDU using only information in the PDU. -/// Some unsigned data is already set within the database (eg. prev events, -/// threads). Once this is done, other data must be calculated from the database -/// (eg. relations) This is for server-to-client events. -/// Backfill handles this itself. -#[implement(Pdu)] -pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) { - if Some(self.sender.borrow()) != user_id { - self.remove_transaction_id().log_err().ok(); - } - self.add_age().log_err().ok(); -} +use crate::{Result, err, implement, is_true}; #[implement(Pdu)] pub fn remove_transaction_id(&mut self) -> Result { diff --git a/src/service/rooms/pdu_metadata/bundled_aggregations.rs b/src/service/rooms/pdu_metadata/bundled_aggregations.rs deleted file mode 100644 index 4ef8efc1..00000000 --- a/src/service/rooms/pdu_metadata/bundled_aggregations.rs +++ /dev/null @@ -1,394 +0,0 @@ -use conduwuit::{Event, PduEvent, Result, err}; -use ruma::{ - EventId, RoomId, UserId, - api::Direction, - events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk}, -}; - -use super::PdusIterItem; - -const MAX_BUNDLED_RELATIONS: usize = 50; - -impl super::Service { - /// Gets bundled aggregations for an event according to the Matrix - /// specification. - /// - m.replace relations are bundled to include the most recent replacement - /// event. - /// - m.reference relations are bundled to include a chunk of event IDs. - #[tracing::instrument(skip(self), level = "debug")] - pub async fn get_bundled_aggregations( - &self, - user_id: &UserId, - room_id: &RoomId, - event_id: &EventId, - ) -> Result>>> { - let relations = self - .get_relations( - user_id, - room_id, - event_id, - conduwuit::PduCount::max(), - MAX_BUNDLED_RELATIONS, - 0, - Direction::Backward, - ) - .await; - // The relations database code still handles the basic unsigned data - // We don't want to recursively fetch relations - - // TODO: Event visibility check - // TODO: ignored users? - - if relations.is_empty() { - return Ok(None); - } - - let mut replace_events = Vec::with_capacity(relations.len().min(10)); // Most events have few replacements - let mut reference_events = Vec::with_capacity(relations.len()); - - for relation in &relations { - let pdu = &relation.1; - - let content = pdu.get_content_as_value(); - if let Some(relates_to) = content.get("m.relates_to") { - // We don't check that the event relates back, because we assume the database is - // good. - if let Some(rel_type) = relates_to.get("rel_type") { - match rel_type.as_str() { - | Some("m.replace") => { - replace_events.push(relation); - }, - | Some("m.reference") => { - reference_events.push(relation); - }, - | _ => { - // Ignore other relation types for now - // Threads are in the database but not handled here - // Other types are not specified AFAICT. - }, - } - } - } - } - - // If no relations to bundle, return None - if replace_events.is_empty() && reference_events.is_empty() { - return Ok(None); - } - - let mut bundled = BundledMessageLikeRelations::new(); - - // Handle m.replace relations - find the most recent one - if !replace_events.is_empty() { - let most_recent_replacement = Self::find_most_recent_replacement(&replace_events)?; - - // Convert the replacement event to the bundled format - if let Some(replacement_pdu) = most_recent_replacement { - // According to the Matrix spec, we should include the full event as raw JSON - let replacement_json = serde_json::to_string(replacement_pdu) - .map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?; - let raw_value = serde_json::value::RawValue::from_string(replacement_json) - .map_err(|e| err!(Database("Failed to create RawValue: {e}")))?; - bundled.replace = Some(Box::new(raw_value)); - } - } - - // Handle m.reference relations - collect event IDs - if !reference_events.is_empty() { - let reference_chunk = Self::build_reference_chunk(&reference_events)?; - if !reference_chunk.is_empty() { - bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk))); - } - } - - // TODO: Handle other relation types (m.annotation, etc.) when specified - - Ok(Some(bundled)) - } - - /// Build reference chunk for m.reference bundled aggregations - fn build_reference_chunk( - reference_events: &[&PdusIterItem], - ) -> Result> { - let mut chunk = Vec::with_capacity(reference_events.len()); - - for relation in reference_events { - let pdu = &relation.1; - - let reference_entry = BundledReference::new(pdu.event_id().to_owned()); - chunk.push(reference_entry); - } - - // Don't sort, order is unspecified - - Ok(chunk) - } - - /// Find the most recent replacement event based on origin_server_ts and - /// lexicographic event_id ordering - fn find_most_recent_replacement<'a>( - replacement_events: &'a [&'a PdusIterItem], - ) -> Result> { - if replacement_events.is_empty() { - return Ok(None); - } - - let mut most_recent: Option<&PduEvent> = None; - - // Jank, is there a better way to do this? - for relation in replacement_events { - let pdu = &relation.1; - - match most_recent { - | None => { - most_recent = Some(pdu); - }, - | Some(current_most_recent) => { - // Compare by origin_server_ts first - match pdu - .origin_server_ts() - .cmp(¤t_most_recent.origin_server_ts()) - { - | std::cmp::Ordering::Greater => { - most_recent = Some(pdu); - }, - | std::cmp::Ordering::Equal => { - // If timestamps are equal, use lexicographic ordering of event_id - if pdu.event_id() > current_most_recent.event_id() { - most_recent = Some(pdu); - } - }, - | std::cmp::Ordering::Less => { - // Keep current most recent - }, - } - }, - } - } - - Ok(most_recent) - } - - /// Adds bundled aggregations to a PDU's unsigned field - #[tracing::instrument(skip(self, pdu), level = "debug")] - pub async fn add_bundled_aggregations_to_pdu( - &self, - user_id: &UserId, - pdu: &mut PduEvent, - ) -> Result<()> { - if pdu.is_redacted() { - return Ok(()); - } - - let bundled_aggregations = self - .get_bundled_aggregations(user_id, pdu.room_id(), pdu.event_id()) - .await?; - - if let Some(aggregations) = bundled_aggregations { - let aggregations_json = serde_json::to_value(aggregations) - .map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?; - - Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?; - } - - Ok(()) - } - - /// Helper method to add bundled aggregations to a PDU's unsigned - /// field - fn add_bundled_aggregations_to_unsigned( - pdu: &mut PduEvent, - aggregations_json: serde_json::Value, - ) -> Result<()> { - use serde_json::{ - Map, Value as JsonValue, - value::{RawValue as RawJsonValue, to_raw_value}, - }; - - let mut unsigned: Map = pdu - .unsigned - .as_deref() - .map(RawJsonValue::get) - .map_or_else(|| Ok(Map::new()), serde_json::from_str) - .map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?; - - let relations = unsigned - .entry("m.relations") - .or_insert_with(|| JsonValue::Object(Map::new())) - .as_object_mut() - .ok_or_else(|| err!(Database("m.relations is not an object")))?; - - if let JsonValue::Object(aggregations_map) = aggregations_json { - for (rel_type, aggregation) in aggregations_map { - relations.insert(rel_type, aggregation); - } - } - - pdu.unsigned = Some(to_raw_value(&unsigned)?); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use conduwuit_core::pdu::{EventHash, PduEvent}; - use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id}; - use serde_json::{Value as JsonValue, json, value::to_raw_value}; - - fn create_test_pdu(unsigned_content: Option) -> PduEvent { - PduEvent { - event_id: owned_event_id!("$test:example.com"), - room_id: owned_room_id!("!test:example.com"), - sender: owned_user_id!("@test:example.com"), - origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(), - kind: TimelineEventType::RoomMessage, - content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(), - state_key: None, - prev_events: vec![], - depth: UInt::from(1_u32), - auth_events: vec![], - redacts: None, - unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()), - hashes: EventHash { sha256: "test_hash".to_owned() }, - signatures: None, - origin: None, - } - } - - fn create_bundled_aggregations() -> JsonValue { - json!({ - "m.replace": { - "event_id": "$replace:example.com", - "origin_server_ts": 1_234_567_890, - "sender": "@replacer:example.com" - }, - "m.reference": { - "count": 5, - "chunk": [ - "$ref1:example.com", - "$ref2:example.com" - ] - } - }) - } - - #[test] - fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() { - let mut pdu = create_test_pdu(None); - let aggregations = create_bundled_aggregations(); - - let result = super::super::Service::add_bundled_aggregations_to_unsigned( - &mut pdu, - aggregations.clone(), - ); - assert!(result.is_ok(), "Should succeed when no unsigned field exists"); - - assert!(pdu.unsigned.is_some(), "Unsigned field should be created"); - - let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); - let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); - - assert!(unsigned.get("m.relations").is_some(), "m.relations should exist"); - assert_eq!( - unsigned["m.relations"], aggregations, - "Relations should match the aggregations" - ); - } - - #[test] - fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() { - let existing_unsigned = json!({ - "m.relations": { - "m.replace": { - "event_id": "$old_replace:example.com", - "origin_server_ts": 1_111_111_111, - "sender": "@old_replacer:example.com" - } - } - }); - - let mut pdu = create_test_pdu(Some(existing_unsigned)); - let new_aggregations = create_bundled_aggregations(); - - let result = super::super::Service::add_bundled_aggregations_to_unsigned( - &mut pdu, - new_aggregations.clone(), - ); - assert!(result.is_ok(), "Should succeed when overwriting same relation type"); - - let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); - let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); - - let relations = &unsigned["m.relations"]; - - assert_eq!( - relations["m.replace"], new_aggregations["m.replace"], - "m.replace should be updated" - ); - assert_eq!( - relations["m.replace"]["event_id"], "$replace:example.com", - "Should have new event_id" - ); - - assert!(relations.get("m.reference").is_some(), "New m.reference should be added"); - } - - #[test] - fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() { - // Test case: Other unsigned fields should be preserved - let existing_unsigned = json!({ - "age": 98765, - "prev_content": {"msgtype": "m.text", "body": "old message"}, - "redacted_because": {"event_id": "$redaction:example.com"}, - "m.relations": { - "m.annotation": {"count": 1} - } - }); - - let mut pdu = create_test_pdu(Some(existing_unsigned)); - let new_aggregations = json!({ - "m.replace": {"event_id": "$new:example.com"} - }); - - let result = super::super::Service::add_bundled_aggregations_to_unsigned( - &mut pdu, - new_aggregations, - ); - assert!(result.is_ok(), "Should succeed while preserving other fields"); - - let unsigned_str = pdu.unsigned.as_ref().unwrap().get(); - let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap(); - - // Verify all existing fields are preserved - assert_eq!(unsigned["age"], 98765, "age should be preserved"); - assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved"); - assert!( - unsigned.get("redacted_because").is_some(), - "redacted_because should be preserved" - ); - - // Verify relations were merged correctly - let relations = &unsigned["m.relations"]; - assert!( - relations.get("m.annotation").is_some(), - "Existing m.annotation should be preserved" - ); - assert!(relations.get("m.replace").is_some(), "New m.replace should be added"); - } - - #[test] - fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() { - // Test case: Invalid JSON in existing unsigned should result in error - let mut pdu = create_test_pdu(None); - // Manually set invalid unsigned data - pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap()); - - let aggregations = create_bundled_aggregations(); - let result = - super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations); - - assert!(result.is_err(), "fails when existing unsigned is invalid"); - // Should we ignore the error and overwrite anyway? - } -} diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index c4b37b99..f0beab5a 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -3,6 +3,7 @@ use std::{mem::size_of, sync::Arc}; use conduwuit::{ PduCount, PduEvent, arrayvec::ArrayVec, + result::LogErr, utils::{ ReadyExt, stream::{TryIgnore, WidebandExt}, @@ -79,7 +80,9 @@ impl Data { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; - pdu.set_unsigned(Some(user_id)); + if pdu.sender != user_id { + pdu.remove_transaction_id().log_err().ok(); + } Some((shorteventid, pdu)) }) diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index 2dff54d8..18221c2d 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -1,4 +1,3 @@ -mod bundled_aggregations; mod data; use std::sync::Arc; diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index 7cef5dbf..4100dd75 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -127,12 +127,7 @@ pub async fn search_pdus<'a>( .then_some(pdu) }) .skip(query.skip) - .take(query.limit) - .map(move |mut pdu| { - pdu.set_unsigned(query.user_id); - // TODO: bundled aggregation - pdu - }); + .take(query.limit); Ok((count, pdus)) } diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index a24183e6..a680df55 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -160,7 +160,9 @@ impl Service { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; let pdu_id: PduId = pdu_id.into(); - pdu.set_unsigned(Some(user_id)); + if pdu.sender != user_id { + pdu.remove_transaction_id().ok(); + } Some((pdu_id.shorteventid, pdu)) }); diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 5f7b8c81..94c78bb0 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -1,11 +1,14 @@ -use std::sync::Arc; +use std::{borrow::Borrow, sync::Arc}; use conduwuit::{ - Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt, + Err, PduCount, PduEvent, Result, at, err, + result::{LogErr, NotFound}, + utils, + utils::stream::TryReadyExt, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut}; -use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction}; +use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction}; use super::{PduId, RawPduId}; use crate::{Dep, rooms, rooms::short::ShortRoomId}; @@ -43,8 +46,12 @@ impl Data { } #[inline] - pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result { - let pdus_rev = self.pdus_rev(room_id, PduCount::max()); + pub(super) async fn last_timeline_count( + &self, + sender_user: Option<&UserId>, + room_id: &RoomId, + ) -> Result { + let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); pin_mut!(pdus_rev); let last_count = pdus_rev @@ -58,8 +65,12 @@ impl Data { } #[inline] - pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { - let pdus_rev = self.pdus_rev(room_id, PduCount::max()); + pub(super) async fn latest_pdu_in_room( + &self, + sender_user: Option<&UserId>, + room_id: &RoomId, + ) -> Result { + let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); pin_mut!(pdus_rev); pdus_rev @@ -212,6 +223,7 @@ impl Data { /// order. pub(super) fn pdus_rev<'a>( &'a self, + user_id: Option<&'a UserId>, room_id: &'a RoomId, until: PduCount, ) -> impl Stream> + Send + 'a { @@ -221,13 +233,14 @@ impl Data { self.pduid_pdu .rev_raw_stream_from(¤t) .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(Self::from_json_slice) + .ready_and_then(move |item| Self::each_pdu(item, user_id)) }) .try_flatten_stream() } pub(super) fn pdus<'a>( &'a self, + user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount, ) -> impl Stream> + Send + 'a { @@ -237,15 +250,21 @@ impl Data { self.pduid_pdu .raw_stream_from(¤t) .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(Self::from_json_slice) + .ready_and_then(move |item| Self::each_pdu(item, user_id)) }) .try_flatten_stream() } - fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result { + fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result { let pdu_id: RawPduId = pdu_id.into(); - let pdu = serde_json::from_slice::(pdu)?; + let mut pdu = serde_json::from_slice::(pdu)?; + + if Some(pdu.sender.borrow()) != user_id { + pdu.remove_transaction_id().log_err().ok(); + } + + pdu.add_age().log_err().ok(); Ok((pdu_id.pdu_count(), pdu)) } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index e65ae2c7..4b2f3cb2 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -165,7 +165,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> { - let pdus = self.pdus(room_id, None); + let pdus = self.pdus(None, room_id, None); pin_mut!(pdus); pdus.try_next() @@ -175,12 +175,16 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { - self.db.latest_pdu_in_room(room_id).await + self.db.latest_pdu_in_room(None, room_id).await } #[tracing::instrument(skip(self), level = "debug")] - pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result { - self.db.last_timeline_count(room_id).await + pub async fn last_timeline_count( + &self, + sender_user: Option<&UserId>, + room_id: &RoomId, + ) -> Result { + self.db.last_timeline_count(sender_user, room_id).await } /// Returns the `count` of this pdu's id. @@ -541,10 +545,6 @@ impl Service { | _ => {}, } - // CONCERN: If we receive events with a relation out-of-order, we never write - // their relation / thread. We need some kind of way to trigger when we receive - // this event, and potentially a way to rebuild the table entirely. - if let Ok(content) = pdu.get_content::() { if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await { self.services @@ -996,30 +996,34 @@ impl Service { #[inline] pub fn all_pdus<'a>( &'a self, + user_id: &'a UserId, room_id: &'a RoomId, ) -> impl Stream + Send + 'a { - self.pdus(room_id, None).ignore_err() + self.pdus(Some(user_id), room_id, None).ignore_err() } /// Reverse iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus_rev<'a>( &'a self, + user_id: Option<&'a UserId>, room_id: &'a RoomId, until: Option, ) -> impl Stream> + Send + 'a { self.db - .pdus_rev(room_id, until.unwrap_or_else(PduCount::max)) + .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) } /// Forward iteration starting at from. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus<'a>( &'a self, + user_id: Option<&'a UserId>, room_id: &'a RoomId, from: Option, ) -> impl Stream> + Send + 'a { - self.db.pdus(room_id, from.unwrap_or_else(PduCount::min)) + self.db + .pdus(user_id, room_id, from.unwrap_or_else(PduCount::min)) } /// Replace a PDU with the redacted form. diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index cd84f7e7..fab02f6b 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -781,7 +781,7 @@ impl Service { for pdu in pdus { // Redacted events are not notification targets (we don't send push for them) - if pdu.is_redacted() { + if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) { continue; }