mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-10 20:22:49 +02:00
feat(fed): Handle EDUs before PDUs
Aranje needs his crypto keys
This commit is contained in:
parent
b45b630af7
commit
8584116555
1 changed files with 34 additions and 12 deletions
|
@ -5,7 +5,7 @@ use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Error, Result, debug,
|
Err, Error, Result, debug,
|
||||||
debug::INFO_SPAN_LEVEL,
|
debug::INFO_SPAN_LEVEL,
|
||||||
debug_warn, err, error,
|
debug_warn, err, error, info,
|
||||||
result::LogErr,
|
result::LogErr,
|
||||||
trace,
|
trace,
|
||||||
utils::{
|
utils::{
|
||||||
|
@ -79,13 +79,11 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
}
|
}
|
||||||
|
|
||||||
let txn_start_time = Instant::now();
|
let txn_start_time = Instant::now();
|
||||||
trace!(
|
info!(
|
||||||
pdus = body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = body.edus.len(),
|
edus = body.edus.len(),
|
||||||
elapsed = ?txn_start_time.elapsed(),
|
|
||||||
id = ?body.transaction_id,
|
id = ?body.transaction_id,
|
||||||
origin =?body.origin(),
|
"Processing transaction",
|
||||||
"Starting txn",
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let pdus = body
|
let pdus = body
|
||||||
|
@ -104,14 +102,21 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
.filter_map(Result::ok)
|
.filter_map(Result::ok)
|
||||||
.stream();
|
.stream();
|
||||||
|
|
||||||
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
|
info!(
|
||||||
|
pdus = body.pdus.len(),
|
||||||
debug!(
|
edus = body.edus.len(),
|
||||||
|
elapsed = ?txn_start_time.elapsed(),
|
||||||
|
id = ?body.transaction_id,
|
||||||
|
"Validated transaction",
|
||||||
|
);
|
||||||
|
|
||||||
|
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
|
||||||
|
|
||||||
|
info!(
|
||||||
pdus = body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = body.edus.len(),
|
edus = body.edus.len(),
|
||||||
elapsed = ?txn_start_time.elapsed(),
|
elapsed = ?txn_start_time.elapsed(),
|
||||||
id = ?body.transaction_id,
|
id = ?body.transaction_id,
|
||||||
origin =?body.origin(),
|
|
||||||
"Finished txn",
|
"Finished txn",
|
||||||
);
|
);
|
||||||
for (id, result) in &results {
|
for (id, result) in &results {
|
||||||
|
@ -154,7 +159,8 @@ async fn handle(
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.try_stream()
|
.try_stream()
|
||||||
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
|
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
|
||||||
handle_room(services, client, origin, started, room_id, pdus.into_iter())
|
let count = pdus.len();
|
||||||
|
handle_room(services, client, origin, started, room_id, pdus.into_iter(), count)
|
||||||
.map_ok(Vec::into_iter)
|
.map_ok(Vec::into_iter)
|
||||||
.map_ok(IterStream::try_stream)
|
.map_ok(IterStream::try_stream)
|
||||||
})
|
})
|
||||||
|
@ -178,6 +184,7 @@ async fn handle_room(
|
||||||
txn_start_time: Instant,
|
txn_start_time: Instant,
|
||||||
room_id: OwnedRoomId,
|
room_id: OwnedRoomId,
|
||||||
pdus: impl Iterator<Item = Pdu> + Send,
|
pdus: impl Iterator<Item = Pdu> + Send,
|
||||||
|
count: usize,
|
||||||
) -> Result<Vec<(OwnedEventId, Result)>> {
|
) -> Result<Vec<(OwnedEventId, Result)>> {
|
||||||
let _room_lock = services
|
let _room_lock = services
|
||||||
.rooms
|
.rooms
|
||||||
|
@ -187,10 +194,20 @@ async fn handle_room(
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let room_id = &room_id;
|
let room_id = &room_id;
|
||||||
|
let mut n = 0;
|
||||||
pdus.try_stream()
|
pdus.try_stream()
|
||||||
.and_then(|(_, event_id, value)| async move {
|
.and_then(|(_, event_id, value)| async move {
|
||||||
services.server.check_running()?;
|
services.server.check_running()?;
|
||||||
let pdu_start_time = Instant::now();
|
let pdu_start_time = Instant::now();
|
||||||
|
info!(
|
||||||
|
%room_id,
|
||||||
|
%event_id,
|
||||||
|
pdu = n + 1,
|
||||||
|
total = count,
|
||||||
|
pdu_elapsed = ?pdu_start_time.elapsed(),
|
||||||
|
txn_elapsed = ?txn_start_time.elapsed(),
|
||||||
|
"Handling PDU",
|
||||||
|
);
|
||||||
let result = services
|
let result = services
|
||||||
.rooms
|
.rooms
|
||||||
.event_handler
|
.event_handler
|
||||||
|
@ -198,11 +215,16 @@ async fn handle_room(
|
||||||
.await
|
.await
|
||||||
.map(|_| ());
|
.map(|_| ());
|
||||||
|
|
||||||
debug!(
|
info!(
|
||||||
|
%room_id,
|
||||||
|
%event_id,
|
||||||
|
pdu = n + 1,
|
||||||
|
total = count,
|
||||||
pdu_elapsed = ?pdu_start_time.elapsed(),
|
pdu_elapsed = ?pdu_start_time.elapsed(),
|
||||||
txn_elapsed = ?txn_start_time.elapsed(),
|
txn_elapsed = ?txn_start_time.elapsed(),
|
||||||
"Finished PDU {event_id}",
|
"Finished handling PDU {event_id}",
|
||||||
);
|
);
|
||||||
|
n += 1;
|
||||||
|
|
||||||
Ok((event_id, result))
|
Ok((event_id, result))
|
||||||
})
|
})
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue