From b9ce99d036572aee3c1b3be725e0ae9449a865a5 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 20:22:29 +0100 Subject: [PATCH 01/10] feat(policy-server): Policy server following --- src/core/matrix/state_res/event_auth.rs | 7 +- .../rooms/event_handler/call_policyserv.rs | 71 +++++++++++++++++++ src/service/rooms/event_handler/mod.rs | 1 + .../event_handler/upgrade_outlier_pdu.rs | 34 ++++++--- 4 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 src/service/rooms/event_handler/call_policyserv.rs diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 5c36ce03..819d05e2 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -5,7 +5,7 @@ use futures::{ future::{OptionFuture, join3}, }; use ruma::{ - Int, OwnedUserId, RoomVersionId, UserId, + EventId, Int, OwnedUserId, RoomVersionId, UserId, events::room::{ create::RoomCreateEventContent, join_rules::{JoinRule, RoomJoinRulesEventContent}, @@ -217,8 +217,9 @@ where } /* - // TODO: In the past this code caused problems federating with synapse, maybe this has been - // resolved already. Needs testing. + // TODO: In the past this code was commented as it caused problems with Synapse. This is no + // longer the case. This needs to be implemented. + // See also: https://github.com/ruma/ruma/pull/2064 // // 2. Reject if auth_events // a. auth_events cannot have duplicate keys since it's a BTree diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs new file mode 100644 index 00000000..4a52227d --- /dev/null +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -0,0 +1,71 @@ +use conduwuit::{ + Err, Event, PduEvent, Result, debug, implement, utils::to_canonical_object, warn, +}; +use ruma::{ + RoomId, ServerName, + api::federation::room::policy::v1::Request as PolicyRequest, + canonical_json::to_canonical_value, + events::{StateEventType, room::policy::RoomPolicyEventContent}, +}; + +/// Returns Ok if the policy server allows the event +#[implement(super::Service)] +#[tracing::instrument(skip_all, level = "debug")] +pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result { + let Ok(policyserver) = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPolicy, "") + .await + .map(|c: RoomPolicyEventContent| c) + else { + return Ok(()); + }; + + let via = match policyserver.via { + | Some(ref via) => ServerName::parse(via)?, + | None => { + debug!("No policy server configured for room {room_id}"); + return Ok(()); + }, + }; + // TODO: dont do *this* + let pdu_json = self.services.timeline.get_pdu_json(pdu.event_id()).await?; + let outgoing = self + .services + .sending + .convert_to_outgoing_federation_event(pdu_json) + .await; + // let s = match serde_json::to_string(outgoing.as_ref()) { + // | Ok(s) => s, + // | Err(e) => { + // warn!("Failed to convert pdu {} to outgoing federation event: {e}", + // pdu.event_id()); return Err!(Request(InvalidParam("Failed to convert PDU + // to outgoing event."))); }, + // }; + debug!("Checking pdu {outgoing:?} for spam with policy server {via} for room {room_id}"); + let response = self + .services + .sending + .send_federation_request(via, PolicyRequest { + event_id: pdu.event_id().to_owned(), + pdu: Some(outgoing), + }) + .await; + let response = match response { + | Ok(response) => response, + | Err(e) => { + warn!("Failed to contact policy server {via} for room {room_id}: {e}"); + return Ok(()); + }, + }; + if response.recommendation == "spam" { + warn!( + "Event {} in room {room_id} was marked as spam by policy server {via}", + pdu.event_id().to_owned() + ); + return Err!(Request(Forbidden("Event was marked as spam by policy server"))); + }; + + Ok(()) +} diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index aed38e1e..5ed25c6e 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -1,4 +1,5 @@ mod acl_check; +mod call_policyserv; mod fetch_and_handle_outliers; mod fetch_prev; mod fetch_state; diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 4093cb05..abb5c116 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,7 +1,7 @@ use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; use conduwuit::{ - Err, Result, debug, debug_info, err, implement, is_equal_to, + Err, Result, debug, debug_info, err, implement, info, is_equal_to, matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, @@ -47,7 +47,7 @@ where return Err!(Request(InvalidParam("Event has been soft failed"))); } - debug!("Upgrading to timeline pdu"); + debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id); let timer = Instant::now(); let room_version_id = get_room_version_id(create_event)?; @@ -55,7 +55,7 @@ where // backwards extremities doing all the checks in this list starting at 1. // These are not timeline events. - debug!("Resolving state at event"); + debug!("Resolving state at event {}", incoming_pdu.event_id); let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 { self.state_at_incoming_degree_one(&incoming_pdu).await? } else { @@ -74,7 +74,7 @@ where let room_version = to_room_version(&room_version_id); - debug!("Performing auth check"); + debug!("Performing auth check to upgrade {}", incoming_pdu.event_id); // 11. Check the auth of the event passes based on the state of the event let state_fetch_state = &state_at_incoming_event; let state_fetch = |k: StateEventType, s: StateKey| async move { @@ -84,6 +84,7 @@ where self.services.timeline.get_pdu(event_id).await.ok() }; + debug!("running auth check on {}", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -97,7 +98,7 @@ where return Err!(Request(Forbidden("Event has failed auth check with state at the event."))); } - debug!("Gathering auth events"); + debug!("Gathering auth events for {}", incoming_pdu.event_id); let auth_events = self .services .state @@ -115,6 +116,7 @@ where ready(auth_events.get(&key).map(ToOwned::to_owned)) }; + debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -125,8 +127,8 @@ where .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; // Soft fail check before doing state res - debug!("Performing soft-fail check"); - let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { + debug!("Performing soft-fail check on {}", incoming_pdu.event_id); + let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { | (false, _) => true, | (true, None) => false, | (true, Some(redact_id)) => @@ -219,10 +221,26 @@ where .await?; } + // 14-pre. If the event is not a state event, ask the policy server about it + if incoming_pdu.state_key.is_none() + && incoming_pdu.sender().server_name() != self.services.globals.server_name() + { + debug!("Checking policy server for event {}", incoming_pdu.event_id); + let policy = self.policyserv_check(&incoming_pdu, room_id); + if let Err(e) = policy.await { + warn!("Policy server check failed for event {}: {e}", incoming_pdu.event_id); + if !soft_fail { + soft_fail = true; + } + } + debug!("Policy server check passed for event {}", incoming_pdu.event_id); + } + // 14. Check if the event passes auth based on the "current state" of the room, // if not soft fail it if soft_fail { - debug!("Soft failing event"); + info!("Soft failing event {}", incoming_pdu.event_id); + // assert!(extremities.is_empty(), "soft_fail extremities empty"); let extremities = extremities.iter().map(Borrow::borrow); self.services From 1dc9abc00e4df4475c75e5cf84b8ab96cf32674a Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 20:34:34 +0100 Subject: [PATCH 02/10] chore: Update ruwuma & fix lints --- Cargo.lock | 22 +++++++++---------- Cargo.toml | 2 +- src/core/matrix/state_res/event_auth.rs | 2 +- .../rooms/event_handler/call_policyserv.rs | 7 ++---- 4 files changed, 15 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f711007..700c04f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3900,7 +3900,7 @@ checksum = "88f8660c1ff60292143c98d08fc6e2f654d722db50410e3f3797d40baaf9d8f3" [[package]] name = "ruma" version = "0.10.1" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "assign", "js_int", @@ -3920,7 +3920,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.10.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "js_int", "ruma-common", @@ -3932,7 +3932,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.18.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "as_variant", "assign", @@ -3955,7 +3955,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.13.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "as_variant", "base64 0.22.1", @@ -3987,7 +3987,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.28.1" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "as_variant", "indexmap 2.9.0", @@ -4012,7 +4012,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.9.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "bytes", "headers", @@ -4034,7 +4034,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.9.5" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "js_int", "thiserror 2.0.12", @@ -4043,7 +4043,7 @@ dependencies = [ [[package]] name = "ruma-identity-service-api" version = "0.9.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "js_int", "ruma-common", @@ -4053,7 +4053,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.13.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "cfg-if", "proc-macro-crate", @@ -4068,7 +4068,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.9.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "js_int", "ruma-common", @@ -4080,7 +4080,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.15.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=a4b948b40417a65ab0282ae47cc50035dd455e02#a4b948b40417a65ab0282ae47cc50035dd455e02" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=b753738047d1f443aca870896ef27ecaacf027da#b753738047d1f443aca870896ef27ecaacf027da" dependencies = [ "base64 0.22.1", "ed25519-dalek", diff --git a/Cargo.toml b/Cargo.toml index ef917332..fb00d6d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -352,7 +352,7 @@ version = "0.1.2" [workspace.dependencies.ruma] git = "https://forgejo.ellis.link/continuwuation/ruwuma" #branch = "conduwuit-changes" -rev = "a4b948b40417a65ab0282ae47cc50035dd455e02" +rev = "b753738047d1f443aca870896ef27ecaacf027da" features = [ "compat", "rand", diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 819d05e2..81c83431 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -5,7 +5,7 @@ use futures::{ future::{OptionFuture, join3}, }; use ruma::{ - EventId, Int, OwnedUserId, RoomVersionId, UserId, + Int, OwnedUserId, RoomVersionId, UserId, events::room::{ create::RoomCreateEventContent, join_rules::{JoinRule, RoomJoinRulesEventContent}, diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs index 4a52227d..804c77eb 100644 --- a/src/service/rooms/event_handler/call_policyserv.rs +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -1,10 +1,7 @@ -use conduwuit::{ - Err, Event, PduEvent, Result, debug, implement, utils::to_canonical_object, warn, -}; +use conduwuit::{Err, Event, PduEvent, Result, debug, implement, warn}; use ruma::{ RoomId, ServerName, api::federation::room::policy::v1::Request as PolicyRequest, - canonical_json::to_canonical_value, events::{StateEventType, room::policy::RoomPolicyEventContent}, }; @@ -65,7 +62,7 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result pdu.event_id().to_owned() ); return Err!(Request(Forbidden("Event was marked as spam by policy server"))); - }; + } Ok(()) } From be61ff1465f8b57dfe754acaaef6d2234c326771 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 20:47:02 +0100 Subject: [PATCH 03/10] fix(policy-server): Avoid unnecessary database lookup --- src/service/rooms/event_handler/call_policyserv.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs index 804c77eb..e7ae1d0f 100644 --- a/src/service/rooms/event_handler/call_policyserv.rs +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -26,20 +26,11 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result return Ok(()); }, }; - // TODO: dont do *this* - let pdu_json = self.services.timeline.get_pdu_json(pdu.event_id()).await?; let outgoing = self .services .sending - .convert_to_outgoing_federation_event(pdu_json) + .convert_to_outgoing_federation_event(pdu.to_canonical_object()) .await; - // let s = match serde_json::to_string(outgoing.as_ref()) { - // | Ok(s) => s, - // | Err(e) => { - // warn!("Failed to convert pdu {} to outgoing federation event: {e}", - // pdu.event_id()); return Err!(Request(InvalidParam("Failed to convert PDU - // to outgoing event."))); }, - // }; debug!("Checking pdu {outgoing:?} for spam with policy server {via} for room {room_id}"); let response = self .services From 964b23a4282b84d1ca10d9eaf6f056646e21a8d9 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 20:50:47 +0100 Subject: [PATCH 04/10] style(policy-server): Restructure logging --- src/service/rooms/event_handler/call_policyserv.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs index e7ae1d0f..894e28af 100644 --- a/src/service/rooms/event_handler/call_policyserv.rs +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -43,14 +43,21 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result let response = match response { | Ok(response) => response, | Err(e) => { - warn!("Failed to contact policy server {via} for room {room_id}: {e}"); + warn!( + via = %via, + event_id = %pdu.event_id(), + room_id = %room_id, + "Failed to contact policy server: {e}" + ); return Ok(()); }, }; if response.recommendation == "spam" { warn!( - "Event {} in room {room_id} was marked as spam by policy server {via}", - pdu.event_id().to_owned() + via = %via, + event_id = %pdu.event_id(), + room_id = %room_id, + "Event was marked as spam by policy server", ); return Err!(Request(Forbidden("Event was marked as spam by policy server"))); } From 40d789dd72d7981fc4f819ebb5c39b33a8db9400 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 20:54:06 +0100 Subject: [PATCH 05/10] feat(policy-server): Soft-fail redactions for failed events --- .../event_handler/upgrade_outlier_pdu.rs | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index abb5c116..e8e22fe9 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -222,9 +222,7 @@ where } // 14-pre. If the event is not a state event, ask the policy server about it - if incoming_pdu.state_key.is_none() - && incoming_pdu.sender().server_name() != self.services.globals.server_name() - { + if incoming_pdu.state_key.is_none() { debug!("Checking policy server for event {}", incoming_pdu.event_id); let policy = self.policyserv_check(&incoming_pdu, room_id); if let Err(e) = policy.await { @@ -236,6 +234,24 @@ where debug!("Policy server check passed for event {}", incoming_pdu.event_id); } + // Additionally, if this is a redaction for a soft-failed event, we soft-fail it + // also + if let Some(redact_id) = incoming_pdu.redacts_id(&room_version_id) { + debug!("Checking if redaction {} is for a soft-failed event", redact_id); + if self + .services + .pdu_metadata + .is_event_soft_failed(&redact_id) + .await + { + warn!( + "Redaction {} is for a soft-failed event, soft failing the redaction", + redact_id + ); + soft_fail = true; + } + } + // 14. Check if the event passes auth based on the "current state" of the room, // if not soft fail it if soft_fail { From efce67264e605ca6350e68ca5db47c4957121f07 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 21:09:23 +0100 Subject: [PATCH 06/10] feat(policy-server): Prevent local events that fail the policy check --- src/service/rooms/timeline/create.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs index 20ccaf56..6301d785 100644 --- a/src/service/rooms/timeline/create.rs +++ b/src/service/rooms/timeline/create.rs @@ -165,6 +165,17 @@ pub async fn create_hash_and_sign_event( return Err!(Request(Forbidden("Event is not authorized."))); } + // Check with the policy server + if self + .services + .event_handler + .policyserv_check(&pdu, room_id) + .await + .is_err() + { + return Err!(Request(Forbidden(debug_warn!("Policy server marked this event as spam")))); + } + // Hash and sign let mut pdu_json = utils::to_canonical_object(&pdu).map_err(|e| { err!(Request(BadJson(warn!("Failed to convert PDU to canonical JSON: {e}")))) From fe1610ab1cd8618a0981e4279c61f761dfb3ca22 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 22:07:18 +0100 Subject: [PATCH 07/10] feat(policy-server): Limit policy server request timeout to 10 seconds --- .../rooms/event_handler/call_policyserv.rs | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs index 894e28af..0592186a 100644 --- a/src/service/rooms/event_handler/call_policyserv.rs +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use conduwuit::{Err, Event, PduEvent, Result, debug, implement, warn}; use ruma::{ RoomId, ServerName, @@ -32,17 +34,19 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result .convert_to_outgoing_federation_event(pdu.to_canonical_object()) .await; debug!("Checking pdu {outgoing:?} for spam with policy server {via} for room {room_id}"); - let response = self - .services - .sending - .send_federation_request(via, PolicyRequest { - event_id: pdu.event_id().to_owned(), - pdu: Some(outgoing), - }) - .await; + let response = tokio::time::timeout( + Duration::from_secs(10), + self.services + .sending + .send_federation_request(via, PolicyRequest { + event_id: pdu.event_id().to_owned(), + pdu: Some(outgoing), + }), + ) + .await; let response = match response { - | Ok(response) => response, - | Err(e) => { + | Ok(Ok(response)) => response, + | Ok(Err(e)) => { warn!( via = %via, event_id = %pdu.event_id(), @@ -51,6 +55,15 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result ); return Ok(()); }, + | Err(_) => { + warn!( + via = %via, + event_id = %pdu.event_id(), + room_id = %room_id, + "Policy server request timed out after 10 seconds" + ); + return Ok(()); + }, }; if response.recommendation == "spam" { warn!( From 977fddf4c5f7561c48b40ffc855ddaa6ef90219a Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 23:50:32 +0100 Subject: [PATCH 08/10] feat(policy-server): Optimise policy server lookups --- src/service/rooms/event_handler/call_policyserv.rs | 12 ++++++++++++ src/service/rooms/event_handler/mod.rs | 2 ++ 2 files changed, 14 insertions(+) diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs index 0592186a..331d4c8f 100644 --- a/src/service/rooms/event_handler/call_policyserv.rs +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -11,6 +11,10 @@ use ruma::{ #[implement(super::Service)] #[tracing::instrument(skip_all, level = "debug")] pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result { + if pdu.event_type().to_owned() == StateEventType::RoomPolicy.into() { + debug!("Skipping spam check for policy server meta-event in room {room_id}"); + return Ok(()); + } let Ok(policyserver) = self .services .state_accessor @@ -28,6 +32,14 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result return Ok(()); }, }; + if via.is_empty() { + debug!("Policy server is empty for room {room_id}, skipping spam check"); + return Ok(()); + } + if !self.services.state_cache.server_in_room(via, room_id).await { + debug!("Policy server {via} is not in the room {room_id}, skipping spam check"); + return Ok(()); + } let outgoing = self .services .sending diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 5ed25c6e..4e948e95 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -43,6 +43,7 @@ struct Services { server_keys: Dep, short: Dep, state: Dep, + state_cache: Dep, state_accessor: Dep, state_compressor: Dep, timeline: Dep, @@ -68,6 +69,7 @@ impl crate::Service for Service { pdu_metadata: args.depend::("rooms::pdu_metadata"), short: args.depend::("rooms::short"), state: args.depend::("rooms::state"), + state_cache: args.depend::("rooms::state_cache"), state_accessor: args .depend::("rooms::state_accessor"), state_compressor: args From 5454c22b5bed70479610f19ab80ff981ab952003 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 23:54:07 +0100 Subject: [PATCH 09/10] style(policy-server): Run clippy --- src/service/rooms/event_handler/call_policyserv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs index 331d4c8f..96e3f7cc 100644 --- a/src/service/rooms/event_handler/call_policyserv.rs +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -11,7 +11,7 @@ use ruma::{ #[implement(super::Service)] #[tracing::instrument(skip_all, level = "debug")] pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result { - if pdu.event_type().to_owned() == StateEventType::RoomPolicy.into() { + if *pdu.event_type() == StateEventType::RoomPolicy.into() { debug!("Skipping spam check for policy server meta-event in room {room_id}"); return Ok(()); } From 4f90379ac15503be293f68c2f77bfa99130d5904 Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Sun, 20 Jul 2025 01:03:18 +0100 Subject: [PATCH 10/10] style: Improve logging and comments --- src/core/matrix/state_res/event_auth.rs | 4 +- .../rooms/event_handler/call_policyserv.rs | 26 ++++++- .../event_handler/upgrade_outlier_pdu.rs | 73 +++++++++++++++---- 3 files changed, 82 insertions(+), 21 deletions(-) diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 81c83431..77a4a95c 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -149,8 +149,8 @@ where for<'a> &'a E: Event + Send, { debug!( - event_id = format!("{}", incoming_event.event_id()), - event_type = format!("{}", incoming_event.event_type()), + event_id = %incoming_event.event_id(), + event_type = ?incoming_event.event_type(), "auth_check beginning" ); diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs index 96e3f7cc..aef99dba 100644 --- a/src/service/rooms/event_handler/call_policyserv.rs +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -1,3 +1,8 @@ +//! Policy server integration for event spam checking in Matrix rooms. +//! +//! This module implements a check against a room-specific policy server, as +//! described in the relevant Matrix spec proposal (see: https://github.com/matrix-org/matrix-spec-proposals/pull/4284). + use std::time::Duration; use conduwuit::{Err, Event, PduEvent, Result, debug, implement, warn}; @@ -12,7 +17,11 @@ use ruma::{ #[tracing::instrument(skip_all, level = "debug")] pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result { if *pdu.event_type() == StateEventType::RoomPolicy.into() { - debug!("Skipping spam check for policy server meta-event in room {room_id}"); + debug!( + room_id = %room_id, + event_type = ?pdu.event_type(), + "Skipping spam check for policy server meta-event" + ); return Ok(()); } let Ok(policyserver) = self @@ -37,7 +46,11 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result return Ok(()); } if !self.services.state_cache.server_in_room(via, room_id).await { - debug!("Policy server {via} is not in the room {room_id}, skipping spam check"); + debug!( + room_id = %room_id, + via = %via, + "Policy server is not in the room, skipping spam check" + ); return Ok(()); } let outgoing = self @@ -45,7 +58,12 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result .sending .convert_to_outgoing_federation_event(pdu.to_canonical_object()) .await; - debug!("Checking pdu {outgoing:?} for spam with policy server {via} for room {room_id}"); + debug!( + room_id = %room_id, + via = %via, + outgoing = ?outgoing, + "Checking event for spam with policy server" + ); let response = tokio::time::timeout( Duration::from_secs(10), self.services @@ -65,6 +83,8 @@ pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result room_id = %room_id, "Failed to contact policy server: {e}" ); + // Network or policy server errors are treated as non-fatal: event is allowed by + // default. return Ok(()); }, | Err(_) => { diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index e8e22fe9..d3dc32fb 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -47,7 +47,10 @@ where return Err!(Request(InvalidParam("Event has been soft failed"))); } - debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id, + "Upgrading PDU from outlier to timeline" + ); let timer = Instant::now(); let room_version_id = get_room_version_id(create_event)?; @@ -55,7 +58,10 @@ where // backwards extremities doing all the checks in this list starting at 1. // These are not timeline events. - debug!("Resolving state at event {}", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id, + "Resolving state at event" + ); let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 { self.state_at_incoming_degree_one(&incoming_pdu).await? } else { @@ -74,7 +80,10 @@ where let room_version = to_room_version(&room_version_id); - debug!("Performing auth check to upgrade {}", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id, + "Performing auth check to upgrade" + ); // 11. Check the auth of the event passes based on the state of the event let state_fetch_state = &state_at_incoming_event; let state_fetch = |k: StateEventType, s: StateKey| async move { @@ -84,7 +93,10 @@ where self.services.timeline.get_pdu(event_id).await.ok() }; - debug!("running auth check on {}", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id, + "Running initial auth check" + ); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -98,7 +110,10 @@ where return Err!(Request(Forbidden("Event has failed auth check with state at the event."))); } - debug!("Gathering auth events for {}", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id, + "Gathering auth events" + ); let auth_events = self .services .state @@ -116,7 +131,10 @@ where ready(auth_events.get(&key).map(ToOwned::to_owned)) }; - debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id, + "Running auth check with claimed state auth" + ); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -127,7 +145,10 @@ where .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; // Soft fail check before doing state res - debug!("Performing soft-fail check on {}", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id, + "Performing soft-fail check" + ); let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { | (false, _) => true, | (true, None) => false, @@ -142,7 +163,10 @@ where // 13. Use state resolution to find new room state // We start looking at current room state now, so lets lock the room - trace!("Locking the room"); + trace!( + room_id = %room_id, + "Locking the room" + ); let state_lock = self.services.state.mutex.lock(room_id).await; // Now we calculate the set of extremities this room has after the incoming @@ -223,21 +247,32 @@ where // 14-pre. If the event is not a state event, ask the policy server about it if incoming_pdu.state_key.is_none() { - debug!("Checking policy server for event {}", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id,"Checking policy server for event"); let policy = self.policyserv_check(&incoming_pdu, room_id); if let Err(e) = policy.await { - warn!("Policy server check failed for event {}: {e}", incoming_pdu.event_id); + warn!( + event_id = %incoming_pdu.event_id, + error = ?e, + "Policy server check failed for event" + ); if !soft_fail { soft_fail = true; } } - debug!("Policy server check passed for event {}", incoming_pdu.event_id); + debug!( + event_id = %incoming_pdu.event_id, + "Policy server check passed for event" + ); } // Additionally, if this is a redaction for a soft-failed event, we soft-fail it // also if let Some(redact_id) = incoming_pdu.redacts_id(&room_version_id) { - debug!("Checking if redaction {} is for a soft-failed event", redact_id); + debug!( + redact_id = %redact_id, + "Checking if redaction is for a soft-failed event" + ); if self .services .pdu_metadata @@ -245,8 +280,8 @@ where .await { warn!( - "Redaction {} is for a soft-failed event, soft failing the redaction", - redact_id + redact_id = %redact_id, + "Redaction is for a soft-failed event, soft failing the redaction" ); soft_fail = true; } @@ -255,7 +290,10 @@ where // 14. Check if the event passes auth based on the "current state" of the room, // if not soft fail it if soft_fail { - info!("Soft failing event {}", incoming_pdu.event_id); + info!( + event_id = %incoming_pdu.event_id, + "Soft failing event" + ); // assert!(extremities.is_empty(), "soft_fail extremities empty"); let extremities = extremities.iter().map(Borrow::borrow); @@ -276,7 +314,10 @@ where .pdu_metadata .mark_event_soft_failed(incoming_pdu.event_id()); - warn!("Event was soft failed: {:?}", incoming_pdu.event_id()); + warn!( + event_id = %incoming_pdu.event_id, + "Event was soft failed" + ); return Err!(Request(InvalidParam("Event has been soft failed"))); }