diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index 2aca5b9d..d40f6b4f 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -1,7 +1,7 @@ use axum::extract::State; use conduwuit::{ Err, Event, Result, at, - utils::{BoolExt, future::TryExtExt, stream::TryTools}, + utils::{BoolExt, stream::TryTools}, }; use futures::{FutureExt, TryStreamExt, future::try_join4}; use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response}; diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs new file mode 100644 index 00000000..a7b558c2 --- /dev/null +++ b/src/service/rooms/timeline/append.rs @@ -0,0 +1,446 @@ +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; + +use conduwuit_core::{ + Result, err, error, implement, + matrix::{ + event::Event, + pdu::{PduCount, PduEvent, PduId, RawPduId}, + }, + utils::{self, ReadyExt}, +}; +use futures::StreamExt; +use ruma::{ + CanonicalJsonObject, CanonicalJsonValue, EventId, RoomVersionId, UserId, + events::{ + GlobalAccountDataEventType, StateEventType, TimelineEventType, + push_rules::PushRulesEvent, + room::{ + encrypted::Relation, + member::{MembershipState, RoomMemberEventContent}, + power_levels::RoomPowerLevelsEventContent, + redaction::RoomRedactionEventContent, + }, + }, + push::{Action, Ruleset, Tweak}, +}; + +use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard}; +use crate::{appservice::NamespaceRegex, rooms::state_compressor::CompressedState}; + +/// Append the incoming event setting the state snapshot to the state from +/// the server that sent the event. +#[implement(super::Service)] +#[tracing::instrument(level = "debug", skip_all)] +pub async fn append_incoming_pdu<'a, Leaves>( + &'a self, + pdu: &'a PduEvent, + pdu_json: CanonicalJsonObject, + new_room_leaves: Leaves, + state_ids_compressed: Arc, + soft_fail: bool, + state_lock: &'a RoomMutexGuard, +) -> Result> +where + Leaves: Iterator + Send + 'a, +{ + // We append to state before appending the pdu, so we don't have a moment in + // time with the pdu without it's state. This is okay because append_pdu can't + // fail. + self.services + .state + .set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed) + .await?; + + if soft_fail { + self.services + .pdu_metadata + .mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref)); + + self.services + .state + .set_forward_extremities(&pdu.room_id, new_room_leaves, state_lock) + .await; + + return Ok(None); + } + + let pdu_id = self + .append_pdu(pdu, pdu_json, new_room_leaves, state_lock) + .await?; + + Ok(Some(pdu_id)) +} + +/// Creates a new persisted data unit and adds it to a room. +/// +/// By this point the incoming event should be fully authenticated, no auth +/// happens in `append_pdu`. +/// +/// Returns pdu id +#[implement(super::Service)] +#[tracing::instrument(level = "debug", skip_all)] +pub async fn append_pdu<'a, Leaves>( + &'a self, + pdu: &'a PduEvent, + mut pdu_json: CanonicalJsonObject, + leaves: Leaves, + state_lock: &'a RoomMutexGuard, +) -> Result +where + Leaves: Iterator + Send + 'a, +{ + // Coalesce database writes for the remainder of this scope. + let _cork = self.db.db.cork_and_flush(); + + let shortroomid = self + .services + .short + .get_shortroomid(pdu.room_id()) + .await + .map_err(|_| err!(Database("Room does not exist")))?; + + // Make unsigned fields correct. This is not properly documented in the spec, + // but state events need to have previous content in the unsigned field, so + // clients can easily interpret things like membership changes + if let Some(state_key) = pdu.state_key() { + if let CanonicalJsonValue::Object(unsigned) = pdu_json + .entry("unsigned".to_owned()) + .or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default())) + { + if let Ok(shortstatehash) = self + .services + .state_accessor + .pdu_shortstatehash(pdu.event_id()) + .await + { + if let Ok(prev_state) = self + .services + .state_accessor + .state_get(shortstatehash, &pdu.kind().to_string().into(), state_key) + .await + { + unsigned.insert( + "prev_content".to_owned(), + CanonicalJsonValue::Object( + utils::to_canonical_object(prev_state.get_content_as_value()) + .map_err(|e| { + err!(Database(error!( + "Failed to convert prev_state to canonical JSON: {e}", + ))) + })?, + ), + ); + unsigned.insert( + String::from("prev_sender"), + CanonicalJsonValue::String(prev_state.sender().to_string()), + ); + unsigned.insert( + String::from("replaces_state"), + CanonicalJsonValue::String(prev_state.event_id().to_string()), + ); + } + } + } else { + error!("Invalid unsigned type in pdu."); + } + } + + // We must keep track of all events that have been referenced. + self.services + .pdu_metadata + .mark_as_referenced(pdu.room_id(), pdu.prev_events().map(AsRef::as_ref)); + + self.services + .state + .set_forward_extremities(pdu.room_id(), leaves, state_lock) + .await; + + let insert_lock = self.mutex_insert.lock(pdu.room_id()).await; + + let count1 = self.services.globals.next_count().unwrap(); + + // Mark as read first so the sending client doesn't get a notification even if + // appending fails + self.services + .read_receipt + .private_read_set(pdu.room_id(), pdu.sender(), count1); + + self.services + .user + .reset_notification_counts(pdu.sender(), pdu.room_id()); + + let count2 = PduCount::Normal(self.services.globals.next_count().unwrap()); + let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into(); + + // Insert pdu + self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2).await; + + drop(insert_lock); + + // See if the event matches any known pushers via power level + let power_levels: RoomPowerLevelsEventContent = self + .services + .state_accessor + .room_state_get_content(pdu.room_id(), &StateEventType::RoomPowerLevels, "") + .await + .unwrap_or_default(); + + let mut push_target: HashSet<_> = self + .services + .state_cache + .active_local_users_in_room(pdu.room_id()) + .map(ToOwned::to_owned) + // Don't notify the sender of their own events, and dont send from ignored users + .ready_filter(|user| *user != pdu.sender()) + .filter_map(|recipient_user| async move { (!self.services.users.user_is_ignored(pdu.sender(), &recipient_user).await).then_some(recipient_user) }) + .collect() + .await; + + let mut notifies = Vec::with_capacity(push_target.len().saturating_add(1)); + let mut highlights = Vec::with_capacity(push_target.len().saturating_add(1)); + + if *pdu.kind() == TimelineEventType::RoomMember { + if let Some(state_key) = pdu.state_key() { + let target_user_id = UserId::parse(state_key)?; + + if self.services.users.is_active_local(target_user_id).await { + push_target.insert(target_user_id.to_owned()); + } + } + } + + let serialized = pdu.to_format(); + for user in &push_target { + let rules_for_user = self + .services + .account_data + .get_global(user, GlobalAccountDataEventType::PushRules) + .await + .map_or_else( + |_| Ruleset::server_default(user), + |ev: PushRulesEvent| ev.content.global, + ); + + let mut highlight = false; + let mut notify = false; + + for action in self + .services + .pusher + .get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id()) + .await + { + match action { + | Action::Notify => notify = true, + | Action::SetTweak(Tweak::Highlight(true)) => { + highlight = true; + }, + | _ => {}, + } + + // Break early if both conditions are true + if notify && highlight { + break; + } + } + + if notify { + notifies.push(user.clone()); + } + + if highlight { + highlights.push(user.clone()); + } + + self.services + .pusher + .get_pushkeys(user) + .ready_for_each(|push_key| { + self.services + .sending + .send_pdu_push(&pdu_id, user, push_key.to_owned()) + .expect("TODO: replace with future"); + }) + .await; + } + + self.db + .increment_notification_counts(pdu.room_id(), notifies, highlights); + + match *pdu.kind() { + | TimelineEventType::RoomRedaction => { + use RoomVersionId::*; + + let room_version_id = self.services.state.get_room_version(pdu.room_id()).await?; + match room_version_id { + | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => { + if let Some(redact_id) = pdu.redacts() { + if self + .services + .state_accessor + .user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false) + .await? + { + self.redact_pdu(redact_id, pdu, shortroomid).await?; + } + } + }, + | _ => { + let content: RoomRedactionEventContent = pdu.get_content()?; + if let Some(redact_id) = &content.redacts { + if self + .services + .state_accessor + .user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false) + .await? + { + self.redact_pdu(redact_id, pdu, shortroomid).await?; + } + } + }, + } + }, + | TimelineEventType::SpaceChild => + if let Some(_state_key) = pdu.state_key() { + self.services + .spaces + .roomid_spacehierarchy_cache + .lock() + .await + .remove(pdu.room_id()); + }, + | TimelineEventType::RoomMember => { + if let Some(state_key) = pdu.state_key() { + // if the state_key fails + let target_user_id = + UserId::parse(state_key).expect("This state_key was previously validated"); + + let content: RoomMemberEventContent = pdu.get_content()?; + let stripped_state = match content.membership { + | MembershipState::Invite | MembershipState::Knock => + self.services.state.summary_stripped(pdu).await.into(), + | _ => None, + }; + + // Update our membership info, we do this here incase a user is invited or + // knocked and immediately leaves we need the DB to record the invite or + // knock event for auth + self.services + .state_cache + .update_membership( + pdu.room_id(), + target_user_id, + content, + pdu.sender(), + stripped_state, + None, + true, + ) + .await?; + } + }, + | TimelineEventType::RoomMessage => { + let content: ExtractBody = pdu.get_content()?; + if let Some(body) = content.body { + self.services.search.index_pdu(shortroomid, &pdu_id, &body); + + if self.services.admin.is_admin_command(pdu, &body).await { + self.services + .admin + .command_with_sender(body, Some((pdu.event_id()).into()), pdu.sender.clone().into())?; + } + } + }, + | _ => {}, + } + + if let Ok(content) = pdu.get_content::() { + if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await { + self.services + .pdu_metadata + .add_relation(count2, related_pducount); + } + } + + if let Ok(content) = pdu.get_content::() { + match content.relates_to { + | Relation::Reply { in_reply_to } => { + // We need to do it again here, because replies don't have + // event_id as a top level field + if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await { + self.services + .pdu_metadata + .add_relation(count2, related_pducount); + } + }, + | Relation::Thread(thread) => { + self.services + .threads + .add_to_thread(&thread.event_id, pdu) + .await?; + }, + | _ => {}, // TODO: Aggregate other types + } + } + + for appservice in self.services.appservice.read().await.values() { + if self + .services + .state_cache + .appservice_in_room(pdu.room_id(), appservice) + .await + { + self.services + .sending + .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; + continue; + } + + // If the RoomMember event has a non-empty state_key, it is targeted at someone. + // If it is our appservice user, we send this PDU to it. + if *pdu.kind() == TimelineEventType::RoomMember { + if let Some(state_key_uid) = &pdu + .state_key + .as_ref() + .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) + { + let appservice_uid = appservice.registration.sender_localpart.as_str(); + if state_key_uid == &appservice_uid { + self.services + .sending + .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; + continue; + } + } + } + + let matching_users = |users: &NamespaceRegex| { + appservice.users.is_match(pdu.sender().as_str()) + || *pdu.kind() == TimelineEventType::RoomMember + && pdu + .state_key + .as_ref() + .is_some_and(|state_key| users.is_match(state_key)) + }; + let matching_aliases = |aliases: NamespaceRegex| { + self.services + .alias + .local_aliases_for_room(pdu.room_id()) + .ready_any(move |room_alias| aliases.is_match(room_alias.as_str())) + }; + + if matching_aliases(appservice.aliases.clone()).await + || appservice.rooms.is_match(pdu.room_id().as_str()) + || matching_users(&appservice.users) + { + self.services + .sending + .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; + } + } + + Ok(pdu_id) +} diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs new file mode 100644 index 00000000..e976981e --- /dev/null +++ b/src/service/rooms/timeline/backfill.rs @@ -0,0 +1,191 @@ +use std::iter::once; + +use conduwuit_core::{ + Result, debug, debug_warn, implement, info, + matrix::{ + event::Event, + pdu::{PduCount, PduId, RawPduId}, + }, + utils::{IterStream, ReadyExt}, + validated, warn, +}; +use futures::{FutureExt, StreamExt}; +use ruma::{ + RoomId, ServerName, + api::federation, + events::{ + StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, + }, + uint, +}; +use serde_json::value::RawValue as RawJsonValue; + +use super::ExtractBody; + +#[implement(super::Service)] +#[tracing::instrument(name = "backfill", level = "debug", skip(self))] +pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> { + if self + .services + .state_cache + .room_joined_count(room_id) + .await + .is_ok_and(|count| count <= 1) + && !self + .services + .state_accessor + .is_world_readable(room_id) + .await + { + // Room is empty (1 user or none), there is no one that can backfill + return Ok(()); + } + + let first_pdu = self + .first_item_in_room(room_id) + .await + .expect("Room is not empty"); + + if first_pdu.0 < from { + // No backfill required, there are still events between them + return Ok(()); + } + + let power_levels: RoomPowerLevelsEventContent = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") + .await + .unwrap_or_default(); + + let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { + if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) { + Some(user_id.server_name()) + } else { + None + } + }); + + let canonical_room_alias_server = once( + self.services + .state_accessor + .get_canonical_alias(room_id) + .await, + ) + .filter_map(Result::ok) + .map(|alias| alias.server_name().to_owned()) + .stream(); + + let mut servers = room_mods + .stream() + .map(ToOwned::to_owned) + .chain(canonical_room_alias_server) + .chain( + self.services + .server + .config + .trusted_servers + .iter() + .map(ToOwned::to_owned) + .stream(), + ) + .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) + .filter_map(|server_name| async move { + self.services + .state_cache + .server_in_room(&server_name, room_id) + .await + .then_some(server_name) + }) + .boxed(); + + while let Some(ref backfill_server) = servers.next().await { + info!("Asking {backfill_server} for backfill"); + let response = self + .services + .sending + .send_federation_request( + backfill_server, + federation::backfill::get_backfill::v1::Request { + room_id: room_id.to_owned(), + v: vec![first_pdu.1.event_id().to_owned()], + limit: uint!(100), + }, + ) + .await; + match response { + | Ok(response) => { + for pdu in response.pdus { + if let Err(e) = self.backfill_pdu(backfill_server, pdu).boxed().await { + debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}"); + } + } + return Ok(()); + }, + | Err(e) => { + warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); + }, + } + } + + info!("No servers could backfill, but backfill was needed in room {room_id}"); + Ok(()) +} + +#[implement(super::Service)] +#[tracing::instrument(skip(self, pdu), level = "debug")] +pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { + let (room_id, event_id, value) = self.services.event_handler.parse_incoming_pdu(&pdu).await?; + + // Lock so we cannot backfill the same pdu twice at the same time + let mutex_lock = self + .services + .event_handler + .mutex_federation + .lock(&room_id) + .await; + + // Skip the PDU if we already have it as a timeline event + if let Ok(pdu_id) = self.get_pdu_id(&event_id).await { + debug!("We already know {event_id} at {pdu_id:?}"); + return Ok(()); + } + + self.services + .event_handler + .handle_incoming_pdu(origin, &room_id, &event_id, value, false) + .boxed() + .await?; + + let value = self.get_pdu_json(&event_id).await?; + + let pdu = self.get_pdu(&event_id).await?; + + let shortroomid = self.services.short.get_shortroomid(&room_id).await?; + + let insert_lock = self.mutex_insert.lock(&room_id).await; + + let count: i64 = self.services.globals.next_count().unwrap().try_into()?; + + let pdu_id: RawPduId = PduId { + shortroomid, + shorteventid: PduCount::Backfilled(validated!(0 - count)), + } + .into(); + + // Insert pdu + self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value); + + drop(insert_lock); + + if pdu.kind == TimelineEventType::RoomMessage { + let content: ExtractBody = pdu.get_content()?; + if let Some(body) = content.body { + self.services.search.index_pdu(shortroomid, &pdu_id, &body); + } + } + drop(mutex_lock); + + debug!("Prepended backfill pdu"); + Ok(()) +} diff --git a/src/service/rooms/timeline/build.rs b/src/service/rooms/timeline/build.rs new file mode 100644 index 00000000..a522c531 --- /dev/null +++ b/src/service/rooms/timeline/build.rs @@ -0,0 +1,226 @@ +use std::{collections::HashSet, iter::once}; + +use conduwuit_core::{ + Err, Result, implement, + matrix::{event::Event, pdu::PduBuilder}, + utils::{IterStream, ReadyExt}, +}; +use futures::{FutureExt, StreamExt}; +use ruma::{ + OwnedEventId, OwnedServerName, RoomId, RoomVersionId, UserId, + events::{ + TimelineEventType, + room::{ + member::{MembershipState, RoomMemberEventContent}, + redaction::RoomRedactionEventContent, + }, + }, +}; + +use super::RoomMutexGuard; + +/// Creates a new persisted data unit and adds it to a room. This function +/// takes a roomid_mutex_state, meaning that only this function is able to +/// mutate the room state. +#[implement(super::Service)] +#[tracing::instrument(skip(self, state_lock), level = "debug")] +pub async fn build_and_append_pdu( + &self, + pdu_builder: PduBuilder, + sender: &UserId, + room_id: &RoomId, + state_lock: &RoomMutexGuard, +) -> Result { + let (pdu, pdu_json) = self + .create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock) + .await?; + + if self.services.admin.is_admin_room(pdu.room_id()).await { + self.check_pdu_for_admin_room(&pdu, sender).boxed().await?; + } + + // If redaction event is not authorized, do not append it to the timeline + if *pdu.kind() == TimelineEventType::RoomRedaction { + use RoomVersionId::*; + match self.services.state.get_room_version(pdu.room_id()).await? { + | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => { + if let Some(redact_id) = pdu.redacts() { + if !self + .services + .state_accessor + .user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false) + .await? + { + return Err!(Request(Forbidden("User cannot redact this event."))); + } + } + }, + | _ => { + let content: RoomRedactionEventContent = pdu.get_content()?; + if let Some(redact_id) = &content.redacts { + if !self + .services + .state_accessor + .user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false) + .await? + { + return Err!(Request(Forbidden("User cannot redact this event."))); + } + } + }, + } + } + + if *pdu.kind() == TimelineEventType::RoomMember { + let content: RoomMemberEventContent = pdu.get_content()?; + + if content.join_authorized_via_users_server.is_some() + && content.membership != MembershipState::Join + { + return Err!(Request(BadJson( + "join_authorised_via_users_server is only for member joins" + ))); + } + + if content + .join_authorized_via_users_server + .as_ref() + .is_some_and(|authorising_user| { + !self.services.globals.user_is_local(authorising_user) + }) { + return Err!(Request(InvalidParam( + "Authorising user does not belong to this homeserver" + ))); + } + } + + // We append to state before appending the pdu, so we don't have a moment in + // time with the pdu without it's state. This is okay because append_pdu can't + // fail. + let statehashid = self.services.state.append_to_state(&pdu).await?; + + let pdu_id = self + .append_pdu( + &pdu, + pdu_json, + // Since this PDU references all pdu_leaves we can update the leaves + // of the room + once(pdu.event_id()), + state_lock, + ) + .boxed() + .await?; + + // We set the room state after inserting the pdu, so that we never have a moment + // in time where events in the current room state do not exist + self.services + .state + .set_room_state(pdu.room_id(), statehashid, state_lock); + + let mut servers: HashSet = self + .services + .state_cache + .room_servers(pdu.room_id()) + .map(ToOwned::to_owned) + .collect() + .await; + + // In case we are kicking or banning a user, we need to inform their server of + // the change + if *pdu.kind() == TimelineEventType::RoomMember { + if let Some(state_key_uid) = &pdu + .state_key + .as_ref() + .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) + { + servers.insert(state_key_uid.server_name().to_owned()); + } + } + + // Remove our server from the server list since it will be added to it by + // room_servers() and/or the if statement above + servers.remove(self.services.globals.server_name()); + + self.services + .sending + .send_pdu_servers(servers.iter().map(AsRef::as_ref).stream(), &pdu_id) + .await?; + + Ok(pdu.event_id().to_owned()) +} + +#[implement(super::Service)] +#[tracing::instrument(skip_all, level = "debug")] +async fn check_pdu_for_admin_room(&self, pdu: &Pdu, sender: &UserId) -> Result +where + Pdu: Event + Send + Sync, +{ + match pdu.kind() { + | TimelineEventType::RoomEncryption => { + return Err!(Request(Forbidden(error!("Encryption not supported in admins room.")))); + }, + | TimelineEventType::RoomMember => { + let target = pdu + .state_key() + .filter(|v| v.starts_with('@')) + .unwrap_or(sender.as_str()); + + let server_user = &self.services.globals.server_user.to_string(); + + let content: RoomMemberEventContent = pdu.get_content()?; + match content.membership { + | MembershipState::Leave => { + if target == server_user { + return Err!(Request(Forbidden(error!( + "Server user cannot leave the admins room." + )))); + } + + let count = self + .services + .state_cache + .room_members(pdu.room_id()) + .ready_filter(|user| self.services.globals.user_is_local(user)) + .ready_filter(|user| *user != target) + .boxed() + .count() + .await; + + if count < 2 { + return Err!(Request(Forbidden(error!( + "Last admin cannot leave the admins room." + )))); + } + }, + + | MembershipState::Ban if pdu.state_key().is_some() => { + if target == server_user { + return Err!(Request(Forbidden(error!( + "Server cannot be banned from admins room." + )))); + } + + let count = self + .services + .state_cache + .room_members(pdu.room_id()) + .ready_filter(|user| self.services.globals.user_is_local(user)) + .ready_filter(|user| *user != target) + .boxed() + .count() + .await; + + if count < 2 { + return Err!(Request(Forbidden(error!( + "Last admin cannot be banned from admins room." + )))); + } + }, + | _ => {}, + } + }, + | _ => {}, + } + + Ok(()) +} diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs new file mode 100644 index 00000000..d890e88e --- /dev/null +++ b/src/service/rooms/timeline/create.rs @@ -0,0 +1,214 @@ +use std::cmp; + +use conduwuit_core::{ + Err, Error, Result, err, implement, + matrix::{ + event::{Event, gen_event_id}, + pdu::{EventHash, PduBuilder, PduEvent}, + state_res::{self, RoomVersion}, + }, + utils::{self, IterStream, ReadyExt, stream::TryIgnore}, +}; +use futures::{StreamExt, TryStreamExt, future, future::ready}; +use ruma::{ + CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, RoomId, RoomVersionId, UserId, + canonical_json::to_canonical_value, + events::{StateEventType, TimelineEventType, room::create::RoomCreateEventContent}, + uint, +}; +use serde_json::value::to_raw_value; +use tracing::warn; + +use super::RoomMutexGuard; + +#[implement(super::Service)] +pub async fn create_hash_and_sign_event( + &self, + pdu_builder: PduBuilder, + sender: &UserId, + room_id: &RoomId, + _mutex_lock: &RoomMutexGuard, /* Take mutex guard to make sure users get the room + * state mutex */ +) -> Result<(PduEvent, CanonicalJsonObject)> { + let PduBuilder { + event_type, + content, + unsigned, + state_key, + redacts, + timestamp, + } = pdu_builder; + + let prev_events: Vec = self + .services + .state + .get_forward_extremities(room_id) + .take(20) + .map(Into::into) + .collect() + .await; + + // If there was no create event yet, assume we are creating a room + let room_version_id = self + .services + .state + .get_room_version(room_id) + .await + .or_else(|_| { + if event_type == TimelineEventType::RoomCreate { + let content: RoomCreateEventContent = serde_json::from_str(content.get())?; + Ok(content.room_version) + } else { + Err(Error::InconsistentRoomState( + "non-create event for room of unknown version", + room_id.to_owned(), + )) + } + })?; + + let room_version = RoomVersion::new(&room_version_id).expect("room version is supported"); + + let auth_events = self + .services + .state + .get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content) + .await?; + + // Our depth is the maximum depth of prev_events + 1 + let depth = prev_events + .iter() + .stream() + .map(Ok) + .and_then(|event_id| self.get_pdu(event_id)) + .and_then(|pdu| future::ok(pdu.depth)) + .ignore_err() + .ready_fold(uint!(0), cmp::max) + .await + .saturating_add(uint!(1)); + + let mut unsigned = unsigned.unwrap_or_default(); + + if let Some(state_key) = &state_key { + if let Ok(prev_pdu) = self + .services + .state_accessor + .room_state_get(room_id, &event_type.to_string().into(), state_key) + .await + { + unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value()); + unsigned.insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?); + unsigned + .insert("replaces_state".to_owned(), serde_json::to_value(prev_pdu.event_id())?); + } + } + + if event_type != TimelineEventType::RoomCreate && prev_events.is_empty() { + return Err!(Request(Unknown("Event incorrectly had zero prev_events."))); + } + if state_key.is_none() && depth.lt(&uint!(2)) { + // The first two events in a room are always m.room.create and m.room.member, + // so any other events with that same depth are illegal. + warn!( + "Had unsafe depth {depth} when creating non-state event in {room_id}. Cowardly \ + aborting" + ); + return Err!(Request(Unknown("Unsafe depth for non-state event."))); + } + + let mut pdu = PduEvent { + event_id: ruma::event_id!("$thiswillbefilledinlater").into(), + room_id: room_id.to_owned(), + sender: sender.to_owned(), + origin: None, + origin_server_ts: timestamp.map_or_else( + || { + utils::millis_since_unix_epoch() + .try_into() + .expect("u64 fits into UInt") + }, + |ts| ts.get(), + ), + kind: event_type, + content, + state_key, + prev_events, + depth, + auth_events: auth_events + .values() + .map(|pdu| pdu.event_id.clone()) + .collect(), + redacts, + unsigned: if unsigned.is_empty() { + None + } else { + Some(to_raw_value(&unsigned)?) + }, + hashes: EventHash { sha256: "aaa".to_owned() }, + signatures: None, + }; + + let auth_fetch = |k: &StateEventType, s: &str| { + let key = (k.clone(), s.into()); + ready(auth_events.get(&key).map(ToOwned::to_owned)) + }; + + let auth_check = state_res::auth_check( + &room_version, + &pdu, + None, // TODO: third_party_invite + auth_fetch, + ) + .await + .map_err(|e| err!(Request(Forbidden(warn!("Auth check failed: {e:?}")))))?; + + if !auth_check { + return Err!(Request(Forbidden("Event is not authorized."))); + } + + // Hash and sign + let mut pdu_json = utils::to_canonical_object(&pdu).map_err(|e| { + err!(Request(BadJson(warn!("Failed to convert PDU to canonical JSON: {e}")))) + })?; + + // room v3 and above removed the "event_id" field from remote PDU format + match room_version_id { + | RoomVersionId::V1 | RoomVersionId::V2 => {}, + | _ => { + pdu_json.remove("event_id"); + }, + } + + // Add origin because synapse likes that (and it's required in the spec) + pdu_json.insert( + "origin".to_owned(), + to_canonical_value(self.services.globals.server_name()) + .expect("server name is a valid CanonicalJsonValue"), + ); + + if let Err(e) = self + .services + .server_keys + .hash_and_sign_event(&mut pdu_json, &room_version_id) + { + return match e { + | Error::Signatures(ruma::signatures::Error::PduSize) => { + Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)"))) + }, + | _ => Err!(Request(Unknown(warn!("Signing event failed: {e}")))), + }; + } + + // Generate event id + pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?; + + pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into())); + + // Generate short event id + let _shorteventid = self + .services + .short + .get_or_create_shorteventid(&pdu.event_id) + .await; + + Ok((pdu, pdu_json)) +} diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index a381fcf6..70c98a09 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -1,61 +1,34 @@ +mod append; +mod backfill; +mod build; +mod create; mod data; +mod redact; -use std::{ - borrow::Borrow, - cmp, - collections::{BTreeMap, HashSet}, - fmt::Write, - iter::once, - sync::Arc, -}; +use std::{fmt::Write, sync::Arc}; use async_trait::async_trait; -pub use conduwuit::matrix::pdu::{PduId, RawPduId}; -use conduwuit::{ - Err, Error, Result, Server, at, debug, debug_warn, err, error, implement, info, +pub use conduwuit_core::matrix::pdu::{PduId, RawPduId}; +use conduwuit_core::{ + Result, Server, at, err, matrix::{ - event::{Event, gen_event_id}, - pdu::{EventHash, PduBuilder, PduCount, PduEvent}, - state_res::{self, RoomVersion}, + event::Event, + pdu::{PduCount, PduEvent}, }, - utils::{ - self, IterStream, MutexMap, MutexMapGuard, ReadyExt, future::TryExtExt, stream::TryIgnore, - }, - validated, warn, -}; -use futures::{ - Future, FutureExt, Stream, StreamExt, TryStreamExt, future, future::ready, pin_mut, + utils::{MutexMap, MutexMapGuard, future::TryExtExt, stream::TryIgnore}, + warn, }; +use futures::{Future, Stream, TryStreamExt, pin_mut}; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedServerName, - RoomId, RoomVersionId, ServerName, UserId, - api::federation, - canonical_json::to_canonical_value, - events::{ - GlobalAccountDataEventType, StateEventType, TimelineEventType, - push_rules::PushRulesEvent, - room::{ - create::RoomCreateEventContent, - encrypted::Relation, - member::{MembershipState, RoomMemberEventContent}, - power_levels::RoomPowerLevelsEventContent, - redaction::RoomRedactionEventContent, - }, - }, - push::{Action, Ruleset, Tweak}, - uint, + CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId, + events::room::encrypted::Relation, }; use serde::Deserialize; -use serde_json::value::{RawValue as RawJsonValue, to_raw_value}; use self::data::Data; pub use self::data::PdusIterItem; use crate::{ - Dep, account_data, admin, appservice, - appservice::NamespaceRegex, - globals, pusher, rooms, - rooms::{short::ShortRoomId, state_compressor::CompressedState}, - sending, server_keys, users, + Dep, account_data, admin, appservice, globals, pusher, rooms, sending, server_keys, users, }; // Update Relationships @@ -259,743 +232,6 @@ impl Service { self.db.replace_pdu(pdu_id, pdu_json).await } - /// Creates a new persisted data unit and adds it to a room. - /// - /// By this point the incoming event should be fully authenticated, no auth - /// happens in `append_pdu`. - /// - /// Returns pdu id - #[tracing::instrument(level = "debug", skip_all)] - pub async fn append_pdu<'a, Leaves>( - &'a self, - pdu: &'a PduEvent, - mut pdu_json: CanonicalJsonObject, - leaves: Leaves, - state_lock: &'a RoomMutexGuard, - ) -> Result - where - Leaves: Iterator + Send + 'a, - { - // Coalesce database writes for the remainder of this scope. - let _cork = self.db.db.cork_and_flush(); - - let shortroomid = self - .services - .short - .get_shortroomid(&pdu.room_id) - .await - .map_err(|_| err!(Database("Room does not exist")))?; - - // Make unsigned fields correct. This is not properly documented in the spec, - // but state events need to have previous content in the unsigned field, so - // clients can easily interpret things like membership changes - if let Some(state_key) = &pdu.state_key { - if let CanonicalJsonValue::Object(unsigned) = pdu_json - .entry("unsigned".to_owned()) - .or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default())) - { - if let Ok(shortstatehash) = self - .services - .state_accessor - .pdu_shortstatehash(&pdu.event_id) - .await - { - if let Ok(prev_state) = self - .services - .state_accessor - .state_get(shortstatehash, &pdu.kind.to_string().into(), state_key) - .await - { - unsigned.insert( - "prev_content".to_owned(), - CanonicalJsonValue::Object( - utils::to_canonical_object(prev_state.get_content_as_value()) - .map_err(|e| { - err!(Database(error!( - "Failed to convert prev_state to canonical JSON: {e}", - ))) - })?, - ), - ); - unsigned.insert( - String::from("prev_sender"), - CanonicalJsonValue::String(prev_state.sender().to_string()), - ); - unsigned.insert( - String::from("replaces_state"), - CanonicalJsonValue::String(prev_state.event_id().to_string()), - ); - } - } - } else { - error!("Invalid unsigned type in pdu."); - } - } - - // We must keep track of all events that have been referenced. - self.services - .pdu_metadata - .mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref)); - - self.services - .state - .set_forward_extremities(&pdu.room_id, leaves, state_lock) - .await; - - let insert_lock = self.mutex_insert.lock(&pdu.room_id).await; - - let count1 = self.services.globals.next_count().unwrap(); - // Mark as read first so the sending client doesn't get a notification even if - // appending fails - self.services - .read_receipt - .private_read_set(&pdu.room_id, &pdu.sender, count1); - self.services - .user - .reset_notification_counts(&pdu.sender, &pdu.room_id); - - let count2 = PduCount::Normal(self.services.globals.next_count().unwrap()); - let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into(); - - // Insert pdu - self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2).await; - - drop(insert_lock); - - // See if the event matches any known pushers via power level - let power_levels: RoomPowerLevelsEventContent = self - .services - .state_accessor - .room_state_get_content(&pdu.room_id, &StateEventType::RoomPowerLevels, "") - .await - .unwrap_or_default(); - - let mut push_target: HashSet<_> = self - .services - .state_cache - .active_local_users_in_room(&pdu.room_id) - .map(ToOwned::to_owned) - // Don't notify the sender of their own events, and dont send from ignored users - .ready_filter(|user| *user != pdu.sender) - .filter_map(|recipient_user| async move { (!self.services.users.user_is_ignored(&pdu.sender, &recipient_user).await).then_some(recipient_user) }) - .collect() - .await; - - let mut notifies = Vec::with_capacity(push_target.len().saturating_add(1)); - let mut highlights = Vec::with_capacity(push_target.len().saturating_add(1)); - - if pdu.kind == TimelineEventType::RoomMember { - if let Some(state_key) = &pdu.state_key { - let target_user_id = UserId::parse(state_key)?; - - if self.services.users.is_active_local(target_user_id).await { - push_target.insert(target_user_id.to_owned()); - } - } - } - - let serialized = pdu.to_format(); - for user in &push_target { - let rules_for_user = self - .services - .account_data - .get_global(user, GlobalAccountDataEventType::PushRules) - .await - .map_or_else( - |_| Ruleset::server_default(user), - |ev: PushRulesEvent| ev.content.global, - ); - - let mut highlight = false; - let mut notify = false; - - for action in self - .services - .pusher - .get_actions(user, &rules_for_user, &power_levels, &serialized, &pdu.room_id) - .await - { - match action { - | Action::Notify => notify = true, - | Action::SetTweak(Tweak::Highlight(true)) => { - highlight = true; - }, - | _ => {}, - } - - // Break early if both conditions are true - if notify && highlight { - break; - } - } - - if notify { - notifies.push(user.clone()); - } - - if highlight { - highlights.push(user.clone()); - } - - self.services - .pusher - .get_pushkeys(user) - .ready_for_each(|push_key| { - self.services - .sending - .send_pdu_push(&pdu_id, user, push_key.to_owned()) - .expect("TODO: replace with future"); - }) - .await; - } - - self.db - .increment_notification_counts(&pdu.room_id, notifies, highlights); - - match pdu.kind { - | TimelineEventType::RoomRedaction => { - use RoomVersionId::*; - - let room_version_id = self.services.state.get_room_version(&pdu.room_id).await?; - match room_version_id { - | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => { - if let Some(redact_id) = &pdu.redacts { - if self - .services - .state_accessor - .user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false) - .await? - { - self.redact_pdu(redact_id, pdu, shortroomid).await?; - } - } - }, - | _ => { - let content: RoomRedactionEventContent = pdu.get_content()?; - if let Some(redact_id) = &content.redacts { - if self - .services - .state_accessor - .user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false) - .await? - { - self.redact_pdu(redact_id, pdu, shortroomid).await?; - } - } - }, - } - }, - | TimelineEventType::SpaceChild => - if let Some(_state_key) = &pdu.state_key { - self.services - .spaces - .roomid_spacehierarchy_cache - .lock() - .await - .remove(&pdu.room_id); - }, - | TimelineEventType::RoomMember => { - if let Some(state_key) = &pdu.state_key { - // if the state_key fails - let target_user_id = UserId::parse(state_key) - .expect("This state_key was previously validated"); - - let content: RoomMemberEventContent = pdu.get_content()?; - let stripped_state = match content.membership { - | MembershipState::Invite | MembershipState::Knock => - self.services.state.summary_stripped(pdu).await.into(), - | _ => None, - }; - - // Update our membership info, we do this here incase a user is invited or - // knocked and immediately leaves we need the DB to record the invite or - // knock event for auth - self.services - .state_cache - .update_membership( - &pdu.room_id, - target_user_id, - content, - &pdu.sender, - stripped_state, - None, - true, - ) - .await?; - } - }, - | TimelineEventType::RoomMessage => { - let content: ExtractBody = pdu.get_content()?; - if let Some(body) = content.body { - self.services.search.index_pdu(shortroomid, &pdu_id, &body); - - if self.services.admin.is_admin_command(pdu, &body).await { - self.services.admin.command_with_sender( - body, - Some((*pdu.event_id).into()), - pdu.sender.clone().into(), - )?; - } - } - }, - | _ => {}, - } - - if let Ok(content) = pdu.get_content::() { - if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await { - self.services - .pdu_metadata - .add_relation(count2, related_pducount); - } - } - - if let Ok(content) = pdu.get_content::() { - match content.relates_to { - | Relation::Reply { in_reply_to } => { - // We need to do it again here, because replies don't have - // event_id as a top level field - if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await - { - self.services - .pdu_metadata - .add_relation(count2, related_pducount); - } - }, - | Relation::Thread(thread) => { - self.services - .threads - .add_to_thread(&thread.event_id, pdu) - .await?; - }, - | _ => {}, // TODO: Aggregate other types - } - } - - for appservice in self.services.appservice.read().await.values() { - if self - .services - .state_cache - .appservice_in_room(&pdu.room_id, appservice) - .await - { - self.services - .sending - .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; - continue; - } - - // If the RoomMember event has a non-empty state_key, it is targeted at someone. - // If it is our appservice user, we send this PDU to it. - if pdu.kind == TimelineEventType::RoomMember { - if let Some(state_key_uid) = &pdu - .state_key - .as_ref() - .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) - { - let appservice_uid = appservice.registration.sender_localpart.as_str(); - if state_key_uid == &appservice_uid { - self.services - .sending - .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; - continue; - } - } - } - - let matching_users = |users: &NamespaceRegex| { - appservice.users.is_match(pdu.sender.as_str()) - || pdu.kind == TimelineEventType::RoomMember - && pdu - .state_key - .as_ref() - .is_some_and(|state_key| users.is_match(state_key)) - }; - let matching_aliases = |aliases: NamespaceRegex| { - self.services - .alias - .local_aliases_for_room(&pdu.room_id) - .ready_any(move |room_alias| aliases.is_match(room_alias.as_str())) - }; - - if matching_aliases(appservice.aliases.clone()).await - || appservice.rooms.is_match(pdu.room_id.as_str()) - || matching_users(&appservice.users) - { - self.services - .sending - .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; - } - } - - Ok(pdu_id) - } - - pub async fn create_hash_and_sign_event( - &self, - pdu_builder: PduBuilder, - sender: &UserId, - room_id: &RoomId, - _mutex_lock: &RoomMutexGuard, /* Take mutex guard to make sure users get the room - * state mutex */ - ) -> Result<(PduEvent, CanonicalJsonObject)> { - let PduBuilder { - event_type, - content, - unsigned, - state_key, - redacts, - timestamp, - } = pdu_builder; - - let prev_events: Vec = self - .services - .state - .get_forward_extremities(room_id) - .take(20) - .map(Into::into) - .collect() - .await; - - // If there was no create event yet, assume we are creating a room - let room_version_id = self - .services - .state - .get_room_version(room_id) - .await - .or_else(|_| { - if event_type == TimelineEventType::RoomCreate { - let content: RoomCreateEventContent = serde_json::from_str(content.get())?; - Ok(content.room_version) - } else { - Err(Error::InconsistentRoomState( - "non-create event for room of unknown version", - room_id.to_owned(), - )) - } - })?; - - let room_version = RoomVersion::new(&room_version_id).expect("room version is supported"); - - let auth_events = self - .services - .state - .get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content) - .await?; - - // Our depth is the maximum depth of prev_events + 1 - let depth = prev_events - .iter() - .stream() - .map(Ok) - .and_then(|event_id| self.get_pdu(event_id)) - .and_then(|pdu| future::ok(pdu.depth)) - .ignore_err() - .ready_fold(uint!(0), cmp::max) - .await - .saturating_add(uint!(1)); - - let mut unsigned = unsigned.unwrap_or_default(); - - if let Some(state_key) = &state_key { - if let Ok(prev_pdu) = self - .services - .state_accessor - .room_state_get(room_id, &event_type.to_string().into(), state_key) - .await - { - unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value()); - unsigned - .insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?); - unsigned.insert( - "replaces_state".to_owned(), - serde_json::to_value(prev_pdu.event_id())?, - ); - } - } - if event_type != TimelineEventType::RoomCreate && prev_events.is_empty() { - return Err!(Request(Unknown("Event incorrectly had zero prev_events."))); - } - if state_key.is_none() && depth.lt(&uint!(2)) { - // The first two events in a room are always m.room.create and m.room.member, - // so any other events with that same depth are illegal. - warn!( - "Had unsafe depth {depth} when creating non-state event in {room_id}. Cowardly \ - aborting" - ); - return Err!(Request(Unknown("Unsafe depth for non-state event."))); - } - - let mut pdu = PduEvent { - event_id: ruma::event_id!("$thiswillbefilledinlater").into(), - room_id: room_id.to_owned(), - sender: sender.to_owned(), - origin: None, - origin_server_ts: timestamp.map_or_else( - || { - utils::millis_since_unix_epoch() - .try_into() - .expect("u64 fits into UInt") - }, - |ts| ts.get(), - ), - kind: event_type, - content, - state_key, - prev_events, - depth, - auth_events: auth_events - .values() - .map(|pdu| pdu.event_id.clone()) - .collect(), - redacts, - unsigned: if unsigned.is_empty() { - None - } else { - Some(to_raw_value(&unsigned)?) - }, - hashes: EventHash { sha256: "aaa".to_owned() }, - signatures: None, - }; - - let auth_fetch = |k: &StateEventType, s: &str| { - let key = (k.clone(), s.into()); - ready(auth_events.get(&key).map(ToOwned::to_owned)) - }; - - let auth_check = state_res::auth_check( - &room_version, - &pdu, - None, // TODO: third_party_invite - auth_fetch, - ) - .await - .map_err(|e| err!(Request(Forbidden(warn!("Auth check failed: {e:?}")))))?; - - if !auth_check { - return Err!(Request(Forbidden("Event is not authorized."))); - } - - // Hash and sign - let mut pdu_json = utils::to_canonical_object(&pdu).map_err(|e| { - err!(Request(BadJson(warn!("Failed to convert PDU to canonical JSON: {e}")))) - })?; - - // room v3 and above removed the "event_id" field from remote PDU format - match room_version_id { - | RoomVersionId::V1 | RoomVersionId::V2 => {}, - | _ => { - pdu_json.remove("event_id"); - }, - } - - // Add origin because synapse likes that (and it's required in the spec) - pdu_json.insert( - "origin".to_owned(), - to_canonical_value(self.services.globals.server_name()) - .expect("server name is a valid CanonicalJsonValue"), - ); - - if let Err(e) = self - .services - .server_keys - .hash_and_sign_event(&mut pdu_json, &room_version_id) - { - return match e { - | Error::Signatures(ruma::signatures::Error::PduSize) => { - Err!(Request(TooLarge("Message/PDU is too long (exceeds 65535 bytes)"))) - }, - | _ => Err!(Request(Unknown(warn!("Signing event failed: {e}")))), - }; - } - - // Generate event id - pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?; - - pdu_json - .insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into())); - - // Generate short event id - let _shorteventid = self - .services - .short - .get_or_create_shorteventid(&pdu.event_id) - .await; - - Ok((pdu, pdu_json)) - } - - /// Creates a new persisted data unit and adds it to a room. This function - /// takes a roomid_mutex_state, meaning that only this function is able to - /// mutate the room state. - #[tracing::instrument(skip(self, state_lock), level = "debug")] - pub async fn build_and_append_pdu( - &self, - pdu_builder: PduBuilder, - sender: &UserId, - room_id: &RoomId, - state_lock: &RoomMutexGuard, - ) -> Result { - let (pdu, pdu_json) = self - .create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock) - .await?; - - if self.services.admin.is_admin_room(&pdu.room_id).await { - self.check_pdu_for_admin_room(&pdu, sender).boxed().await?; - } - - // If redaction event is not authorized, do not append it to the timeline - if pdu.kind == TimelineEventType::RoomRedaction { - use RoomVersionId::*; - match self.services.state.get_room_version(&pdu.room_id).await? { - | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => { - if let Some(redact_id) = &pdu.redacts { - if !self - .services - .state_accessor - .user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false) - .await? - { - return Err!(Request(Forbidden("User cannot redact this event."))); - } - } - }, - | _ => { - let content: RoomRedactionEventContent = pdu.get_content()?; - if let Some(redact_id) = &content.redacts { - if !self - .services - .state_accessor - .user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false) - .await? - { - return Err!(Request(Forbidden("User cannot redact this event."))); - } - } - }, - } - } - - if pdu.kind == TimelineEventType::RoomMember { - let content: RoomMemberEventContent = pdu.get_content()?; - - if content.join_authorized_via_users_server.is_some() - && content.membership != MembershipState::Join - { - return Err!(Request(BadJson( - "join_authorised_via_users_server is only for member joins" - ))); - } - - if content - .join_authorized_via_users_server - .as_ref() - .is_some_and(|authorising_user| { - !self.services.globals.user_is_local(authorising_user) - }) { - return Err!(Request(InvalidParam( - "Authorising user does not belong to this homeserver" - ))); - } - } - - // We append to state before appending the pdu, so we don't have a moment in - // time with the pdu without it's state. This is okay because append_pdu can't - // fail. - let statehashid = self.services.state.append_to_state(&pdu).await?; - - let pdu_id = self - .append_pdu( - &pdu, - pdu_json, - // Since this PDU references all pdu_leaves we can update the leaves - // of the room - once(pdu.event_id.borrow()), - state_lock, - ) - .boxed() - .await?; - - // We set the room state after inserting the pdu, so that we never have a moment - // in time where events in the current room state do not exist - self.services - .state - .set_room_state(&pdu.room_id, statehashid, state_lock); - - let mut servers: HashSet = self - .services - .state_cache - .room_servers(&pdu.room_id) - .map(ToOwned::to_owned) - .collect() - .await; - - // In case we are kicking or banning a user, we need to inform their server of - // the change - if pdu.kind == TimelineEventType::RoomMember { - if let Some(state_key_uid) = &pdu - .state_key - .as_ref() - .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) - { - servers.insert(state_key_uid.server_name().to_owned()); - } - } - - // Remove our server from the server list since it will be added to it by - // room_servers() and/or the if statement above - servers.remove(self.services.globals.server_name()); - - self.services - .sending - .send_pdu_servers(servers.iter().map(AsRef::as_ref).stream(), &pdu_id) - .await?; - - Ok(pdu.event_id) - } - - /// Append the incoming event setting the state snapshot to the state from - /// the server that sent the event. - #[tracing::instrument(level = "debug", skip_all)] - pub async fn append_incoming_pdu<'a, Leaves>( - &'a self, - pdu: &'a PduEvent, - pdu_json: CanonicalJsonObject, - new_room_leaves: Leaves, - state_ids_compressed: Arc, - soft_fail: bool, - state_lock: &'a RoomMutexGuard, - ) -> Result> - where - Leaves: Iterator + Send + 'a, - { - // We append to state before appending the pdu, so we don't have a moment in - // time with the pdu without it's state. This is okay because append_pdu can't - // fail. - self.services - .state - .set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed) - .await?; - - if soft_fail { - self.services - .pdu_metadata - .mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref)); - - self.services - .state - .set_forward_extremities(&pdu.room_id, new_room_leaves, state_lock) - .await; - - return Ok(None); - } - - let pdu_id = self - .append_pdu(pdu, pdu_json, new_room_leaves, state_lock) - .await?; - - Ok(Some(pdu_id)) - } - /// Returns an iterator over all PDUs in a room. Unknown rooms produce no /// items. #[inline] @@ -1030,290 +266,4 @@ impl Service { self.db .pdus(user_id, room_id, from.unwrap_or_else(PduCount::min)) } - - /// Replace a PDU with the redacted form. - #[tracing::instrument(name = "redact", level = "debug", skip(self))] - pub async fn redact_pdu( - &self, - event_id: &EventId, - reason: &Pdu, - shortroomid: ShortRoomId, - ) -> Result { - // TODO: Don't reserialize, keep original json - let Ok(pdu_id) = self.get_pdu_id(event_id).await else { - // If event does not exist, just noop - return Ok(()); - }; - - let mut pdu = self - .get_pdu_from_id(&pdu_id) - .await - .map(Event::into_pdu) - .map_err(|e| { - err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU."))) - })?; - - if let Ok(content) = pdu.get_content::() { - if let Some(body) = content.body { - self.services - .search - .deindex_pdu(shortroomid, &pdu_id, &body); - } - } - - let room_version_id = self.services.state.get_room_version(pdu.room_id()).await?; - - pdu.redact(&room_version_id, reason.to_value())?; - - let obj = utils::to_canonical_object(&pdu).map_err(|e| { - err!(Database(error!(?event_id, ?e, "Failed to convert PDU to canonical JSON"))) - })?; - - self.replace_pdu(&pdu_id, &obj).await - } - - #[tracing::instrument(name = "backfill", level = "debug", skip(self))] - pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> { - if self - .services - .state_cache - .room_joined_count(room_id) - .await - .is_ok_and(|count| count <= 1) - && !self - .services - .state_accessor - .is_world_readable(room_id) - .await - { - // Room is empty (1 user or none), there is no one that can backfill - return Ok(()); - } - - let first_pdu = self - .first_item_in_room(room_id) - .await - .expect("Room is not empty"); - - if first_pdu.0 < from { - // No backfill required, there are still events between them - return Ok(()); - } - - let power_levels: RoomPowerLevelsEventContent = self - .services - .state_accessor - .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") - .await - .unwrap_or_default(); - - let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { - if level > &power_levels.users_default - && !self.services.globals.user_is_local(user_id) - { - Some(user_id.server_name()) - } else { - None - } - }); - - let canonical_room_alias_server = once( - self.services - .state_accessor - .get_canonical_alias(room_id) - .await, - ) - .filter_map(Result::ok) - .map(|alias| alias.server_name().to_owned()) - .stream(); - - let mut servers = room_mods - .stream() - .map(ToOwned::to_owned) - .chain(canonical_room_alias_server) - .chain( - self.services - .server - .config - .trusted_servers - .iter() - .map(ToOwned::to_owned) - .stream(), - ) - .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) - .filter_map(|server_name| async move { - self.services - .state_cache - .server_in_room(&server_name, room_id) - .await - .then_some(server_name) - }) - .boxed(); - - while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); - let response = self - .services - .sending - .send_federation_request( - backfill_server, - federation::backfill::get_backfill::v1::Request { - room_id: room_id.to_owned(), - v: vec![first_pdu.1.event_id().to_owned()], - limit: uint!(100), - }, - ) - .await; - match response { - | Ok(response) => { - for pdu in response.pdus { - if let Err(e) = self.backfill_pdu(backfill_server, pdu).boxed().await { - debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}"); - } - } - return Ok(()); - }, - | Err(e) => { - warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); - }, - } - } - - info!("No servers could backfill, but backfill was needed in room {room_id}"); - Ok(()) - } - - #[tracing::instrument(skip(self, pdu), level = "debug")] - pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result<()> { - let (room_id, event_id, value) = - self.services.event_handler.parse_incoming_pdu(&pdu).await?; - - // Lock so we cannot backfill the same pdu twice at the same time - let mutex_lock = self - .services - .event_handler - .mutex_federation - .lock(&room_id) - .await; - - // Skip the PDU if we already have it as a timeline event - if let Ok(pdu_id) = self.get_pdu_id(&event_id).await { - debug!("We already know {event_id} at {pdu_id:?}"); - return Ok(()); - } - - self.services - .event_handler - .handle_incoming_pdu(origin, &room_id, &event_id, value, false) - .boxed() - .await?; - - let value = self.get_pdu_json(&event_id).await?; - - let pdu = self.get_pdu(&event_id).await?; - - let shortroomid = self.services.short.get_shortroomid(&room_id).await?; - - let insert_lock = self.mutex_insert.lock(&room_id).await; - - let count: i64 = self.services.globals.next_count().unwrap().try_into()?; - - let pdu_id: RawPduId = PduId { - shortroomid, - shorteventid: PduCount::Backfilled(validated!(0 - count)), - } - .into(); - - // Insert pdu - self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value); - - drop(insert_lock); - - if pdu.kind == TimelineEventType::RoomMessage { - let content: ExtractBody = pdu.get_content()?; - if let Some(body) = content.body { - self.services.search.index_pdu(shortroomid, &pdu_id, &body); - } - } - drop(mutex_lock); - - debug!("Prepended backfill pdu"); - Ok(()) - } -} - -#[implement(Service)] -#[tracing::instrument(skip_all, level = "debug")] -async fn check_pdu_for_admin_room(&self, pdu: &Pdu, sender: &UserId) -> Result -where - Pdu: Event + Send + Sync, -{ - match pdu.kind() { - | TimelineEventType::RoomEncryption => { - return Err!(Request(Forbidden(error!("Encryption not supported in admins room.")))); - }, - | TimelineEventType::RoomMember => { - let target = pdu - .state_key() - .filter(|v| v.starts_with('@')) - .unwrap_or(sender.as_str()); - - let server_user = &self.services.globals.server_user.to_string(); - - let content: RoomMemberEventContent = pdu.get_content()?; - match content.membership { - | MembershipState::Leave => { - if target == server_user { - return Err!(Request(Forbidden(error!( - "Server user cannot leave the admins room." - )))); - } - - let count = self - .services - .state_cache - .room_members(pdu.room_id()) - .ready_filter(|user| self.services.globals.user_is_local(user)) - .ready_filter(|user| *user != target) - .boxed() - .count() - .await; - - if count < 2 { - return Err!(Request(Forbidden(error!( - "Last admin cannot leave the admins room." - )))); - } - }, - - | MembershipState::Ban if pdu.state_key().is_some() => { - if target == server_user { - return Err!(Request(Forbidden(error!( - "Server cannot be banned from admins room." - )))); - } - - let count = self - .services - .state_cache - .room_members(pdu.room_id()) - .ready_filter(|user| self.services.globals.user_is_local(user)) - .ready_filter(|user| *user != target) - .boxed() - .count() - .await; - - if count < 2 { - return Err!(Request(Forbidden(error!( - "Last admin cannot be banned from admins room." - )))); - } - }, - | _ => {}, - } - }, - | _ => {}, - } - - Ok(()) } diff --git a/src/service/rooms/timeline/redact.rs b/src/service/rooms/timeline/redact.rs new file mode 100644 index 00000000..d51a8462 --- /dev/null +++ b/src/service/rooms/timeline/redact.rs @@ -0,0 +1,51 @@ +use conduwuit_core::{ + Result, err, implement, + matrix::event::Event, + utils::{self}, +}; +use ruma::EventId; + +use super::ExtractBody; +use crate::rooms::short::ShortRoomId; + +/// Replace a PDU with the redacted form. +#[implement(super::Service)] +#[tracing::instrument(name = "redact", level = "debug", skip(self))] +pub async fn redact_pdu( + &self, + event_id: &EventId, + reason: &Pdu, + shortroomid: ShortRoomId, +) -> Result { + // TODO: Don't reserialize, keep original json + let Ok(pdu_id) = self.get_pdu_id(event_id).await else { + // If event does not exist, just noop + return Ok(()); + }; + + let mut pdu = self + .get_pdu_from_id(&pdu_id) + .await + .map(Event::into_pdu) + .map_err(|e| { + err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU."))) + })?; + + if let Ok(content) = pdu.get_content::() { + if let Some(body) = content.body { + self.services + .search + .deindex_pdu(shortroomid, &pdu_id, &body); + } + } + + let room_version_id = self.services.state.get_room_version(pdu.room_id()).await?; + + pdu.redact(&room_version_id, reason.to_value())?; + + let obj = utils::to_canonical_object(&pdu).map_err(|e| { + err!(Database(error!(?event_id, ?e, "Failed to convert PDU to canonical JSON"))) + })?; + + self.replace_pdu(&pdu_id, &obj).await +}