diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 74355311..81b0e9da 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -558,8 +558,8 @@ pub(super) async fn force_set_room_state_from_server( .latest_pdu_in_room(&room_id) .await .map_err(|_| err!(Database("Failed to find the latest PDU in database")))? - .event_id - .clone(), + .event_id() + .to_owned(), }; let room_version = self.services.rooms.state.get_room_version(&room_id).await?; diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index e15c0b2c..86206c2b 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -738,7 +738,7 @@ pub(super) async fn force_demote(&self, user_id: String, room_id: OwnedRoomOrAli .state_accessor .room_state_get(&room_id, &StateEventType::RoomCreate, "") .await - .is_ok_and(|event| event.sender == user_id); + .is_ok_and(|event| event.sender() == user_id); if !user_can_demote_self { return Err!("User is not allowed to modify their own power levels in the room.",); @@ -889,10 +889,7 @@ pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result { return Err!("Event is already redacted."); } - let room_id = event.room_id; - let sender_user = event.sender; - - if !self.services.globals.user_is_local(&sender_user) { + if !self.services.globals.user_is_local(event.sender()) { return Err!("This command only works on local users."); } @@ -902,21 +899,21 @@ pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result { ); let redaction_event_id = { - let state_lock = self.services.rooms.state.mutex.lock(&room_id).await; + let state_lock = self.services.rooms.state.mutex.lock(event.room_id()).await; self.services .rooms .timeline .build_and_append_pdu( PduBuilder { - redacts: Some(event.event_id.clone()), + redacts: Some(event.event_id().to_owned()), ..PduBuilder::timeline(&RoomRedactionEventContent { - redacts: Some(event.event_id.clone()), + redacts: Some(event.event_id().to_owned()), reason: Some(reason), }) }, - &sender_user, - &room_id, + event.sender(), + event.room_id(), &state_lock, ) .await? diff --git a/src/api/client/account.rs b/src/api/client/account.rs index 14bbcf98..df938c17 100644 --- a/src/api/client/account.rs +++ b/src/api/client/account.rs @@ -3,10 +3,9 @@ use std::fmt::Write; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Error, Result, debug_info, err, error, info, is_equal_to, + Err, Error, Event, Result, debug_info, err, error, info, is_equal_to, matrix::pdu::PduBuilder, - utils, - utils::{ReadyExt, stream::BroadbandExt}, + utils::{self, ReadyExt, stream::BroadbandExt}, warn, }; use conduwuit_service::Services; @@ -140,16 +139,32 @@ pub(crate) async fn register_route( if !services.config.allow_registration && body.appservice_info.is_none() { match (body.username.as_ref(), body.initial_device_display_name.as_ref()) { | (Some(username), Some(device_display_name)) => { - info!(%is_guest, user = %username, device_name = %device_display_name, "Rejecting registration attempt as registration is disabled"); + info!( + %is_guest, + user = %username, + device_name = %device_display_name, + "Rejecting registration attempt as registration is disabled" + ); }, | (Some(username), _) => { - info!(%is_guest, user = %username, "Rejecting registration attempt as registration is disabled"); + info!( + %is_guest, + user = %username, + "Rejecting registration attempt as registration is disabled" + ); }, | (_, Some(device_display_name)) => { - info!(%is_guest, device_name = %device_display_name, "Rejecting registration attempt as registration is disabled"); + info!( + %is_guest, + device_name = %device_display_name, + "Rejecting registration attempt as registration is disabled" + ); }, | (None, _) => { - info!(%is_guest, "Rejecting registration attempt as registration is disabled"); + info!( + %is_guest, + "Rejecting registration attempt as registration is disabled" + ); }, } @@ -835,6 +850,7 @@ pub async fn full_user_deactivate( all_joined_rooms: &[OwnedRoomId], ) -> Result<()> { services.users.deactivate_account(user_id).await.ok(); + super::update_displayname(services, user_id, None, all_joined_rooms).await; super::update_avatar_url(services, user_id, None, None, all_joined_rooms).await; @@ -871,7 +887,7 @@ pub async fn full_user_deactivate( .state_accessor .room_state_get(room_id, &StateEventType::RoomCreate, "") .await - .is_ok_and(|event| event.sender == user_id); + .is_ok_and(|event| event.sender() == user_id); if user_can_demote_self { let mut power_levels_content = room_power_levels.unwrap_or_default(); diff --git a/src/api/client/directory.rs b/src/api/client/directory.rs index 2e219fd9..00879274 100644 --- a/src/api/client/directory.rs +++ b/src/api/client/directory.rs @@ -1,7 +1,7 @@ use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Result, err, info, + Err, Event, Result, err, info, utils::{ TryFutureExtExt, math::Expected, @@ -352,7 +352,7 @@ async fn user_can_publish_room( .room_state_get(room_id, &StateEventType::RoomPowerLevels, "") .await { - | Ok(event) => serde_json::from_str(event.content.get()) + | Ok(event) => serde_json::from_str(event.content().get()) .map_err(|_| err!(Database("Invalid event content for m.room.power_levels"))) .map(|content: RoomPowerLevelsEventContent| { RoomPowerLevels::from(content) @@ -365,7 +365,7 @@ async fn user_can_publish_room( .room_state_get(room_id, &StateEventType::RoomCreate, "") .await { - | Ok(event) => Ok(event.sender == user_id), + | Ok(event) => Ok(event.sender() == user_id), | _ => Err!(Request(Forbidden("User is not allowed to publish this room"))), } }, diff --git a/src/api/client/membership/invite.rs b/src/api/client/membership/invite.rs index 4ca3efb8..018fb774 100644 --- a/src/api/client/membership/invite.rs +++ b/src/api/client/membership/invite.rs @@ -2,7 +2,7 @@ use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Result, debug_error, err, info, - matrix::pdu::{PduBuilder, gen_event_id_canonical_json}, + matrix::{event::gen_event_id_canonical_json, pdu::PduBuilder}, }; use futures::{FutureExt, join}; use ruma::{ diff --git a/src/api/client/membership/join.rs b/src/api/client/membership/join.rs index 669e9399..9d19d3bc 100644 --- a/src/api/client/membership/join.rs +++ b/src/api/client/membership/join.rs @@ -6,7 +6,8 @@ use conduwuit::{ Err, Result, debug, debug_info, debug_warn, err, error, info, matrix::{ StateKey, - pdu::{PduBuilder, PduEvent, gen_event_id, gen_event_id_canonical_json}, + event::{gen_event_id, gen_event_id_canonical_json}, + pdu::{PduBuilder, PduEvent}, state_res, }, result::FlatOk, diff --git a/src/api/client/membership/knock.rs b/src/api/client/membership/knock.rs index 544dcfb3..79f16631 100644 --- a/src/api/client/membership/knock.rs +++ b/src/api/client/membership/knock.rs @@ -4,7 +4,10 @@ use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Result, debug, debug_info, debug_warn, err, info, - matrix::pdu::{PduBuilder, PduEvent, gen_event_id}, + matrix::{ + event::{Event, gen_event_id}, + pdu::{PduBuilder, PduEvent}, + }, result::FlatOk, trace, utils::{self, shuffle, stream::IterStream}, diff --git a/src/api/client/membership/leave.rs b/src/api/client/membership/leave.rs index a64fb41f..f4f1666b 100644 --- a/src/api/client/membership/leave.rs +++ b/src/api/client/membership/leave.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use axum::extract::State; use conduwuit::{ Err, Result, debug_info, debug_warn, err, - matrix::pdu::{PduBuilder, gen_event_id}, + matrix::{event::gen_event_id, pdu::PduBuilder}, utils::{self, FutureBoolExt, future::ReadyEqExt}, warn, }; diff --git a/src/api/client/membership/members.rs b/src/api/client/membership/members.rs index 4a7abf6d..05ba1c43 100644 --- a/src/api/client/membership/members.rs +++ b/src/api/client/membership/members.rs @@ -1,13 +1,12 @@ use axum::extract::State; use conduwuit::{ Err, Event, Result, at, - matrix::pdu::PduEvent, utils::{ future::TryExtExt, stream::{BroadbandExt, ReadyExt}, }, }; -use futures::{StreamExt, future::join}; +use futures::{FutureExt, StreamExt, future::join}; use ruma::{ api::client::membership::{ get_member_events::{self, v3::MembershipEventFilter}, @@ -55,6 +54,7 @@ pub(crate) async fn get_member_events_route( .ready_filter_map(|pdu| membership_filter(pdu, membership, not_membership)) .map(Event::into_format) .collect() + .boxed() .await, }) } @@ -98,11 +98,11 @@ pub(crate) async fn joined_members_route( }) } -fn membership_filter( - pdu: PduEvent, +fn membership_filter( + pdu: Pdu, for_membership: Option<&MembershipEventFilter>, not_membership: Option<&MembershipEventFilter>, -) -> Option { +) -> Option { let membership_state_filter = match for_membership { | Some(MembershipEventFilter::Ban) => MembershipState::Ban, | Some(MembershipEventFilter::Invite) => MembershipState::Invite, diff --git a/src/api/client/message.rs b/src/api/client/message.rs index e32d020f..f8818ebb 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -2,9 +2,10 @@ use axum::extract::State; use conduwuit::{ Err, Result, at, matrix::{ - Event, - pdu::{PduCount, PduEvent}, + event::{Event, Matches}, + pdu::PduCount, }, + ref_at, utils::{ IterStream, ReadyExt, result::{FlatOk, LogErr}, @@ -216,7 +217,9 @@ where pin_mut!(receipts); let witness: Witness = events .stream() - .map(|(_, pdu)| pdu.sender.clone()) + .map(ref_at!(1)) + .map(Event::sender) + .map(ToOwned::to_owned) .chain( receipts .ready_take_while(|(_, c, _)| *c <= newest.into_unsigned()) @@ -261,27 +264,33 @@ pub(crate) async fn ignored_filter( } #[inline] -pub(crate) async fn is_ignored_pdu( +pub(crate) async fn is_ignored_pdu( services: &Services, - pdu: &PduEvent, + event: &Pdu, user_id: &UserId, -) -> bool { +) -> bool +where + Pdu: Event + Send + Sync, +{ // exclude Synapse's dummy events from bloating up response bodies. clients // don't need to see this. - if pdu.kind.to_cow_str() == "org.matrix.dummy_event" { + if event.kind().to_cow_str() == "org.matrix.dummy_event" { return true; } - let ignored_type = IGNORED_MESSAGE_TYPES.binary_search(&pdu.kind).is_ok(); + let ignored_type = IGNORED_MESSAGE_TYPES.binary_search(event.kind()).is_ok(); let ignored_server = services .moderation - .is_remote_server_ignored(pdu.sender().server_name()); + .is_remote_server_ignored(event.sender().server_name()); if ignored_type && (ignored_server || (!services.config.send_messages_from_ignored_users_to_client - && services.users.user_is_ignored(&pdu.sender, user_id).await)) + && services + .users + .user_is_ignored(event.sender(), user_id) + .await)) { return true; } @@ -300,7 +309,7 @@ pub(crate) async fn visibility_filter( services .rooms .state_accessor - .user_can_see_event(user_id, &pdu.room_id, &pdu.event_id) + .user_can_see_event(user_id, pdu.room_id(), pdu.event_id()) .await .then_some(item) } @@ -308,7 +317,7 @@ pub(crate) async fn visibility_filter( #[inline] pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option { let (_, pdu) = &item; - pdu.matches(filter).then_some(item) + filter.matches(pdu).then_some(item) } #[cfg_attr(debug_assertions, conduwuit::ctor)] diff --git a/src/api/client/profile.rs b/src/api/client/profile.rs index ff8c2a0b..1882495c 100644 --- a/src/api/client/profile.rs +++ b/src/api/client/profile.rs @@ -195,11 +195,9 @@ pub(crate) async fn get_avatar_url_route( services .users .set_displayname(&body.user_id, response.displayname.clone()); - services .users .set_avatar_url(&body.user_id, response.avatar_url.clone()); - services .users .set_blurhash(&body.user_id, response.blurhash.clone()); @@ -256,15 +254,12 @@ pub(crate) async fn get_profile_route( services .users .set_displayname(&body.user_id, response.displayname.clone()); - services .users .set_avatar_url(&body.user_id, response.avatar_url.clone()); - services .users .set_blurhash(&body.user_id, response.blurhash.clone()); - services .users .set_timezone(&body.user_id, response.tz.clone()); diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index ad726b90..1aa34ada 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -1,10 +1,10 @@ use axum::extract::State; use conduwuit::{ Result, at, - matrix::{Event, pdu::PduCount}, + matrix::{Event, event::RelationTypeEqual, pdu::PduCount}, utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt}, }; -use conduwuit_service::{Services, rooms::timeline::PdusIterItem}; +use conduwuit_service::Services; use futures::StreamExt; use ruma::{ EventId, RoomId, UInt, UserId, @@ -129,7 +129,7 @@ async fn paginate_relations_with_filter( // Spec (v1.10) recommends depth of at least 3 let depth: u8 = if recurse { 3 } else { 1 }; - let events: Vec = services + let events: Vec<_> = services .rooms .pdu_metadata .get_relations(sender_user, room_id, target, start, limit, depth, dir) @@ -138,12 +138,12 @@ async fn paginate_relations_with_filter( .filter(|(_, pdu)| { filter_event_type .as_ref() - .is_none_or(|kind| *kind == pdu.kind) + .is_none_or(|kind| kind == pdu.kind()) }) .filter(|(_, pdu)| { filter_rel_type .as_ref() - .is_none_or(|rel_type| pdu.relation_type_equal(rel_type)) + .is_none_or(|rel_type| rel_type.relation_type_equal(pdu)) }) .stream() .ready_take_while(|(count, _)| Some(*count) != to) @@ -172,17 +172,17 @@ async fn paginate_relations_with_filter( }) } -async fn visibility_filter( +async fn visibility_filter( services: &Services, sender_user: &UserId, - item: PdusIterItem, -) -> Option { + item: (PduCount, Pdu), +) -> Option<(PduCount, Pdu)> { let (_, pdu) = &item; services .rooms .state_accessor - .user_can_see_event(sender_user, &pdu.room_id, &pdu.event_id) + .user_can_see_event(sender_user, pdu.room_id(), pdu.event_id()) .await .then_some(item) } diff --git a/src/api/client/report.rs b/src/api/client/report.rs index 1019b358..052329d1 100644 --- a/src/api/client/report.rs +++ b/src/api/client/report.rs @@ -260,5 +260,6 @@ async fn delay_response() { "Got successful /report request, waiting {time_to_wait} seconds before sending \ successful response." ); + sleep(Duration::from_secs(time_to_wait)).await; } diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index c2f59d4c..2aca5b9d 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -49,7 +49,9 @@ pub(crate) async fn room_initial_sync_route( .try_collect::>(); let (membership, visibility, state, events) = - try_join4(membership, visibility, state, events).await?; + try_join4(membership, visibility, state, events) + .boxed() + .await?; let messages = PaginationChunk { start: events.last().map(at!(0)).as_ref().map(ToString::to_string), diff --git a/src/api/client/room/summary.rs b/src/api/client/room/summary.rs index ab534765..635f5a8a 100644 --- a/src/api/client/room/summary.rs +++ b/src/api/client/room/summary.rs @@ -112,13 +112,15 @@ async fn local_room_summary_response( ) -> Result { trace!(?sender_user, "Sending local room summary response for {room_id:?}"); let join_rule = services.rooms.state_accessor.get_join_rules(room_id); + let world_readable = services.rooms.state_accessor.is_world_readable(room_id); + let guest_can_join = services.rooms.state_accessor.guest_can_join(room_id); let (join_rule, world_readable, guest_can_join) = join3(join_rule, world_readable, guest_can_join).await; - trace!("{join_rule:?}, {world_readable:?}, {guest_can_join:?}"); + trace!("{join_rule:?}, {world_readable:?}, {guest_can_join:?}"); user_can_see_summary( services, room_id, diff --git a/src/api/client/room/upgrade.rs b/src/api/client/room/upgrade.rs index d8f5ea83..ae632235 100644 --- a/src/api/client/room/upgrade.rs +++ b/src/api/client/room/upgrade.rs @@ -2,7 +2,7 @@ use std::cmp::max; use axum::extract::State; use conduwuit::{ - Err, Error, Result, err, info, + Err, Error, Event, Result, err, info, matrix::{StateKey, pdu::PduBuilder}, }; use futures::StreamExt; @@ -215,7 +215,7 @@ pub(crate) async fn upgrade_room_route( .room_state_get(&body.room_id, event_type, "") .await { - | Ok(v) => v.content.clone(), + | Ok(v) => v.content().to_owned(), | Err(_) => continue, // Skipping missing events. }; diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index cabd67e4..14cd50d8 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -6,7 +6,7 @@ use std::{ use axum::extract::State; use conduwuit::{ - Err, Error, Event, PduCount, PduEvent, Result, at, debug, error, extract_variant, + Err, Error, Event, PduCount, Result, at, debug, error, extract_variant, matrix::TypeStateKey, utils::{ BoolExt, IterStream, ReadyExt, TryFutureExtExt, @@ -627,7 +627,7 @@ pub(crate) async fn sync_events_v4_route( .state_accessor .room_state_get(room_id, &state.0, &state.1) .await - .map(PduEvent::into_format) + .map(Event::into_format) .ok() }) .collect() diff --git a/src/api/server/invite.rs b/src/api/server/invite.rs index 0d26d787..0a9b2e10 100644 --- a/src/api/server/invite.rs +++ b/src/api/server/invite.rs @@ -2,8 +2,10 @@ use axum::extract::State; use axum_client_ip::InsecureClientIp; use base64::{Engine as _, engine::general_purpose}; use conduwuit::{ - Err, Error, PduEvent, Result, err, matrix::Event, pdu::gen_event_id, utils, - utils::hash::sha256, warn, + Err, Error, PduEvent, Result, err, + matrix::{Event, event::gen_event_id}, + utils::{self, hash::sha256}, + warn, }; use ruma::{ CanonicalJsonValue, OwnedUserId, UserId, diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 895eca81..652451c7 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -5,7 +5,7 @@ use std::borrow::Borrow; use axum::extract::State; use conduwuit::{ Err, Result, at, err, - pdu::gen_event_id_canonical_json, + matrix::event::gen_event_id_canonical_json, utils::stream::{IterStream, TryBroadbandExt}, warn, }; diff --git a/src/api/server/send_knock.rs b/src/api/server/send_knock.rs index 8d3697d2..ffd41ada 100644 --- a/src/api/server/send_knock.rs +++ b/src/api/server/send_knock.rs @@ -1,7 +1,7 @@ use axum::extract::State; use conduwuit::{ Err, Result, err, - matrix::pdu::{PduEvent, gen_event_id_canonical_json}, + matrix::{event::gen_event_id_canonical_json, pdu::PduEvent}, warn, }; use futures::FutureExt; diff --git a/src/api/server/send_leave.rs b/src/api/server/send_leave.rs index d3dc994c..b6336e1a 100644 --- a/src/api/server/send_leave.rs +++ b/src/api/server/send_leave.rs @@ -1,7 +1,7 @@ #![allow(deprecated)] use axum::extract::State; -use conduwuit::{Err, Result, err, matrix::pdu::gen_event_id_canonical_json}; +use conduwuit::{Err, Result, err, matrix::event::gen_event_id_canonical_json}; use conduwuit_service::Services; use futures::FutureExt; use ruma::{ diff --git a/src/core/config/proxy.rs b/src/core/config/proxy.rs index ea388f24..77c4531a 100644 --- a/src/core/config/proxy.rs +++ b/src/core/config/proxy.rs @@ -88,10 +88,7 @@ impl PartialProxyConfig { } } match (included_because, excluded_because) { - | (Some(a), Some(b)) if a.more_specific_than(b) => Some(&self.url), /* included for - * a more specific - * reason */ - // than excluded + | (Some(a), Some(b)) if a.more_specific_than(b) => Some(&self.url), | (Some(_), None) => Some(&self.url), | _ => None, } diff --git a/src/core/info/cargo.rs b/src/core/info/cargo.rs index e70bdcd5..61a97508 100644 --- a/src/core/info/cargo.rs +++ b/src/core/info/cargo.rs @@ -84,10 +84,12 @@ fn append_features(features: &mut Vec, manifest: &str) -> Result<()> { fn init_dependencies() -> Result { let manifest = Manifest::from_str(WORKSPACE_MANIFEST)?; - Ok(manifest + let deps_set = manifest .workspace .as_ref() .expect("manifest has workspace section") .dependencies - .clone()) + .clone(); + + Ok(deps_set) } diff --git a/src/core/matrix/event.rs b/src/core/matrix/event.rs index 5b12770b..a1d1339e 100644 --- a/src/core/matrix/event.rs +++ b/src/core/matrix/event.rs @@ -1,21 +1,27 @@ mod content; +mod filter; mod format; +mod id; mod redact; +mod relation; mod type_ext; +mod unsigned; + +use std::fmt::Debug; use ruma::{ - EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, RoomVersionId, UserId, - events::TimelineEventType, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, + RoomVersionId, UserId, events::TimelineEventType, }; use serde::Deserialize; use serde_json::{Value as JsonValue, value::RawValue as RawJsonValue}; -pub use self::type_ext::TypeExt; -use super::state_key::StateKey; -use crate::Result; +pub use self::{filter::Matches, id::*, relation::RelationTypeEqual, type_ext::TypeExt}; +use super::{pdu::Pdu, state_key::StateKey}; +use crate::{Result, utils}; /// Abstraction of a PDU so users can have their own PDU types. -pub trait Event { +pub trait Event: Clone + Debug { /// Serialize into a Ruma JSON format, consuming. #[inline] fn into_format(self) -> T @@ -36,6 +42,41 @@ pub trait Event { format::Ref(self).into() } + #[inline] + fn contains_unsigned_property(&self, property: &str, is_type: T) -> bool + where + T: FnOnce(&JsonValue) -> bool, + Self: Sized, + { + unsigned::contains_unsigned_property::(self, property, is_type) + } + + #[inline] + fn get_unsigned_property(&self, property: &str) -> Result + where + T: for<'de> Deserialize<'de>, + Self: Sized, + { + unsigned::get_unsigned_property::(self, property) + } + + #[inline] + fn get_unsigned_as_value(&self) -> JsonValue + where + Self: Sized, + { + unsigned::get_unsigned_as_value(self) + } + + #[inline] + fn get_unsigned(&self) -> Result + where + T: for<'de> Deserialize<'de>, + Self: Sized, + { + unsigned::get_unsigned::(self) + } + #[inline] fn get_content_as_value(&self) -> JsonValue where @@ -69,6 +110,39 @@ pub trait Event { redact::is_redacted(self) } + #[inline] + fn into_canonical_object(self) -> CanonicalJsonObject + where + Self: Sized, + { + utils::to_canonical_object(self.into_pdu()).expect("failed to create Value::Object") + } + + #[inline] + fn to_canonical_object(&self) -> CanonicalJsonObject { + utils::to_canonical_object(self.as_pdu()).expect("failed to create Value::Object") + } + + #[inline] + fn into_value(self) -> JsonValue + where + Self: Sized, + { + serde_json::to_value(self.into_pdu()).expect("failed to create JSON Value") + } + + #[inline] + fn to_value(&self) -> JsonValue { + serde_json::to_value(self.as_pdu()).expect("failed to create JSON Value") + } + + #[inline] + fn as_mut_pdu(&mut self) -> &mut Pdu { unimplemented!("not a mutable Pdu") } + + fn as_pdu(&self) -> &Pdu; + + fn into_pdu(self) -> Pdu; + fn is_owned(&self) -> bool; // @@ -76,7 +150,7 @@ pub trait Event { // /// All the authenticating events for this event. - fn auth_events(&self) -> impl DoubleEndedIterator + Send + '_; + fn auth_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_; /// The event's content. fn content(&self) -> &RawJsonValue; @@ -88,7 +162,7 @@ pub trait Event { fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch; /// The events before this event. - fn prev_events(&self) -> impl DoubleEndedIterator + Send + '_; + fn prev_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_; /// If this event is a redaction event this is the event it redacts. fn redacts(&self) -> Option<&EventId>; diff --git a/src/core/matrix/event/filter.rs b/src/core/matrix/event/filter.rs new file mode 100644 index 00000000..d3a225b6 --- /dev/null +++ b/src/core/matrix/event/filter.rs @@ -0,0 +1,93 @@ +use ruma::api::client::filter::{RoomEventFilter, UrlFilter}; +use serde_json::Value; + +use super::Event; +use crate::is_equal_to; + +pub trait Matches { + fn matches(&self, event: &E) -> bool; +} + +impl Matches for &RoomEventFilter { + #[inline] + fn matches(&self, event: &E) -> bool { + if !matches_sender(event, self) { + return false; + } + + if !matches_room(event, self) { + return false; + } + + if !matches_type(event, self) { + return false; + } + + if !matches_url(event, self) { + return false; + } + + true + } +} + +fn matches_room(event: &E, filter: &RoomEventFilter) -> bool { + if filter.not_rooms.iter().any(is_equal_to!(event.room_id())) { + return false; + } + + if let Some(rooms) = filter.rooms.as_ref() { + if !rooms.iter().any(is_equal_to!(event.room_id())) { + return false; + } + } + + true +} + +fn matches_sender(event: &E, filter: &RoomEventFilter) -> bool { + if filter.not_senders.iter().any(is_equal_to!(event.sender())) { + return false; + } + + if let Some(senders) = filter.senders.as_ref() { + if !senders.iter().any(is_equal_to!(event.sender())) { + return false; + } + } + + true +} + +fn matches_type(event: &E, filter: &RoomEventFilter) -> bool { + let kind = event.kind().to_cow_str(); + + if filter.not_types.iter().any(is_equal_to!(&kind)) { + return false; + } + + if let Some(types) = filter.types.as_ref() { + if !types.iter().any(is_equal_to!(&kind)) { + return false; + } + } + + true +} + +fn matches_url(event: &E, filter: &RoomEventFilter) -> bool { + let Some(url_filter) = filter.url_filter.as_ref() else { + return true; + }; + + //TODO: might be better to use Ruma's Raw rather than serde here + let url = event + .get_content_as_value() + .get("url") + .is_some_and(Value::is_string); + + match url_filter { + | UrlFilter::EventsWithUrl => url, + | UrlFilter::EventsWithoutUrl => !url, + } +} diff --git a/src/core/matrix/pdu/event_id.rs b/src/core/matrix/event/id.rs similarity index 100% rename from src/core/matrix/pdu/event_id.rs rename to src/core/matrix/event/id.rs diff --git a/src/core/matrix/event/relation.rs b/src/core/matrix/event/relation.rs new file mode 100644 index 00000000..58324e86 --- /dev/null +++ b/src/core/matrix/event/relation.rs @@ -0,0 +1,28 @@ +use ruma::events::relation::RelationType; +use serde::Deserialize; + +use super::Event; + +pub trait RelationTypeEqual { + fn relation_type_equal(&self, event: &E) -> bool; +} + +#[derive(Clone, Debug, Deserialize)] +struct ExtractRelatesToEventId { + #[serde(rename = "m.relates_to")] + relates_to: ExtractRelType, +} + +#[derive(Clone, Debug, Deserialize)] +struct ExtractRelType { + rel_type: RelationType, +} + +impl RelationTypeEqual for RelationType { + fn relation_type_equal(&self, event: &E) -> bool { + event + .get_content() + .map(|c: ExtractRelatesToEventId| c.relates_to.rel_type) + .is_ok_and(|r| r == *self) + } +} diff --git a/src/core/matrix/event/unsigned.rs b/src/core/matrix/event/unsigned.rs new file mode 100644 index 00000000..42928af4 --- /dev/null +++ b/src/core/matrix/event/unsigned.rs @@ -0,0 +1,51 @@ +use serde::Deserialize; +use serde_json::value::Value as JsonValue; + +use super::Event; +use crate::{Result, err, is_true}; + +pub(super) fn contains_unsigned_property(event: &E, property: &str, is_type: F) -> bool +where + F: FnOnce(&JsonValue) -> bool, + E: Event, +{ + get_unsigned_as_value(event) + .get(property) + .map(is_type) + .is_some_and(is_true!()) +} + +pub(super) fn get_unsigned_property(event: &E, property: &str) -> Result +where + T: for<'de> Deserialize<'de>, + E: Event, +{ + get_unsigned_as_value(event) + .get_mut(property) + .map(JsonValue::take) + .map(serde_json::from_value) + .ok_or(err!(Request(NotFound("property not found in unsigned object"))))? + .map_err(|e| err!(Database("Failed to deserialize unsigned.{property} into type: {e}"))) +} + +#[must_use] +pub(super) fn get_unsigned_as_value(event: &E) -> JsonValue +where + E: Event, +{ + get_unsigned::(event).unwrap_or_default() +} + +pub(super) fn get_unsigned(event: &E) -> Result +where + T: for<'de> Deserialize<'de>, + E: Event, +{ + event + .unsigned() + .as_ref() + .map(|raw| raw.get()) + .map(serde_json::from_str) + .ok_or(err!(Request(NotFound("\"unsigned\" property not found in pdu"))))? + .map_err(|e| err!(Database("Failed to deserialize \"unsigned\" into value: {e}"))) +} diff --git a/src/core/matrix/pdu.rs b/src/core/matrix/pdu.rs index e64baeb8..bff0c203 100644 --- a/src/core/matrix/pdu.rs +++ b/src/core/matrix/pdu.rs @@ -1,12 +1,8 @@ mod builder; -mod content; mod count; -mod event_id; -mod filter; mod id; mod raw_id; mod redact; -mod relation; #[cfg(test)] mod tests; mod unsigned; @@ -24,7 +20,6 @@ pub use self::{ Count as PduCount, Id as PduId, Pdu as PduEvent, RawId as RawPduId, builder::{Builder, Builder as PduBuilder}, count::Count, - event_id::*, id::{ShortId, *}, raw_id::*, }; @@ -91,7 +86,7 @@ impl Pdu { impl Event for Pdu { #[inline] - fn auth_events(&self) -> impl DoubleEndedIterator + Send + '_ { + fn auth_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_ { self.auth_events.iter().map(AsRef::as_ref) } @@ -107,7 +102,7 @@ impl Event for Pdu { } #[inline] - fn prev_events(&self) -> impl DoubleEndedIterator + Send + '_ { + fn prev_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_ { self.prev_events.iter().map(AsRef::as_ref) } @@ -129,13 +124,22 @@ impl Event for Pdu { #[inline] fn unsigned(&self) -> Option<&RawJsonValue> { self.unsigned.as_deref() } + #[inline] + fn as_mut_pdu(&mut self) -> &mut Pdu { self } + + #[inline] + fn as_pdu(&self) -> &Pdu { self } + + #[inline] + fn into_pdu(self) -> Pdu { self } + #[inline] fn is_owned(&self) -> bool { true } } impl Event for &Pdu { #[inline] - fn auth_events(&self) -> impl DoubleEndedIterator + Send + '_ { + fn auth_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_ { self.auth_events.iter().map(AsRef::as_ref) } @@ -151,7 +155,7 @@ impl Event for &Pdu { } #[inline] - fn prev_events(&self) -> impl DoubleEndedIterator + Send + '_ { + fn prev_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_ { self.prev_events.iter().map(AsRef::as_ref) } @@ -173,6 +177,12 @@ impl Event for &Pdu { #[inline] fn unsigned(&self) -> Option<&RawJsonValue> { self.unsigned.as_deref() } + #[inline] + fn as_pdu(&self) -> &Pdu { self } + + #[inline] + fn into_pdu(self) -> Pdu { self.clone() } + #[inline] fn is_owned(&self) -> bool { false } } diff --git a/src/core/matrix/pdu/content.rs b/src/core/matrix/pdu/content.rs deleted file mode 100644 index 4e60ce6e..00000000 --- a/src/core/matrix/pdu/content.rs +++ /dev/null @@ -1,20 +0,0 @@ -use serde::Deserialize; -use serde_json::value::Value as JsonValue; - -use crate::{Result, err, implement}; - -#[must_use] -#[implement(super::Pdu)] -pub fn get_content_as_value(&self) -> JsonValue { - self.get_content() - .expect("pdu content must be a valid JSON value") -} - -#[implement(super::Pdu)] -pub fn get_content(&self) -> Result -where - T: for<'de> Deserialize<'de>, -{ - serde_json::from_str(self.content.get()) - .map_err(|e| err!(Database("Failed to deserialize pdu content into type: {e}"))) -} diff --git a/src/core/matrix/pdu/filter.rs b/src/core/matrix/pdu/filter.rs deleted file mode 100644 index aabf13db..00000000 --- a/src/core/matrix/pdu/filter.rs +++ /dev/null @@ -1,90 +0,0 @@ -use ruma::api::client::filter::{RoomEventFilter, UrlFilter}; -use serde_json::Value; - -use crate::{implement, is_equal_to}; - -#[implement(super::Pdu)] -#[must_use] -pub fn matches(&self, filter: &RoomEventFilter) -> bool { - if !self.matches_sender(filter) { - return false; - } - - if !self.matches_room(filter) { - return false; - } - - if !self.matches_type(filter) { - return false; - } - - if !self.matches_url(filter) { - return false; - } - - true -} - -#[implement(super::Pdu)] -fn matches_room(&self, filter: &RoomEventFilter) -> bool { - if filter.not_rooms.contains(&self.room_id) { - return false; - } - - if let Some(rooms) = filter.rooms.as_ref() { - if !rooms.contains(&self.room_id) { - return false; - } - } - - true -} - -#[implement(super::Pdu)] -fn matches_sender(&self, filter: &RoomEventFilter) -> bool { - if filter.not_senders.contains(&self.sender) { - return false; - } - - if let Some(senders) = filter.senders.as_ref() { - if !senders.contains(&self.sender) { - return false; - } - } - - true -} - -#[implement(super::Pdu)] -fn matches_type(&self, filter: &RoomEventFilter) -> bool { - let event_type = &self.kind.to_cow_str(); - if filter.not_types.iter().any(is_equal_to!(event_type)) { - return false; - } - - if let Some(types) = filter.types.as_ref() { - if !types.iter().any(is_equal_to!(event_type)) { - return false; - } - } - - true -} - -#[implement(super::Pdu)] -fn matches_url(&self, filter: &RoomEventFilter) -> bool { - let Some(url_filter) = filter.url_filter.as_ref() else { - return true; - }; - - //TODO: might be better to use Ruma's Raw rather than serde here - let url = serde_json::from_str::(self.content.get()) - .expect("parsing content JSON failed") - .get("url") - .is_some_and(Value::is_string); - - match url_filter { - | UrlFilter::EventsWithUrl => url, - | UrlFilter::EventsWithoutUrl => !url, - } -} diff --git a/src/core/matrix/pdu/redact.rs b/src/core/matrix/pdu/redact.rs index e6a03209..896e03f8 100644 --- a/src/core/matrix/pdu/redact.rs +++ b/src/core/matrix/pdu/redact.rs @@ -1,10 +1,10 @@ use ruma::{RoomVersionId, canonical_json::redact_content_in_place}; -use serde_json::{json, value::to_raw_value}; +use serde_json::{Value as JsonValue, json, value::to_raw_value}; use crate::{Error, Result, err, implement}; #[implement(super::Pdu)] -pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: &Self) -> Result { +pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: JsonValue) -> Result { self.unsigned = None; let mut content = serde_json::from_str(self.content.get()) diff --git a/src/core/matrix/pdu/relation.rs b/src/core/matrix/pdu/relation.rs deleted file mode 100644 index 2968171e..00000000 --- a/src/core/matrix/pdu/relation.rs +++ /dev/null @@ -1,22 +0,0 @@ -use ruma::events::relation::RelationType; -use serde::Deserialize; - -use crate::implement; - -#[derive(Clone, Debug, Deserialize)] -struct ExtractRelType { - rel_type: RelationType, -} -#[derive(Clone, Debug, Deserialize)] -struct ExtractRelatesToEventId { - #[serde(rename = "m.relates_to")] - relates_to: ExtractRelType, -} - -#[implement(super::Pdu)] -#[must_use] -pub fn relation_type_equal(&self, rel_type: &RelationType) -> bool { - self.get_content() - .map(|c: ExtractRelatesToEventId| c.relates_to.rel_type) - .is_ok_and(|r| r == *rel_type) -} diff --git a/src/core/matrix/pdu/unsigned.rs b/src/core/matrix/pdu/unsigned.rs index 23897519..0c58bb68 100644 --- a/src/core/matrix/pdu/unsigned.rs +++ b/src/core/matrix/pdu/unsigned.rs @@ -1,11 +1,10 @@ use std::collections::BTreeMap; use ruma::MilliSecondsSinceUnixEpoch; -use serde::Deserialize; use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value}; use super::Pdu; -use crate::{Result, err, implement, is_true}; +use crate::{Result, err, implement}; #[implement(Pdu)] pub fn remove_transaction_id(&mut self) -> Result { @@ -74,43 +73,3 @@ pub fn add_relation(&mut self, name: &str, pdu: Option<&Pdu>) -> Result { Ok(()) } - -#[implement(Pdu)] -pub fn contains_unsigned_property(&self, property: &str, is_type: F) -> bool -where - F: FnOnce(&JsonValue) -> bool, -{ - self.get_unsigned_as_value() - .get(property) - .map(is_type) - .is_some_and(is_true!()) -} - -#[implement(Pdu)] -pub fn get_unsigned_property(&self, property: &str) -> Result -where - T: for<'de> Deserialize<'de>, -{ - self.get_unsigned_as_value() - .get_mut(property) - .map(JsonValue::take) - .map(serde_json::from_value) - .ok_or(err!(Request(NotFound("property not found in unsigned object"))))? - .map_err(|e| err!(Database("Failed to deserialize unsigned.{property} into type: {e}"))) -} - -#[implement(Pdu)] -#[must_use] -pub fn get_unsigned_as_value(&self) -> JsonValue { - self.get_unsigned::().unwrap_or_default() -} - -#[implement(Pdu)] -pub fn get_unsigned(&self) -> Result { - self.unsigned - .as_ref() - .map(|raw| raw.get()) - .map(serde_json::from_str) - .ok_or(err!(Request(NotFound("\"unsigned\" property not found in pdu"))))? - .map_err(|e| err!(Database("Failed to deserialize \"unsigned\" into value: {e}"))) -} diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index ed5aa034..ce9d9276 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -74,7 +74,7 @@ type Result = crate::Result; /// event is part of the same room. //#[tracing::instrument(level = "debug", skip(state_sets, auth_chain_sets, //#[tracing::instrument(level event_fetch))] -pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>( +pub async fn resolve<'a, Pdu, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>( room_version: &RoomVersionId, state_sets: Sets, auth_chain_sets: &'a [HashSet], @@ -83,14 +83,14 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis ) -> Result> where Fetch: Fn(OwnedEventId) -> FetchFut + Sync, - FetchFut: Future> + Send, + FetchFut: Future> + Send, Exists: Fn(OwnedEventId) -> ExistsFut + Sync, ExistsFut: Future + Send, Sets: IntoIterator + Send, SetIter: Iterator> + Clone + Send, Hasher: BuildHasher + Send + Sync, - E: Event + Clone + Send + Sync, - for<'b> &'b E: Event + Send, + Pdu: Event + Clone + Send + Sync, + for<'b> &'b Pdu: Event + Send, { debug!("State resolution starting"); @@ -221,6 +221,7 @@ where let state_sets_iter = state_sets_iter.inspect(|_| state_set_count = state_set_count.saturating_add(1)); + for (k, v) in state_sets_iter.flatten() { occurrences .entry(k) @@ -305,6 +306,7 @@ where let pl = get_power_level_for_sender(&event_id, fetch_event) .await .ok()?; + Some((event_id, pl)) }) .inspect(|(event_id, pl)| { diff --git a/src/core/mods/module.rs b/src/core/mods/module.rs index b65bbca2..bcadf5aa 100644 --- a/src/core/mods/module.rs +++ b/src/core/mods/module.rs @@ -44,6 +44,7 @@ impl Module { .handle .as_ref() .expect("backing library loaded by this instance"); + // SAFETY: Calls dlsym(3) on unix platforms. This might not have to be unsafe // if wrapped in libloading with_dlerror(). let sym = unsafe { handle.get::(cname.as_bytes()) }; diff --git a/src/core/mods/path.rs b/src/core/mods/path.rs index cde251b3..b792890b 100644 --- a/src/core/mods/path.rs +++ b/src/core/mods/path.rs @@ -27,6 +27,7 @@ pub fn to_name(path: &OsStr) -> Result { .expect("path file stem") .to_str() .expect("name string"); + let name = name.strip_prefix("lib").unwrap_or(name).to_owned(); Ok(name) diff --git a/src/core/utils/html.rs b/src/core/utils/html.rs index f2b6d861..eac4c47f 100644 --- a/src/core/utils/html.rs +++ b/src/core/utils/html.rs @@ -23,8 +23,10 @@ impl fmt::Display for Escape<'_> { | '"' => """, | _ => continue, }; + fmt.write_str(&pile_o_bits[last..i])?; fmt.write_str(s)?; + // NOTE: we only expect single byte characters here - which is fine as long as // we only match single byte characters last = i.saturating_add(1); diff --git a/src/core/utils/json.rs b/src/core/utils/json.rs index 3f2f225e..df4ccd13 100644 --- a/src/core/utils/json.rs +++ b/src/core/utils/json.rs @@ -1,4 +1,4 @@ -use std::{fmt, str::FromStr}; +use std::{fmt, marker::PhantomData, str::FromStr}; use ruma::{CanonicalJsonError, CanonicalJsonObject, canonical_json::try_from_json_map}; @@ -11,25 +11,28 @@ use crate::Result; pub fn to_canonical_object( value: T, ) -> Result { + use CanonicalJsonError::SerDe; use serde::ser::Error; - match serde_json::to_value(value).map_err(CanonicalJsonError::SerDe)? { + match serde_json::to_value(value).map_err(SerDe)? { | serde_json::Value::Object(map) => try_from_json_map(map), - | _ => - Err(CanonicalJsonError::SerDe(serde_json::Error::custom("Value must be an object"))), + | _ => Err(SerDe(serde_json::Error::custom("Value must be an object"))), } } -pub fn deserialize_from_str< - 'de, +pub fn deserialize_from_str<'de, D, T, E>(deserializer: D) -> Result +where D: serde::de::Deserializer<'de>, T: FromStr, E: fmt::Display, ->( - deserializer: D, -) -> Result { - struct Visitor, E>(std::marker::PhantomData); - impl, Err: fmt::Display> serde::de::Visitor<'_> for Visitor { +{ + struct Visitor, E>(PhantomData); + + impl serde::de::Visitor<'_> for Visitor + where + T: FromStr, + Err: fmt::Display, + { type Value = T; fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -43,5 +46,6 @@ pub fn deserialize_from_str< v.parse().map_err(serde::de::Error::custom) } } - deserializer.deserialize_str(Visitor(std::marker::PhantomData)) + + deserializer.deserialize_str(Visitor(PhantomData)) } diff --git a/src/core/utils/time.rs b/src/core/utils/time.rs index 73f73971..394e08cb 100644 --- a/src/core/utils/time.rs +++ b/src/core/utils/time.rs @@ -105,14 +105,11 @@ pub fn whole_unit(d: Duration) -> Unit { | 86_400.. => Days(d.as_secs() / 86_400), | 3_600..=86_399 => Hours(d.as_secs() / 3_600), | 60..=3_599 => Mins(d.as_secs() / 60), - | _ => match d.as_micros() { | 1_000_000.. => Secs(d.as_secs()), | 1_000..=999_999 => Millis(d.subsec_millis().into()), - | _ => match d.as_nanos() { | 1_000.. => Micros(d.subsec_micros().into()), - | _ => Nanos(d.subsec_nanos().into()), }, }, diff --git a/src/database/watchers.rs b/src/database/watchers.rs index b3907833..efb939d7 100644 --- a/src/database/watchers.rs +++ b/src/database/watchers.rs @@ -37,7 +37,6 @@ impl Watchers { pub(crate) fn wake(&self, key: &[u8]) { let watchers = self.watchers.read().unwrap(); let mut triggered = Vec::new(); - for length in 0..=key.len() { if watchers.contains_key(&key[..length]) { triggered.push(&key[..length]); diff --git a/src/main/logging.rs b/src/main/logging.rs index aec50bd4..36a8896c 100644 --- a/src/main/logging.rs +++ b/src/main/logging.rs @@ -22,10 +22,12 @@ pub(crate) fn init( let reload_handles = LogLevelReloadHandles::default(); let console_span_events = fmt_span::from_str(&config.log_span_events).unwrap_or_err(); + let console_filter = EnvFilter::builder() .with_regex(config.log_filter_regex) .parse(&config.log) .map_err(|e| err!(Config("log", "{e}.")))?; + let console_layer = fmt::Layer::new() .with_span_events(console_span_events) .event_format(ConsoleFormat::new(config)) @@ -34,6 +36,7 @@ pub(crate) fn init( let (console_reload_filter, console_reload_handle) = reload::Layer::new(console_filter.clone()); + reload_handles.add("console", Box::new(console_reload_handle)); let cap_state = Arc::new(capture::State::new()); @@ -47,8 +50,10 @@ pub(crate) fn init( let subscriber = { let sentry_filter = EnvFilter::try_new(&config.sentry_filter) .map_err(|e| err!(Config("sentry_filter", "{e}.")))?; + let sentry_layer = sentry_tracing::layer(); let (sentry_reload_filter, sentry_reload_handle) = reload::Layer::new(sentry_filter); + reload_handles.add("sentry", Box::new(sentry_reload_handle)); subscriber.with(sentry_layer.with_filter(sentry_reload_filter)) }; @@ -58,12 +63,15 @@ pub(crate) fn init( let (flame_layer, flame_guard) = if config.tracing_flame { let flame_filter = EnvFilter::try_new(&config.tracing_flame_filter) .map_err(|e| err!(Config("tracing_flame_filter", "{e}.")))?; + let (flame_layer, flame_guard) = tracing_flame::FlameLayer::with_file(&config.tracing_flame_output_path) .map_err(|e| err!(Config("tracing_flame_output_path", "{e}.")))?; + let flame_layer = flame_layer .with_empty_samples(false) .with_filter(flame_filter); + (Some(flame_layer), Some(flame_guard)) } else { (None, None) @@ -71,19 +79,24 @@ pub(crate) fn init( let jaeger_filter = EnvFilter::try_new(&config.jaeger_filter) .map_err(|e| err!(Config("jaeger_filter", "{e}.")))?; + let jaeger_layer = config.allow_jaeger.then(|| { opentelemetry::global::set_text_map_propagator( opentelemetry_jaeger::Propagator::new(), ); + let tracer = opentelemetry_jaeger::new_agent_pipeline() .with_auto_split_batch(true) .with_service_name(conduwuit_core::name()) .install_batch(opentelemetry_sdk::runtime::Tokio) .expect("jaeger agent pipeline"); + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let (jaeger_reload_filter, jaeger_reload_handle) = reload::Layer::new(jaeger_filter.clone()); reload_handles.add("jaeger", Box::new(jaeger_reload_handle)); + Some(telemetry.with_filter(jaeger_reload_filter)) }); diff --git a/src/main/mods.rs b/src/main/mods.rs index d585a381..6140cc6e 100644 --- a/src/main/mods.rs +++ b/src/main/mods.rs @@ -51,7 +51,9 @@ pub(crate) async fn run(server: &Arc, starts: bool) -> Result<(bool, boo }, }; } + server.server.stopping.store(false, Ordering::Release); + let run = main_mod.get::("run")?; if let Err(error) = run(server .services @@ -64,7 +66,9 @@ pub(crate) async fn run(server: &Arc, starts: bool) -> Result<(bool, boo error!("Running server: {error}"); return Err(error); } + let reloads = server.server.reloading.swap(false, Ordering::AcqRel); + let stops = !reloads || stale(server).await? <= restart_thresh(); let starts = reloads && stops; if stops { diff --git a/src/main/sentry.rs b/src/main/sentry.rs index 68f12eb7..2a09f415 100644 --- a/src/main/sentry.rs +++ b/src/main/sentry.rs @@ -35,11 +35,13 @@ fn options(config: &Config) -> ClientOptions { .expect("init_sentry should only be called if sentry is enabled and this is not None") .as_str(); + let server_name = config + .sentry_send_server_name + .then(|| config.server_name.to_string().into()); + ClientOptions { dsn: Some(Dsn::from_str(dsn).expect("sentry_endpoint must be a valid URL")), - server_name: config - .sentry_send_server_name - .then(|| config.server_name.to_string().into()), + server_name, traces_sample_rate: config.sentry_traces_sample_rate, debug: cfg!(debug_assertions), release: sentry::release_name!(), diff --git a/src/router/request.rs b/src/router/request.rs index dba90324..3bbeae03 100644 --- a/src/router/request.rs +++ b/src/router/request.rs @@ -98,8 +98,8 @@ async fn execute( fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result { let status = result.status(); - let reason = status.canonical_reason().unwrap_or("Unknown Reason"); let code = status.as_u16(); + let reason = status.canonical_reason().unwrap_or("Unknown Reason"); if status.is_server_error() { error!(method = ?method, uri = ?uri, "{code} {reason}"); diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 66c373ec..d971ce95 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -305,13 +305,13 @@ impl Service { return Ok(()); }; - let response_sender = if self.is_admin_room(&pdu.room_id).await { + let response_sender = if self.is_admin_room(pdu.room_id()).await { &self.services.globals.server_user } else { - &pdu.sender + pdu.sender() }; - self.respond_to_room(content, &pdu.room_id, response_sender) + self.respond_to_room(content, pdu.room_id(), response_sender) .boxed() .await } diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 192ef447..baa7a72e 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -293,11 +293,7 @@ impl Service { .state_accessor .room_state_get(event.room_id(), &StateEventType::RoomPowerLevels, "") .await - .and_then(|ev| { - serde_json::from_str(ev.content.get()).map_err(|e| { - err!(Database(error!("invalid m.room.power_levels event: {e:?}"))) - }) - }) + .and_then(|event| event.get_content()) .unwrap_or_default(); let serialized = event.to_format(); diff --git a/src/service/rooms/alias/mod.rs b/src/service/rooms/alias/mod.rs index 866e45a9..7675efd4 100644 --- a/src/service/rooms/alias/mod.rs +++ b/src/service/rooms/alias/mod.rs @@ -3,7 +3,7 @@ mod remote; use std::sync::Arc; use conduwuit::{ - Err, Result, Server, err, + Err, Event, Result, Server, err, utils::{ReadyExt, stream::TryIgnore}, }; use database::{Deserialized, Ignore, Interfix, Map}; @@ -241,7 +241,7 @@ impl Service { .room_state_get(&room_id, &StateEventType::RoomCreate, "") .await { - return Ok(event.sender == user_id); + return Ok(event.sender() == user_id); } Err!(Database("Room has no m.room.create event")) diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index b0a7d827..44027e04 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -4,11 +4,13 @@ use std::{ }; use conduwuit::{ - PduEvent, debug, debug_error, debug_warn, implement, pdu, trace, - utils::continue_exponential_backoff_secs, warn, + Event, PduEvent, debug, debug_error, debug_warn, implement, + matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, + warn, }; use ruma::{ - CanonicalJsonValue, OwnedEventId, RoomId, ServerName, api::federation::event::get_event, + CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, + api::federation::event::get_event, }; use super::get_room_version_id; @@ -23,13 +25,17 @@ use super::get_room_version_id; /// c. Ask origin server over federation /// d. TODO: Ask other servers over federation? #[implement(super::Service)] -pub(super) async fn fetch_and_handle_outliers<'a>( +pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>( &self, origin: &'a ServerName, - events: &'a [OwnedEventId], - create_event: &'a PduEvent, + events: Events, + create_event: &'a Pdu, room_id: &'a RoomId, -) -> Vec<(PduEvent, Option>)> { +) -> Vec<(PduEvent, Option>)> +where + Pdu: Event + Send + Sync, + Events: Iterator + Clone + Send, +{ let back_off = |id| match self .services .globals @@ -46,22 +52,23 @@ pub(super) async fn fetch_and_handle_outliers<'a>( }, }; - let mut events_with_auth_events = Vec::with_capacity(events.len()); + let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); + for id in events { // a. Look in the main timeline (pduid_pdu tree) // b. Look at outlier pdu tree // (get_pdu_json checks both) if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await { - trace!("Found {id} in db"); - events_with_auth_events.push((id, Some(local_pdu), vec![])); + events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![])); continue; } // c. Ask origin server over federation // We also handle its auth chain here so we don't get a stack overflow in // handle_outlier_pdu. - let mut todo_auth_events: VecDeque<_> = [id.clone()].into(); + let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into(); let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len()); + let mut events_all = HashSet::with_capacity(todo_auth_events.len()); while let Some(next_id) = todo_auth_events.pop_front() { if let Some((time, tries)) = self @@ -117,7 +124,7 @@ pub(super) async fn fetch_and_handle_outliers<'a>( }; let Ok((calculated_event_id, value)) = - pdu::gen_event_id_canonical_json(&res.pdu, &room_version_id) + gen_event_id_canonical_json(&res.pdu, &room_version_id) else { back_off((*next_id).to_owned()); continue; @@ -160,7 +167,8 @@ pub(super) async fn fetch_and_handle_outliers<'a>( }, } } - events_with_auth_events.push((id, None, events_in_reverse_order)); + + events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order)); } let mut pdus = Vec::with_capacity(events_with_auth_events.len()); @@ -217,5 +225,6 @@ pub(super) async fn fetch_and_handle_outliers<'a>( } } } + pdus } diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 0f92d6e6..efc7a434 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -1,13 +1,16 @@ -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::{ + collections::{BTreeMap, HashMap, HashSet, VecDeque}, + iter::once, +}; use conduwuit::{ - PduEvent, Result, debug_warn, err, implement, + Event, PduEvent, Result, debug_warn, err, implement, state_res::{self}, }; use futures::{FutureExt, future}; use ruma::{ - CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, UInt, int, - uint, + CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, + int, uint, }; use super::check_room_id; @@ -19,20 +22,26 @@ use super::check_room_id; fields(%origin), )] #[allow(clippy::type_complexity)] -pub(super) async fn fetch_prev( +pub(super) async fn fetch_prev<'a, Pdu, Events>( &self, origin: &ServerName, - create_event: &PduEvent, + create_event: &Pdu, room_id: &RoomId, - first_ts_in_room: UInt, - initial_set: Vec, + first_ts_in_room: MilliSecondsSinceUnixEpoch, + initial_set: Events, ) -> Result<( Vec, HashMap)>, -)> { - let mut graph: HashMap = HashMap::with_capacity(initial_set.len()); +)> +where + Pdu: Event + Send + Sync, + Events: Iterator + Clone + Send, +{ + let num_ids = initial_set.clone().count(); let mut eventid_info = HashMap::new(); - let mut todo_outlier_stack: VecDeque = initial_set.into(); + let mut graph: HashMap = HashMap::with_capacity(num_ids); + let mut todo_outlier_stack: VecDeque = + initial_set.map(ToOwned::to_owned).collect(); let mut amount = 0; @@ -40,7 +49,12 @@ pub(super) async fn fetch_prev( self.services.server.check_running()?; match self - .fetch_and_handle_outliers(origin, &[prev_event_id.clone()], create_event, room_id) + .fetch_and_handle_outliers( + origin, + once(prev_event_id.as_ref()), + create_event, + room_id, + ) .boxed() .await .pop() @@ -65,17 +79,17 @@ pub(super) async fn fetch_prev( } if let Some(json) = json_opt { - if pdu.origin_server_ts > first_ts_in_room { + if pdu.origin_server_ts() > first_ts_in_room { amount = amount.saturating_add(1); - for prev_prev in &pdu.prev_events { + for prev_prev in pdu.prev_events() { if !graph.contains_key(prev_prev) { - todo_outlier_stack.push_back(prev_prev.clone()); + todo_outlier_stack.push_back(prev_prev.to_owned()); } } graph.insert( prev_event_id.clone(), - pdu.prev_events.iter().cloned().collect(), + pdu.prev_events().map(ToOwned::to_owned).collect(), ); } else { // Time based check failed @@ -98,8 +112,7 @@ pub(super) async fn fetch_prev( let event_fetch = |event_id| { let origin_server_ts = eventid_info .get(&event_id) - .cloned() - .map_or_else(|| uint!(0), |info| info.0.origin_server_ts); + .map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get()); // This return value is the key used for sorting events, // events are then sorted by power level, time, diff --git a/src/service/rooms/event_handler/fetch_state.rs b/src/service/rooms/event_handler/fetch_state.rs index 0f9e093b..d68a3542 100644 --- a/src/service/rooms/event_handler/fetch_state.rs +++ b/src/service/rooms/event_handler/fetch_state.rs @@ -1,6 +1,6 @@ use std::collections::{HashMap, hash_map}; -use conduwuit::{Err, Error, PduEvent, Result, debug, debug_warn, implement}; +use conduwuit::{Err, Event, Result, debug, debug_warn, err, implement}; use futures::FutureExt; use ruma::{ EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids, @@ -18,13 +18,16 @@ use crate::rooms::short::ShortStateKey; skip_all, fields(%origin), )] -pub(super) async fn fetch_state( +pub(super) async fn fetch_state( &self, origin: &ServerName, - create_event: &PduEvent, + create_event: &Pdu, room_id: &RoomId, event_id: &EventId, -) -> Result>> { +) -> Result>> +where + Pdu: Event + Send + Sync, +{ let res = self .services .sending @@ -36,27 +39,27 @@ pub(super) async fn fetch_state( .inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?; debug!("Fetching state events"); + let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); let state_vec = self - .fetch_and_handle_outliers(origin, &res.pdu_ids, create_event, room_id) + .fetch_and_handle_outliers(origin, state_ids, create_event, room_id) .boxed() .await; let mut state: HashMap = HashMap::with_capacity(state_vec.len()); for (pdu, _) in state_vec { let state_key = pdu - .state_key - .clone() - .ok_or_else(|| Error::bad_database("Found non-state pdu in state events."))?; + .state_key() + .ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?; let shortstatekey = self .services .short - .get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key) + .get_or_create_shortstatekey(&pdu.kind().to_string().into(), state_key) .await; match state.entry(shortstatekey) { | hash_map::Entry::Vacant(v) => { - v.insert(pdu.event_id.clone()); + v.insert(pdu.event_id().to_owned()); }, | hash_map::Entry::Occupied(_) => { return Err!(Database( @@ -73,7 +76,7 @@ pub(super) async fn fetch_state( .get_shortstatekey(&StateEventType::RoomCreate, "") .await?; - if state.get(&create_shortstatekey) != Some(&create_event.event_id) { + if state.get(&create_shortstatekey).map(AsRef::as_ref) != Some(create_event.event_id()) { return Err!(Database("Incoming event refers to wrong create event.")); } diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 77cae41d..86a05e0a 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -4,7 +4,7 @@ use std::{ }; use conduwuit::{ - Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, err, implement, utils::stream::IterStream, + Err, Event, Result, debug::INFO_SPAN_LEVEL, defer, err, implement, utils::stream::IterStream, warn, }; use futures::{ @@ -12,6 +12,7 @@ use futures::{ future::{OptionFuture, try_join5}, }; use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType}; +use tracing::debug; use crate::rooms::timeline::RawPduId; @@ -121,22 +122,16 @@ pub async fn handle_incoming_pdu<'a>( .timeline .first_pdu_in_room(room_id) .await? - .origin_server_ts; + .origin_server_ts(); - if incoming_pdu.origin_server_ts < first_ts_in_room { + if incoming_pdu.origin_server_ts() < first_ts_in_room { return Ok(None); } // 9. Fetch any missing prev events doing all checks listed here starting at 1. // These are timeline events let (sorted_prev_events, mut eventid_info) = self - .fetch_prev( - origin, - create_event, - room_id, - first_ts_in_room, - incoming_pdu.prev_events.clone(), - ) + .fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events()) .await?; debug!( diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 5cc6be55..d79eed77 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, HashMap, hash_map}; use conduwuit::{ - Err, PduEvent, Result, debug, debug_info, err, implement, state_res, trace, warn, + Err, Event, PduEvent, Result, debug, debug_info, err, implement, state_res, trace, warn, }; use futures::future::ready; use ruma::{ @@ -12,15 +12,18 @@ use super::{check_room_id, get_room_version_id, to_room_version}; #[implement(super::Service)] #[allow(clippy::too_many_arguments)] -pub(super) async fn handle_outlier_pdu<'a>( +pub(super) async fn handle_outlier_pdu<'a, Pdu>( &self, origin: &'a ServerName, - create_event: &'a PduEvent, + create_event: &'a Pdu, event_id: &'a EventId, room_id: &'a RoomId, mut value: CanonicalJsonObject, auth_events_known: bool, -) -> Result<(PduEvent, BTreeMap)> { +) -> Result<(PduEvent, BTreeMap)> +where + Pdu: Event + Send + Sync, +{ // 1. Remove unsigned field value.remove("unsigned"); @@ -29,7 +32,7 @@ pub(super) async fn handle_outlier_pdu<'a>( // 2. Check signatures, otherwise drop // 3. check content hash, redact if doesn't match let room_version_id = get_room_version_id(create_event)?; - let mut val = match self + let mut incoming_pdu = match self .services .server_keys .verify_event(&value, Some(&room_version_id)) @@ -61,13 +64,15 @@ pub(super) async fn handle_outlier_pdu<'a>( // Now that we have checked the signature and hashes we can add the eventID and // convert to our PduEvent type - val.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); - let incoming_pdu = serde_json::from_value::( - serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"), + incoming_pdu + .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); + + let pdu_event = serde_json::from_value::( + serde_json::to_value(&incoming_pdu).expect("CanonicalJsonObj is a valid JsonValue"), ) .map_err(|e| err!(Request(BadJson(debug_warn!("Event is not a valid PDU: {e}")))))?; - check_room_id(room_id, &incoming_pdu)?; + check_room_id(room_id, &pdu_event)?; if !auth_events_known { // 4. fetch any missing auth events doing all checks listed here starting at 1. @@ -78,7 +83,7 @@ pub(super) async fn handle_outlier_pdu<'a>( debug!("Fetching auth events"); Box::pin(self.fetch_and_handle_outliers( origin, - &incoming_pdu.auth_events, + pdu_event.auth_events(), create_event, room_id, )) @@ -89,8 +94,8 @@ pub(super) async fn handle_outlier_pdu<'a>( // auth events debug!("Checking based on auth events"); // Build map of auth events - let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len()); - for id in &incoming_pdu.auth_events { + let mut auth_events = HashMap::with_capacity(pdu_event.auth_events().count()); + for id in pdu_event.auth_events() { let Ok(auth_event) = self.services.timeline.get_pdu(id).await else { warn!("Could not find auth event {id}"); continue; @@ -131,7 +136,7 @@ pub(super) async fn handle_outlier_pdu<'a>( let auth_check = state_res::event_auth::auth_check( &to_room_version(&room_version_id), - &incoming_pdu, + &pdu_event, None, // TODO: third party invite state_fetch, ) @@ -147,9 +152,9 @@ pub(super) async fn handle_outlier_pdu<'a>( // 7. Persist the event as an outlier. self.services .outlier - .add_pdu_outlier(&incoming_pdu.event_id, &val); + .add_pdu_outlier(pdu_event.event_id(), &incoming_pdu); trace!("Added pdu as outlier."); - Ok((incoming_pdu, val)) + Ok((pdu_event, incoming_pdu)) } diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index d612b2bf..cd46310a 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -1,10 +1,11 @@ use std::{collections::BTreeMap, time::Instant}; use conduwuit::{ - Err, PduEvent, Result, debug, debug::INFO_SPAN_LEVEL, defer, implement, + Err, Event, PduEvent, Result, debug::INFO_SPAN_LEVEL, defer, implement, utils::continue_exponential_backoff_secs, }; -use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UInt}; +use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName}; +use tracing::debug; #[implement(super::Service)] #[allow(clippy::type_complexity)] @@ -15,16 +16,19 @@ use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UInt}; skip_all, fields(%prev_id), )] -pub(super) async fn handle_prev_pdu<'a>( +pub(super) async fn handle_prev_pdu<'a, Pdu>( &self, origin: &'a ServerName, event_id: &'a EventId, room_id: &'a RoomId, eventid_info: Option<(PduEvent, BTreeMap)>, - create_event: &'a PduEvent, - first_ts_in_room: UInt, + create_event: &'a Pdu, + first_ts_in_room: MilliSecondsSinceUnixEpoch, prev_id: &'a EventId, -) -> Result { +) -> Result +where + Pdu: Event + Send + Sync, +{ // Check for disabled again because it might have changed if self.services.metadata.is_disabled(room_id).await { return Err!(Request(Forbidden(debug_warn!( @@ -59,7 +63,7 @@ pub(super) async fn handle_prev_pdu<'a>( }; // Skip old events - if pdu.origin_server_ts < first_ts_in_room { + if pdu.origin_server_ts() < first_ts_in_room { return Ok(()); } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 45675da8..aed38e1e 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -18,7 +18,7 @@ use std::{ }; use async_trait::async_trait; -use conduwuit::{Err, PduEvent, Result, RoomVersion, Server, utils::MutexMap}; +use conduwuit::{Err, Event, PduEvent, Result, RoomVersion, Server, utils::MutexMap}; use ruma::{ OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, events::room::create::RoomCreateEventContent, @@ -104,11 +104,11 @@ impl Service { } } -fn check_room_id(room_id: &RoomId, pdu: &PduEvent) -> Result { - if pdu.room_id != room_id { +fn check_room_id(room_id: &RoomId, pdu: &Pdu) -> Result { + if pdu.room_id() != room_id { return Err!(Request(InvalidParam(error!( - pdu_event_id = ?pdu.event_id, - pdu_room_id = ?pdu.room_id, + pdu_event_id = ?pdu.event_id(), + pdu_room_id = ?pdu.room_id(), ?room_id, "Found event from room in room", )))); @@ -117,7 +117,7 @@ fn check_room_id(room_id: &RoomId, pdu: &PduEvent) -> Result { Ok(()) } -fn get_room_version_id(create_event: &PduEvent) -> Result { +fn get_room_version_id(create_event: &Pdu) -> Result { let content: RoomCreateEventContent = create_event.get_content()?; let room_version = content.room_version; diff --git a/src/service/rooms/event_handler/parse_incoming_pdu.rs b/src/service/rooms/event_handler/parse_incoming_pdu.rs index a49fc541..65cf1752 100644 --- a/src/service/rooms/event_handler/parse_incoming_pdu.rs +++ b/src/service/rooms/event_handler/parse_incoming_pdu.rs @@ -1,4 +1,6 @@ -use conduwuit::{Result, err, implement, pdu::gen_event_id_canonical_json, result::FlatOk}; +use conduwuit::{ + Result, err, implement, matrix::event::gen_event_id_canonical_json, result::FlatOk, +}; use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId}; use serde_json::value::RawValue as RawJsonValue; diff --git a/src/service/rooms/event_handler/state_at_incoming.rs b/src/service/rooms/event_handler/state_at_incoming.rs index eb38c2c3..d3bb8f79 100644 --- a/src/service/rooms/event_handler/state_at_incoming.rs +++ b/src/service/rooms/event_handler/state_at_incoming.rs @@ -6,7 +6,7 @@ use std::{ use conduwuit::{ Result, debug, err, implement, - matrix::{PduEvent, StateMap}, + matrix::{Event, StateMap}, trace, utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, }; @@ -19,11 +19,18 @@ use crate::rooms::short::ShortStateHash; #[implement(super::Service)] // request and build the state from a known point and resolve if > 1 prev_event #[tracing::instrument(name = "state", level = "debug", skip_all)] -pub(super) async fn state_at_incoming_degree_one( +pub(super) async fn state_at_incoming_degree_one( &self, - incoming_pdu: &PduEvent, -) -> Result>> { - let prev_event = &incoming_pdu.prev_events[0]; + incoming_pdu: &Pdu, +) -> Result>> +where + Pdu: Event + Send + Sync, +{ + let prev_event = incoming_pdu + .prev_events() + .next() + .expect("at least one prev_event"); + let Ok(prev_event_sstatehash) = self .services .state_accessor @@ -55,7 +62,7 @@ pub(super) async fn state_at_incoming_degree_one( .get_or_create_shortstatekey(&prev_pdu.kind.to_string().into(), state_key) .await; - state.insert(shortstatekey, prev_event.clone()); + state.insert(shortstatekey, prev_event.to_owned()); // Now it's the state after the pdu } @@ -66,16 +73,18 @@ pub(super) async fn state_at_incoming_degree_one( #[implement(super::Service)] #[tracing::instrument(name = "state", level = "debug", skip_all)] -pub(super) async fn state_at_incoming_resolved( +pub(super) async fn state_at_incoming_resolved( &self, - incoming_pdu: &PduEvent, + incoming_pdu: &Pdu, room_id: &RoomId, room_version_id: &RoomVersionId, -) -> Result>> { +) -> Result>> +where + Pdu: Event + Send + Sync, +{ trace!("Calculating extremity statehashes..."); let Ok(extremity_sstatehashes) = incoming_pdu - .prev_events - .iter() + .prev_events() .try_stream() .broad_and_then(|prev_eventid| { self.services @@ -133,12 +142,15 @@ pub(super) async fn state_at_incoming_resolved( } #[implement(super::Service)] -async fn state_at_incoming_fork( +async fn state_at_incoming_fork( &self, room_id: &RoomId, sstatehash: ShortStateHash, - prev_event: PduEvent, -) -> Result<(StateMap, HashSet)> { + prev_event: Pdu, +) -> Result<(StateMap, HashSet)> +where + Pdu: Event, +{ let mut leaf_state: HashMap<_, _> = self .services .state_accessor @@ -146,15 +158,15 @@ async fn state_at_incoming_fork( .collect() .await; - if let Some(state_key) = &prev_event.state_key { + if let Some(state_key) = prev_event.state_key() { let shortstatekey = self .services .short - .get_or_create_shortstatekey(&prev_event.kind.to_string().into(), state_key) + .get_or_create_shortstatekey(&prev_event.kind().to_string().into(), state_key) .await; - let event_id = &prev_event.event_id; - leaf_state.insert(shortstatekey, event_id.clone()); + let event_id = prev_event.event_id(); + leaf_state.insert(shortstatekey, event_id.to_owned()); // Now it's the state after the pdu } diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 00b18c06..4093cb05 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,7 +1,7 @@ use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; use conduwuit::{ - Err, Result, debug, debug_info, err, implement, + Err, Result, debug, debug_info, err, implement, is_equal_to, matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, @@ -17,19 +17,22 @@ use crate::rooms::{ }; #[implement(super::Service)] -pub(super) async fn upgrade_outlier_to_timeline_pdu( +pub(super) async fn upgrade_outlier_to_timeline_pdu( &self, incoming_pdu: PduEvent, val: BTreeMap, - create_event: &PduEvent, + create_event: &Pdu, origin: &ServerName, room_id: &RoomId, -) -> Result> { +) -> Result> +where + Pdu: Event + Send + Sync, +{ // Skip the PDU if we already have it as a timeline event if let Ok(pduid) = self .services .timeline - .get_pdu_id(&incoming_pdu.event_id) + .get_pdu_id(incoming_pdu.event_id()) .await { return Ok(Some(pduid)); @@ -38,7 +41,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( if self .services .pdu_metadata - .is_event_soft_failed(&incoming_pdu.event_id) + .is_event_soft_failed(incoming_pdu.event_id()) .await { return Err!(Request(InvalidParam("Event has been soft failed"))); @@ -53,7 +56,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // These are not timeline events. debug!("Resolving state at event"); - let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 { + let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 { self.state_at_incoming_degree_one(&incoming_pdu).await? } else { self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_id) @@ -62,12 +65,13 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( if state_at_incoming_event.is_none() { state_at_incoming_event = self - .fetch_state(origin, create_event, room_id, &incoming_pdu.event_id) + .fetch_state(origin, create_event, room_id, incoming_pdu.event_id()) .await?; } let state_at_incoming_event = state_at_incoming_event.expect("we always set this to some above"); + let room_version = to_room_version(&room_version_id); debug!("Performing auth check"); @@ -99,10 +103,10 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .state .get_auth_events( room_id, - &incoming_pdu.kind, - &incoming_pdu.sender, - incoming_pdu.state_key.as_deref(), - &incoming_pdu.content, + incoming_pdu.kind(), + incoming_pdu.sender(), + incoming_pdu.state_key(), + incoming_pdu.content(), ) .await?; @@ -129,7 +133,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( !self .services .state_accessor - .user_can_redact(&redact_id, &incoming_pdu.sender, &incoming_pdu.room_id, true) + .user_can_redact(&redact_id, incoming_pdu.sender(), incoming_pdu.room_id(), true) .await?, }; @@ -149,7 +153,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .map(ToOwned::to_owned) .ready_filter(|event_id| { // Remove any that are referenced by this incoming event's prev_events - !incoming_pdu.prev_events.contains(event_id) + !incoming_pdu.prev_events().any(is_equal_to!(event_id)) }) .broad_filter_map(|event_id| async move { // Only keep those extremities were not referenced yet @@ -166,7 +170,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( debug!( "Retained {} extremities checked against {} prev_events", extremities.len(), - incoming_pdu.prev_events.len() + incoming_pdu.prev_events().count() ); let state_ids_compressed: Arc = self @@ -181,20 +185,20 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .map(Arc::new) .await; - if incoming_pdu.state_key.is_some() { + if incoming_pdu.state_key().is_some() { debug!("Event is a state-event. Deriving new room state"); // We also add state after incoming event to the fork states let mut state_after = state_at_incoming_event.clone(); - if let Some(state_key) = &incoming_pdu.state_key { + if let Some(state_key) = incoming_pdu.state_key() { let shortstatekey = self .services .short - .get_or_create_shortstatekey(&incoming_pdu.kind.to_string().into(), state_key) + .get_or_create_shortstatekey(&incoming_pdu.kind().to_string().into(), state_key) .await; - let event_id = &incoming_pdu.event_id; - state_after.insert(shortstatekey, event_id.clone()); + let event_id = incoming_pdu.event_id(); + state_after.insert(shortstatekey, event_id.to_owned()); } let new_room_state = self @@ -236,9 +240,9 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // Soft fail, we keep the event as an outlier but don't add it to the timeline self.services .pdu_metadata - .mark_event_soft_failed(&incoming_pdu.event_id); + .mark_event_soft_failed(incoming_pdu.event_id()); - warn!("Event was soft failed: {incoming_pdu:?}"); + warn!("Event was soft failed: {:?}", incoming_pdu.event_id()); return Err!(Request(InvalidParam("Event has been soft failed"))); } @@ -249,7 +253,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( let extremities = extremities .iter() .map(Borrow::borrow) - .chain(once(incoming_pdu.event_id.borrow())); + .chain(once(incoming_pdu.event_id())); let pdu_id = self .services diff --git a/src/service/rooms/outlier/mod.rs b/src/service/rooms/outlier/mod.rs index 12b56935..6ab2c026 100644 --- a/src/service/rooms/outlier/mod.rs +++ b/src/service/rooms/outlier/mod.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use conduwuit::{Result, implement, matrix::pdu::PduEvent}; -use conduwuit_database::{Deserialized, Json, Map}; +use conduwuit::{Result, implement, matrix::PduEvent}; +use database::{Deserialized, Json, Map}; use ruma::{CanonicalJsonObject, EventId}; pub struct Service { diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index f0beab5a..c1376cb0 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -1,8 +1,8 @@ use std::{mem::size_of, sync::Arc}; use conduwuit::{ - PduCount, PduEvent, arrayvec::ArrayVec, + matrix::{Event, PduCount}, result::LogErr, utils::{ ReadyExt, @@ -33,8 +33,6 @@ struct Services { timeline: Dep, } -pub(super) type PdusIterItem = (PduCount, PduEvent); - impl Data { pub(super) fn new(args: &crate::Args<'_>) -> Self { let db = &args.db; @@ -62,7 +60,7 @@ impl Data { target: ShortEventId, from: PduCount, dir: Direction, - ) -> impl Stream + Send + '_ { + ) -> impl Stream + Send + '_ { let mut current = ArrayVec::::new(); current.extend(target.to_be_bytes()); current.extend(from.saturating_inc(dir).into_unsigned().to_be_bytes()); @@ -80,8 +78,8 @@ impl Data { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; - if pdu.sender != user_id { - pdu.remove_transaction_id().log_err().ok(); + if pdu.sender() != user_id { + pdu.as_mut_pdu().remove_transaction_id().log_err().ok(); } Some((shorteventid, pdu)) diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index 18221c2d..c8e863fa 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -1,11 +1,14 @@ mod data; use std::sync::Arc; -use conduwuit::{PduCount, Result}; +use conduwuit::{ + Result, + matrix::{Event, PduCount}, +}; use futures::{StreamExt, future::try_join}; use ruma::{EventId, RoomId, UserId, api::Direction}; -use self::data::{Data, PdusIterItem}; +use self::data::Data; use crate::{Dep, rooms}; pub struct Service { @@ -44,16 +47,16 @@ impl Service { } #[allow(clippy::too_many_arguments)] - pub async fn get_relations( - &self, - user_id: &UserId, - room_id: &RoomId, - target: &EventId, + pub async fn get_relations<'a>( + &'a self, + user_id: &'a UserId, + room_id: &'a RoomId, + target: &'a EventId, from: PduCount, limit: usize, max_depth: u8, dir: Direction, - ) -> Vec { + ) -> Vec<(PduCount, impl Event)> { let room_id = self.services.short.get_shortroomid(room_id); let target = self.services.timeline.get_pdu_count(target); diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index 69e859c4..68ce9b7f 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -4,7 +4,10 @@ use std::{collections::BTreeMap, sync::Arc}; use conduwuit::{ Result, debug, err, - matrix::pdu::{PduCount, PduId, RawPduId}, + matrix::{ + Event, + pdu::{PduCount, PduId, RawPduId}, + }, warn, }; use futures::{Stream, TryFutureExt, try_join}; @@ -74,14 +77,13 @@ impl Service { let shortroomid = self.services.short.get_shortroomid(room_id).map_err(|e| { err!(Database(warn!("Short room ID does not exist in database for {room_id}: {e}"))) }); - let (pdu_count, shortroomid) = try_join!(pdu_count, shortroomid)?; + let (pdu_count, shortroomid) = try_join!(pdu_count, shortroomid)?; let shorteventid = PduCount::Normal(pdu_count); let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into(); - let pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await?; - let event_id: OwnedEventId = pdu.event_id; + let event_id: OwnedEventId = pdu.event_id().to_owned(); let user_id: OwnedUserId = user_id.to_owned(); let content: BTreeMap = BTreeMap::from_iter([( event_id, diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index b9d067a6..afe3061b 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -1,9 +1,10 @@ use std::sync::Arc; -use conduwuit_core::{ - Event, PduCount, PduEvent, Result, +use conduwuit::{ + PduCount, Result, arrayvec::ArrayVec, implement, + matrix::event::{Event, Matches}, utils::{ ArrayVecExt, IterStream, ReadyExt, set, stream::{TryIgnore, WidebandExt}, @@ -103,9 +104,10 @@ pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_b pub async fn search_pdus<'a>( &'a self, query: &'a RoomQuery<'a>, -) -> Result<(usize, impl Stream + Send + 'a)> { +) -> Result<(usize, impl Stream> + Send + '_)> { let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await; + let filter = &query.criteria.filter; let count = pdu_ids.len(); let pdus = pdu_ids .into_iter() @@ -118,11 +120,11 @@ pub async fn search_pdus<'a>( .ok() }) .ready_filter(|pdu| !pdu.is_redacted()) - .ready_filter(|pdu| pdu.matches(&query.criteria.filter)) + .ready_filter(move |pdu| filter.matches(pdu)) .wide_filter_map(move |pdu| async move { self.services .state_accessor - .user_can_see_event(query.user_id?, &pdu.room_id, &pdu.event_id) + .user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id()) .await .then_some(pdu) }) diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 9eb02221..641aa6a9 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -356,8 +356,8 @@ impl Service { &self, room_id: &RoomId, shortstatehash: u64, - _mutex_lock: &RoomMutexGuard, /* Take mutex guard to make sure users get the room - * state mutex */ + // Take mutex guard to make sure users get the room state mutex + _mutex_lock: &RoomMutexGuard, ) { const BUFSIZE: usize = size_of::(); diff --git a/src/service/rooms/state_accessor/room_state.rs b/src/service/rooms/state_accessor/room_state.rs index 89fa2a83..89a66f0c 100644 --- a/src/service/rooms/state_accessor/room_state.rs +++ b/src/service/rooms/state_accessor/room_state.rs @@ -2,7 +2,7 @@ use std::borrow::Borrow; use conduwuit::{ Result, err, implement, - matrix::{PduEvent, StateKey}, + matrix::{Event, StateKey}, }; use futures::{Stream, StreamExt, TryFutureExt}; use ruma::{EventId, RoomId, events::StateEventType}; @@ -30,7 +30,7 @@ where pub fn room_state_full<'a>( &'a self, room_id: &'a RoomId, -) -> impl Stream> + Send + 'a { +) -> impl Stream> + Send + 'a { self.services .state .get_room_shortstatehash(room_id) @@ -45,7 +45,7 @@ pub fn room_state_full<'a>( pub fn room_state_full_pdus<'a>( &'a self, room_id: &'a RoomId, -) -> impl Stream> + Send + 'a { +) -> impl Stream> + Send + 'a { self.services .state .get_room_shortstatehash(room_id) @@ -84,7 +84,7 @@ pub async fn room_state_get( room_id: &RoomId, event_type: &StateEventType, state_key: &str, -) -> Result { +) -> Result { self.services .state .get_room_shortstatehash(room_id) diff --git a/src/service/rooms/state_accessor/state.rs b/src/service/rooms/state_accessor/state.rs index 169e69e9..a46ce380 100644 --- a/src/service/rooms/state_accessor/state.rs +++ b/src/service/rooms/state_accessor/state.rs @@ -2,14 +2,14 @@ use std::{borrow::Borrow, ops::Deref, sync::Arc}; use conduwuit::{ Result, at, err, implement, - matrix::{PduEvent, StateKey}, + matrix::{Event, StateKey}, pair_of, utils::{ result::FlatOk, stream::{BroadbandExt, IterStream, ReadyExt, TryIgnore}, }, }; -use conduwuit_database::Deserialized; +use database::Deserialized; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::try_join, pin_mut}; use ruma::{ EventId, OwnedEventId, UserId, @@ -125,11 +125,9 @@ pub async fn state_get( shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, -) -> Result { +) -> Result { self.state_get_id(shortstatehash, event_type, state_key) - .and_then(|event_id: OwnedEventId| async move { - self.services.timeline.get_pdu(&event_id).await - }) + .and_then(async |event_id: OwnedEventId| self.services.timeline.get_pdu(&event_id).await) .await } @@ -316,18 +314,16 @@ pub fn state_added( pub fn state_full( &self, shortstatehash: ShortStateHash, -) -> impl Stream + Send + '_ { +) -> impl Stream + Send + '_ { self.state_full_pdus(shortstatehash) - .ready_filter_map(|pdu| { - Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu)) - }) + .ready_filter_map(|pdu| Some(((pdu.kind().clone().into(), pdu.state_key()?.into()), pdu))) } #[implement(super::Service)] pub fn state_full_pdus( &self, shortstatehash: ShortStateHash, -) -> impl Stream + Send + '_ { +) -> impl Stream + Send + '_ { let short_ids = self .state_full_shortids(shortstatehash) .ignore_err() diff --git a/src/service/rooms/state_accessor/user_can.rs b/src/service/rooms/state_accessor/user_can.rs index 67e0b52b..221263a8 100644 --- a/src/service/rooms/state_accessor/user_can.rs +++ b/src/service/rooms/state_accessor/user_can.rs @@ -1,4 +1,4 @@ -use conduwuit::{Err, Result, implement, pdu::PduBuilder}; +use conduwuit::{Err, Result, implement, matrix::Event, pdu::PduBuilder}; use ruma::{ EventId, RoomId, UserId, events::{ @@ -29,14 +29,14 @@ pub async fn user_can_redact( if redacting_event .as_ref() - .is_ok_and(|pdu| pdu.kind == TimelineEventType::RoomCreate) + .is_ok_and(|pdu| *pdu.kind() == TimelineEventType::RoomCreate) { return Err!(Request(Forbidden("Redacting m.room.create is not safe, forbidding."))); } if redacting_event .as_ref() - .is_ok_and(|pdu| pdu.kind == TimelineEventType::RoomServerAcl) + .is_ok_and(|pdu| *pdu.kind() == TimelineEventType::RoomServerAcl) { return Err!(Request(Forbidden( "Redacting m.room.server_acl will result in the room being inaccessible for \ @@ -59,9 +59,9 @@ pub async fn user_can_redact( && match redacting_event { | Ok(redacting_event) => if federation { - redacting_event.sender.server_name() == sender.server_name() + redacting_event.sender().server_name() == sender.server_name() } else { - redacting_event.sender == sender + redacting_event.sender() == sender }, | _ => false, }) @@ -72,10 +72,10 @@ pub async fn user_can_redact( .room_state_get(room_id, &StateEventType::RoomCreate, "") .await { - | Ok(room_create) => Ok(room_create.sender == sender + | Ok(room_create) => Ok(room_create.sender() == sender || redacting_event .as_ref() - .is_ok_and(|redacting_event| redacting_event.sender == sender)), + .is_ok_and(|redacting_event| redacting_event.sender() == sender)), | _ => Err!(Database( "No m.room.power_levels or m.room.create events in database for room" )), diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index 9566eb61..59319ba6 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -49,10 +49,9 @@ impl crate::Service for Service { } impl Service { - pub async fn add_to_thread<'a, E>(&self, root_event_id: &EventId, event: &'a E) -> Result + pub async fn add_to_thread(&self, root_event_id: &EventId, event: &E) -> Result where E: Event + Send + Sync, - &'a E: Event + Send, { let root_id = self .services @@ -120,7 +119,7 @@ impl Service { self.services .timeline - .replace_pdu(&root_id, &root_pdu_json, &root_pdu) + .replace_pdu(&root_id, &root_pdu_json) .await?; } @@ -130,7 +129,7 @@ impl Service { users.extend_from_slice(&userids); }, | _ => { - users.push(root_pdu.sender); + users.push(root_pdu.sender().to_owned()); }, } users.push(event.sender().to_owned()); @@ -162,10 +161,10 @@ impl Service { .ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes()) .wide_filter_map(move |pdu_id| async move { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; - let pdu_id: PduId = pdu_id.into(); - if pdu.sender != user_id { - pdu.remove_transaction_id().ok(); + let pdu_id: PduId = pdu_id.into(); + if pdu.sender() != user_id { + pdu.as_mut_pdu().remove_transaction_id().ok(); } Some((pdu_id.shorteventid, pdu)) diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 94c78bb0..fa10a5c0 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -207,7 +207,6 @@ impl Data { &self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject, - _pdu: &PduEvent, ) -> Result { if self.pduid_pdu.get(pdu_id).await.is_not_found() { return Err!(Request(NotFound("PDU does not exist."))); diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index bcad1309..a381fcf6 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -14,8 +14,8 @@ pub use conduwuit::matrix::pdu::{PduId, RawPduId}; use conduwuit::{ Err, Error, Result, Server, at, debug, debug_warn, err, error, implement, info, matrix::{ - Event, - pdu::{EventHash, PduBuilder, PduCount, PduEvent, gen_event_id}, + event::{Event, gen_event_id}, + pdu::{EventHash, PduBuilder, PduCount, PduEvent}, state_res::{self, RoomVersion}, }, utils::{ @@ -159,12 +159,12 @@ impl crate::Service for Service { impl Service { #[tracing::instrument(skip(self), level = "debug")] - pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result { + pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result { self.first_item_in_room(room_id).await.map(at!(1)) } #[tracing::instrument(skip(self), level = "debug")] - pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> { + pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> { let pdus = self.pdus(None, room_id, None); pin_mut!(pdus); @@ -174,7 +174,7 @@ impl Service { } #[tracing::instrument(skip(self), level = "debug")] - pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { + pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { self.db.latest_pdu_in_room(None, room_id).await } @@ -216,13 +216,14 @@ impl Service { /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. #[inline] - pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result { + pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result { self.db.get_non_outlier_pdu(event_id).await } /// Returns the pdu. /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. + #[inline] pub async fn get_pdu(&self, event_id: &EventId) -> Result { self.db.get_pdu(event_id).await } @@ -230,11 +231,13 @@ impl Service { /// Returns the pdu. /// /// This does __NOT__ check the outliers `Tree`. + #[inline] pub async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result { self.db.get_pdu_from_id(pdu_id).await } /// Returns the pdu as a `BTreeMap`. + #[inline] pub async fn get_pdu_json_from_id(&self, pdu_id: &RawPduId) -> Result { self.db.get_pdu_json_from_id(pdu_id).await } @@ -242,6 +245,7 @@ impl Service { /// Checks if pdu exists /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. + #[inline] pub fn pdu_exists<'a>( &'a self, event_id: &'a EventId, @@ -251,13 +255,8 @@ impl Service { /// Removes a pdu and creates a new one with the same id. #[tracing::instrument(skip(self), level = "debug")] - pub async fn replace_pdu( - &self, - pdu_id: &RawPduId, - pdu_json: &CanonicalJsonObject, - pdu: &PduEvent, - ) -> Result<()> { - self.db.replace_pdu(pdu_id, pdu_json, pdu).await + pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject) -> Result { + self.db.replace_pdu(pdu_id, pdu_json).await } /// Creates a new persisted data unit and adds it to a room. @@ -310,25 +309,21 @@ impl Service { unsigned.insert( "prev_content".to_owned(), CanonicalJsonValue::Object( - utils::to_canonical_object(prev_state.content.clone()).map_err( - |e| { - error!( - "Failed to convert prev_state to canonical JSON: {e}" - ); - Error::bad_database( - "Failed to convert prev_state to canonical JSON.", - ) - }, - )?, + utils::to_canonical_object(prev_state.get_content_as_value()) + .map_err(|e| { + err!(Database(error!( + "Failed to convert prev_state to canonical JSON: {e}", + ))) + })?, ), ); unsigned.insert( String::from("prev_sender"), - CanonicalJsonValue::String(prev_state.sender.to_string()), + CanonicalJsonValue::String(prev_state.sender().to_string()), ); unsigned.insert( String::from("replaces_state"), - CanonicalJsonValue::String(prev_state.event_id.to_string()), + CanonicalJsonValue::String(prev_state.event_id().to_string()), ); } } @@ -709,14 +704,11 @@ impl Service { .await { unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value()); - unsigned.insert( - "prev_sender".to_owned(), - serde_json::to_value(&prev_pdu.sender) - .expect("UserId::to_value always works"), - ); + unsigned + .insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?); unsigned.insert( "replaces_state".to_owned(), - serde_json::to_value(&prev_pdu.event_id).expect("EventId is valid json"), + serde_json::to_value(prev_pdu.event_id())?, ); } } @@ -759,7 +751,7 @@ impl Service { unsigned: if unsigned.is_empty() { None } else { - Some(to_raw_value(&unsigned).expect("to_raw_value always works")) + Some(to_raw_value(&unsigned)?) }, hashes: EventHash { sha256: "aaa".to_owned() }, signatures: None, @@ -1041,10 +1033,10 @@ impl Service { /// Replace a PDU with the redacted form. #[tracing::instrument(name = "redact", level = "debug", skip(self))] - pub async fn redact_pdu( + pub async fn redact_pdu( &self, event_id: &EventId, - reason: &PduEvent, + reason: &Pdu, shortroomid: ShortRoomId, ) -> Result { // TODO: Don't reserialize, keep original json @@ -1053,9 +1045,13 @@ impl Service { return Ok(()); }; - let mut pdu = self.get_pdu_from_id(&pdu_id).await.map_err(|e| { - err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU."))) - })?; + let mut pdu = self + .get_pdu_from_id(&pdu_id) + .await + .map(Event::into_pdu) + .map_err(|e| { + err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU."))) + })?; if let Ok(content) = pdu.get_content::() { if let Some(body) = content.body { @@ -1065,15 +1061,15 @@ impl Service { } } - let room_version_id = self.services.state.get_room_version(&pdu.room_id).await?; + let room_version_id = self.services.state.get_room_version(pdu.room_id()).await?; - pdu.redact(&room_version_id, reason)?; + pdu.redact(&room_version_id, reason.to_value())?; let obj = utils::to_canonical_object(&pdu).map_err(|e| { err!(Database(error!(?event_id, ?e, "Failed to convert PDU to canonical JSON"))) })?; - self.replace_pdu(&pdu_id, &obj, &pdu).await + self.replace_pdu(&pdu_id, &obj).await } #[tracing::instrument(name = "backfill", level = "debug", skip(self))] @@ -1163,7 +1159,7 @@ impl Service { backfill_server, federation::backfill::get_backfill::v1::Request { room_id: room_id.to_owned(), - v: vec![first_pdu.1.event_id.clone()], + v: vec![first_pdu.1.event_id().to_owned()], limit: uint!(100), }, ) @@ -1248,8 +1244,11 @@ impl Service { #[implement(Service)] #[tracing::instrument(skip_all, level = "debug")] -async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Result<()> { - match &pdu.kind { +async fn check_pdu_for_admin_room(&self, pdu: &Pdu, sender: &UserId) -> Result +where + Pdu: Event + Send + Sync, +{ + match pdu.kind() { | TimelineEventType::RoomEncryption => { return Err!(Request(Forbidden(error!("Encryption not supported in admins room.")))); }, @@ -1273,7 +1272,7 @@ async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Res let count = self .services .state_cache - .room_members(&pdu.room_id) + .room_members(pdu.room_id()) .ready_filter(|user| self.services.globals.user_is_local(user)) .ready_filter(|user| *user != target) .boxed() @@ -1297,7 +1296,7 @@ async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Res let count = self .services .state_cache - .room_members(&pdu.room_id) + .room_members(pdu.room_id()) .ready_filter(|user| self.services.globals.user_is_local(user)) .ready_filter(|user| *user != target) .boxed() diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 408ab17d..a708f746 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -798,7 +798,7 @@ impl Service { let unread: UInt = self .services .user - .notification_count(&user_id, &pdu.room_id) + .notification_count(&user_id, pdu.room_id()) .await .try_into() .expect("notification count can't go that high"); diff --git a/src/service/server_keys/verify.rs b/src/service/server_keys/verify.rs index 84433628..9cc3655a 100644 --- a/src/service/server_keys/verify.rs +++ b/src/service/server_keys/verify.rs @@ -1,4 +1,4 @@ -use conduwuit::{Err, Result, implement, pdu::gen_event_id_canonical_json}; +use conduwuit::{Err, Result, implement, matrix::event::gen_event_id_canonical_json}; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, RoomVersionId, signatures::Verified, };