mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-10 21:02:50 +02:00
Compare commits
No commits in common. "ef32fd3482fd95b4b6550c97a29195ddabe580fc" and "60b8c9f5f2b92af8997997aa57e870eea9335425" have entirely different histories.
ef32fd3482
...
60b8c9f5f2
2 changed files with 16 additions and 26 deletions
|
@ -430,7 +430,7 @@ async fn handle_left_room(
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
// Left before last sync
|
// Left before last sync
|
||||||
if (Some(since) >= left_count && !include_leave) || Some(next_batch) < left_count {
|
if Some(since) >= left_count {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -79,22 +79,13 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
}
|
}
|
||||||
|
|
||||||
let txn_start_time = Instant::now();
|
let txn_start_time = Instant::now();
|
||||||
let transaction_id = body.transaction_id.to_string();
|
|
||||||
trace!(
|
trace!(
|
||||||
pdus = body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = body.edus.len(),
|
edus = body.edus.len(),
|
||||||
id = transaction_id,
|
id = ?body.transaction_id,
|
||||||
"Processing transaction",
|
"Processing transaction",
|
||||||
);
|
);
|
||||||
|
|
||||||
let edus = body
|
|
||||||
.edus
|
|
||||||
.iter()
|
|
||||||
.map(|edu| edu.json().get())
|
|
||||||
.map(serde_json::from_str)
|
|
||||||
.filter_map(Result::ok)
|
|
||||||
.stream();
|
|
||||||
|
|
||||||
let pdus = body
|
let pdus = body
|
||||||
.pdus
|
.pdus
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -103,21 +94,29 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
.inspect_err(|e| debug_warn!("Could not parse PDU: {e}"))
|
.inspect_err(|e| debug_warn!("Could not parse PDU: {e}"))
|
||||||
.ready_filter_map(Result::ok);
|
.ready_filter_map(Result::ok);
|
||||||
|
|
||||||
|
let edus = body
|
||||||
|
.edus
|
||||||
|
.iter()
|
||||||
|
.map(|edu| edu.json().get())
|
||||||
|
.map(serde_json::from_str)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.stream();
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
pdus = body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = body.edus.len(),
|
edus = body.edus.len(),
|
||||||
elapsed = ?txn_start_time.elapsed(),
|
elapsed = ?txn_start_time.elapsed(),
|
||||||
id = transaction_id,
|
id = ?body.transaction_id,
|
||||||
"Validated transaction",
|
"Validated transaction",
|
||||||
);
|
);
|
||||||
|
|
||||||
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus, &*transaction_id).await?;
|
let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
pdus = body.pdus.len(),
|
pdus = body.pdus.len(),
|
||||||
edus = body.edus.len(),
|
edus = body.edus.len(),
|
||||||
elapsed = ?txn_start_time.elapsed(),
|
elapsed = ?txn_start_time.elapsed(),
|
||||||
id = transaction_id,
|
id = ?body.transaction_id,
|
||||||
"Finished txn",
|
"Finished txn",
|
||||||
);
|
);
|
||||||
for (id, result) in &results {
|
for (id, result) in &results {
|
||||||
|
@ -143,13 +142,11 @@ async fn handle(
|
||||||
started: Instant,
|
started: Instant,
|
||||||
pdus: impl Stream<Item = Pdu> + Send,
|
pdus: impl Stream<Item = Pdu> + Send,
|
||||||
edus: impl Stream<Item = Edu> + Send,
|
edus: impl Stream<Item = Edu> + Send,
|
||||||
transaction_id: &str,
|
|
||||||
) -> Result<ResolvedMap> {
|
) -> Result<ResolvedMap> {
|
||||||
let handle_start = Instant::now();
|
|
||||||
edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu))
|
edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu))
|
||||||
.boxed()
|
.boxed()
|
||||||
.await;
|
.await;
|
||||||
let pdu_start = Instant::now();
|
|
||||||
// group pdus by room
|
// group pdus by room
|
||||||
let pdus = pdus
|
let pdus = pdus
|
||||||
.collect()
|
.collect()
|
||||||
|
@ -160,7 +157,7 @@ async fn handle(
|
||||||
.collect()
|
.collect()
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
let results_start = Instant::now();
|
|
||||||
// we can evaluate rooms concurrently
|
// we can evaluate rooms concurrently
|
||||||
let results: ResolvedMap = pdus
|
let results: ResolvedMap = pdus
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -175,14 +172,7 @@ async fn handle(
|
||||||
.try_collect()
|
.try_collect()
|
||||||
.boxed()
|
.boxed()
|
||||||
.await?;
|
.await?;
|
||||||
let handle_stop = Instant::now();
|
|
||||||
info!(
|
|
||||||
edus = pdu_start.saturating_duration_since(handle_start).as_micros(),
|
|
||||||
pdus = results_start.saturating_duration_since(pdu_start).as_micros(),
|
|
||||||
handle_room = handle_stop.saturating_duration_since(results_start).as_micros(),
|
|
||||||
id = ?transaction_id,
|
|
||||||
"handled incoming transaction",
|
|
||||||
);
|
|
||||||
Ok(results)
|
Ok(results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue