From 2e2b3793ca37d5937695ce7c8e85e4cb6ce5e586 Mon Sep 17 00:00:00 2001
From: Xiretza
Date: Mon, 6 May 2024 17:07:43 +0000
Subject: [PATCH 1/4] service/admin: use &RoomId instead of &OwnedRoomId

This is like &str vs &String.
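
Note (illustrative sketch, not part of the patch): the same relationship
in std types. A function that takes the borrowed form accepts owned and
borrowed values alike, and ownership is re-acquired only where it is
actually needed, which is what handle_event now does with
`admin_room.to_owned()`.

    // &RoomId : &OwnedRoomId is analogous to &str : &String.
    fn print_len(s: &str) {
        // Accepts &String, string literals, and anything else that
        // derefs to str, without forcing the caller to clone.
        println!("{}", s.len());
    }

    fn main() {
        let owned = String::from("!room:example.org");
        print_len(&owned);      // &String coerces to &str
        print_len("!other:x");  // a plain borrow works too
        // Convert back to an owned value only at the point of use:
        let _copy: String = owned.as_str().to_owned();
    }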
---
 src/service/admin/mod.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs
index 8907869e..d67841ad 100644
--- a/src/service/admin/mod.rs
+++ b/src/service/admin/mod.rs
@@ -158,7 +158,7 @@ impl Service {
         }
     }
 
-    async fn handle_event(&self, event: AdminRoomEvent, admin_room: &OwnedRoomId, server_user: &UserId) -> Result<()> {
+    async fn handle_event(&self, event: AdminRoomEvent, admin_room: &RoomId, server_user: &UserId) -> Result<()> {
         let (mut message_content, reply) = match event {
             AdminRoomEvent::SendMessage(content) => (content, None),
             AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
@@ -172,7 +172,7 @@ impl Service {
                 .roomid_mutex_state
                 .write()
                 .await
-                .entry(admin_room.clone())
+                .entry(admin_room.to_owned())
                 .or_default(),
         );
         let state_lock = mutex_state.lock().await;
@@ -207,7 +207,7 @@ impl Service {
     }
 
     async fn handle_response_error(
-        &self, e: &Error, admin_room: &OwnedRoomId, server_user: &UserId, state_lock: &MutexGuard<'_, ()>,
+        &self, e: &Error, admin_room: &RoomId, server_user: &UserId, state_lock: &MutexGuard<'_, ()>,
     ) -> Result<()> {
         error!("Failed to build and append admin room response PDU: \"{e}\"");
         let error_room_message = RoomMessageEventContent::text_plain(format!(

From 69081a49aaf0b1187f4fb12e436cc8ac05f0a894 Mon Sep 17 00:00:00 2001
From: Xiretza
Date: Mon, 6 May 2024 17:10:47 +0000
Subject: [PATCH 2/4] database: remove unnecessary async and select!{}
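
A select! with a single branch is just an await, and a function whose
body only spawns a task never awaits anything itself, so it does not
need to be async. Illustrative sketch (not part of the patch; assumes
the tokio crate):

    use std::time::Duration;
    use tokio::time::interval;

    // Not an `async fn`: the body only calls tokio::spawn().
    fn start_ticker_task() {
        tokio::spawn(async move {
            let mut i = interval(Duration::from_secs(2));
            loop {
                // Equivalent to: tokio::select! { _ = i.tick() => {} }
                i.tick().await;
                // ... periodic work goes here ...
            }
        });
    }

    #[tokio::main]
    async fn main() {
        start_ticker_task();
        tokio::time::sleep(Duration::from_secs(5)).await;
    }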
---
 src/database/mod.rs | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/src/database/mod.rs b/src/database/mod.rs
index 4fbc3f97..8dfcc99f 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -388,9 +388,9 @@ impl KeyValueDatabase {
             services().presence.start_handler();
         }
 
-        Self::start_cleanup_task().await;
+        Self::start_cleanup_task();
         if services().globals.allow_check_for_updates() {
-            Self::start_check_for_updates_task().await;
+            Self::start_check_for_updates_task();
         }
 
         Ok(())
@@ -422,18 +422,14 @@ impl KeyValueDatabase {
     }
 
     #[tracing::instrument]
-    async fn start_check_for_updates_task() {
+    fn start_check_for_updates_task() {
         let timer_interval = Duration::from_secs(7200); // 2 hours
 
         tokio::spawn(async move {
             let mut i = interval(timer_interval);
 
             loop {
-                tokio::select! {
-                    _ = i.tick() => {
-                        debug!(target: "start_check_for_updates_task", "Timer ticked");
-                    },
-                }
+                i.tick().await;
 
                 _ = Self::try_handle_updates().await;
             }
@@ -476,7 +472,7 @@ impl KeyValueDatabase {
     }
 
     #[tracing::instrument]
-    async fn start_cleanup_task() {
+    fn start_cleanup_task() {
         let timer_interval = Duration::from_secs(u64::from(services().globals.config.cleanup_second_interval));
 
         tokio::spawn(async move {

From 56edd5b037e0a9890880145e2fef3f2ab2927d9a Mon Sep 17 00:00:00 2001
From: Xiretza
Date: Mon, 6 May 2024 17:20:58 +0000
Subject: [PATCH 3/4] Fix spans in tokio::spawn-ed tasks

tokio::spawn is a span boundary: the spawned future has no parent span.
For short futures, we simply inherit the current span with
`.in_current_span()`.

For long-running futures containing a sleeping infinite loop, we don't
actually want a span on the entire task or even the entire loop body;
both would result in very long spans. Instead, we put the outermost
span (created using #[tracing::instrument] or .instrument()) around the
actual work happening after the sleep, which results in a new root span
being created after every sleep.
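
Illustrative sketch of both patterns (not part of the patch; assumes
the tokio, tracing and tracing-subscriber crates):

    use std::time::Duration;
    use tracing::{info, info_span, Instrument as _};

    async fn short_job() {
        info!("events here land in the caller's span");
    }

    #[tracing::instrument]
    async fn handle_request(id: u64) {
        // Short future: tokio::spawn() would otherwise detach it into
        // an empty root span, so carry the current span across.
        tokio::spawn(short_job().in_current_span());
    }

    fn start_periodic_task() {
        // Long-running loop: no span on the task or the loop body; a
        // fresh root span wraps only the work after each sleep.
        tokio::spawn(async {
            loop {
                tokio::time::sleep(Duration::from_secs(1)).await;
                async {
                    info!("doing periodic work");
                }
                .instrument(info_span!("periodic_work"))
                .await;
            }
        });
    }

    #[tokio::main]
    async fn main() {
        tracing_subscriber::fmt().init();
        start_periodic_task();
        handle_request(7).await;
        tokio::time::sleep(Duration::from_secs(3)).await;
    }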
---
 src/api/client_server/sync.rs |  6 ++--
 src/database/mod.rs           | 45 +++++++++++++++-----------
 src/service/admin/mod.rs      | 51 +++++++++++++++--------------
 src/service/presence/mod.rs   | 60 +++++++++++++++++------------------
 src/service/sending/sender.rs | 39 +++++++++++------------
 5 files changed, 104 insertions(+), 97 deletions(-)

diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 647ce905..988cbdf7 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -96,7 +96,7 @@ pub(crate) async fn sync_events_route(
 
             v.insert((body.since.clone(), rx.clone()));
 
-            tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
+            tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx).in_current_span());
 
             rx
         },
@@ -108,7 +108,9 @@ pub(crate) async fn sync_events_route(
 
             debug!("Sync started for {sender_user}");
 
-            tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
+            tokio::spawn(
+                sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx).in_current_span(),
+            );
 
             rx
         } else {
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 8dfcc99f..9f51cbb7 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -37,7 +37,7 @@ use serde::Deserialize;
 #[cfg(unix)]
 use tokio::signal::unix::{signal, SignalKind};
 use tokio::time::{interval, Instant};
-use tracing::{debug, error, warn};
+use tracing::{debug, error, info_span, warn, Instrument as _};
 
 use crate::{
     database::migrations::migrations, service::rooms::timeline::PduCount, services, Config, Error,
@@ -436,6 +436,7 @@ impl KeyValueDatabase {
         });
     }
 
+    #[tracing::instrument]
     async fn try_handle_updates() -> Result<()> {
         let response = services()
             .globals
@@ -487,28 +488,34 @@ impl KeyValueDatabase {
         loop {
             #[cfg(unix)]
-            tokio::select! {
-                _ = i.tick() => {
-                    debug!(target: "database-cleanup", "Timer ticked");
-                }
-                _ = hangup.recv() => {
-                    debug!(target: "database-cleanup","Received SIGHUP");
-                }
-                _ = ctrl_c.recv() => {
-                    debug!(target: "database-cleanup", "Received Ctrl+C");
-                }
-                _ = terminate.recv() => {
-                    debug!(target: "database-cleanup","Received SIGTERM");
-                }
-            }
+            let msg = tokio::select! {
+                _ = i.tick() => || {
+                    debug!("Timer ticked");
+                },
+                _ = hangup.recv() => || {
+                    debug!("Received SIGHUP");
+                },
+                _ = ctrl_c.recv() => || {
+                    debug!("Received Ctrl+C");
+                },
+                _ = terminate.recv() => || {
+                    debug!("Received SIGTERM");
+                },
+            };
 
             #[cfg(not(unix))]
-            {
+            let msg = {
                 i.tick().await;
-                debug!(target: "database-cleanup", "Timer ticked")
-            }
+                || debug!("Timer ticked")
+            };
 
-            Self::perform_cleanup();
+            async {
+                msg();
+
+                Self::perform_cleanup();
+            }
+            .instrument(info_span!("database_cleanup"))
+            .await;
         }
     });
 }
diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs
index d67841ad..ea5c137d 100644
--- a/src/service/admin/mod.rs
+++ b/src/service/admin/mod.rs
@@ -24,7 +24,7 @@ use ruma::{
 };
 use serde_json::value::to_raw_value;
 use tokio::sync::{Mutex, MutexGuard};
-use tracing::{error, warn};
+use tracing::{error, info_span, warn, Instrument};
 
 use self::{fsck::FsckCommand, tester::TesterCommands};
 use super::pdu::PduBuilder;
@@ -116,10 +116,32 @@ impl Service {
     pub(crate) fn start_handler(self: &Arc) {
         let self2 = Arc::clone(self);
         tokio::spawn(async move {
-            self2
-                .handler()
-                .await
-                .expect("Failed to initialize admin room handler");
+            let receiver = self2.receiver.lock().await;
+            let Ok(Some(admin_room)) = Self::get_admin_room().await else {
+                return;
+            };
+            let server_name = services().globals.server_name();
+            let server_user = UserId::parse(format!("@conduit:{server_name}")).expect("server's username is valid");
+
+            loop {
+                debug_assert!(!receiver.is_closed(), "channel closed");
+                let event = receiver.recv_async().await;
+
+                async {
+                    let ret = match event {
+                        Ok(event) => self2.handle_event(event, &admin_room, &server_user).await,
+                        Err(e) => {
+                            error!("Failed to receive admin room event from channel: {e}");
+                            return;
+                        },
+                    };
+                    if let Err(e) = ret {
+                        error!("Failed to handle admin room event: {e}");
+                    }
+                }
+                .instrument(info_span!("admin_event_received"))
+                .await;
+            }
         });
     }
 
@@ -139,25 +161,6 @@ impl Service {
         self.sender.send(message).expect("message sent");
     }
 
-    async fn handler(&self) -> Result<()> {
-        let receiver = self.receiver.lock().await;
-        let Ok(Some(admin_room)) = Self::get_admin_room().await else {
-            return Ok(());
-        };
-        let server_name = services().globals.server_name();
-        let server_user = UserId::parse(format!("@conduit:{server_name}")).expect("server's username is valid");
-
-        loop {
-            debug_assert!(!receiver.is_closed(), "channel closed");
-            tokio::select! {
-                event = receiver.recv_async() => match event {
-                    Ok(event) => self.handle_event(event, &admin_room, &server_user).await?,
-                    Err(e) => error!("Failed to receive admin room event from channel: {e}"),
-                }
-            }
-        }
-    }
-
     async fn handle_event(&self, event: AdminRoomEvent, admin_room: &RoomId, server_user: &UserId) -> Result<()> {
         let (mut message_content, reply) = match event {
             AdminRoomEvent::SendMessage(content) => (content, None),
diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs
index d2d1bb39..4ee56cc5 100644
--- a/src/service/presence/mod.rs
+++ b/src/service/presence/mod.rs
@@ -11,7 +11,7 @@ use ruma::{
 };
 use serde::{Deserialize, Serialize};
 use tokio::{sync::Mutex, time::sleep};
-use tracing::{debug, error};
+use tracing::{debug, error, info_span, Instrument as _};
 
 use crate::{
     services,
@@ -99,10 +99,34 @@ impl Service {
     pub(crate) fn start_handler(self: &Arc) {
         let self_ = Arc::clone(self);
         tokio::spawn(async move {
-            self_
-                .handler()
-                .await
-                .expect("Failed to start presence handler");
+            let mut presence_timers = FuturesUnordered::new();
+            let receiver = self_.timer_receiver.lock().await;
+            loop {
+                tokio::select! {
+                    event = receiver.recv_async() => async {
+                        match event {
+                            Ok((user_id, timeout)) => {
+                                debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
+                                presence_timers.push(presence_timer(user_id, timeout));
+                            }
+                            Err(e) => {
+                                // generally shouldn't happen
+                                error!("Failed to receive presence timer through channel: {e}");
+                            }
+                        }
+                    }
+                    .instrument(info_span!("presence_event_received"))
+                    .await,
+
+                    Some(user_id) = presence_timers.next() => async {
+                        if let Err(e) = process_presence_timer(&user_id) {
+                            error!(?user_id, "Failed to process presence timer: {e}");
+                        }
+                    }
+                    .instrument(info_span!("presence_timer_expired"))
+                    .await,
+                }
+            }
         });
     }
 
@@ -186,32 +210,6 @@ impl Service {
     pub(crate) fn presence_since(&self, since: u64) -> Box)>> {
         self.db.presence_since(since)
     }
-
-    async fn handler(&self) -> Result<()> {
-        let mut presence_timers = FuturesUnordered::new();
-        let receiver = self.timer_receiver.lock().await;
-        loop {
-            tokio::select! {
-                event = receiver.recv_async() => {
-
-                    match event {
-                        Ok((user_id, timeout)) => {
-                            debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
-                            presence_timers.push(presence_timer(user_id, timeout));
-                        }
-                        Err(e) => {
-                            // generally shouldn't happen
-                            error!("Failed to receive presence timer through channel: {e}");
-                        }
-                    }
-                }
-
-                Some(user_id) = presence_timers.next() => {
-                    process_presence_timer(&user_id)?;
-                }
-            }
-        }
-    }
 }
 
 async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs
index e7d00234..f02f789b 100644
--- a/src/service/sending/sender.rs
+++ b/src/service/sending/sender.rs
@@ -50,30 +50,25 @@ impl Service {
     pub(crate) fn start_handler(self: &Arc) {
         let self2 = Arc::clone(self);
         tokio::spawn(async move {
-            self2.handler().await;
+            let receiver = self2.receiver.lock().await;
+            debug_assert!(!receiver.is_closed(), "channel error");
+            let mut futures: SendingFutures<'_> = FuturesUnordered::new();
+            let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
+            self2.initial_transactions(&mut futures, &mut statuses);
+            loop {
+                tokio::select! {
+                    Ok(request) = receiver.recv_async() => {
+                        self2.handle_request(request, &mut futures, &mut statuses);
+                    },
+                    Some(response) = futures.next() => {
+                        self2.handle_response(response, &mut futures, &mut statuses);
+                    },
+                }
+            }
         });
     }
 
-    #[tracing::instrument(skip_all, name = "sender")]
-    async fn handler(&self) {
-        let receiver = self.receiver.lock().await;
-        debug_assert!(!receiver.is_closed(), "channel error");
-
-        let mut futures: SendingFutures<'_> = FuturesUnordered::new();
-        let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
-        self.initial_transactions(&mut futures, &mut statuses);
-        loop {
-            tokio::select! {
-                Ok(request) = receiver.recv_async() => {
-                    self.handle_request(request, &mut futures, &mut statuses);
-                },
-                Some(response) = futures.next() => {
-                    self.handle_response(response, &mut futures, &mut statuses);
-                },
-            }
-        }
-    }
-
+    #[tracing::instrument(skip(self, futures, statuses))]
     fn handle_response(
         &self, response: SendingResult, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus,
     ) {
@@ -124,6 +119,7 @@ impl Service {
         }
     }
 
+    #[tracing::instrument(skip(self, futures, statuses))]
     fn handle_request(&self, msg: Msg, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
         let iv = vec![(msg.event, msg.queue_id)];
         if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses) {
@@ -135,6 +131,7 @@ impl Service {
         }
     }
 
+    #[tracing::instrument(skip(self, futures, statuses))]
     fn initial_transactions(&self, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
         let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX);
         let mut txns = HashMap::>::new();

From 1e314284709651e9dd6571a0bc033ca304a4decf Mon Sep 17 00:00:00 2001
From: Xiretza
Date: Mon, 6 May 2024 17:38:59 +0000
Subject: [PATCH 4/4] Better span traces

More information, fewer useless spans.

The span on request_spawn() can never emit any events; it only creates
a useless empty root span for every request.
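
Several of the new attributes use skip() to keep noisy arguments out of
the span and fields() to record a compact summary instead. Illustrative
sketch (not part of the patch; `debug_truncated()` is a hypothetical
stand-in for the `debug_slice_truncated()` helper used below, and the
tracing and tracing-subscriber crates are assumed):

    use tracing::info;

    // Hypothetical helper: render at most `max` items of a slice.
    fn debug_truncated<T: std::fmt::Debug>(slice: &[T], max: usize) -> String {
        let shown = &slice[..slice.len().min(max)];
        if slice.len() > max {
            format!("{shown:?} (+{} more)", slice.len() - max)
        } else {
            format!("{shown:?}")
        }
    }

    // skip() drops the raw `events` argument from the span;
    // fields() records a truncated summary in its place.
    #[tracing::instrument(skip(events), fields(events = %debug_truncated(&events, 3)))]
    fn select_events(dest: &str, events: Vec<u64>) {
        info!("selected {} events for {dest}", events.len());
    }

    fn main() {
        tracing_subscriber::fmt().init();
        select_events("example.org", (0..10).collect());
    }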
---
 src/api/client_server/sync.rs         | 1 +
 src/database/mod.rs                   | 3 +--
 src/router/mod.rs                     | 5 ++++-
 src/service/admin/mod.rs              | 1 +
 src/service/presence/mod.rs           | 1 +
 src/service/rooms/read_receipt/mod.rs | 1 +
 src/service/rooms/timeline/mod.rs     | 1 +
 src/service/sending/appservice.rs     | 1 +
 src/service/sending/mod.rs            | 3 ++-
 src/service/sending/sender.rs         | 6 ++++--
 10 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 988cbdf7..45aeb82a 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -138,6 +138,7 @@ pub(crate) async fn sync_events_route(
     result
 }
 
+#[tracing::instrument(skip(body, tx))]
 async fn sync_helper_wrapper(
     sender_user: OwnedUserId, sender_device: OwnedDeviceId, body: sync_events::v3::Request,
     tx: Sender>>,
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 9f51cbb7..a8c7dd32 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -421,7 +421,6 @@ impl KeyValueDatabase {
         Ok(())
     }
 
-    #[tracing::instrument]
     fn start_check_for_updates_task() {
         let timer_interval = Duration::from_secs(7200); // 2 hours
 
@@ -472,7 +471,6 @@ impl KeyValueDatabase {
         Ok(())
     }
 
-    #[tracing::instrument]
     fn start_cleanup_task() {
         let timer_interval = Duration::from_secs(u64::from(services().globals.config.cleanup_second_interval));
 
@@ -520,6 +518,7 @@ impl KeyValueDatabase {
         });
     }
 
+    #[tracing::instrument]
     fn perform_cleanup() {
         if !services().globals.config.rocksdb_periodic_cleanup {
             return;
diff --git a/src/router/mod.rs b/src/router/mod.rs
index 03e8be60..bf07102d 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -67,7 +67,10 @@ pub(crate) async fn build(server: &Server) -> io::Result, next: axum::middleware::Next,
 ) -> Result {
diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs
index ea5c137d..782f8745 100644
--- a/src/service/admin/mod.rs
+++ b/src/service/admin/mod.rs
@@ -161,6 +161,7 @@ impl Service {
         self.sender.send(message).expect("message sent");
     }
 
+    #[tracing::instrument(skip(self))]
     async fn handle_event(&self, event: AdminRoomEvent, admin_room: &RoomId, server_user: &UserId) -> Result<()> {
         let (mut message_content, reply) = match event {
             AdminRoomEvent::SendMessage(content) => (content, None),
diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs
index 4ee56cc5..5cda6102 100644
--- a/src/service/presence/mod.rs
+++ b/src/service/presence/mod.rs
@@ -218,6 +218,7 @@ async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId
     user_id
 }
 
+#[tracing::instrument]
 fn process_presence_timer(user_id: &OwnedUserId) -> Result<()> {
     let idle_timeout = services().globals.config.presence_idle_timeout_s * 1_000;
     let offline_timeout = services().globals.config.presence_offline_timeout_s * 1_000;
diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs
index 006204ab..c4d3cf9a 100644
--- a/src/service/rooms/read_receipt/mod.rs
+++ b/src/service/rooms/read_receipt/mod.rs
@@ -11,6 +11,7 @@ pub(crate) struct Service {
 
 impl Service {
     /// Replaces the previous read receipt.
+    #[tracing::instrument(skip(self, event))]
     pub(crate) fn readreceipt_update(&self, user_id: &UserId, room_id: &RoomId, event: ReceiptEvent) -> Result<()> {
         self.db.readreceipt_update(user_id, room_id, event)?;
         services().sending.flush_room(room_id)?;
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index ea9f1613..2408a035 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -628,6 +628,7 @@ impl Service {
         Ok(pdu_id)
     }
 
+    #[tracing::instrument(skip_all)]
     pub(crate) fn create_hash_and_sign_event(
         &self, pdu_builder: PduBuilder,
diff --git a/src/service/sending/appservice.rs b/src/service/sending/appservice.rs
index 4764f889..b14b0bd7 100644
--- a/src/service/sending/appservice.rs
+++ b/src/service/sending/appservice.rs
@@ -10,6 +10,7 @@ use crate::{debug_error, services, utils, Error, Result};
 ///
 /// Only returns Ok(None) if there is no url specified in the appservice
 /// registration file
+#[tracing::instrument(skip_all, fields(appservice = &registration.id))]
 pub(crate) async fn send_request(registration: Registration, request: T) -> Result>
 where
     T: OutgoingRequest + Debug,
diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs
index ed01e797..39880328 100644
--- a/src/service/sending/mod.rs
+++ b/src/service/sending/mod.rs
@@ -176,7 +176,7 @@ impl Service {
         Ok(())
     }
 
-    #[tracing::instrument(skip(self, room_id))]
+    #[tracing::instrument(skip(self))]
     pub(crate) fn flush_room(&self, room_id: &RoomId) -> Result<()> {
         let servers = services()
             .rooms
@@ -234,6 +234,7 @@ impl Service {
         Ok(())
     }
 
+    #[tracing::instrument(skip(self))]
     fn dispatch(&self, msg: Msg) -> Result<()> {
         debug_assert!(!self.sender.is_full(), "channel full");
         debug_assert!(!self.sender.is_closed(), "channel closed");
diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs
index f02f789b..7adb3d2b 100644
--- a/src/service/sending/sender.rs
+++ b/src/service/sending/sender.rs
@@ -26,7 +26,7 @@ use super::{appservice, send, Destination, Msg, SendingEvent, Service};
 use crate::{
     service::presence::Presence,
     services,
-    utils::{calculate_hash, user_id::user_is_local},
+    utils::{calculate_hash, debug_slice_truncated, user_id::user_is_local},
     Error, PduEvent, Result,
 };
 
@@ -78,6 +78,7 @@ impl Service {
         };
     }
 
+    #[tracing::instrument(skip(self, _futures, statuses))]
     fn handle_response_err(
         &self, dest: Destination, _futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error,
     ) {
@@ -91,6 +92,7 @@ impl Service {
         });
     }
 
+    #[tracing::instrument(skip(self, futures, statuses))]
     fn handle_response_ok(
         &self, dest: &Destination, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus,
     ) {
@@ -155,7 +157,7 @@ impl Service {
         }
     }
 
-    #[tracing::instrument(skip(self, dest, new_events, statuses))]
+    #[tracing::instrument(skip(self, dest, statuses), fields(new_events = debug_slice_truncated(&new_events, 3)))]
     fn select_events(
         &self, dest: &Destination,