From d221491377b7e170e8ace28336be79e9de290306 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Tue, 2 Sep 2025 02:21:34 +0100 Subject: [PATCH] fix(hydra): Backfill server selection in v12 --- src/service/rooms/timeline/backfill.rs | 56 ++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index 391e7545..5b70a02e 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -1,20 +1,23 @@ use std::iter::once; +use conduwuit::RoomVersion; use conduwuit_core::{ Result, debug, debug_warn, implement, info, matrix::{ event::Event, pdu::{PduCount, PduId, RawPduId}, }, + trace, utils::{IterStream, ReadyExt}, validated, warn, }; use futures::{FutureExt, StreamExt}; use ruma::{ - RoomId, ServerName, + Int, RoomId, ServerName, api::federation, events::{ - StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, + StateEventType, TimelineEventType, + room::{create::RoomCreateEventContent, power_levels::RoomPowerLevelsEventContent}, }, uint, }; @@ -23,7 +26,7 @@ use serde_json::value::RawValue as RawJsonValue; use super::ExtractBody; #[implement(super::Service)] -#[tracing::instrument(name = "backfill", level = "debug", skip(self))] +#[tracing::instrument(name = "backfill", level = "trace", skip(self))] pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> { if self .services @@ -59,11 +62,47 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") .await .unwrap_or_default(); + let create_event_content: RoomCreateEventContent = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomCreate, "") + .await?; + let create_event = self + .services + .state_accessor + .room_state_get(room_id, &StateEventType::RoomCreate, "") + .await?; - let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { - if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) { + let room_version = + RoomVersion::new(&create_event_content.room_version).expect("supported room version"); + let mut users = power_levels.users.clone(); + if room_version.explicitly_privilege_room_creators { + users.insert(create_event.sender().to_owned(), Int::MAX); + if let Some(additional_creators) = &create_event_content.additional_creators { + for user_id in additional_creators { + users.insert(user_id.to_owned(), Int::MAX); + } + } + } + + let room_mods = users.iter().filter_map(|(user_id, level)| { + let remote_powered = + level > &power_levels.users_default && !self.services.globals.user_is_local(user_id); + let creator = if room_version.explicitly_privilege_room_creators { + create_event.sender() == user_id + || create_event_content + .additional_creators + .as_ref() + .is_some_and(|c| c.contains(user_id)) + } else { + false + }; + + if remote_powered || creator { + debug!(%remote_powered, %creator, "User {user_id} can backfill in room {room_id}"); Some(user_id.server_name()) } else { + debug!(%remote_powered, %creator, "User {user_id} cannot backfill in room {room_id}"); None } }); @@ -93,11 +132,14 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re ) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) .filter_map(|server_name| async move { - self.services + let sn = self + .services .state_cache .server_in_room(&server_name, room_id) .await - .then_some(server_name) + .then_some(server_name.clone()); + trace!(server_name = ?server_name.clone(), "Considering server for backfill"); + sn }) .boxed();