Compare commits

..

3 commits

Author SHA1 Message Date
nexy7574
ef32fd3482 fix(sync/v2): Room leaves being omitted incorrectly
Partially borrowed from 85a84f93c7
2025-08-30 20:41:40 -07:00
Jacob Taylor
73ca81886a process edus before pdus here, too 2025-08-30 20:41:39 -07:00
Jacob Taylor
9f08594136 pass and use transaction id to collect timing info 2025-08-30 20:41:38 -07:00

View file

@ -167,7 +167,7 @@ async fn handle(
.try_stream() .try_stream()
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| { .broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
let count = pdus.len(); let count = pdus.len();
handle_room(services, client, origin, started, room_id, pdus.into_iter(), count, transaction_id) handle_room(services, client, origin, started, room_id, pdus.into_iter(), count)
.map_ok(Vec::into_iter) .map_ok(Vec::into_iter)
.map_ok(IterStream::try_stream) .map_ok(IterStream::try_stream)
}) })
@ -180,7 +180,7 @@ async fn handle(
edus = pdu_start.saturating_duration_since(handle_start).as_micros(), edus = pdu_start.saturating_duration_since(handle_start).as_micros(),
pdus = results_start.saturating_duration_since(pdu_start).as_micros(), pdus = results_start.saturating_duration_since(pdu_start).as_micros(),
handle_room = handle_stop.saturating_duration_since(results_start).as_micros(), handle_room = handle_stop.saturating_duration_since(results_start).as_micros(),
transaction_id = ?transaction_id, id = ?transaction_id,
"handled incoming transaction", "handled incoming transaction",
); );
Ok(results) Ok(results)
@ -194,7 +194,6 @@ async fn handle_room(
room_id: OwnedRoomId, room_id: OwnedRoomId,
pdus: impl Iterator<Item = Pdu> + Send, pdus: impl Iterator<Item = Pdu> + Send,
count: usize, count: usize,
transaction_id: &str,
) -> Result<Vec<(OwnedEventId, Result)>> { ) -> Result<Vec<(OwnedEventId, Result)>> {
let _room_lock = services let _room_lock = services
.rooms .rooms
@ -212,7 +211,6 @@ async fn handle_room(
trace!( trace!(
%room_id, %room_id,
%event_id, %event_id,
transaction_id = ?transaction_id,
pdu = n + 1, pdu = n + 1,
total = count, total = count,
pdu_elapsed = ?pdu_start_time.elapsed(), pdu_elapsed = ?pdu_start_time.elapsed(),
@ -229,12 +227,11 @@ async fn handle_room(
info!( info!(
%room_id, %room_id,
%event_id, %event_id,
transaction_id = ?transaction_id,
pdu = n + 1, pdu = n + 1,
total = count, total = count,
pdu_elapsed = ?pdu_start_time.elapsed(), pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(),
"Finished handling PDU", "Finished handling PDU {event_id}",
); );
n += 1; n += 1;