Compare commits

..

8 commits

Author SHA1 Message Date
Jacob Taylor
a4d6d94690 enable converged 6g at the edge in continuwuity
Some checks failed
2025-05-27 18:56:37 -07:00
nexy7574
0d8e941007 Always calculate state diff IDs in syncv3
Some checks failed
seemingly fixes #779
2025-05-26 11:58:12 -07:00
Jacob Taylor
385c33e87b upgrade some settings to enable 5g in continuwuity 2025-05-26 11:58:12 -07:00
Jacob Taylor
a34e4964c4 add futures::FutureExt to make cba9ee5240 work 2025-05-26 11:50:44 -07:00
Jason Volk
4e1e01a8dd Mitigate large futures
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-26 11:50:44 -07:00
Jacob Taylor
5d59c94316 bump the number of allowed immutable memtables by 1, to allow for greater flood protection
this should probably not be applied if you have rocksdb_atomic_flush = false (the default)
2025-05-26 11:50:44 -07:00
Jacob Taylor
7ab623d694 probably incorrectly delete support for non-standardized matrix srv record 2025-05-26 11:41:06 -07:00
Jacob Taylor
d5b3755242 Fix spaces rooms list load error. rev2 2025-05-26 11:40:57 -07:00
20 changed files with 80 additions and 588 deletions
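Commit 5d59c94316 above bumps the allowed immutable memtables and notes that the change should probably not be applied with rocksdb_atomic_flush = false. A minimal config sketch of that pairing, assuming the server reads a TOML config with a [global] section and that the option name matches the commit text (verify against the shipped example config before use):

```toml
[global]
# Hypothetical excerpt. The extra immutable memtable from 5d59c94316 is only
# expected to help when atomic flush is enabled; the commit states the
# default is false.
rocksdb_atomic_flush = true
```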

View file

@@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
 .services
 .rooms
 .timeline
-.last_timeline_count(&room_id)
+.last_timeline_count(None, &room_id)
 .await?;
 self.write_str(&format!("{result:#?}")).await
@@ -52,7 +52,7 @@ pub(super) async fn pdus(
 .services
 .rooms
 .timeline
-.pdus_rev(&room_id, from)
+.pdus_rev(None, &room_id, from)
 .try_take(limit.unwrap_or(3))
 .try_collect()
 .await?;

View file

@@ -84,25 +84,11 @@ pub(crate) async fn get_context_route(
 let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
-// PDUs are used to get seen user IDs and then returned in response.
 let events_before = services
 .rooms
 .timeline
-.pdus_rev(room_id, Some(base_count))
+.pdus_rev(Some(sender_user), room_id, Some(base_count))
 .ignore_err()
-.then(async |mut pdu| {
-pdu.1.set_unsigned(Some(sender_user));
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
-.await
-{
-debug_warn!("Failed to add bundled aggregations: {e}");
-}
-pdu
-})
 .ready_filter_map(|item| event_filter(item, filter))
 .wide_filter_map(|item| ignored_filter(&services, item, sender_user))
 .wide_filter_map(|item| visibility_filter(&services, item, sender_user))
@@ -112,20 +98,8 @@ pub(crate) async fn get_context_route(
 let events_after = services
 .rooms
 .timeline
-.pdus(room_id, Some(base_count))
+.pdus(Some(sender_user), room_id, Some(base_count))
 .ignore_err()
-.then(async |mut pdu| {
-pdu.1.set_unsigned(Some(sender_user));
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
-.await
-{
-debug_warn!("Failed to add bundled aggregations: {e}");
-}
-pdu
-})
 .ready_filter_map(|item| event_filter(item, filter))
 .wide_filter_map(|item| ignored_filter(&services, item, sender_user))
 .wide_filter_map(|item| visibility_filter(&services, item, sender_user))

View file

@@ -2,7 +2,7 @@ use core::panic;
 use axum::extract::State;
 use conduwuit::{
-Err, Result, at, debug_warn,
+Err, Result, at,
 matrix::{
 Event,
 pdu::{PduCount, PduEvent},
@@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route(
 | Direction::Forward => services
 .rooms
 .timeline
-.pdus(room_id, Some(from))
+.pdus(Some(sender_user), room_id, Some(from))
 .ignore_err()
 .boxed(),
 | Direction::Backward => services
 .rooms
 .timeline
-.pdus_rev(room_id, Some(from))
+.pdus_rev(Some(sender_user), room_id, Some(from))
 .ignore_err()
 .boxed(),
 };
@@ -132,18 +132,6 @@ pub(crate) async fn get_message_events_route(
 .wide_filter_map(|item| ignored_filter(&services, item, sender_user))
 .wide_filter_map(|item| visibility_filter(&services, item, sender_user))
 .take(limit)
-.then(async |mut pdu| {
-pdu.1.set_unsigned(Some(sender_user));
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
-.await
-{
-debug_warn!("Failed to add bundled aggregations: {e}");
-}
-pdu
-})
 .collect()
 .await;

View file

@@ -1,6 +1,6 @@
 use axum::extract::State;
 use conduwuit::{
-Result, at, debug_warn,
+Result, at,
 matrix::pdu::PduCount,
 utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
 };
@@ -149,17 +149,6 @@ async fn paginate_relations_with_filter(
 .ready_take_while(|(count, _)| Some(*count) != to)
 .wide_filter_map(|item| visibility_filter(services, sender_user, item))
 .take(limit)
-.then(async |mut pdu| {
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
-.await
-{
-debug_warn!("Failed to add bundled aggregations to relation: {e}");
-}
-pdu
-})
 .collect()
 .await;
@@ -183,10 +172,6 @@
 })
 }
-// TODO: Can we move the visibility filter lower down, to avoid checking events
-// that won't be sent? At the moment this also results in getting events that
-// appear to have no relation because intermediaries are not visible to the
-// user.
 async fn visibility_filter(
 services: &Services,
 sender_user: &UserId,

View file

@@ -1,5 +1,5 @@
 use axum::extract::State;
-use conduwuit::{Err, Event, Result, debug_warn, err};
+use conduwuit::{Err, Event, Result, err};
 use futures::{FutureExt, TryFutureExt, future::try_join};
 use ruma::api::client::room::get_room_event;
@@ -38,16 +38,7 @@ pub(crate) async fn get_room_event_route(
 "Fetched PDU must match requested"
 );
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(body.sender_user(), &mut event)
-.await
-{
-debug_warn!("Failed to add bundled aggregations to event: {e}");
-}
-event.set_unsigned(body.sender_user.as_deref());
+event.add_age().ok();
 Ok(get_room_event::v3::Response { event: event.into_room_event() })
 }

View file

@@ -1,6 +1,6 @@
 use axum::extract::State;
 use conduwuit::{
-Err, PduEvent, Result, at, debug_warn,
+Err, PduEvent, Result, at,
 utils::{BoolExt, stream::TryTools},
 };
 use futures::TryStreamExt;
@@ -25,28 +25,12 @@ pub(crate) async fn room_initial_sync_route(
 return Err!(Request(Forbidden("No room preview available.")));
 }
-// Events are returned in body
 let limit = LIMIT_MAX;
 let events: Vec<_> = services
 .rooms
 .timeline
-.pdus_rev(room_id, None)
+.pdus_rev(None, room_id, None)
 .try_take(limit)
-.and_then(async |mut pdu| {
-pdu.1.set_unsigned(body.sender_user.as_deref());
-if let Some(sender_user) = body.sender_user.as_deref() {
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
-.await
-{
-debug_warn!("Failed to add bundled aggregations: {e}");
-}
-}
-Ok(pdu)
-})
 .try_collect()
 .await?;

View file

@@ -2,7 +2,7 @@ use std::collections::BTreeMap;
 use axum::extract::State;
 use conduwuit::{
-Err, Result, at, debug_warn, is_true,
+Err, Result, at, is_true,
 matrix::pdu::PduEvent,
 result::FlatOk,
 utils::{IterStream, stream::ReadyExt},
@@ -144,17 +144,6 @@ async fn category_room_events(
 .map(at!(2))
 .flatten()
 .stream()
-.then(|mut pdu| async {
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
-.await
-{
-debug_warn!("Failed to add bundled aggregations to search result: {e}");
-}
-pdu
-})
 .map(PduEvent::into_room_event)
 .map(|result| SearchResult {
 rank: None,

View file

@@ -3,7 +3,7 @@ mod v4;
 mod v5;
 use conduwuit::{
-Error, PduCount, Result, debug_warn,
+Error, PduCount, Result,
 matrix::pdu::PduEvent,
 utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
 };
@@ -31,7 +31,11 @@ async fn load_timeline(
 next_batch: Option<PduCount>,
 limit: usize,
 ) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
-let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?;
+let last_timeline_count = services
+.rooms
+.timeline
+.last_timeline_count(Some(sender_user), room_id)
+.await?;
 if last_timeline_count <= roomsincecount {
 return Ok((Vec::new(), false));
@@ -40,25 +44,10 @@
 let non_timeline_pdus = services
 .rooms
 .timeline
-.pdus_rev(room_id, None)
+.pdus_rev(Some(sender_user), room_id, None)
 .ignore_err()
 .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
-.ready_take_while(|&(pducount, _)| pducount > roomsincecount)
-.map(move |mut pdu| {
-pdu.1.set_unsigned(Some(sender_user));
-pdu
-})
-.then(async move |mut pdu| {
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
-.await
-{
-debug_warn!("Failed to add bundled aggregations: {e}");
-}
-pdu
-});
+.ready_take_while(|&(pducount, _)| pducount > roomsincecount);
 // Take the last events for the timeline
 pin_mut!(non_timeline_pdus);

View file

@@ -1189,7 +1189,7 @@ async fn calculate_heroes(
 services
 .rooms
 .timeline
-.all_pdus(room_id)
+.all_pdus(sender_user, room_id)
 .ready_filter(|(_, pdu)| pdu.kind == RoomMember)
 .fold_default(|heroes: Vec<_>, (_, pdu)| {
 fold_hero(heroes, services, room_id, sender_user, pdu)

View file

@@ -1,6 +1,6 @@
 use axum::extract::State;
 use conduwuit::{
-Result, at, debug_warn,
+Result, at,
 matrix::pdu::{PduCount, PduEvent},
 };
 use futures::StreamExt;
@@ -28,8 +28,6 @@ pub(crate) async fn get_threads_route(
 .transpose()?
 .unwrap_or_else(PduCount::max);
-// TODO: user_can_see_event and set_unsigned should be at the same level /
-// function, so unsigned is only set for seen events.
 let threads: Vec<(PduCount, PduEvent)> = services
 .rooms
 .threads
@@ -44,17 +42,6 @@
 .await
 .then_some((count, pdu))
 })
-.then(|(count, mut pdu)| async move {
-if let Err(e) = services
-.rooms
-.pdu_metadata
-.add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu)
-.await
-{
-debug_warn!("Failed to add bundled aggregations to thread: {e}");
-}
-(count, pdu)
-})
 .collect()
 .await;

View file

@@ -3,7 +3,6 @@ use std::cmp;
 use axum::extract::State;
 use conduwuit::{
 PduCount, Result,
-result::LogErr,
 utils::{IterStream, ReadyExt, stream::TryTools},
 };
 use futures::{FutureExt, StreamExt, TryStreamExt};
@@ -63,7 +62,7 @@ pub(crate) async fn get_backfill_route(
 pdus: services
 .rooms
 .timeline
-.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
+.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
 .try_take(limit)
 .try_filter_map(|(_, pdu)| async move {
 Ok(services
@@ -73,15 +72,6 @@
 .await
 .then_some(pdu))
 })
-.and_then(async |mut pdu| {
-// Strip the transaction ID, as that is private
-pdu.remove_transaction_id().log_err().ok();
-// Add age, as this is specified
-pdu.add_age().log_err().ok();
-// It's not clear if we should strip or add any more data, leave as is.
-// In particular: Redaction?
-Ok(pdu)
-})
 .try_filter_map(|pdu| async move {
 Ok(services
 .rooms

View file

@@ -1,24 +1,11 @@
-use std::{borrow::Borrow, collections::BTreeMap};
+use std::collections::BTreeMap;
 use ruma::MilliSecondsSinceUnixEpoch;
 use serde::Deserialize;
 use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
 use super::Pdu;
-use crate::{Result, err, implement, is_true, result::LogErr};
+use crate::{Result, err, implement, is_true};
-/// Set the `unsigned` field of the PDU using only information in the PDU.
-/// Some unsigned data is already set within the database (eg. prev events,
-/// threads). Once this is done, other data must be calculated from the database
-/// (eg. relations) This is for server-to-client events.
-/// Backfill handles this itself.
-#[implement(Pdu)]
-pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
-if Some(self.sender.borrow()) != user_id {
-self.remove_transaction_id().log_err().ok();
-}
-self.add_age().log_err().ok();
-}
 #[implement(Pdu)]
 pub fn remove_transaction_id(&mut self) -> Result {

View file

@@ -1,394 +0,0 @@
use conduwuit::{Event, PduEvent, Result, err};
use ruma::{
EventId, RoomId, UserId,
api::Direction,
events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk},
};
use super::PdusIterItem;
const MAX_BUNDLED_RELATIONS: usize = 50;
impl super::Service {
/// Gets bundled aggregations for an event according to the Matrix
/// specification.
/// - m.replace relations are bundled to include the most recent replacement
/// event.
/// - m.reference relations are bundled to include a chunk of event IDs.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn get_bundled_aggregations(
&self,
user_id: &UserId,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<BundledMessageLikeRelations<Box<serde_json::value::RawValue>>>> {
let relations = self
.get_relations(
user_id,
room_id,
event_id,
conduwuit::PduCount::max(),
MAX_BUNDLED_RELATIONS,
0,
Direction::Backward,
)
.await;
// The relations database code still handles the basic unsigned data
// We don't want to recursively fetch relations
// TODO: Event visibility check
// TODO: ignored users?
if relations.is_empty() {
return Ok(None);
}
let mut replace_events = Vec::with_capacity(relations.len().min(10)); // Most events have few replacements
let mut reference_events = Vec::with_capacity(relations.len());
for relation in &relations {
let pdu = &relation.1;
let content = pdu.get_content_as_value();
if let Some(relates_to) = content.get("m.relates_to") {
// We don't check that the event relates back, because we assume the database is
// good.
if let Some(rel_type) = relates_to.get("rel_type") {
match rel_type.as_str() {
| Some("m.replace") => {
replace_events.push(relation);
},
| Some("m.reference") => {
reference_events.push(relation);
},
| _ => {
// Ignore other relation types for now
// Threads are in the database but not handled here
// Other types are not specified AFAICT.
},
}
}
}
}
// If no relations to bundle, return None
if replace_events.is_empty() && reference_events.is_empty() {
return Ok(None);
}
let mut bundled = BundledMessageLikeRelations::new();
// Handle m.replace relations - find the most recent one
if !replace_events.is_empty() {
let most_recent_replacement = Self::find_most_recent_replacement(&replace_events)?;
// Convert the replacement event to the bundled format
if let Some(replacement_pdu) = most_recent_replacement {
// According to the Matrix spec, we should include the full event as raw JSON
let replacement_json = serde_json::to_string(replacement_pdu)
.map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?;
let raw_value = serde_json::value::RawValue::from_string(replacement_json)
.map_err(|e| err!(Database("Failed to create RawValue: {e}")))?;
bundled.replace = Some(Box::new(raw_value));
}
}
// Handle m.reference relations - collect event IDs
if !reference_events.is_empty() {
let reference_chunk = Self::build_reference_chunk(&reference_events)?;
if !reference_chunk.is_empty() {
bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk)));
}
}
// TODO: Handle other relation types (m.annotation, etc.) when specified
Ok(Some(bundled))
}
/// Build reference chunk for m.reference bundled aggregations
fn build_reference_chunk(
reference_events: &[&PdusIterItem],
) -> Result<Vec<BundledReference>> {
let mut chunk = Vec::with_capacity(reference_events.len());
for relation in reference_events {
let pdu = &relation.1;
let reference_entry = BundledReference::new(pdu.event_id().to_owned());
chunk.push(reference_entry);
}
// Don't sort, order is unspecified
Ok(chunk)
}
/// Find the most recent replacement event based on origin_server_ts and
/// lexicographic event_id ordering
fn find_most_recent_replacement<'a>(
replacement_events: &'a [&'a PdusIterItem],
) -> Result<Option<&'a PduEvent>> {
if replacement_events.is_empty() {
return Ok(None);
}
let mut most_recent: Option<&PduEvent> = None;
// Jank, is there a better way to do this?
for relation in replacement_events {
let pdu = &relation.1;
match most_recent {
| None => {
most_recent = Some(pdu);
},
| Some(current_most_recent) => {
// Compare by origin_server_ts first
match pdu
.origin_server_ts()
.cmp(&current_most_recent.origin_server_ts())
{
| std::cmp::Ordering::Greater => {
most_recent = Some(pdu);
},
| std::cmp::Ordering::Equal => {
// If timestamps are equal, use lexicographic ordering of event_id
if pdu.event_id() > current_most_recent.event_id() {
most_recent = Some(pdu);
}
},
| std::cmp::Ordering::Less => {
// Keep current most recent
},
}
},
}
}
Ok(most_recent)
}
/// Adds bundled aggregations to a PDU's unsigned field
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn add_bundled_aggregations_to_pdu(
&self,
user_id: &UserId,
pdu: &mut PduEvent,
) -> Result<()> {
if pdu.is_redacted() {
return Ok(());
}
let bundled_aggregations = self
.get_bundled_aggregations(user_id, pdu.room_id(), pdu.event_id())
.await?;
if let Some(aggregations) = bundled_aggregations {
let aggregations_json = serde_json::to_value(aggregations)
.map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?;
Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?;
}
Ok(())
}
/// Helper method to add bundled aggregations to a PDU's unsigned
/// field
fn add_bundled_aggregations_to_unsigned(
pdu: &mut PduEvent,
aggregations_json: serde_json::Value,
) -> Result<()> {
use serde_json::{
Map, Value as JsonValue,
value::{RawValue as RawJsonValue, to_raw_value},
};
let mut unsigned: Map<String, JsonValue> = pdu
.unsigned
.as_deref()
.map(RawJsonValue::get)
.map_or_else(|| Ok(Map::new()), serde_json::from_str)
.map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?;
let relations = unsigned
.entry("m.relations")
.or_insert_with(|| JsonValue::Object(Map::new()))
.as_object_mut()
.ok_or_else(|| err!(Database("m.relations is not an object")))?;
if let JsonValue::Object(aggregations_map) = aggregations_json {
for (rel_type, aggregation) in aggregations_map {
relations.insert(rel_type, aggregation);
}
}
pdu.unsigned = Some(to_raw_value(&unsigned)?);
Ok(())
}
}
#[cfg(test)]
mod tests {
use conduwuit_core::pdu::{EventHash, PduEvent};
use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id};
use serde_json::{Value as JsonValue, json, value::to_raw_value};
fn create_test_pdu(unsigned_content: Option<JsonValue>) -> PduEvent {
PduEvent {
event_id: owned_event_id!("$test:example.com"),
room_id: owned_room_id!("!test:example.com"),
sender: owned_user_id!("@test:example.com"),
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
kind: TimelineEventType::RoomMessage,
content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(),
state_key: None,
prev_events: vec![],
depth: UInt::from(1_u32),
auth_events: vec![],
redacts: None,
unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()),
hashes: EventHash { sha256: "test_hash".to_owned() },
signatures: None,
origin: None,
}
}
fn create_bundled_aggregations() -> JsonValue {
json!({
"m.replace": {
"event_id": "$replace:example.com",
"origin_server_ts": 1_234_567_890,
"sender": "@replacer:example.com"
},
"m.reference": {
"count": 5,
"chunk": [
"$ref1:example.com",
"$ref2:example.com"
]
}
})
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() {
let mut pdu = create_test_pdu(None);
let aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when no unsigned field exists");
assert!(pdu.unsigned.is_some(), "Unsigned field should be created");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
assert!(unsigned.get("m.relations").is_some(), "m.relations should exist");
assert_eq!(
unsigned["m.relations"], aggregations,
"Relations should match the aggregations"
);
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() {
let existing_unsigned = json!({
"m.relations": {
"m.replace": {
"event_id": "$old_replace:example.com",
"origin_server_ts": 1_111_111_111,
"sender": "@old_replacer:example.com"
}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = create_bundled_aggregations();
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations.clone(),
);
assert!(result.is_ok(), "Should succeed when overwriting same relation type");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
let relations = &unsigned["m.relations"];
assert_eq!(
relations["m.replace"], new_aggregations["m.replace"],
"m.replace should be updated"
);
assert_eq!(
relations["m.replace"]["event_id"], "$replace:example.com",
"Should have new event_id"
);
assert!(relations.get("m.reference").is_some(), "New m.reference should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() {
// Test case: Other unsigned fields should be preserved
let existing_unsigned = json!({
"age": 98765,
"prev_content": {"msgtype": "m.text", "body": "old message"},
"redacted_because": {"event_id": "$redaction:example.com"},
"m.relations": {
"m.annotation": {"count": 1}
}
});
let mut pdu = create_test_pdu(Some(existing_unsigned));
let new_aggregations = json!({
"m.replace": {"event_id": "$new:example.com"}
});
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
&mut pdu,
new_aggregations,
);
assert!(result.is_ok(), "Should succeed while preserving other fields");
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
// Verify all existing fields are preserved
assert_eq!(unsigned["age"], 98765, "age should be preserved");
assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved");
assert!(
unsigned.get("redacted_because").is_some(),
"redacted_because should be preserved"
);
// Verify relations were merged correctly
let relations = &unsigned["m.relations"];
assert!(
relations.get("m.annotation").is_some(),
"Existing m.annotation should be preserved"
);
assert!(relations.get("m.replace").is_some(), "New m.replace should be added");
}
#[test]
fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() {
// Test case: Invalid JSON in existing unsigned should result in error
let mut pdu = create_test_pdu(None);
// Manually set invalid unsigned data
pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap());
let aggregations = create_bundled_aggregations();
let result =
super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations);
assert!(result.is_err(), "fails when existing unsigned is invalid");
// Should we ignore the error and overwrite anyway?
}
}

View file

@@ -3,6 +3,7 @@ use std::{mem::size_of, sync::Arc};
 use conduwuit::{
 PduCount, PduEvent,
 arrayvec::ArrayVec,
+result::LogErr,
 utils::{
 ReadyExt,
 stream::{TryIgnore, WidebandExt},
@@ -79,7 +80,9 @@ impl Data {
 let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
-pdu.set_unsigned(Some(user_id));
+if pdu.sender != user_id {
+pdu.remove_transaction_id().log_err().ok();
+}
 Some((shorteventid, pdu))
 })

View file

@@ -1,4 +1,3 @@
-mod bundled_aggregations;
 mod data;
 use std::sync::Arc;

View file

@@ -127,12 +127,7 @@ pub async fn search_pdus<'a>(
 .then_some(pdu)
 })
 .skip(query.skip)
-.take(query.limit)
-.map(move |mut pdu| {
-pdu.set_unsigned(query.user_id);
-// TODO: bundled aggregation
-pdu
-});
+.take(query.limit);
 Ok((count, pdus))
 }

View file

@@ -160,7 +160,9 @@ impl Service {
 let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
 let pdu_id: PduId = pdu_id.into();
-pdu.set_unsigned(Some(user_id));
+if pdu.sender != user_id {
+pdu.remove_transaction_id().ok();
+}
 Some((pdu_id.shorteventid, pdu))
 });

View file

@@ -1,11 +1,14 @@
-use std::sync::Arc;
+use std::{borrow::Borrow, sync::Arc};
 use conduwuit::{
-Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt,
+Err, PduCount, PduEvent, Result, at, err,
+result::{LogErr, NotFound},
+utils,
+utils::stream::TryReadyExt,
 };
 use database::{Database, Deserialized, Json, KeyVal, Map};
 use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
-use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
+use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
 use super::{PduId, RawPduId};
 use crate::{Dep, rooms, rooms::short::ShortRoomId};
@@ -43,8 +46,12 @@
 }
 #[inline]
-pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
-let pdus_rev = self.pdus_rev(room_id, PduCount::max());
+pub(super) async fn last_timeline_count(
+&self,
+sender_user: Option<&UserId>,
+room_id: &RoomId,
+) -> Result<PduCount> {
+let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
 pin_mut!(pdus_rev);
 let last_count = pdus_rev
@@ -58,8 +65,12 @@
 }
 #[inline]
-pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
-let pdus_rev = self.pdus_rev(room_id, PduCount::max());
+pub(super) async fn latest_pdu_in_room(
+&self,
+sender_user: Option<&UserId>,
+room_id: &RoomId,
+) -> Result<PduEvent> {
+let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
 pin_mut!(pdus_rev);
 pdus_rev
@@ -212,6 +223,7 @@
 /// order.
 pub(super) fn pdus_rev<'a>(
 &'a self,
+user_id: Option<&'a UserId>,
 room_id: &'a RoomId,
 until: PduCount,
 ) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@@ -221,13 +233,14 @@
 self.pduid_pdu
 .rev_raw_stream_from(&current)
 .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
-.ready_and_then(Self::from_json_slice)
+.ready_and_then(move |item| Self::each_pdu(item, user_id))
 })
 .try_flatten_stream()
 }
 pub(super) fn pdus<'a>(
 &'a self,
+user_id: Option<&'a UserId>,
 room_id: &'a RoomId,
 from: PduCount,
 ) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
@@ -237,15 +250,21 @@
 self.pduid_pdu
 .raw_stream_from(&current)
 .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
-.ready_and_then(Self::from_json_slice)
+.ready_and_then(move |item| Self::each_pdu(item, user_id))
 })
 .try_flatten_stream()
 }
-fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
+fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
 let pdu_id: RawPduId = pdu_id.into();
-let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
+let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
+if Some(pdu.sender.borrow()) != user_id {
+pdu.remove_transaction_id().log_err().ok();
+}
+pdu.add_age().log_err().ok();
 Ok((pdu_id.pdu_count(), pdu))
 }

View file

@@ -165,7 +165,7 @@ impl Service {
 #[tracing::instrument(skip(self), level = "debug")]
 pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
-let pdus = self.pdus(room_id, None);
+let pdus = self.pdus(None, room_id, None);
 pin_mut!(pdus);
 pdus.try_next()
@@ -175,12 +175,16 @@
 #[tracing::instrument(skip(self), level = "debug")]
 pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
-self.db.latest_pdu_in_room(room_id).await
+self.db.latest_pdu_in_room(None, room_id).await
 }
 #[tracing::instrument(skip(self), level = "debug")]
-pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
-self.db.last_timeline_count(room_id).await
+pub async fn last_timeline_count(
+&self,
+sender_user: Option<&UserId>,
+room_id: &RoomId,
+) -> Result<PduCount> {
+self.db.last_timeline_count(sender_user, room_id).await
 }
 /// Returns the `count` of this pdu's id.
@@ -541,10 +545,6 @@
 | _ => {},
 }
-// CONCERN: If we receive events with a relation out-of-order, we never write
-// their relation / thread. We need some kind of way to trigger when we receive
-// this event, and potentially a way to rebuild the table entirely.
 if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
 if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
 self.services
@@ -996,30 +996,34 @@
 #[inline]
 pub fn all_pdus<'a>(
 &'a self,
+user_id: &'a UserId,
 room_id: &'a RoomId,
 ) -> impl Stream<Item = PdusIterItem> + Send + 'a {
-self.pdus(room_id, None).ignore_err()
+self.pdus(Some(user_id), room_id, None).ignore_err()
 }
 /// Reverse iteration starting at from.
 #[tracing::instrument(skip(self), level = "debug")]
 pub fn pdus_rev<'a>(
 &'a self,
+user_id: Option<&'a UserId>,
 room_id: &'a RoomId,
 until: Option<PduCount>,
 ) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
 self.db
-.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
+.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
 }
 /// Forward iteration starting at from.
 #[tracing::instrument(skip(self), level = "debug")]
 pub fn pdus<'a>(
 &'a self,
+user_id: Option<&'a UserId>,
 room_id: &'a RoomId,
 from: Option<PduCount>,
 ) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
-self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
+self.db
+.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
 }
 /// Replace a PDU with the redacted form.

View file

@@ -781,7 +781,7 @@ impl Service {
 for pdu in pdus {
 // Redacted events are not notification targets (we don't send push for them)
-if pdu.is_redacted() {
+if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
 continue;
 }