Compare commits

...

37 commits

Author SHA1 Message Date
nexy7574
a37151c665 Always calculate state diff IDs in syncv3
Some checks failed
Release Docker Image / define-variables (push) Failing after 1s
Release Docker Image / build-image (linux/amd64, linux-amd64) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, linux-arm64) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
seemingly fixes #779
2025-05-25 17:30:28 -07:00
Jacob Taylor
2b3980116c cache maxxing
Some checks failed
Release Docker Image / define-variables (push) Failing after 4s
Release Docker Image / build-image (linux/amd64, linux-amd64) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, linux-arm64) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
2025-05-24 22:21:58 -07:00
Jacob Taylor
49ffec3876 upgrade some settings
Some checks failed
Release Docker Image / define-variables (push) Failing after 4s
Release Docker Image / build-image (linux/amd64, linux-amd64) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, linux-arm64) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
2025-05-24 07:27:57 -07:00
Jacob Taylor
ccdbf963de add futures::FutureExt to make cba9ee5240 work 2025-05-22 18:39:15 -07:00
Jacob Taylor
63fbb5dec3 Merge branch 'main' into illegal-car-mods 2025-05-22 16:47:42 -07:00
Jason Volk
cba9ee5240 Mitigate large futures
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-19 17:12:19 -07:00
Jacob Taylor
5bb8c8e858 bump the number of allowed immutable memtables by 1, to allow for greater flood protection 2025-05-14 06:53:00 -07:00
Jason Volk
066cf68622 Eliminate associated Id type from trait Event.
Co-authored-by: Jade Ellis <jade@ellis.link>
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-14 06:27:52 -07:00
Jason Volk
efdcca0b49 Eliminate explicit parallel_fetches argument.
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-14 06:27:52 -07:00
Jason Volk
84d61fd032 Remove unused Pdu::into_any_event().
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-14 06:27:52 -07:00
Jason Volk
dfdf6cd2f0 Join jemalloc background threads prior to exit.
Co-authored-by: Jade Ellis <jade@ellis.link>
Signed-off-by: Jason Volk <jason@zemos.net>
2025-05-14 06:27:52 -07:00
Jade Ellis
d939be460c chore: Add CONTINUWUITY_ environment variables
Also updates some examples to match
2025-05-14 06:27:52 -07:00
Jade
22918bf5ef docs: Fix name in README 2025-05-14 06:27:52 -07:00
Jade Ellis
e84c3ebccf chore: Fix typos across the codebase 2025-05-14 06:27:52 -07:00
Jade Ellis
60718b029f ci: Fix bad comparison 2025-05-14 06:27:52 -07:00
Jade Ellis
4c313aac9e chore: Link to Matrix rooms directly 2025-05-14 06:27:52 -07:00
Jade Ellis
e8da89d79c feat: Prefill server name in federation test 2025-05-14 06:27:52 -07:00
Jade Ellis
d519028c5c ci: Cache timelord-cli to avoid unnecessary compilation 2025-05-14 06:27:52 -07:00
Jade Ellis
aec255320c refactor: Move git version info gathering into a build script 2025-05-14 06:27:52 -07:00
Jade Ellis
d66c78675d feat: HTML default page 2025-05-14 06:27:52 -07:00
Jade Ellis
6250b07f6a fix: Hack around software treating empty join rule incorrectly 2025-04-28 19:06:49 -07:00
Glandos
94f2792d99 Actualiser debian/conduwuit.service 2025-04-28 19:06:49 -07:00
Kokomo
737f9b9788 Update Contributing.md file (#807)
Cleaned up wording and adjusted the links

Reviewed-on: https://forgejo.ellis.link/continuwuation/continuwuity/pulls/807
Reviewed-by: nex <nex@noreply.localhost>
Reviewed-by: Jade Ellis <jade@ellis.link>
Co-authored-by: Kokomo <git@kokomo.cloud>
Co-committed-by: Kokomo <git@kokomo.cloud>
2025-04-28 19:06:49 -07:00
Kokomo
942308ea57 Add maintainer emails 2025-04-28 19:06:49 -07:00
Kokomo
8ccdec6516 Add back space oops 2025-04-28 19:06:49 -07:00
Kokomo
80da18a325 Remove email and add reference to matrix space 2025-04-28 19:06:49 -07:00
Tom Foster
531170594d Tidy up publishing restriction check 2025-04-28 19:06:49 -07:00
Tom Foster
1aafd1163d Element Web client build 2025-04-28 19:06:49 -07:00
Tom Foster
a9d9580aa4 Ignore all markdown for auto image builds 2025-04-28 19:06:49 -07:00
Tom Foster
1518ce0878 Make Cloudflare Pages optional in CI 2025-04-28 19:06:49 -07:00
Jade Ellis
2e8abe1071 chore: Error on missing ID in messages 2025-04-28 19:06:49 -07:00
Jade Ellis
0c09c3651b fix: Do not panic on invalid membership event content 2025-04-28 19:06:49 -07:00
Jade Ellis
0c5e4fdc20 feat: Allow controlling client message filtering 2025-04-28 19:06:49 -07:00
Jade Ellis
2c043cfabf chore: Enable blurhashing by default 2025-04-28 19:06:18 -07:00
Jacob Taylor
ebfbca59a7 completely strike knowledge of the server from the moderation service 2025-04-25 21:44:45 -07:00
Jacob Taylor
78c2a07524 probably incorrectly delete support for non-standardized matrix srv record 2025-04-25 21:44:45 -07:00
Jacob Taylor
1c8ca527db Fix spaces rooms list load error. rev2 2025-04-25 21:44:44 -07:00
21 changed files with 257 additions and 314 deletions

View file

@ -6,6 +6,7 @@ use conduwuit::{
warn, warn,
}; };
use futures::StreamExt; use futures::StreamExt;
use futures::FutureExt;
use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId}; use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId};
use crate::{admin_command, admin_command_dispatch, get_room_info}; use crate::{admin_command, admin_command_dispatch, get_room_info};
@ -155,7 +156,10 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
evicting admins too)", evicting admins too)",
); );
if let Err(e) = leave_room(self.services, user_id, &room_id, None).await { if let Err(e) = leave_room(self.services, user_id, &room_id, None)
.boxed()
.await
{
warn!("Failed to leave room: {e}"); warn!("Failed to leave room: {e}");
} }
@ -323,7 +327,10 @@ async fn ban_list_of_rooms(&self) -> Result {
evicting admins too)", evicting admins too)",
); );
if let Err(e) = leave_room(self.services, user_id, &room_id, None).await { if let Err(e) = leave_room(self.services, user_id, &room_id, None)
.boxed()
.await
{
warn!("Failed to leave room: {e}"); warn!("Failed to leave room: {e}");
} }

View file

@ -9,6 +9,7 @@ use conduwuit::{
}; };
use conduwuit_api::client::{leave_all_rooms, update_avatar_url, update_displayname}; use conduwuit_api::client::{leave_all_rooms, update_avatar_url, update_displayname};
use futures::StreamExt; use futures::StreamExt;
use futures::FutureExt;
use ruma::{ use ruma::{
OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedUserId, UserId,
events::{ events::{
@ -655,7 +656,9 @@ pub(super) async fn force_leave_room(
return Err!("{user_id} is not joined in the room"); return Err!("{user_id} is not joined in the room");
} }
leave_room(self.services, &user_id, &room_id, None).await?; leave_room(self.services, &user_id, &room_id, None)
.boxed()
.await?;
self.write_str(&format!("{user_id} has left {room_id}.",)) self.write_str(&format!("{user_id} has left {room_id}.",))
.await .await

View file

@ -763,7 +763,9 @@ pub(crate) async fn deactivate_route(
super::update_displayname(&services, sender_user, None, &all_joined_rooms).await; super::update_displayname(&services, sender_user, None, &all_joined_rooms).await;
super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await; super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await;
full_user_deactivate(&services, sender_user, &all_joined_rooms).await?; full_user_deactivate(&services, sender_user, &all_joined_rooms)
.boxed()
.await?;
info!("User {sender_user} deactivated their account."); info!("User {sender_user} deactivated their account.");
@ -915,7 +917,9 @@ pub async fn full_user_deactivate(
} }
} }
super::leave_all_rooms(services, user_id).await; super::leave_all_rooms(services, user_id)
.boxed()
.await;
Ok(()) Ok(())
} }

View file

@ -114,7 +114,9 @@ async fn banned_room_check(
.collect() .collect()
.await; .await;
full_user_deactivate(services, user_id, &all_joined_rooms).await?; full_user_deactivate(services, user_id, &all_joined_rooms)
.boxed()
.await?;
} }
return Err!(Request(Forbidden("This room is banned on this homeserver."))); return Err!(Request(Forbidden("This room is banned on this homeserver.")));
@ -153,7 +155,9 @@ async fn banned_room_check(
.collect() .collect()
.await; .await;
full_user_deactivate(services, user_id, &all_joined_rooms).await?; full_user_deactivate(services, user_id, &all_joined_rooms)
.boxed()
.await?;
} }
return Err!(Request(Forbidden("This remote server is banned on this homeserver."))); return Err!(Request(Forbidden("This remote server is banned on this homeserver.")));
@ -259,6 +263,7 @@ pub(crate) async fn join_room_by_id_or_alias_route(
room_id.server_name(), room_id.server_name(),
client, client,
) )
.boxed()
.await?; .await?;
let mut servers = body.via.clone(); let mut servers = body.via.clone();
@ -478,6 +483,7 @@ pub(crate) async fn leave_room_route(
body: Ruma<leave_room::v3::Request>, body: Ruma<leave_room::v3::Request>,
) -> Result<leave_room::v3::Response> { ) -> Result<leave_room::v3::Response> {
leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone()) leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone())
.boxed()
.await .await
.map(|()| leave_room::v3::Response::new()) .map(|()| leave_room::v3::Response::new())
} }
@ -1792,7 +1798,10 @@ pub async fn leave_all_rooms(services: &Services, user_id: &UserId) {
for room_id in all_rooms { for room_id in all_rooms {
// ignore errors // ignore errors
if let Err(e) = leave_room(services, user_id, &room_id, None).await { if let Err(e) = leave_room(services, user_id, &room_id, None)
.boxed()
.await
{
warn!(%user_id, "Failed to leave {room_id} remotely: {e}"); warn!(%user_id, "Failed to leave {room_id} remotely: {e}");
} }

View file

@ -121,7 +121,9 @@ where
.map(|(key, val)| (key, val.collect())) .map(|(key, val)| (key, val.collect()))
.collect(); .collect();
if !populate { if populate {
rooms.push(summary_to_chunk(summary.clone()));
} else {
children = children children = children
.iter() .iter()
.rev() .rev()
@ -144,10 +146,8 @@ where
.collect(); .collect();
} }
if populate { if !populate && queue.is_empty() && children.is_empty() {
rooms.push(summary_to_chunk(summary.clone())); break;
} else if queue.is_empty() && children.is_empty() {
return Err!(Request(InvalidParam("Room IDs in token were not found.")));
} }
parents.insert(current_room.clone()); parents.insert(current_room.clone());

View file

@ -6,6 +6,7 @@ use conduwuit::{
}; };
use conduwuit_service::Services; use conduwuit_service::Services;
use futures::TryStreamExt; use futures::TryStreamExt;
use futures::FutureExt;
use ruma::{ use ruma::{
OwnedEventId, RoomId, UserId, OwnedEventId, RoomId, UserId,
api::client::state::{get_state_events, get_state_events_for_key, send_state_event}, api::client::state::{get_state_events, get_state_events_for_key, send_state_event},
@ -59,6 +60,7 @@ pub(crate) async fn send_state_event_for_empty_key_route(
body: Ruma<send_state_event::v3::Request>, body: Ruma<send_state_event::v3::Request>,
) -> Result<RumaResponse<send_state_event::v3::Response>> { ) -> Result<RumaResponse<send_state_event::v3::Response>> {
send_state_event_for_key_route(State(services), body) send_state_event_for_key_route(State(services), body)
.boxed()
.await .await
.map(RumaResponse) .map(RumaResponse)
} }

View file

@ -1009,8 +1009,6 @@ async fn calculate_state_incremental<'a>(
) -> Result<StateChanges> { ) -> Result<StateChanges> {
let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash); let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);
let state_changed = since_shortstatehash != current_shortstatehash;
let encrypted_room = services let encrypted_room = services
.rooms .rooms
.state_accessor .state_accessor
@ -1042,7 +1040,7 @@ async fn calculate_state_incremental<'a>(
}) })
.into(); .into();
let state_diff_ids: OptionFuture<_> = (!full_state && state_changed) let state_diff_ids: OptionFuture<_> = (!full_state)
.then(|| { .then(|| {
StreamExt::into_future( StreamExt::into_future(
services services

View file

@ -274,6 +274,10 @@ pub fn set_dirty_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Res
} }
} }
pub fn background_thread_enable(enable: bool) -> Result<bool> {
set::<u8>(&mallctl!("background_thread"), enable.into()).map(is_nonzero!())
}
#[inline] #[inline]
#[must_use] #[must_use]
pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() } pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() }

View file

@ -2042,41 +2042,41 @@ fn default_database_backups_to_keep() -> i16 { 1 }
fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) } fn default_db_cache_capacity_mb() -> f64 { 256.0 + parallelism_scaled_f64(256.0) }
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) } fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(25_000).saturating_add(200_000) }
fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_cache_capacity_modifier() -> f64 { 1.0 }
fn default_auth_chain_cache_capacity() -> u32 { fn default_auth_chain_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000) parallelism_scaled_u32(25_000).saturating_add(200_000)
} }
fn default_shorteventid_cache_capacity() -> u32 { fn default_shorteventid_cache_capacity() -> u32 {
parallelism_scaled_u32(50_000).saturating_add(100_000) parallelism_scaled_u32(50_000).saturating_add(200_000)
} }
fn default_eventidshort_cache_capacity() -> u32 { fn default_eventidshort_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000) parallelism_scaled_u32(25_000).saturating_add(200_000)
} }
fn default_eventid_pdu_cache_capacity() -> u32 { fn default_eventid_pdu_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000) parallelism_scaled_u32(25_000).saturating_add(200_000)
} }
fn default_shortstatekey_cache_capacity() -> u32 { fn default_shortstatekey_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000) parallelism_scaled_u32(25_000).saturating_add(200_000)
} }
fn default_statekeyshort_cache_capacity() -> u32 { fn default_statekeyshort_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000) parallelism_scaled_u32(25_000).saturating_add(200_000)
} }
fn default_servernameevent_data_cache_capacity() -> u32 { fn default_servernameevent_data_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(500_000) parallelism_scaled_u32(200_000).saturating_add(500_000)
} }
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) } fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(2000) }
fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) }

View file

@ -1,18 +1,10 @@
use std::{
borrow::Borrow,
fmt::{Debug, Display},
hash::Hash,
};
use ruma::{EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, events::TimelineEventType}; use ruma::{EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, events::TimelineEventType};
use serde_json::value::RawValue as RawJsonValue; use serde_json::value::RawValue as RawJsonValue;
/// Abstraction of a PDU so users can have their own PDU types. /// Abstraction of a PDU so users can have their own PDU types.
pub trait Event { pub trait Event {
type Id: Clone + Debug + Display + Eq + Ord + Hash + Send + Borrow<EventId>;
/// The `EventId` of this event. /// The `EventId` of this event.
fn event_id(&self) -> &Self::Id; fn event_id(&self) -> &EventId;
/// The `RoomId` of this event. /// The `RoomId` of this event.
fn room_id(&self) -> &RoomId; fn room_id(&self) -> &RoomId;
@ -34,20 +26,18 @@ pub trait Event {
/// The events before this event. /// The events before this event.
// Requires GATs to avoid boxing (and TAIT for making it convenient). // Requires GATs to avoid boxing (and TAIT for making it convenient).
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_; fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_;
/// All the authenticating events for this event. /// All the authenticating events for this event.
// Requires GATs to avoid boxing (and TAIT for making it convenient). // Requires GATs to avoid boxing (and TAIT for making it convenient).
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_; fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_;
/// If this event is a redaction event this is the event it redacts. /// If this event is a redaction event this is the event it redacts.
fn redacts(&self) -> Option<&Self::Id>; fn redacts(&self) -> Option<&EventId>;
} }
impl<T: Event> Event for &T { impl<T: Event> Event for &T {
type Id = T::Id; fn event_id(&self) -> &EventId { (*self).event_id() }
fn event_id(&self) -> &Self::Id { (*self).event_id() }
fn room_id(&self) -> &RoomId { (*self).room_id() } fn room_id(&self) -> &RoomId { (*self).room_id() }
@ -61,13 +51,13 @@ impl<T: Event> Event for &T {
fn state_key(&self) -> Option<&str> { (*self).state_key() } fn state_key(&self) -> Option<&str> { (*self).state_key() }
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ { fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
(*self).prev_events() (*self).prev_events()
} }
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ { fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
(*self).auth_events() (*self).auth_events()
} }
fn redacts(&self) -> Option<&Self::Id> { (*self).redacts() } fn redacts(&self) -> Option<&EventId> { (*self).redacts() }
} }

View file

@ -79,9 +79,7 @@ impl Pdu {
} }
impl Event for Pdu { impl Event for Pdu {
type Id = OwnedEventId; fn event_id(&self) -> &EventId { &self.event_id }
fn event_id(&self) -> &Self::Id { &self.event_id }
fn room_id(&self) -> &RoomId { &self.room_id } fn room_id(&self) -> &RoomId { &self.room_id }
@ -97,15 +95,15 @@ impl Event for Pdu {
fn state_key(&self) -> Option<&str> { self.state_key.as_deref() } fn state_key(&self) -> Option<&str> { self.state_key.as_deref() }
fn prev_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ { fn prev_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
self.prev_events.iter() self.prev_events.iter().map(AsRef::as_ref)
} }
fn auth_events(&self) -> impl DoubleEndedIterator<Item = &Self::Id> + Send + '_ { fn auth_events(&self) -> impl DoubleEndedIterator<Item = &EventId> + Send + '_ {
self.auth_events.iter() self.auth_events.iter().map(AsRef::as_ref)
} }
fn redacts(&self) -> Option<&Self::Id> { self.redacts.as_ref() } fn redacts(&self) -> Option<&EventId> { self.redacts.as_deref() }
} }
/// Prevent derived equality which wouldn't limit itself to event_id /// Prevent derived equality which wouldn't limit itself to event_id

View file

@ -1,8 +1,8 @@
use ruma::{ use ruma::{
events::{ events::{
AnyEphemeralRoomEvent, AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent, AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent, AnySyncStateEvent,
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, StateEvent, AnySyncTimelineEvent, AnyTimelineEvent, StateEvent, room::member::RoomMemberEventContent,
room::member::RoomMemberEventContent, space::child::HierarchySpaceChildEvent, space::child::HierarchySpaceChildEvent,
}, },
serde::Raw, serde::Raw,
}; };
@ -10,41 +10,6 @@ use serde_json::{json, value::Value as JsonValue};
use crate::implement; use crate::implement;
/// This only works for events that are also AnyRoomEvents.
#[must_use]
#[implement(super::Pdu)]
pub fn into_any_event(self) -> Raw<AnyEphemeralRoomEvent> {
serde_json::from_value(self.into_any_event_value()).expect("Raw::from_value always works")
}
/// This only works for events that are also AnyRoomEvents.
#[implement(super::Pdu)]
#[must_use]
#[inline]
pub fn into_any_event_value(self) -> JsonValue {
let (redacts, content) = self.copy_redacts();
let mut json = json!({
"content": content,
"type": self.kind,
"event_id": self.event_id,
"sender": self.sender,
"origin_server_ts": self.origin_server_ts,
"room_id": self.room_id,
});
if let Some(unsigned) = &self.unsigned {
json["unsigned"] = json!(unsigned);
}
if let Some(state_key) = &self.state_key {
json["state_key"] = json!(state_key);
}
if let Some(redacts) = &redacts {
json["redacts"] = json!(redacts);
}
json
}
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
#[inline] #[inline]
@ -53,7 +18,8 @@ pub fn into_room_event(self) -> Raw<AnyTimelineEvent> { self.to_room_event() }
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn to_room_event(&self) -> Raw<AnyTimelineEvent> { pub fn to_room_event(&self) -> Raw<AnyTimelineEvent> {
serde_json::from_value(self.to_room_event_value()).expect("Raw::from_value always works") let value = self.to_room_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@ -91,8 +57,8 @@ pub fn into_message_like_event(self) -> Raw<AnyMessageLikeEvent> { self.to_messa
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> { pub fn to_message_like_event(&self) -> Raw<AnyMessageLikeEvent> {
serde_json::from_value(self.to_message_like_event_value()) let value = self.to_message_like_event_value();
.expect("Raw::from_value always works") serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@ -130,7 +96,8 @@ pub fn into_sync_room_event(self) -> Raw<AnySyncTimelineEvent> { self.to_sync_ro
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> { pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
serde_json::from_value(self.to_sync_room_event_value()).expect("Raw::from_value always works") let value = self.to_sync_room_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@ -162,7 +129,8 @@ pub fn to_sync_room_event_value(&self) -> JsonValue {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn into_state_event(self) -> Raw<AnyStateEvent> { pub fn into_state_event(self) -> Raw<AnyStateEvent> {
serde_json::from_value(self.into_state_event_value()).expect("Raw::from_value always works") let value = self.into_state_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@ -189,8 +157,8 @@ pub fn into_state_event_value(self) -> JsonValue {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn into_sync_state_event(self) -> Raw<AnySyncStateEvent> { pub fn into_sync_state_event(self) -> Raw<AnySyncStateEvent> {
serde_json::from_value(self.into_sync_state_event_value()) let value = self.into_sync_state_event_value();
.expect("Raw::from_value always works") serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@ -223,8 +191,8 @@ pub fn into_stripped_state_event(self) -> Raw<AnyStrippedStateEvent> {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn to_stripped_state_event(&self) -> Raw<AnyStrippedStateEvent> { pub fn to_stripped_state_event(&self) -> Raw<AnyStrippedStateEvent> {
serde_json::from_value(self.to_stripped_state_event_value()) let value = self.to_stripped_state_event_value();
.expect("Raw::from_value always works") serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@ -242,8 +210,8 @@ pub fn to_stripped_state_event_value(&self) -> JsonValue {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn into_stripped_spacechild_state_event(self) -> Raw<HierarchySpaceChildEvent> { pub fn into_stripped_spacechild_state_event(self) -> Raw<HierarchySpaceChildEvent> {
serde_json::from_value(self.into_stripped_spacechild_state_event_value()) let value = self.into_stripped_spacechild_state_event_value();
.expect("Raw::from_value always works") serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]
@ -262,7 +230,8 @@ pub fn into_stripped_spacechild_state_event_value(self) -> JsonValue {
#[implement(super::Pdu)] #[implement(super::Pdu)]
#[must_use] #[must_use]
pub fn into_member_event(self) -> Raw<StateEvent<RoomMemberEventContent>> { pub fn into_member_event(self) -> Raw<StateEvent<RoomMemberEventContent>> {
serde_json::from_value(self.into_member_event_value()).expect("Raw::from_value always works") let value = self.into_member_event_value();
serde_json::from_value(value).expect("Failed to serialize Event value")
} }
#[implement(super::Pdu)] #[implement(super::Pdu)]

View file

@ -52,7 +52,6 @@ fn lexico_topo_sort(c: &mut test::Bencher) {
#[cfg(conduwuit_bench)] #[cfg(conduwuit_bench)]
#[cfg_attr(conduwuit_bench, bench)] #[cfg_attr(conduwuit_bench, bench)]
fn resolution_shallow_auth_chain(c: &mut test::Bencher) { fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
let parallel_fetches = 32;
let mut store = TestStore(hashmap! {}); let mut store = TestStore(hashmap! {});
// build up the DAG // build up the DAG
@ -78,7 +77,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
&auth_chain_sets, &auth_chain_sets,
&fetch, &fetch,
&exists, &exists,
parallel_fetches,
) )
.await .await
{ {
@ -91,7 +89,6 @@ fn resolution_shallow_auth_chain(c: &mut test::Bencher) {
#[cfg(conduwuit_bench)] #[cfg(conduwuit_bench)]
#[cfg_attr(conduwuit_bench, bench)] #[cfg_attr(conduwuit_bench, bench)]
fn resolve_deeper_event_set(c: &mut test::Bencher) { fn resolve_deeper_event_set(c: &mut test::Bencher) {
let parallel_fetches = 32;
let mut inner = INITIAL_EVENTS(); let mut inner = INITIAL_EVENTS();
let ban = BAN_STATE_SET(); let ban = BAN_STATE_SET();
@ -153,7 +150,6 @@ fn resolve_deeper_event_set(c: &mut test::Bencher) {
&auth_chain_sets, &auth_chain_sets,
&fetch, &fetch,
&exists, &exists,
parallel_fetches,
) )
.await .await
{ {
@ -190,7 +186,11 @@ impl<E: Event + Clone> TestStore<E> {
} }
/// Returns a Vec of the related auth events to the given `event`. /// Returns a Vec of the related auth events to the given `event`.
fn auth_event_ids(&self, room_id: &RoomId, event_ids: Vec<E::Id>) -> Result<HashSet<E::Id>> { fn auth_event_ids(
&self,
room_id: &RoomId,
event_ids: Vec<OwnedEventId>,
) -> Result<HashSet<OwnedEventId>> {
let mut result = HashSet::new(); let mut result = HashSet::new();
let mut stack = event_ids; let mut stack = event_ids;
@ -216,8 +216,8 @@ impl<E: Event + Clone> TestStore<E> {
fn auth_chain_diff( fn auth_chain_diff(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
event_ids: Vec<Vec<E::Id>>, event_ids: Vec<Vec<OwnedEventId>>,
) -> Result<Vec<E::Id>> { ) -> Result<Vec<OwnedEventId>> {
let mut auth_chain_sets = vec![]; let mut auth_chain_sets = vec![];
for ids in event_ids { for ids in event_ids {
// TODO state store `auth_event_ids` returns self in the event ids list // TODO state store `auth_event_ids` returns self in the event ids list
@ -238,7 +238,7 @@ impl<E: Event + Clone> TestStore<E> {
Ok(auth_chain_sets Ok(auth_chain_sets
.into_iter() .into_iter()
.flatten() .flatten()
.filter(|id| !common.contains(id.borrow())) .filter(|id| !common.contains(id))
.collect()) .collect())
} else { } else {
Ok(vec![]) Ok(vec![])
@ -565,7 +565,7 @@ impl EventTypeExt for &TimelineEventType {
mod event { mod event {
use ruma::{ use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
events::{TimelineEventType, pdu::Pdu}, events::{TimelineEventType, pdu::Pdu},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -574,9 +574,7 @@ mod event {
use super::Event; use super::Event;
impl Event for PduEvent { impl Event for PduEvent {
type Id = OwnedEventId; fn event_id(&self) -> &EventId { &self.event_id }
fn event_id(&self) -> &Self::Id { &self.event_id }
fn room_id(&self) -> &RoomId { fn room_id(&self) -> &RoomId {
match &self.rest { match &self.rest {
@ -632,28 +630,30 @@ mod event {
} }
} }
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> { fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => Box::new(ev.prev_events.iter().map(|(id, _)| id)), | Pdu::RoomV1Pdu(ev) =>
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter()), Box::new(ev.prev_events.iter().map(|(id, _)| id.as_ref())),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter().map(AsRef::as_ref)),
#[cfg(not(feature = "unstable-exhaustive-types"))] #[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
} }
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> { fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => Box::new(ev.auth_events.iter().map(|(id, _)| id)), | Pdu::RoomV1Pdu(ev) =>
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter()), Box::new(ev.auth_events.iter().map(|(id, _)| id.as_ref())),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter().map(AsRef::as_ref)),
#[cfg(not(feature = "unstable-exhaustive-types"))] #[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
} }
fn redacts(&self) -> Option<&Self::Id> { fn redacts(&self) -> Option<&EventId> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_ref(), | Pdu::RoomV1Pdu(ev) => ev.redacts.as_deref(),
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_ref(), | Pdu::RoomV3Pdu(ev) => ev.redacts.as_deref(),
#[cfg(not(feature = "unstable-exhaustive-types"))] #[cfg(not(feature = "unstable-exhaustive-types"))]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }

View file

@ -133,7 +133,7 @@ pub fn auth_types_for_event(
level = "debug", level = "debug",
skip_all, skip_all,
fields( fields(
event_id = incoming_event.event_id().borrow().as_str() event_id = incoming_event.event_id().as_str(),
) )
)] )]
pub async fn auth_check<F, Fut, Fetched, Incoming>( pub async fn auth_check<F, Fut, Fetched, Incoming>(
@ -259,7 +259,7 @@ where
// 3. If event does not have m.room.create in auth_events reject // 3. If event does not have m.room.create in auth_events reject
if !incoming_event if !incoming_event
.auth_events() .auth_events()
.any(|id| id.borrow() == room_create_event.event_id().borrow()) .any(|id| id == room_create_event.event_id())
{ {
warn!("no m.room.create event in auth events"); warn!("no m.room.create event in auth events");
return Ok(false); return Ok(false);
@ -1021,11 +1021,11 @@ fn check_redaction(
// If the domain of the event_id of the event being redacted is the same as the // If the domain of the event_id of the event being redacted is the same as the
// domain of the event_id of the m.room.redaction, allow // domain of the event_id of the m.room.redaction, allow
if redaction_event.event_id().borrow().server_name() if redaction_event.event_id().server_name()
== redaction_event == redaction_event
.redacts() .redacts()
.as_ref() .as_ref()
.and_then(|&id| id.borrow().server_name()) .and_then(|&id| id.server_name())
{ {
debug!("redaction event allowed via room version 1 rules"); debug!("redaction event allowed via room version 1 rules");
return Ok(true); return Ok(true);

View file

@ -20,7 +20,7 @@ use std::{
use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future}; use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
use ruma::{ use ruma::{
EventId, Int, MilliSecondsSinceUnixEpoch, RoomVersionId, EventId, Int, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomVersionId,
events::{ events::{
StateEventType, TimelineEventType, StateEventType, TimelineEventType,
room::member::{MembershipState, RoomMemberEventContent}, room::member::{MembershipState, RoomMemberEventContent},
@ -39,9 +39,7 @@ use crate::{
debug, debug_error, debug, debug_error,
matrix::{event::Event, pdu::StateKey}, matrix::{event::Event, pdu::StateKey},
trace, trace,
utils::stream::{ utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, WidebandExt},
BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryReadyExt, WidebandExt,
},
warn, warn,
}; };
@ -69,9 +67,6 @@ type Result<T, E = Error> = crate::Result<T, E>;
/// * `event_fetch` - Any event not found in the `event_map` will defer to this /// * `event_fetch` - Any event not found in the `event_map` will defer to this
/// closure to find the event. /// closure to find the event.
/// ///
/// * `parallel_fetches` - The number of asynchronous fetch requests in-flight
/// for any given operation.
///
/// ## Invariants /// ## Invariants
/// ///
/// The caller of `resolve` must ensure that all the events are from the same /// The caller of `resolve` must ensure that all the events are from the same
@ -82,21 +77,19 @@ type Result<T, E = Error> = crate::Result<T, E>;
pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>( pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>(
room_version: &RoomVersionId, room_version: &RoomVersionId,
state_sets: Sets, state_sets: Sets,
auth_chain_sets: &'a [HashSet<E::Id, Hasher>], auth_chain_sets: &'a [HashSet<OwnedEventId, Hasher>],
event_fetch: &Fetch, event_fetch: &Fetch,
event_exists: &Exists, event_exists: &Exists,
parallel_fetches: usize, ) -> Result<StateMap<OwnedEventId>>
) -> Result<StateMap<E::Id>>
where where
Fetch: Fn(E::Id) -> FetchFut + Sync, Fetch: Fn(OwnedEventId) -> FetchFut + Sync,
FetchFut: Future<Output = Option<E>> + Send, FetchFut: Future<Output = Option<E>> + Send,
Exists: Fn(E::Id) -> ExistsFut + Sync, Exists: Fn(OwnedEventId) -> ExistsFut + Sync,
ExistsFut: Future<Output = bool> + Send, ExistsFut: Future<Output = bool> + Send,
Sets: IntoIterator<IntoIter = SetIter> + Send, Sets: IntoIterator<IntoIter = SetIter> + Send,
SetIter: Iterator<Item = &'a StateMap<E::Id>> + Clone + Send, SetIter: Iterator<Item = &'a StateMap<OwnedEventId>> + Clone + Send,
Hasher: BuildHasher + Send + Sync, Hasher: BuildHasher + Send + Sync,
E: Event + Clone + Send + Sync, E: Event + Clone + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
for<'b> &'b E: Send, for<'b> &'b E: Send,
{ {
debug!("State resolution starting"); debug!("State resolution starting");
@ -147,13 +140,8 @@ where
// Sort the control events based on power_level/clock/event_id and // Sort the control events based on power_level/clock/event_id and
// outgoing/incoming edges // outgoing/incoming edges
let sorted_control_levels = reverse_topological_power_sort( let sorted_control_levels =
control_events, reverse_topological_power_sort(control_events, &all_conflicted, &event_fetch).await?;
&all_conflicted,
&event_fetch,
parallel_fetches,
)
.await?;
debug!(count = sorted_control_levels.len(), "power events"); debug!(count = sorted_control_levels.len(), "power events");
trace!(list = ?sorted_control_levels, "sorted power events"); trace!(list = ?sorted_control_levels, "sorted power events");
@ -162,7 +150,7 @@ where
// Sequentially auth check each control event. // Sequentially auth check each control event.
let resolved_control = iterative_auth_check( let resolved_control = iterative_auth_check(
&room_version, &room_version,
sorted_control_levels.iter().stream(), sorted_control_levels.iter().stream().map(AsRef::as_ref),
clean.clone(), clean.clone(),
&event_fetch, &event_fetch,
) )
@ -179,7 +167,7 @@ where
// that failed auth // that failed auth
let events_to_resolve: Vec<_> = all_conflicted let events_to_resolve: Vec<_> = all_conflicted
.iter() .iter()
.filter(|&id| !deduped_power_ev.contains(id.borrow())) .filter(|&id| !deduped_power_ev.contains(id))
.cloned() .cloned()
.collect(); .collect();
@ -199,7 +187,7 @@ where
let mut resolved_state = iterative_auth_check( let mut resolved_state = iterative_auth_check(
&room_version, &room_version,
sorted_left_events.iter().stream(), sorted_left_events.iter().stream().map(AsRef::as_ref),
resolved_control, // The control events are added to the final resolved state resolved_control, // The control events are added to the final resolved state
&event_fetch, &event_fetch,
) )
@ -292,16 +280,14 @@ where
/// earlier (further back in time) origin server timestamp. /// earlier (further back in time) origin server timestamp.
#[tracing::instrument(level = "debug", skip_all)] #[tracing::instrument(level = "debug", skip_all)]
async fn reverse_topological_power_sort<E, F, Fut>( async fn reverse_topological_power_sort<E, F, Fut>(
events_to_sort: Vec<E::Id>, events_to_sort: Vec<OwnedEventId>,
auth_diff: &HashSet<E::Id>, auth_diff: &HashSet<OwnedEventId>,
fetch_event: &F, fetch_event: &F,
parallel_fetches: usize, ) -> Result<Vec<OwnedEventId>>
) -> Result<Vec<E::Id>>
where where
F: Fn(E::Id) -> Fut + Sync, F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync, E: Event + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
{ {
debug!("reverse topological sort of power events"); debug!("reverse topological sort of power events");
@ -311,35 +297,36 @@ where
} }
// This is used in the `key_fn` passed to the lexico_topo_sort fn // This is used in the `key_fn` passed to the lexico_topo_sort fn
let event_to_pl = graph let event_to_pl: HashMap<_, _> = graph
.keys() .keys()
.cloned()
.stream() .stream()
.map(|event_id| { .broad_filter_map(async |event_id| {
get_power_level_for_sender(event_id.clone(), fetch_event) let pl = get_power_level_for_sender(&event_id, fetch_event)
.map(move |res| res.map(|pl| (event_id, pl))) .await
.ok()?;
Some((event_id, pl))
}) })
.buffer_unordered(parallel_fetches) .inspect(|(event_id, pl)| {
.ready_try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| {
debug!( debug!(
event_id = event_id.borrow().as_str(), event_id = event_id.as_str(),
power_level = i64::from(pl), power_level = i64::from(*pl),
"found the power level of an event's sender", "found the power level of an event's sender",
); );
event_to_pl.insert(event_id.clone(), pl);
Ok(event_to_pl)
}) })
.collect()
.boxed() .boxed()
.await?; .await;
let event_to_pl = &event_to_pl; let fetcher = async |event_id: OwnedEventId| {
let fetcher = |event_id: E::Id| async move {
let pl = *event_to_pl let pl = *event_to_pl
.get(event_id.borrow()) .get(&event_id)
.ok_or_else(|| Error::NotFound(String::new()))?; .ok_or_else(|| Error::NotFound(String::new()))?;
let ev = fetch_event(event_id) let ev = fetch_event(event_id)
.await .await
.ok_or_else(|| Error::NotFound(String::new()))?; .ok_or_else(|| Error::NotFound(String::new()))?;
Ok((pl, ev.origin_server_ts())) Ok((pl, ev.origin_server_ts()))
}; };
@ -476,18 +463,17 @@ where
/// the eventId at the eventId's generation (we walk backwards to `EventId`s /// the eventId at the eventId's generation (we walk backwards to `EventId`s
/// most recent previous power level event). /// most recent previous power level event).
async fn get_power_level_for_sender<E, F, Fut>( async fn get_power_level_for_sender<E, F, Fut>(
event_id: E::Id, event_id: &EventId,
fetch_event: &F, fetch_event: &F,
) -> serde_json::Result<Int> ) -> serde_json::Result<Int>
where where
F: Fn(E::Id) -> Fut + Sync, F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send, E: Event + Send,
E::Id: Borrow<EventId> + Send,
{ {
debug!("fetch event ({event_id}) senders power level"); debug!("fetch event ({event_id}) senders power level");
let event = fetch_event(event_id).await; let event = fetch_event(event_id.to_owned()).await;
let auth_events = event.as_ref().map(Event::auth_events); let auth_events = event.as_ref().map(Event::auth_events);
@ -495,7 +481,7 @@ where
.into_iter() .into_iter()
.flatten() .flatten()
.stream() .stream()
.broadn_filter_map(5, |aid| fetch_event(aid.clone())) .broadn_filter_map(5, |aid| fetch_event(aid.to_owned()))
.ready_find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, "")) .ready_find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, ""))
.await; .await;
@ -528,14 +514,13 @@ where
async fn iterative_auth_check<'a, E, F, Fut, S>( async fn iterative_auth_check<'a, E, F, Fut, S>(
room_version: &RoomVersion, room_version: &RoomVersion,
events_to_check: S, events_to_check: S,
unconflicted_state: StateMap<E::Id>, unconflicted_state: StateMap<OwnedEventId>,
fetch_event: &F, fetch_event: &F,
) -> Result<StateMap<E::Id>> ) -> Result<StateMap<OwnedEventId>>
where where
F: Fn(E::Id) -> Fut + Sync, F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E::Id: Borrow<EventId> + Clone + Eq + Ord + Send + Sync + 'a, S: Stream<Item = &'a EventId> + Send + 'a,
S: Stream<Item = &'a E::Id> + Send + 'a,
E: Event + Clone + Send + Sync, E: Event + Clone + Send + Sync,
{ {
debug!("starting iterative auth check"); debug!("starting iterative auth check");
@ -543,7 +528,7 @@ where
let events_to_check: Vec<_> = events_to_check let events_to_check: Vec<_> = events_to_check
.map(Result::Ok) .map(Result::Ok)
.broad_and_then(async |event_id| { .broad_and_then(async |event_id| {
fetch_event(event_id.clone()) fetch_event(event_id.to_owned())
.await .await
.ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}"))) .ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
}) })
@ -551,16 +536,16 @@ where
.boxed() .boxed()
.await?; .await?;
let auth_event_ids: HashSet<E::Id> = events_to_check let auth_event_ids: HashSet<OwnedEventId> = events_to_check
.iter() .iter()
.flat_map(|event: &E| event.auth_events().map(Clone::clone)) .flat_map(|event: &E| event.auth_events().map(ToOwned::to_owned))
.collect(); .collect();
let auth_events: HashMap<E::Id, E> = auth_event_ids let auth_events: HashMap<OwnedEventId, E> = auth_event_ids
.into_iter() .into_iter()
.stream() .stream()
.broad_filter_map(fetch_event) .broad_filter_map(fetch_event)
.map(|auth_event| (auth_event.event_id().clone(), auth_event)) .map(|auth_event| (auth_event.event_id().to_owned(), auth_event))
.collect() .collect()
.boxed() .boxed()
.await; .await;
@ -581,7 +566,7 @@ where
let mut auth_state = StateMap::new(); let mut auth_state = StateMap::new();
for aid in event.auth_events() { for aid in event.auth_events() {
if let Some(ev) = auth_events.get(aid.borrow()) { if let Some(ev) = auth_events.get(aid) {
//TODO: synapse checks "rejected_reason" which is most likely related to //TODO: synapse checks "rejected_reason" which is most likely related to
// soft-failing // soft-failing
auth_state.insert( auth_state.insert(
@ -592,7 +577,7 @@ where
ev.clone(), ev.clone(),
); );
} else { } else {
warn!(event_id = aid.borrow().as_str(), "missing auth event"); warn!(event_id = aid.as_str(), "missing auth event");
} }
} }
@ -601,7 +586,7 @@ where
.stream() .stream()
.ready_filter_map(|key| Some((key, resolved_state.get(key)?))) .ready_filter_map(|key| Some((key, resolved_state.get(key)?)))
.filter_map(|(key, ev_id)| async move { .filter_map(|(key, ev_id)| async move {
if let Some(event) = auth_events.get(ev_id.borrow()) { if let Some(event) = auth_events.get(ev_id) {
Some((key, event.clone())) Some((key, event.clone()))
} else { } else {
Some((key, fetch_event(ev_id.clone()).await?)) Some((key, fetch_event(ev_id.clone()).await?))
@ -633,7 +618,7 @@ where
// add event to resolved state map // add event to resolved state map
resolved_state.insert( resolved_state.insert(
event.event_type().with_state_key(state_key), event.event_type().with_state_key(state_key),
event.event_id().clone(), event.event_id().to_owned(),
); );
}, },
| Ok(false) => { | Ok(false) => {
@ -660,15 +645,14 @@ where
/// level as a parent) will be marked as depth 1. depth 1 is "older" than depth /// level as a parent) will be marked as depth 1. depth 1 is "older" than depth
/// 0. /// 0.
async fn mainline_sort<E, F, Fut>( async fn mainline_sort<E, F, Fut>(
to_sort: &[E::Id], to_sort: &[OwnedEventId],
resolved_power_level: Option<E::Id>, resolved_power_level: Option<OwnedEventId>,
fetch_event: &F, fetch_event: &F,
) -> Result<Vec<E::Id>> ) -> Result<Vec<OwnedEventId>>
where where
F: Fn(E::Id) -> Fut + Sync, F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Clone + Send + Sync, E: Event + Clone + Send + Sync,
E::Id: Borrow<EventId> + Clone + Send + Sync,
{ {
debug!("mainline sort of events"); debug!("mainline sort of events");
@ -688,7 +672,7 @@ where
pl = None; pl = None;
for aid in event.auth_events() { for aid in event.auth_events() {
let ev = fetch_event(aid.clone()) let ev = fetch_event(aid.to_owned())
.await .await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?; .ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
@ -734,26 +718,25 @@ where
/// that has an associated mainline depth. /// that has an associated mainline depth.
async fn get_mainline_depth<E, F, Fut>( async fn get_mainline_depth<E, F, Fut>(
mut event: Option<E>, mut event: Option<E>,
mainline_map: &HashMap<E::Id, usize>, mainline_map: &HashMap<OwnedEventId, usize>,
fetch_event: &F, fetch_event: &F,
) -> Result<usize> ) -> Result<usize>
where where
F: Fn(E::Id) -> Fut + Sync, F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync, E: Event + Send + Sync,
E::Id: Borrow<EventId> + Send + Sync,
{ {
while let Some(sort_ev) = event { while let Some(sort_ev) = event {
debug!(event_id = sort_ev.event_id().borrow().as_str(), "mainline"); debug!(event_id = sort_ev.event_id().as_str(), "mainline");
let id = sort_ev.event_id(); let id = sort_ev.event_id();
if let Some(depth) = mainline_map.get(id.borrow()) { if let Some(depth) = mainline_map.get(id) {
return Ok(*depth); return Ok(*depth);
} }
event = None; event = None;
for aid in sort_ev.auth_events() { for aid in sort_ev.auth_events() {
let aev = fetch_event(aid.clone()) let aev = fetch_event(aid.to_owned())
.await .await
.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?; .ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
@ -768,15 +751,14 @@ where
} }
async fn add_event_and_auth_chain_to_graph<E, F, Fut>( async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
graph: &mut HashMap<E::Id, HashSet<E::Id>>, graph: &mut HashMap<OwnedEventId, HashSet<OwnedEventId>>,
event_id: E::Id, event_id: OwnedEventId,
auth_diff: &HashSet<E::Id>, auth_diff: &HashSet<OwnedEventId>,
fetch_event: &F, fetch_event: &F,
) where ) where
F: Fn(E::Id) -> Fut + Sync, F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send + Sync, E: Event + Send + Sync,
E::Id: Borrow<EventId> + Clone + Send + Sync,
{ {
let mut state = vec![event_id]; let mut state = vec![event_id];
while let Some(eid) = state.pop() { while let Some(eid) = state.pop() {
@ -786,26 +768,27 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
// Prefer the store to event as the store filters dedups the events // Prefer the store to event as the store filters dedups the events
for aid in auth_events { for aid in auth_events {
if auth_diff.contains(aid.borrow()) { if auth_diff.contains(aid) {
if !graph.contains_key(aid.borrow()) { if !graph.contains_key(aid) {
state.push(aid.to_owned()); state.push(aid.to_owned());
} }
// We just inserted this at the start of the while loop graph
graph.get_mut(eid.borrow()).unwrap().insert(aid.to_owned()); .get_mut(&eid)
.expect("We just inserted this at the start of the while loop")
.insert(aid.to_owned());
} }
} }
} }
} }
async fn is_power_event_id<E, F, Fut>(event_id: &E::Id, fetch: &F) -> bool async fn is_power_event_id<E, F, Fut>(event_id: &EventId, fetch: &F) -> bool
where where
F: Fn(E::Id) -> Fut + Sync, F: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Option<E>> + Send, Fut: Future<Output = Option<E>> + Send,
E: Event + Send, E: Event + Send,
E::Id: Borrow<EventId> + Send + Sync,
{ {
match fetch(event_id.clone()).await.as_ref() { match fetch(event_id.to_owned()).await.as_ref() {
| Some(state) => is_power_event(state), | Some(state) => is_power_event(state),
| _ => false, | _ => false,
} }
@ -909,13 +892,13 @@ mod tests {
let fetcher = |id| ready(events.get(&id).cloned()); let fetcher = |id| ready(events.get(&id).cloned());
let sorted_power_events = let sorted_power_events =
super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher, 1) super::reverse_topological_power_sort(power_events, &auth_chain, &fetcher)
.await .await
.unwrap(); .unwrap();
let resolved_power = super::iterative_auth_check( let resolved_power = super::iterative_auth_check(
&RoomVersion::V6, &RoomVersion::V6,
sorted_power_events.iter().stream(), sorted_power_events.iter().map(AsRef::as_ref).stream(),
HashMap::new(), // unconflicted events HashMap::new(), // unconflicted events
&fetcher, &fetcher,
) )
@ -1300,7 +1283,7 @@ mod tests {
let ev_map = store.0.clone(); let ev_map = store.0.clone();
let fetcher = |id| ready(ev_map.get(&id).cloned()); let fetcher = |id| ready(ev_map.get(&id).cloned());
let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&*id).is_some()); let exists = |id: OwnedEventId| ready(ev_map.get(&*id).is_some());
let state_sets = [state_at_bob, state_at_charlie]; let state_sets = [state_at_bob, state_at_charlie];
let auth_chain: Vec<_> = state_sets let auth_chain: Vec<_> = state_sets
@ -1312,14 +1295,8 @@ mod tests {
}) })
.collect(); .collect();
let resolved = match super::resolve( let resolved =
&RoomVersionId::V2, match super::resolve(&RoomVersionId::V2, &state_sets, &auth_chain, &fetcher, &exists)
&state_sets,
&auth_chain,
&fetcher,
&exists,
1,
)
.await .await
{ {
| Ok(state) => state, | Ok(state) => state,
@ -1429,16 +1406,10 @@ mod tests {
}) })
.collect(); .collect();
let fetcher = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).cloned()); let fetcher = |id: OwnedEventId| ready(ev_map.get(&id).cloned());
let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&id).is_some()); let exists = |id: OwnedEventId| ready(ev_map.get(&id).is_some());
let resolved = match super::resolve( let resolved =
&RoomVersionId::V6, match super::resolve(&RoomVersionId::V6, &state_sets, &auth_chain, &fetcher, &exists)
&state_sets,
&auth_chain,
&fetcher,
&exists,
1,
)
.await .await
{ {
| Ok(state) => state, | Ok(state) => state,

View file

@ -133,16 +133,10 @@ pub(crate) async fn do_check(
.collect(); .collect();
let event_map = &event_map; let event_map = &event_map;
let fetch = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).cloned()); let fetch = |id: OwnedEventId| ready(event_map.get(&id).cloned());
let exists = |id: <PduEvent as Event>::Id| ready(event_map.get(&id).is_some()); let exists = |id: OwnedEventId| ready(event_map.get(&id).is_some());
let resolved = super::resolve( let resolved =
&RoomVersionId::V6, super::resolve(&RoomVersionId::V6, state_sets, &auth_chain_sets, &fetch, &exists)
state_sets,
&auth_chain_sets,
&fetch,
&exists,
1,
)
.await; .await;
match resolved { match resolved {
@ -247,8 +241,8 @@ impl<E: Event + Clone> TestStore<E> {
pub(crate) fn auth_event_ids( pub(crate) fn auth_event_ids(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
event_ids: Vec<E::Id>, event_ids: Vec<OwnedEventId>,
) -> Result<HashSet<E::Id>> { ) -> Result<HashSet<OwnedEventId>> {
let mut result = HashSet::new(); let mut result = HashSet::new();
let mut stack = event_ids; let mut stack = event_ids;
@ -584,7 +578,7 @@ pub(crate) fn INITIAL_EDGES() -> Vec<OwnedEventId> {
pub(crate) mod event { pub(crate) mod event {
use ruma::{ use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, UserId,
events::{TimelineEventType, pdu::Pdu}, events::{TimelineEventType, pdu::Pdu},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -593,9 +587,7 @@ pub(crate) mod event {
use crate::Event; use crate::Event;
impl Event for PduEvent { impl Event for PduEvent {
type Id = OwnedEventId; fn event_id(&self) -> &EventId { &self.event_id }
fn event_id(&self) -> &Self::Id { &self.event_id }
fn room_id(&self) -> &RoomId { fn room_id(&self) -> &RoomId {
match &self.rest { match &self.rest {
@ -652,29 +644,31 @@ pub(crate) mod event {
} }
#[allow(refining_impl_trait)] #[allow(refining_impl_trait)]
fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> { fn prev_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => Box::new(ev.prev_events.iter().map(|(id, _)| id)), | Pdu::RoomV1Pdu(ev) =>
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter()), Box::new(ev.prev_events.iter().map(|(id, _)| id.as_ref())),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.prev_events.iter().map(AsRef::as_ref)),
#[allow(unreachable_patterns)] #[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
} }
#[allow(refining_impl_trait)] #[allow(refining_impl_trait)]
fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &Self::Id> + Send + '_> { fn auth_events(&self) -> Box<dyn DoubleEndedIterator<Item = &EventId> + Send + '_> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => Box::new(ev.auth_events.iter().map(|(id, _)| id)), | Pdu::RoomV1Pdu(ev) =>
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter()), Box::new(ev.auth_events.iter().map(|(id, _)| id.as_ref())),
| Pdu::RoomV3Pdu(ev) => Box::new(ev.auth_events.iter().map(AsRef::as_ref)),
#[allow(unreachable_patterns)] #[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }
} }
fn redacts(&self) -> Option<&Self::Id> { fn redacts(&self) -> Option<&EventId> {
match &self.rest { match &self.rest {
| Pdu::RoomV1Pdu(ev) => ev.redacts.as_ref(), | Pdu::RoomV1Pdu(ev) => ev.redacts.as_deref(),
| Pdu::RoomV3Pdu(ev) => ev.redacts.as_ref(), | Pdu::RoomV3Pdu(ev) => ev.redacts.as_deref(),
#[allow(unreachable_patterns)] #[allow(unreachable_patterns)]
| _ => unreachable!("new PDU version"), | _ => unreachable!("new PDU version"),
} }

View file

@ -29,7 +29,7 @@ fn descriptor_cf_options(
set_table_options(&mut opts, &desc, cache)?; set_table_options(&mut opts, &desc, cache)?;
opts.set_min_write_buffer_number(1); opts.set_min_write_buffer_number(1);
opts.set_max_write_buffer_number(2); opts.set_max_write_buffer_number(3);
opts.set_write_buffer_size(desc.write_size); opts.set_write_buffer_size(desc.write_size);
opts.set_target_file_size_base(desc.file_size); opts.set_target_file_size_base(desc.file_size);

View file

@ -98,12 +98,7 @@ pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
Level::INFO Level::INFO
}; };
debug!( wait_shutdown(server, runtime);
timeout = ?SHUTDOWN_TIMEOUT,
"Waiting for runtime..."
);
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
let runtime_metrics = server.server.metrics.runtime_interval().unwrap_or_default(); let runtime_metrics = server.server.metrics.runtime_interval().unwrap_or_default();
event!(LEVEL, ?runtime_metrics, "Final runtime metrics"); event!(LEVEL, ?runtime_metrics, "Final runtime metrics");
@ -111,13 +106,23 @@ pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
#[cfg(not(tokio_unstable))] #[cfg(not(tokio_unstable))]
#[tracing::instrument(name = "stop", level = "info", skip_all)] #[tracing::instrument(name = "stop", level = "info", skip_all)]
pub(super) fn shutdown(_server: &Arc<Server>, runtime: tokio::runtime::Runtime) { pub(super) fn shutdown(server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
wait_shutdown(server, runtime);
}
fn wait_shutdown(_server: &Arc<Server>, runtime: tokio::runtime::Runtime) {
debug!( debug!(
timeout = ?SHUTDOWN_TIMEOUT, timeout = ?SHUTDOWN_TIMEOUT,
"Waiting for runtime..." "Waiting for runtime..."
); );
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
// Join any jemalloc threads so they don't appear in use at exit.
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
conduwuit_core::alloc::je::background_thread_enable(false)
.log_debug_err()
.ok();
} }
#[tracing::instrument( #[tracing::instrument(

View file

@ -4,7 +4,6 @@ mod execute;
mod grant; mod grant;
use std::{ use std::{
future::Future,
pin::Pin, pin::Pin,
sync::{Arc, RwLock as StdRwLock, Weak}, sync::{Arc, RwLock as StdRwLock, Weak},
}; };
@ -14,7 +13,7 @@ use conduwuit::{
Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
}; };
pub use create::create_admin_room; pub use create::create_admin_room;
use futures::{FutureExt, TryFutureExt}; use futures::{Future, FutureExt, TryFutureExt};
use loole::{Receiver, Sender}; use loole::{Receiver, Sender};
use ruma::{ use ruma::{
OwnedEventId, OwnedRoomId, RoomId, UserId, OwnedEventId, OwnedRoomId, RoomId, UserId,

View file

@ -306,14 +306,12 @@ impl super::Service {
#[tracing::instrument(name = "srv", level = "debug", skip(self))] #[tracing::instrument(name = "srv", level = "debug", skip(self))]
async fn query_srv_record(&self, hostname: &'_ str) -> Result<Option<FedDest>> { async fn query_srv_record(&self, hostname: &'_ str) -> Result<Option<FedDest>> {
let hostnames =
[format!("_matrix-fed._tcp.{hostname}."), format!("_matrix._tcp.{hostname}.")];
for hostname in hostnames {
self.services.server.check_running()?; self.services.server.check_running()?;
debug!("querying SRV for {hostname:?}"); debug!("querying SRV for {hostname:?}");
let hostname = hostname.trim_end_matches('.');
let hostname_suffix = format!("_matrix-fed._tcp.{hostname}.");
let hostname = hostname_suffix.trim_end_matches('.');
match self.resolver.resolver.srv_lookup(hostname).await { match self.resolver.resolver.srv_lookup(hostname).await {
| Err(e) => Self::handle_resolve_error(&e, hostname)?, | Err(e) => Self::handle_resolve_error(&e, hostname)?,
| Ok(result) => { | Ok(result) => {
@ -328,7 +326,6 @@ impl super::Service {
})); }));
}, },
} }
}
Ok(None) Ok(None)
} }

View file

@ -8,7 +8,7 @@ use conduwuit::{
Error, Result, err, implement, Error, Result, err, implement,
state_res::{self, StateMap}, state_res::{self, StateMap},
trace, trace,
utils::stream::{IterStream, ReadyExt, TryWidebandExt, WidebandExt, automatic_width}, utils::stream::{IterStream, ReadyExt, TryWidebandExt, WidebandExt},
}; };
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join}; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join};
use ruma::{OwnedEventId, RoomId, RoomVersionId}; use ruma::{OwnedEventId, RoomId, RoomVersionId};
@ -112,14 +112,7 @@ where
{ {
let event_fetch = |event_id| self.event_fetch(event_id); let event_fetch = |event_id| self.event_fetch(event_id);
let event_exists = |event_id| self.event_exists(event_id); let event_exists = |event_id| self.event_exists(event_id);
state_res::resolve( state_res::resolve(room_version, state_sets, auth_chain_sets, &event_fetch, &event_exists)
room_version,
state_sets,
auth_chain_sets,
&event_fetch,
&event_exists,
automatic_width(),
)
.map_err(|e| err!(error!("State resolution failed: {e:?}"))) .map_err(|e| err!(error!("State resolution failed: {e:?}")))
.await .await
} }