mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-07-09 16:06:41 +02:00
Fix spans in tokio::spawn-ed tasks
tokio::spawn is a span boundary, the spawned future has no parent span. For short futures, we simply inherit the current span with `.in_current_span()`. For long running futures containing a sleeping infinite loop, we don't actually want a span on the entire task or even the entire loop body, both would result in very long spans. Instead, we put the outermost span (created using #[tracing::instrument] or .instrument()) around the actual work happening after the sleep, which results in a new root span being created after every sleep.
This commit is contained in:
parent
69081a49aa
commit
56edd5b037
5 changed files with 104 additions and 97 deletions
|
@ -96,7 +96,7 @@ pub(crate) async fn sync_events_route(
|
|||
|
||||
v.insert((body.since.clone(), rx.clone()));
|
||||
|
||||
tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
|
||||
tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx).in_current_span());
|
||||
|
||||
rx
|
||||
},
|
||||
|
@ -108,7 +108,9 @@ pub(crate) async fn sync_events_route(
|
|||
|
||||
debug!("Sync started for {sender_user}");
|
||||
|
||||
tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
|
||||
tokio::spawn(
|
||||
sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx).in_current_span(),
|
||||
);
|
||||
|
||||
rx
|
||||
} else {
|
||||
|
|
|
@ -37,7 +37,7 @@ use serde::Deserialize;
|
|||
#[cfg(unix)]
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::time::{interval, Instant};
|
||||
use tracing::{debug, error, warn};
|
||||
use tracing::{debug, error, info_span, warn, Instrument as _};
|
||||
|
||||
use crate::{
|
||||
database::migrations::migrations, service::rooms::timeline::PduCount, services, Config, Error,
|
||||
|
@ -436,6 +436,7 @@ impl KeyValueDatabase {
|
|||
});
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
async fn try_handle_updates() -> Result<()> {
|
||||
let response = services()
|
||||
.globals
|
||||
|
@ -487,29 +488,35 @@ impl KeyValueDatabase {
|
|||
|
||||
loop {
|
||||
#[cfg(unix)]
|
||||
tokio::select! {
|
||||
_ = i.tick() => {
|
||||
debug!(target: "database-cleanup", "Timer ticked");
|
||||
}
|
||||
_ = hangup.recv() => {
|
||||
debug!(target: "database-cleanup","Received SIGHUP");
|
||||
}
|
||||
_ = ctrl_c.recv() => {
|
||||
debug!(target: "database-cleanup", "Received Ctrl+C");
|
||||
}
|
||||
_ = terminate.recv() => {
|
||||
debug!(target: "database-cleanup","Received SIGTERM");
|
||||
}
|
||||
}
|
||||
let msg = tokio::select! {
|
||||
_ = i.tick() => || {
|
||||
debug!("Timer ticked");
|
||||
},
|
||||
_ = hangup.recv() => || {
|
||||
debug!("Received SIGHUP");
|
||||
},
|
||||
_ = ctrl_c.recv() => || {
|
||||
debug!("Received Ctrl+C");
|
||||
},
|
||||
_ = terminate.recv() => || {
|
||||
debug!("Received SIGTERM");
|
||||
},
|
||||
};
|
||||
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
let msg = {
|
||||
i.tick().await;
|
||||
debug!(target: "database-cleanup", "Timer ticked")
|
||||
}
|
||||
|| debug!("Timer ticked")
|
||||
};
|
||||
|
||||
async {
|
||||
msg();
|
||||
|
||||
Self::perform_cleanup();
|
||||
}
|
||||
.instrument(info_span!("database_cleanup"))
|
||||
.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ use ruma::{
|
|||
};
|
||||
use serde_json::value::to_raw_value;
|
||||
use tokio::sync::{Mutex, MutexGuard};
|
||||
use tracing::{error, warn};
|
||||
use tracing::{error, info_span, warn, Instrument};
|
||||
|
||||
use self::{fsck::FsckCommand, tester::TesterCommands};
|
||||
use super::pdu::PduBuilder;
|
||||
|
@ -116,10 +116,32 @@ impl Service {
|
|||
pub(crate) fn start_handler(self: &Arc<Self>) {
|
||||
let self2 = Arc::clone(self);
|
||||
tokio::spawn(async move {
|
||||
self2
|
||||
.handler()
|
||||
.await
|
||||
.expect("Failed to initialize admin room handler");
|
||||
let receiver = self2.receiver.lock().await;
|
||||
let Ok(Some(admin_room)) = Self::get_admin_room().await else {
|
||||
return;
|
||||
};
|
||||
let server_name = services().globals.server_name();
|
||||
let server_user = UserId::parse(format!("@conduit:{server_name}")).expect("server's username is valid");
|
||||
|
||||
loop {
|
||||
debug_assert!(!receiver.is_closed(), "channel closed");
|
||||
let event = receiver.recv_async().await;
|
||||
|
||||
async {
|
||||
let ret = match event {
|
||||
Ok(event) => self2.handle_event(event, &admin_room, &server_user).await,
|
||||
Err(e) => {
|
||||
error!("Failed to receive admin room event from channel: {e}");
|
||||
return;
|
||||
},
|
||||
};
|
||||
if let Err(e) = ret {
|
||||
error!("Failed to handle admin room event: {e}");
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("admin_event_received"))
|
||||
.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -139,25 +161,6 @@ impl Service {
|
|||
self.sender.send(message).expect("message sent");
|
||||
}
|
||||
|
||||
async fn handler(&self) -> Result<()> {
|
||||
let receiver = self.receiver.lock().await;
|
||||
let Ok(Some(admin_room)) = Self::get_admin_room().await else {
|
||||
return Ok(());
|
||||
};
|
||||
let server_name = services().globals.server_name();
|
||||
let server_user = UserId::parse(format!("@conduit:{server_name}")).expect("server's username is valid");
|
||||
|
||||
loop {
|
||||
debug_assert!(!receiver.is_closed(), "channel closed");
|
||||
tokio::select! {
|
||||
event = receiver.recv_async() => match event {
|
||||
Ok(event) => self.handle_event(event, &admin_room, &server_user).await?,
|
||||
Err(e) => error!("Failed to receive admin room event from channel: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_event(&self, event: AdminRoomEvent, admin_room: &RoomId, server_user: &UserId) -> Result<()> {
|
||||
let (mut message_content, reply) = match event {
|
||||
AdminRoomEvent::SendMessage(content) => (content, None),
|
||||
|
|
|
@ -11,7 +11,7 @@ use ruma::{
|
|||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::{sync::Mutex, time::sleep};
|
||||
use tracing::{debug, error};
|
||||
use tracing::{debug, error, info_span, Instrument as _};
|
||||
|
||||
use crate::{
|
||||
services,
|
||||
|
@ -99,10 +99,34 @@ impl Service {
|
|||
pub(crate) fn start_handler(self: &Arc<Self>) {
|
||||
let self_ = Arc::clone(self);
|
||||
tokio::spawn(async move {
|
||||
self_
|
||||
.handler()
|
||||
.await
|
||||
.expect("Failed to start presence handler");
|
||||
let mut presence_timers = FuturesUnordered::new();
|
||||
let receiver = self_.timer_receiver.lock().await;
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = receiver.recv_async() => async {
|
||||
match event {
|
||||
Ok((user_id, timeout)) => {
|
||||
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
|
||||
presence_timers.push(presence_timer(user_id, timeout));
|
||||
}
|
||||
Err(e) => {
|
||||
// generally shouldn't happen
|
||||
error!("Failed to receive presence timer through channel: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("presence_event_received"))
|
||||
.await,
|
||||
|
||||
Some(user_id) = presence_timers.next() => async {
|
||||
if let Err(e) = process_presence_timer(&user_id) {
|
||||
error!(?user_id, "Failed to process presence timer: {e}");
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("presence_timer_expired"))
|
||||
.await,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -186,32 +210,6 @@ impl Service {
|
|||
pub(crate) fn presence_since(&self, since: u64) -> Box<dyn Iterator<Item = (OwnedUserId, u64, Vec<u8>)>> {
|
||||
self.db.presence_since(since)
|
||||
}
|
||||
|
||||
async fn handler(&self) -> Result<()> {
|
||||
let mut presence_timers = FuturesUnordered::new();
|
||||
let receiver = self.timer_receiver.lock().await;
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = receiver.recv_async() => {
|
||||
|
||||
match event {
|
||||
Ok((user_id, timeout)) => {
|
||||
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
|
||||
presence_timers.push(presence_timer(user_id, timeout));
|
||||
}
|
||||
Err(e) => {
|
||||
// generally shouldn't happen
|
||||
error!("Failed to receive presence timer through channel: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some(user_id) = presence_timers.next() => {
|
||||
process_presence_timer(&user_id)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
|
||||
|
|
|
@ -50,30 +50,25 @@ impl Service {
|
|||
pub(crate) fn start_handler(self: &Arc<Self>) {
|
||||
let self2 = Arc::clone(self);
|
||||
tokio::spawn(async move {
|
||||
self2.handler().await;
|
||||
});
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, name = "sender")]
|
||||
async fn handler(&self) {
|
||||
let receiver = self.receiver.lock().await;
|
||||
let receiver = self2.receiver.lock().await;
|
||||
debug_assert!(!receiver.is_closed(), "channel error");
|
||||
|
||||
let mut futures: SendingFutures<'_> = FuturesUnordered::new();
|
||||
let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
|
||||
self.initial_transactions(&mut futures, &mut statuses);
|
||||
self2.initial_transactions(&mut futures, &mut statuses);
|
||||
loop {
|
||||
tokio::select! {
|
||||
Ok(request) = receiver.recv_async() => {
|
||||
self.handle_request(request, &mut futures, &mut statuses);
|
||||
self2.handle_request(request, &mut futures, &mut statuses);
|
||||
},
|
||||
Some(response) = futures.next() => {
|
||||
self.handle_response(response, &mut futures, &mut statuses);
|
||||
self2.handle_response(response, &mut futures, &mut statuses);
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, futures, statuses))]
|
||||
fn handle_response(
|
||||
&self, response: SendingResult, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus,
|
||||
) {
|
||||
|
@ -124,6 +119,7 @@ impl Service {
|
|||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, futures, statuses))]
|
||||
fn handle_request(&self, msg: Msg, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
|
||||
let iv = vec![(msg.event, msg.queue_id)];
|
||||
if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses) {
|
||||
|
@ -135,6 +131,7 @@ impl Service {
|
|||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, futures, statuses))]
|
||||
fn initial_transactions(&self, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
|
||||
let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX);
|
||||
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
|
||||
|
|
Loading…
Add table
Reference in a new issue