fix(hydra): Backfill server selection in v12

This commit is contained in:
nexy7574 2025-09-02 02:21:34 +01:00
commit d221491377
No known key found for this signature in database

View file

@ -1,20 +1,23 @@
use std::iter::once; use std::iter::once;
use conduwuit::RoomVersion;
use conduwuit_core::{ use conduwuit_core::{
Result, debug, debug_warn, implement, info, Result, debug, debug_warn, implement, info,
matrix::{ matrix::{
event::Event, event::Event,
pdu::{PduCount, PduId, RawPduId}, pdu::{PduCount, PduId, RawPduId},
}, },
trace,
utils::{IterStream, ReadyExt}, utils::{IterStream, ReadyExt},
validated, warn, validated, warn,
}; };
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use ruma::{ use ruma::{
RoomId, ServerName, Int, RoomId, ServerName,
api::federation, api::federation,
events::{ events::{
StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, StateEventType, TimelineEventType,
room::{create::RoomCreateEventContent, power_levels::RoomPowerLevelsEventContent},
}, },
uint, uint,
}; };
@ -23,7 +26,7 @@ use serde_json::value::RawValue as RawJsonValue;
use super::ExtractBody; use super::ExtractBody;
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(name = "backfill", level = "debug", skip(self))] #[tracing::instrument(name = "backfill", level = "trace", skip(self))]
pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> { pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> {
if self if self
.services .services
@ -59,11 +62,47 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
.await .await
.unwrap_or_default(); .unwrap_or_default();
let create_event_content: RoomCreateEventContent = self
.services
.state_accessor
.room_state_get_content(room_id, &StateEventType::RoomCreate, "")
.await?;
let create_event = self
.services
.state_accessor
.room_state_get(room_id, &StateEventType::RoomCreate, "")
.await?;
let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| { let room_version =
if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) { RoomVersion::new(&create_event_content.room_version).expect("supported room version");
let mut users = power_levels.users.clone();
if room_version.explicitly_privilege_room_creators {
users.insert(create_event.sender().to_owned(), Int::MAX);
if let Some(additional_creators) = &create_event_content.additional_creators {
for user_id in additional_creators {
users.insert(user_id.to_owned(), Int::MAX);
}
}
}
let room_mods = users.iter().filter_map(|(user_id, level)| {
let remote_powered =
level > &power_levels.users_default && !self.services.globals.user_is_local(user_id);
let creator = if room_version.explicitly_privilege_room_creators {
create_event.sender() == user_id
|| create_event_content
.additional_creators
.as_ref()
.is_some_and(|c| c.contains(user_id))
} else {
false
};
if remote_powered || creator {
debug!(%remote_powered, %creator, "User {user_id} can backfill in room {room_id}");
Some(user_id.server_name()) Some(user_id.server_name())
} else { } else {
debug!(%remote_powered, %creator, "User {user_id} cannot backfill in room {room_id}");
None None
} }
}); });
@ -93,11 +132,14 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
) )
.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
.filter_map(|server_name| async move { .filter_map(|server_name| async move {
self.services let sn = self
.services
.state_cache .state_cache
.server_in_room(&server_name, room_id) .server_in_room(&server_name, room_id)
.await .await
.then_some(server_name) .then_some(server_name.clone());
trace!(server_name = ?server_name.clone(), "Considering server for backfill");
sn
}) })
.boxed(); .boxed();