diff --git a/src/admin/room/commands.rs b/src/admin/room/commands.rs index 5b08ff2a..9075389f 100644 --- a/src/admin/room/commands.rs +++ b/src/admin/room/commands.rs @@ -83,3 +83,169 @@ pub(super) async fn purge_sync_tokens(&self, room: OwnedRoomOrAliasId) -> Result )) .await } + +#[admin_command] +pub(super) async fn purge_empty_room_tokens( + &self, + yes: bool, + target_disabled: bool, + target_banned: bool, + dry_run: bool, +) -> Result { + use conduwuit::{debug, info}; + + if !yes && !dry_run { + return Err!( + "Please confirm this operation with --yes as it may delete tokens from many rooms, \ + or use --dry-run to simulate" + ); + } + + let mode = if dry_run { "Simulating" } else { "Starting" }; + + let mut total_rooms_processed = 0; + let mut empty_rooms_processed = 0; + let mut total_tokens_deleted = 0; + let mut error_count = 0; + let mut skipped_rooms = 0; + + info!("{} purge of sync tokens for rooms with no local users", mode); + + // Get all rooms in the server + let all_rooms = self + .services + .rooms + .metadata + .iter_ids() + .collect::>() + .await; + + info!("Found {} rooms total on the server", all_rooms.len()); + + // Filter rooms based on options + let mut rooms = Vec::new(); + for room_id in all_rooms { + // Filter rooms based on targeting options + let is_disabled = self.services.rooms.metadata.is_disabled(room_id).await; + let is_banned = self.services.rooms.metadata.is_banned(room_id).await; + + // If targeting specific types of rooms, only include matching rooms + if (target_disabled || target_banned) + && !((target_disabled && is_disabled) || (target_banned && is_banned)) + { + debug!("Skipping room {} as it doesn't match targeting criteria", room_id); + skipped_rooms += 1; + continue; + } + + rooms.push(room_id); + } + + // Total number of rooms we'll be checking + let total_rooms = rooms.len(); + info!( + "Processing {} rooms after filtering (skipped {} rooms)", + total_rooms, skipped_rooms + ); + + // Process each room + for room_id in rooms { + total_rooms_processed += 1; + + // Count local users in this room + let local_users_count = self + .services + .rooms + .state_cache + .local_users_in_room(room_id) + .count() + .await; + + // Only process rooms with no local users + if local_users_count == 0 { + empty_rooms_processed += 1; + + // In dry run mode, just count what would be deleted, don't actually delete + debug!( + "Room {} has no local users, {}", + room_id, + if dry_run { + "would purge sync tokens" + } else { + "purging sync tokens" + } + ); + + if dry_run { + // For dry run mode, count tokens without deleting + match self.services.rooms.user.count_room_tokens(room_id).await { + | Ok(count) => + if count > 0 { + debug!("Would delete {} sync tokens for room {}", count, room_id); + total_tokens_deleted += count; + } else { + debug!("No sync tokens found for room {}", room_id); + }, + | Err(e) => { + debug!("Error counting sync tokens for room {}: {:?}", room_id, e); + error_count += 1; + }, + } + } else { + // Real deletion mode + match self.services.rooms.user.delete_room_tokens(room_id).await { + | Ok(count) => + if count > 0 { + debug!("Deleted {} sync tokens for room {}", count, room_id); + total_tokens_deleted += count; + } else { + debug!("No sync tokens found for room {}", room_id); + }, + | Err(e) => { + debug!("Error purging sync tokens for room {}: {:?}", room_id, e); + error_count += 1; + }, + } + } + } else { + debug!("Room {} has {} local users, skipping", room_id, local_users_count); + } + + // Log progress periodically + if total_rooms_processed % 100 == 0 || total_rooms_processed == total_rooms { + info!( + "Progress: {}/{} rooms processed, {} empty rooms found, {} tokens {}", + total_rooms_processed, + total_rooms, + empty_rooms_processed, + total_tokens_deleted, + if dry_run { "would be deleted" } else { "deleted" } + ); + } + } + + let action = if dry_run { "would be deleted" } else { "deleted" }; + info!( + "Finished {}: processed {} empty rooms out of {} total, {} tokens {}, errors: {}", + if dry_run { + "purge simulation" + } else { + "purging sync tokens" + }, + empty_rooms_processed, + total_rooms, + total_tokens_deleted, + action, + error_count + ); + + let mode_msg = if dry_run { "DRY RUN: " } else { "" }; + self.write_str(&format!( + "{}Successfully processed {empty_rooms_processed} empty rooms (out of {total_rooms} \ + total rooms), {total_tokens_deleted} tokens {}. Skipped {skipped_rooms} rooms based on \ + filters. Failed for {error_count} rooms.", + mode_msg, + if dry_run { "would be deleted" } else { "deleted" } + )) + .await +} diff --git a/src/admin/room/mod.rs b/src/admin/room/mod.rs index 0eac2224..61114b90 100644 --- a/src/admin/room/mod.rs +++ b/src/admin/room/mod.rs @@ -63,4 +63,27 @@ pub(super) enum RoomCommand { #[arg(value_parser)] room: OwnedRoomOrAliasId, }, + + /// - Delete sync tokens for all rooms that have no local users + /// + /// By default, processes all empty rooms. You can use --target-disabled + /// and/or --target-banned to exclusively process rooms matching those + /// conditions. + PurgeEmptyRoomTokens { + /// Confirm you want to delete tokens from potentially many rooms + #[arg(long)] + yes: bool, + + /// Only purge rooms that have federation disabled + #[arg(long)] + target_disabled: bool, + + /// Only purge rooms that have been banned + #[arg(long)] + target_banned: bool, + + /// Perform a dry run without actually deleting any tokens + #[arg(long)] + dry_run: bool, + }, } diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs index cc72ac97..58df427b 100644 --- a/src/service/rooms/user/mod.rs +++ b/src/service/rooms/user/mod.rs @@ -128,6 +128,31 @@ pub async fn get_token_shortstatehash( .deserialized() } +/// Count how many sync tokens exist for a room without deleting them +/// +/// This is useful for dry runs to see how many tokens would be deleted +#[implement(Service)] +pub async fn count_room_tokens(&self, room_id: &RoomId) -> Result { + use futures::TryStreamExt; + + let shortroomid = self.services.short.get_shortroomid(room_id).await?; + + // Create a prefix to search by - all entries for this room will start with its + // short ID + let prefix = &[shortroomid]; + + // Collect all keys into a Vec and count them + let keys = self + .db + .roomsynctoken_shortstatehash + .keys_prefix_raw(prefix) + .map_ok(|_| ()) // We only need to count, not store the keys + .try_collect::>() + .await?; + + Ok(keys.len()) +} + /// Delete all sync tokens associated with a room /// /// This helps clean up the database as these tokens are never otherwise removed