mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-07-03 10:04:39 +02:00
Compare commits
7 commits
e4a0337e56
...
5a861d406a
Author | SHA1 | Date | |
---|---|---|---|
|
5a861d406a | ||
|
db50adc000 | ||
|
6c11e59c4a | ||
|
5d44653e3a | ||
|
44e60d0ea6 | ||
|
d7514178ab | ||
|
1d45e0b68c |
24 changed files with 974 additions and 86 deletions
|
@ -180,7 +180,7 @@ jobs:
|
||||||
file: "docker/Dockerfile"
|
file: "docker/Dockerfile"
|
||||||
build-args: |
|
build-args: |
|
||||||
GIT_COMMIT_HASH=${{ github.sha }})
|
GIT_COMMIT_HASH=${{ github.sha }})
|
||||||
GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }})
|
GIT_COMMIT_HASH_SHORT=${{ env.COMMIT_SHORT_SHA }}
|
||||||
GIT_REMOTE_URL=${{github.event.repository.html_url }}
|
GIT_REMOTE_URL=${{github.event.repository.html_url }}
|
||||||
GIT_REMOTE_COMMIT_URL=${{github.event.head_commit.url }}
|
GIT_REMOTE_COMMIT_URL=${{github.event.head_commit.url }}
|
||||||
platforms: ${{ matrix.platform }}
|
platforms: ${{ matrix.platform }}
|
||||||
|
|
|
@ -20,10 +20,10 @@ We may backport fixes to the previous release at our discretion, but we don't gu
|
||||||
|
|
||||||
We appreciate the efforts of security researchers and the community in identifying and reporting vulnerabilities. To ensure that potential vulnerabilities are addressed properly, please follow these guidelines:
|
We appreciate the efforts of security researchers and the community in identifying and reporting vulnerabilities. To ensure that potential vulnerabilities are addressed properly, please follow these guidelines:
|
||||||
|
|
||||||
1. Contact members of the team over E2EE private message.
|
1. **Contact members of the team directly** over E2EE private message.
|
||||||
- [@jade:ellis.link](https://matrix.to/#/@jade:ellis.link)
|
- [@jade:ellis.link](https://matrix.to/#/@jade:ellis.link)
|
||||||
- [@nex:nexy7574.co.uk](https://matrix.to/#/@nex:nexy7574.co.uk) <!-- ? -->
|
- [@nex:nexy7574.co.uk](https://matrix.to/#/@nex:nexy7574.co.uk) <!-- ? -->
|
||||||
2. **Email the security team** directly at [security@continuwuity.org](mailto:security@continuwuity.org). This is not E2EE, so don't include sensitive details.
|
2. **Email the security team** at [security@continuwuity.org](mailto:security@continuwuity.org). This is not E2EE, so don't include sensitive details.
|
||||||
3. **Do not disclose the vulnerability publicly** until it has been addressed
|
3. **Do not disclose the vulnerability publicly** until it has been addressed
|
||||||
4. **Provide detailed information** about the vulnerability, including:
|
4. **Provide detailed information** about the vulnerability, including:
|
||||||
- A clear description of the issue
|
- A clear description of the issue
|
||||||
|
@ -48,7 +48,7 @@ When you report a security vulnerability:
|
||||||
|
|
||||||
When security vulnerabilities are identified:
|
When security vulnerabilities are identified:
|
||||||
|
|
||||||
1. We will develop and test fixes in a private branch
|
1. We will develop and test fixes in a private fork
|
||||||
2. Security updates will be released as soon as possible
|
2. Security updates will be released as soon as possible
|
||||||
3. Release notes will include information about the vulnerabilities, avoiding details that could facilitate exploitation where possible
|
3. Release notes will include information about the vulnerabilities, avoiding details that could facilitate exploitation where possible
|
||||||
4. Critical security updates may be backported to the previous stable release
|
4. Critical security updates may be backported to the previous stable release
|
||||||
|
|
|
@ -125,13 +125,13 @@ pub(super) enum DebugCommand {
|
||||||
reset: bool,
|
reset: bool,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// - Verify json signatures
|
/// - Sign JSON blob
|
||||||
///
|
///
|
||||||
/// This command needs a JSON blob provided in a Markdown code block below
|
/// This command needs a JSON blob provided in a Markdown code block below
|
||||||
/// the command.
|
/// the command.
|
||||||
SignJson,
|
SignJson,
|
||||||
|
|
||||||
/// - Verify json signatures
|
/// - Verify JSON signatures
|
||||||
///
|
///
|
||||||
/// This command needs a JSON blob provided in a Markdown code block below
|
/// This command needs a JSON blob provided in a Markdown code block below
|
||||||
/// the command.
|
/// the command.
|
||||||
|
|
|
@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
|
||||||
.services
|
.services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.last_timeline_count(None, &room_id)
|
.last_timeline_count(&room_id)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
self.write_str(&format!("{result:#?}")).await
|
self.write_str(&format!("{result:#?}")).await
|
||||||
|
@ -52,7 +52,7 @@ pub(super) async fn pdus(
|
||||||
.services
|
.services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.pdus_rev(None, &room_id, from)
|
.pdus_rev(&room_id, from)
|
||||||
.try_take(limit.unwrap_or(3))
|
.try_take(limit.unwrap_or(3))
|
||||||
.try_collect()
|
.try_collect()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
@ -84,11 +84,25 @@ pub(crate) async fn get_context_route(
|
||||||
|
|
||||||
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
|
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
|
||||||
|
|
||||||
|
// PDUs are used to get seen user IDs and then returned in response.
|
||||||
|
|
||||||
let events_before = services
|
let events_before = services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.pdus_rev(Some(sender_user), room_id, Some(base_count))
|
.pdus_rev(room_id, Some(base_count))
|
||||||
.ignore_err()
|
.ignore_err()
|
||||||
|
.then(async |mut pdu| {
|
||||||
|
pdu.1.set_unsigned(Some(sender_user));
|
||||||
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||||
|
}
|
||||||
|
pdu
|
||||||
|
})
|
||||||
.ready_filter_map(|item| event_filter(item, filter))
|
.ready_filter_map(|item| event_filter(item, filter))
|
||||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||||
|
@ -98,8 +112,20 @@ pub(crate) async fn get_context_route(
|
||||||
let events_after = services
|
let events_after = services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.pdus(Some(sender_user), room_id, Some(base_count))
|
.pdus(room_id, Some(base_count))
|
||||||
.ignore_err()
|
.ignore_err()
|
||||||
|
.then(async |mut pdu| {
|
||||||
|
pdu.1.set_unsigned(Some(sender_user));
|
||||||
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||||
|
}
|
||||||
|
pdu
|
||||||
|
})
|
||||||
.ready_filter_map(|item| event_filter(item, filter))
|
.ready_filter_map(|item| event_filter(item, filter))
|
||||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||||
|
|
|
@ -2,7 +2,7 @@ use core::panic;
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, at,
|
Err, Result, at, debug_warn,
|
||||||
matrix::{
|
matrix::{
|
||||||
Event,
|
Event,
|
||||||
pdu::{PduCount, PduEvent},
|
pdu::{PduCount, PduEvent},
|
||||||
|
@ -114,14 +114,14 @@ pub(crate) async fn get_message_events_route(
|
||||||
| Direction::Forward => services
|
| Direction::Forward => services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.pdus(Some(sender_user), room_id, Some(from))
|
.pdus(room_id, Some(from))
|
||||||
.ignore_err()
|
.ignore_err()
|
||||||
.boxed(),
|
.boxed(),
|
||||||
|
|
||||||
| Direction::Backward => services
|
| Direction::Backward => services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.pdus_rev(Some(sender_user), room_id, Some(from))
|
.pdus_rev(room_id, Some(from))
|
||||||
.ignore_err()
|
.ignore_err()
|
||||||
.boxed(),
|
.boxed(),
|
||||||
};
|
};
|
||||||
|
@ -132,6 +132,18 @@ pub(crate) async fn get_message_events_route(
|
||||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||||
.take(limit)
|
.take(limit)
|
||||||
|
.then(async |mut pdu| {
|
||||||
|
pdu.1.set_unsigned(Some(sender_user));
|
||||||
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||||
|
}
|
||||||
|
pdu
|
||||||
|
})
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Result, at,
|
Result, at, debug_warn,
|
||||||
matrix::pdu::PduCount,
|
matrix::pdu::PduCount,
|
||||||
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
|
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
|
||||||
};
|
};
|
||||||
|
@ -149,6 +149,17 @@ async fn paginate_relations_with_filter(
|
||||||
.ready_take_while(|(count, _)| Some(*count) != to)
|
.ready_take_while(|(count, _)| Some(*count) != to)
|
||||||
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
|
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
|
||||||
.take(limit)
|
.take(limit)
|
||||||
|
.then(async |mut pdu| {
|
||||||
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations to relation: {e}");
|
||||||
|
}
|
||||||
|
pdu
|
||||||
|
})
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
@ -172,6 +183,10 @@ async fn paginate_relations_with_filter(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Can we move the visibility filter lower down, to avoid checking events
|
||||||
|
// that won't be sent? At the moment this also results in getting events that
|
||||||
|
// appear to have no relation because intermediaries are not visible to the
|
||||||
|
// user.
|
||||||
async fn visibility_filter(
|
async fn visibility_filter(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
sender_user: &UserId,
|
sender_user: &UserId,
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{Err, Event, Result, err};
|
use conduwuit::{Err, Event, Result, debug_warn, err};
|
||||||
use futures::{FutureExt, TryFutureExt, future::try_join};
|
use futures::{FutureExt, TryFutureExt, future::try_join};
|
||||||
use ruma::api::client::room::get_room_event;
|
use ruma::api::client::room::get_room_event;
|
||||||
|
|
||||||
|
@ -38,7 +38,16 @@ pub(crate) async fn get_room_event_route(
|
||||||
"Fetched PDU must match requested"
|
"Fetched PDU must match requested"
|
||||||
);
|
);
|
||||||
|
|
||||||
event.add_age().ok();
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut event)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations to event: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
|
event.set_unsigned(body.sender_user.as_deref());
|
||||||
|
|
||||||
Ok(get_room_event::v3::Response { event: event.into_room_event() })
|
Ok(get_room_event::v3::Response { event: event.into_room_event() })
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, PduEvent, Result, at,
|
Err, PduEvent, Result, at, debug_warn,
|
||||||
utils::{BoolExt, stream::TryTools},
|
utils::{BoolExt, stream::TryTools},
|
||||||
};
|
};
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
|
@ -25,12 +25,28 @@ pub(crate) async fn room_initial_sync_route(
|
||||||
return Err!(Request(Forbidden("No room preview available.")));
|
return Err!(Request(Forbidden("No room preview available.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Events are returned in body
|
||||||
|
|
||||||
let limit = LIMIT_MAX;
|
let limit = LIMIT_MAX;
|
||||||
let events: Vec<_> = services
|
let events: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.pdus_rev(None, room_id, None)
|
.pdus_rev(room_id, None)
|
||||||
.try_take(limit)
|
.try_take(limit)
|
||||||
|
.and_then(async |mut pdu| {
|
||||||
|
pdu.1.set_unsigned(body.sender_user.as_deref());
|
||||||
|
if let Some(sender_user) = body.sender_user.as_deref() {
|
||||||
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(pdu)
|
||||||
|
})
|
||||||
.try_collect()
|
.try_collect()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ use std::collections::BTreeMap;
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, at, is_true,
|
Err, Result, at, debug_warn, is_true,
|
||||||
matrix::pdu::PduEvent,
|
matrix::pdu::PduEvent,
|
||||||
result::FlatOk,
|
result::FlatOk,
|
||||||
utils::{IterStream, stream::ReadyExt},
|
utils::{IterStream, stream::ReadyExt},
|
||||||
|
@ -144,6 +144,17 @@ async fn category_room_events(
|
||||||
.map(at!(2))
|
.map(at!(2))
|
||||||
.flatten()
|
.flatten()
|
||||||
.stream()
|
.stream()
|
||||||
|
.then(|mut pdu| async {
|
||||||
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations to search result: {e}");
|
||||||
|
}
|
||||||
|
pdu
|
||||||
|
})
|
||||||
.map(PduEvent::into_room_event)
|
.map(PduEvent::into_room_event)
|
||||||
.map(|result| SearchResult {
|
.map(|result| SearchResult {
|
||||||
rank: None,
|
rank: None,
|
||||||
|
|
|
@ -3,7 +3,7 @@ mod v4;
|
||||||
mod v5;
|
mod v5;
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Error, PduCount, Result,
|
Error, PduCount, Result, debug_warn,
|
||||||
matrix::pdu::PduEvent,
|
matrix::pdu::PduEvent,
|
||||||
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
|
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
|
||||||
};
|
};
|
||||||
|
@ -31,11 +31,7 @@ async fn load_timeline(
|
||||||
next_batch: Option<PduCount>,
|
next_batch: Option<PduCount>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
|
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
|
||||||
let last_timeline_count = services
|
let last_timeline_count = services.rooms.timeline.last_timeline_count(room_id).await?;
|
||||||
.rooms
|
|
||||||
.timeline
|
|
||||||
.last_timeline_count(Some(sender_user), room_id)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if last_timeline_count <= roomsincecount {
|
if last_timeline_count <= roomsincecount {
|
||||||
return Ok((Vec::new(), false));
|
return Ok((Vec::new(), false));
|
||||||
|
@ -44,10 +40,25 @@ async fn load_timeline(
|
||||||
let non_timeline_pdus = services
|
let non_timeline_pdus = services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.pdus_rev(Some(sender_user), room_id, None)
|
.pdus_rev(room_id, None)
|
||||||
.ignore_err()
|
.ignore_err()
|
||||||
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
|
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
|
||||||
.ready_take_while(|&(pducount, _)| pducount > roomsincecount);
|
.ready_take_while(|&(pducount, _)| pducount > roomsincecount)
|
||||||
|
.map(move |mut pdu| {
|
||||||
|
pdu.1.set_unsigned(Some(sender_user));
|
||||||
|
pdu
|
||||||
|
})
|
||||||
|
.then(async move |mut pdu| {
|
||||||
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(sender_user, &mut pdu.1)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations: {e}");
|
||||||
|
}
|
||||||
|
pdu
|
||||||
|
});
|
||||||
|
|
||||||
// Take the last events for the timeline
|
// Take the last events for the timeline
|
||||||
pin_mut!(non_timeline_pdus);
|
pin_mut!(non_timeline_pdus);
|
||||||
|
|
|
@ -1191,7 +1191,7 @@ async fn calculate_heroes(
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.all_pdus(sender_user, room_id)
|
.all_pdus(room_id)
|
||||||
.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
|
.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
|
||||||
.fold_default(|heroes: Vec<_>, (_, pdu)| {
|
.fold_default(|heroes: Vec<_>, (_, pdu)| {
|
||||||
fold_hero(heroes, services, room_id, sender_user, pdu)
|
fold_hero(heroes, services, room_id, sender_user, pdu)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Result, at,
|
Result, at, debug_warn,
|
||||||
matrix::pdu::{PduCount, PduEvent},
|
matrix::pdu::{PduCount, PduEvent},
|
||||||
};
|
};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
@ -28,6 +28,8 @@ pub(crate) async fn get_threads_route(
|
||||||
.transpose()?
|
.transpose()?
|
||||||
.unwrap_or_else(PduCount::max);
|
.unwrap_or_else(PduCount::max);
|
||||||
|
|
||||||
|
// TODO: user_can_see_event and set_unsigned should be at the same level /
|
||||||
|
// function, so unsigned is only set for seen events.
|
||||||
let threads: Vec<(PduCount, PduEvent)> = services
|
let threads: Vec<(PduCount, PduEvent)> = services
|
||||||
.rooms
|
.rooms
|
||||||
.threads
|
.threads
|
||||||
|
@ -42,6 +44,17 @@ pub(crate) async fn get_threads_route(
|
||||||
.await
|
.await
|
||||||
.then_some((count, pdu))
|
.then_some((count, pdu))
|
||||||
})
|
})
|
||||||
|
.then(|(count, mut pdu)| async move {
|
||||||
|
if let Err(e) = services
|
||||||
|
.rooms
|
||||||
|
.pdu_metadata
|
||||||
|
.add_bundled_aggregations_to_pdu(body.sender_user(), &mut pdu)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
debug_warn!("Failed to add bundled aggregations to thread: {e}");
|
||||||
|
}
|
||||||
|
(count, pdu)
|
||||||
|
})
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ use std::cmp;
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
PduCount, Result,
|
PduCount, Result,
|
||||||
|
result::LogErr,
|
||||||
utils::{IterStream, ReadyExt, stream::TryTools},
|
utils::{IterStream, ReadyExt, stream::TryTools},
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||||
|
@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route(
|
||||||
pdus: services
|
pdus: services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
|
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
|
||||||
.try_take(limit)
|
.try_take(limit)
|
||||||
.try_filter_map(|(_, pdu)| async move {
|
.try_filter_map(|(_, pdu)| async move {
|
||||||
Ok(services
|
Ok(services
|
||||||
|
@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route(
|
||||||
.await
|
.await
|
||||||
.then_some(pdu))
|
.then_some(pdu))
|
||||||
})
|
})
|
||||||
|
.and_then(async |mut pdu| {
|
||||||
|
// Strip the transaction ID, as that is private
|
||||||
|
pdu.remove_transaction_id().log_err().ok();
|
||||||
|
// Add age, as this is specified
|
||||||
|
pdu.add_age().log_err().ok();
|
||||||
|
// It's not clear if we should strip or add any more data, leave as is.
|
||||||
|
// In particular: Redaction?
|
||||||
|
Ok(pdu)
|
||||||
|
})
|
||||||
.try_filter_map(|pdu| async move {
|
.try_filter_map(|pdu| async move {
|
||||||
Ok(services
|
Ok(services
|
||||||
.rooms
|
.rooms
|
||||||
|
|
|
@ -219,6 +219,15 @@ pub fn check(config: &Config) -> Result {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if support contact information is configured
|
||||||
|
if config.well_known.support_email.is_none() && config.well_known.support_mxid.is_none() {
|
||||||
|
warn!(
|
||||||
|
"No support contact information (support_email or support_mxid) is configured in \
|
||||||
|
the well_known section. Users in the admin room will be automatically listed as \
|
||||||
|
support contacts in the /.well-known/matrix/support endpoint."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if config
|
if config
|
||||||
.url_preview_domain_contains_allowlist
|
.url_preview_domain_contains_allowlist
|
||||||
.contains(&"*".to_owned())
|
.contains(&"*".to_owned())
|
||||||
|
|
|
@ -1,11 +1,24 @@
|
||||||
use std::collections::BTreeMap;
|
use std::{borrow::Borrow, collections::BTreeMap};
|
||||||
|
|
||||||
use ruma::MilliSecondsSinceUnixEpoch;
|
use ruma::MilliSecondsSinceUnixEpoch;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
|
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
|
||||||
|
|
||||||
use super::Pdu;
|
use super::Pdu;
|
||||||
use crate::{Result, err, implement, is_true};
|
use crate::{Result, err, implement, is_true, result::LogErr};
|
||||||
|
|
||||||
|
/// Set the `unsigned` field of the PDU using only information in the PDU.
|
||||||
|
/// Some unsigned data is already set within the database (eg. prev events,
|
||||||
|
/// threads). Once this is done, other data must be calculated from the database
|
||||||
|
/// (eg. relations) This is for server-to-client events.
|
||||||
|
/// Backfill handles this itself.
|
||||||
|
#[implement(Pdu)]
|
||||||
|
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
|
||||||
|
if Some(self.sender.borrow()) != user_id {
|
||||||
|
self.remove_transaction_id().log_err().ok();
|
||||||
|
}
|
||||||
|
self.add_age().log_err().ok();
|
||||||
|
}
|
||||||
|
|
||||||
#[implement(Pdu)]
|
#[implement(Pdu)]
|
||||||
pub fn remove_transaction_id(&mut self) -> Result {
|
pub fn remove_transaction_id(&mut self) -> Result {
|
||||||
|
|
765
src/service/rooms/pdu_metadata/bundled_aggregations.rs
Normal file
765
src/service/rooms/pdu_metadata/bundled_aggregations.rs
Normal file
|
@ -0,0 +1,765 @@
|
||||||
|
use conduwuit::{Event, PduEvent, Result, err};
|
||||||
|
use ruma::{
|
||||||
|
EventId, RoomId, UserId,
|
||||||
|
api::Direction,
|
||||||
|
events::relation::{BundledMessageLikeRelations, BundledReference, ReferenceChunk},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::PdusIterItem;
|
||||||
|
|
||||||
|
const MAX_BUNDLED_RELATIONS: usize = 50;
|
||||||
|
|
||||||
|
impl super::Service {
|
||||||
|
/// Gets bundled aggregations for an event according to the Matrix
|
||||||
|
/// specification.
|
||||||
|
/// - m.replace relations are bundled to include the most recent replacement
|
||||||
|
/// event.
|
||||||
|
/// - m.reference relations are bundled to include a chunk of event IDs.
|
||||||
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
|
pub async fn get_bundled_aggregations(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
event_id: &EventId,
|
||||||
|
) -> Result<Option<BundledMessageLikeRelations<Box<serde_json::value::RawValue>>>> {
|
||||||
|
let relations = self
|
||||||
|
.get_relations(
|
||||||
|
user_id,
|
||||||
|
room_id,
|
||||||
|
event_id,
|
||||||
|
conduwuit::PduCount::max(),
|
||||||
|
MAX_BUNDLED_RELATIONS,
|
||||||
|
0,
|
||||||
|
Direction::Backward,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
// The relations database code still handles the basic unsigned data
|
||||||
|
// We don't want to recursively fetch relations
|
||||||
|
|
||||||
|
// TODO: Event visibility check
|
||||||
|
// TODO: ignored users?
|
||||||
|
|
||||||
|
if relations.is_empty() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the original event for validation of replacement events
|
||||||
|
let original_event = self.services.timeline.get_pdu(event_id).await?;
|
||||||
|
|
||||||
|
let mut replace_events = Vec::with_capacity(relations.len());
|
||||||
|
let mut reference_events = Vec::with_capacity(relations.len());
|
||||||
|
|
||||||
|
for relation in &relations {
|
||||||
|
let pdu = &relation.1;
|
||||||
|
|
||||||
|
let content = pdu.get_content_as_value();
|
||||||
|
if let Some(relates_to) = content.get("m.relates_to") {
|
||||||
|
// We don't check that the event relates back, because we assume the database is
|
||||||
|
// good.
|
||||||
|
if let Some(rel_type) = relates_to.get("rel_type") {
|
||||||
|
match rel_type.as_str() {
|
||||||
|
| Some("m.replace") => {
|
||||||
|
// Only consider valid replacements
|
||||||
|
if Self::is_valid_replacement_event(&original_event, pdu).await? {
|
||||||
|
replace_events.push(relation);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
| Some("m.reference") => {
|
||||||
|
reference_events.push(relation);
|
||||||
|
},
|
||||||
|
| _ => {
|
||||||
|
// Ignore other relation types for now
|
||||||
|
// Threads are in the database but not handled here
|
||||||
|
// Other types are not specified AFAICT.
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no relations to bundle, return None
|
||||||
|
if replace_events.is_empty() && reference_events.is_empty() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut bundled = BundledMessageLikeRelations::new();
|
||||||
|
|
||||||
|
// Handle m.replace relations - find the most recent one
|
||||||
|
if !replace_events.is_empty() {
|
||||||
|
let most_recent_replacement = Self::find_most_recent_replacement(&replace_events)?;
|
||||||
|
|
||||||
|
// Convert the replacement event to the bundled format
|
||||||
|
if let Some(replacement_pdu) = most_recent_replacement {
|
||||||
|
// According to the Matrix spec, we should include the full event as raw JSON
|
||||||
|
let replacement_json = serde_json::to_string(replacement_pdu)
|
||||||
|
.map_err(|e| err!(Database("Failed to serialize replacement event: {e}")))?;
|
||||||
|
let raw_value = serde_json::value::RawValue::from_string(replacement_json)
|
||||||
|
.map_err(|e| err!(Database("Failed to create RawValue: {e}")))?;
|
||||||
|
bundled.replace = Some(Box::new(raw_value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle m.reference relations - collect event IDs
|
||||||
|
if !reference_events.is_empty() {
|
||||||
|
let reference_chunk = Self::build_reference_chunk(&reference_events)?;
|
||||||
|
if !reference_chunk.is_empty() {
|
||||||
|
bundled.reference = Some(Box::new(ReferenceChunk::new(reference_chunk)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Handle other relation types (m.annotation, etc.) when specified
|
||||||
|
|
||||||
|
Ok(Some(bundled))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build reference chunk for m.reference bundled aggregations
|
||||||
|
fn build_reference_chunk(
|
||||||
|
reference_events: &[&PdusIterItem],
|
||||||
|
) -> Result<Vec<BundledReference>> {
|
||||||
|
let mut chunk = Vec::with_capacity(reference_events.len());
|
||||||
|
|
||||||
|
for relation in reference_events {
|
||||||
|
let pdu = &relation.1;
|
||||||
|
|
||||||
|
let reference_entry = BundledReference::new(pdu.event_id().to_owned());
|
||||||
|
chunk.push(reference_entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't sort, order is unspecified
|
||||||
|
|
||||||
|
Ok(chunk)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find the most recent replacement event based on origin_server_ts and
|
||||||
|
/// lexicographic event_id ordering
|
||||||
|
fn find_most_recent_replacement<'a>(
|
||||||
|
replacement_events: &'a [&'a PdusIterItem],
|
||||||
|
) -> Result<Option<&'a PduEvent>> {
|
||||||
|
if replacement_events.is_empty() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut most_recent: Option<&PduEvent> = None;
|
||||||
|
|
||||||
|
// Jank, is there a better way to do this?
|
||||||
|
for relation in replacement_events {
|
||||||
|
let pdu = &relation.1;
|
||||||
|
|
||||||
|
match most_recent {
|
||||||
|
| None => {
|
||||||
|
most_recent = Some(pdu);
|
||||||
|
},
|
||||||
|
| Some(current_most_recent) => {
|
||||||
|
// Compare by origin_server_ts first
|
||||||
|
match pdu
|
||||||
|
.origin_server_ts()
|
||||||
|
.cmp(¤t_most_recent.origin_server_ts())
|
||||||
|
{
|
||||||
|
| std::cmp::Ordering::Greater => {
|
||||||
|
most_recent = Some(pdu);
|
||||||
|
},
|
||||||
|
| std::cmp::Ordering::Equal => {
|
||||||
|
// If timestamps are equal, use lexicographic ordering of event_id
|
||||||
|
if pdu.event_id() > current_most_recent.event_id() {
|
||||||
|
most_recent = Some(pdu);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
| std::cmp::Ordering::Less => {
|
||||||
|
// Keep current most recent
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(most_recent)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds bundled aggregations to a PDU's unsigned field
|
||||||
|
#[tracing::instrument(skip(self, pdu), level = "debug")]
|
||||||
|
pub async fn add_bundled_aggregations_to_pdu(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
pdu: &mut PduEvent,
|
||||||
|
) -> Result<()> {
|
||||||
|
if pdu.is_redacted() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let bundled_aggregations = self
|
||||||
|
.get_bundled_aggregations(user_id, pdu.room_id(), pdu.event_id())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if let Some(aggregations) = bundled_aggregations {
|
||||||
|
let aggregations_json = serde_json::to_value(aggregations)
|
||||||
|
.map_err(|e| err!(Database("Failed to serialize bundled aggregations: {e}")))?;
|
||||||
|
|
||||||
|
Self::add_bundled_aggregations_to_unsigned(pdu, aggregations_json)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper method to add bundled aggregations to a PDU's unsigned
|
||||||
|
/// field
|
||||||
|
fn add_bundled_aggregations_to_unsigned(
|
||||||
|
pdu: &mut PduEvent,
|
||||||
|
aggregations_json: serde_json::Value,
|
||||||
|
) -> Result<()> {
|
||||||
|
use serde_json::{
|
||||||
|
Map, Value as JsonValue,
|
||||||
|
value::{RawValue as RawJsonValue, to_raw_value},
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut unsigned: Map<String, JsonValue> = pdu
|
||||||
|
.unsigned
|
||||||
|
.as_deref()
|
||||||
|
.map(RawJsonValue::get)
|
||||||
|
.map_or_else(|| Ok(Map::new()), serde_json::from_str)
|
||||||
|
.map_err(|e| err!(Database("Invalid unsigned in pdu event: {e}")))?;
|
||||||
|
|
||||||
|
let relations = unsigned
|
||||||
|
.entry("m.relations")
|
||||||
|
.or_insert_with(|| JsonValue::Object(Map::new()))
|
||||||
|
.as_object_mut()
|
||||||
|
.ok_or_else(|| err!(Database("m.relations is not an object")))?;
|
||||||
|
|
||||||
|
if let JsonValue::Object(aggregations_map) = aggregations_json {
|
||||||
|
for (rel_type, aggregation) in aggregations_map {
|
||||||
|
relations.insert(rel_type, aggregation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pdu.unsigned = Some(to_raw_value(&unsigned)?);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Validates that an event is acceptable as a replacement for another event
|
||||||
|
/// See C/S spec "Validity of replacement events"
|
||||||
|
#[tracing::instrument(level = "debug")]
|
||||||
|
async fn is_valid_replacement_event(
|
||||||
|
original_event: &PduEvent,
|
||||||
|
replacement_event: &PduEvent,
|
||||||
|
) -> Result<bool> {
|
||||||
|
// 1. Same room_id
|
||||||
|
if original_event.room_id() != replacement_event.room_id() {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Same sender
|
||||||
|
if original_event.sender() != replacement_event.sender() {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Same type
|
||||||
|
if original_event.event_type() != replacement_event.event_type() {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Neither event should have a state_key property
|
||||||
|
if original_event.state_key().is_some() || replacement_event.state_key().is_some() {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Original event must not have rel_type of m.replace
|
||||||
|
let original_content = original_event.get_content_as_value();
|
||||||
|
if let Some(relates_to) = original_content.get("m.relates_to") {
|
||||||
|
if let Some(rel_type) = relates_to.get("rel_type") {
|
||||||
|
if rel_type.as_str() == Some("m.replace") {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6. Replacement event must have m.new_content property
|
||||||
|
// Skip this check for encrypted events, as m.new_content would be inside the
|
||||||
|
// encrypted payload
|
||||||
|
if replacement_event.event_type() != &ruma::events::TimelineEventType::RoomEncrypted {
|
||||||
|
let replacement_content = replacement_event.get_content_as_value();
|
||||||
|
if replacement_content.get("m.new_content").is_none() {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use conduwuit_core::pdu::{EventHash, PduEvent};
|
||||||
|
use ruma::{UInt, events::TimelineEventType, owned_event_id, owned_room_id, owned_user_id};
|
||||||
|
use serde_json::{Value as JsonValue, json, value::to_raw_value};
|
||||||
|
|
||||||
|
fn create_test_pdu(unsigned_content: Option<JsonValue>) -> PduEvent {
|
||||||
|
PduEvent {
|
||||||
|
event_id: owned_event_id!("$test:example.com"),
|
||||||
|
room_id: owned_room_id!("!test:example.com"),
|
||||||
|
sender: owned_user_id!("@test:example.com"),
|
||||||
|
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
|
||||||
|
kind: TimelineEventType::RoomMessage,
|
||||||
|
content: to_raw_value(&json!({"msgtype": "m.text", "body": "test"})).unwrap(),
|
||||||
|
state_key: None,
|
||||||
|
prev_events: vec![],
|
||||||
|
depth: UInt::from(1_u32),
|
||||||
|
auth_events: vec![],
|
||||||
|
redacts: None,
|
||||||
|
unsigned: unsigned_content.map(|content| to_raw_value(&content).unwrap()),
|
||||||
|
hashes: EventHash { sha256: "test_hash".to_owned() },
|
||||||
|
signatures: None,
|
||||||
|
origin: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_bundled_aggregations() -> JsonValue {
|
||||||
|
json!({
|
||||||
|
"m.replace": {
|
||||||
|
"event_id": "$replace:example.com",
|
||||||
|
"origin_server_ts": 1_234_567_890,
|
||||||
|
"sender": "@replacer:example.com"
|
||||||
|
},
|
||||||
|
"m.reference": {
|
||||||
|
"count": 5,
|
||||||
|
"chunk": [
|
||||||
|
"$ref1:example.com",
|
||||||
|
"$ref2:example.com"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_bundled_aggregations_to_unsigned_no_existing_unsigned() {
|
||||||
|
let mut pdu = create_test_pdu(None);
|
||||||
|
let aggregations = create_bundled_aggregations();
|
||||||
|
|
||||||
|
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
|
||||||
|
&mut pdu,
|
||||||
|
aggregations.clone(),
|
||||||
|
);
|
||||||
|
assert!(result.is_ok(), "Should succeed when no unsigned field exists");
|
||||||
|
|
||||||
|
assert!(pdu.unsigned.is_some(), "Unsigned field should be created");
|
||||||
|
|
||||||
|
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
|
||||||
|
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
|
||||||
|
|
||||||
|
assert!(unsigned.get("m.relations").is_some(), "m.relations should exist");
|
||||||
|
assert_eq!(
|
||||||
|
unsigned["m.relations"], aggregations,
|
||||||
|
"Relations should match the aggregations"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_bundled_aggregations_to_unsigned_overwrite_same_relation_type() {
|
||||||
|
let existing_unsigned = json!({
|
||||||
|
"m.relations": {
|
||||||
|
"m.replace": {
|
||||||
|
"event_id": "$old_replace:example.com",
|
||||||
|
"origin_server_ts": 1_111_111_111,
|
||||||
|
"sender": "@old_replacer:example.com"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut pdu = create_test_pdu(Some(existing_unsigned));
|
||||||
|
let new_aggregations = create_bundled_aggregations();
|
||||||
|
|
||||||
|
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
|
||||||
|
&mut pdu,
|
||||||
|
new_aggregations.clone(),
|
||||||
|
);
|
||||||
|
assert!(result.is_ok(), "Should succeed when overwriting same relation type");
|
||||||
|
|
||||||
|
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
|
||||||
|
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
|
||||||
|
|
||||||
|
let relations = &unsigned["m.relations"];
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
relations["m.replace"], new_aggregations["m.replace"],
|
||||||
|
"m.replace should be updated"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
relations["m.replace"]["event_id"], "$replace:example.com",
|
||||||
|
"Should have new event_id"
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(relations.get("m.reference").is_some(), "New m.reference should be added");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_bundled_aggregations_to_unsigned_preserve_other_unsigned_fields() {
|
||||||
|
// Test case: Other unsigned fields should be preserved
|
||||||
|
let existing_unsigned = json!({
|
||||||
|
"age": 98765,
|
||||||
|
"prev_content": {"msgtype": "m.text", "body": "old message"},
|
||||||
|
"redacted_because": {"event_id": "$redaction:example.com"},
|
||||||
|
"m.relations": {
|
||||||
|
"m.annotation": {"count": 1}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut pdu = create_test_pdu(Some(existing_unsigned));
|
||||||
|
let new_aggregations = json!({
|
||||||
|
"m.replace": {"event_id": "$new:example.com"}
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = super::super::Service::add_bundled_aggregations_to_unsigned(
|
||||||
|
&mut pdu,
|
||||||
|
new_aggregations,
|
||||||
|
);
|
||||||
|
assert!(result.is_ok(), "Should succeed while preserving other fields");
|
||||||
|
|
||||||
|
let unsigned_str = pdu.unsigned.as_ref().unwrap().get();
|
||||||
|
let unsigned: JsonValue = serde_json::from_str(unsigned_str).unwrap();
|
||||||
|
|
||||||
|
// Verify all existing fields are preserved
|
||||||
|
assert_eq!(unsigned["age"], 98765, "age should be preserved");
|
||||||
|
assert!(unsigned.get("prev_content").is_some(), "prev_content should be preserved");
|
||||||
|
assert!(
|
||||||
|
unsigned.get("redacted_because").is_some(),
|
||||||
|
"redacted_because should be preserved"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Verify relations were merged correctly
|
||||||
|
let relations = &unsigned["m.relations"];
|
||||||
|
assert!(
|
||||||
|
relations.get("m.annotation").is_some(),
|
||||||
|
"Existing m.annotation should be preserved"
|
||||||
|
);
|
||||||
|
assert!(relations.get("m.replace").is_some(), "New m.replace should be added");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_bundled_aggregations_to_unsigned_invalid_existing_unsigned() {
|
||||||
|
// Test case: Invalid JSON in existing unsigned should result in error
|
||||||
|
let mut pdu = create_test_pdu(None);
|
||||||
|
// Manually set invalid unsigned data
|
||||||
|
pdu.unsigned = Some(to_raw_value(&"invalid json").unwrap());
|
||||||
|
|
||||||
|
let aggregations = create_bundled_aggregations();
|
||||||
|
let result =
|
||||||
|
super::super::Service::add_bundled_aggregations_to_unsigned(&mut pdu, aggregations);
|
||||||
|
|
||||||
|
assert!(result.is_err(), "fails when existing unsigned is invalid");
|
||||||
|
// Should we ignore the error and overwrite anyway?
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test helper function to create test PDU events
|
||||||
|
fn create_test_event(
|
||||||
|
event_id: &str,
|
||||||
|
room_id: &str,
|
||||||
|
sender: &str,
|
||||||
|
event_type: TimelineEventType,
|
||||||
|
content: &JsonValue,
|
||||||
|
state_key: Option<&str>,
|
||||||
|
) -> PduEvent {
|
||||||
|
PduEvent {
|
||||||
|
event_id: event_id.try_into().unwrap(),
|
||||||
|
room_id: room_id.try_into().unwrap(),
|
||||||
|
sender: sender.try_into().unwrap(),
|
||||||
|
origin_server_ts: UInt::try_from(1_234_567_890_u64).unwrap(),
|
||||||
|
kind: event_type,
|
||||||
|
content: to_raw_value(&content).unwrap(),
|
||||||
|
state_key: state_key.map(Into::into),
|
||||||
|
prev_events: vec![],
|
||||||
|
depth: UInt::from(1_u32),
|
||||||
|
auth_events: vec![],
|
||||||
|
redacts: None,
|
||||||
|
unsigned: None,
|
||||||
|
hashes: EventHash { sha256: "test_hash".to_owned() },
|
||||||
|
signatures: None,
|
||||||
|
origin: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that a valid replacement event passes validation
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_valid_replacement_event() {
|
||||||
|
let original = create_test_event(
|
||||||
|
"$original:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let replacement = create_test_event(
|
||||||
|
"$replacement:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "* edited message",
|
||||||
|
"m.new_content": {
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "edited message"
|
||||||
|
},
|
||||||
|
"m.relates_to": {
|
||||||
|
"rel_type": "m.replace",
|
||||||
|
"event_id": "$original:example.com"
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result =
|
||||||
|
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||||
|
assert!(result.is_ok(), "Validation should succeed");
|
||||||
|
assert!(result.unwrap(), "Valid replacement event should be accepted");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test replacement event with different room ID is rejected
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_replacement_event_different_room() {
|
||||||
|
let original = create_test_event(
|
||||||
|
"$original:example.com",
|
||||||
|
"!room1:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let replacement = create_test_event(
|
||||||
|
"$replacement:example.com",
|
||||||
|
"!room2:example.com", // Different room
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "* edited message",
|
||||||
|
"m.new_content": {
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "edited message"
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result =
|
||||||
|
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||||
|
assert!(result.is_ok(), "Validation should succeed");
|
||||||
|
assert!(!result.unwrap(), "Different room ID should be rejected");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test replacement event with different sender is rejected
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_replacement_event_different_sender() {
|
||||||
|
let original = create_test_event(
|
||||||
|
"$original:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user1:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let replacement = create_test_event(
|
||||||
|
"$replacement:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user2:example.com", // Different sender
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "* edited message",
|
||||||
|
"m.new_content": {
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "edited message"
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result =
|
||||||
|
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||||
|
assert!(result.is_ok(), "Validation should succeed");
|
||||||
|
assert!(!result.unwrap(), "Different sender should be rejected");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test replacement event with different type is rejected
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_replacement_event_different_type() {
|
||||||
|
let original = create_test_event(
|
||||||
|
"$original:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let replacement = create_test_event(
|
||||||
|
"$replacement:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomTopic, // Different event type
|
||||||
|
&json!({
|
||||||
|
"topic": "new topic",
|
||||||
|
"m.new_content": {
|
||||||
|
"topic": "new topic"
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result =
|
||||||
|
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||||
|
assert!(result.is_ok(), "Validation should succeed");
|
||||||
|
assert!(!result.unwrap(), "Different event type should be rejected");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test replacement event with state key is rejected
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_replacement_event_with_state_key() {
|
||||||
|
let original = create_test_event(
|
||||||
|
"$original:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomName,
|
||||||
|
&json!({"name": "room name"}),
|
||||||
|
Some(""), // Has state key
|
||||||
|
);
|
||||||
|
|
||||||
|
let replacement = create_test_event(
|
||||||
|
"$replacement:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomName,
|
||||||
|
&json!({
|
||||||
|
"name": "new room name",
|
||||||
|
"m.new_content": {
|
||||||
|
"name": "new room name"
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result =
|
||||||
|
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||||
|
assert!(result.is_ok(), "Validation should succeed");
|
||||||
|
assert!(!result.unwrap(), "Event with state key should be rejected");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test replacement of an event that is already a replacement is rejected
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_replacement_event_original_is_replacement() {
|
||||||
|
let original = create_test_event(
|
||||||
|
"$original:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "* edited message",
|
||||||
|
"m.relates_to": {
|
||||||
|
"rel_type": "m.replace", // Original is already a replacement
|
||||||
|
"event_id": "$some_other:example.com"
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let replacement = create_test_event(
|
||||||
|
"$replacement:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "* edited again",
|
||||||
|
"m.new_content": {
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "edited again"
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result =
|
||||||
|
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||||
|
assert!(result.is_ok(), "Validation should succeed");
|
||||||
|
assert!(!result.unwrap(), "Replacement of replacement should be rejected");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test replacement event missing m.new_content is rejected
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_replacement_event_missing_new_content() {
|
||||||
|
let original = create_test_event(
|
||||||
|
"$original:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({"msgtype": "m.text", "body": "original message"}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let replacement = create_test_event(
|
||||||
|
"$replacement:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomMessage,
|
||||||
|
&json!({
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": "* edited message"
|
||||||
|
// Missing m.new_content
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result =
|
||||||
|
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||||
|
assert!(result.is_ok(), "Validation should succeed");
|
||||||
|
assert!(!result.unwrap(), "Missing m.new_content should be rejected");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test encrypted replacement event without m.new_content is accepted
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_replacement_event_encrypted_missing_new_content_is_valid() {
|
||||||
|
let original = create_test_event(
|
||||||
|
"$original:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomEncrypted,
|
||||||
|
&json!({
|
||||||
|
"algorithm": "m.megolm.v1.aes-sha2",
|
||||||
|
"ciphertext": "encrypted_payload_base64",
|
||||||
|
"sender_key": "sender_key",
|
||||||
|
"session_id": "session_id"
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let replacement = create_test_event(
|
||||||
|
"$replacement:example.com",
|
||||||
|
"!room:example.com",
|
||||||
|
"@user:example.com",
|
||||||
|
TimelineEventType::RoomEncrypted,
|
||||||
|
&json!({
|
||||||
|
"algorithm": "m.megolm.v1.aes-sha2",
|
||||||
|
"ciphertext": "encrypted_replacement_payload_base64",
|
||||||
|
"sender_key": "sender_key",
|
||||||
|
"session_id": "session_id",
|
||||||
|
"m.relates_to": {
|
||||||
|
"rel_type": "m.replace",
|
||||||
|
"event_id": "$original:example.com"
|
||||||
|
}
|
||||||
|
// No m.new_content in cleartext - this is valid for encrypted events
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result =
|
||||||
|
super::super::Service::is_valid_replacement_event(&original, &replacement).await;
|
||||||
|
assert!(result.is_ok(), "Validation should succeed");
|
||||||
|
assert!(
|
||||||
|
result.unwrap(),
|
||||||
|
"Encrypted replacement without cleartext m.new_content should be accepted"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc};
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
PduCount, PduEvent,
|
PduCount, PduEvent,
|
||||||
arrayvec::ArrayVec,
|
arrayvec::ArrayVec,
|
||||||
result::LogErr,
|
|
||||||
utils::{
|
utils::{
|
||||||
ReadyExt,
|
ReadyExt,
|
||||||
stream::{TryIgnore, WidebandExt},
|
stream::{TryIgnore, WidebandExt},
|
||||||
|
@ -80,9 +79,7 @@ impl Data {
|
||||||
|
|
||||||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||||
|
|
||||||
if pdu.sender != user_id {
|
pdu.set_unsigned(Some(user_id));
|
||||||
pdu.remove_transaction_id().log_err().ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
Some((shorteventid, pdu))
|
Some((shorteventid, pdu))
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
mod bundled_aggregations;
|
||||||
mod data;
|
mod data;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
|
|
@ -127,7 +127,12 @@ pub async fn search_pdus<'a>(
|
||||||
.then_some(pdu)
|
.then_some(pdu)
|
||||||
})
|
})
|
||||||
.skip(query.skip)
|
.skip(query.skip)
|
||||||
.take(query.limit);
|
.take(query.limit)
|
||||||
|
.map(move |mut pdu| {
|
||||||
|
pdu.set_unsigned(query.user_id);
|
||||||
|
// TODO: bundled aggregation
|
||||||
|
pdu
|
||||||
|
});
|
||||||
|
|
||||||
Ok((count, pdus))
|
Ok((count, pdus))
|
||||||
}
|
}
|
||||||
|
|
|
@ -160,9 +160,7 @@ impl Service {
|
||||||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||||
let pdu_id: PduId = pdu_id.into();
|
let pdu_id: PduId = pdu_id.into();
|
||||||
|
|
||||||
if pdu.sender != user_id {
|
pdu.set_unsigned(Some(user_id));
|
||||||
pdu.remove_transaction_id().ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
Some((pdu_id.shorteventid, pdu))
|
Some((pdu_id.shorteventid, pdu))
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,14 +1,11 @@
|
||||||
use std::{borrow::Borrow, sync::Arc};
|
use std::sync::Arc;
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, PduCount, PduEvent, Result, at, err,
|
Err, PduCount, PduEvent, Result, at, err, result::NotFound, utils, utils::stream::TryReadyExt,
|
||||||
result::{LogErr, NotFound},
|
|
||||||
utils,
|
|
||||||
utils::stream::TryReadyExt,
|
|
||||||
};
|
};
|
||||||
use database::{Database, Deserialized, Json, KeyVal, Map};
|
use database::{Database, Deserialized, Json, KeyVal, Map};
|
||||||
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
|
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
|
||||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
|
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
|
||||||
|
|
||||||
use super::{PduId, RawPduId};
|
use super::{PduId, RawPduId};
|
||||||
use crate::{Dep, rooms, rooms::short::ShortRoomId};
|
use crate::{Dep, rooms, rooms::short::ShortRoomId};
|
||||||
|
@ -46,12 +43,8 @@ impl Data {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub(super) async fn last_timeline_count(
|
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||||
&self,
|
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||||
sender_user: Option<&UserId>,
|
|
||||||
room_id: &RoomId,
|
|
||||||
) -> Result<PduCount> {
|
|
||||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
|
||||||
|
|
||||||
pin_mut!(pdus_rev);
|
pin_mut!(pdus_rev);
|
||||||
let last_count = pdus_rev
|
let last_count = pdus_rev
|
||||||
|
@ -65,12 +58,8 @@ impl Data {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub(super) async fn latest_pdu_in_room(
|
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
||||||
&self,
|
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||||
sender_user: Option<&UserId>,
|
|
||||||
room_id: &RoomId,
|
|
||||||
) -> Result<PduEvent> {
|
|
||||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
|
||||||
|
|
||||||
pin_mut!(pdus_rev);
|
pin_mut!(pdus_rev);
|
||||||
pdus_rev
|
pdus_rev
|
||||||
|
@ -223,7 +212,6 @@ impl Data {
|
||||||
/// order.
|
/// order.
|
||||||
pub(super) fn pdus_rev<'a>(
|
pub(super) fn pdus_rev<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
user_id: Option<&'a UserId>,
|
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
until: PduCount,
|
until: PduCount,
|
||||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||||
|
@ -233,14 +221,13 @@ impl Data {
|
||||||
self.pduid_pdu
|
self.pduid_pdu
|
||||||
.rev_raw_stream_from(¤t)
|
.rev_raw_stream_from(¤t)
|
||||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
.ready_and_then(Self::from_json_slice)
|
||||||
})
|
})
|
||||||
.try_flatten_stream()
|
.try_flatten_stream()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn pdus<'a>(
|
pub(super) fn pdus<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
user_id: Option<&'a UserId>,
|
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
from: PduCount,
|
from: PduCount,
|
||||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||||
|
@ -250,21 +237,15 @@ impl Data {
|
||||||
self.pduid_pdu
|
self.pduid_pdu
|
||||||
.raw_stream_from(¤t)
|
.raw_stream_from(¤t)
|
||||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
.ready_and_then(Self::from_json_slice)
|
||||||
})
|
})
|
||||||
.try_flatten_stream()
|
.try_flatten_stream()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
|
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
|
||||||
let pdu_id: RawPduId = pdu_id.into();
|
let pdu_id: RawPduId = pdu_id.into();
|
||||||
|
|
||||||
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||||
|
|
||||||
if Some(pdu.sender.borrow()) != user_id {
|
|
||||||
pdu.remove_transaction_id().log_err().ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
pdu.add_age().log_err().ok();
|
|
||||||
|
|
||||||
Ok((pdu_id.pdu_count(), pdu))
|
Ok((pdu_id.pdu_count(), pdu))
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,7 +165,7 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
|
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
|
||||||
let pdus = self.pdus(None, room_id, None);
|
let pdus = self.pdus(room_id, None);
|
||||||
|
|
||||||
pin_mut!(pdus);
|
pin_mut!(pdus);
|
||||||
pdus.try_next()
|
pdus.try_next()
|
||||||
|
@ -175,16 +175,12 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
||||||
self.db.latest_pdu_in_room(None, room_id).await
|
self.db.latest_pdu_in_room(room_id).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub async fn last_timeline_count(
|
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||||
&self,
|
self.db.last_timeline_count(room_id).await
|
||||||
sender_user: Option<&UserId>,
|
|
||||||
room_id: &RoomId,
|
|
||||||
) -> Result<PduCount> {
|
|
||||||
self.db.last_timeline_count(sender_user, room_id).await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the `count` of this pdu's id.
|
/// Returns the `count` of this pdu's id.
|
||||||
|
@ -545,6 +541,10 @@ impl Service {
|
||||||
| _ => {},
|
| _ => {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CONCERN: If we receive events with a relation out-of-order, we never write
|
||||||
|
// their relation / thread. We need some kind of way to trigger when we receive
|
||||||
|
// this event, and potentially a way to rebuild the table entirely.
|
||||||
|
|
||||||
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
|
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
|
||||||
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
|
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
|
||||||
self.services
|
self.services
|
||||||
|
@ -996,34 +996,30 @@ impl Service {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn all_pdus<'a>(
|
pub fn all_pdus<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
user_id: &'a UserId,
|
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
|
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
|
||||||
self.pdus(Some(user_id), room_id, None).ignore_err()
|
self.pdus(room_id, None).ignore_err()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reverse iteration starting at from.
|
/// Reverse iteration starting at from.
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub fn pdus_rev<'a>(
|
pub fn pdus_rev<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
user_id: Option<&'a UserId>,
|
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
until: Option<PduCount>,
|
until: Option<PduCount>,
|
||||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||||
self.db
|
self.db
|
||||||
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
|
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Forward iteration starting at from.
|
/// Forward iteration starting at from.
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub fn pdus<'a>(
|
pub fn pdus<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
user_id: Option<&'a UserId>,
|
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
from: Option<PduCount>,
|
from: Option<PduCount>,
|
||||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||||
self.db
|
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
|
||||||
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Replace a PDU with the redacted form.
|
/// Replace a PDU with the redacted form.
|
||||||
|
|
|
@ -781,7 +781,7 @@ impl Service {
|
||||||
|
|
||||||
for pdu in pdus {
|
for pdu in pdus {
|
||||||
// Redacted events are not notification targets (we don't send push for them)
|
// Redacted events are not notification targets (we don't send push for them)
|
||||||
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
|
if pdu.is_redacted() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue