force 0ns timeouts to default appropriately in syncv3

This commit is contained in:
nexy7574 2025-05-26 02:32:44 +01:00
commit cdbe5d52c2
No known key found for this signature in database
GPG key ID: 0FA334385D0B689F

View file

@ -1,18 +1,18 @@
use std::{ use std::{
cmp::{self},
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
time::Duration, time::Duration,
}; };
use axum::extract::State; use axum::extract::State;
use conduwuit::{ use conduwuit::{
Result, at, err, error, extract_variant, is_equal_to, Result, at, debug, debug_warn, err, error, extract_variant, is_equal_to,
matrix::{ matrix::{
Event, Event,
pdu::{EventHash, PduCount, PduEvent}, pdu::{EventHash, PduCount, PduEvent},
}, },
pair_of, ref_at, pair_of, ref_at,
result::FlatOk, result::FlatOk,
trace,
utils::{ utils::{
self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
future::{OptionStream, ReadyEqExt}, future::{OptionStream, ReadyEqExt},
@ -133,8 +133,13 @@ pub(crate) async fn sync_events_route(
// Setup watchers, so if there's no response, we can wait for them // Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device); let watcher = services.sync.watch(sender_user, sender_device);
let response = build_sync_events(&services, &body).await?; let response = build_sync_events(&services, &body).await?;
trace!("sync body.body.full_state: {}", body.body.full_state);
trace!("response.rooms.is_empty(): {}", response.rooms.is_empty());
trace!("response.presence.is_empty(): {}", response.presence.is_empty());
trace!("response.account_data.is_empty(): {}", response.account_data.is_empty());
trace!("response.device_lists.is_empty(): {}", response.device_lists.is_empty());
trace!("response.to_device.is_empty(): {}", response.to_device.is_empty());
if body.body.full_state if body.body.full_state
|| !(response.rooms.is_empty() || !(response.rooms.is_empty()
&& response.presence.is_empty() && response.presence.is_empty()
@ -142,17 +147,29 @@ pub(crate) async fn sync_events_route(
&& response.device_lists.is_empty() && response.device_lists.is_empty()
&& response.to_device.is_empty()) && response.to_device.is_empty())
{ {
trace!("Sync response built and returning immediately: {response:#?}");
return Ok(response); return Ok(response);
} }
// Hang a few seconds so requests are not spammed // Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives // Stop hanging if new info arrives
let default = Duration::from_secs(30); let default_timeout = Duration::from_secs(30);
let duration = cmp::min(body.body.timeout.unwrap_or(default), default); let user_timeout = body.body.timeout.unwrap_or(default_timeout);
_ = tokio::time::timeout(duration, watcher).await; trace!("User timeout: {user_timeout:?}, default timeout: {default_timeout:?}");
// If the user default resolves to 0ns or less, use the default timeout
let timeout = if user_timeout <= Duration::from_secs(0) {
default_timeout
} else {
user_timeout
};
debug!("Sync was empty, waiting for new data for {timeout:?}");
_ = tokio::time::timeout(timeout, watcher).await;
// Retry returning data // Retry returning data
build_sync_events(&services, &body).await trace!("Sync watcher returned, retrying build_sync_events");
let new_response = build_sync_events(&services, &body).await?;
trace!("Sync watcher returned, built new response: {new_response:#?}");
Ok(new_response)
} }
pub(crate) async fn build_sync_events( pub(crate) async fn build_sync_events(