mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-10 13:52:49 +02:00
Split timeline service.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
c06aa49a90
commit
c5c309ec43
7 changed files with 1146 additions and 1068 deletions
446
src/service/rooms/timeline/append.rs
Normal file
446
src/service/rooms/timeline/append.rs
Normal file
|
@ -0,0 +1,446 @@
|
|||
use std::{
|
||||
collections::{BTreeMap, HashSet},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use conduwuit_core::{
|
||||
Result, err, error, implement,
|
||||
matrix::{
|
||||
event::Event,
|
||||
pdu::{PduCount, PduEvent, PduId, RawPduId},
|
||||
},
|
||||
utils::{self, ReadyExt},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, RoomVersionId, UserId,
|
||||
events::{
|
||||
GlobalAccountDataEventType, StateEventType, TimelineEventType,
|
||||
push_rules::PushRulesEvent,
|
||||
room::{
|
||||
encrypted::Relation,
|
||||
member::{MembershipState, RoomMemberEventContent},
|
||||
power_levels::RoomPowerLevelsEventContent,
|
||||
redaction::RoomRedactionEventContent,
|
||||
},
|
||||
},
|
||||
push::{Action, Ruleset, Tweak},
|
||||
};
|
||||
|
||||
use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard};
|
||||
use crate::{appservice::NamespaceRegex, rooms::state_compressor::CompressedState};
|
||||
|
||||
/// Append the incoming event setting the state snapshot to the state from
|
||||
/// the server that sent the event.
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub async fn append_incoming_pdu<'a, Leaves>(
|
||||
&'a self,
|
||||
pdu: &'a PduEvent,
|
||||
pdu_json: CanonicalJsonObject,
|
||||
new_room_leaves: Leaves,
|
||||
state_ids_compressed: Arc<CompressedState>,
|
||||
soft_fail: bool,
|
||||
state_lock: &'a RoomMutexGuard,
|
||||
) -> Result<Option<RawPduId>>
|
||||
where
|
||||
Leaves: Iterator<Item = &'a EventId> + Send + 'a,
|
||||
{
|
||||
// We append to state before appending the pdu, so we don't have a moment in
|
||||
// time with the pdu without it's state. This is okay because append_pdu can't
|
||||
// fail.
|
||||
self.services
|
||||
.state
|
||||
.set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed)
|
||||
.await?;
|
||||
|
||||
if soft_fail {
|
||||
self.services
|
||||
.pdu_metadata
|
||||
.mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref));
|
||||
|
||||
self.services
|
||||
.state
|
||||
.set_forward_extremities(&pdu.room_id, new_room_leaves, state_lock)
|
||||
.await;
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let pdu_id = self
|
||||
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock)
|
||||
.await?;
|
||||
|
||||
Ok(Some(pdu_id))
|
||||
}
|
||||
|
||||
/// Creates a new persisted data unit and adds it to a room.
|
||||
///
|
||||
/// By this point the incoming event should be fully authenticated, no auth
|
||||
/// happens in `append_pdu`.
|
||||
///
|
||||
/// Returns pdu id
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub async fn append_pdu<'a, Leaves>(
|
||||
&'a self,
|
||||
pdu: &'a PduEvent,
|
||||
mut pdu_json: CanonicalJsonObject,
|
||||
leaves: Leaves,
|
||||
state_lock: &'a RoomMutexGuard,
|
||||
) -> Result<RawPduId>
|
||||
where
|
||||
Leaves: Iterator<Item = &'a EventId> + Send + 'a,
|
||||
{
|
||||
// Coalesce database writes for the remainder of this scope.
|
||||
let _cork = self.db.db.cork_and_flush();
|
||||
|
||||
let shortroomid = self
|
||||
.services
|
||||
.short
|
||||
.get_shortroomid(pdu.room_id())
|
||||
.await
|
||||
.map_err(|_| err!(Database("Room does not exist")))?;
|
||||
|
||||
// Make unsigned fields correct. This is not properly documented in the spec,
|
||||
// but state events need to have previous content in the unsigned field, so
|
||||
// clients can easily interpret things like membership changes
|
||||
if let Some(state_key) = pdu.state_key() {
|
||||
if let CanonicalJsonValue::Object(unsigned) = pdu_json
|
||||
.entry("unsigned".to_owned())
|
||||
.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
|
||||
{
|
||||
if let Ok(shortstatehash) = self
|
||||
.services
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(pdu.event_id())
|
||||
.await
|
||||
{
|
||||
if let Ok(prev_state) = self
|
||||
.services
|
||||
.state_accessor
|
||||
.state_get(shortstatehash, &pdu.kind().to_string().into(), state_key)
|
||||
.await
|
||||
{
|
||||
unsigned.insert(
|
||||
"prev_content".to_owned(),
|
||||
CanonicalJsonValue::Object(
|
||||
utils::to_canonical_object(prev_state.get_content_as_value())
|
||||
.map_err(|e| {
|
||||
err!(Database(error!(
|
||||
"Failed to convert prev_state to canonical JSON: {e}",
|
||||
)))
|
||||
})?,
|
||||
),
|
||||
);
|
||||
unsigned.insert(
|
||||
String::from("prev_sender"),
|
||||
CanonicalJsonValue::String(prev_state.sender().to_string()),
|
||||
);
|
||||
unsigned.insert(
|
||||
String::from("replaces_state"),
|
||||
CanonicalJsonValue::String(prev_state.event_id().to_string()),
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("Invalid unsigned type in pdu.");
|
||||
}
|
||||
}
|
||||
|
||||
// We must keep track of all events that have been referenced.
|
||||
self.services
|
||||
.pdu_metadata
|
||||
.mark_as_referenced(pdu.room_id(), pdu.prev_events().map(AsRef::as_ref));
|
||||
|
||||
self.services
|
||||
.state
|
||||
.set_forward_extremities(pdu.room_id(), leaves, state_lock)
|
||||
.await;
|
||||
|
||||
let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
|
||||
|
||||
let count1 = self.services.globals.next_count().unwrap();
|
||||
|
||||
// Mark as read first so the sending client doesn't get a notification even if
|
||||
// appending fails
|
||||
self.services
|
||||
.read_receipt
|
||||
.private_read_set(pdu.room_id(), pdu.sender(), count1);
|
||||
|
||||
self.services
|
||||
.user
|
||||
.reset_notification_counts(pdu.sender(), pdu.room_id());
|
||||
|
||||
let count2 = PduCount::Normal(self.services.globals.next_count().unwrap());
|
||||
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into();
|
||||
|
||||
// Insert pdu
|
||||
self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2).await;
|
||||
|
||||
drop(insert_lock);
|
||||
|
||||
// See if the event matches any known pushers via power level
|
||||
let power_levels: RoomPowerLevelsEventContent = self
|
||||
.services
|
||||
.state_accessor
|
||||
.room_state_get_content(pdu.room_id(), &StateEventType::RoomPowerLevels, "")
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut push_target: HashSet<_> = self
|
||||
.services
|
||||
.state_cache
|
||||
.active_local_users_in_room(pdu.room_id())
|
||||
.map(ToOwned::to_owned)
|
||||
// Don't notify the sender of their own events, and dont send from ignored users
|
||||
.ready_filter(|user| *user != pdu.sender())
|
||||
.filter_map(|recipient_user| async move { (!self.services.users.user_is_ignored(pdu.sender(), &recipient_user).await).then_some(recipient_user) })
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let mut notifies = Vec::with_capacity(push_target.len().saturating_add(1));
|
||||
let mut highlights = Vec::with_capacity(push_target.len().saturating_add(1));
|
||||
|
||||
if *pdu.kind() == TimelineEventType::RoomMember {
|
||||
if let Some(state_key) = pdu.state_key() {
|
||||
let target_user_id = UserId::parse(state_key)?;
|
||||
|
||||
if self.services.users.is_active_local(target_user_id).await {
|
||||
push_target.insert(target_user_id.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let serialized = pdu.to_format();
|
||||
for user in &push_target {
|
||||
let rules_for_user = self
|
||||
.services
|
||||
.account_data
|
||||
.get_global(user, GlobalAccountDataEventType::PushRules)
|
||||
.await
|
||||
.map_or_else(
|
||||
|_| Ruleset::server_default(user),
|
||||
|ev: PushRulesEvent| ev.content.global,
|
||||
);
|
||||
|
||||
let mut highlight = false;
|
||||
let mut notify = false;
|
||||
|
||||
for action in self
|
||||
.services
|
||||
.pusher
|
||||
.get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id())
|
||||
.await
|
||||
{
|
||||
match action {
|
||||
| Action::Notify => notify = true,
|
||||
| Action::SetTweak(Tweak::Highlight(true)) => {
|
||||
highlight = true;
|
||||
},
|
||||
| _ => {},
|
||||
}
|
||||
|
||||
// Break early if both conditions are true
|
||||
if notify && highlight {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if notify {
|
||||
notifies.push(user.clone());
|
||||
}
|
||||
|
||||
if highlight {
|
||||
highlights.push(user.clone());
|
||||
}
|
||||
|
||||
self.services
|
||||
.pusher
|
||||
.get_pushkeys(user)
|
||||
.ready_for_each(|push_key| {
|
||||
self.services
|
||||
.sending
|
||||
.send_pdu_push(&pdu_id, user, push_key.to_owned())
|
||||
.expect("TODO: replace with future");
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
self.db
|
||||
.increment_notification_counts(pdu.room_id(), notifies, highlights);
|
||||
|
||||
match *pdu.kind() {
|
||||
| TimelineEventType::RoomRedaction => {
|
||||
use RoomVersionId::*;
|
||||
|
||||
let room_version_id = self.services.state.get_room_version(pdu.room_id()).await?;
|
||||
match room_version_id {
|
||||
| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => {
|
||||
if let Some(redact_id) = pdu.redacts() {
|
||||
if self
|
||||
.services
|
||||
.state_accessor
|
||||
.user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false)
|
||||
.await?
|
||||
{
|
||||
self.redact_pdu(redact_id, pdu, shortroomid).await?;
|
||||
}
|
||||
}
|
||||
},
|
||||
| _ => {
|
||||
let content: RoomRedactionEventContent = pdu.get_content()?;
|
||||
if let Some(redact_id) = &content.redacts {
|
||||
if self
|
||||
.services
|
||||
.state_accessor
|
||||
.user_can_redact(redact_id, pdu.sender(), pdu.room_id(), false)
|
||||
.await?
|
||||
{
|
||||
self.redact_pdu(redact_id, pdu, shortroomid).await?;
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
| TimelineEventType::SpaceChild =>
|
||||
if let Some(_state_key) = pdu.state_key() {
|
||||
self.services
|
||||
.spaces
|
||||
.roomid_spacehierarchy_cache
|
||||
.lock()
|
||||
.await
|
||||
.remove(pdu.room_id());
|
||||
},
|
||||
| TimelineEventType::RoomMember => {
|
||||
if let Some(state_key) = pdu.state_key() {
|
||||
// if the state_key fails
|
||||
let target_user_id =
|
||||
UserId::parse(state_key).expect("This state_key was previously validated");
|
||||
|
||||
let content: RoomMemberEventContent = pdu.get_content()?;
|
||||
let stripped_state = match content.membership {
|
||||
| MembershipState::Invite | MembershipState::Knock =>
|
||||
self.services.state.summary_stripped(pdu).await.into(),
|
||||
| _ => None,
|
||||
};
|
||||
|
||||
// Update our membership info, we do this here incase a user is invited or
|
||||
// knocked and immediately leaves we need the DB to record the invite or
|
||||
// knock event for auth
|
||||
self.services
|
||||
.state_cache
|
||||
.update_membership(
|
||||
pdu.room_id(),
|
||||
target_user_id,
|
||||
content,
|
||||
pdu.sender(),
|
||||
stripped_state,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
},
|
||||
| TimelineEventType::RoomMessage => {
|
||||
let content: ExtractBody = pdu.get_content()?;
|
||||
if let Some(body) = content.body {
|
||||
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
|
||||
|
||||
if self.services.admin.is_admin_command(pdu, &body).await {
|
||||
self.services
|
||||
.admin
|
||||
.command_with_sender(body, Some((pdu.event_id()).into()), pdu.sender.clone().into())?;
|
||||
}
|
||||
}
|
||||
},
|
||||
| _ => {},
|
||||
}
|
||||
|
||||
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
|
||||
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
|
||||
self.services
|
||||
.pdu_metadata
|
||||
.add_relation(count2, related_pducount);
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(content) = pdu.get_content::<ExtractRelatesTo>() {
|
||||
match content.relates_to {
|
||||
| Relation::Reply { in_reply_to } => {
|
||||
// We need to do it again here, because replies don't have
|
||||
// event_id as a top level field
|
||||
if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
|
||||
self.services
|
||||
.pdu_metadata
|
||||
.add_relation(count2, related_pducount);
|
||||
}
|
||||
},
|
||||
| Relation::Thread(thread) => {
|
||||
self.services
|
||||
.threads
|
||||
.add_to_thread(&thread.event_id, pdu)
|
||||
.await?;
|
||||
},
|
||||
| _ => {}, // TODO: Aggregate other types
|
||||
}
|
||||
}
|
||||
|
||||
for appservice in self.services.appservice.read().await.values() {
|
||||
if self
|
||||
.services
|
||||
.state_cache
|
||||
.appservice_in_room(pdu.room_id(), appservice)
|
||||
.await
|
||||
{
|
||||
self.services
|
||||
.sending
|
||||
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// If the RoomMember event has a non-empty state_key, it is targeted at someone.
|
||||
// If it is our appservice user, we send this PDU to it.
|
||||
if *pdu.kind() == TimelineEventType::RoomMember {
|
||||
if let Some(state_key_uid) = &pdu
|
||||
.state_key
|
||||
.as_ref()
|
||||
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
|
||||
{
|
||||
let appservice_uid = appservice.registration.sender_localpart.as_str();
|
||||
if state_key_uid == &appservice_uid {
|
||||
self.services
|
||||
.sending
|
||||
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let matching_users = |users: &NamespaceRegex| {
|
||||
appservice.users.is_match(pdu.sender().as_str())
|
||||
|| *pdu.kind() == TimelineEventType::RoomMember
|
||||
&& pdu
|
||||
.state_key
|
||||
.as_ref()
|
||||
.is_some_and(|state_key| users.is_match(state_key))
|
||||
};
|
||||
let matching_aliases = |aliases: NamespaceRegex| {
|
||||
self.services
|
||||
.alias
|
||||
.local_aliases_for_room(pdu.room_id())
|
||||
.ready_any(move |room_alias| aliases.is_match(room_alias.as_str()))
|
||||
};
|
||||
|
||||
if matching_aliases(appservice.aliases.clone()).await
|
||||
|| appservice.rooms.is_match(pdu.room_id().as_str())
|
||||
|| matching_users(&appservice.users)
|
||||
{
|
||||
self.services
|
||||
.sending
|
||||
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(pdu_id)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue