Compare commits

...

5 commits

Author SHA1 Message Date
Jade Ellis
fad3c223fd fix: Use to_str methods on room IDs 2025-09-07 18:40:01 -07:00
Jade Ellis
5c5add30ec chore: cleanup 2025-09-07 18:40:01 -07:00
Jade Ellis
94193611d5 chore: Fix more complicated clippy warnings 2025-09-07 18:40:01 -07:00
Jade Ellis
087f3f3dd1 feat: Add command to purge sync tokens for empty rooms 2025-09-07 18:40:01 -07:00
Jade Ellis
45822d9f9e feat: Add admin command to delete sync tokens from a room 2025-09-07 18:40:01 -07:00
3 changed files with 266 additions and 2 deletions

View file

@ -1,6 +1,6 @@
use conduwuit::{Err, Result};
use futures::StreamExt;
use ruma::OwnedRoomId;
use ruma::{OwnedRoomId, OwnedRoomOrAliasId};
use crate::{PAGE_SIZE, admin_command, get_room_info};
@ -66,3 +66,185 @@ pub(super) async fn exists(&self, room_id: OwnedRoomId) -> Result {
self.write_str(&format!("{result}")).await
}
#[admin_command]
pub(super) async fn purge_sync_tokens(&self, room: OwnedRoomOrAliasId) -> Result {
// Resolve the room ID from the room or alias ID
let room_id = self.services.rooms.alias.resolve(&room).await?;
// Delete all tokens for this room using the service method
let Ok(deleted_count) = self.services.rooms.user.delete_room_tokens(&room_id).await else {
return Err!("Failed to delete sync tokens for room {}", room_id.as_str());
};
self.write_str(&format!(
"Successfully deleted {deleted_count} sync tokens for room {}",
room_id.as_str()
))
.await
}
/// Target options for room purging
#[derive(Default, Debug, clap::ValueEnum, Clone)]
pub(crate) enum RoomTargetOption {
#[default]
/// Target all rooms
All,
/// Target only disabled rooms
DisabledOnly,
/// Target only banned rooms
BannedOnly,
}
#[admin_command]
pub(super) async fn purge_all_sync_tokens(
&self,
target_option: Option<RoomTargetOption>,
execute: bool,
) -> Result {
use conduwuit::{debug, info};
let mode = if !execute { "Simulating" } else { "Starting" };
// strictly, we should check if these reach the max value after the loop and
// warn the user that the count is too large
let mut total_rooms_checked: usize = 0;
let mut total_tokens_deleted: usize = 0;
let mut error_count: u32 = 0;
let mut skipped_rooms: usize = 0;
info!("{} purge of sync tokens", mode);
// Get all rooms in the server
let all_rooms = self
.services
.rooms
.metadata
.iter_ids()
.collect::<Vec<_>>()
.await;
info!("Found {} rooms total on the server", all_rooms.len());
// Filter rooms based on options
let mut rooms = Vec::new();
for room_id in all_rooms {
if let Some(target) = &target_option {
match target {
| RoomTargetOption::DisabledOnly => {
if !self.services.rooms.metadata.is_disabled(room_id).await {
debug!("Skipping room {} as it's not disabled", room_id.as_str());
skipped_rooms = skipped_rooms.saturating_add(1);
continue;
}
},
| RoomTargetOption::BannedOnly => {
if !self.services.rooms.metadata.is_banned(room_id).await {
debug!("Skipping room {} as it's not banned", room_id.as_str());
skipped_rooms = skipped_rooms.saturating_add(1);
continue;
}
},
| RoomTargetOption::All => {},
}
}
rooms.push(room_id);
}
// Total number of rooms we'll be checking
let total_rooms = rooms.len();
info!(
"Processing {} rooms after filtering (skipped {} rooms)",
total_rooms, skipped_rooms
);
// Process each room
for room_id in rooms {
total_rooms_checked = total_rooms_checked.saturating_add(1);
// Log progress periodically
if total_rooms_checked % 100 == 0 || total_rooms_checked == total_rooms {
info!(
"Progress: {}/{} rooms checked, {} tokens {}",
total_rooms_checked,
total_rooms,
total_tokens_deleted,
if !execute { "would be deleted" } else { "deleted" }
);
}
// In dry run mode, just count what would be deleted, don't actually delete
debug!(
"Room {}: {}",
room_id.as_str(),
if !execute {
"would purge sync tokens"
} else {
"purging sync tokens"
}
);
if !execute {
// For dry run mode, count tokens without deleting
match self.services.rooms.user.count_room_tokens(room_id).await {
| Ok(count) =>
if count > 0 {
debug!(
"Would delete {} sync tokens for room {}",
count,
room_id.as_str()
);
total_tokens_deleted = total_tokens_deleted.saturating_add(count);
} else {
debug!("No sync tokens found for room {}", room_id.as_str());
},
| Err(e) => {
debug!("Error counting sync tokens for room {}: {:?}", room_id.as_str(), e);
error_count = error_count.saturating_add(1);
},
}
} else {
// Real deletion mode
match self.services.rooms.user.delete_room_tokens(room_id).await {
| Ok(count) =>
if count > 0 {
debug!("Deleted {} sync tokens for room {}", count, room_id.as_str());
total_tokens_deleted = total_tokens_deleted.saturating_add(count);
} else {
debug!("No sync tokens found for room {}", room_id.as_str());
},
| Err(e) => {
debug!("Error purging sync tokens for room {}: {:?}", room_id.as_str(), e);
error_count = error_count.saturating_add(1);
},
}
}
}
let action = if !execute { "would be deleted" } else { "deleted" };
info!(
"Finished {}: checked {} rooms out of {} total, {} tokens {}, errors: {}",
if !execute {
"purge simulation"
} else {
"purging sync tokens"
},
total_rooms_checked,
total_rooms,
total_tokens_deleted,
action,
error_count
);
self.write_str(&format!(
"Finished {}: checked {} rooms out of {} total, {} tokens {}, errors: {}",
if !execute { "simulation" } else { "purging sync tokens" },
total_rooms_checked,
total_rooms,
total_tokens_deleted,
action,
error_count
))
.await
}

View file

@ -5,8 +5,9 @@ mod info;
mod moderation;
use clap::Subcommand;
use commands::RoomTargetOption;
use conduwuit::Result;
use ruma::OwnedRoomId;
use ruma::{OwnedRoomId, OwnedRoomOrAliasId};
use self::{
alias::RoomAliasCommand, directory::RoomDirectoryCommand, info::RoomInfoCommand,
@ -56,4 +57,25 @@ pub enum RoomCommand {
Exists {
room_id: OwnedRoomId,
},
/// - Delete all sync tokens for a room
PurgeSyncTokens {
/// Room ID or alias to purge sync tokens for
#[arg(value_parser)]
room: OwnedRoomOrAliasId,
},
/// - Delete sync tokens for all rooms that have no local users
///
/// By default, processes all empty rooms.
PurgeAllSyncTokens {
/// Target specific room types
#[arg(long, value_enum)]
target_option: Option<RoomTargetOption>,
/// Execute token deletions. Otherwise,
/// Performs a dry run without actually deleting any tokens
#[arg(long)]
execute: bool,
},
}

View file

@ -127,3 +127,63 @@ pub async fn get_token_shortstatehash(
.await
.deserialized()
}
/// Count how many sync tokens exist for a room without deleting them
///
/// This is useful for dry runs to see how many tokens would be deleted
#[implement(Service)]
pub async fn count_room_tokens(&self, room_id: &RoomId) -> Result<usize> {
use futures::TryStreamExt;
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
// Create a prefix to search by - all entries for this room will start with its
// short ID
let prefix = &[shortroomid];
// Collect all keys into a Vec and count them
let keys = self
.db
.roomsynctoken_shortstatehash
.keys_prefix_raw(prefix)
.map_ok(|_| ()) // We only need to count, not store the keys
.try_collect::<Vec<_>>()
.await?;
Ok(keys.len())
}
/// Delete all sync tokens associated with a room
///
/// This helps clean up the database as these tokens are never otherwise removed
#[implement(Service)]
pub async fn delete_room_tokens(&self, room_id: &RoomId) -> Result<usize> {
use futures::TryStreamExt;
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
// Create a prefix to search by - all entries for this room will start with its
// short ID
let prefix = &[shortroomid];
// Collect all keys into a Vec first, then delete them
let keys = self
.db
.roomsynctoken_shortstatehash
.keys_prefix_raw(prefix)
.map_ok(|key| {
// Clone the key since we can't store references in the Vec
Vec::from(key)
})
.try_collect::<Vec<_>>()
.await?;
// Delete each key individually
for key in &keys {
self.db.roomsynctoken_shortstatehash.del(key);
}
let count = keys.len();
Ok(count)
}