From 0abfc192d45dabd1725d27947995e020aa4d500b Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Fri, 15 Aug 2025 04:10:40 +0100 Subject: [PATCH] fix(fed): Improve transaction flushing --- src/service/federation/execute.rs | 6 +++--- src/service/sending/sender.rs | 30 ++++++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 1d1d1154..8be1befa 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, mem}; use bytes::Bytes; use conduwuit::{ Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, debug_error, debug_warn, err, - error::inspect_debug_log, implement, trace, utils::string::EMPTY, + error::inspect_debug_log, implement, trace, utils::string::EMPTY, warn, }; use http::{HeaderValue, header::AUTHORIZATION}; use ipaddress::IPAddress; @@ -193,7 +193,7 @@ fn handle_error( ) -> Result { if e.is_timeout() || e.is_connect() { e = e.without_url(); - debug_warn!("{e:?}"); + debug_warn!(?url, "network error while sending request: {e:?}"); } else if e.is_redirect() { debug_error!( method = ?method, @@ -204,7 +204,7 @@ fn handle_error( e, ); } else { - debug_error!("{e:?}"); + warn!(?url, "failed to send federation request: {e:?}"); } Err(e.into()) diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index a708f746..7af4b533 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -10,7 +10,7 @@ use std::{ use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use conduwuit_core::{ - Error, Event, Result, debug, err, error, + Error, Event, Result, debug, err, error, info, result::LogErr, trace, utils::{ @@ -142,7 +142,7 @@ impl Service { } fn handle_response_err(dest: Destination, statuses: &mut CurTransactionStatus, e: &Error) { - debug!(dest = ?dest, "{e:?}"); + debug!(dest = ?dest, "error response: {e:?}"); statuses.entry(dest).and_modify(|e| { *e = match e { | TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), @@ -177,7 +177,21 @@ impl Service { if !new_events.is_empty() { self.db.mark_as_active(new_events.iter()); - let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect(); + let new_events_vec: Vec = + new_events.into_iter().map(|(_, event)| event).collect(); + + if let Some(status) = statuses.get(&dest.clone()) { + if matches!(status, TransactionStatus::Running) { + // If the server is in backoff, clear it + warn!( + ?dest, + "Catching up destination with {} new events", + new_events_vec.len() + ); + statuses.insert(dest.clone(), TransactionStatus::Running); + } + } + futures.push(self.send_events(dest.clone(), new_events_vec)); } else { statuses.remove(dest); @@ -859,12 +873,20 @@ impl Service { pdus, edus, }; + let pdu_count = request.pdus.len(); + let edu_count = request.edus.len(); let result = self .services .federation .execute_on(&self.services.client.sender, &server, request) - .await; + .await + .inspect(|_| { + info!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count); + }) + .inspect_err(|e| { + error!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count); + }); for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) { if let Err(e) = result {