Mirror of https://forgejo.ellis.link/continuwuation/continuwuity.git (synced 2025-07-09 16:06:41 +02:00)

Merge 1e31428470 into 4389e08686 (commit 0efabeb5d5)

10 changed files with 129 additions and 115 deletions
```diff
@@ -96,7 +96,7 @@ pub(crate) async fn sync_events_route(
 
 			v.insert((body.since.clone(), rx.clone()));
 
-			tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
+			tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx).in_current_span());
 
 			rx
 		},
@@ -108,7 +108,9 @@ pub(crate) async fn sync_events_route(
 
 			debug!("Sync started for {sender_user}");
 
-			tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
+			tokio::spawn(
+				sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx).in_current_span(),
+			);
 
 			rx
 		} else {
@@ -136,6 +138,7 @@ pub(crate) async fn sync_events_route(
 	result
 }
 
+#[tracing::instrument(skip(body, tx))]
 async fn sync_helper_wrapper(
 	sender_user: OwnedUserId, sender_device: OwnedDeviceId, body: sync_events::v3::Request,
 	tx: Sender<Option<Result<sync_events::v3::Response>>>,
```
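Review note: this is the core of the change. `tokio::spawn` detaches a future from whatever span is active at the call site, so events inside the spawned sync task would otherwise lose the request context. `tracing::Instrument::in_current_span()` captures the caller's current span and re-enters it on every poll. A minimal, self-contained sketch of the difference, assuming the `tokio`, `tracing`, and `tracing-subscriber` crates (names are illustrative, not from the diff):

```rust
use tracing::{info, info_span, Instrument as _};

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let span = info_span!("sync_request", user = "@alice:example.org");

    // Enter the span only for the synchronous act of spawning.
    let (plain, instrumented) = span.in_scope(|| {
        // Plain spawn: the task is polled outside `sync_request`,
        // so its event carries no span context.
        let plain = tokio::spawn(async { info!("detached from the request span") });

        // With `in_current_span()`: the future captures `sync_request`
        // at spawn time and re-enters it on every poll.
        let instrumented =
            tokio::spawn(async { info!("still inside the request span") }.in_current_span());

        (plain, instrumented)
    });

    let _ = tokio::join!(plain, instrumented);
}
```

Running this, the first event prints without the `sync_request` span and the second prints inside it.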
```diff
@@ -37,7 +37,7 @@ use serde::Deserialize;
 #[cfg(unix)]
 use tokio::signal::unix::{signal, SignalKind};
 use tokio::time::{interval, Instant};
-use tracing::{debug, error, warn};
+use tracing::{debug, error, info_span, warn, Instrument as _};
 
 use crate::{
 	database::migrations::migrations, service::rooms::timeline::PduCount, services, Config, Error,
@@ -388,9 +388,9 @@ impl KeyValueDatabase {
 			services().presence.start_handler();
 		}
 
-		Self::start_cleanup_task().await;
+		Self::start_cleanup_task();
 		if services().globals.allow_check_for_updates() {
-			Self::start_check_for_updates_task().await;
+			Self::start_check_for_updates_task();
 		}
 
 		Ok(())
@@ -421,25 +421,21 @@ impl KeyValueDatabase {
 		Ok(())
 	}
 
-	async fn start_check_for_updates_task() {
+	#[tracing::instrument]
+	fn start_check_for_updates_task() {
 		let timer_interval = Duration::from_secs(7200); // 2 hours
 
 		tokio::spawn(async move {
 			let mut i = interval(timer_interval);
 
 			loop {
-				tokio::select! {
-					_ = i.tick() => {
-						debug!(target: "start_check_for_updates_task", "Timer ticked");
-					},
-				}
+				i.tick().await;
 
 				_ = Self::try_handle_updates().await;
 			}
 		});
 	}
 
+	#[tracing::instrument]
 	async fn try_handle_updates() -> Result<()> {
 		let response = services()
 			.globals
```
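`start_check_for_updates_task` only spawns a background task, so nothing in it needs `await`; making it a plain `fn` and annotating it with `#[tracing::instrument]` wraps the synchronous body in a span named after the function (which is why the two call sites above drop their `.await`). The single-arm `tokio::select!` around the tick also collapses to a bare `i.tick().await`. A sketch of the resulting shape, with illustrative names, assuming `tokio` and `tracing`:

```rust
use std::time::Duration;

use tokio::time::interval;
use tracing::{debug, instrument};

// `#[instrument]` wraps the body in a span named after the function.
// The body only spawns, so the function no longer needs to be async
// (and callers no longer need to `.await` it).
#[instrument]
fn start_periodic_task() {
    let period = Duration::from_secs(7200); // 2 hours

    tokio::spawn(async move {
        let mut ticker = interval(period);
        loop {
            // A bare `tick().await` replaces the old single-arm
            // `tokio::select!`, which added nothing over this.
            ticker.tick().await;
            debug!("timer ticked");
        }
    });
}

#[tokio::main]
async fn main() {
    start_periodic_task();
    // Let the runtime schedule the task briefly before exiting.
    tokio::time::sleep(Duration::from_millis(10)).await;
}
```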
```diff
@@ -475,8 +471,7 @@ impl KeyValueDatabase {
 		Ok(())
 	}
 
-	async fn start_cleanup_task() {
+	#[tracing::instrument]
+	fn start_cleanup_task() {
 		let timer_interval = Duration::from_secs(u64::from(services().globals.config.cleanup_second_interval));
 
 		tokio::spawn(async move {
@@ -491,32 +486,39 @@ impl KeyValueDatabase {
 
 			loop {
 				#[cfg(unix)]
-				tokio::select! {
-					_ = i.tick() => {
-						debug!(target: "database-cleanup", "Timer ticked");
-					}
-					_ = hangup.recv() => {
-						debug!(target: "database-cleanup","Received SIGHUP");
-					}
-					_ = ctrl_c.recv() => {
-						debug!(target: "database-cleanup", "Received Ctrl+C");
-					}
-					_ = terminate.recv() => {
-						debug!(target: "database-cleanup","Received SIGTERM");
-					}
-				}
+				let msg = tokio::select! {
+					_ = i.tick() => || {
+						debug!("Timer ticked");
+					},
+					_ = hangup.recv() => || {
+						debug!("Received SIGHUP");
+					},
+					_ = ctrl_c.recv() => || {
+						debug!("Received Ctrl+C");
+					},
+					_ = terminate.recv() => || {
+						debug!("Received SIGTERM");
+					},
+				};
 
 				#[cfg(not(unix))]
-				{
+				let msg = {
 					i.tick().await;
-					debug!(target: "database-cleanup", "Timer ticked")
-				}
+					|| debug!("Timer ticked")
+				};
 
-				Self::perform_cleanup();
+				async {
+					msg();
+
+					Self::perform_cleanup();
+				}
+				.instrument(info_span!("database_cleanup"))
+				.await;
 			}
 		});
 	}
 
+	#[tracing::instrument]
 	fn perform_cleanup() {
 		if !services().globals.config.rocksdb_periodic_cleanup {
 			return;
```
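Two tricks in the rewritten cleanup loop are worth spelling out. Each `tokio::select!` arm now evaluates to a non-capturing closure (they all coerce to a common `fn()` type), so the log line is deferred instead of being emitted inside the arm; the closure is then invoked inside an `async` block instrumented with `info_span!("database_cleanup")`, so both the wakeup-reason log and `perform_cleanup()` land in that span. A condensed, runnable sketch assuming `tokio`, `tracing`, and `tracing-subscriber`, with the signal handling reduced to Ctrl+C:

```rust
use std::time::Duration;

use tokio::time::interval;
use tracing::{debug, info_span, Instrument as _, Level};

fn perform_cleanup() {
    debug!("cleaning up");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().with_max_level(Level::DEBUG).init();
    let mut ticker = interval(Duration::from_millis(100));
    let mut shutdown = Box::pin(tokio::signal::ctrl_c());

    for _ in 0..2 {
        // Each arm evaluates to a non-capturing closure; they coerce
        // to a common `fn()` and nothing is logged yet.
        let msg = tokio::select! {
            _ = ticker.tick() => || debug!("timer ticked"),
            _ = &mut shutdown => || debug!("received Ctrl+C"),
        };

        // Invoke the deferred log and the work inside one span, so
        // both events are attributed to `database_cleanup`.
        async {
            msg();
            perform_cleanup();
        }
        .instrument(info_span!("database_cleanup"))
        .await;
    }
}
```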
```diff
@@ -98,7 +98,10 @@ pub(crate) async fn build(server: &Server) -> io::Result<axum::routing::IntoMake
 	}
 }
 
-#[tracing::instrument(skip_all, name = "spawn")]
+/// Ensures the request runs in a new tokio task.
+///
+/// The axum request handler task gets cancelled if the connection is shut down;
+/// by spawning our own task, processing continues after the client disconnects.
 async fn request_spawn(
 	req: http::Request<axum::body::Body>, next: axum::middleware::Next,
 ) -> Result<axum::response::Response, StatusCode> {
```
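The new doc comment documents the motivation: axum drops the handler future when the client connection closes, and wrapping the work in its own `tokio::spawn` lets processing run to completion anyway. A minimal sketch of that detach pattern, assuming only `tokio` (`process` stands in for `next.run(req)`; a timeout stands in for the dropped connection):

```rust
use std::time::Duration;

use tokio::time::{sleep, timeout};

// Stand-in for `next.run(req)`: work that should finish even if the
// client goes away.
async fn process() -> &'static str {
    sleep(Duration::from_millis(50)).await;
    println!("processing finished");
    "response"
}

#[tokio::main]
async fn main() {
    // Direct await: the 10 ms timeout (standing in for a dropped
    // connection) cancels `process` mid-flight; it never prints.
    let _ = timeout(Duration::from_millis(10), process()).await;

    // Spawned: the timeout gives up on awaiting the JoinHandle, but
    // the task itself keeps running on the runtime and completes.
    let mut handle = tokio::spawn(process());
    let _ = timeout(Duration::from_millis(10), &mut handle).await;
    sleep(Duration::from_millis(100)).await; // prints "processing finished"
}
```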
```diff
@@ -24,7 +24,7 @@ use ruma::{
 };
 use serde_json::value::to_raw_value;
 use tokio::sync::{Mutex, MutexGuard};
-use tracing::{error, warn};
+use tracing::{error, info_span, warn, Instrument};
 
 use self::{fsck::FsckCommand, tester::TesterCommands};
 use super::pdu::PduBuilder;
@@ -116,10 +116,32 @@ impl Service {
 	pub(crate) fn start_handler(self: &Arc<Self>) {
 		let self2 = Arc::clone(self);
 		tokio::spawn(async move {
-			self2
-				.handler()
-				.await
-				.expect("Failed to initialize admin room handler");
+			let receiver = self2.receiver.lock().await;
+			let Ok(Some(admin_room)) = Self::get_admin_room().await else {
+				return;
+			};
+			let server_name = services().globals.server_name();
+			let server_user = UserId::parse(format!("@conduit:{server_name}")).expect("server's username is valid");
+
+			loop {
+				debug_assert!(!receiver.is_closed(), "channel closed");
+				let event = receiver.recv_async().await;
+
+				async {
+					let ret = match event {
+						Ok(event) => self2.handle_event(event, &admin_room, &server_user).await,
+						Err(e) => {
+							error!("Failed to receive admin room event from channel: {e}");
+							return;
+						},
+					};
+					if let Err(e) = ret {
+						error!("Failed to handle admin room event: {e}");
+					}
+				}
+				.instrument(info_span!("admin_event_received"))
+				.await;
+			}
 		});
 	}
 
```
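Inlining the admin handler into `start_handler` also changes the span structure: rather than one long-lived span around an infinite loop, each received event is handled inside a fresh `info_span!("admin_event_received")`. Note the `async { ... }.instrument(span).await` shape; unlike holding a `span.enter()` guard, it stays correct across await points. Errors are now logged and the loop continues, where the old `handler()` aborted on the first `?`. A reduced sketch of the loop, assuming `tokio`, `tracing`, and `tracing-subscriber` (a tokio mpsc channel stands in for the flume-style `recv_async` channel in the diff):

```rust
use tokio::sync::mpsc;
use tracing::{error, info, info_span, Instrument as _};

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let (tx, mut rx) = mpsc::channel::<String>(8);
    tx.send("!admin:example.org ping".to_owned()).await.unwrap();
    drop(tx); // close the channel so the loop below terminates

    // One fresh span per event groups each iteration's logs, instead
    // of a single giant span covering the life of the task.
    while let Some(event) = rx.recv().await {
        async {
            // Stand-in for handle_event: log and keep looping on error
            // rather than propagating with `?` and killing the task.
            if event.is_empty() {
                error!("refusing to handle an empty event");
                return;
            }
            info!(%event, "handling admin event");
        }
        .instrument(info_span!("admin_event_received"))
        .await;
    }
}
```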
```diff
@@ -139,26 +161,8 @@ impl Service {
 		self.sender.send(message).expect("message sent");
 	}
 
-	async fn handler(&self) -> Result<()> {
-		let receiver = self.receiver.lock().await;
-		let Ok(Some(admin_room)) = Self::get_admin_room().await else {
-			return Ok(());
-		};
-		let server_name = services().globals.server_name();
-		let server_user = UserId::parse(format!("@conduit:{server_name}")).expect("server's username is valid");
-
-		loop {
-			debug_assert!(!receiver.is_closed(), "channel closed");
-			tokio::select! {
-				event = receiver.recv_async() => match event {
-					Ok(event) => self.handle_event(event, &admin_room, &server_user).await?,
-					Err(e) => error!("Failed to receive admin room event from channel: {e}"),
-				}
-			}
-		}
-	}
-
-	async fn handle_event(&self, event: AdminRoomEvent, admin_room: &OwnedRoomId, server_user: &UserId) -> Result<()> {
+	#[tracing::instrument(skip(self))]
+	async fn handle_event(&self, event: AdminRoomEvent, admin_room: &RoomId, server_user: &UserId) -> Result<()> {
 		let (mut message_content, reply) = match event {
 			AdminRoomEvent::SendMessage(content) => (content, None),
 			AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
@@ -172,7 +176,7 @@ impl Service {
 				.roomid_mutex_state
 				.write()
 				.await
-				.entry(admin_room.clone())
+				.entry(admin_room.to_owned())
 				.or_default(),
 		);
 		let state_lock = mutex_state.lock().await;
@@ -207,7 +211,7 @@ impl Service {
 	}
 
 	async fn handle_response_error(
-		&self, e: &Error, admin_room: &OwnedRoomId, server_user: &UserId, state_lock: &MutexGuard<'_, ()>,
+		&self, e: &Error, admin_room: &RoomId, server_user: &UserId, state_lock: &MutexGuard<'_, ()>,
 	) -> Result<()> {
 		error!("Failed to build and append admin room response PDU: \"{e}\"");
 		let error_room_message = RoomMessageEventContent::text_plain(format!(
```
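The signature changes from `&OwnedRoomId` to `&RoomId` follow the usual ruma convention: `OwnedRoomId` borrows to the unsized `RoomId`, so `&RoomId` parameters accept either form, and `.to_owned()` replaces `.clone()` where an owned map key is required. The same idiom with std types, to avoid a ruma dependency in the sketch:

```rust
use std::collections::HashMap;

// Prefer the borrowed, unsized view in parameters: callers can pass
// `&String` or `&str` alike, just as `&RoomId` accepts `&OwnedRoomId`.
fn visit(counts: &mut HashMap<String, u64>, room: &str) {
    // `.to_owned()` turns the borrowed view into the owned map key,
    // mirroring `admin_room.to_owned()` in the diff.
    *counts.entry(room.to_owned()).or_default() += 1;
}

fn main() {
    let mut counts = HashMap::new();
    let owned: String = "!admin:example.org".to_owned();
    visit(&mut counts, &owned); // &String coerces to &str
    visit(&mut counts, "!admin:example.org");
    assert_eq!(counts["!admin:example.org"], 2);
}
```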
```diff
@@ -11,7 +11,7 @@ use ruma::{
 };
 use serde::{Deserialize, Serialize};
 use tokio::{sync::Mutex, time::sleep};
-use tracing::{debug, error};
+use tracing::{debug, error, info_span, Instrument as _};
 
 use crate::{
 	services,
@@ -99,10 +99,34 @@ impl Service {
 	pub(crate) fn start_handler(self: &Arc<Self>) {
 		let self_ = Arc::clone(self);
 		tokio::spawn(async move {
-			self_
-				.handler()
-				.await
-				.expect("Failed to start presence handler");
+			let mut presence_timers = FuturesUnordered::new();
+			let receiver = self_.timer_receiver.lock().await;
+			loop {
+				tokio::select! {
+					event = receiver.recv_async() => async {
+						match event {
+							Ok((user_id, timeout)) => {
+								debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
+								presence_timers.push(presence_timer(user_id, timeout));
+							}
+							Err(e) => {
+								// generally shouldn't happen
+								error!("Failed to receive presence timer through channel: {e}");
+							}
+						}
+					}
+					.instrument(info_span!("presence_event_received"))
+					.await,
+
+					Some(user_id) = presence_timers.next() => async {
+						if let Err(e) = process_presence_timer(&user_id) {
+							error!(?user_id, "Failed to process presence timer: {e}");
+						}
+					}
+					.instrument(info_span!("presence_timer_expired"))
+					.await,
+				}
+			}
 		});
 	}
 
```
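In the presence loop, each `tokio::select!` arm body is itself an `async` block that is instrumented and immediately awaited, so the two wakeup sources get distinct spans (`presence_event_received` vs `presence_timer_expired`). A trimmed, runnable sketch of that shape, assuming `tokio`, `tracing`, `tracing-subscriber`, and `futures-util` (a tokio mpsc channel stands in for the diff's channel):

```rust
use std::time::Duration;

use futures_util::{stream::FuturesUnordered, StreamExt};
use tokio::{sync::mpsc, time::sleep};
use tracing::{debug, info_span, Instrument as _, Level};

// Resolves to the user id once that user's presence timeout elapses.
async fn presence_timer(user: String, timeout: Duration) -> String {
    sleep(timeout).await;
    user
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().with_max_level(Level::DEBUG).init();

    let (tx, mut rx) = mpsc::channel::<(String, Duration)>(8);
    tx.send(("@alice:example.org".into(), Duration::from_millis(20)))
        .await
        .unwrap();
    drop(tx); // close the channel so `else` can fire once timers drain

    let mut timers = FuturesUnordered::new();
    loop {
        tokio::select! {
            // Each arm body is an async block, instrumented and awaited
            // in place, so each wakeup source gets its own span.
            Some((user, timeout)) = rx.recv() => async {
                debug!("adding timer for {user}: {timeout:?}");
                timers.push(presence_timer(user, timeout));
            }
            .instrument(info_span!("presence_event_received"))
            .await,

            Some(user) = timers.next() => async {
                debug!("presence timer expired for {user}");
            }
            .instrument(info_span!("presence_timer_expired"))
            .await,

            // Channel closed and no timers left.
            else => break,
        }
    }
}
```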
```diff
@@ -186,32 +210,6 @@ impl Service {
 	pub(crate) fn presence_since(&self, since: u64) -> Box<dyn Iterator<Item = (OwnedUserId, u64, Vec<u8>)>> {
 		self.db.presence_since(since)
 	}
-
-	async fn handler(&self) -> Result<()> {
-		let mut presence_timers = FuturesUnordered::new();
-		let receiver = self.timer_receiver.lock().await;
-		loop {
-			tokio::select! {
-				event = receiver.recv_async() => {
-					match event {
-						Ok((user_id, timeout)) => {
-							debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
-							presence_timers.push(presence_timer(user_id, timeout));
-						}
-						Err(e) => {
-							// generally shouldn't happen
-							error!("Failed to receive presence timer through channel: {e}");
-						}
-					}
-				}
-
-				Some(user_id) = presence_timers.next() => {
-					process_presence_timer(&user_id)?;
-				}
-			}
-		}
-	}
-
 }
 
 async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
@@ -220,6 +218,7 @@ async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId
 	user_id
 }
 
+#[tracing::instrument]
 fn process_presence_timer(user_id: &OwnedUserId) -> Result<()> {
 	let idle_timeout = services().globals.config.presence_idle_timeout_s * 1_000;
 	let offline_timeout = services().globals.config.presence_offline_timeout_s * 1_000;
```
```diff
@@ -11,6 +11,7 @@ pub(crate) struct Service {
 
 impl Service {
 	/// Replaces the previous read receipt.
+	#[tracing::instrument(skip(self, event))]
 	pub(crate) fn readreceipt_update(&self, user_id: &UserId, room_id: &RoomId, event: ReceiptEvent) -> Result<()> {
 		self.db.readreceipt_update(user_id, room_id, event)?;
 		services().sending.flush_room(room_id)?;
```
```diff
@@ -628,6 +628,7 @@ impl Service {
 		Ok(pdu_id)
 	}
 
+	#[tracing::instrument(skip_all)]
 	pub(crate) fn create_hash_and_sign_event(
 		&self,
 		pdu_builder: PduBuilder,
```
```diff
@@ -10,6 +10,7 @@ use crate::{debug_error, services, utils, Error, Result};
 ///
 /// Only returns Ok(None) if there is no url specified in the appservice
 /// registration file
+#[tracing::instrument(skip_all, fields(appservice = &registration.id))]
 pub(crate) async fn send_request<T>(registration: Registration, request: T) -> Result<Option<T::IncomingResponse>>
 where
 	T: OutgoingRequest + Debug,
```
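`skip_all` keeps every argument out of the span (large request bodies, types without a useful `Debug`), and `fields(...)` re-adds only a hand-picked value under a chosen name. A small sketch with hypothetical types, using `%` Display capture (a slight variation on the diff's `&registration.id`):

```rust
use tracing::instrument;

struct Registration {
    id: String,
    token: String, // secret: must not leak into spans
}

// `skip_all` keeps every argument out of the span; `fields(...)`
// re-adds only the chosen value under the name we want.
#[instrument(skip_all, fields(appservice = %registration.id))]
fn send_request(registration: &Registration, body: &[u8]) {
    let _ = (&registration.token, body);
}

fn main() {
    tracing_subscriber::fmt().init();
    send_request(
        &Registration {
            id: "bridge-irc".into(),
            token: "supersecret".into(),
        },
        b"{}",
    );
}
```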
```diff
@@ -176,7 +176,7 @@ impl Service {
 		Ok(())
 	}
 
-	#[tracing::instrument(skip(self, room_id))]
+	#[tracing::instrument(skip(self))]
 	pub(crate) fn flush_room(&self, room_id: &RoomId) -> Result<()> {
 		let servers = services()
 			.rooms
@@ -234,6 +234,7 @@ impl Service {
 		Ok(())
 	}
 
+	#[tracing::instrument(skip(self))]
 	fn dispatch(&self, msg: Msg) -> Result<()> {
 		debug_assert!(!self.sender.is_full(), "channel full");
 		debug_assert!(!self.sender.is_closed(), "channel closed");
```
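`flush_room` now skips only `self`, so `room_id` is recorded on its span via `Debug` and flushes become traceable per room. `dispatch` keeps its `debug_assert!` invariant checks: they document channel expectations and are compiled out of release builds. A tiny illustration of that property:

```rust
fn dispatch(queue: &mut Vec<u8>, msg: u8) {
    // Checked under `cargo build`/`cargo test`; compiled away with
    // `--release`, so the invariant check costs nothing in production.
    debug_assert!(queue.len() < queue.capacity(), "channel full");
    queue.push(msg);
}

fn main() {
    let mut queue = Vec::with_capacity(4);
    dispatch(&mut queue, 1);
    assert_eq!(queue, [1]);
}
```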
```diff
@@ -26,7 +26,7 @@ use super::{appservice, send, Destination, Msg, SendingEvent, Service};
 use crate::{
 	service::presence::Presence,
 	services,
-	utils::{calculate_hash, user_id::user_is_local},
+	utils::{calculate_hash, debug_slice_truncated, user_id::user_is_local},
 	Error, PduEvent, Result,
 };
 
@@ -50,30 +50,25 @@ impl Service {
 	pub(crate) fn start_handler(self: &Arc<Self>) {
 		let self2 = Arc::clone(self);
 		tokio::spawn(async move {
-			self2.handler().await;
+			let receiver = self2.receiver.lock().await;
+			debug_assert!(!receiver.is_closed(), "channel error");
+			let mut futures: SendingFutures<'_> = FuturesUnordered::new();
+			let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
+			self2.initial_transactions(&mut futures, &mut statuses);
+			loop {
+				tokio::select! {
+					Ok(request) = receiver.recv_async() => {
+						self2.handle_request(request, &mut futures, &mut statuses);
+					},
+					Some(response) = futures.next() => {
+						self2.handle_response(response, &mut futures, &mut statuses);
+					},
+				}
+			}
 		});
 	}
 
-	#[tracing::instrument(skip_all, name = "sender")]
-	async fn handler(&self) {
-		let receiver = self.receiver.lock().await;
-		debug_assert!(!receiver.is_closed(), "channel error");
-
-		let mut futures: SendingFutures<'_> = FuturesUnordered::new();
-		let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
-		self.initial_transactions(&mut futures, &mut statuses);
-		loop {
-			tokio::select! {
-				Ok(request) = receiver.recv_async() => {
-					self.handle_request(request, &mut futures, &mut statuses);
-				},
-				Some(response) = futures.next() => {
-					self.handle_response(response, &mut futures, &mut statuses);
-				},
-			}
-		}
-	}
-
+	#[tracing::instrument(skip(self, futures, statuses))]
 	fn handle_response(
 		&self, response: SendingResult, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus,
 	) {
@@ -83,6 +78,7 @@ impl Service {
 		};
 	}
 
+	#[tracing::instrument(skip(self, _futures, statuses))]
 	fn handle_response_err(
 		&self, dest: Destination, _futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error,
 	) {
@@ -96,6 +92,7 @@ impl Service {
 		});
 	}
 
+	#[tracing::instrument(skip(self, futures, statuses))]
 	fn handle_response_ok(
 		&self, dest: &Destination, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus,
 	) {
@@ -124,6 +121,7 @@ impl Service {
 		}
 	}
 
+	#[tracing::instrument(skip(self, futures, statuses))]
 	fn handle_request(&self, msg: Msg, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
 		let iv = vec![(msg.event, msg.queue_id)];
 		if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses) {
@@ -135,6 +133,7 @@ impl Service {
 		}
 	}
 
+	#[tracing::instrument(skip(self, futures, statuses))]
 	fn initial_transactions(&self, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
 		let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX);
 		let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
@@ -158,7 +157,7 @@ impl Service {
 		}
 	}
 
-	#[tracing::instrument(skip(self, dest, new_events, statuses))]
+	#[tracing::instrument(skip(self, dest, statuses), fields(new_events = debug_slice_truncated(&new_events, 3)))]
 	fn select_events(
 		&self,
 		dest: &Destination,
```
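`select_events` stops skipping `new_events` and instead records it through `debug_slice_truncated`, so a large batch does not bloat the span. The helper is imported from the crate's `utils` module but its body is not part of this diff; the following is a plausible sketch of such a helper, an assumption rather than the crate's actual implementation:

```rust
use std::fmt::Debug;

/// Renders at most `max` items of a slice for use as a span field,
/// appending a marker when the slice was cut short.
/// NOTE: hypothetical implementation; the real
/// `utils::debug_slice_truncated` is not shown in this diff.
fn debug_slice_truncated<T: Debug>(slice: &[T], max: usize) -> String {
    if slice.len() <= max {
        format!("{:?}", slice)
    } else {
        format!("{:?} ... ({} total)", &slice[..max], slice.len())
    }
}

fn main() {
    let events = vec![1, 2, 3, 4, 5];
    assert_eq!(debug_slice_truncated(&events, 3), "[1, 2, 3] ... (5 total)");
    assert_eq!(debug_slice_truncated(&events, 9), "[1, 2, 3, 4, 5]");
}
```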