feat(hydra): Initial public commit for v12 support

This commit is contained in:
nexy7574 2025-07-25 11:17:29 +01:00
commit acde010fde
No known key found for this signature in database
59 changed files with 954 additions and 397 deletions

View file

@ -195,11 +195,11 @@ async fn get_auth_chain_inner(
debug_error!(?event_id, ?e, "Could not find pdu mentioned in auth events");
},
| Ok(pdu) => {
if pdu.room_id != room_id {
if pdu.room_id.is_some() && pdu.room_id != Some(room_id.to_owned()) {
return Err!(Request(Forbidden(error!(
?event_id,
?room_id,
wrong_room_id = ?pdu.room_id,
wrong_room_id = ?pdu.room_id.unwrap(),
"auth event for incorrect room"
))));
}

View file

@ -58,6 +58,10 @@ pub async fn handle_incoming_pdu<'a>(
value: BTreeMap<String, CanonicalJsonValue>,
is_timeline_event: bool,
) -> Result<Option<RawPduId>> {
if room_id.is_empty() {
// TODO(hydra): Room IDs should be calculated before this function is called
panic!("room ID cannot be empty");
}
// 1. Skip the PDU if we already have it as a timeline event
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
return Ok(Some(pdu_id));

View file

@ -139,6 +139,7 @@ where
&pdu_event,
None, // TODO: third party invite
state_fetch,
create_event.as_pdu(),
)
.await
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;

View file

@ -99,7 +99,10 @@ impl Service {
}
fn check_room_id<Pdu: Event>(room_id: &RoomId, pdu: &Pdu) -> Result {
if pdu.room_id() != room_id {
if pdu
.room_id()
.is_some_and(|claimed_room_id| claimed_room_id != room_id)
{
return Err!(Request(InvalidParam(error!(
pdu_event_id = ?pdu.event_id(),
pdu_room_id = ?pdu.room_id(),

View file

@ -102,6 +102,7 @@ where
&incoming_pdu,
None, // TODO: third party invite
|ty, sk| state_fetch(ty.clone(), sk.into()),
create_event.as_pdu(),
)
.await
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
@ -123,6 +124,7 @@ where
incoming_pdu.sender(),
incoming_pdu.state_key(),
incoming_pdu.content(),
&room_version,
)
.await?;
@ -140,6 +142,7 @@ where
&incoming_pdu,
None, // third-party invite
state_fetch,
create_event.as_pdu(),
)
.await
.map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?;
@ -156,7 +159,7 @@ where
!self
.services
.state_accessor
.user_can_redact(&redact_id, incoming_pdu.sender(), incoming_pdu.room_id(), true)
.user_can_redact(&redact_id, incoming_pdu.sender(), room_id, true)
.await?,
};
@ -313,6 +316,7 @@ where
state_ids_compressed,
soft_fail,
&state_lock,
room_id,
)
.await?;
@ -347,6 +351,7 @@ where
state_ids_compressed,
soft_fail,
&state_lock,
room_id,
)
.await?;

View file

@ -124,7 +124,7 @@ pub async fn search_pdus<'a>(
.wide_filter_map(move |pdu| async move {
self.services
.state_accessor
.user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id())
.user_can_see_event(query.user_id?, pdu.room_id().unwrap(), pdu.event_id())
.await
.then_some(pdu)
})

View file

@ -1,6 +1,7 @@
use std::{collections::HashMap, fmt::Write, iter::once, sync::Arc};
use async_trait::async_trait;
use conduwuit::{RoomVersion, debug};
use conduwuit_core::{
Event, PduEvent, Result, err,
result::FlatOk,
@ -15,6 +16,7 @@ use conduwuit_database::{Deserialized, Ignore, Interfix, Map};
use futures::{
FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join_all, pin_mut,
};
use log::trace;
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
events::{
@ -148,7 +150,7 @@ impl Service {
.roomid_spacehierarchy_cache
.lock()
.await
.remove(&pdu.room_id);
.remove(room_id);
},
| _ => continue,
}
@ -239,7 +241,7 @@ impl Service {
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[tracing::instrument(skip(self, new_pdu), level = "debug")]
pub async fn append_to_state(&self, new_pdu: &PduEvent) -> Result<u64> {
pub async fn append_to_state(&self, new_pdu: &PduEvent, room_id: &RoomId) -> Result<u64> {
const BUFSIZE: usize = size_of::<u64>();
let shorteventid = self
@ -248,7 +250,7 @@ impl Service {
.get_or_create_shorteventid(&new_pdu.event_id)
.await;
let previous_shortstatehash = self.get_room_shortstatehash(&new_pdu.room_id).await;
let previous_shortstatehash = self.get_room_shortstatehash(room_id).await;
if let Ok(p) = previous_shortstatehash {
self.db
@ -319,7 +321,11 @@ impl Service {
}
#[tracing::instrument(skip_all, level = "debug")]
pub async fn summary_stripped<'a, E>(&self, event: &'a E) -> Vec<Raw<AnyStrippedStateEvent>>
pub async fn summary_stripped<'a, E>(
&self,
event: &'a E,
room_id: &RoomId,
) -> Vec<Raw<AnyStrippedStateEvent>>
where
E: Event + Send + Sync,
&'a E: Event + Send,
@ -338,7 +344,7 @@ impl Service {
let fetches = cells.into_iter().map(|(event_type, state_key)| {
self.services
.state_accessor
.room_state_get(event.room_id(), event_type, state_key)
.room_state_get(room_id, event_type, state_key)
});
join_all(fetches)
@ -421,7 +427,7 @@ impl Service {
}
/// This fetches auth events from the current state.
#[tracing::instrument(skip(self, content), level = "debug")]
#[tracing::instrument(skip(self, content, room_version), level = "trace")]
pub async fn get_auth_events(
&self,
room_id: &RoomId,
@ -429,13 +435,15 @@ impl Service {
sender: &UserId,
state_key: Option<&str>,
content: &serde_json::value::RawValue,
room_version: &RoomVersion,
) -> Result<StateMap<PduEvent>> {
let Ok(shortstatehash) = self.get_room_shortstatehash(room_id).await else {
return Ok(HashMap::new());
};
let auth_types = state_res::auth_types_for_event(kind, sender, state_key, content)?;
let auth_types =
state_res::auth_types_for_event(kind, sender, state_key, content, room_version)?;
debug!(?auth_types, "Auth types for event");
let sauthevents: HashMap<_, _> = auth_types
.iter()
.stream()
@ -448,6 +456,7 @@ impl Service {
})
.collect()
.await;
debug!(?sauthevents, "Auth events to fetch");
let (state_keys, event_ids): (Vec<_>, Vec<_>) = self
.services
@ -461,7 +470,7 @@ impl Service {
})
.unzip()
.await;
debug!(?state_keys, ?event_ids, "Auth events found in state");
self.services
.short
.multi_get_eventid_from_short(event_ids.into_iter().stream())
@ -473,6 +482,7 @@ impl Service {
.get_pdu(&event_id)
.await
.map(move |pdu| (((*ty).clone(), (*sk).clone()), pdu))
.inspect_err(|e| warn!("Failed to get auth event {event_id}: {e:?}"))
.ok()
})
.collect()

View file

@ -161,7 +161,7 @@ pub async fn user_can_invite(
&RoomMemberEventContent::new(MembershipState::Invite),
),
sender,
room_id,
Some(room_id),
state_lock,
)
.await

View file

@ -42,6 +42,7 @@ pub async fn append_incoming_pdu<'a, Leaves>(
state_ids_compressed: Arc<CompressedState>,
soft_fail: bool,
state_lock: &'a RoomMutexGuard,
room_id: &'a ruma::RoomId,
) -> Result<Option<RawPduId>>
where
Leaves: Iterator<Item = &'a EventId> + Send + 'a,
@ -51,24 +52,24 @@ where
// fail.
self.services
.state
.set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed)
.set_event_state(&pdu.event_id, room_id, state_ids_compressed)
.await?;
if soft_fail {
self.services
.pdu_metadata
.mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref));
.mark_as_referenced(room_id, pdu.prev_events.iter().map(AsRef::as_ref));
self.services
.state
.set_forward_extremities(&pdu.room_id, new_room_leaves, state_lock)
.set_forward_extremities(room_id, new_room_leaves, state_lock)
.await;
return Ok(None);
}
let pdu_id = self
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock)
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock, room_id)
.await?;
Ok(Some(pdu_id))
@ -88,6 +89,7 @@ pub async fn append_pdu<'a, Leaves>(
mut pdu_json: CanonicalJsonObject,
leaves: Leaves,
state_lock: &'a RoomMutexGuard,
room_id: &'a ruma::RoomId,
) -> Result<RawPduId>
where
Leaves: Iterator<Item = &'a EventId> + Send + 'a,
@ -98,7 +100,7 @@ where
let shortroomid = self
.services
.short
.get_shortroomid(pdu.room_id())
.get_shortroomid(room_id)
.await
.map_err(|_| err!(Database("Room does not exist")))?;
@ -151,14 +153,14 @@ where
// We must keep track of all events that have been referenced.
self.services
.pdu_metadata
.mark_as_referenced(pdu.room_id(), pdu.prev_events().map(AsRef::as_ref));
.mark_as_referenced(room_id, pdu.prev_events().map(AsRef::as_ref));
self.services
.state
.set_forward_extremities(pdu.room_id(), leaves, state_lock)
.set_forward_extremities(room_id, leaves, state_lock)
.await;
let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
let insert_lock = self.mutex_insert.lock(room_id).await;
let count1 = self.services.globals.next_count().unwrap();
@ -166,11 +168,11 @@ where
// appending fails
self.services
.read_receipt
.private_read_set(pdu.room_id(), pdu.sender(), count1);
.private_read_set(room_id, pdu.sender(), count1);
self.services
.user
.reset_notification_counts(pdu.sender(), pdu.room_id());
.reset_notification_counts(pdu.sender(), room_id);
let count2 = PduCount::Normal(self.services.globals.next_count().unwrap());
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into();
@ -184,14 +186,14 @@ where
let power_levels: RoomPowerLevelsEventContent = self
.services
.state_accessor
.room_state_get_content(pdu.room_id(), &StateEventType::RoomPowerLevels, "")
.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
.await
.unwrap_or_default();
let mut push_target: HashSet<_> = self
.services
.state_cache
.active_local_users_in_room(pdu.room_id())
.active_local_users_in_room(room_id)
.map(ToOwned::to_owned)
// Don't notify the sender of their own events, and dont send from ignored users
.ready_filter(|user| *user != pdu.sender())
@ -230,7 +232,7 @@ where
for action in self
.services
.pusher
.get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id())
.get_actions(user, &rules_for_user, &power_levels, &serialized, room_id)
.await
{
match action {
@ -268,20 +270,20 @@ where
}
self.db
.increment_notification_counts(pdu.room_id(), notifies, highlights);
.increment_notification_counts(room_id, notifies, highlights);
match *pdu.kind() {
| TimelineEventType::RoomRedaction => {
use RoomVersionId::*;
let room_version_id = self.services.state.get_room_version(pdu.room_id()).await?;
let room_version_id = self.services.state.get_room_version(room_id).await?;
match room_version_id {
| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => {
if let Some(redact_id) = pdu.redacts() {
if self
.services
.state_accessor
.user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false)
.user_can_redact(redact_id, pdu.sender(), room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid).await?;
@ -294,7 +296,7 @@ where
if self
.services
.state_accessor
.user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false)
.user_can_redact(redact_id, pdu.sender(), room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid).await?;
@ -310,7 +312,7 @@ where
.roomid_spacehierarchy_cache
.lock()
.await
.remove(pdu.room_id());
.remove(room_id);
},
| TimelineEventType::RoomMember => {
if let Some(state_key) = pdu.state_key() {
@ -320,8 +322,12 @@ where
let content: RoomMemberEventContent = pdu.get_content()?;
let stripped_state = match content.membership {
| MembershipState::Invite | MembershipState::Knock =>
self.services.state.summary_stripped(pdu).await.into(),
| MembershipState::Invite | MembershipState::Knock => self
.services
.state
.summary_stripped(pdu, room_id)
.await
.into(),
| _ => None,
};
@ -331,7 +337,7 @@ where
self.services
.state_cache
.update_membership(
pdu.room_id(),
room_id,
target_user_id,
content,
pdu.sender(),
@ -392,7 +398,7 @@ where
if self
.services
.state_cache
.appservice_in_room(pdu.room_id(), appservice)
.appservice_in_room(room_id, appservice)
.await
{
self.services
@ -430,12 +436,12 @@ where
let matching_aliases = |aliases: NamespaceRegex| {
self.services
.alias
.local_aliases_for_room(pdu.room_id())
.local_aliases_for_room(room_id)
.ready_any(move |room_alias| aliases.is_match(room_alias.as_str()))
};
if matching_aliases(appservice.aliases.clone()).await
|| appservice.rooms.is_match(pdu.room_id().as_str())
|| appservice.rooms.is_match(room_id.as_str())
|| matching_users(&appservice.users)
{
self.services

View file

@ -1,5 +1,6 @@
use std::{collections::HashSet, iter::once};
use conduwuit::{RoomVersion, debug_warn, trace};
use conduwuit_core::{
Err, Result, implement,
matrix::{event::Event, pdu::PduBuilder},
@ -11,6 +12,7 @@ use ruma::{
events::{
TimelineEventType,
room::{
create::RoomCreateEventContent,
member::{MembershipState, RoomMemberEventContent},
redaction::RoomRedactionEventContent,
},
@ -23,32 +25,36 @@ use super::RoomMutexGuard;
/// takes a roomid_mutex_state, meaning that only this function is able to
/// mutate the room state.
#[implement(super::Service)]
#[tracing::instrument(skip(self, state_lock), level = "debug")]
#[tracing::instrument(skip(self, state_lock), level = "trace")]
pub async fn build_and_append_pdu(
&self,
pdu_builder: PduBuilder,
sender: &UserId,
room_id: &RoomId,
room_id: Option<&RoomId>,
state_lock: &RoomMutexGuard,
) -> Result<OwnedEventId> {
let (pdu, pdu_json) = self
.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)
.await?;
if self.services.admin.is_admin_room(pdu.room_id()).await {
let room_id = pdu.room_id_or_hash();
trace!("Checking if room {room_id} is an admin room");
if self.services.admin.is_admin_room(&room_id).await {
trace!("Room {room_id} is an admin room, checking PDU for admin room restrictions");
self.check_pdu_for_admin_room(&pdu, sender).boxed().await?;
}
// If redaction event is not authorized, do not append it to the timeline
if *pdu.kind() == TimelineEventType::RoomRedaction {
use RoomVersionId::*;
match self.services.state.get_room_version(pdu.room_id()).await? {
trace!("Running redaction checks for room {room_id}");
match self.services.state.get_room_version(&room_id).await? {
| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => {
if let Some(redact_id) = pdu.redacts() {
if !self
.services
.state_accessor
.user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false)
.user_can_redact(redact_id, pdu.sender(), &room_id, false)
.await?
{
return Err!(Request(Forbidden("User cannot redact this event.")));
@ -61,7 +67,7 @@ pub async fn build_and_append_pdu(
if !self
.services
.state_accessor
.user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false)
.user_can_redact(redact_id, pdu.sender(), &room_id, false)
.await?
{
return Err!(Request(Forbidden("User cannot redact this event.")));
@ -72,6 +78,7 @@ pub async fn build_and_append_pdu(
}
if *pdu.kind() == TimelineEventType::RoomMember {
trace!("Running room member checks for room {room_id}");
let content: RoomMemberEventContent = pdu.get_content()?;
if content.join_authorized_via_users_server.is_some()
@ -93,12 +100,27 @@ pub async fn build_and_append_pdu(
)));
}
}
if *pdu.kind() == TimelineEventType::RoomCreate {
trace!("Running room create checks for room {room_id}");
let content: RoomCreateEventContent = pdu.get_content()?;
let room_features = RoomVersion::new(&content.room_version)?;
if room_features.room_ids_as_hashes {
// bootstrap shortid for room
debug_warn!(%room_id, "Bootstrapping shortid for room");
self.services
.short
.get_or_create_shortroomid(&room_id)
.await;
}
}
// We append to state before appending the pdu, so we don't have a moment in
// time with the pdu without it's state. This is okay because append_pdu can't
// fail.
let statehashid = self.services.state.append_to_state(&pdu).await?;
trace!("Appending {} state for room {room_id}", pdu.event_id());
let statehashid = self.services.state.append_to_state(&pdu, &room_id).await?;
trace!("Generating raw ID for PDU {}", pdu.event_id());
let pdu_id = self
.append_pdu(
&pdu,
@ -107,20 +129,22 @@ pub async fn build_and_append_pdu(
// of the room
once(pdu.event_id()),
state_lock,
&room_id,
)
.boxed()
.await?;
// We set the room state after inserting the pdu, so that we never have a moment
// in time where events in the current room state do not exist
trace!("Setting room state for room {room_id}");
self.services
.state
.set_room_state(pdu.room_id(), statehashid, state_lock);
.set_room_state(&room_id, statehashid, state_lock);
let mut servers: HashSet<OwnedServerName> = self
.services
.state_cache
.room_servers(pdu.room_id())
.room_servers(&room_id)
.map(ToOwned::to_owned)
.collect()
.await;
@ -141,11 +165,13 @@ pub async fn build_and_append_pdu(
// room_servers() and/or the if statement above
servers.remove(self.services.globals.server_name());
trace!("Sending PDU {} to {} servers", pdu.event_id(), servers.len());
self.services
.sending
.send_pdu_servers(servers.iter().map(AsRef::as_ref).stream(), &pdu_id)
.await?;
trace!("Event {} in room {:?} has been appended", pdu.event_id(), room_id);
Ok(pdu.event_id().to_owned())
}
@ -179,7 +205,7 @@ where
let count = self
.services
.state_cache
.room_members(pdu.room_id())
.room_members(&pdu.room_id_or_hash())
.ready_filter(|user| self.services.globals.user_is_local(user))
.ready_filter(|user| *user != target)
.boxed()
@ -203,7 +229,7 @@ where
let count = self
.services
.state_cache
.room_members(pdu.room_id())
.room_members(&pdu.room_id_or_hash())
.ready_filter(|user| self.services.globals.user_is_local(user))
.ready_filter(|user| *user != target)
.boxed()

View file

@ -1,5 +1,6 @@
use std::cmp;
use std::{cmp, collections::HashMap};
use conduwuit::{smallstr::SmallString, trace};
use conduwuit_core::{
Err, Error, Result, err, implement,
matrix::{
@ -11,12 +12,13 @@ use conduwuit_core::{
};
use futures::{StreamExt, TryStreamExt, future, future::ready};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, RoomId, RoomVersionId, UserId,
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
UserId,
canonical_json::to_canonical_value,
events::{StateEventType, TimelineEventType, room::create::RoomCreateEventContent},
uint,
};
use serde_json::value::to_raw_value;
use serde_json::value::{RawValue, to_raw_value};
use tracing::warn;
use super::RoomMutexGuard;
@ -26,10 +28,25 @@ pub async fn create_hash_and_sign_event(
&self,
pdu_builder: PduBuilder,
sender: &UserId,
room_id: &RoomId,
room_id: Option<&RoomId>,
_mutex_lock: &RoomMutexGuard, /* Take mutex guard to make sure users get the room
* state mutex */
) -> Result<(PduEvent, CanonicalJsonObject)> {
fn from_evt(
room_id: OwnedRoomId,
event_type: TimelineEventType,
content: Box<RawValue>,
) -> Result<RoomVersionId> {
if event_type == TimelineEventType::RoomCreate {
let content: RoomCreateEventContent = serde_json::from_str(content.get())?;
Ok(content.room_version)
} else {
Err(Error::InconsistentRoomState(
"non-create event for room of unknown version",
room_id,
))
}
}
let PduBuilder {
event_type,
content,
@ -38,67 +55,84 @@ pub async fn create_hash_and_sign_event(
redacts,
timestamp,
} = pdu_builder;
let prev_events: Vec<OwnedEventId> = self
.services
.state
.get_forward_extremities(room_id)
.take(20)
.map(Into::into)
.collect()
.await;
// If there was no create event yet, assume we are creating a room
let room_version_id = self
.services
.state
.get_room_version(room_id)
.await
.or_else(|_| {
if event_type == TimelineEventType::RoomCreate {
let content: RoomCreateEventContent = serde_json::from_str(content.get())?;
Ok(content.room_version)
} else {
Err(Error::InconsistentRoomState(
"non-create event for room of unknown version",
room_id.to_owned(),
))
}
})?;
let room_version_id = match room_id {
| Some(room_id) => self
.services
.state
.get_room_version(room_id)
.await
.or_else(|_| from_evt(room_id.to_owned(), event_type.clone(), content.clone()))?,
| None => from_evt(
RoomId::new(self.services.globals.server_name()),
event_type.clone(),
content.clone(),
)?,
};
let room_version = RoomVersion::new(&room_version_id).expect("room version is supported");
// TODO(hydra): Only create events can lack a room ID.
let auth_events = self
.services
.state
.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)
.await?;
let prev_events: Vec<OwnedEventId> = match room_id {
| Some(room_id) =>
self.services
.state
.get_forward_extremities(room_id)
.take(20)
.map(Into::into)
.collect()
.await,
| None => Vec::new(),
};
let auth_events: HashMap<(StateEventType, SmallString<[u8; 48]>), PduEvent> = match room_id {
| Some(room_id) =>
self.services
.state
.get_auth_events(
room_id,
&event_type,
sender,
state_key.as_deref(),
&content,
&room_version,
)
.await?,
| None => HashMap::new(),
};
// Our depth is the maximum depth of prev_events + 1
let depth = prev_events
.iter()
.stream()
.map(Ok)
.and_then(|event_id| self.get_pdu(event_id))
.and_then(|pdu| future::ok(pdu.depth))
.ignore_err()
.ready_fold(uint!(0), cmp::max)
.await
.saturating_add(uint!(1));
let depth = match room_id {
| Some(_) => prev_events
.iter()
.stream()
.map(Ok)
.and_then(|event_id| self.get_pdu(event_id))
.and_then(|pdu| future::ok(pdu.depth))
.ignore_err()
.ready_fold(uint!(0), cmp::max)
.await
.saturating_add(uint!(1)),
| None => uint!(1),
};
let mut unsigned = unsigned.unwrap_or_default();
if let Some(state_key) = &state_key {
if let Ok(prev_pdu) = self
.services
.state_accessor
.room_state_get(room_id, &event_type.to_string().into(), state_key)
.await
{
unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value());
unsigned.insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?);
unsigned
.insert("replaces_state".to_owned(), serde_json::to_value(prev_pdu.event_id())?);
if let Some(room_id) = room_id {
if let Some(state_key) = &state_key {
if let Ok(prev_pdu) = self
.services
.state_accessor
.room_state_get(room_id, &event_type.clone().to_string().into(), state_key)
.await
{
unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value());
unsigned
.insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?);
unsigned.insert(
"replaces_state".to_owned(),
serde_json::to_value(prev_pdu.event_id())?,
);
}
}
}
@ -109,15 +143,15 @@ pub async fn create_hash_and_sign_event(
// The first two events in a room are always m.room.create and m.room.member,
// so any other events with that same depth are illegal.
warn!(
"Had unsafe depth {depth} when creating non-state event in {room_id}. Cowardly \
aborting"
"Had unsafe depth {depth} when creating non-state event in {}. Cowardly aborting",
room_id.expect("room_id is Some here").as_str()
);
return Err!(Request(Unknown("Unsafe depth for non-state event.")));
}
let mut pdu = PduEvent {
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
room_id: room_id.to_owned(),
room_id: room_id.map(ToOwned::to_owned),
sender: sender.to_owned(),
origin: None,
origin_server_ts: timestamp.map_or_else(
@ -152,11 +186,30 @@ pub async fn create_hash_and_sign_event(
ready(auth_events.get(&key).map(ToOwned::to_owned))
};
let room_id_or_hash = pdu.room_id_or_hash();
let create_pdu = match &pdu.kind {
| TimelineEventType::RoomCreate => None,
| _ => Some(
self.services
.state_accessor
.room_state_get(&room_id_or_hash, &StateEventType::RoomCreate, "")
.await
.map_err(|e| {
err!(Request(Forbidden(warn!("Failed to fetch room create event: {e}"))))
})?,
),
};
let create_event = match &pdu.kind {
| TimelineEventType::RoomCreate => &pdu,
| _ => create_pdu.as_ref().unwrap().as_pdu(),
};
let auth_check = state_res::auth_check(
&room_version,
&pdu,
None, // TODO: third_party_invite
auth_fetch,
create_event,
)
.await
.map_err(|e| err!(Request(Forbidden(warn!("Auth check failed: {e:?}")))))?;
@ -164,25 +217,11 @@ pub async fn create_hash_and_sign_event(
if !auth_check {
return Err!(Request(Forbidden("Event is not authorized.")));
}
// Check with the policy server
match self
.services
.event_handler
.ask_policy_server(&pdu, room_id)
.await
{
| Ok(true) => {},
| Ok(false) => {
return Err!(Request(Forbidden(debug_warn!(
"Policy server marked this event as spam"
))));
},
| Err(e) => {
// fail open
warn!("Failed to check event with policy server: {e}");
},
}
trace!(
"Event {} in room {} is authorized",
pdu.event_id,
pdu.room_id.as_ref().map_or("None", |id| id.as_str())
);
// Hash and sign
let mut pdu_json = utils::to_canonical_object(&pdu).map_err(|e| {
@ -197,13 +236,13 @@ pub async fn create_hash_and_sign_event(
},
}
// Add origin because synapse likes that (and it's required in the spec)
pdu_json.insert(
"origin".to_owned(),
to_canonical_value(self.services.globals.server_name())
.expect("server name is a valid CanonicalJsonValue"),
);
trace!("hashing and signing event {}", pdu.event_id);
if let Err(e) = self
.services
.server_keys
@ -222,12 +261,46 @@ pub async fn create_hash_and_sign_event(
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
// Check with the policy server
// TODO(hydra): Skip this check for create events (why didnt we do this
// already?)
if room_id.is_some() {
trace!(
"Checking event {} in room {} with policy server",
pdu.event_id,
pdu.room_id.as_ref().map_or("None", |id| id.as_str())
);
match self
.services
.event_handler
.ask_policy_server(&pdu, &pdu.room_id_or_hash())
.await
{
| Ok(true) => {},
| Ok(false) => {
return Err!(Request(Forbidden(debug_warn!(
"Policy server marked this event as spam"
))));
},
| Err(e) => {
// fail open
warn!("Failed to check event with policy server: {e}");
},
}
}
// Generate short event id
trace!(
"Generating short event ID for {} in room {}",
pdu.event_id,
pdu.room_id.as_ref().map_or("None", |id| id.as_str())
);
let _shorteventid = self
.services
.short
.get_or_create_shorteventid(&pdu.event_id)
.await;
trace!("New PDU created: {pdu:?}");
Ok((pdu, pdu_json))
}

View file

@ -39,7 +39,11 @@ pub async fn redact_pdu<Pdu: Event + Send + Sync>(
}
}
let room_version_id = self.services.state.get_room_version(pdu.room_id()).await?;
let room_version_id = self
.services
.state
.get_room_version(&pdu.room_id_or_hash())
.await?;
pdu.redact(&room_version_id, reason.to_value())?;