Post-formatting aesthetic and spacing corrections

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-04-27 02:39:28 +00:00 committed by Jade Ellis
parent af4f66c768
commit 364293608d
No known key found for this signature in database
GPG key ID: 8705A2A3EBF77BD2
72 changed files with 704 additions and 528 deletions

View file

@ -558,8 +558,8 @@ pub(super) async fn force_set_room_state_from_server(
.latest_pdu_in_room(&room_id)
.await
.map_err(|_| err!(Database("Failed to find the latest PDU in database")))?
.event_id
.clone(),
.event_id()
.to_owned(),
};
let room_version = self.services.rooms.state.get_room_version(&room_id).await?;

View file

@ -738,7 +738,7 @@ pub(super) async fn force_demote(&self, user_id: String, room_id: OwnedRoomOrAli
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomCreate, "")
.await
.is_ok_and(|event| event.sender == user_id);
.is_ok_and(|event| event.sender() == user_id);
if !user_can_demote_self {
return Err!("User is not allowed to modify their own power levels in the room.",);
@ -889,10 +889,7 @@ pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result {
return Err!("Event is already redacted.");
}
let room_id = event.room_id;
let sender_user = event.sender;
if !self.services.globals.user_is_local(&sender_user) {
if !self.services.globals.user_is_local(event.sender()) {
return Err!("This command only works on local users.");
}
@ -902,21 +899,21 @@ pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result {
);
let redaction_event_id = {
let state_lock = self.services.rooms.state.mutex.lock(&room_id).await;
let state_lock = self.services.rooms.state.mutex.lock(event.room_id()).await;
self.services
.rooms
.timeline
.build_and_append_pdu(
PduBuilder {
redacts: Some(event.event_id.clone()),
redacts: Some(event.event_id().to_owned()),
..PduBuilder::timeline(&RoomRedactionEventContent {
redacts: Some(event.event_id.clone()),
redacts: Some(event.event_id().to_owned()),
reason: Some(reason),
})
},
&sender_user,
&room_id,
event.sender(),
event.room_id(),
&state_lock,
)
.await?

View file

@ -3,10 +3,9 @@ use std::fmt::Write;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, debug_info, err, error, info, is_equal_to,
Err, Error, Event, Result, debug_info, err, error, info, is_equal_to,
matrix::pdu::PduBuilder,
utils,
utils::{ReadyExt, stream::BroadbandExt},
utils::{self, ReadyExt, stream::BroadbandExt},
warn,
};
use conduwuit_service::Services;
@ -140,16 +139,32 @@ pub(crate) async fn register_route(
if !services.config.allow_registration && body.appservice_info.is_none() {
match (body.username.as_ref(), body.initial_device_display_name.as_ref()) {
| (Some(username), Some(device_display_name)) => {
info!(%is_guest, user = %username, device_name = %device_display_name, "Rejecting registration attempt as registration is disabled");
info!(
%is_guest,
user = %username,
device_name = %device_display_name,
"Rejecting registration attempt as registration is disabled"
);
},
| (Some(username), _) => {
info!(%is_guest, user = %username, "Rejecting registration attempt as registration is disabled");
info!(
%is_guest,
user = %username,
"Rejecting registration attempt as registration is disabled"
);
},
| (_, Some(device_display_name)) => {
info!(%is_guest, device_name = %device_display_name, "Rejecting registration attempt as registration is disabled");
info!(
%is_guest,
device_name = %device_display_name,
"Rejecting registration attempt as registration is disabled"
);
},
| (None, _) => {
info!(%is_guest, "Rejecting registration attempt as registration is disabled");
info!(
%is_guest,
"Rejecting registration attempt as registration is disabled"
);
},
}
@ -835,6 +850,7 @@ pub async fn full_user_deactivate(
all_joined_rooms: &[OwnedRoomId],
) -> Result<()> {
services.users.deactivate_account(user_id).await.ok();
super::update_displayname(services, user_id, None, all_joined_rooms).await;
super::update_avatar_url(services, user_id, None, None, all_joined_rooms).await;
@ -871,7 +887,7 @@ pub async fn full_user_deactivate(
.state_accessor
.room_state_get(room_id, &StateEventType::RoomCreate, "")
.await
.is_ok_and(|event| event.sender == user_id);
.is_ok_and(|event| event.sender() == user_id);
if user_can_demote_self {
let mut power_levels_content = room_power_levels.unwrap_or_default();

View file

@ -1,7 +1,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, err, info,
Err, Event, Result, err, info,
utils::{
TryFutureExtExt,
math::Expected,
@ -352,7 +352,7 @@ async fn user_can_publish_room(
.room_state_get(room_id, &StateEventType::RoomPowerLevels, "")
.await
{
| Ok(event) => serde_json::from_str(event.content.get())
| Ok(event) => serde_json::from_str(event.content().get())
.map_err(|_| err!(Database("Invalid event content for m.room.power_levels")))
.map(|content: RoomPowerLevelsEventContent| {
RoomPowerLevels::from(content)
@ -365,7 +365,7 @@ async fn user_can_publish_room(
.room_state_get(room_id, &StateEventType::RoomCreate, "")
.await
{
| Ok(event) => Ok(event.sender == user_id),
| Ok(event) => Ok(event.sender() == user_id),
| _ => Err!(Request(Forbidden("User is not allowed to publish this room"))),
}
},

View file

@ -2,7 +2,7 @@ use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, debug_error, err, info,
matrix::pdu::{PduBuilder, gen_event_id_canonical_json},
matrix::{event::gen_event_id_canonical_json, pdu::PduBuilder},
};
use futures::{FutureExt, join};
use ruma::{

View file

@ -6,7 +6,8 @@ use conduwuit::{
Err, Result, debug, debug_info, debug_warn, err, error, info,
matrix::{
StateKey,
pdu::{PduBuilder, PduEvent, gen_event_id, gen_event_id_canonical_json},
event::{gen_event_id, gen_event_id_canonical_json},
pdu::{PduBuilder, PduEvent},
state_res,
},
result::FlatOk,

View file

@ -4,7 +4,10 @@ use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, debug, debug_info, debug_warn, err, info,
matrix::pdu::{PduBuilder, PduEvent, gen_event_id},
matrix::{
event::{Event, gen_event_id},
pdu::{PduBuilder, PduEvent},
},
result::FlatOk,
trace,
utils::{self, shuffle, stream::IterStream},

View file

@ -3,7 +3,7 @@ use std::collections::HashSet;
use axum::extract::State;
use conduwuit::{
Err, Result, debug_info, debug_warn, err,
matrix::pdu::{PduBuilder, gen_event_id},
matrix::{event::gen_event_id, pdu::PduBuilder},
utils::{self, FutureBoolExt, future::ReadyEqExt},
warn,
};

View file

@ -1,13 +1,12 @@
use axum::extract::State;
use conduwuit::{
Err, Event, Result, at,
matrix::pdu::PduEvent,
utils::{
future::TryExtExt,
stream::{BroadbandExt, ReadyExt},
},
};
use futures::{StreamExt, future::join};
use futures::{FutureExt, StreamExt, future::join};
use ruma::{
api::client::membership::{
get_member_events::{self, v3::MembershipEventFilter},
@ -55,6 +54,7 @@ pub(crate) async fn get_member_events_route(
.ready_filter_map(|pdu| membership_filter(pdu, membership, not_membership))
.map(Event::into_format)
.collect()
.boxed()
.await,
})
}
@ -98,11 +98,11 @@ pub(crate) async fn joined_members_route(
})
}
fn membership_filter(
pdu: PduEvent,
fn membership_filter<Pdu: Event>(
pdu: Pdu,
for_membership: Option<&MembershipEventFilter>,
not_membership: Option<&MembershipEventFilter>,
) -> Option<PduEvent> {
) -> Option<impl Event> {
let membership_state_filter = match for_membership {
| Some(MembershipEventFilter::Ban) => MembershipState::Ban,
| Some(MembershipEventFilter::Invite) => MembershipState::Invite,

View file

@ -2,9 +2,10 @@ use axum::extract::State;
use conduwuit::{
Err, Result, at,
matrix::{
Event,
pdu::{PduCount, PduEvent},
event::{Event, Matches},
pdu::PduCount,
},
ref_at,
utils::{
IterStream, ReadyExt,
result::{FlatOk, LogErr},
@ -216,7 +217,9 @@ where
pin_mut!(receipts);
let witness: Witness = events
.stream()
.map(|(_, pdu)| pdu.sender.clone())
.map(ref_at!(1))
.map(Event::sender)
.map(ToOwned::to_owned)
.chain(
receipts
.ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
@ -261,27 +264,33 @@ pub(crate) async fn ignored_filter(
}
#[inline]
pub(crate) async fn is_ignored_pdu(
pub(crate) async fn is_ignored_pdu<Pdu>(
services: &Services,
pdu: &PduEvent,
event: &Pdu,
user_id: &UserId,
) -> bool {
) -> bool
where
Pdu: Event + Send + Sync,
{
// exclude Synapse's dummy events from bloating up response bodies. clients
// don't need to see this.
if pdu.kind.to_cow_str() == "org.matrix.dummy_event" {
if event.kind().to_cow_str() == "org.matrix.dummy_event" {
return true;
}
let ignored_type = IGNORED_MESSAGE_TYPES.binary_search(&pdu.kind).is_ok();
let ignored_type = IGNORED_MESSAGE_TYPES.binary_search(event.kind()).is_ok();
let ignored_server = services
.moderation
.is_remote_server_ignored(pdu.sender().server_name());
.is_remote_server_ignored(event.sender().server_name());
if ignored_type
&& (ignored_server
|| (!services.config.send_messages_from_ignored_users_to_client
&& services.users.user_is_ignored(&pdu.sender, user_id).await))
&& services
.users
.user_is_ignored(event.sender(), user_id)
.await))
{
return true;
}
@ -300,7 +309,7 @@ pub(crate) async fn visibility_filter(
services
.rooms
.state_accessor
.user_can_see_event(user_id, &pdu.room_id, &pdu.event_id)
.user_can_see_event(user_id, pdu.room_id(), pdu.event_id())
.await
.then_some(item)
}
@ -308,7 +317,7 @@ pub(crate) async fn visibility_filter(
#[inline]
pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option<PdusIterItem> {
let (_, pdu) = &item;
pdu.matches(filter).then_some(item)
filter.matches(pdu).then_some(item)
}
#[cfg_attr(debug_assertions, conduwuit::ctor)]

View file

@ -195,11 +195,9 @@ pub(crate) async fn get_avatar_url_route(
services
.users
.set_displayname(&body.user_id, response.displayname.clone());
services
.users
.set_avatar_url(&body.user_id, response.avatar_url.clone());
services
.users
.set_blurhash(&body.user_id, response.blurhash.clone());
@ -256,15 +254,12 @@ pub(crate) async fn get_profile_route(
services
.users
.set_displayname(&body.user_id, response.displayname.clone());
services
.users
.set_avatar_url(&body.user_id, response.avatar_url.clone());
services
.users
.set_blurhash(&body.user_id, response.blurhash.clone());
services
.users
.set_timezone(&body.user_id, response.tz.clone());

View file

@ -1,10 +1,10 @@
use axum::extract::State;
use conduwuit::{
Result, at,
matrix::{Event, pdu::PduCount},
matrix::{Event, event::RelationTypeEqual, pdu::PduCount},
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
};
use conduwuit_service::{Services, rooms::timeline::PdusIterItem};
use conduwuit_service::Services;
use futures::StreamExt;
use ruma::{
EventId, RoomId, UInt, UserId,
@ -129,7 +129,7 @@ async fn paginate_relations_with_filter(
// Spec (v1.10) recommends depth of at least 3
let depth: u8 = if recurse { 3 } else { 1 };
let events: Vec<PdusIterItem> = services
let events: Vec<_> = services
.rooms
.pdu_metadata
.get_relations(sender_user, room_id, target, start, limit, depth, dir)
@ -138,12 +138,12 @@ async fn paginate_relations_with_filter(
.filter(|(_, pdu)| {
filter_event_type
.as_ref()
.is_none_or(|kind| *kind == pdu.kind)
.is_none_or(|kind| kind == pdu.kind())
})
.filter(|(_, pdu)| {
filter_rel_type
.as_ref()
.is_none_or(|rel_type| pdu.relation_type_equal(rel_type))
.is_none_or(|rel_type| rel_type.relation_type_equal(pdu))
})
.stream()
.ready_take_while(|(count, _)| Some(*count) != to)
@ -172,17 +172,17 @@ async fn paginate_relations_with_filter(
})
}
async fn visibility_filter(
async fn visibility_filter<Pdu: Event + Send + Sync>(
services: &Services,
sender_user: &UserId,
item: PdusIterItem,
) -> Option<PdusIterItem> {
item: (PduCount, Pdu),
) -> Option<(PduCount, Pdu)> {
let (_, pdu) = &item;
services
.rooms
.state_accessor
.user_can_see_event(sender_user, &pdu.room_id, &pdu.event_id)
.user_can_see_event(sender_user, pdu.room_id(), pdu.event_id())
.await
.then_some(item)
}

View file

@ -260,5 +260,6 @@ async fn delay_response() {
"Got successful /report request, waiting {time_to_wait} seconds before sending \
successful response."
);
sleep(Duration::from_secs(time_to_wait)).await;
}

View file

@ -49,7 +49,9 @@ pub(crate) async fn room_initial_sync_route(
.try_collect::<Vec<_>>();
let (membership, visibility, state, events) =
try_join4(membership, visibility, state, events).await?;
try_join4(membership, visibility, state, events)
.boxed()
.await?;
let messages = PaginationChunk {
start: events.last().map(at!(0)).as_ref().map(ToString::to_string),

View file

@ -112,13 +112,15 @@ async fn local_room_summary_response(
) -> Result<get_summary::msc3266::Response> {
trace!(?sender_user, "Sending local room summary response for {room_id:?}");
let join_rule = services.rooms.state_accessor.get_join_rules(room_id);
let world_readable = services.rooms.state_accessor.is_world_readable(room_id);
let guest_can_join = services.rooms.state_accessor.guest_can_join(room_id);
let (join_rule, world_readable, guest_can_join) =
join3(join_rule, world_readable, guest_can_join).await;
trace!("{join_rule:?}, {world_readable:?}, {guest_can_join:?}");
trace!("{join_rule:?}, {world_readable:?}, {guest_can_join:?}");
user_can_see_summary(
services,
room_id,

View file

@ -2,7 +2,7 @@ use std::cmp::max;
use axum::extract::State;
use conduwuit::{
Err, Error, Result, err, info,
Err, Error, Event, Result, err, info,
matrix::{StateKey, pdu::PduBuilder},
};
use futures::StreamExt;
@ -215,7 +215,7 @@ pub(crate) async fn upgrade_room_route(
.room_state_get(&body.room_id, event_type, "")
.await
{
| Ok(v) => v.content.clone(),
| Ok(v) => v.content().to_owned(),
| Err(_) => continue, // Skipping missing events.
};

View file

@ -6,7 +6,7 @@ use std::{
use axum::extract::State;
use conduwuit::{
Err, Error, Event, PduCount, PduEvent, Result, at, debug, error, extract_variant,
Err, Error, Event, PduCount, Result, at, debug, error, extract_variant,
matrix::TypeStateKey,
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
@ -627,7 +627,7 @@ pub(crate) async fn sync_events_v4_route(
.state_accessor
.room_state_get(room_id, &state.0, &state.1)
.await
.map(PduEvent::into_format)
.map(Event::into_format)
.ok()
})
.collect()

View file

@ -2,8 +2,10 @@ use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use base64::{Engine as _, engine::general_purpose};
use conduwuit::{
Err, Error, PduEvent, Result, err, matrix::Event, pdu::gen_event_id, utils,
utils::hash::sha256, warn,
Err, Error, PduEvent, Result, err,
matrix::{Event, event::gen_event_id},
utils::{self, hash::sha256},
warn,
};
use ruma::{
CanonicalJsonValue, OwnedUserId, UserId,

View file

@ -5,7 +5,7 @@ use std::borrow::Borrow;
use axum::extract::State;
use conduwuit::{
Err, Result, at, err,
pdu::gen_event_id_canonical_json,
matrix::event::gen_event_id_canonical_json,
utils::stream::{IterStream, TryBroadbandExt},
warn,
};

View file

@ -1,7 +1,7 @@
use axum::extract::State;
use conduwuit::{
Err, Result, err,
matrix::pdu::{PduEvent, gen_event_id_canonical_json},
matrix::{event::gen_event_id_canonical_json, pdu::PduEvent},
warn,
};
use futures::FutureExt;

View file

@ -1,7 +1,7 @@
#![allow(deprecated)]
use axum::extract::State;
use conduwuit::{Err, Result, err, matrix::pdu::gen_event_id_canonical_json};
use conduwuit::{Err, Result, err, matrix::event::gen_event_id_canonical_json};
use conduwuit_service::Services;
use futures::FutureExt;
use ruma::{

View file

@ -88,10 +88,7 @@ impl PartialProxyConfig {
}
}
match (included_because, excluded_because) {
| (Some(a), Some(b)) if a.more_specific_than(b) => Some(&self.url), /* included for
* a more specific
* reason */
// than excluded
| (Some(a), Some(b)) if a.more_specific_than(b) => Some(&self.url),
| (Some(_), None) => Some(&self.url),
| _ => None,
}

View file

@ -84,10 +84,12 @@ fn append_features(features: &mut Vec<String>, manifest: &str) -> Result<()> {
fn init_dependencies() -> Result<DepsSet> {
let manifest = Manifest::from_str(WORKSPACE_MANIFEST)?;
Ok(manifest
let deps_set = manifest
.workspace
.as_ref()
.expect("manifest has workspace section")
.dependencies
.clone())
.clone();
Ok(deps_set)
}

View file

@ -1,21 +1,27 @@
mod content;
mod filter;
mod format;
mod id;
mod redact;
mod relation;
mod type_ext;
mod unsigned;
use std::fmt::Debug;
use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, RoomVersionId, UserId,
events::TimelineEventType,
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
RoomVersionId, UserId, events::TimelineEventType,
};
use serde::Deserialize;
use serde_json::{Value as JsonValue, value::RawValue as RawJsonValue};
pub use self::type_ext::TypeExt;
use super::state_key::StateKey;
use crate::Result;
pub use self::{filter::Matches, id::*, relation::RelationTypeEqual, type_ext::TypeExt};
use super::{pdu::Pdu, state_key::StateKey};
use crate::{Result, utils};
/// Abstraction of a PDU so users can have their own PDU types.
pub trait Event {
pub trait Event: Clone + Debug {
/// Serialize into a Ruma JSON format, consuming.
#[inline]
fn into_format<T>(self) -> T
@ -36,6 +42,41 @@ pub trait Event {
format::Ref(self).into()
}
#[inline]
fn contains_unsigned_property<T>(&self, property: &str, is_type: T) -> bool
where
T: FnOnce(&JsonValue) -> bool,
Self: Sized,
{
unsigned::contains_unsigned_property::<T, _>(self, property, is_type)
}
#[inline]
fn get_unsigned_property<T>(&self, property: &str) -> Result<T>
where
T: for<'de> Deserialize<'de>,
Self: Sized,
{
unsigned::get_unsigned_property::<T, _>(self, property)
}
#[inline]
fn get_unsigned_as_value(&self) -> JsonValue
where
Self: Sized,
{
unsigned::get_unsigned_as_value(self)
}
#[inline]
fn get_unsigned<T>(&self) -> Result<T>
where
T: for<'de> Deserialize<'de>,
Self: Sized,
{
unsigned::get_unsigned::<T, _>(self)
}
#[inline]
fn get_content_as_value(&self) -> JsonValue
where
@ -69,6 +110,39 @@ pub trait Event {
redact::is_redacted(self)
}
#[inline]
fn into_canonical_object(self) -> CanonicalJsonObject
where
Self: Sized,
{
utils::to_canonical_object(self.into_pdu()).expect("failed to create Value::Object")
}
#[inline]
fn to_canonical_object(&self) -> CanonicalJsonObject {
utils::to_canonical_object(self.as_pdu()).expect("failed to create Value::Object")
}
#[inline]
fn into_value(self) -> JsonValue
where
Self: Sized,
{
serde_json::to_value(self.into_pdu()).expect("failed to create JSON Value")
}
#[inline]
fn to_value(&self) -> JsonValue {
serde_json::to_value(self.as_pdu()).expect("failed to create JSON Value")
}
#[inline]
fn as_mut_pdu(&mut self) -> &mut Pdu { unimplemented!("not a mutable Pdu") }
fn as_pdu(&self) -> &Pdu;
fn into_pdu(self) -> Pdu;
fn is_owned(&self) -> bool;
//
@ -76,7 +150,7 @@ pub trait Event {
//
/// All the authenticating events for this event.
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_;
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Clone + Send + '_;
/// The event's content.
fn content(&self) -> &RawJsonValue;
@ -88,7 +162,7 @@ pub trait Event {
fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch;
/// The events before this event.
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_;
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Clone + Send + '_;
/// If this event is a redaction event this is the event it redacts.
fn redacts(&self) -> Option<&EventId>;

View file

@ -0,0 +1,93 @@
use ruma::api::client::filter::{RoomEventFilter, UrlFilter};
use serde_json::Value;
use super::Event;
use crate::is_equal_to;
pub trait Matches<E: Event> {
fn matches(&self, event: &E) -> bool;
}
impl<E: Event> Matches<E> for &RoomEventFilter {
#[inline]
fn matches(&self, event: &E) -> bool {
if !matches_sender(event, self) {
return false;
}
if !matches_room(event, self) {
return false;
}
if !matches_type(event, self) {
return false;
}
if !matches_url(event, self) {
return false;
}
true
}
}
fn matches_room<E: Event>(event: &E, filter: &RoomEventFilter) -> bool {
if filter.not_rooms.iter().any(is_equal_to!(event.room_id())) {
return false;
}
if let Some(rooms) = filter.rooms.as_ref() {
if !rooms.iter().any(is_equal_to!(event.room_id())) {
return false;
}
}
true
}
fn matches_sender<E: Event>(event: &E, filter: &RoomEventFilter) -> bool {
if filter.not_senders.iter().any(is_equal_to!(event.sender())) {
return false;
}
if let Some(senders) = filter.senders.as_ref() {
if !senders.iter().any(is_equal_to!(event.sender())) {
return false;
}
}
true
}
fn matches_type<E: Event>(event: &E, filter: &RoomEventFilter) -> bool {
let kind = event.kind().to_cow_str();
if filter.not_types.iter().any(is_equal_to!(&kind)) {
return false;
}
if let Some(types) = filter.types.as_ref() {
if !types.iter().any(is_equal_to!(&kind)) {
return false;
}
}
true
}
fn matches_url<E: Event>(event: &E, filter: &RoomEventFilter) -> bool {
let Some(url_filter) = filter.url_filter.as_ref() else {
return true;
};
//TODO: might be better to use Ruma's Raw rather than serde here
let url = event
.get_content_as_value()
.get("url")
.is_some_and(Value::is_string);
match url_filter {
| UrlFilter::EventsWithUrl => url,
| UrlFilter::EventsWithoutUrl => !url,
}
}

View file

@ -0,0 +1,28 @@
use ruma::events::relation::RelationType;
use serde::Deserialize;
use super::Event;
pub trait RelationTypeEqual<E: Event> {
fn relation_type_equal(&self, event: &E) -> bool;
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractRelatesToEventId {
#[serde(rename = "m.relates_to")]
relates_to: ExtractRelType,
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractRelType {
rel_type: RelationType,
}
impl<E: Event> RelationTypeEqual<E> for RelationType {
fn relation_type_equal(&self, event: &E) -> bool {
event
.get_content()
.map(|c: ExtractRelatesToEventId| c.relates_to.rel_type)
.is_ok_and(|r| r == *self)
}
}

View file

@ -0,0 +1,51 @@
use serde::Deserialize;
use serde_json::value::Value as JsonValue;
use super::Event;
use crate::{Result, err, is_true};
pub(super) fn contains_unsigned_property<F, E>(event: &E, property: &str, is_type: F) -> bool
where
F: FnOnce(&JsonValue) -> bool,
E: Event,
{
get_unsigned_as_value(event)
.get(property)
.map(is_type)
.is_some_and(is_true!())
}
pub(super) fn get_unsigned_property<T, E>(event: &E, property: &str) -> Result<T>
where
T: for<'de> Deserialize<'de>,
E: Event,
{
get_unsigned_as_value(event)
.get_mut(property)
.map(JsonValue::take)
.map(serde_json::from_value)
.ok_or(err!(Request(NotFound("property not found in unsigned object"))))?
.map_err(|e| err!(Database("Failed to deserialize unsigned.{property} into type: {e}")))
}
#[must_use]
pub(super) fn get_unsigned_as_value<E>(event: &E) -> JsonValue
where
E: Event,
{
get_unsigned::<JsonValue, E>(event).unwrap_or_default()
}
pub(super) fn get_unsigned<T, E>(event: &E) -> Result<T>
where
T: for<'de> Deserialize<'de>,
E: Event,
{
event
.unsigned()
.as_ref()
.map(|raw| raw.get())
.map(serde_json::from_str)
.ok_or(err!(Request(NotFound("\"unsigned\" property not found in pdu"))))?
.map_err(|e| err!(Database("Failed to deserialize \"unsigned\" into value: {e}")))
}

View file

@ -1,12 +1,8 @@
mod builder;
mod content;
mod count;
mod event_id;
mod filter;
mod id;
mod raw_id;
mod redact;
mod relation;
#[cfg(test)]
mod tests;
mod unsigned;
@ -24,7 +20,6 @@ pub use self::{
Count as PduCount, Id as PduId, Pdu as PduEvent, RawId as RawPduId,
builder::{Builder, Builder as PduBuilder},
count::Count,
event_id::*,
id::{ShortId, *},
raw_id::*,
};
@ -91,7 +86,7 @@ impl Pdu {
impl Event for Pdu {
#[inline]
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Clone + Send + '_ {
self.auth_events.iter().map(AsRef::as_ref)
}
@ -107,7 +102,7 @@ impl Event for Pdu {
}
#[inline]
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Clone + Send + '_ {
self.prev_events.iter().map(AsRef::as_ref)
}
@ -129,13 +124,22 @@ impl Event for Pdu {
#[inline]
fn unsigned(&self) -> Option<&RawJsonValue> { self.unsigned.as_deref() }
#[inline]
fn as_mut_pdu(&mut self) -> &mut Pdu { self }
#[inline]
fn as_pdu(&self) -> &Pdu { self }
#[inline]
fn into_pdu(self) -> Pdu { self }
#[inline]
fn is_owned(&self) -> bool { true }
}
impl Event for &Pdu {
#[inline]
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Clone + Send + '_ {
self.auth_events.iter().map(AsRef::as_ref)
}
@ -151,7 +155,7 @@ impl Event for &Pdu {
}
#[inline]
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Clone + Send + '_ {
self.prev_events.iter().map(AsRef::as_ref)
}
@ -173,6 +177,12 @@ impl Event for &Pdu {
#[inline]
fn unsigned(&self) -> Option<&RawJsonValue> { self.unsigned.as_deref() }
#[inline]
fn as_pdu(&self) -> &Pdu { self }
#[inline]
fn into_pdu(self) -> Pdu { self.clone() }
#[inline]
fn is_owned(&self) -> bool { false }
}

View file

@ -1,20 +0,0 @@
use serde::Deserialize;
use serde_json::value::Value as JsonValue;
use crate::{Result, err, implement};
#[must_use]
#[implement(super::Pdu)]
pub fn get_content_as_value(&self) -> JsonValue {
self.get_content()
.expect("pdu content must be a valid JSON value")
}
#[implement(super::Pdu)]
pub fn get_content<T>(&self) -> Result<T>
where
T: for<'de> Deserialize<'de>,
{
serde_json::from_str(self.content.get())
.map_err(|e| err!(Database("Failed to deserialize pdu content into type: {e}")))
}

View file

@ -1,90 +0,0 @@
use ruma::api::client::filter::{RoomEventFilter, UrlFilter};
use serde_json::Value;
use crate::{implement, is_equal_to};
#[implement(super::Pdu)]
#[must_use]
pub fn matches(&self, filter: &RoomEventFilter) -> bool {
if !self.matches_sender(filter) {
return false;
}
if !self.matches_room(filter) {
return false;
}
if !self.matches_type(filter) {
return false;
}
if !self.matches_url(filter) {
return false;
}
true
}
#[implement(super::Pdu)]
fn matches_room(&self, filter: &RoomEventFilter) -> bool {
if filter.not_rooms.contains(&self.room_id) {
return false;
}
if let Some(rooms) = filter.rooms.as_ref() {
if !rooms.contains(&self.room_id) {
return false;
}
}
true
}
#[implement(super::Pdu)]
fn matches_sender(&self, filter: &RoomEventFilter) -> bool {
if filter.not_senders.contains(&self.sender) {
return false;
}
if let Some(senders) = filter.senders.as_ref() {
if !senders.contains(&self.sender) {
return false;
}
}
true
}
#[implement(super::Pdu)]
fn matches_type(&self, filter: &RoomEventFilter) -> bool {
let event_type = &self.kind.to_cow_str();
if filter.not_types.iter().any(is_equal_to!(event_type)) {
return false;
}
if let Some(types) = filter.types.as_ref() {
if !types.iter().any(is_equal_to!(event_type)) {
return false;
}
}
true
}
#[implement(super::Pdu)]
fn matches_url(&self, filter: &RoomEventFilter) -> bool {
let Some(url_filter) = filter.url_filter.as_ref() else {
return true;
};
//TODO: might be better to use Ruma's Raw rather than serde here
let url = serde_json::from_str::<Value>(self.content.get())
.expect("parsing content JSON failed")
.get("url")
.is_some_and(Value::is_string);
match url_filter {
| UrlFilter::EventsWithUrl => url,
| UrlFilter::EventsWithoutUrl => !url,
}
}

View file

@ -1,10 +1,10 @@
use ruma::{RoomVersionId, canonical_json::redact_content_in_place};
use serde_json::{json, value::to_raw_value};
use serde_json::{Value as JsonValue, json, value::to_raw_value};
use crate::{Error, Result, err, implement};
#[implement(super::Pdu)]
pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: &Self) -> Result {
pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: JsonValue) -> Result {
self.unsigned = None;
let mut content = serde_json::from_str(self.content.get())

View file

@ -1,22 +0,0 @@
use ruma::events::relation::RelationType;
use serde::Deserialize;
use crate::implement;
#[derive(Clone, Debug, Deserialize)]
struct ExtractRelType {
rel_type: RelationType,
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractRelatesToEventId {
#[serde(rename = "m.relates_to")]
relates_to: ExtractRelType,
}
#[implement(super::Pdu)]
#[must_use]
pub fn relation_type_equal(&self, rel_type: &RelationType) -> bool {
self.get_content()
.map(|c: ExtractRelatesToEventId| c.relates_to.rel_type)
.is_ok_and(|r| r == *rel_type)
}

View file

@ -1,11 +1,10 @@
use std::collections::BTreeMap;
use ruma::MilliSecondsSinceUnixEpoch;
use serde::Deserialize;
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
use super::Pdu;
use crate::{Result, err, implement, is_true};
use crate::{Result, err, implement};
#[implement(Pdu)]
pub fn remove_transaction_id(&mut self) -> Result {
@ -74,43 +73,3 @@ pub fn add_relation(&mut self, name: &str, pdu: Option<&Pdu>) -> Result {
Ok(())
}
#[implement(Pdu)]
pub fn contains_unsigned_property<F>(&self, property: &str, is_type: F) -> bool
where
F: FnOnce(&JsonValue) -> bool,
{
self.get_unsigned_as_value()
.get(property)
.map(is_type)
.is_some_and(is_true!())
}
#[implement(Pdu)]
pub fn get_unsigned_property<T>(&self, property: &str) -> Result<T>
where
T: for<'de> Deserialize<'de>,
{
self.get_unsigned_as_value()
.get_mut(property)
.map(JsonValue::take)
.map(serde_json::from_value)
.ok_or(err!(Request(NotFound("property not found in unsigned object"))))?
.map_err(|e| err!(Database("Failed to deserialize unsigned.{property} into type: {e}")))
}
#[implement(Pdu)]
#[must_use]
pub fn get_unsigned_as_value(&self) -> JsonValue {
self.get_unsigned::<JsonValue>().unwrap_or_default()
}
#[implement(Pdu)]
pub fn get_unsigned<T>(&self) -> Result<JsonValue> {
self.unsigned
.as_ref()
.map(|raw| raw.get())
.map(serde_json::from_str)
.ok_or(err!(Request(NotFound("\"unsigned\" property not found in pdu"))))?
.map_err(|e| err!(Database("Failed to deserialize \"unsigned\" into value: {e}")))
}

View file

@ -74,7 +74,7 @@ type Result<T, E = Error> = crate::Result<T, E>;
/// event is part of the same room.
//#[tracing::instrument(level = "debug", skip(state_sets, auth_chain_sets,
//#[tracing::instrument(level event_fetch))]
pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>(
pub async fn resolve<'a, Pdu, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>(
room_version: &RoomVersionId,
state_sets: Sets,
auth_chain_sets: &'a [HashSet<OwnedEventId, Hasher>],
@ -83,14 +83,14 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis
) -> Result<StateMap<OwnedEventId>>
where
Fetch: Fn(OwnedEventId) -> FetchFut + Sync,
FetchFut: Future<Output = Option<E>> + Send,
FetchFut: Future<Output = Option<Pdu>> + Send,
Exists: Fn(OwnedEventId) -> ExistsFut + Sync,
ExistsFut: Future<Output = bool> + Send,
Sets: IntoIterator<IntoIter = SetIter> + Send,
SetIter: Iterator<Item = &'a StateMap<OwnedEventId>> + Clone + Send,
Hasher: BuildHasher + Send + Sync,
E: Event + Clone + Send + Sync,
for<'b> &'b E: Event + Send,
Pdu: Event + Clone + Send + Sync,
for<'b> &'b Pdu: Event + Send,
{
debug!("State resolution starting");
@ -221,6 +221,7 @@ where
let state_sets_iter =
state_sets_iter.inspect(|_| state_set_count = state_set_count.saturating_add(1));
for (k, v) in state_sets_iter.flatten() {
occurrences
.entry(k)
@ -305,6 +306,7 @@ where
let pl = get_power_level_for_sender(&event_id, fetch_event)
.await
.ok()?;
Some((event_id, pl))
})
.inspect(|(event_id, pl)| {

View file

@ -44,6 +44,7 @@ impl Module {
.handle
.as_ref()
.expect("backing library loaded by this instance");
// SAFETY: Calls dlsym(3) on unix platforms. This might not have to be unsafe
// if wrapped in libloading with_dlerror().
let sym = unsafe { handle.get::<Prototype>(cname.as_bytes()) };

View file

@ -27,6 +27,7 @@ pub fn to_name(path: &OsStr) -> Result<String> {
.expect("path file stem")
.to_str()
.expect("name string");
let name = name.strip_prefix("lib").unwrap_or(name).to_owned();
Ok(name)

View file

@ -23,8 +23,10 @@ impl fmt::Display for Escape<'_> {
| '"' => "&quot;",
| _ => continue,
};
fmt.write_str(&pile_o_bits[last..i])?;
fmt.write_str(s)?;
// NOTE: we only expect single byte characters here - which is fine as long as
// we only match single byte characters
last = i.saturating_add(1);

View file

@ -1,4 +1,4 @@
use std::{fmt, str::FromStr};
use std::{fmt, marker::PhantomData, str::FromStr};
use ruma::{CanonicalJsonError, CanonicalJsonObject, canonical_json::try_from_json_map};
@ -11,25 +11,28 @@ use crate::Result;
pub fn to_canonical_object<T: serde::Serialize>(
value: T,
) -> Result<CanonicalJsonObject, CanonicalJsonError> {
use CanonicalJsonError::SerDe;
use serde::ser::Error;
match serde_json::to_value(value).map_err(CanonicalJsonError::SerDe)? {
match serde_json::to_value(value).map_err(SerDe)? {
| serde_json::Value::Object(map) => try_from_json_map(map),
| _ =>
Err(CanonicalJsonError::SerDe(serde_json::Error::custom("Value must be an object"))),
| _ => Err(SerDe(serde_json::Error::custom("Value must be an object"))),
}
}
pub fn deserialize_from_str<
'de,
pub fn deserialize_from_str<'de, D, T, E>(deserializer: D) -> Result<T, D::Error>
where
D: serde::de::Deserializer<'de>,
T: FromStr<Err = E>,
E: fmt::Display,
>(
deserializer: D,
) -> Result<T, D::Error> {
struct Visitor<T: FromStr<Err = E>, E>(std::marker::PhantomData<T>);
impl<T: FromStr<Err = Err>, Err: fmt::Display> serde::de::Visitor<'_> for Visitor<T, Err> {
{
struct Visitor<T: FromStr<Err = E>, E>(PhantomData<T>);
impl<T, Err> serde::de::Visitor<'_> for Visitor<T, Err>
where
T: FromStr<Err = Err>,
Err: fmt::Display,
{
type Value = T;
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -43,5 +46,6 @@ pub fn deserialize_from_str<
v.parse().map_err(serde::de::Error::custom)
}
}
deserializer.deserialize_str(Visitor(std::marker::PhantomData))
deserializer.deserialize_str(Visitor(PhantomData))
}

View file

@ -105,14 +105,11 @@ pub fn whole_unit(d: Duration) -> Unit {
| 86_400.. => Days(d.as_secs() / 86_400),
| 3_600..=86_399 => Hours(d.as_secs() / 3_600),
| 60..=3_599 => Mins(d.as_secs() / 60),
| _ => match d.as_micros() {
| 1_000_000.. => Secs(d.as_secs()),
| 1_000..=999_999 => Millis(d.subsec_millis().into()),
| _ => match d.as_nanos() {
| 1_000.. => Micros(d.subsec_micros().into()),
| _ => Nanos(d.subsec_nanos().into()),
},
},

View file

@ -37,7 +37,6 @@ impl Watchers {
pub(crate) fn wake(&self, key: &[u8]) {
let watchers = self.watchers.read().unwrap();
let mut triggered = Vec::new();
for length in 0..=key.len() {
if watchers.contains_key(&key[..length]) {
triggered.push(&key[..length]);

View file

@ -22,10 +22,12 @@ pub(crate) fn init(
let reload_handles = LogLevelReloadHandles::default();
let console_span_events = fmt_span::from_str(&config.log_span_events).unwrap_or_err();
let console_filter = EnvFilter::builder()
.with_regex(config.log_filter_regex)
.parse(&config.log)
.map_err(|e| err!(Config("log", "{e}.")))?;
let console_layer = fmt::Layer::new()
.with_span_events(console_span_events)
.event_format(ConsoleFormat::new(config))
@ -34,6 +36,7 @@ pub(crate) fn init(
let (console_reload_filter, console_reload_handle) =
reload::Layer::new(console_filter.clone());
reload_handles.add("console", Box::new(console_reload_handle));
let cap_state = Arc::new(capture::State::new());
@ -47,8 +50,10 @@ pub(crate) fn init(
let subscriber = {
let sentry_filter = EnvFilter::try_new(&config.sentry_filter)
.map_err(|e| err!(Config("sentry_filter", "{e}.")))?;
let sentry_layer = sentry_tracing::layer();
let (sentry_reload_filter, sentry_reload_handle) = reload::Layer::new(sentry_filter);
reload_handles.add("sentry", Box::new(sentry_reload_handle));
subscriber.with(sentry_layer.with_filter(sentry_reload_filter))
};
@ -58,12 +63,15 @@ pub(crate) fn init(
let (flame_layer, flame_guard) = if config.tracing_flame {
let flame_filter = EnvFilter::try_new(&config.tracing_flame_filter)
.map_err(|e| err!(Config("tracing_flame_filter", "{e}.")))?;
let (flame_layer, flame_guard) =
tracing_flame::FlameLayer::with_file(&config.tracing_flame_output_path)
.map_err(|e| err!(Config("tracing_flame_output_path", "{e}.")))?;
let flame_layer = flame_layer
.with_empty_samples(false)
.with_filter(flame_filter);
(Some(flame_layer), Some(flame_guard))
} else {
(None, None)
@ -71,19 +79,24 @@ pub(crate) fn init(
let jaeger_filter = EnvFilter::try_new(&config.jaeger_filter)
.map_err(|e| err!(Config("jaeger_filter", "{e}.")))?;
let jaeger_layer = config.allow_jaeger.then(|| {
opentelemetry::global::set_text_map_propagator(
opentelemetry_jaeger::Propagator::new(),
);
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_auto_split_batch(true)
.with_service_name(conduwuit_core::name())
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("jaeger agent pipeline");
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let (jaeger_reload_filter, jaeger_reload_handle) =
reload::Layer::new(jaeger_filter.clone());
reload_handles.add("jaeger", Box::new(jaeger_reload_handle));
Some(telemetry.with_filter(jaeger_reload_filter))
});

View file

@ -51,7 +51,9 @@ pub(crate) async fn run(server: &Arc<Server>, starts: bool) -> Result<(bool, boo
},
};
}
server.server.stopping.store(false, Ordering::Release);
let run = main_mod.get::<RunFuncProto>("run")?;
if let Err(error) = run(server
.services
@ -64,7 +66,9 @@ pub(crate) async fn run(server: &Arc<Server>, starts: bool) -> Result<(bool, boo
error!("Running server: {error}");
return Err(error);
}
let reloads = server.server.reloading.swap(false, Ordering::AcqRel);
let stops = !reloads || stale(server).await? <= restart_thresh();
let starts = reloads && stops;
if stops {

View file

@ -35,11 +35,13 @@ fn options(config: &Config) -> ClientOptions {
.expect("init_sentry should only be called if sentry is enabled and this is not None")
.as_str();
let server_name = config
.sentry_send_server_name
.then(|| config.server_name.to_string().into());
ClientOptions {
dsn: Some(Dsn::from_str(dsn).expect("sentry_endpoint must be a valid URL")),
server_name: config
.sentry_send_server_name
.then(|| config.server_name.to_string().into()),
server_name,
traces_sample_rate: config.sentry_traces_sample_rate,
debug: cfg!(debug_assertions),
release: sentry::release_name!(),

View file

@ -98,8 +98,8 @@ async fn execute(
fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Response, StatusCode> {
let status = result.status();
let reason = status.canonical_reason().unwrap_or("Unknown Reason");
let code = status.as_u16();
let reason = status.canonical_reason().unwrap_or("Unknown Reason");
if status.is_server_error() {
error!(method = ?method, uri = ?uri, "{code} {reason}");

View file

@ -305,13 +305,13 @@ impl Service {
return Ok(());
};
let response_sender = if self.is_admin_room(&pdu.room_id).await {
let response_sender = if self.is_admin_room(pdu.room_id()).await {
&self.services.globals.server_user
} else {
&pdu.sender
pdu.sender()
};
self.respond_to_room(content, &pdu.room_id, response_sender)
self.respond_to_room(content, pdu.room_id(), response_sender)
.boxed()
.await
}

View file

@ -293,11 +293,7 @@ impl Service {
.state_accessor
.room_state_get(event.room_id(), &StateEventType::RoomPowerLevels, "")
.await
.and_then(|ev| {
serde_json::from_str(ev.content.get()).map_err(|e| {
err!(Database(error!("invalid m.room.power_levels event: {e:?}")))
})
})
.and_then(|event| event.get_content())
.unwrap_or_default();
let serialized = event.to_format();

View file

@ -3,7 +3,7 @@ mod remote;
use std::sync::Arc;
use conduwuit::{
Err, Result, Server, err,
Err, Event, Result, Server, err,
utils::{ReadyExt, stream::TryIgnore},
};
use database::{Deserialized, Ignore, Interfix, Map};
@ -241,7 +241,7 @@ impl Service {
.room_state_get(&room_id, &StateEventType::RoomCreate, "")
.await
{
return Ok(event.sender == user_id);
return Ok(event.sender() == user_id);
}
Err!(Database("Room has no m.room.create event"))

View file

@ -4,11 +4,13 @@ use std::{
};
use conduwuit::{
PduEvent, debug, debug_error, debug_warn, implement, pdu, trace,
utils::continue_exponential_backoff_secs, warn,
Event, PduEvent, debug, debug_error, debug_warn, implement,
matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs,
warn,
};
use ruma::{
CanonicalJsonValue, OwnedEventId, RoomId, ServerName, api::federation::event::get_event,
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::event::get_event,
};
use super::get_room_version_id;
@ -23,13 +25,17 @@ use super::get_room_version_id;
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
#[implement(super::Service)]
pub(super) async fn fetch_and_handle_outliers<'a>(
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
&self,
origin: &'a ServerName,
events: &'a [OwnedEventId],
create_event: &'a PduEvent,
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)> {
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
.services
.globals
@ -46,22 +52,23 @@ pub(super) async fn fetch_and_handle_outliers<'a>(
},
};
let mut events_with_auth_events = Vec::with_capacity(events.len());
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
for id in events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
trace!("Found {id} in db");
events_with_auth_events.push((id, Some(local_pdu), vec![]));
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
continue;
}
// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [id.clone()].into();
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
@ -117,7 +124,7 @@ pub(super) async fn fetch_and_handle_outliers<'a>(
};
let Ok((calculated_event_id, value)) =
pdu::gen_event_id_canonical_json(&res.pdu, &room_version_id)
gen_event_id_canonical_json(&res.pdu, &room_version_id)
else {
back_off((*next_id).to_owned());
continue;
@ -160,7 +167,8 @@ pub(super) async fn fetch_and_handle_outliers<'a>(
},
}
}
events_with_auth_events.push((id, None, events_in_reverse_order));
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
}
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
@ -217,5 +225,6 @@ pub(super) async fn fetch_and_handle_outliers<'a>(
}
}
}
pdus
}

View file

@ -1,13 +1,16 @@
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
iter::once,
};
use conduwuit::{
PduEvent, Result, debug_warn, err, implement,
Event, PduEvent, Result, debug_warn, err, implement,
state_res::{self},
};
use futures::{FutureExt, future};
use ruma::{
CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, UInt, int,
uint,
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
};
use super::check_room_id;
@ -19,20 +22,26 @@ use super::check_room_id;
fields(%origin),
)]
#[allow(clippy::type_complexity)]
pub(super) async fn fetch_prev(
pub(super) async fn fetch_prev<'a, Pdu, Events>(
&self,
origin: &ServerName,
create_event: &PduEvent,
create_event: &Pdu,
room_id: &RoomId,
first_ts_in_room: UInt,
initial_set: Vec<OwnedEventId>,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
initial_set: Events,
) -> Result<(
Vec<OwnedEventId>,
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
)> {
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(initial_set.len());
)>
where
Pdu: Event + Send + Sync,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let num_ids = initial_set.clone().count();
let mut eventid_info = HashMap::new();
let mut todo_outlier_stack: VecDeque<OwnedEventId> = initial_set.into();
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids);
let mut todo_outlier_stack: VecDeque<OwnedEventId> =
initial_set.map(ToOwned::to_owned).collect();
let mut amount = 0;
@ -40,7 +49,12 @@ pub(super) async fn fetch_prev(
self.services.server.check_running()?;
match self
.fetch_and_handle_outliers(origin, &[prev_event_id.clone()], create_event, room_id)
.fetch_and_handle_outliers(
origin,
once(prev_event_id.as_ref()),
create_event,
room_id,
)
.boxed()
.await
.pop()
@ -65,17 +79,17 @@ pub(super) async fn fetch_prev(
}
if let Some(json) = json_opt {
if pdu.origin_server_ts > first_ts_in_room {
if pdu.origin_server_ts() > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in &pdu.prev_events {
for prev_prev in pdu.prev_events() {
if !graph.contains_key(prev_prev) {
todo_outlier_stack.push_back(prev_prev.clone());
todo_outlier_stack.push_back(prev_prev.to_owned());
}
}
graph.insert(
prev_event_id.clone(),
pdu.prev_events.iter().cloned().collect(),
pdu.prev_events().map(ToOwned::to_owned).collect(),
);
} else {
// Time based check failed
@ -98,8 +112,7 @@ pub(super) async fn fetch_prev(
let event_fetch = |event_id| {
let origin_server_ts = eventid_info
.get(&event_id)
.cloned()
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts);
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
// This return value is the key used for sorting events,
// events are then sorted by power level, time,

View file

@ -1,6 +1,6 @@
use std::collections::{HashMap, hash_map};
use conduwuit::{Err, Error, PduEvent, Result, debug, debug_warn, implement};
use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement};
use futures::FutureExt;
use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
@ -18,13 +18,16 @@ use crate::rooms::short::ShortStateKey;
skip_all,
fields(%origin),
)]
pub(super) async fn fetch_state(
pub(super) async fn fetch_state<Pdu>(
&self,
origin: &ServerName,
create_event: &PduEvent,
create_event: &Pdu,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<HashMap<u64, OwnedEventId>>> {
) -> Result<Option<HashMap<u64, OwnedEventId>>>
where
Pdu: Event + Send + Sync,
{
let res = self
.services
.sending
@ -36,27 +39,27 @@ pub(super) async fn fetch_state(
.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
debug!("Fetching state events");
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self
.fetch_and_handle_outliers(origin, &res.pdu_ids, create_event, room_id)
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
.boxed()
.await;
let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
for (pdu, _) in state_vec {
let state_key = pdu
.state_key
.clone()
.ok_or_else(|| Error::bad_database("Found non-state pdu in state events."))?;
.state_key()
.ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?;
let shortstatekey = self
.services
.short
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key)
.get_or_create_shortstatekey(&pdu.kind().to_string().into(), state_key)
.await;
match state.entry(shortstatekey) {
| hash_map::Entry::Vacant(v) => {
v.insert(pdu.event_id.clone());
v.insert(pdu.event_id().to_owned());
},
| hash_map::Entry::Occupied(_) => {
return Err!(Database(
@ -73,7 +76,7 @@ pub(super) async fn fetch_state(
.get_shortstatekey(&StateEventType::RoomCreate, "")
.await?;
if state.get(&create_shortstatekey) != Some(&create_event.event_id) {
if state.get(&create_shortstatekey).map(AsRef::as_ref) != Some(create_event.event_id()) {
return Err!(Database("Incoming event refers to wrong create event."));
}

View file

@ -4,7 +4,7 @@ use std::{
};
use conduwuit::{
Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, err, implement, utils::stream::IterStream,
Err, Event, Result, debug::INFO_SPAN_LEVEL, defer, err, implement, utils::stream::IterStream,
warn,
};
use futures::{
@ -12,6 +12,7 @@ use futures::{
future::{OptionFuture, try_join5},
};
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType};
use tracing::debug;
use crate::rooms::timeline::RawPduId;
@ -121,22 +122,16 @@ pub async fn handle_incoming_pdu<'a>(
.timeline
.first_pdu_in_room(room_id)
.await?
.origin_server_ts;
.origin_server_ts();
if incoming_pdu.origin_server_ts < first_ts_in_room {
if incoming_pdu.origin_server_ts() < first_ts_in_room {
return Ok(None);
}
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
let (sorted_prev_events, mut eventid_info) = self
.fetch_prev(
origin,
create_event,
room_id,
first_ts_in_room,
incoming_pdu.prev_events.clone(),
)
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
.await?;
debug!(

View file

@ -1,7 +1,7 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use conduwuit::{
Err, PduEvent, Result, debug, debug_info, err, implement, state_res, trace, warn,
Err, Event, PduEvent, Result, debug, debug_info, err, implement, state_res, trace, warn,
};
use futures::future::ready;
use ruma::{
@ -12,15 +12,18 @@ use super::{check_room_id, get_room_version_id, to_room_version};
#[implement(super::Service)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_outlier_pdu<'a>(
pub(super) async fn handle_outlier_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
create_event: &'a PduEvent,
create_event: &'a Pdu,
event_id: &'a EventId,
room_id: &'a RoomId,
mut value: CanonicalJsonObject,
auth_events_known: bool,
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)> {
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
where
Pdu: Event + Send + Sync,
{
// 1. Remove unsigned field
value.remove("unsigned");
@ -29,7 +32,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
// 2. Check signatures, otherwise drop
// 3. check content hash, redact if doesn't match
let room_version_id = get_room_version_id(create_event)?;
let mut val = match self
let mut incoming_pdu = match self
.services
.server_keys
.verify_event(&value, Some(&room_version_id))
@ -61,13 +64,15 @@ pub(super) async fn handle_outlier_pdu<'a>(
// Now that we have checked the signature and hashes we can add the eventID and
// convert to our PduEvent type
val.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned()));
let incoming_pdu = serde_json::from_value::<PduEvent>(
serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"),
incoming_pdu
.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned()));
let pdu_event = serde_json::from_value::<PduEvent>(
serde_json::to_value(&incoming_pdu).expect("CanonicalJsonObj is a valid JsonValue"),
)
.map_err(|e| err!(Request(BadJson(debug_warn!("Event is not a valid PDU: {e}")))))?;
check_room_id(room_id, &incoming_pdu)?;
check_room_id(room_id, &pdu_event)?;
if !auth_events_known {
// 4. fetch any missing auth events doing all checks listed here starting at 1.
@ -78,7 +83,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
debug!("Fetching auth events");
Box::pin(self.fetch_and_handle_outliers(
origin,
&incoming_pdu.auth_events,
pdu_event.auth_events(),
create_event,
room_id,
))
@ -89,8 +94,8 @@ pub(super) async fn handle_outlier_pdu<'a>(
// auth events
debug!("Checking based on auth events");
// Build map of auth events
let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len());
for id in &incoming_pdu.auth_events {
let mut auth_events = HashMap::with_capacity(pdu_event.auth_events().count());
for id in pdu_event.auth_events() {
let Ok(auth_event) = self.services.timeline.get_pdu(id).await else {
warn!("Could not find auth event {id}");
continue;
@ -131,7 +136,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
let auth_check = state_res::event_auth::auth_check(
&to_room_version(&room_version_id),
&incoming_pdu,
&pdu_event,
None, // TODO: third party invite
state_fetch,
)
@ -147,9 +152,9 @@ pub(super) async fn handle_outlier_pdu<'a>(
// 7. Persist the event as an outlier.
self.services
.outlier
.add_pdu_outlier(&incoming_pdu.event_id, &val);
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
trace!("Added pdu as outlier.");
Ok((incoming_pdu, val))
Ok((pdu_event, incoming_pdu))
}

View file

@ -1,10 +1,11 @@
use std::{collections::BTreeMap, time::Instant};
use conduwuit::{
Err, PduEvent, Result, debug, debug::INFO_SPAN_LEVEL, defer, implement,
Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement,
utils::continue_exponential_backoff_secs,
};
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UInt};
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use tracing::debug;
#[implement(super::Service)]
#[allow(clippy::type_complexity)]
@ -15,16 +16,19 @@ use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UInt};
skip_all,
fields(%prev_id),
)]
pub(super) async fn handle_prev_pdu<'a>(
pub(super) async fn handle_prev_pdu<'a, Pdu>(
&self,
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
create_event: &'a PduEvent,
first_ts_in_room: UInt,
create_event: &'a Pdu,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &'a EventId,
) -> Result {
) -> Result
where
Pdu: Event + Send + Sync,
{
// Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(debug_warn!(
@ -59,7 +63,7 @@ pub(super) async fn handle_prev_pdu<'a>(
};
// Skip old events
if pdu.origin_server_ts < first_ts_in_room {
if pdu.origin_server_ts() < first_ts_in_room {
return Ok(());
}

View file

@ -18,7 +18,7 @@ use std::{
};
use async_trait::async_trait;
use conduwuit::{Err, PduEvent, Result, RoomVersion, Server, utils::MutexMap};
use conduwuit::{Err, Event, PduEvent, Result, RoomVersion, Server, utils::MutexMap};
use ruma::{
OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
events::room::create::RoomCreateEventContent,
@ -104,11 +104,11 @@ impl Service {
}
}
fn check_room_id(room_id: &RoomId, pdu: &PduEvent) -> Result {
if pdu.room_id != room_id {
fn check_room_id<Pdu: Event>(room_id: &RoomId, pdu: &Pdu) -> Result {
if pdu.room_id() != room_id {
return Err!(Request(InvalidParam(error!(
pdu_event_id = ?pdu.event_id,
pdu_room_id = ?pdu.room_id,
pdu_event_id = ?pdu.event_id(),
pdu_room_id = ?pdu.room_id(),
?room_id,
"Found event from room in room",
))));
@ -117,7 +117,7 @@ fn check_room_id(room_id: &RoomId, pdu: &PduEvent) -> Result {
Ok(())
}
fn get_room_version_id(create_event: &PduEvent) -> Result<RoomVersionId> {
fn get_room_version_id<Pdu: Event>(create_event: &Pdu) -> Result<RoomVersionId> {
let content: RoomCreateEventContent = create_event.get_content()?;
let room_version = content.room_version;

View file

@ -1,4 +1,6 @@
use conduwuit::{Result, err, implement, pdu::gen_event_id_canonical_json, result::FlatOk};
use conduwuit::{
Result, err, implement, matrix::event::gen_event_id_canonical_json, result::FlatOk,
};
use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId};
use serde_json::value::RawValue as RawJsonValue;

View file

@ -6,7 +6,7 @@ use std::{
use conduwuit::{
Result, debug, err, implement,
matrix::{PduEvent, StateMap},
matrix::{Event, StateMap},
trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
};
@ -19,11 +19,18 @@ use crate::rooms::short::ShortStateHash;
#[implement(super::Service)]
// request and build the state from a known point and resolve if > 1 prev_event
#[tracing::instrument(name = "state", level = "debug", skip_all)]
pub(super) async fn state_at_incoming_degree_one(
pub(super) async fn state_at_incoming_degree_one<Pdu>(
&self,
incoming_pdu: &PduEvent,
) -> Result<Option<HashMap<u64, OwnedEventId>>> {
let prev_event = &incoming_pdu.prev_events[0];
incoming_pdu: &Pdu,
) -> Result<Option<HashMap<u64, OwnedEventId>>>
where
Pdu: Event + Send + Sync,
{
let prev_event = incoming_pdu
.prev_events()
.next()
.expect("at least one prev_event");
let Ok(prev_event_sstatehash) = self
.services
.state_accessor
@ -55,7 +62,7 @@ pub(super) async fn state_at_incoming_degree_one(
.get_or_create_shortstatekey(&prev_pdu.kind.to_string().into(), state_key)
.await;
state.insert(shortstatekey, prev_event.clone());
state.insert(shortstatekey, prev_event.to_owned());
// Now it's the state after the pdu
}
@ -66,16 +73,18 @@ pub(super) async fn state_at_incoming_degree_one(
#[implement(super::Service)]
#[tracing::instrument(name = "state", level = "debug", skip_all)]
pub(super) async fn state_at_incoming_resolved(
pub(super) async fn state_at_incoming_resolved<Pdu>(
&self,
incoming_pdu: &PduEvent,
incoming_pdu: &Pdu,
room_id: &RoomId,
room_version_id: &RoomVersionId,
) -> Result<Option<HashMap<u64, OwnedEventId>>> {
) -> Result<Option<HashMap<u64, OwnedEventId>>>
where
Pdu: Event + Send + Sync,
{
trace!("Calculating extremity statehashes...");
let Ok(extremity_sstatehashes) = incoming_pdu
.prev_events
.iter()
.prev_events()
.try_stream()
.broad_and_then(|prev_eventid| {
self.services
@ -133,12 +142,15 @@ pub(super) async fn state_at_incoming_resolved(
}
#[implement(super::Service)]
async fn state_at_incoming_fork(
async fn state_at_incoming_fork<Pdu>(
&self,
room_id: &RoomId,
sstatehash: ShortStateHash,
prev_event: PduEvent,
) -> Result<(StateMap<OwnedEventId>, HashSet<OwnedEventId>)> {
prev_event: Pdu,
) -> Result<(StateMap<OwnedEventId>, HashSet<OwnedEventId>)>
where
Pdu: Event,
{
let mut leaf_state: HashMap<_, _> = self
.services
.state_accessor
@ -146,15 +158,15 @@ async fn state_at_incoming_fork(
.collect()
.await;
if let Some(state_key) = &prev_event.state_key {
if let Some(state_key) = prev_event.state_key() {
let shortstatekey = self
.services
.short
.get_or_create_shortstatekey(&prev_event.kind.to_string().into(), state_key)
.get_or_create_shortstatekey(&prev_event.kind().to_string().into(), state_key)
.await;
let event_id = &prev_event.event_id;
leaf_state.insert(shortstatekey, event_id.clone());
let event_id = prev_event.event_id();
leaf_state.insert(shortstatekey, event_id.to_owned());
// Now it's the state after the pdu
}

View file

@ -1,7 +1,7 @@
use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
use conduwuit::{
Err, Result, debug, debug_info, err, implement,
Err, Result, debug, debug_info, err, implement, is_equal_to,
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
trace,
utils::stream::{BroadbandExt, ReadyExt},
@ -17,19 +17,22 @@ use crate::rooms::{
};
#[implement(super::Service)]
pub(super) async fn upgrade_outlier_to_timeline_pdu(
pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
&self,
incoming_pdu: PduEvent,
val: BTreeMap<String, CanonicalJsonValue>,
create_event: &PduEvent,
create_event: &Pdu,
origin: &ServerName,
room_id: &RoomId,
) -> Result<Option<RawPduId>> {
) -> Result<Option<RawPduId>>
where
Pdu: Event + Send + Sync,
{
// Skip the PDU if we already have it as a timeline event
if let Ok(pduid) = self
.services
.timeline
.get_pdu_id(&incoming_pdu.event_id)
.get_pdu_id(incoming_pdu.event_id())
.await
{
return Ok(Some(pduid));
@ -38,7 +41,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
if self
.services
.pdu_metadata
.is_event_soft_failed(&incoming_pdu.event_id)
.is_event_soft_failed(incoming_pdu.event_id())
.await
{
return Err!(Request(InvalidParam("Event has been soft failed")));
@ -53,7 +56,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// These are not timeline events.
debug!("Resolving state at event");
let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 {
let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 {
self.state_at_incoming_degree_one(&incoming_pdu).await?
} else {
self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_id)
@ -62,12 +65,13 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
if state_at_incoming_event.is_none() {
state_at_incoming_event = self
.fetch_state(origin, create_event, room_id, &incoming_pdu.event_id)
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.await?;
}
let state_at_incoming_event =
state_at_incoming_event.expect("we always set this to some above");
let room_version = to_room_version(&room_version_id);
debug!("Performing auth check");
@ -99,10 +103,10 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.state
.get_auth_events(
room_id,
&incoming_pdu.kind,
&incoming_pdu.sender,
incoming_pdu.state_key.as_deref(),
&incoming_pdu.content,
incoming_pdu.kind(),
incoming_pdu.sender(),
incoming_pdu.state_key(),
incoming_pdu.content(),
)
.await?;
@ -129,7 +133,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
!self
.services
.state_accessor
.user_can_redact(&redact_id, &incoming_pdu.sender, &incoming_pdu.room_id, true)
.user_can_redact(&redact_id, incoming_pdu.sender(), incoming_pdu.room_id(), true)
.await?,
};
@ -149,7 +153,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.map(ToOwned::to_owned)
.ready_filter(|event_id| {
// Remove any that are referenced by this incoming event's prev_events
!incoming_pdu.prev_events.contains(event_id)
!incoming_pdu.prev_events().any(is_equal_to!(event_id))
})
.broad_filter_map(|event_id| async move {
// Only keep those extremities were not referenced yet
@ -166,7 +170,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
debug!(
"Retained {} extremities checked against {} prev_events",
extremities.len(),
incoming_pdu.prev_events.len()
incoming_pdu.prev_events().count()
);
let state_ids_compressed: Arc<CompressedState> = self
@ -181,20 +185,20 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.map(Arc::new)
.await;
if incoming_pdu.state_key.is_some() {
if incoming_pdu.state_key().is_some() {
debug!("Event is a state-event. Deriving new room state");
// We also add state after incoming event to the fork states
let mut state_after = state_at_incoming_event.clone();
if let Some(state_key) = &incoming_pdu.state_key {
if let Some(state_key) = incoming_pdu.state_key() {
let shortstatekey = self
.services
.short
.get_or_create_shortstatekey(&incoming_pdu.kind.to_string().into(), state_key)
.get_or_create_shortstatekey(&incoming_pdu.kind().to_string().into(), state_key)
.await;
let event_id = &incoming_pdu.event_id;
state_after.insert(shortstatekey, event_id.clone());
let event_id = incoming_pdu.event_id();
state_after.insert(shortstatekey, event_id.to_owned());
}
let new_room_state = self
@ -236,9 +240,9 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// Soft fail, we keep the event as an outlier but don't add it to the timeline
self.services
.pdu_metadata
.mark_event_soft_failed(&incoming_pdu.event_id);
.mark_event_soft_failed(incoming_pdu.event_id());
warn!("Event was soft failed: {incoming_pdu:?}");
warn!("Event was soft failed: {:?}", incoming_pdu.event_id());
return Err!(Request(InvalidParam("Event has been soft failed")));
}
@ -249,7 +253,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
let extremities = extremities
.iter()
.map(Borrow::borrow)
.chain(once(incoming_pdu.event_id.borrow()));
.chain(once(incoming_pdu.event_id()));
let pdu_id = self
.services

View file

@ -1,7 +1,7 @@
use std::sync::Arc;
use conduwuit::{Result, implement, matrix::pdu::PduEvent};
use conduwuit_database::{Deserialized, Json, Map};
use conduwuit::{Result, implement, matrix::PduEvent};
use database::{Deserialized, Json, Map};
use ruma::{CanonicalJsonObject, EventId};
pub struct Service {

View file

@ -1,8 +1,8 @@
use std::{mem::size_of, sync::Arc};
use conduwuit::{
PduCount, PduEvent,
arrayvec::ArrayVec,
matrix::{Event, PduCount},
result::LogErr,
utils::{
ReadyExt,
@ -33,8 +33,6 @@ struct Services {
timeline: Dep<rooms::timeline::Service>,
}
pub(super) type PdusIterItem = (PduCount, PduEvent);
impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
let db = &args.db;
@ -62,7 +60,7 @@ impl Data {
target: ShortEventId,
from: PduCount,
dir: Direction,
) -> impl Stream<Item = PdusIterItem> + Send + '_ {
) -> impl Stream<Item = (PduCount, impl Event)> + Send + '_ {
let mut current = ArrayVec::<u8, 16>::new();
current.extend(target.to_be_bytes());
current.extend(from.saturating_inc(dir).into_unsigned().to_be_bytes());
@ -80,8 +78,8 @@ impl Data {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
if pdu.sender != user_id {
pdu.remove_transaction_id().log_err().ok();
if pdu.sender() != user_id {
pdu.as_mut_pdu().remove_transaction_id().log_err().ok();
}
Some((shorteventid, pdu))

View file

@ -1,11 +1,14 @@
mod data;
use std::sync::Arc;
use conduwuit::{PduCount, Result};
use conduwuit::{
Result,
matrix::{Event, PduCount},
};
use futures::{StreamExt, future::try_join};
use ruma::{EventId, RoomId, UserId, api::Direction};
use self::data::{Data, PdusIterItem};
use self::data::Data;
use crate::{Dep, rooms};
pub struct Service {
@ -44,16 +47,16 @@ impl Service {
}
#[allow(clippy::too_many_arguments)]
pub async fn get_relations(
&self,
user_id: &UserId,
room_id: &RoomId,
target: &EventId,
pub async fn get_relations<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
target: &'a EventId,
from: PduCount,
limit: usize,
max_depth: u8,
dir: Direction,
) -> Vec<PdusIterItem> {
) -> Vec<(PduCount, impl Event)> {
let room_id = self.services.short.get_shortroomid(room_id);
let target = self.services.timeline.get_pdu_count(target);

View file

@ -4,7 +4,10 @@ use std::{collections::BTreeMap, sync::Arc};
use conduwuit::{
Result, debug, err,
matrix::pdu::{PduCount, PduId, RawPduId},
matrix::{
Event,
pdu::{PduCount, PduId, RawPduId},
},
warn,
};
use futures::{Stream, TryFutureExt, try_join};
@ -74,14 +77,13 @@ impl Service {
let shortroomid = self.services.short.get_shortroomid(room_id).map_err(|e| {
err!(Database(warn!("Short room ID does not exist in database for {room_id}: {e}")))
});
let (pdu_count, shortroomid) = try_join!(pdu_count, shortroomid)?;
let (pdu_count, shortroomid) = try_join!(pdu_count, shortroomid)?;
let shorteventid = PduCount::Normal(pdu_count);
let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into();
let pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await?;
let event_id: OwnedEventId = pdu.event_id;
let event_id: OwnedEventId = pdu.event_id().to_owned();
let user_id: OwnedUserId = user_id.to_owned();
let content: BTreeMap<OwnedEventId, Receipts> = BTreeMap::from_iter([(
event_id,

View file

@ -1,9 +1,10 @@
use std::sync::Arc;
use conduwuit_core::{
Event, PduCount, PduEvent, Result,
use conduwuit::{
PduCount, Result,
arrayvec::ArrayVec,
implement,
matrix::event::{Event, Matches},
utils::{
ArrayVecExt, IterStream, ReadyExt, set,
stream::{TryIgnore, WidebandExt},
@ -103,9 +104,10 @@ pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_b
pub async fn search_pdus<'a>(
&'a self,
query: &'a RoomQuery<'a>,
) -> Result<(usize, impl Stream<Item = PduEvent> + Send + 'a)> {
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;
let filter = &query.criteria.filter;
let count = pdu_ids.len();
let pdus = pdu_ids
.into_iter()
@ -118,11 +120,11 @@ pub async fn search_pdus<'a>(
.ok()
})
.ready_filter(|pdu| !pdu.is_redacted())
.ready_filter(|pdu| pdu.matches(&query.criteria.filter))
.ready_filter(move |pdu| filter.matches(pdu))
.wide_filter_map(move |pdu| async move {
self.services
.state_accessor
.user_can_see_event(query.user_id?, &pdu.room_id, &pdu.event_id)
.user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id())
.await
.then_some(pdu)
})

View file

@ -356,8 +356,8 @@ impl Service {
&self,
room_id: &RoomId,
shortstatehash: u64,
_mutex_lock: &RoomMutexGuard, /* Take mutex guard to make sure users get the room
* state mutex */
// Take mutex guard to make sure users get the room state mutex
_mutex_lock: &RoomMutexGuard,
) {
const BUFSIZE: usize = size_of::<u64>();

View file

@ -2,7 +2,7 @@ use std::borrow::Borrow;
use conduwuit::{
Result, err, implement,
matrix::{PduEvent, StateKey},
matrix::{Event, StateKey},
};
use futures::{Stream, StreamExt, TryFutureExt};
use ruma::{EventId, RoomId, events::StateEventType};
@ -30,7 +30,7 @@ where
pub fn room_state_full<'a>(
&'a self,
room_id: &'a RoomId,
) -> impl Stream<Item = Result<((StateEventType, StateKey), PduEvent)>> + Send + 'a {
) -> impl Stream<Item = Result<((StateEventType, StateKey), impl Event)>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
@ -45,7 +45,7 @@ pub fn room_state_full<'a>(
pub fn room_state_full_pdus<'a>(
&'a self,
room_id: &'a RoomId,
) -> impl Stream<Item = Result<PduEvent>> + Send + 'a {
) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
@ -84,7 +84,7 @@ pub async fn room_state_get(
room_id: &RoomId,
event_type: &StateEventType,
state_key: &str,
) -> Result<PduEvent> {
) -> Result<impl Event> {
self.services
.state
.get_room_shortstatehash(room_id)

View file

@ -2,14 +2,14 @@ use std::{borrow::Borrow, ops::Deref, sync::Arc};
use conduwuit::{
Result, at, err, implement,
matrix::{PduEvent, StateKey},
matrix::{Event, StateKey},
pair_of,
utils::{
result::FlatOk,
stream::{BroadbandExt, IterStream, ReadyExt, TryIgnore},
},
};
use conduwuit_database::Deserialized;
use database::Deserialized;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::try_join, pin_mut};
use ruma::{
EventId, OwnedEventId, UserId,
@ -125,11 +125,9 @@ pub async fn state_get(
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<PduEvent> {
) -> Result<impl Event> {
self.state_get_id(shortstatehash, event_type, state_key)
.and_then(|event_id: OwnedEventId| async move {
self.services.timeline.get_pdu(&event_id).await
})
.and_then(async |event_id: OwnedEventId| self.services.timeline.get_pdu(&event_id).await)
.await
}
@ -316,18 +314,16 @@ pub fn state_added(
pub fn state_full(
&self,
shortstatehash: ShortStateHash,
) -> impl Stream<Item = ((StateEventType, StateKey), PduEvent)> + Send + '_ {
) -> impl Stream<Item = ((StateEventType, StateKey), impl Event)> + Send + '_ {
self.state_full_pdus(shortstatehash)
.ready_filter_map(|pdu| {
Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu))
})
.ready_filter_map(|pdu| Some(((pdu.kind().clone().into(), pdu.state_key()?.into()), pdu)))
}
#[implement(super::Service)]
pub fn state_full_pdus(
&self,
shortstatehash: ShortStateHash,
) -> impl Stream<Item = PduEvent> + Send + '_ {
) -> impl Stream<Item = impl Event> + Send + '_ {
let short_ids = self
.state_full_shortids(shortstatehash)
.ignore_err()

View file

@ -1,4 +1,4 @@
use conduwuit::{Err, Result, implement, pdu::PduBuilder};
use conduwuit::{Err, Result, implement, matrix::Event, pdu::PduBuilder};
use ruma::{
EventId, RoomId, UserId,
events::{
@ -29,14 +29,14 @@ pub async fn user_can_redact(
if redacting_event
.as_ref()
.is_ok_and(|pdu| pdu.kind == TimelineEventType::RoomCreate)
.is_ok_and(|pdu| *pdu.kind() == TimelineEventType::RoomCreate)
{
return Err!(Request(Forbidden("Redacting m.room.create is not safe, forbidding.")));
}
if redacting_event
.as_ref()
.is_ok_and(|pdu| pdu.kind == TimelineEventType::RoomServerAcl)
.is_ok_and(|pdu| *pdu.kind() == TimelineEventType::RoomServerAcl)
{
return Err!(Request(Forbidden(
"Redacting m.room.server_acl will result in the room being inaccessible for \
@ -59,9 +59,9 @@ pub async fn user_can_redact(
&& match redacting_event {
| Ok(redacting_event) =>
if federation {
redacting_event.sender.server_name() == sender.server_name()
redacting_event.sender().server_name() == sender.server_name()
} else {
redacting_event.sender == sender
redacting_event.sender() == sender
},
| _ => false,
})
@ -72,10 +72,10 @@ pub async fn user_can_redact(
.room_state_get(room_id, &StateEventType::RoomCreate, "")
.await
{
| Ok(room_create) => Ok(room_create.sender == sender
| Ok(room_create) => Ok(room_create.sender() == sender
|| redacting_event
.as_ref()
.is_ok_and(|redacting_event| redacting_event.sender == sender)),
.is_ok_and(|redacting_event| redacting_event.sender() == sender)),
| _ => Err!(Database(
"No m.room.power_levels or m.room.create events in database for room"
)),

View file

@ -49,10 +49,9 @@ impl crate::Service for Service {
}
impl Service {
pub async fn add_to_thread<'a, E>(&self, root_event_id: &EventId, event: &'a E) -> Result
pub async fn add_to_thread<E>(&self, root_event_id: &EventId, event: &E) -> Result
where
E: Event + Send + Sync,
&'a E: Event + Send,
{
let root_id = self
.services
@ -120,7 +119,7 @@ impl Service {
self.services
.timeline
.replace_pdu(&root_id, &root_pdu_json, &root_pdu)
.replace_pdu(&root_id, &root_pdu_json)
.await?;
}
@ -130,7 +129,7 @@ impl Service {
users.extend_from_slice(&userids);
},
| _ => {
users.push(root_pdu.sender);
users.push(root_pdu.sender().to_owned());
},
}
users.push(event.sender().to_owned());
@ -162,10 +161,10 @@ impl Service {
.ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes())
.wide_filter_map(move |pdu_id| async move {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender != user_id {
pdu.remove_transaction_id().ok();
let pdu_id: PduId = pdu_id.into();
if pdu.sender() != user_id {
pdu.as_mut_pdu().remove_transaction_id().ok();
}
Some((pdu_id.shorteventid, pdu))

View file

@ -207,7 +207,6 @@ impl Data {
&self,
pdu_id: &RawPduId,
pdu_json: &CanonicalJsonObject,
_pdu: &PduEvent,
) -> Result {
if self.pduid_pdu.get(pdu_id).await.is_not_found() {
return Err!(Request(NotFound("PDU does not exist.")));

View file

@ -14,8 +14,8 @@ pub use conduwuit::matrix::pdu::{PduId, RawPduId};
use conduwuit::{
Err, Error, Result, Server, at, debug, debug_warn, err, error, implement, info,
matrix::{
Event,
pdu::{EventHash, PduBuilder, PduCount, PduEvent, gen_event_id},
event::{Event, gen_event_id},
pdu::{EventHash, PduBuilder, PduCount, PduEvent},
state_res::{self, RoomVersion},
},
utils::{
@ -159,12 +159,12 @@ impl crate::Service for Service {
impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
self.first_item_in_room(room_id).await.map(at!(1))
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> {
let pdus = self.pdus(None, room_id, None);
pin_mut!(pdus);
@ -174,7 +174,7 @@ impl Service {
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
self.db.latest_pdu_in_room(None, room_id).await
}
@ -216,13 +216,14 @@ impl Service {
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[inline]
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<impl Event> {
self.db.get_non_outlier_pdu(event_id).await
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[inline]
pub async fn get_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
self.db.get_pdu(event_id).await
}
@ -230,11 +231,13 @@ impl Service {
/// Returns the pdu.
///
/// This does __NOT__ check the outliers `Tree`.
#[inline]
pub async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result<PduEvent> {
self.db.get_pdu_from_id(pdu_id).await
}
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
#[inline]
pub async fn get_pdu_json_from_id(&self, pdu_id: &RawPduId) -> Result<CanonicalJsonObject> {
self.db.get_pdu_json_from_id(pdu_id).await
}
@ -242,6 +245,7 @@ impl Service {
/// Checks if pdu exists
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[inline]
pub fn pdu_exists<'a>(
&'a self,
event_id: &'a EventId,
@ -251,13 +255,8 @@ impl Service {
/// Removes a pdu and creates a new one with the same id.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn replace_pdu(
&self,
pdu_id: &RawPduId,
pdu_json: &CanonicalJsonObject,
pdu: &PduEvent,
) -> Result<()> {
self.db.replace_pdu(pdu_id, pdu_json, pdu).await
pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject) -> Result {
self.db.replace_pdu(pdu_id, pdu_json).await
}
/// Creates a new persisted data unit and adds it to a room.
@ -310,25 +309,21 @@ impl Service {
unsigned.insert(
"prev_content".to_owned(),
CanonicalJsonValue::Object(
utils::to_canonical_object(prev_state.content.clone()).map_err(
|e| {
error!(
"Failed to convert prev_state to canonical JSON: {e}"
);
Error::bad_database(
"Failed to convert prev_state to canonical JSON.",
)
},
)?,
utils::to_canonical_object(prev_state.get_content_as_value())
.map_err(|e| {
err!(Database(error!(
"Failed to convert prev_state to canonical JSON: {e}",
)))
})?,
),
);
unsigned.insert(
String::from("prev_sender"),
CanonicalJsonValue::String(prev_state.sender.to_string()),
CanonicalJsonValue::String(prev_state.sender().to_string()),
);
unsigned.insert(
String::from("replaces_state"),
CanonicalJsonValue::String(prev_state.event_id.to_string()),
CanonicalJsonValue::String(prev_state.event_id().to_string()),
);
}
}
@ -709,14 +704,11 @@ impl Service {
.await
{
unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value());
unsigned.insert(
"prev_sender".to_owned(),
serde_json::to_value(&prev_pdu.sender)
.expect("UserId::to_value always works"),
);
unsigned
.insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?);
unsigned.insert(
"replaces_state".to_owned(),
serde_json::to_value(&prev_pdu.event_id).expect("EventId is valid json"),
serde_json::to_value(prev_pdu.event_id())?,
);
}
}
@ -759,7 +751,7 @@ impl Service {
unsigned: if unsigned.is_empty() {
None
} else {
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
Some(to_raw_value(&unsigned)?)
},
hashes: EventHash { sha256: "aaa".to_owned() },
signatures: None,
@ -1041,10 +1033,10 @@ impl Service {
/// Replace a PDU with the redacted form.
#[tracing::instrument(name = "redact", level = "debug", skip(self))]
pub async fn redact_pdu(
pub async fn redact_pdu<Pdu: Event + Send + Sync>(
&self,
event_id: &EventId,
reason: &PduEvent,
reason: &Pdu,
shortroomid: ShortRoomId,
) -> Result {
// TODO: Don't reserialize, keep original json
@ -1053,7 +1045,11 @@ impl Service {
return Ok(());
};
let mut pdu = self.get_pdu_from_id(&pdu_id).await.map_err(|e| {
let mut pdu = self
.get_pdu_from_id(&pdu_id)
.await
.map(Event::into_pdu)
.map_err(|e| {
err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU.")))
})?;
@ -1065,15 +1061,15 @@ impl Service {
}
}
let room_version_id = self.services.state.get_room_version(&pdu.room_id).await?;
let room_version_id = self.services.state.get_room_version(pdu.room_id()).await?;
pdu.redact(&room_version_id, reason)?;
pdu.redact(&room_version_id, reason.to_value())?;
let obj = utils::to_canonical_object(&pdu).map_err(|e| {
err!(Database(error!(?event_id, ?e, "Failed to convert PDU to canonical JSON")))
})?;
self.replace_pdu(&pdu_id, &obj, &pdu).await
self.replace_pdu(&pdu_id, &obj).await
}
#[tracing::instrument(name = "backfill", level = "debug", skip(self))]
@ -1163,7 +1159,7 @@ impl Service {
backfill_server,
federation::backfill::get_backfill::v1::Request {
room_id: room_id.to_owned(),
v: vec![first_pdu.1.event_id.clone()],
v: vec![first_pdu.1.event_id().to_owned()],
limit: uint!(100),
},
)
@ -1248,8 +1244,11 @@ impl Service {
#[implement(Service)]
#[tracing::instrument(skip_all, level = "debug")]
async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Result<()> {
match &pdu.kind {
async fn check_pdu_for_admin_room<Pdu>(&self, pdu: &Pdu, sender: &UserId) -> Result
where
Pdu: Event + Send + Sync,
{
match pdu.kind() {
| TimelineEventType::RoomEncryption => {
return Err!(Request(Forbidden(error!("Encryption not supported in admins room."))));
},
@ -1273,7 +1272,7 @@ async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Res
let count = self
.services
.state_cache
.room_members(&pdu.room_id)
.room_members(pdu.room_id())
.ready_filter(|user| self.services.globals.user_is_local(user))
.ready_filter(|user| *user != target)
.boxed()
@ -1297,7 +1296,7 @@ async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Res
let count = self
.services
.state_cache
.room_members(&pdu.room_id)
.room_members(pdu.room_id())
.ready_filter(|user| self.services.globals.user_is_local(user))
.ready_filter(|user| *user != target)
.boxed()

View file

@ -798,7 +798,7 @@ impl Service {
let unread: UInt = self
.services
.user
.notification_count(&user_id, &pdu.room_id)
.notification_count(&user_id, pdu.room_id())
.await
.try_into()
.expect("notification count can't go that high");

View file

@ -1,4 +1,4 @@
use conduwuit::{Err, Result, implement, pdu::gen_event_id_canonical_json};
use conduwuit::{Err, Result, implement, matrix::event::gen_event_id_canonical_json};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, RoomVersionId, signatures::Verified,
};