Outdent state_compressor service.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-04-29 06:55:54 +00:00 committed by Jade Ellis
parent c5c309ec43
commit 56420a67ca
No known key found for this signature in database
GPG key ID: 8705A2A3EBF77BD2

View file

@ -9,7 +9,7 @@ use async_trait::async_trait;
use conduwuit::{ use conduwuit::{
Result, Result,
arrayvec::ArrayVec, arrayvec::ArrayVec,
at, checked, err, expected, utils, at, checked, err, expected, implement, utils,
utils::{bytes, math::usize_from_f64, stream::IterStream}, utils::{bytes, math::usize_from_f64, stream::IterStream},
}; };
use database::Map; use database::Map;
@ -115,29 +115,30 @@ impl crate::Service for Service {
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
} }
impl Service { /// Returns a stack with info on shortstatehash, full state, added diff and
/// Returns a stack with info on shortstatehash, full state, added diff and /// removed diff for the selected shortstatehash and each parent layer.
/// removed diff for the selected shortstatehash and each parent layer. #[implement(Service)]
#[tracing::instrument(name = "load", level = "debug", skip(self))] #[tracing::instrument(name = "load", level = "debug", skip(self))]
pub async fn load_shortstatehash_info( pub async fn load_shortstatehash_info(
&self, &self,
shortstatehash: ShortStateHash, shortstatehash: ShortStateHash,
) -> Result<ShortStateInfoVec> { ) -> Result<ShortStateInfoVec> {
if let Some(r) = self.stateinfo_cache.lock()?.get_mut(&shortstatehash) { if let Some(r) = self.stateinfo_cache.lock()?.get_mut(&shortstatehash) {
return Ok(r.clone()); return Ok(r.clone());
}
let stack = self.new_shortstatehash_info(shortstatehash).await?;
self.cache_shortstatehash_info(shortstatehash, stack.clone())
.await?;
Ok(stack)
} }
/// Returns a stack with info on shortstatehash, full state, added diff and let stack = self.new_shortstatehash_info(shortstatehash).await?;
/// removed diff for the selected shortstatehash and each parent layer.
#[tracing::instrument( self.cache_shortstatehash_info(shortstatehash, stack.clone())
.await?;
Ok(stack)
}
/// Returns a stack with info on shortstatehash, full state, added diff and
/// removed diff for the selected shortstatehash and each parent layer.
#[implement(Service)]
#[tracing::instrument(
name = "cache", name = "cache",
level = "debug", level = "debug",
skip_all, skip_all,
@ -146,362 +147,365 @@ impl Service {
stack = stack.len(), stack = stack.len(),
), ),
)] )]
async fn cache_shortstatehash_info( async fn cache_shortstatehash_info(
&self, &self,
shortstatehash: ShortStateHash, shortstatehash: ShortStateHash,
stack: ShortStateInfoVec, stack: ShortStateInfoVec,
) -> Result { ) -> Result {
self.stateinfo_cache.lock()?.insert(shortstatehash, stack); self.stateinfo_cache.lock()?.insert(shortstatehash, stack);
Ok(()) Ok(())
} }
async fn new_shortstatehash_info( #[implement(Service)]
&self, async fn new_shortstatehash_info(
shortstatehash: ShortStateHash, &self,
) -> Result<ShortStateInfoVec> { shortstatehash: ShortStateHash,
let StateDiff { parent, added, removed } = self.get_statediff(shortstatehash).await?; ) -> Result<ShortStateInfoVec> {
let StateDiff { parent, added, removed } = self.get_statediff(shortstatehash).await?;
let Some(parent) = parent else { let Some(parent) = parent else {
return Ok(vec![ShortStateInfo { return Ok(vec![ShortStateInfo {
shortstatehash,
full_state: added.clone(),
added,
removed,
}]);
};
let mut stack = Box::pin(self.load_shortstatehash_info(parent)).await?;
let top = stack.last().expect("at least one frame");
let mut full_state = (*top.full_state).clone();
full_state.extend(added.iter().copied());
let removed = (*removed).clone();
for r in &removed {
full_state.remove(r);
}
stack.push(ShortStateInfo {
shortstatehash, shortstatehash,
full_state: added.clone(),
added, added,
removed: Arc::new(removed), removed,
full_state: Arc::new(full_state), }]);
}); };
Ok(stack) let mut stack = Box::pin(self.load_shortstatehash_info(parent)).await?;
let top = stack.last().expect("at least one frame");
let mut full_state = (*top.full_state).clone();
full_state.extend(added.iter().copied());
let removed = (*removed).clone();
for r in &removed {
full_state.remove(r);
} }
pub fn compress_state_events<'a, I>( stack.push(ShortStateInfo {
&'a self, shortstatehash,
state: I, added,
) -> impl Stream<Item = CompressedStateEvent> + Send + 'a removed: Arc::new(removed),
where full_state: Arc::new(full_state),
I: Iterator<Item = (&'a ShortStateKey, &'a EventId)> + Clone + Debug + Send + 'a, });
{
let event_ids = state.clone().map(at!(1));
let short_event_ids = self Ok(stack)
.services }
.short
.multi_get_or_create_shorteventid(event_ids);
state #[implement(Service)]
.stream() pub fn compress_state_events<'a, I>(
.map(at!(0)) &'a self,
.zip(short_event_ids) state: I,
.map(|(shortstatekey, shorteventid)| { ) -> impl Stream<Item = CompressedStateEvent> + Send + 'a
compress_state_event(*shortstatekey, shorteventid) where
}) I: Iterator<Item = (&'a ShortStateKey, &'a EventId)> + Clone + Debug + Send + 'a,
} {
let event_ids = state.clone().map(at!(1));
pub async fn compress_state_event( let short_event_ids = self
&self, .services
shortstatekey: ShortStateKey, .short
event_id: &EventId, .multi_get_or_create_shorteventid(event_ids);
) -> CompressedStateEvent {
let shorteventid = self
.services
.short
.get_or_create_shorteventid(event_id)
.await;
compress_state_event(shortstatekey, shorteventid) state
} .stream()
.map(at!(0))
.zip(short_event_ids)
.map(|(shortstatekey, shorteventid)| compress_state_event(*shortstatekey, shorteventid))
}
/// Creates a new shortstatehash that often is just a diff to an already #[implement(Service)]
/// existing shortstatehash and therefore very efficient. pub async fn compress_state_event(
/// &self,
/// There are multiple layers of diffs. The bottom layer 0 always contains shortstatekey: ShortStateKey,
/// the full state. Layer 1 contains diffs to states of layer 0, layer 2 event_id: &EventId,
/// diffs to layer 1 and so on. If layer n > 0 grows too big, it will be ) -> CompressedStateEvent {
/// combined with layer n-1 to create a new diff on layer n-1 that's let shorteventid = self
/// based on layer n-2. If that layer is also too big, it will recursively .services
/// fix above layers too. .short
/// .get_or_create_shorteventid(event_id)
/// * `shortstatehash` - Shortstatehash of this state .await;
/// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid
/// * `statediffremoved` - Removed from base. Each vec is
/// shortstatekey+shorteventid
/// * `diff_to_sibling` - Approximately how much the diff grows each time
/// for this layer
/// * `parent_states` - A stack with info on shortstatehash, full state,
/// added diff and removed diff for each parent layer
pub fn save_state_from_diff(
&self,
shortstatehash: ShortStateHash,
statediffnew: Arc<CompressedState>,
statediffremoved: Arc<CompressedState>,
diff_to_sibling: usize,
mut parent_states: ParentStatesVec,
) -> Result {
let statediffnew_len = statediffnew.len();
let statediffremoved_len = statediffremoved.len();
let diffsum = checked!(statediffnew_len + statediffremoved_len)?;
if parent_states.len() > 3 { compress_state_event(shortstatekey, shorteventid)
// Number of layers }
// Too many layers, we have to go deeper
let parent = parent_states.pop().expect("parent must have a state");
let mut parent_new = (*parent.added).clone(); /// Creates a new shortstatehash that often is just a diff to an already
let mut parent_removed = (*parent.removed).clone(); /// existing shortstatehash and therefore very efficient.
///
for removed in statediffremoved.iter() { /// There are multiple layers of diffs. The bottom layer 0 always contains
if !parent_new.remove(removed) { /// the full state. Layer 1 contains diffs to states of layer 0, layer 2
// It was not added in the parent and we removed it /// diffs to layer 1 and so on. If layer n > 0 grows too big, it will be
parent_removed.insert(*removed); /// combined with layer n-1 to create a new diff on layer n-1 that's
} /// based on layer n-2. If that layer is also too big, it will recursively
// Else it was added in the parent and we removed it again. We /// fix above layers too.
// can forget this change ///
} /// * `shortstatehash` - Shortstatehash of this state
/// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid
for new in statediffnew.iter() { /// * `statediffremoved` - Removed from base. Each vec is
if !parent_removed.remove(new) { /// shortstatekey+shorteventid
// It was not touched in the parent and we added it /// * `diff_to_sibling` - Approximately how much the diff grows each time for
parent_new.insert(*new); /// this layer
} /// * `parent_states` - A stack with info on shortstatehash, full state, added
// Else it was removed in the parent and we added it again. We /// diff and removed diff for each parent layer
// can forget this change #[implement(Service)]
} pub fn save_state_from_diff(
&self,
self.save_state_from_diff( shortstatehash: ShortStateHash,
shortstatehash, statediffnew: Arc<CompressedState>,
Arc::new(parent_new), statediffremoved: Arc<CompressedState>,
Arc::new(parent_removed), diff_to_sibling: usize,
diffsum, mut parent_states: ParentStatesVec,
parent_states, ) -> Result {
)?; let statediffnew_len = statediffnew.len();
let statediffremoved_len = statediffremoved.len();
return Ok(()); let diffsum = checked!(statediffnew_len + statediffremoved_len)?;
}
if parent_states.is_empty() {
// There is no parent layer, create a new state
self.save_statediff(shortstatehash, &StateDiff {
parent: None,
added: statediffnew,
removed: statediffremoved,
});
return Ok(());
}
// Else we have two options.
// 1. We add the current diff on top of the parent layer.
// 2. We replace a layer above
if parent_states.len() > 3 {
// Number of layers
// Too many layers, we have to go deeper
let parent = parent_states.pop().expect("parent must have a state"); let parent = parent_states.pop().expect("parent must have a state");
let parent_added_len = parent.added.len();
let parent_removed_len = parent.removed.len();
let parent_diff = checked!(parent_added_len + parent_removed_len)?;
if checked!(diffsum * diffsum)? >= checked!(2 * diff_to_sibling * parent_diff)? { let mut parent_new = (*parent.added).clone();
// Diff too big, we replace above layer(s) let mut parent_removed = (*parent.removed).clone();
let mut parent_new = (*parent.added).clone();
let mut parent_removed = (*parent.removed).clone();
for removed in statediffremoved.iter() { for removed in statediffremoved.iter() {
if !parent_new.remove(removed) { if !parent_new.remove(removed) {
// It was not added in the parent and we removed it // It was not added in the parent and we removed it
parent_removed.insert(*removed); parent_removed.insert(*removed);
}
// Else it was added in the parent and we removed it again. We
// can forget this change
} }
// Else it was added in the parent and we removed it again. We
for new in statediffnew.iter() { // can forget this change
if !parent_removed.remove(new) {
// It was not touched in the parent and we added it
parent_new.insert(*new);
}
// Else it was removed in the parent and we added it again. We
// can forget this change
}
self.save_state_from_diff(
shortstatehash,
Arc::new(parent_new),
Arc::new(parent_removed),
diffsum,
parent_states,
)?;
} else {
// Diff small enough, we add diff as layer on top of parent
self.save_statediff(shortstatehash, &StateDiff {
parent: Some(parent.shortstatehash),
added: statediffnew,
removed: statediffremoved,
});
} }
Ok(()) for new in statediffnew.iter() {
if !parent_removed.remove(new) {
// It was not touched in the parent and we added it
parent_new.insert(*new);
}
// Else it was removed in the parent and we added it again. We
// can forget this change
}
self.save_state_from_diff(
shortstatehash,
Arc::new(parent_new),
Arc::new(parent_removed),
diffsum,
parent_states,
)?;
return Ok(());
} }
/// Returns the new shortstatehash, and the state diff from the previous if parent_states.is_empty() {
/// room state // There is no parent layer, create a new state
#[tracing::instrument(skip(self, new_state_ids_compressed), level = "debug")] self.save_statediff(shortstatehash, &StateDiff {
pub async fn save_state( parent: None,
&self,
room_id: &RoomId,
new_state_ids_compressed: Arc<CompressedState>,
) -> Result<HashSetCompressStateEvent> {
let previous_shortstatehash = self
.services
.state
.get_room_shortstatehash(room_id)
.await
.ok();
let state_hash =
utils::calculate_hash(new_state_ids_compressed.iter().map(|bytes| &bytes[..]));
let (new_shortstatehash, already_existed) = self
.services
.short
.get_or_create_shortstatehash(&state_hash)
.await;
if Some(new_shortstatehash) == previous_shortstatehash {
return Ok(HashSetCompressStateEvent {
shortstatehash: new_shortstatehash,
..Default::default()
});
}
let states_parents = if let Some(p) = previous_shortstatehash {
self.load_shortstatehash_info(p).await.unwrap_or_default()
} else {
ShortStateInfoVec::new()
};
let (statediffnew, statediffremoved) =
if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew: CompressedState = new_state_ids_compressed
.difference(&parent_stateinfo.full_state)
.copied()
.collect();
let statediffremoved: CompressedState = parent_stateinfo
.full_state
.difference(&new_state_ids_compressed)
.copied()
.collect();
(Arc::new(statediffnew), Arc::new(statediffremoved))
} else {
(new_state_ids_compressed, Arc::new(CompressedState::new()))
};
if !already_existed {
self.save_state_from_diff(
new_shortstatehash,
statediffnew.clone(),
statediffremoved.clone(),
2, // every state change is 2 event changes on average
states_parents,
)?;
}
Ok(HashSetCompressStateEvent {
shortstatehash: new_shortstatehash,
added: statediffnew, added: statediffnew,
removed: statediffremoved, removed: statediffremoved,
}) });
return Ok(());
} }
#[tracing::instrument(skip(self), level = "debug", name = "get")] // Else we have two options.
async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDiff> { // 1. We add the current diff on top of the parent layer.
const BUFSIZE: usize = size_of::<ShortStateHash>(); // 2. We replace a layer above
const STRIDE: usize = size_of::<ShortStateHash>();
let value = self let parent = parent_states.pop().expect("parent must have a state");
.db let parent_added_len = parent.added.len();
.shortstatehash_statediff let parent_removed_len = parent.removed.len();
.aqry::<BUFSIZE, _>(&shortstatehash) let parent_diff = checked!(parent_added_len + parent_removed_len)?;
.await
.map_err(|e| {
err!(Database("Failed to find StateDiff from short {shortstatehash:?}: {e}"))
})?;
let parent = utils::u64_from_bytes(&value[0..size_of::<u64>()]) if checked!(diffsum * diffsum)? >= checked!(2 * diff_to_sibling * parent_diff)? {
.ok() // Diff too big, we replace above layer(s)
.take_if(|parent| *parent != 0); let mut parent_new = (*parent.added).clone();
let mut parent_removed = (*parent.removed).clone();
debug_assert!(value.len() % STRIDE == 0, "value not aligned to stride"); for removed in statediffremoved.iter() {
let _num_values = value.len() / STRIDE; if !parent_new.remove(removed) {
// It was not added in the parent and we removed it
let mut add_mode = true; parent_removed.insert(*removed);
let mut added = CompressedState::new();
let mut removed = CompressedState::new();
let mut i = STRIDE;
while let Some(v) = value.get(i..expected!(i + 2 * STRIDE)) {
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
add_mode = false;
i = expected!(i + STRIDE);
continue;
} }
if add_mode { // Else it was added in the parent and we removed it again. We
added.insert(v.try_into()?); // can forget this change
} else {
removed.insert(v.try_into()?);
}
i = expected!(i + 2 * STRIDE);
} }
Ok(StateDiff { for new in statediffnew.iter() {
parent, if !parent_removed.remove(new) {
added: Arc::new(added), // It was not touched in the parent and we added it
removed: Arc::new(removed), parent_new.insert(*new);
}) }
// Else it was removed in the parent and we added it again. We
// can forget this change
}
self.save_state_from_diff(
shortstatehash,
Arc::new(parent_new),
Arc::new(parent_removed),
diffsum,
parent_states,
)?;
} else {
// Diff small enough, we add diff as layer on top of parent
self.save_statediff(shortstatehash, &StateDiff {
parent: Some(parent.shortstatehash),
added: statediffnew,
removed: statediffremoved,
});
} }
fn save_statediff(&self, shortstatehash: ShortStateHash, diff: &StateDiff) { Ok(())
let mut value = Vec::<u8>::with_capacity( }
2_usize
.saturating_add(diff.added.len())
.saturating_add(diff.removed.len()),
);
let parent = diff.parent.unwrap_or(0_u64); /// Returns the new shortstatehash, and the state diff from the previous
value.extend_from_slice(&parent.to_be_bytes()); /// room state
#[implement(Service)]
#[tracing::instrument(skip(self, new_state_ids_compressed), level = "debug")]
pub async fn save_state(
&self,
room_id: &RoomId,
new_state_ids_compressed: Arc<CompressedState>,
) -> Result<HashSetCompressStateEvent> {
let previous_shortstatehash = self
.services
.state
.get_room_shortstatehash(room_id)
.await
.ok();
for new in diff.added.iter() { let state_hash =
value.extend_from_slice(&new[..]); utils::calculate_hash(new_state_ids_compressed.iter().map(|bytes| &bytes[..]));
}
if !diff.removed.is_empty() { let (new_shortstatehash, already_existed) = self
value.extend_from_slice(&0_u64.to_be_bytes()); .services
for removed in diff.removed.iter() { .short
value.extend_from_slice(&removed[..]); .get_or_create_shortstatehash(&state_hash)
} .await;
}
self.db if Some(new_shortstatehash) == previous_shortstatehash {
.shortstatehash_statediff return Ok(HashSetCompressStateEvent {
.insert(&shortstatehash.to_be_bytes(), &value); shortstatehash: new_shortstatehash,
..Default::default()
});
} }
let states_parents = if let Some(p) = previous_shortstatehash {
self.load_shortstatehash_info(p).await.unwrap_or_default()
} else {
ShortStateInfoVec::new()
};
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew: CompressedState = new_state_ids_compressed
.difference(&parent_stateinfo.full_state)
.copied()
.collect();
let statediffremoved: CompressedState = parent_stateinfo
.full_state
.difference(&new_state_ids_compressed)
.copied()
.collect();
(Arc::new(statediffnew), Arc::new(statediffremoved))
} else {
(new_state_ids_compressed, Arc::new(CompressedState::new()))
};
if !already_existed {
self.save_state_from_diff(
new_shortstatehash,
statediffnew.clone(),
statediffremoved.clone(),
2, // every state change is 2 event changes on average
states_parents,
)?;
}
Ok(HashSetCompressStateEvent {
shortstatehash: new_shortstatehash,
added: statediffnew,
removed: statediffremoved,
})
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug", name = "get")]
async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDiff> {
const BUFSIZE: usize = size_of::<ShortStateHash>();
const STRIDE: usize = size_of::<ShortStateHash>();
let value = self
.db
.shortstatehash_statediff
.aqry::<BUFSIZE, _>(&shortstatehash)
.await
.map_err(|e| {
err!(Database("Failed to find StateDiff from short {shortstatehash:?}: {e}"))
})?;
let parent = utils::u64_from_bytes(&value[0..size_of::<u64>()])
.ok()
.take_if(|parent| *parent != 0);
debug_assert!(value.len() % STRIDE == 0, "value not aligned to stride");
let _num_values = value.len() / STRIDE;
let mut add_mode = true;
let mut added = CompressedState::new();
let mut removed = CompressedState::new();
let mut i = STRIDE;
while let Some(v) = value.get(i..expected!(i + 2 * STRIDE)) {
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
add_mode = false;
i = expected!(i + STRIDE);
continue;
}
if add_mode {
added.insert(v.try_into()?);
} else {
removed.insert(v.try_into()?);
}
i = expected!(i + 2 * STRIDE);
}
Ok(StateDiff {
parent,
added: Arc::new(added),
removed: Arc::new(removed),
})
}
#[implement(Service)]
fn save_statediff(&self, shortstatehash: ShortStateHash, diff: &StateDiff) {
let mut value = Vec::<u8>::with_capacity(
2_usize
.saturating_add(diff.added.len())
.saturating_add(diff.removed.len()),
);
let parent = diff.parent.unwrap_or(0_u64);
value.extend_from_slice(&parent.to_be_bytes());
for new in diff.added.iter() {
value.extend_from_slice(&new[..]);
}
if !diff.removed.is_empty() {
value.extend_from_slice(&0_u64.to_be_bytes());
for removed in diff.removed.iter() {
value.extend_from_slice(&removed[..]);
}
}
self.db
.shortstatehash_statediff
.insert(&shortstatehash.to_be_bytes(), &value);
} }
#[inline] #[inline]