feat: Ask remote servers for individual unknown events

This commit is contained in:
nexy7574 2025-07-08 17:07:50 +01:00
parent 13b21b00a9
commit 89e2faaa8e
No known key found for this signature in database
GPG key ID: 0FA334385D0B689F
2 changed files with 112 additions and 9 deletions

View file

@ -18,7 +18,7 @@ pub(crate) async fn get_room_event_route(
let event = services let event = services
.rooms .rooms
.timeline .timeline
.get_pdu(event_id) .get_remote_pdu(room_id, event_id)
.map_err(|_| err!(Request(NotFound("Event {} not found.", event_id)))); .map_err(|_| err!(Request(NotFound("Event {} not found.", event_id))));
let visible = services let visible = services
@ -33,11 +33,6 @@ pub(crate) async fn get_room_event_route(
return Err!(Request(Forbidden("You don't have permission to view this event."))); return Err!(Request(Forbidden("You don't have permission to view this event.")));
} }
debug_assert!(
event.event_id() == event_id && event.room_id() == room_id,
"Fetched PDU must match requested"
);
event.add_age().ok(); event.add_age().ok();
Ok(get_room_event::v3::Response { event: event.into_format() }) Ok(get_room_event::v3::Response { event: event.into_format() })

View file

@ -1,5 +1,6 @@
use std::iter::once; use std::iter::once;
use conduwuit::{Err, PduEvent};
use conduwuit_core::{ use conduwuit_core::{
Result, debug, debug_warn, implement, info, Result, debug, debug_warn, implement, info,
matrix::{ matrix::{
@ -11,7 +12,7 @@ use conduwuit_core::{
}; };
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use ruma::{ use ruma::{
RoomId, ServerName, EventId, RoomId, ServerName,
api::federation, api::federation,
events::{ events::{
StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent,
@ -100,7 +101,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
.boxed(); .boxed();
while let Some(ref backfill_server) = servers.next().await { while let Some(ref backfill_server) = servers.next().await {
info!("Asking {backfill_server} for backfill"); info!("Asking {backfill_server} for backfill in {room_id}");
let response = self let response = self
.services .services
.sending .sending
@ -128,10 +129,117 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
} }
} }
info!("No servers could backfill, but backfill was needed in room {room_id}"); warn!("No servers could backfill, but backfill was needed in room {room_id}");
Ok(()) Ok(())
} }
#[implement(super::Service)]
#[tracing::instrument(name = "get_remote_pdu", level = "debug", skip(self))]
pub async fn get_remote_pdu(&self, room_id: &RoomId, event_id: &EventId) -> Result<PduEvent> {
let local = self.get_pdu(event_id).await;
if local.is_ok() {
// We already have this PDU, no need to backfill
debug!("We already have {event_id} in {room_id}, no need to backfill.");
return local;
}
debug!("Preparing to fetch event {event_id} in room {room_id} from remote servers.");
// Similar to backfill_if_required, but only for a single PDU
// Fetch a list of servers to try
if self
.services
.state_cache
.room_joined_count(room_id)
.await
.is_ok_and(|count| count <= 1)
&& !self
.services
.state_accessor
.is_world_readable(room_id)
.await
{
// Room is empty (1 user or none), there is no one that can backfill
return Err!(Request(NotFound("No one can backfill this PDU, room is empty.")));
}
let power_levels: RoomPowerLevelsEventContent = self
.services
.state_accessor
.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
.await
.unwrap_or_default();
let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| {
if level > &power_levels.users_default && !self.services.globals.user_is_local(user_id) {
Some(user_id.server_name())
} else {
None
}
});
let canonical_room_alias_server = once(
self.services
.state_accessor
.get_canonical_alias(room_id)
.await,
)
.filter_map(Result::ok)
.map(|alias| alias.server_name().to_owned())
.stream();
let mut servers = room_mods
.stream()
.map(ToOwned::to_owned)
.chain(canonical_room_alias_server)
.chain(
self.services
.server
.config
.trusted_servers
.iter()
.map(ToOwned::to_owned)
.stream(),
)
.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
.filter_map(|server_name| async move {
self.services
.state_cache
.server_in_room(&server_name, room_id)
.await
.then_some(server_name)
})
.boxed();
while let Some(ref backfill_server) = servers.next().await {
info!("Asking {backfill_server} for event {}", event_id);
let response = self
.services
.sending
.send_federation_request(backfill_server, federation::event::get_event::v1::Request {
event_id: event_id.to_owned(),
include_unredacted_content: Some(false),
})
.await;
let pdu = match response {
| Ok(response) => {
self.backfill_pdu(backfill_server, response.pdu)
.boxed()
.await?;
debug!("Successfully backfilled {event_id} from {backfill_server}");
Some(self.get_pdu(event_id).await)
},
| Err(e) => {
warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
None
},
};
if let Some(pdu) = pdu {
debug!("Fetched {event_id} from {backfill_server}");
return pdu;
}
}
Err!("No servers could be used to fetch {} in {}.", room_id, event_id)
}
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(skip(self, pdu), level = "debug")] #[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> { pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> {