From 9f08594136407ebc7560c3ec5168b8639714bda5 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sat, 30 Aug 2025 20:41:38 -0700 Subject: [PATCH 1/6] pass and use transaction id to collect timing info --- src/api/server/send.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 601da06c..5a78b5f0 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -79,10 +79,11 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); + let transaction_id = body.transaction_id.to_string(); trace!( pdus = body.pdus.len(), edus = body.edus.len(), - id = ?body.transaction_id, + id = transaction_id, "Processing transaction", ); @@ -106,17 +107,17 @@ pub(crate) async fn send_transaction_message_route( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, + id = transaction_id, "Validated transaction", ); - let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus, &*transaction_id).await?; info!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, + id = transaction_id, "Finished txn", ); for (id, result) in &results { @@ -142,11 +143,13 @@ async fn handle( started: Instant, pdus: impl Stream + Send, edus: impl Stream + Send, + transaction_id: &str, ) -> Result { + let handle_start = Instant::now(); edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) .boxed() .await; - + let pdu_start = Instant::now(); // group pdus by room let pdus = pdus .collect() @@ -157,7 +160,7 @@ async fn handle( .collect() }) .await; - + let results_start = Instant::now(); // we can evaluate rooms concurrently let results: ResolvedMap = pdus .into_iter() @@ -172,7 +175,14 @@ async fn handle( .try_collect() .boxed() .await?; - + let handle_stop = Instant::now(); + info!( + edus = pdu_start.saturating_duration_since(handle_start).as_micros(), + pdus = results_start.saturating_duration_since(pdu_start).as_micros(), + handle_room = handle_stop.saturating_duration_since(results_start).as_micros(), + id = ?transaction_id, + "handled incoming transaction", + ); Ok(results) } From 73ca81886a08e1a16626f0c32b01ee27bd61645d Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 22:25:16 -0700 Subject: [PATCH 2/6] process edus before pdus here, too --- src/api/server/send.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 5a78b5f0..cd563b18 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -87,14 +87,6 @@ pub(crate) async fn send_transaction_message_route( "Processing transaction", ); - let pdus = body - .pdus - .iter() - .stream() - .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) - .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) - .ready_filter_map(Result::ok); - let edus = body .edus .iter() @@ -103,6 +95,14 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); + let pdus = body + .pdus + .iter() + .stream() + .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) + .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) + .ready_filter_map(Result::ok); + trace!( pdus = body.pdus.len(), edus = body.edus.len(), From ef32fd3482fd95b4b6550c97a29195ddabe580fc Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 30 Aug 2025 16:00:46 +0100 Subject: [PATCH 3/6] fix(sync/v2): Room leaves being omitted incorrectly Partially borrowed from https://github.com/matrix-construct/tuwunel/commit/85a84f93c7ef7184a8eee1bb17116e5f0f0faf5a --- src/api/client/sync/v3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 01428c08..298a6e4b 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -430,7 +430,7 @@ async fn handle_left_room( .ok(); // Left before last sync - if Some(since) >= left_count { + if (Some(since) >= left_count && !include_leave) || Some(next_batch) < left_count { return Ok(None); } From 9723753b5c598aae5eb4157eacd68346fbc4945d Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Sun, 31 Aug 2025 09:16:52 -0700 Subject: [PATCH 4/6] pass and use transaction id to collect timing info --- src/api/server/send.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 601da06c..f5727cf6 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -79,10 +79,11 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); + let transaction_id = body.transaction_id.to_string(); trace!( pdus = body.pdus.len(), edus = body.edus.len(), - id = ?body.transaction_id, + id = transaction_id, "Processing transaction", ); @@ -106,17 +107,17 @@ pub(crate) async fn send_transaction_message_route( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, + id = transaction_id, "Validated transaction", ); - let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus, &*transaction_id).await?; info!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, + id = transaction_id, "Finished txn", ); for (id, result) in &results { @@ -142,11 +143,13 @@ async fn handle( started: Instant, pdus: impl Stream + Send, edus: impl Stream + Send, + transaction_id: &str, ) -> Result { + let handle_start = Instant::now(); edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) .boxed() .await; - + let pdu_start = Instant::now(); // group pdus by room let pdus = pdus .collect() @@ -157,14 +160,14 @@ async fn handle( .collect() }) .await; - + let results_start = Instant::now(); // we can evaluate rooms concurrently let results: ResolvedMap = pdus .into_iter() .try_stream() .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { let count = pdus.len(); - handle_room(services, client, origin, started, room_id, pdus.into_iter(), count) + handle_room(services, client, origin, started, room_id, pdus.into_iter(), count, transaction_id) .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) }) @@ -172,7 +175,14 @@ async fn handle( .try_collect() .boxed() .await?; - + let handle_stop = Instant::now(); + info!( + edus = pdu_start.saturating_duration_since(handle_start).as_micros(), + pdus = results_start.saturating_duration_since(pdu_start).as_micros(), + handle_room = handle_stop.saturating_duration_since(results_start).as_micros(), + transaction_id = ?transaction_id, + "handled incoming transaction", + ); Ok(results) } @@ -184,6 +194,7 @@ async fn handle_room( room_id: OwnedRoomId, pdus: impl Iterator + Send, count: usize, + transaction_id: &str, ) -> Result> { let _room_lock = services .rooms @@ -201,6 +212,7 @@ async fn handle_room( trace!( %room_id, %event_id, + transaction_id = ?transaction_id, pdu = n + 1, total = count, pdu_elapsed = ?pdu_start_time.elapsed(), @@ -217,11 +229,12 @@ async fn handle_room( info!( %room_id, %event_id, + transaction_id = ?transaction_id, pdu = n + 1, total = count, pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), - "Finished handling PDU {event_id}", + "Finished handling PDU", ); n += 1; From 8eb5f8466fc2d9a35a2a59298a36a84132ef5e69 Mon Sep 17 00:00:00 2001 From: Jacob Taylor Date: Tue, 26 Aug 2025 22:25:16 -0700 Subject: [PATCH 5/6] process edus before pdus here, too --- src/api/server/send.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index f5727cf6..b2794a6e 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -87,14 +87,6 @@ pub(crate) async fn send_transaction_message_route( "Processing transaction", ); - let pdus = body - .pdus - .iter() - .stream() - .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) - .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) - .ready_filter_map(Result::ok); - let edus = body .edus .iter() @@ -103,6 +95,14 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); + let pdus = body + .pdus + .iter() + .stream() + .broad_then(|pdu| services.rooms.event_handler.parse_incoming_pdu(pdu)) + .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) + .ready_filter_map(Result::ok); + trace!( pdus = body.pdus.len(), edus = body.edus.len(), From f09e1c075da43948dbfd08654e9c919724f57e23 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 30 Aug 2025 16:00:46 +0100 Subject: [PATCH 6/6] fix(sync/v2): Room leaves being omitted incorrectly Partially borrowed from https://github.com/matrix-construct/tuwunel/commit/85a84f93c7ef7184a8eee1bb17116e5f0f0faf5a --- src/api/client/sync/v3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 01428c08..298a6e4b 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -430,7 +430,7 @@ async fn handle_left_room( .ok(); // Left before last sync - if Some(since) >= left_count { + if (Some(since) >= left_count && !include_leave) || Some(next_batch) < left_count { return Ok(None); }