Compare commits

...

23 commits

Author SHA1 Message Date
nexy7574
d4862b8ead style: Remove redundant, unused functions
Some checks failed
Documentation / Build and Deploy Documentation (push) Has been skipped
Checks / Prefligit / prefligit (push) Failing after 13s
Release Docker Image / define-variables (push) Failing after 5s
Release Docker Image / build-image (linux/amd64, release, linux-amd64, base) (push) Has been skipped
Release Docker Image / build-image (linux/arm64, release, linux-arm64, base) (push) Has been skipped
Release Docker Image / merge (push) Has been skipped
Checks / Rust / Format (push) Failing after 5s
Checks / Rust / Clippy (push) Failing after 11s
Checks / Rust / Cargo Test (push) Failing after 12s
2025-06-29 15:38:01 +00:00
Jade Ellis
acb74faa07 feat: Pass sender through admin commands 2025-06-29 15:38:01 +00:00
Jade Ellis
ecc6fda98b feat: Record metadata about user suspensions 2025-06-29 15:38:01 +00:00
nexy7574
13e17d52e0 style: Remove unnecessary imports (clippy) 2025-06-29 15:38:01 +00:00
nexy7574
d8a27eeb54 fix: Failing open on database errors
oops
2025-06-29 15:38:01 +00:00
nexy7574
eb2e3b3bb7 fix: Missing suspensions shouldn't error
Turns out copying and pasting the function
above verbatim actually introduces more
problems than it solves!
2025-06-29 15:38:01 +00:00
nexy7574
72f8cb3038 feat: Do not allow suspended users to send typing statuses 2025-06-29 15:38:01 +00:00
nexy7574
1124097bd1 feat: Only allow private read receipts when suspended 2025-06-29 15:38:01 +00:00
nexy7574
08527a2880 feat: Prevent suspended users upgrading rooms 2025-06-29 15:38:01 +00:00
nexy7574
8e06571e7c feat: Prevent suspended users uploading media 2025-06-29 15:38:01 +00:00
nexy7574
90180916eb feat: Prevent suspended users performing room changes
Prevents kicks, bans, unbans, and alias modification
2025-06-29 15:38:01 +00:00
nexy7574
d0548ec064 feat: Forbid suspended users from sending state events 2025-06-29 15:38:01 +00:00
nexy7574
1ff8af8e9e style: Remove unneeded statements (clippy) 2025-06-29 15:38:01 +00:00
nexy7574
cc864dc8bb feat: Do not allow suspending admin users 2025-06-29 15:38:01 +00:00
nexy7574
8791a9b851 fix: Inappropriate empty check
I once again, assumed `true` is actually `false`.
2025-06-29 15:38:01 +00:00
nexy7574
968c0e236c fix: Create the column appropriately 2025-06-29 15:38:01 +00:00
nexy7574
5d5350a9fe feat: Prevent suspended users creating new rooms 2025-06-29 15:38:01 +00:00
nexy7574
e127c4e5a2 feat: Add un/suspend admin commands 2025-06-29 15:38:01 +00:00
nexy7574
a94128e698 feat: Prevent suspended users joining/knocking on rooms 2025-06-29 15:38:01 +00:00
nexy7574
a6ba9e3045 feat: Prevent suspended users changing their profile 2025-06-29 15:38:01 +00:00
nexy7574
286974cb9a feat: Prevent suspended users redacting events 2025-06-29 15:38:01 +00:00
nexy7574
accfda2586 feat: Prevent suspended users sending events 2025-06-29 15:38:01 +00:00
nexy7574
fac9e090cd feat: Add suspension helper to user service 2025-06-29 15:38:01 +00:00
20 changed files with 276 additions and 66 deletions

View file

@ -7,13 +7,14 @@ use futures::{
io::{AsyncWriteExt, BufWriter},
lock::Mutex,
};
use ruma::EventId;
use ruma::{EventId, UserId};
pub(crate) struct Context<'a> {
pub(crate) services: &'a Services,
pub(crate) body: &'a [&'a str],
pub(crate) timer: SystemTime,
pub(crate) reply_id: Option<&'a EventId>,
pub(crate) sender: Option<&'a UserId>,
pub(crate) output: Mutex<BufWriter<Vec<u8>>>,
}
@ -36,4 +37,10 @@ impl Context<'_> {
output.write_all(s.as_bytes()).map_err(Into::into).await
})
}
/// Get the sender as a string, or service user ID if not available
pub(crate) fn sender_or_service_user(&self) -> &UserId {
self.sender
.unwrap_or_else(|| self.services.globals.server_user.as_ref())
}
}

View file

@ -63,6 +63,7 @@ async fn process_command(services: Arc<Services>, input: &CommandInput) -> Proce
body: &body,
timer: SystemTime::now(),
reply_id: input.reply_id.as_deref(),
sender: input.sender.as_deref(),
output: BufWriter::new(Vec::new()).into(),
};

View file

@ -224,6 +224,47 @@ pub(super) async fn deactivate(&self, no_leave_rooms: bool, user_id: String) ->
.await
}
#[admin_command]
pub(super) async fn suspend(&self, user_id: String) -> Result {
let user_id = parse_local_user_id(self.services, &user_id)?;
if user_id == self.services.globals.server_user {
return Err!("Not allowed to suspend the server service account.",);
}
if !self.services.users.exists(&user_id).await {
return Err!("User {user_id} does not exist.");
}
if self.services.users.is_admin(&user_id).await {
return Err!("Admin users cannot be suspended.");
}
// TODO: Record the actual user that sent the suspension where possible
self.services
.users
.suspend_account(&user_id, self.sender_or_service_user())
.await;
self.write_str(&format!("User {user_id} has been suspended."))
.await
}
#[admin_command]
pub(super) async fn unsuspend(&self, user_id: String) -> Result {
let user_id = parse_local_user_id(self.services, &user_id)?;
if user_id == self.services.globals.server_user {
return Err!("Not allowed to unsuspend the server service account.",);
}
if !self.services.users.exists(&user_id).await {
return Err!("User {user_id} does not exist.");
}
self.services.users.unsuspend_account(&user_id).await;
self.write_str(&format!("User {user_id} has been unsuspended."))
.await
}
#[admin_command]
pub(super) async fn reset_password(&self, username: String, password: Option<String>) -> Result {
let user_id = parse_local_user_id(self.services, &username)?;

View file

@ -59,6 +59,28 @@ pub(super) enum UserCommand {
force: bool,
},
/// - Suspend a user
///
/// Suspended users are able to log in, sync, and read messages, but are not
/// able to send events nor redact them, cannot change their profile, and
/// are unable to join, invite to, or knock on rooms.
///
/// Suspended users can still leave rooms and deactivate their account.
/// Suspending them effectively makes them read-only.
Suspend {
/// Username of the user to suspend
user_id: String,
},
/// - Unsuspend a user
///
/// Reverses the effects of the `suspend` command, allowing the user to send
/// messages, change their profile, create room invites, etc.
Unsuspend {
/// Username of the user to unsuspend
user_id: String,
},
/// - List local users in the database
#[clap(alias = "list")]
ListUsers,

View file

@ -18,6 +18,9 @@ pub(crate) async fn create_alias_route(
body: Ruma<create_alias::v3::Request>,
) -> Result<create_alias::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
services
.rooms
@ -63,6 +66,9 @@ pub(crate) async fn delete_alias_route(
body: Ruma<delete_alias::v3::Request>,
) -> Result<delete_alias::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
services
.rooms

View file

@ -128,6 +128,9 @@ pub(crate) async fn set_room_visibility_route(
// Return 404 if the room doesn't exist
return Err!(Request(NotFound("Room not found")));
}
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if services
.users

View file

@ -52,6 +52,9 @@ pub(crate) async fn create_content_route(
body: Ruma<create_content::v3::Request>,
) -> Result<create_content::v3::Response> {
let user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let filename = body.filename.as_deref();
let content_type = body.content_type.as_deref();

View file

@ -178,6 +178,9 @@ pub(crate) async fn join_room_by_id_route(
body: Ruma<join_room_by_id::v3::Request>,
) -> Result<join_room_by_id::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
banned_room_check(
&services,
@ -249,6 +252,9 @@ pub(crate) async fn join_room_by_id_or_alias_route(
let sender_user = body.sender_user.as_deref().expect("user is authenticated");
let appservice_info = &body.appservice_info;
let body = body.body;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias) {
| Ok(room_id) => {
@ -369,6 +375,9 @@ pub(crate) async fn knock_room_route(
) -> Result<knock_room::v3::Response> {
let sender_user = body.sender_user();
let body = &body.body;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias.clone()) {
| Ok(room_id) => {
@ -492,6 +501,9 @@ pub(crate) async fn invite_user_route(
body: Ruma<invite_user::v3::Request>,
) -> Result<invite_user::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites {
debug_error!(
@ -566,6 +578,10 @@ pub(crate) async fn kick_user_route(
State(services): State<crate::State>,
body: Ruma<kick_user::v3::Request>,
) -> Result<kick_user::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let state_lock = services.rooms.state.mutex.lock(&body.room_id).await;
let Ok(event) = services
@ -601,7 +617,7 @@ pub(crate) async fn kick_user_route(
third_party_invite: None,
..event
}),
body.sender_user(),
sender_user,
&body.room_id,
&state_lock,
)
@ -625,6 +641,10 @@ pub(crate) async fn ban_user_route(
return Err!(Request(Forbidden("You cannot ban yourself.")));
}
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let state_lock = services.rooms.state.mutex.lock(&body.room_id).await;
let current_member_content = services
@ -667,6 +687,10 @@ pub(crate) async fn unban_user_route(
State(services): State<crate::State>,
body: Ruma<unban_user::v3::Request>,
) -> Result<unban_user::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let state_lock = services.rooms.state.mutex.lock(&body.room_id).await;
let current_member_content = services
@ -695,7 +719,7 @@ pub(crate) async fn unban_user_route(
is_direct: None,
..current_member_content
}),
body.sender_user(),
sender_user,
&body.room_id,
&state_lock,
)

View file

@ -36,6 +36,9 @@ pub(crate) async fn set_displayname_route(
body: Ruma<set_display_name::v3::Request>,
) -> Result<set_display_name::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if *sender_user != body.user_id && body.appservice_info.is_none() {
return Err!(Request(Forbidden("You cannot update the profile of another user")));
@ -125,6 +128,9 @@ pub(crate) async fn set_avatar_url_route(
body: Ruma<set_avatar_url::v3::Request>,
) -> Result<set_avatar_url::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if *sender_user != body.user_id && body.appservice_info.is_none() {
return Err!(Request(Forbidden("You cannot update the profile of another user")));

View file

@ -58,14 +58,18 @@ pub(crate) async fn set_read_marker_route(
}
if let Some(event) = &body.read_receipt {
if !services.users.is_suspended(sender_user).await? {
let receipt_content = BTreeMap::from_iter([(
event.to_owned(),
BTreeMap::from_iter([(
ReceiptType::Read,
BTreeMap::from_iter([(sender_user.to_owned(), ruma::events::receipt::Receipt {
BTreeMap::from_iter([(
sender_user.to_owned(),
ruma::events::receipt::Receipt {
ts: Some(MilliSecondsSinceUnixEpoch::now()),
thread: ReceiptThread::Unthreaded,
})]),
},
)]),
)]),
)]);
@ -82,6 +86,7 @@ pub(crate) async fn set_read_marker_route(
)
.await;
}
}
if let Some(event) = &body.private_read_receipt {
let count = services

View file

@ -1,5 +1,5 @@
use axum::extract::State;
use conduwuit::{Result, matrix::pdu::PduBuilder};
use conduwuit::{Err, Result, matrix::pdu::PduBuilder};
use ruma::{
api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent,
};
@ -17,6 +17,10 @@ pub(crate) async fn redact_event_route(
) -> Result<redact_event::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let body = body.body;
if services.users.is_suspended(sender_user).await? {
// TODO: Users can redact their own messages while suspended
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let state_lock = services.rooms.state.mutex.lock(&body.room_id).await;

View file

@ -70,6 +70,10 @@ pub(crate) async fn create_room_route(
));
}
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
let room_id: OwnedRoomId = match &body.room_id {
| Some(custom_room_id) => custom_room_id_check(&services, custom_room_id)?,
| _ => RoomId::new(&services.server.name),

View file

@ -2,7 +2,7 @@ use std::cmp::max;
use axum::extract::State;
use conduwuit::{
Error, Result, err, info,
Err, Error, Result, err, info,
matrix::{StateKey, pdu::PduBuilder},
};
use futures::StreamExt;
@ -63,6 +63,10 @@ pub(crate) async fn upgrade_room_route(
));
}
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
// Create a replacement room
let replacement_room = RoomId::new(services.globals.server_name());

View file

@ -23,6 +23,9 @@ pub(crate) async fn send_message_event_route(
let sender_user = body.sender_user();
let sender_device = body.sender_device.as_deref();
let appservice_info = body.appservice_info.as_ref();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
// Forbid m.room.encrypted if encryption is disabled
if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption

View file

@ -33,6 +33,10 @@ pub(crate) async fn send_state_event_for_key_route(
) -> Result<send_state_event::v3::Response> {
let sender_user = body.sender_user();
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
Ok(send_state_event::v3::Response {
event_id: send_state_event_for_key_helper(
&services,

View file

@ -26,7 +26,7 @@ pub(crate) async fn create_typing_event_route(
{
return Err!(Request(Forbidden("You are not in this room.")));
}
if !services.users.is_suspended(sender_user).await? {
match body.state {
| Typing::Yes(duration) => {
let duration = utils::clamp(
@ -62,6 +62,7 @@ pub(crate) async fn create_typing_event_route(
.await?;
},
}
}
// ping presence
if services.config.allow_local_presence {

View file

@ -378,6 +378,10 @@ pub(super) static MAPS: &[Descriptor] = &[
name: "userid_password",
..descriptor::RANDOM
},
Descriptor {
name: "userid_suspension",
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "userid_presenceid",
..descriptor::RANDOM_SMALL

View file

@ -45,11 +45,13 @@ struct Services {
services: StdRwLock<Option<Weak<crate::Services>>>,
}
/// Inputs to a command are a multi-line string and optional reply_id.
/// Inputs to a command are a multi-line string, optional reply_id, and optional
/// sender.
#[derive(Debug)]
pub struct CommandInput {
pub command: String,
pub reply_id: Option<OwnedEventId>,
pub sender: Option<Box<UserId>>,
}
/// Prototype of the tab-completer. The input is buffered text when tab
@ -162,7 +164,22 @@ impl Service {
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> {
self.channel
.0
.send(CommandInput { command, reply_id })
.send(CommandInput { command, reply_id, sender: None })
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
}
/// Posts a command to the command processor queue with sender information
/// and returns. Processing will take place on the service worker's task
/// asynchronously. Errors if the queue is full.
pub fn command_with_sender(
&self,
command: String,
reply_id: Option<OwnedEventId>,
sender: Box<UserId>,
) -> Result<()> {
self.channel
.0
.send(CommandInput { command, reply_id, sender: Some(sender) })
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
}
@ -173,7 +190,7 @@ impl Service {
command: String,
reply_id: Option<OwnedEventId>,
) -> ProcessorResult {
self.process_command(CommandInput { command, reply_id })
self.process_command(CommandInput { command, reply_id, sender: None })
.await
}

View file

@ -536,9 +536,11 @@ impl Service {
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
if self.services.admin.is_admin_command(pdu, &body).await {
self.services
.admin
.command(body, Some((*pdu.event_id).into()))?;
self.services.admin.command_with_sender(
body,
Some((*pdu.event_id).into()),
pdu.sender.clone().into(),
)?;
}
}
},

View file

@ -16,10 +16,21 @@ use ruma::{
},
serde::Raw,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::{Dep, account_data, admin, globals, rooms};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserSuspension {
/// Whether the user is currently suspended
pub suspended: bool,
/// When the user was suspended (Unix timestamp in milliseconds)
pub suspended_at: u64,
/// User ID of who suspended this user
pub suspended_by: String,
}
pub struct Service {
services: Services,
db: Data,
@ -52,6 +63,7 @@ struct Data {
userid_lastonetimekeyupdate: Arc<Map>,
userid_masterkeyid: Arc<Map>,
userid_password: Arc<Map>,
userid_suspension: Arc<Map>,
userid_selfsigningkeyid: Arc<Map>,
userid_usersigningkeyid: Arc<Map>,
useridprofilekey_value: Arc<Map>,
@ -87,6 +99,7 @@ impl crate::Service for Service {
userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
userid_masterkeyid: args.db["userid_masterkeyid"].clone(),
userid_password: args.db["userid_password"].clone(),
userid_suspension: args.db["userid_suspension"].clone(),
userid_selfsigningkeyid: args.db["userid_selfsigningkeyid"].clone(),
userid_usersigningkeyid: args.db["userid_usersigningkeyid"].clone(),
useridprofilekey_value: args.db["useridprofilekey_value"].clone(),
@ -143,6 +156,23 @@ impl Service {
Ok(())
}
/// Suspend account, placing it in a read-only state
pub async fn suspend_account(&self, user_id: &UserId, suspending_user: &UserId) {
self.db.userid_suspension.raw_put(
user_id,
Json(UserSuspension {
suspended: true,
suspended_at: MilliSecondsSinceUnixEpoch::now().get().into(),
suspended_by: suspending_user.to_string(),
}),
);
}
/// Unsuspend account, placing it in a read-write state
pub async fn unsuspend_account(&self, user_id: &UserId) {
self.db.userid_suspension.remove(user_id);
}
/// Check if a user has an account on this homeserver.
#[inline]
pub async fn exists(&self, user_id: &UserId) -> bool {
@ -159,6 +189,25 @@ impl Service {
.await
}
/// Check if account is suspended
pub async fn is_suspended(&self, user_id: &UserId) -> Result<bool> {
match self
.db
.userid_suspension
.get(user_id)
.await
.deserialized::<UserSuspension>()
{
| Ok(s) => Ok(s.suspended),
| Err(e) =>
if e.is_not_found() {
Ok(false)
} else {
Err(e)
},
}
}
/// Check if account is active, infallible
pub async fn is_active(&self, user_id: &UserId) -> bool {
!self.is_deactivated(user_id).await.unwrap_or(true)