mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-11 01:32:49 +02:00
Compare commits
5 commits
2106e4e9f4
...
60b8c9f5f2
Author | SHA1 | Date | |
---|---|---|---|
|
60b8c9f5f2 | ||
|
c83f1ddb71 | ||
|
a75eacd544 | ||
|
e15d71d230 | ||
|
a7929c1931 |
6 changed files with 20 additions and 29 deletions
|
@ -3,7 +3,7 @@ use std::{collections::BTreeMap, net::IpAddr, time::Instant};
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum_client_ip::InsecureClientIp;
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Error, Result, debug,
|
Err, Error, Result,
|
||||||
debug::INFO_SPAN_LEVEL,
|
debug::INFO_SPAN_LEVEL,
|
||||||
debug_warn, err, error, info,
|
debug_warn, err, error, info,
|
||||||
result::LogErr,
|
result::LogErr,
|
||||||
|
@ -79,7 +79,7 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
}
|
}
|
||||||
|
|
||||||
let txn_start_time = Instant::now();
|
let txn_start_time = Instant::now();
|
||||||
info!(
|
trace!(
|
||||||
pdus = body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = body.edus.len(),
|
edus = body.edus.len(),
|
||||||
id = ?body.transaction_id,
|
id = ?body.transaction_id,
|
||||||
|
@ -102,7 +102,7 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
.filter_map(Result::ok)
|
.filter_map(Result::ok)
|
||||||
.stream();
|
.stream();
|
||||||
|
|
||||||
info!(
|
trace!(
|
||||||
pdus = body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = body.edus.len(),
|
edus = body.edus.len(),
|
||||||
elapsed = ?txn_start_time.elapsed(),
|
elapsed = ?txn_start_time.elapsed(),
|
||||||
|
@ -198,7 +198,7 @@ async fn handle_room(
|
||||||
.and_then(|(_, event_id, value)| async move {
|
.and_then(|(_, event_id, value)| async move {
|
||||||
services.server.check_running()?;
|
services.server.check_running()?;
|
||||||
let pdu_start_time = Instant::now();
|
let pdu_start_time = Instant::now();
|
||||||
info!(
|
trace!(
|
||||||
%room_id,
|
%room_id,
|
||||||
%event_id,
|
%event_id,
|
||||||
pdu = n + 1,
|
pdu = n + 1,
|
||||||
|
|
|
@ -5,10 +5,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use conduwuit::{
|
use conduwuit::{Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, info};
|
||||||
Err, Error, Result, debug, debug::INFO_SPAN_LEVEL, err,
|
|
||||||
error::inspect_debug_log, implement, trace, utils::string::EMPTY, warn,
|
|
||||||
};
|
|
||||||
use http::{HeaderValue, header::AUTHORIZATION};
|
use http::{HeaderValue, header::AUTHORIZATION};
|
||||||
use ipaddress::IPAddress;
|
use ipaddress::IPAddress;
|
||||||
use reqwest::{Client, Method, Request, Response, Url};
|
use reqwest::{Client, Method, Request, Response, Url};
|
||||||
|
@ -197,9 +194,9 @@ fn handle_error(
|
||||||
) -> Result {
|
) -> Result {
|
||||||
if e.is_timeout() || e.is_connect() {
|
if e.is_timeout() || e.is_connect() {
|
||||||
e = e.without_url();
|
e = e.without_url();
|
||||||
warn!(?url, "network error while sending federation request: {e:?}");
|
trace!(?url, "network error while sending federation request: {e:?}");
|
||||||
} else if e.is_redirect() {
|
} else if e.is_redirect() {
|
||||||
warn!(
|
trace!(
|
||||||
method = ?method,
|
method = ?method,
|
||||||
url = ?url,
|
url = ?url,
|
||||||
final_url = ?e.url(),
|
final_url = ?e.url(),
|
||||||
|
@ -208,7 +205,7 @@ fn handle_error(
|
||||||
e,
|
e,
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
warn!(?url, "failed to send federation request: {e:?}");
|
trace!(?url, "failed to send federation request: {e:?}");
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut nice_error = "Request failed".to_owned();
|
let mut nice_error = "Request failed".to_owned();
|
||||||
|
@ -217,7 +214,7 @@ fn handle_error(
|
||||||
write!(nice_error, ": {source:?}").expect("writing to string should not fail");
|
write!(nice_error, ": {source:?}").expect("writing to string should not fail");
|
||||||
src = source.source();
|
src = source.source();
|
||||||
}
|
}
|
||||||
warn!(nice_error, "Federation request error");
|
info!(nice_error, "Federation request error");
|
||||||
|
|
||||||
Err(e.into())
|
Err(e.into())
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,11 +3,7 @@ use std::{
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{Event, PduEvent, debug, debug_error, implement, matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs, warn, debug_warn};
|
||||||
Event, PduEvent, debug, debug_error, debug_warn, implement,
|
|
||||||
matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs,
|
|
||||||
warn,
|
|
||||||
};
|
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||||
api::federation::event::get_event,
|
api::federation::event::get_event,
|
||||||
|
|
|
@ -123,7 +123,7 @@ where
|
||||||
|
|
||||||
// The original create event must be in the auth events
|
// The original create event must be in the auth events
|
||||||
if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) {
|
if !auth_events.contains_key(&(StateEventType::RoomCreate, String::new().into())) {
|
||||||
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
|
return Err!(Request(InvalidParam("Incoming event refers to wrong create event. event_id={event_id}")));
|
||||||
}
|
}
|
||||||
|
|
||||||
let state_fetch = |ty: &StateEventType, sk: &str| {
|
let state_fetch = |ty: &StateEventType, sk: &str| {
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
use std::iter::once;
|
use std::iter::once;
|
||||||
|
|
||||||
use conduwuit_core::{
|
use conduwuit_core::{
|
||||||
Result, debug, debug_warn, implement, info,
|
Result, debug, implement, info,
|
||||||
matrix::{
|
matrix::{
|
||||||
event::Event,
|
event::Event,
|
||||||
pdu::{PduCount, PduId, RawPduId},
|
pdu::{PduCount, PduId, RawPduId},
|
||||||
},
|
},
|
||||||
utils::{IterStream, ReadyExt},
|
utils::{IterStream, ReadyExt},
|
||||||
validated, warn,
|
validated,
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
|
@ -15,9 +15,7 @@ use ruma::{
|
||||||
api::federation,
|
api::federation,
|
||||||
events::{
|
events::{
|
||||||
StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent,
|
StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent,
|
||||||
},
|
}, UInt};
|
||||||
uint,
|
|
||||||
};
|
|
||||||
use serde_json::value::RawValue as RawJsonValue;
|
use serde_json::value::RawValue as RawJsonValue;
|
||||||
|
|
||||||
use super::ExtractBody;
|
use super::ExtractBody;
|
||||||
|
@ -100,7 +98,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
||||||
.boxed();
|
.boxed();
|
||||||
|
|
||||||
while let Some(ref backfill_server) = servers.next().await {
|
while let Some(ref backfill_server) = servers.next().await {
|
||||||
info!("Asking {backfill_server} for backfill");
|
info!("Asking {backfill_server} for backfill of room {room_id}");
|
||||||
let response = self
|
let response = self
|
||||||
.services
|
.services
|
||||||
.sending
|
.sending
|
||||||
|
@ -109,7 +107,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
||||||
federation::backfill::get_backfill::v1::Request {
|
federation::backfill::get_backfill::v1::Request {
|
||||||
room_id: room_id.to_owned(),
|
room_id: room_id.to_owned(),
|
||||||
v: vec![first_pdu.1.event_id().to_owned()],
|
v: vec![first_pdu.1.event_id().to_owned()],
|
||||||
limit: uint!(100),
|
limit: UInt::from(self.services.server.config.max_fetch_prev_events),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
@ -117,13 +115,13 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
||||||
| Ok(response) => {
|
| Ok(response) => {
|
||||||
for pdu in response.pdus {
|
for pdu in response.pdus {
|
||||||
if let Err(e) = self.backfill_pdu(backfill_server, pdu).boxed().await {
|
if let Err(e) = self.backfill_pdu(backfill_server, pdu).boxed().await {
|
||||||
debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}");
|
info!("Failed to add backfilled pdu in room {room_id}: {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
},
|
},
|
||||||
| Err(e) => {
|
| Err(e) => {
|
||||||
warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
|
info!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -882,7 +882,7 @@ impl Service {
|
||||||
.execute_on(&self.services.client.sender, &server, request)
|
.execute_on(&self.services.client.sender, &server, request)
|
||||||
.await
|
.await
|
||||||
.inspect(|_| {
|
.inspect(|_| {
|
||||||
info!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count);
|
trace!(%txn_id, %server, "Sent {} PDUs, {} EDUs", pdu_count, edu_count);
|
||||||
})
|
})
|
||||||
.inspect_err(|e| {
|
.inspect_err(|e| {
|
||||||
info!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count);
|
info!(%txn_id, %server, "Failed to send transaction ({} PDUs, {} EDUs): {e:?}", pdu_count, edu_count);
|
||||||
|
@ -890,7 +890,7 @@ impl Service {
|
||||||
|
|
||||||
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
|
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
warn!(
|
trace!(
|
||||||
%txn_id, %server,
|
%txn_id, %server,
|
||||||
"error sending PDU {event_id} to remote server: {e:?}"
|
"error sending PDU {event_id} to remote server: {e:?}"
|
||||||
);
|
);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue