apply new rustfmt.toml changes, fix some clippy lints

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-12-15 00:05:47 -05:00
commit 77e0b76408
No known key found for this signature in database
296 changed files with 7147 additions and 4300 deletions

View file

@ -3,14 +3,18 @@ use std::{fmt::Debug, mem};
use bytes::BytesMut;
use conduwuit::{debug_error, err, trace, utils, warn, Err, Result};
use reqwest::Client;
use ruma::api::{appservice::Registration, IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken};
use ruma::api::{
appservice::Registration, IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken,
};
/// Sends a request to an appservice
///
/// Only returns Ok(None) if there is no url specified in the appservice
/// registration file
pub(crate) async fn send_request<T>(
client: &Client, registration: Registration, request: T,
client: &Client,
registration: Registration,
request: T,
) -> Result<Option<T::IncomingResponse>>
where
T: OutgoingRequest + Debug + Send,
@ -25,17 +29,17 @@ where
let hs_token = registration.hs_token.as_str();
let mut http_request = request
.try_into_http_request::<BytesMut>(&dest, SendAccessToken::IfRequired(hs_token), &VERSIONS)
.try_into_http_request::<BytesMut>(
&dest,
SendAccessToken::IfRequired(hs_token),
&VERSIONS,
)
.map_err(|e| err!(BadServerResponse(warn!("Failed to find destination {dest}: {e}"))))?
.map(BytesMut::freeze);
let mut parts = http_request.uri().clone().into_parts();
let old_path_and_query = parts.path_and_query.unwrap().as_str().to_owned();
let symbol = if old_path_and_query.contains('?') {
"&"
} else {
"?"
};
let symbol = if old_path_and_query.contains('?') { "&" } else { "?" };
parts.path_and_query = Some(
(old_path_and_query + symbol + "access_token=" + hs_token)

View file

@ -43,7 +43,9 @@ impl Data {
}
}
pub(super) fn delete_active_request(&self, key: &[u8]) { self.servercurrentevent_data.remove(key); }
pub(super) fn delete_active_request(&self, key: &[u8]) {
self.servercurrentevent_data.remove(key);
}
pub(super) async fn delete_all_active_requests_for(&self, destination: &Destination) {
let prefix = destination.get_prefix();
@ -76,11 +78,7 @@ impl Data {
events
.filter(|(key, _)| !key.is_empty())
.for_each(|(key, val)| {
let val = if let SendingEvent::Edu(val) = &val {
&**val
} else {
&[]
};
let val = if let SendingEvent::Edu(val) = &val { &**val } else { &[] };
self.servercurrentevent_data.insert(key, val);
self.servernameevent_data.remove(key);
@ -93,21 +91,26 @@ impl Data {
.raw_stream()
.ignore_err()
.map(|(key, val)| {
let (dest, event) = parse_servercurrentevent(key, val).expect("invalid servercurrentevent");
let (dest, event) =
parse_servercurrentevent(key, val).expect("invalid servercurrentevent");
(key.to_vec(), event, dest)
})
}
#[inline]
pub fn active_requests_for(&self, destination: &Destination) -> impl Stream<Item = SendingItem> + Send + '_ {
pub fn active_requests_for(
&self,
destination: &Destination,
) -> impl Stream<Item = SendingItem> + Send + '_ {
let prefix = destination.get_prefix();
self.servercurrentevent_data
.raw_stream_from(&prefix)
.ignore_err()
.ready_take_while(move |(key, _)| key.starts_with(&prefix))
.map(|(key, val)| {
let (_, event) = parse_servercurrentevent(key, val).expect("invalid servercurrentevent");
let (_, event) =
parse_servercurrentevent(key, val).expect("invalid servercurrentevent");
(key.to_vec(), event)
})
@ -150,14 +153,18 @@ impl Data {
keys
}
pub fn queued_requests(&self, destination: &Destination) -> impl Stream<Item = QueueItem> + Send + '_ {
pub fn queued_requests(
&self,
destination: &Destination,
) -> impl Stream<Item = QueueItem> + Send + '_ {
let prefix = destination.get_prefix();
self.servernameevent_data
.raw_stream_from(&prefix)
.ignore_err()
.ready_take_while(move |(key, _)| key.starts_with(&prefix))
.map(|(key, val)| {
let (_, event) = parse_servercurrentevent(key, val).expect("invalid servercurrentevent");
let (_, event) =
parse_servercurrentevent(key, val).expect("invalid servercurrentevent");
(key.to_vec(), event)
})
@ -186,8 +193,9 @@ fn parse_servercurrentevent(key: &[u8], value: &[u8]) -> Result<(Destination, Se
.next()
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
let server = utils::string_from_bytes(server)
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
let server = utils::string_from_bytes(server).map_err(|_| {
Error::bad_database("Invalid server bytes in server_currenttransaction")
})?;
(
Destination::Appservice(server),
@ -203,8 +211,8 @@ fn parse_servercurrentevent(key: &[u8], value: &[u8]) -> Result<(Destination, Se
let user = parts.next().expect("splitn always returns one element");
let user_string = utils::string_from_bytes(user)
.map_err(|_| Error::bad_database("Invalid user string in servercurrentevent"))?;
let user_id =
UserId::parse(user_string).map_err(|_| Error::bad_database("Invalid user id in servercurrentevent"))?;
let user_id = UserId::parse(user_string)
.map_err(|_| Error::bad_database("Invalid user id in servercurrentevent"))?;
let pushkey = parts
.next()
@ -233,14 +241,14 @@ fn parse_servercurrentevent(key: &[u8], value: &[u8]) -> Result<(Destination, Se
.next()
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
let server = utils::string_from_bytes(server)
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
let server = utils::string_from_bytes(server).map_err(|_| {
Error::bad_database("Invalid server bytes in server_currenttransaction")
})?;
(
Destination::Normal(
ServerName::parse(server)
.map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?,
),
Destination::Normal(ServerName::parse(server).map_err(|_| {
Error::bad_database("Invalid server string in server_currenttransaction")
})?),
if value.is_empty() {
SendingEvent::Pdu(event.into())
} else {

View file

@ -14,7 +14,7 @@ pub enum Destination {
#[must_use]
pub(super) fn get_prefix(&self) -> Vec<u8> {
match self {
Self::Normal(server) => {
| Self::Normal(server) => {
let len = server.as_bytes().len().saturating_add(1);
let mut p = Vec::with_capacity(len);
@ -22,7 +22,7 @@ pub(super) fn get_prefix(&self) -> Vec<u8> {
p.push(0xFF);
p
},
Self::Appservice(server) => {
| Self::Appservice(server) => {
let sigil = b"+";
let len = sigil.len().saturating_add(server.len()).saturating_add(1);
@ -32,7 +32,7 @@ pub(super) fn get_prefix(&self) -> Vec<u8> {
p.push(0xFF);
p
},
Self::Push(user, pushkey) => {
| Self::Push(user, pushkey) => {
let sigil = b"$";
let len = sigil
.len()

View file

@ -25,8 +25,8 @@ pub use self::{
sender::{EDU_LIMIT, PDU_LIMIT},
};
use crate::{
account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId, server_keys, users,
Dep,
account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId,
server_keys, users, Dep,
};
pub struct Service {
@ -156,18 +156,16 @@ impl Service {
{
let _cork = self.db.db.cork();
let requests = servers
.map(|server| (Destination::Normal(server.into()), SendingEvent::Pdu(pdu_id.to_owned())))
.map(|server| {
(Destination::Normal(server.into()), SendingEvent::Pdu(pdu_id.to_owned()))
})
.collect::<Vec<_>>()
.await;
let keys = self.db.queue_requests(requests.iter().map(|(o, e)| (e, o)));
for ((dest, event), queue_id) in requests.into_iter().zip(keys) {
self.dispatch(Msg {
dest,
event,
queue_id,
})?;
self.dispatch(Msg { dest, event, queue_id })?;
}
Ok(())
@ -204,18 +202,16 @@ impl Service {
{
let _cork = self.db.db.cork();
let requests = servers
.map(|server| (Destination::Normal(server.to_owned()), SendingEvent::Edu(serialized.clone())))
.map(|server| {
(Destination::Normal(server.to_owned()), SendingEvent::Edu(serialized.clone()))
})
.collect::<Vec<_>>()
.await;
let keys = self.db.queue_requests(requests.iter().map(|(o, e)| (e, o)));
for ((dest, event), queue_id) in requests.into_iter().zip(keys) {
self.dispatch(Msg {
dest,
event,
queue_id,
})?;
self.dispatch(Msg { dest, event, queue_id })?;
}
Ok(())
@ -253,7 +249,11 @@ impl Service {
/// Sends a request to a federation server
#[tracing::instrument(skip_all, name = "request")]
pub async fn send_federation_request<T>(&self, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
pub async fn send_federation_request<T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
@ -263,7 +263,11 @@ impl Service {
/// Like send_federation_request() but with a very large timeout
#[tracing::instrument(skip_all, name = "synapse")]
pub async fn send_synapse_request<T>(&self, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
pub async fn send_synapse_request<T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
@ -276,7 +280,9 @@ impl Service {
/// Only returns None if there is no url specified in the appservice
/// registration file
pub async fn send_appservice_request<T>(
&self, registration: Registration, request: T,
&self,
registration: Registration,
request: T,
) -> Result<Option<T::IncomingResponse>>
where
T: OutgoingRequest + Debug + Send,
@ -291,24 +297,30 @@ impl Service {
/// key
#[tracing::instrument(skip(self), level = "debug")]
pub async fn cleanup_events(
&self, appservice_id: Option<&str>, user_id: Option<&UserId>, push_key: Option<&str>,
&self,
appservice_id: Option<&str>,
user_id: Option<&UserId>,
push_key: Option<&str>,
) -> Result {
match (appservice_id, user_id, push_key) {
(None, Some(user_id), Some(push_key)) => {
| (None, Some(user_id), Some(push_key)) => {
self.db
.delete_all_requests_for(&Destination::Push(user_id.to_owned(), push_key.to_owned()))
.delete_all_requests_for(&Destination::Push(
user_id.to_owned(),
push_key.to_owned(),
))
.await;
Ok(())
},
(Some(appservice_id), None, None) => {
| (Some(appservice_id), None, None) => {
self.db
.delete_all_requests_for(&Destination::Appservice(appservice_id.to_owned()))
.await;
Ok(())
},
_ => {
| _ => {
debug_warn!("cleanup_events called with too many or too few arguments");
Ok(())
},

View file

@ -2,16 +2,16 @@ use std::mem;
use bytes::Bytes;
use conduwuit::{
debug, debug_error, debug_warn, err, error::inspect_debug_log, implement, trace, utils::string::EMPTY, Err, Error,
Result,
debug, debug_error, debug_warn, err, error::inspect_debug_log, implement, trace,
utils::string::EMPTY, Err, Error, Result,
};
use http::{header::AUTHORIZATION, HeaderValue};
use ipaddress::IPAddress;
use reqwest::{Client, Method, Request, Response, Url};
use ruma::{
api::{
client::error::Error as RumaError, EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest,
SendAccessToken,
client::error::Error as RumaError, EndpointError, IncomingResponse, MatrixVersion,
OutgoingRequest, SendAccessToken,
},
serde::Base64,
server_util::authorization::XMatrix,
@ -25,7 +25,12 @@ use crate::{
impl super::Service {
#[tracing::instrument(skip_all, level = "debug")]
pub async fn send<T>(&self, client: &Client, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
pub async fn send<T>(
&self,
client: &Client,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
{
@ -39,7 +44,9 @@ impl super::Service {
.forbidden_remote_server_names
.contains(dest)
{
return Err!(Request(Forbidden(debug_warn!("Federation with {dest} is not allowed."))));
return Err!(Request(Forbidden(debug_warn!(
"Federation with {dest} is not allowed."
))));
}
let actual = self.services.resolver.get_actual_dest(dest).await?;
@ -49,7 +56,11 @@ impl super::Service {
}
async fn execute<T>(
&self, dest: &ServerName, actual: &ActualDest, request: Request, client: &Client,
&self,
dest: &ServerName,
actual: &ActualDest,
request: Request,
client: &Client,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
@ -59,8 +70,18 @@ impl super::Service {
debug!(?method, ?url, "Sending request");
match client.execute(request).await {
Ok(response) => handle_response::<T>(&self.services.resolver, dest, actual, &method, &url, response).await,
Err(error) => Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
| Ok(response) =>
handle_response::<T>(
&self.services.resolver,
dest,
actual,
&method,
&url,
response,
)
.await,
| Err(error) =>
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
}
}
@ -86,7 +107,11 @@ impl super::Service {
}
async fn handle_response<T>(
resolver: &resolver::Service, dest: &ServerName, actual: &ActualDest, method: &Method, url: &Url,
resolver: &resolver::Service,
dest: &ServerName,
actual: &ActualDest,
method: &Method,
url: &Url,
response: Response,
) -> Result<T::IncomingResponse>
where
@ -96,21 +121,22 @@ where
let result = T::IncomingResponse::try_from_http_response(response);
if result.is_ok() && !actual.cached {
resolver.set_cached_destination(
dest.to_owned(),
CachedDest {
dest: actual.dest.clone(),
host: actual.host.clone(),
expire: CachedDest::default_expire(),
},
);
resolver.set_cached_destination(dest.to_owned(), CachedDest {
dest: actual.dest.clone(),
host: actual.host.clone(),
expire: CachedDest::default_expire(),
});
}
result.map_err(|e| err!(BadServerResponse("Server returned bad 200 response: {e:?}")))
}
async fn into_http_response(
dest: &ServerName, actual: &ActualDest, method: &Method, url: &Url, mut response: Response,
dest: &ServerName,
actual: &ActualDest,
method: &Method,
url: &Url,
mut response: Response,
) -> Result<http::Response<Bytes>> {
let status = response.status();
trace!(
@ -146,13 +172,21 @@ async fn into_http_response(
debug!("Got {status:?} for {method} {url}");
if !status.is_success() {
return Err(Error::Federation(dest.to_owned(), RumaError::from_http_response(http_response)));
return Err(Error::Federation(
dest.to_owned(),
RumaError::from_http_response(http_response),
));
}
Ok(http_response)
}
fn handle_error(actual: &ActualDest, method: &Method, url: &Url, mut e: reqwest::Error) -> Result {
fn handle_error(
actual: &ActualDest,
method: &Method,
url: &Url,
mut e: reqwest::Error,
) -> Result {
if e.is_timeout() || e.is_connect() {
e = e.without_url();
debug_warn!("{e:?}");
@ -186,7 +220,8 @@ fn sign_request(&self, http_request: &mut http::Request<Vec<u8>>, dest: &ServerN
.expect("http::Request missing path_and_query");
let mut req: Object = if !body.is_empty() {
let content: CanonicalJsonValue = serde_json::from_slice(body).expect("failed to serialize body");
let content: CanonicalJsonValue =
serde_json::from_slice(body).expect("failed to serialize body");
let authorization: [Member; 5] = [
("content".into(), content),

View file

@ -24,15 +24,19 @@ use ruma::{
appservice::event::push_events::v1::Edu as RumaEdu,
federation::transactions::{
edu::{
DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap,
DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
ReceiptData, ReceiptMap,
},
send_transaction_message,
},
},
device_id,
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
RoomVersionId, ServerName, UInt,
events::{
push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent,
GlobalAccountDataEventType,
},
push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName,
OwnedUserId, RoomId, RoomVersionId, ServerName, UInt,
};
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
@ -86,11 +90,14 @@ impl Service {
}
async fn handle_response<'a>(
&'a self, response: SendingResult, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus,
&'a self,
response: SendingResult,
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
match response {
Ok(dest) => self.handle_response_ok(&dest, futures, statuses).await,
Err((dest, e)) => Self::handle_response_err(dest, statuses, &e),
| Ok(dest) => self.handle_response_ok(&dest, futures, statuses).await,
| Err((dest, e)) => Self::handle_response_err(dest, statuses, &e),
};
}
@ -98,16 +105,22 @@ impl Service {
debug!(dest = ?dest, "{e:?}");
statuses.entry(dest).and_modify(|e| {
*e = match e {
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()),
TransactionStatus::Retrying(ref n) => TransactionStatus::Failed(n.saturating_add(1), Instant::now()),
TransactionStatus::Failed(..) => panic!("Request that was not even running failed?!"),
| TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()),
| TransactionStatus::Retrying(ref n) =>
TransactionStatus::Failed(n.saturating_add(1), Instant::now()),
| TransactionStatus::Failed(..) => {
panic!("Request that was not even running failed?!")
},
}
});
}
#[allow(clippy::needless_pass_by_ref_mut)]
async fn handle_response_ok<'a>(
&'a self, dest: &Destination, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus,
&'a self,
dest: &Destination,
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let _cork = self.db.db.cork();
self.db.delete_all_active_requests_for(dest).await;
@ -133,7 +146,10 @@ impl Service {
#[allow(clippy::needless_pass_by_ref_mut)]
async fn handle_request<'a>(
&'a self, msg: Msg, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus,
&'a self,
msg: Msg,
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let iv = vec![(msg.queue_id, msg.event)];
if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses).await {
@ -168,8 +184,13 @@ impl Service {
}
#[allow(clippy::needless_pass_by_ref_mut)]
async fn initial_requests<'a>(&'a self, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus) {
let keep = usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
async fn initial_requests<'a>(
&'a self,
futures: &mut SendingFutures<'a>,
statuses: &mut CurTransactionStatus,
) {
let keep =
usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
let mut active = self.db.active_requests().boxed();
@ -240,7 +261,11 @@ impl Service {
}
#[tracing::instrument(skip_all, level = "debug")]
fn select_events_current(&self, dest: Destination, statuses: &mut CurTransactionStatus) -> Result<(bool, bool)> {
fn select_events_current(
&self,
dest: Destination,
statuses: &mut CurTransactionStatus,
) -> Result<(bool, bool)> {
let (mut allow, mut retry) = (true, false);
statuses
.entry(dest.clone()) // TODO: can we avoid cloning?
@ -278,7 +303,8 @@ impl Service {
let events_len = AtomicUsize::default();
let max_edu_count = AtomicU64::new(since);
let device_changes = self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len);
let device_changes =
self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len);
let receipts: OptionFuture<_> = self
.server
@ -305,7 +331,11 @@ impl Service {
/// Look for presence
async fn select_edus_device_changes(
&self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64, events_len: &AtomicUsize,
&self,
server_name: &ServerName,
since: (u64, u64),
max_edu_count: &AtomicU64,
events_len: &AtomicUsize,
) -> Vec<Vec<u8>> {
let mut events = Vec::new();
let server_rooms = self.services.state_cache.server_rooms(server_name);
@ -342,7 +372,8 @@ impl Service {
keys: None,
});
let edu = serde_json::to_vec(&edu).expect("failed to serialize device list update to JSON");
let edu = serde_json::to_vec(&edu)
.expect("failed to serialize device list update to JSON");
events.push(edu);
if events_len.fetch_add(1, Ordering::Relaxed) >= SELECT_EDU_LIMIT - 1 {
@ -356,7 +387,10 @@ impl Service {
/// Look for read receipts in this room
async fn select_edus_receipts(
&self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64,
&self,
server_name: &ServerName,
since: (u64, u64),
max_edu_count: &AtomicU64,
) -> Option<Vec<u8>> {
let server_rooms = self.services.state_cache.server_rooms(server_name);
@ -377,19 +411,21 @@ impl Service {
return None;
}
let receipt_content = Edu::Receipt(ReceiptContent {
receipts,
});
let receipt_content = Edu::Receipt(ReceiptContent { receipts });
let receipt_content =
serde_json::to_vec(&receipt_content).expect("Failed to serialize Receipt EDU to JSON vec");
let receipt_content = serde_json::to_vec(&receipt_content)
.expect("Failed to serialize Receipt EDU to JSON vec");
Some(receipt_content)
}
/// Look for read receipts in this room
async fn select_edus_receipts_room(
&self, room_id: &RoomId, since: (u64, u64), max_edu_count: &AtomicU64, num: &mut usize,
&self,
room_id: &RoomId,
since: (u64, u64),
max_edu_count: &AtomicU64,
num: &mut usize,
) -> ReceiptMap {
let receipts = self
.services
@ -444,14 +480,15 @@ impl Service {
}
}
ReceiptMap {
read,
}
ReceiptMap { read }
}
/// Look for presence
async fn select_edus_presence(
&self, server_name: &ServerName, since: (u64, u64), max_edu_count: &AtomicU64,
&self,
server_name: &ServerName,
since: (u64, u64),
max_edu_count: &AtomicU64,
) -> Option<Vec<u8>> {
let presence_since = self.services.presence.presence_since(since.0);
@ -511,7 +548,8 @@ impl Service {
push: presence_updates.into_values().collect(),
});
let presence_content = serde_json::to_vec(&presence_content).expect("failed to serialize Presence EDU to JSON");
let presence_content = serde_json::to_vec(&presence_content)
.expect("failed to serialize Presence EDU to JSON");
Some(presence_content)
}
@ -519,21 +557,28 @@ impl Service {
async fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingResult {
//debug_assert!(!events.is_empty(), "sending empty transaction");
match dest {
Destination::Normal(ref server) => self.send_events_dest_normal(&dest, server, events).await,
Destination::Appservice(ref id) => self.send_events_dest_appservice(&dest, id, events).await,
Destination::Push(ref userid, ref pushkey) => {
| Destination::Normal(ref server) =>
self.send_events_dest_normal(&dest, server, events).await,
| Destination::Appservice(ref id) =>
self.send_events_dest_appservice(&dest, id, events).await,
| Destination::Push(ref userid, ref pushkey) =>
self.send_events_dest_push(&dest, userid, pushkey, events)
.await
},
.await,
}
}
#[tracing::instrument(skip(self, dest, events), name = "appservice")]
async fn send_events_dest_appservice(
&self, dest: &Destination, id: &str, events: Vec<SendingEvent>,
&self,
dest: &Destination,
id: &str,
events: Vec<SendingEvent>,
) -> SendingResult {
let Some(appservice) = self.services.appservice.get_registration(id).await else {
return Err((dest.clone(), err!(Database(warn!(?id, "Missing appservice registration")))));
return Err((
dest.clone(),
err!(Database(warn!(?id, "Missing appservice registration"))),
));
};
let mut pdu_jsons = Vec::with_capacity(
@ -550,12 +595,12 @@ impl Service {
);
for event in &events {
match event {
SendingEvent::Pdu(pdu_id) => {
| SendingEvent::Pdu(pdu_id) => {
if let Ok(pdu) = self.services.timeline.get_pdu_from_id(pdu_id).await {
pdu_jsons.push(pdu.to_room_event());
}
},
SendingEvent::Edu(edu) => {
| SendingEvent::Edu(edu) => {
if appservice
.receive_ephemeral
.is_some_and(|receive_edus| receive_edus)
@ -565,14 +610,14 @@ impl Service {
}
}
},
SendingEvent::Flush => {}, // flush only; no new content
| SendingEvent::Flush => {}, // flush only; no new content
}
}
let txn_hash = calculate_hash(events.iter().filter_map(|e| match e {
SendingEvent::Edu(b) => Some(&**b),
SendingEvent::Pdu(b) => Some(b.as_ref()),
SendingEvent::Flush => None,
| SendingEvent::Edu(b) => Some(&**b),
| SendingEvent::Pdu(b) => Some(b.as_ref()),
| SendingEvent::Flush => None,
}));
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
@ -592,28 +637,35 @@ impl Service {
)
.await
{
Ok(_) => Ok(dest.clone()),
Err(e) => Err((dest.clone(), e)),
| Ok(_) => Ok(dest.clone()),
| Err(e) => Err((dest.clone(), e)),
}
}
#[tracing::instrument(skip(self, dest, events), name = "push")]
async fn send_events_dest_push(
&self, dest: &Destination, userid: &OwnedUserId, pushkey: &str, events: Vec<SendingEvent>,
&self,
dest: &Destination,
userid: &OwnedUserId,
pushkey: &str,
events: Vec<SendingEvent>,
) -> SendingResult {
let Ok(pusher) = self.services.pusher.get_pusher(userid, pushkey).await else {
return Err((dest.clone(), err!(Database(error!(?userid, ?pushkey, "Missing pusher")))));
return Err((
dest.clone(),
err!(Database(error!(?userid, ?pushkey, "Missing pusher"))),
));
};
let mut pdus = Vec::new();
for event in &events {
match event {
SendingEvent::Pdu(pdu_id) => {
| SendingEvent::Pdu(pdu_id) => {
if let Ok(pdu) = self.services.timeline.get_pdu_from_id(pdu_id).await {
pdus.push(pdu);
}
},
SendingEvent::Edu(_) | SendingEvent::Flush => {
| SendingEvent::Edu(_) | SendingEvent::Flush => {
// Push gateways don't need EDUs (?) and flush only;
// no new content
},
@ -657,7 +709,10 @@ impl Service {
#[tracing::instrument(skip(self, dest, events), name = "", level = "debug")]
async fn send_events_dest_normal(
&self, dest: &Destination, server: &OwnedServerName, events: Vec<SendingEvent>,
&self,
dest: &Destination,
server: &OwnedServerName,
events: Vec<SendingEvent>,
) -> SendingResult {
let mut pdu_jsons = Vec::with_capacity(
events
@ -675,17 +730,16 @@ impl Service {
for event in &events {
match event {
// TODO: check room version and remove event_id if needed
SendingEvent::Pdu(pdu_id) => {
| SendingEvent::Pdu(pdu_id) => {
if let Ok(pdu) = self.services.timeline.get_pdu_json_from_id(pdu_id).await {
pdu_jsons.push(self.convert_to_outgoing_federation_event(pdu).await);
}
},
SendingEvent::Edu(edu) => {
| SendingEvent::Edu(edu) =>
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
}
},
SendingEvent::Flush => {}, // flush only; no new content
},
| SendingEvent::Flush => {}, // flush only; no new content
}
}
@ -693,9 +747,9 @@ impl Service {
// transaction");
let txn_hash = calculate_hash(events.iter().filter_map(|e| match e {
SendingEvent::Edu(b) => Some(&**b),
SendingEvent::Pdu(b) => Some(b.as_ref()),
SendingEvent::Flush => None,
| SendingEvent::Edu(b) => Some(&**b),
| SendingEvent::Pdu(b) => Some(b.as_ref()),
| SendingEvent::Flush => None,
}));
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
@ -725,7 +779,10 @@ impl Service {
}
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
pub async fn convert_to_outgoing_federation_event(&self, mut pdu_json: CanonicalJsonObject) -> Box<RawJsonValue> {
pub async fn convert_to_outgoing_federation_event(
&self,
mut pdu_json: CanonicalJsonObject,
) -> Box<RawJsonValue> {
if let Some(unsigned) = pdu_json
.get_mut("unsigned")
.and_then(|val| val.as_object_mut())
@ -739,11 +796,11 @@ impl Service {
.and_then(|val| RoomId::parse(val.as_str()?).ok())
{
match self.services.state.get_room_version(&room_id).await {
Ok(room_version_id) => match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
_ => _ = pdu_json.remove("event_id"),
| Ok(room_version_id) => match room_version_id {
| RoomVersionId::V1 | RoomVersionId::V2 => {},
| _ => _ = pdu_json.remove("event_id"),
},
Err(_) => _ = pdu_json.remove("event_id"),
| Err(_) => _ = pdu_json.remove("event_id"),
}
} else {
pdu_json.remove("event_id");