From daaea3f7669af0887f17c90a0cc1f86ffb11f798 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Wed, 6 Aug 2025 01:29:41 +0100 Subject: [PATCH] feat(async-media): Partial implementation of async media --- src/api/client/media.rs | 122 +++++++++++++++++++++++++++++++++++++-- src/api/router.rs | 2 + src/service/media/mod.rs | 36 ++++++++++++ 3 files changed, 154 insertions(+), 6 deletions(-) diff --git a/src/api/client/media.rs b/src/api/client/media.rs index 3f491d54..3e025f33 100644 --- a/src/api/client/media.rs +++ b/src/api/client/media.rs @@ -3,13 +3,15 @@ use std::time::Duration; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Result, err, + Err, Error, Result, debug, debug_info, err, error, utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize}, + warn, }; use conduwuit_service::{ Services, media::{CACHE_CONTROL_IMMUTABLE, CORP_CROSS_ORIGIN, Dim, FileMeta, MXC_LENGTH}, }; +use http::StatusCode; use reqwest::Url; use ruma::{ Mxc, UserId, @@ -18,7 +20,8 @@ use ruma::{ get_content, get_content_as_filename, get_content_thumbnail, get_media_config, get_media_preview, }, - media::create_content, + error::ErrorKind, + media::{create_content, create_content_async, create_mxc_uri}, }, }; @@ -83,6 +86,83 @@ pub(crate) async fn create_content_route( }) } +/// # `POST /_matrix/media/v1/create` +/// +/// Creates a new MXC URI to later be populated. +#[tracing::instrument( + name = "media_create_async", + level = "debug", + skip_all, + fields(%client), +)] +pub(crate) async fn create_async_mxc_uri_route( + State(services): State, + InsecureClientIp(client): InsecureClientIp, + body: Ruma, +) -> Result { + let user = body.sender_user(); + if services.users.is_suspended(user).await? { + return Err!(Request(UserSuspended("You cannot perform this action while suspended."))); + } + let ref mxc = Mxc { + server_name: services.globals.server_name(), + media_id: &utils::random_string(MXC_LENGTH), + }; + services.media.create_async(mxc, Some(user)).await?; + // TODO: add expiring MXC URIs to prevent exhaustion of MXC IDs + Ok(create_mxc_uri::v1::Response { + content_uri: mxc.to_string().into(), + unused_expires_at: None, + }) +} + +/// # `PUT /_matrix/media/v3/upload/{serverName}/{mediaId}` +/// +/// Permanently save media in the server, using an existing MXC URI. +#[tracing::instrument( + name = "media_async_upload", + level = "debug", + skip_all, + fields(%client), +)] +pub(crate) async fn upload_async_media_route( + State(services): State, + InsecureClientIp(client): InsecureClientIp, + body: Ruma, +) -> Result { + let user = body.sender_user(); + if services.users.is_suspended(user).await? { + return Err!(Request(UserSuspended("You cannot perform this action while suspended."))); + } + let ref mxc = Mxc { + server_name: &body.server_name, + media_id: &body.media_id, + }; + if !services.globals.server_is_ours(&body.server_name) { + return Err!(Request(Forbidden("Media uri does not belong to us."))); + } + if !services.media.exists(mxc).await { + return Err!(Request(NotFound("Media uri does not exist."))); + } else if services.media.is_populated(mxc).await { + return Err(Error::Request( + ErrorKind::CannotOverwriteMedia, + "Media uri is already populated.".into(), + StatusCode::CONFLICT, + )); + } + + let filename = body.filename.as_deref(); + let content_type = body.content_type.as_deref(); + let content_disposition = make_content_disposition(None, content_type, filename); + + services + .media + .create(mxc, Some(user), Some(&content_disposition), content_type, &body.file) + .await?; + + Ok(create_content_async::v3::Response {}) +} + /// # `GET /_matrix/client/v1/media/thumbnail/{serverName}/{mediaId}` /// /// Load media thumbnail from our server or over federation. @@ -313,18 +393,48 @@ async fn fetch_thumbnail_meta( .await } +async fn wait_for_population( + services: &Services, + mxc: &Mxc<'_>, + timeout_ms: Duration, +) -> Result { + async fn inner(services: &Services, mxc: &Mxc<'_>) -> Result { + if !services.media.exists(mxc).await { + return Err!(Request(NotFound("Media not found."))); + } + loop { + if let Ok(Some(filemeta)) = services.media.get(mxc).await { + return Ok(filemeta); + } + // TODO(async-media): A notify/send mechanism would be better than polling. + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + tokio::time::timeout( + timeout_ms + .checked_sub(Duration::from_millis(500)) + .unwrap_or(Duration::from_millis(0)), + inner(services, mxc), + ) + .await + .map_err(|_| err!(Request(NotYetUploaded("Media was not ready in time."))))? +} async fn fetch_file_meta( services: &Services, mxc: &Mxc<'_>, user: &UserId, timeout_ms: Duration, ) -> Result { - if let Some(filemeta) = services.media.get(mxc).await? { - return Ok(filemeta); + if services.globals.server_is_ours(mxc.server_name) { + let result = wait_for_population(services, mxc, timeout_ms).await; + if let Ok(filemeta) = result { + return Ok(filemeta); + } + return Err!(Request(NotFound("Local media not found."))); } - if services.globals.server_is_ours(mxc.server_name) { - return Err!(Request(NotFound("Local media not found."))); + if let Some(filemeta) = services.media.get(mxc).await? { + return Ok(filemeta); } services diff --git a/src/api/router.rs b/src/api/router.rs index 8072fa5b..81ecf9cf 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -154,6 +154,8 @@ pub fn build(router: Router, server: &Server) -> Router { .ruma_route(&client::turn_server_route) .ruma_route(&client::send_event_to_device_route) .ruma_route(&client::create_content_route) + .ruma_route(&client::create_async_mxc_uri_route) + .ruma_route(&client::upload_async_media_route) .ruma_route(&client::get_content_thumbnail_route) .ruma_route(&client::get_content_route) .ruma_route(&client::get_content_as_filename_route) diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index d053ba54..4b2a18cc 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -105,6 +105,15 @@ impl Service { Ok(()) } + /// Creates an MXC key but no associated file. + /// Used for async media. Must be later overwritten with `create` + pub async fn create_async(&self, mxc: &Mxc<'_>, user: Option<&UserId>) -> Result { + // effectively just reserves the MXC ID in the database + self.db + .create_file_metadata(mxc, user, &Dim::default(), None, None)?; + Ok(()) + } + /// Deletes a file in the database and from the media directory via an MXC pub async fn delete(&self, mxc: &Mxc<'_>) -> Result<()> { match self.db.search_mxc_metadata_prefix(mxc).await { @@ -179,6 +188,33 @@ impl Service { } } + /// Checks that an MXC URI exists in our media database. + pub async fn exists(&self, mxc: &Mxc<'_>) -> bool { + self.db.search_mxc_metadata_prefix(mxc).await.is_ok() + } + + /// Checks that an MXC URI exists *and* has a file associated with it in our + /// media database. + pub async fn is_populated(&self, mxc: &Mxc<'_>) -> bool { + match self.db.search_mxc_metadata_prefix(mxc).await { + | Ok(keys) => { + if keys.is_empty() { + return false; + } + + for key in keys { + let path = self.get_media_file(&key); + if fs::metadata(path).await.is_ok() { + return true; + } + } + + false + }, + | _ => false, + } + } + /// Gets all the MXC URIs in our media database pub async fn get_all_mxcs(&self) -> Result> { let all_keys = self.db.get_all_media_keys().await;