mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-11 17:23:00 +02:00
Compare commits
4 commits
bd3db65cb2
...
583cb924f1
Author | SHA1 | Date | |
---|---|---|---|
|
583cb924f1 | ||
|
9286838d23 | ||
|
d1ebcfaf0b | ||
|
e820551f62 |
9 changed files with 291 additions and 49 deletions
|
@ -8,7 +8,7 @@ use conduwuit::{
|
||||||
ref_at,
|
ref_at,
|
||||||
utils::{
|
utils::{
|
||||||
IterStream, ReadyExt,
|
IterStream, ReadyExt,
|
||||||
result::{FlatOk, LogErr},
|
result::LogErr,
|
||||||
stream::{BroadbandExt, TryIgnore, WidebandExt},
|
stream::{BroadbandExt, TryIgnore, WidebandExt},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
@ -35,6 +35,7 @@ use ruma::{
|
||||||
};
|
};
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
|
||||||
|
use super::utils::{count_to_token, parse_pagination_token as parse_token};
|
||||||
use crate::Ruma;
|
use crate::Ruma;
|
||||||
|
|
||||||
/// list of safe and common non-state events to ignore if the user is ignored
|
/// list of safe and common non-state events to ignore if the user is ignored
|
||||||
|
@ -84,14 +85,14 @@ pub(crate) async fn get_message_events_route(
|
||||||
let from: PduCount = body
|
let from: PduCount = body
|
||||||
.from
|
.from
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.map(str::parse)
|
.map(parse_token)
|
||||||
.transpose()?
|
.transpose()?
|
||||||
.unwrap_or_else(|| match body.dir {
|
.unwrap_or_else(|| match body.dir {
|
||||||
| Direction::Forward => PduCount::min(),
|
| Direction::Forward => PduCount::min(),
|
||||||
| Direction::Backward => PduCount::max(),
|
| Direction::Backward => PduCount::max(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let to: Option<PduCount> = body.to.as_deref().map(str::parse).flat_ok();
|
let to: Option<PduCount> = body.to.as_deref().map(parse_token).transpose()?;
|
||||||
|
|
||||||
let limit: usize = body
|
let limit: usize = body
|
||||||
.limit
|
.limit
|
||||||
|
@ -180,8 +181,8 @@ pub(crate) async fn get_message_events_route(
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
Ok(get_message_events::v3::Response {
|
Ok(get_message_events::v3::Response {
|
||||||
start: from.to_string(),
|
start: count_to_token(from),
|
||||||
end: next_token.as_ref().map(ToString::to_string),
|
end: next_token.map(count_to_token),
|
||||||
chunk,
|
chunk,
|
||||||
state,
|
state,
|
||||||
})
|
})
|
||||||
|
|
|
@ -36,6 +36,7 @@ pub(super) mod typing;
|
||||||
pub(super) mod unstable;
|
pub(super) mod unstable;
|
||||||
pub(super) mod unversioned;
|
pub(super) mod unversioned;
|
||||||
pub(super) mod user_directory;
|
pub(super) mod user_directory;
|
||||||
|
pub(super) mod utils;
|
||||||
pub(super) mod voip;
|
pub(super) mod voip;
|
||||||
pub(super) mod well_known;
|
pub(super) mod well_known;
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ use ruma::{
|
||||||
events::{TimelineEventType, relation::RelationType},
|
events::{TimelineEventType, relation::RelationType},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use super::utils::{count_to_token, parse_pagination_token as parse_token};
|
||||||
use crate::Ruma;
|
use crate::Ruma;
|
||||||
|
|
||||||
/// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
|
/// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
|
||||||
|
@ -110,14 +111,14 @@ async fn paginate_relations_with_filter(
|
||||||
dir: Direction,
|
dir: Direction,
|
||||||
) -> Result<get_relating_events::v1::Response> {
|
) -> Result<get_relating_events::v1::Response> {
|
||||||
let start: PduCount = from
|
let start: PduCount = from
|
||||||
.map(str::parse)
|
.map(parse_token)
|
||||||
.transpose()?
|
.transpose()?
|
||||||
.unwrap_or_else(|| match dir {
|
.unwrap_or_else(|| match dir {
|
||||||
| Direction::Forward => PduCount::min(),
|
| Direction::Forward => PduCount::min(),
|
||||||
| Direction::Backward => PduCount::max(),
|
| Direction::Backward => PduCount::max(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let to: Option<PduCount> = to.map(str::parse).flat_ok();
|
let to: Option<PduCount> = to.map(parse_token).transpose()?;
|
||||||
|
|
||||||
// Use limit or else 30, with maximum 100
|
// Use limit or else 30, with maximum 100
|
||||||
let limit: usize = limit
|
let limit: usize = limit
|
||||||
|
@ -129,6 +130,11 @@ async fn paginate_relations_with_filter(
|
||||||
// Spec (v1.10) recommends depth of at least 3
|
// Spec (v1.10) recommends depth of at least 3
|
||||||
let depth: u8 = if recurse { 3 } else { 1 };
|
let depth: u8 = if recurse { 3 } else { 1 };
|
||||||
|
|
||||||
|
// Check if this is a thread request
|
||||||
|
let is_thread = filter_rel_type
|
||||||
|
.as_ref()
|
||||||
|
.is_some_and(|rel| *rel == RelationType::Thread);
|
||||||
|
|
||||||
let events: Vec<_> = services
|
let events: Vec<_> = services
|
||||||
.rooms
|
.rooms
|
||||||
.pdu_metadata
|
.pdu_metadata
|
||||||
|
@ -152,23 +158,58 @@ async fn paginate_relations_with_filter(
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let next_batch = match dir {
|
// For threads, check if we should include the root event
|
||||||
|
let mut root_event = None;
|
||||||
|
if is_thread && dir == Direction::Backward {
|
||||||
|
// Check if we've reached the beginning of the thread
|
||||||
|
// (fewer events than requested means we've exhausted the thread)
|
||||||
|
if events.len() < limit {
|
||||||
|
// Try to get the thread root event
|
||||||
|
if let Ok(root_pdu) = services.rooms.timeline.get_pdu(target).await {
|
||||||
|
// Check visibility
|
||||||
|
if services
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.user_can_see_event(sender_user, room_id, target)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
// Store the root event to add to the response
|
||||||
|
root_event = Some(root_pdu);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine if there are more events to fetch
|
||||||
|
let has_more = if root_event.is_some() {
|
||||||
|
false // We've included the root, no more events
|
||||||
|
} else {
|
||||||
|
// Check if we got a full page of results (might be more)
|
||||||
|
events.len() >= limit
|
||||||
|
};
|
||||||
|
|
||||||
|
let next_batch = if has_more {
|
||||||
|
match dir {
|
||||||
| Direction::Forward => events.last(),
|
| Direction::Forward => events.last(),
|
||||||
| Direction::Backward => events.first(),
|
| Direction::Backward => events.first(),
|
||||||
}
|
}
|
||||||
.map(at!(0))
|
.map(|(count, _)| count_to_token(*count))
|
||||||
.as_ref()
|
} else {
|
||||||
.map(ToString::to_string);
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// Build the response chunk with thread root if needed
|
||||||
|
let chunk: Vec<_> = root_event
|
||||||
|
.into_iter()
|
||||||
|
.map(Event::into_format)
|
||||||
|
.chain(events.into_iter().map(at!(1)).map(Event::into_format))
|
||||||
|
.collect();
|
||||||
|
|
||||||
Ok(get_relating_events::v1::Response {
|
Ok(get_relating_events::v1::Response {
|
||||||
next_batch,
|
next_batch,
|
||||||
prev_batch: from.map(Into::into),
|
prev_batch: from.map(Into::into),
|
||||||
recursion_depth: recurse.then_some(depth.into()),
|
recursion_depth: recurse.then_some(depth.into()),
|
||||||
chunk: events
|
chunk,
|
||||||
.into_iter()
|
|
||||||
.map(at!(1))
|
|
||||||
.map(Event::into_format)
|
|
||||||
.collect(),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -198,8 +198,8 @@ pub(crate) async fn login_route(
|
||||||
.clone()
|
.clone()
|
||||||
.unwrap_or_else(|| utils::random_string(DEVICE_ID_LENGTH).into());
|
.unwrap_or_else(|| utils::random_string(DEVICE_ID_LENGTH).into());
|
||||||
|
|
||||||
// Generate a new token for the device
|
// Generate a new token for the device (ensuring no collisions)
|
||||||
let token = utils::random_string(TOKEN_LENGTH);
|
let token = services.users.generate_unique_token().await;
|
||||||
|
|
||||||
// Determine if device_id was provided and exists in the db for this user
|
// Determine if device_id was provided and exists in the db for this user
|
||||||
let device_exists = if body.device_id.is_some() {
|
let device_exists = if body.device_id.is_some() {
|
||||||
|
|
28
src/api/client/utils.rs
Normal file
28
src/api/client/utils.rs
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
use conduwuit::{
|
||||||
|
Result, err,
|
||||||
|
matrix::pdu::{PduCount, ShortEventId},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Parse a pagination token, trying ShortEventId first, then falling back to
|
||||||
|
/// PduCount
|
||||||
|
pub(crate) fn parse_pagination_token(token: &str) -> Result<PduCount> {
|
||||||
|
// Try parsing as ShortEventId first
|
||||||
|
if let Ok(shorteventid) = token.parse::<ShortEventId>() {
|
||||||
|
// ShortEventId maps directly to a PduCount in our database
|
||||||
|
Ok(PduCount::Normal(shorteventid))
|
||||||
|
} else if let Ok(count) = token.parse::<u64>() {
|
||||||
|
// Fallback to PduCount for backwards compatibility
|
||||||
|
Ok(PduCount::Normal(count))
|
||||||
|
} else if let Ok(count) = token.parse::<i64>() {
|
||||||
|
// Also handle negative counts for backfilled events
|
||||||
|
Ok(PduCount::from_signed(count))
|
||||||
|
} else {
|
||||||
|
Err(err!(Request(InvalidParam("Invalid pagination token"))))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a PduCount to a token string (using the underlying ShortEventId)
|
||||||
|
pub(crate) fn count_to_token(count: PduCount) -> String {
|
||||||
|
// The PduCount's unsigned value IS the ShortEventId
|
||||||
|
count.into_unsigned().to_string()
|
||||||
|
}
|
|
@ -5,6 +5,14 @@ use axum_extra::{
|
||||||
typed_header::TypedHeaderRejectionReason,
|
typed_header::TypedHeaderRejectionReason,
|
||||||
};
|
};
|
||||||
use conduwuit::{Err, Error, Result, debug_error, err, warn};
|
use conduwuit::{Err, Error, Result, debug_error, err, warn};
|
||||||
|
use futures::{
|
||||||
|
TryFutureExt,
|
||||||
|
future::{
|
||||||
|
Either::{Left, Right},
|
||||||
|
select_ok,
|
||||||
|
},
|
||||||
|
pin_mut,
|
||||||
|
};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonObject, CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId,
|
CanonicalJsonObject, CanonicalJsonValue, OwnedDeviceId, OwnedServerName, OwnedUserId, UserId,
|
||||||
api::{
|
api::{
|
||||||
|
@ -54,17 +62,7 @@ pub(super) async fn auth(
|
||||||
| None => request.query.access_token.as_deref(),
|
| None => request.query.access_token.as_deref(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let token = if let Some(token) = token {
|
let token = find_token(services, token).await?;
|
||||||
match services.appservice.find_from_token(token).await {
|
|
||||||
| Some(reg_info) => Token::Appservice(Box::new(reg_info)),
|
|
||||||
| _ => match services.users.find_from_token(token).await {
|
|
||||||
| Ok((user_id, device_id)) => Token::User((user_id, device_id)),
|
|
||||||
| _ => Token::Invalid,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Token::None
|
|
||||||
};
|
|
||||||
|
|
||||||
if metadata.authentication == AuthScheme::None {
|
if metadata.authentication == AuthScheme::None {
|
||||||
match metadata {
|
match metadata {
|
||||||
|
@ -342,3 +340,25 @@ async fn parse_x_matrix(request: &mut Request) -> Result<XMatrix> {
|
||||||
|
|
||||||
Ok(x_matrix)
|
Ok(x_matrix)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn find_token(services: &Services, token: Option<&str>) -> Result<Token> {
|
||||||
|
let Some(token) = token else {
|
||||||
|
return Ok(Token::None);
|
||||||
|
};
|
||||||
|
|
||||||
|
let user_token = services.users.find_from_token(token).map_ok(Token::User);
|
||||||
|
|
||||||
|
let appservice_token = services
|
||||||
|
.appservice
|
||||||
|
.find_from_token(token)
|
||||||
|
.map_ok(Box::new)
|
||||||
|
.map_ok(Token::Appservice);
|
||||||
|
|
||||||
|
pin_mut!(user_token, appservice_token);
|
||||||
|
// Returns Ok if either token type succeeds, Err only if both fail
|
||||||
|
match select_ok([Left(user_token), Right(appservice_token)]).await {
|
||||||
|
| Err(e) if !e.is_not_found() => Err(e),
|
||||||
|
| Ok((token, _)) => Ok(token),
|
||||||
|
| _ => Ok(Token::Invalid),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -4,14 +4,14 @@ mod registration_info;
|
||||||
use std::{collections::BTreeMap, iter::IntoIterator, sync::Arc};
|
use std::{collections::BTreeMap, iter::IntoIterator, sync::Arc};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use conduwuit::{Result, err, utils::stream::IterStream};
|
use conduwuit::{Err, Result, err, utils::stream::IterStream};
|
||||||
use database::Map;
|
use database::Map;
|
||||||
use futures::{Future, FutureExt, Stream, TryStreamExt};
|
use futures::{Future, FutureExt, Stream, TryStreamExt};
|
||||||
use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration};
|
use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration};
|
||||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||||
|
|
||||||
pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
|
pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
|
||||||
use crate::{Dep, sending};
|
use crate::{Dep, globals, sending, users};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
registration_info: RwLock<Registrations>,
|
registration_info: RwLock<Registrations>,
|
||||||
|
@ -20,7 +20,9 @@ pub struct Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Services {
|
struct Services {
|
||||||
|
globals: Dep<globals::Service>,
|
||||||
sending: Dep<sending::Service>,
|
sending: Dep<sending::Service>,
|
||||||
|
users: Dep<users::Service>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Data {
|
struct Data {
|
||||||
|
@ -35,7 +37,9 @@ impl crate::Service for Service {
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
registration_info: RwLock::new(BTreeMap::new()),
|
registration_info: RwLock::new(BTreeMap::new()),
|
||||||
services: Services {
|
services: Services {
|
||||||
|
globals: args.depend::<globals::Service>("globals"),
|
||||||
sending: args.depend::<sending::Service>("sending"),
|
sending: args.depend::<sending::Service>("sending"),
|
||||||
|
users: args.depend::<users::Service>("users"),
|
||||||
},
|
},
|
||||||
db: Data {
|
db: Data {
|
||||||
id_appserviceregistrations: args.db["id_appserviceregistrations"].clone(),
|
id_appserviceregistrations: args.db["id_appserviceregistrations"].clone(),
|
||||||
|
@ -44,23 +48,89 @@ impl crate::Service for Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn worker(self: Arc<Self>) -> Result {
|
async fn worker(self: Arc<Self>) -> Result {
|
||||||
// Inserting registrations into cache
|
// First, collect all appservices to check for token conflicts
|
||||||
self.iter_db_ids()
|
let appservices: Vec<(String, Registration)> = self.iter_db_ids().try_collect().await?;
|
||||||
.try_for_each(async |appservice| {
|
|
||||||
self.registration_info
|
// Check for appservice-to-appservice token conflicts
|
||||||
.write()
|
for i in 0..appservices.len() {
|
||||||
|
for j in i.saturating_add(1)..appservices.len() {
|
||||||
|
if appservices[i].1.as_token == appservices[j].1.as_token {
|
||||||
|
return Err!(Database(error!(
|
||||||
|
"Token collision detected: Appservices '{}' and '{}' have the same token",
|
||||||
|
appservices[i].0, appservices[j].0
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process each appservice
|
||||||
|
for (id, registration) in appservices {
|
||||||
|
// During startup, resolve any token collisions in favour of appservices
|
||||||
|
// by logging out conflicting user devices
|
||||||
|
if let Ok((user_id, device_id)) = self
|
||||||
|
.services
|
||||||
|
.users
|
||||||
|
.find_from_token(®istration.as_token)
|
||||||
.await
|
.await
|
||||||
.insert(appservice.0, appservice.1.try_into()?);
|
{
|
||||||
|
conduwuit::warn!(
|
||||||
|
"Token collision detected during startup: Appservice '{}' token was also \
|
||||||
|
used by user '{}' device '{}'. Logging out the user device to resolve \
|
||||||
|
conflict.",
|
||||||
|
id,
|
||||||
|
user_id.localpart(),
|
||||||
|
device_id
|
||||||
|
);
|
||||||
|
|
||||||
|
self.services
|
||||||
|
.users
|
||||||
|
.remove_device(&user_id, &device_id)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.start_appservice(id, registration).await?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
|
/// Starts an appservice, ensuring its sender_localpart user exists and is
|
||||||
|
/// active. Creates the user if it doesn't exist, or reactivates it if it
|
||||||
|
/// was deactivated. Then registers the appservice in memory for request
|
||||||
|
/// handling.
|
||||||
|
async fn start_appservice(&self, id: String, registration: Registration) -> Result {
|
||||||
|
let appservice_user_id = UserId::parse_with_server_name(
|
||||||
|
registration.sender_localpart.as_str(),
|
||||||
|
self.services.globals.server_name(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if !self.services.users.exists(&appservice_user_id).await {
|
||||||
|
self.services.users.create(&appservice_user_id, None)?;
|
||||||
|
} else if self
|
||||||
|
.services
|
||||||
|
.users
|
||||||
|
.is_deactivated(&appservice_user_id)
|
||||||
|
.await
|
||||||
|
.unwrap_or(false)
|
||||||
|
{
|
||||||
|
// Reactivate the appservice user if it was accidentally deactivated
|
||||||
|
self.services
|
||||||
|
.users
|
||||||
|
.set_password(&appservice_user_id, None)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.registration_info
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert(id, registration.try_into()?);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Registers an appservice and returns the ID to the caller
|
/// Registers an appservice and returns the ID to the caller
|
||||||
pub async fn register_appservice(
|
pub async fn register_appservice(
|
||||||
&self,
|
&self,
|
||||||
|
@ -68,15 +138,40 @@ impl Service {
|
||||||
appservice_config_body: &str,
|
appservice_config_body: &str,
|
||||||
) -> Result {
|
) -> Result {
|
||||||
//TODO: Check for collisions between exclusive appservice namespaces
|
//TODO: Check for collisions between exclusive appservice namespaces
|
||||||
self.registration_info
|
|
||||||
.write()
|
// Check for token collision with other appservices (allow re-registration of
|
||||||
|
// same appservice)
|
||||||
|
if let Ok(existing) = self.find_from_token(®istration.as_token).await {
|
||||||
|
if existing.registration.id != registration.id {
|
||||||
|
return Err(err!(Request(InvalidParam(
|
||||||
|
"Cannot register appservice: Token is already used by appservice '{}'. \
|
||||||
|
Please generate a different token.",
|
||||||
|
existing.registration.id
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prevent token collision with existing user tokens
|
||||||
|
if self
|
||||||
|
.services
|
||||||
|
.users
|
||||||
|
.find_from_token(®istration.as_token)
|
||||||
.await
|
.await
|
||||||
.insert(registration.id.clone(), registration.clone().try_into()?);
|
.is_ok()
|
||||||
|
{
|
||||||
|
return Err(err!(Request(InvalidParam(
|
||||||
|
"Cannot register appservice: The provided token is already in use by a user \
|
||||||
|
device. Please generate a different token for the appservice."
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
|
||||||
self.db
|
self.db
|
||||||
.id_appserviceregistrations
|
.id_appserviceregistrations
|
||||||
.insert(®istration.id, appservice_config_body);
|
.insert(®istration.id, appservice_config_body);
|
||||||
|
|
||||||
|
self.start_appservice(registration.id.clone(), registration.clone())
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,12 +208,14 @@ impl Service {
|
||||||
.map(|info| info.registration)
|
.map(|info| info.registration)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn find_from_token(&self, token: &str) -> Option<RegistrationInfo> {
|
/// Returns Result to match users::find_from_token for select_ok usage
|
||||||
|
pub async fn find_from_token(&self, token: &str) -> Result<RegistrationInfo> {
|
||||||
self.read()
|
self.read()
|
||||||
.await
|
.await
|
||||||
.values()
|
.values()
|
||||||
.find(|info| info.registration.as_token == token)
|
.find(|info| info.registration.as_token == token)
|
||||||
.cloned()
|
.cloned()
|
||||||
|
.ok_or_else(|| err!(Request(NotFound("Appservice token not found"))))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks if a given user id matches any exclusive appservice regex
|
/// Checks if a given user id matches any exclusive appservice regex
|
||||||
|
|
|
@ -61,9 +61,12 @@ impl Data {
|
||||||
from: PduCount,
|
from: PduCount,
|
||||||
dir: Direction,
|
dir: Direction,
|
||||||
) -> impl Stream<Item = (PduCount, impl Event)> + Send + '_ {
|
) -> impl Stream<Item = (PduCount, impl Event)> + Send + '_ {
|
||||||
|
// Query from exact position then filter excludes it (saturating_inc could skip
|
||||||
|
// events at min/max boundaries)
|
||||||
|
let from_unsigned = from.into_unsigned();
|
||||||
let mut current = ArrayVec::<u8, 16>::new();
|
let mut current = ArrayVec::<u8, 16>::new();
|
||||||
current.extend(target.to_be_bytes());
|
current.extend(target.to_be_bytes());
|
||||||
current.extend(from.saturating_inc(dir).into_unsigned().to_be_bytes());
|
current.extend(from_unsigned.to_be_bytes());
|
||||||
let current = current.as_slice();
|
let current = current.as_slice();
|
||||||
match dir {
|
match dir {
|
||||||
| Direction::Forward => self.tofrom_relation.raw_keys_from(current).boxed(),
|
| Direction::Forward => self.tofrom_relation.raw_keys_from(current).boxed(),
|
||||||
|
@ -73,6 +76,17 @@ impl Data {
|
||||||
.ready_take_while(move |key| key.starts_with(&target.to_be_bytes()))
|
.ready_take_while(move |key| key.starts_with(&target.to_be_bytes()))
|
||||||
.map(|to_from| u64_from_u8(&to_from[8..16]))
|
.map(|to_from| u64_from_u8(&to_from[8..16]))
|
||||||
.map(PduCount::from_unsigned)
|
.map(PduCount::from_unsigned)
|
||||||
|
.ready_filter(move |count| {
|
||||||
|
if from == PduCount::min() || from == PduCount::max() {
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
let count_unsigned = count.into_unsigned();
|
||||||
|
match dir {
|
||||||
|
| Direction::Forward => count_unsigned > from_unsigned,
|
||||||
|
| Direction::Backward => count_unsigned < from_unsigned,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
.wide_filter_map(move |shorteventid| async move {
|
.wide_filter_map(move |shorteventid| async move {
|
||||||
let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into();
|
let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into();
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@ use ruma::{
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
use crate::{Dep, account_data, admin, globals, rooms};
|
use crate::{Dep, account_data, admin, appservice, globals, rooms};
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct UserSuspension {
|
pub struct UserSuspension {
|
||||||
|
@ -40,6 +40,7 @@ struct Services {
|
||||||
server: Arc<Server>,
|
server: Arc<Server>,
|
||||||
account_data: Dep<account_data::Service>,
|
account_data: Dep<account_data::Service>,
|
||||||
admin: Dep<admin::Service>,
|
admin: Dep<admin::Service>,
|
||||||
|
appservice: Dep<appservice::Service>,
|
||||||
globals: Dep<globals::Service>,
|
globals: Dep<globals::Service>,
|
||||||
state_accessor: Dep<rooms::state_accessor::Service>,
|
state_accessor: Dep<rooms::state_accessor::Service>,
|
||||||
state_cache: Dep<rooms::state_cache::Service>,
|
state_cache: Dep<rooms::state_cache::Service>,
|
||||||
|
@ -76,6 +77,7 @@ impl crate::Service for Service {
|
||||||
server: args.server.clone(),
|
server: args.server.clone(),
|
||||||
account_data: args.depend::<account_data::Service>("account_data"),
|
account_data: args.depend::<account_data::Service>("account_data"),
|
||||||
admin: args.depend::<admin::Service>("admin"),
|
admin: args.depend::<admin::Service>("admin"),
|
||||||
|
appservice: args.depend::<appservice::Service>("appservice"),
|
||||||
globals: args.depend::<globals::Service>("globals"),
|
globals: args.depend::<globals::Service>("globals"),
|
||||||
state_accessor: args
|
state_accessor: args
|
||||||
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
|
.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
|
||||||
|
@ -391,6 +393,31 @@ impl Service {
|
||||||
self.db.userdeviceid_token.qry(&key).await.deserialized()
|
self.db.userdeviceid_token.qry(&key).await.deserialized()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generate a unique access token that doesn't collide with existing tokens
|
||||||
|
pub async fn generate_unique_token(&self) -> String {
|
||||||
|
loop {
|
||||||
|
let token = utils::random_string(32);
|
||||||
|
|
||||||
|
// Check for collision with appservice tokens
|
||||||
|
if self
|
||||||
|
.services
|
||||||
|
.appservice
|
||||||
|
.find_from_token(&token)
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for collision with user tokens
|
||||||
|
if self.db.token_userdeviceid.get(&token).await.is_ok() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Replaces the access token of one device.
|
/// Replaces the access token of one device.
|
||||||
pub async fn set_token(
|
pub async fn set_token(
|
||||||
&self,
|
&self,
|
||||||
|
@ -407,6 +434,19 @@ impl Service {
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check for token collision with appservices
|
||||||
|
if self
|
||||||
|
.services
|
||||||
|
.appservice
|
||||||
|
.find_from_token(token)
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
return Err!(Request(InvalidParam(
|
||||||
|
"Token conflicts with an existing appservice token"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
// Remove old token
|
// Remove old token
|
||||||
if let Ok(old_token) = self.db.userdeviceid_token.qry(&key).await {
|
if let Ok(old_token) = self.db.userdeviceid_token.qry(&key).await {
|
||||||
self.db.token_userdeviceid.remove(&old_token);
|
self.db.token_userdeviceid.remove(&old_token);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue