diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 298a6e4b..01428c08 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -430,7 +430,7 @@ async fn handle_left_room( .ok(); // Left before last sync - if (Some(since) >= left_count && !include_leave) || Some(next_batch) < left_count { + if Some(since) >= left_count { return Ok(None); } diff --git a/src/api/server/send.rs b/src/api/server/send.rs index cd563b18..601da06c 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -79,22 +79,13 @@ pub(crate) async fn send_transaction_message_route( } let txn_start_time = Instant::now(); - let transaction_id = body.transaction_id.to_string(); trace!( pdus = body.pdus.len(), edus = body.edus.len(), - id = transaction_id, + id = ?body.transaction_id, "Processing transaction", ); - let edus = body - .edus - .iter() - .map(|edu| edu.json().get()) - .map(serde_json::from_str) - .filter_map(Result::ok) - .stream(); - let pdus = body .pdus .iter() @@ -103,21 +94,29 @@ pub(crate) async fn send_transaction_message_route( .inspect_err(|e| debug_warn!("Could not parse PDU: {e}")) .ready_filter_map(Result::ok); + let edus = body + .edus + .iter() + .map(|edu| edu.json().get()) + .map(serde_json::from_str) + .filter_map(Result::ok) + .stream(); + trace!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = transaction_id, + id = ?body.transaction_id, "Validated transaction", ); - let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus, &*transaction_id).await?; + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; info!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = transaction_id, + id = ?body.transaction_id, "Finished txn", ); for (id, result) in &results { @@ -143,13 +142,11 @@ async fn handle( started: Instant, pdus: impl Stream + Send, edus: impl Stream + Send, - transaction_id: &str, ) -> Result { - let handle_start = Instant::now(); edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) .boxed() .await; - let pdu_start = Instant::now(); + // group pdus by room let pdus = pdus .collect() @@ -160,7 +157,7 @@ async fn handle( .collect() }) .await; - let results_start = Instant::now(); + // we can evaluate rooms concurrently let results: ResolvedMap = pdus .into_iter() @@ -175,14 +172,7 @@ async fn handle( .try_collect() .boxed() .await?; - let handle_stop = Instant::now(); - info!( - edus = pdu_start.saturating_duration_since(handle_start).as_micros(), - pdus = results_start.saturating_duration_since(pdu_start).as_micros(), - handle_room = handle_stop.saturating_duration_since(results_start).as_micros(), - id = ?transaction_id, - "handled incoming transaction", - ); + Ok(results) }