mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-10 00:23:03 +02:00
feat(async-media): Partial implementation of async media
This commit is contained in:
parent
e4a43b1a5b
commit
daaea3f766
3 changed files with 154 additions and 6 deletions
|
@ -3,13 +3,15 @@ use std::time::Duration;
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum_client_ip::InsecureClientIp;
|
use axum_client_ip::InsecureClientIp;
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Result, err,
|
Err, Error, Result, debug, debug_info, err, error,
|
||||||
utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize},
|
utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize},
|
||||||
|
warn,
|
||||||
};
|
};
|
||||||
use conduwuit_service::{
|
use conduwuit_service::{
|
||||||
Services,
|
Services,
|
||||||
media::{CACHE_CONTROL_IMMUTABLE, CORP_CROSS_ORIGIN, Dim, FileMeta, MXC_LENGTH},
|
media::{CACHE_CONTROL_IMMUTABLE, CORP_CROSS_ORIGIN, Dim, FileMeta, MXC_LENGTH},
|
||||||
};
|
};
|
||||||
|
use http::StatusCode;
|
||||||
use reqwest::Url;
|
use reqwest::Url;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
Mxc, UserId,
|
Mxc, UserId,
|
||||||
|
@ -18,7 +20,8 @@ use ruma::{
|
||||||
get_content, get_content_as_filename, get_content_thumbnail, get_media_config,
|
get_content, get_content_as_filename, get_content_thumbnail, get_media_config,
|
||||||
get_media_preview,
|
get_media_preview,
|
||||||
},
|
},
|
||||||
media::create_content,
|
error::ErrorKind,
|
||||||
|
media::{create_content, create_content_async, create_mxc_uri},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -83,6 +86,83 @@ pub(crate) async fn create_content_route(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// # `POST /_matrix/media/v1/create`
|
||||||
|
///
|
||||||
|
/// Creates a new MXC URI to later be populated.
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "media_create_async",
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(%client),
|
||||||
|
)]
|
||||||
|
pub(crate) async fn create_async_mxc_uri_route(
|
||||||
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client): InsecureClientIp,
|
||||||
|
body: Ruma<create_mxc_uri::v1::Request>,
|
||||||
|
) -> Result<create_mxc_uri::v1::Response> {
|
||||||
|
let user = body.sender_user();
|
||||||
|
if services.users.is_suspended(user).await? {
|
||||||
|
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||||
|
}
|
||||||
|
let ref mxc = Mxc {
|
||||||
|
server_name: services.globals.server_name(),
|
||||||
|
media_id: &utils::random_string(MXC_LENGTH),
|
||||||
|
};
|
||||||
|
services.media.create_async(mxc, Some(user)).await?;
|
||||||
|
// TODO: add expiring MXC URIs to prevent exhaustion of MXC IDs
|
||||||
|
Ok(create_mxc_uri::v1::Response {
|
||||||
|
content_uri: mxc.to_string().into(),
|
||||||
|
unused_expires_at: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # `PUT /_matrix/media/v3/upload/{serverName}/{mediaId}`
|
||||||
|
///
|
||||||
|
/// Permanently save media in the server, using an existing MXC URI.
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "media_async_upload",
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(%client),
|
||||||
|
)]
|
||||||
|
pub(crate) async fn upload_async_media_route(
|
||||||
|
State(services): State<crate::State>,
|
||||||
|
InsecureClientIp(client): InsecureClientIp,
|
||||||
|
body: Ruma<create_content_async::v3::Request>,
|
||||||
|
) -> Result<create_content_async::v3::Response> {
|
||||||
|
let user = body.sender_user();
|
||||||
|
if services.users.is_suspended(user).await? {
|
||||||
|
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
|
||||||
|
}
|
||||||
|
let ref mxc = Mxc {
|
||||||
|
server_name: &body.server_name,
|
||||||
|
media_id: &body.media_id,
|
||||||
|
};
|
||||||
|
if !services.globals.server_is_ours(&body.server_name) {
|
||||||
|
return Err!(Request(Forbidden("Media uri does not belong to us.")));
|
||||||
|
}
|
||||||
|
if !services.media.exists(mxc).await {
|
||||||
|
return Err!(Request(NotFound("Media uri does not exist.")));
|
||||||
|
} else if services.media.is_populated(mxc).await {
|
||||||
|
return Err(Error::Request(
|
||||||
|
ErrorKind::CannotOverwriteMedia,
|
||||||
|
"Media uri is already populated.".into(),
|
||||||
|
StatusCode::CONFLICT,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let filename = body.filename.as_deref();
|
||||||
|
let content_type = body.content_type.as_deref();
|
||||||
|
let content_disposition = make_content_disposition(None, content_type, filename);
|
||||||
|
|
||||||
|
services
|
||||||
|
.media
|
||||||
|
.create(mxc, Some(user), Some(&content_disposition), content_type, &body.file)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(create_content_async::v3::Response {})
|
||||||
|
}
|
||||||
|
|
||||||
/// # `GET /_matrix/client/v1/media/thumbnail/{serverName}/{mediaId}`
|
/// # `GET /_matrix/client/v1/media/thumbnail/{serverName}/{mediaId}`
|
||||||
///
|
///
|
||||||
/// Load media thumbnail from our server or over federation.
|
/// Load media thumbnail from our server or over federation.
|
||||||
|
@ -313,18 +393,48 @@ async fn fetch_thumbnail_meta(
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn wait_for_population(
|
||||||
|
services: &Services,
|
||||||
|
mxc: &Mxc<'_>,
|
||||||
|
timeout_ms: Duration,
|
||||||
|
) -> Result<FileMeta> {
|
||||||
|
async fn inner(services: &Services, mxc: &Mxc<'_>) -> Result<FileMeta> {
|
||||||
|
if !services.media.exists(mxc).await {
|
||||||
|
return Err!(Request(NotFound("Media not found.")));
|
||||||
|
}
|
||||||
|
loop {
|
||||||
|
if let Ok(Some(filemeta)) = services.media.get(mxc).await {
|
||||||
|
return Ok(filemeta);
|
||||||
|
}
|
||||||
|
// TODO(async-media): A notify/send mechanism would be better than polling.
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tokio::time::timeout(
|
||||||
|
timeout_ms
|
||||||
|
.checked_sub(Duration::from_millis(500))
|
||||||
|
.unwrap_or(Duration::from_millis(0)),
|
||||||
|
inner(services, mxc),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| err!(Request(NotYetUploaded("Media was not ready in time."))))?
|
||||||
|
}
|
||||||
async fn fetch_file_meta(
|
async fn fetch_file_meta(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
mxc: &Mxc<'_>,
|
mxc: &Mxc<'_>,
|
||||||
user: &UserId,
|
user: &UserId,
|
||||||
timeout_ms: Duration,
|
timeout_ms: Duration,
|
||||||
) -> Result<FileMeta> {
|
) -> Result<FileMeta> {
|
||||||
if let Some(filemeta) = services.media.get(mxc).await? {
|
if services.globals.server_is_ours(mxc.server_name) {
|
||||||
return Ok(filemeta);
|
let result = wait_for_population(services, mxc, timeout_ms).await;
|
||||||
|
if let Ok(filemeta) = result {
|
||||||
|
return Ok(filemeta);
|
||||||
|
}
|
||||||
|
return Err!(Request(NotFound("Local media not found.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if services.globals.server_is_ours(mxc.server_name) {
|
if let Some(filemeta) = services.media.get(mxc).await? {
|
||||||
return Err!(Request(NotFound("Local media not found.")));
|
return Ok(filemeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
services
|
services
|
||||||
|
|
|
@ -154,6 +154,8 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
||||||
.ruma_route(&client::turn_server_route)
|
.ruma_route(&client::turn_server_route)
|
||||||
.ruma_route(&client::send_event_to_device_route)
|
.ruma_route(&client::send_event_to_device_route)
|
||||||
.ruma_route(&client::create_content_route)
|
.ruma_route(&client::create_content_route)
|
||||||
|
.ruma_route(&client::create_async_mxc_uri_route)
|
||||||
|
.ruma_route(&client::upload_async_media_route)
|
||||||
.ruma_route(&client::get_content_thumbnail_route)
|
.ruma_route(&client::get_content_thumbnail_route)
|
||||||
.ruma_route(&client::get_content_route)
|
.ruma_route(&client::get_content_route)
|
||||||
.ruma_route(&client::get_content_as_filename_route)
|
.ruma_route(&client::get_content_as_filename_route)
|
||||||
|
|
|
@ -105,6 +105,15 @@ impl Service {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Creates an MXC key but no associated file.
|
||||||
|
/// Used for async media. Must be later overwritten with `create`
|
||||||
|
pub async fn create_async(&self, mxc: &Mxc<'_>, user: Option<&UserId>) -> Result {
|
||||||
|
// effectively just reserves the MXC ID in the database
|
||||||
|
self.db
|
||||||
|
.create_file_metadata(mxc, user, &Dim::default(), None, None)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Deletes a file in the database and from the media directory via an MXC
|
/// Deletes a file in the database and from the media directory via an MXC
|
||||||
pub async fn delete(&self, mxc: &Mxc<'_>) -> Result<()> {
|
pub async fn delete(&self, mxc: &Mxc<'_>) -> Result<()> {
|
||||||
match self.db.search_mxc_metadata_prefix(mxc).await {
|
match self.db.search_mxc_metadata_prefix(mxc).await {
|
||||||
|
@ -179,6 +188,33 @@ impl Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Checks that an MXC URI exists in our media database.
|
||||||
|
pub async fn exists(&self, mxc: &Mxc<'_>) -> bool {
|
||||||
|
self.db.search_mxc_metadata_prefix(mxc).await.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks that an MXC URI exists *and* has a file associated with it in our
|
||||||
|
/// media database.
|
||||||
|
pub async fn is_populated(&self, mxc: &Mxc<'_>) -> bool {
|
||||||
|
match self.db.search_mxc_metadata_prefix(mxc).await {
|
||||||
|
| Ok(keys) => {
|
||||||
|
if keys.is_empty() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for key in keys {
|
||||||
|
let path = self.get_media_file(&key);
|
||||||
|
if fs::metadata(path).await.is_ok() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
},
|
||||||
|
| _ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Gets all the MXC URIs in our media database
|
/// Gets all the MXC URIs in our media database
|
||||||
pub async fn get_all_mxcs(&self) -> Result<Vec<OwnedMxcUri>> {
|
pub async fn get_all_mxcs(&self) -> Result<Vec<OwnedMxcUri>> {
|
||||||
let all_keys = self.db.get_all_media_keys().await;
|
let all_keys = self.db.get_all_media_keys().await;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue