mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-11 20:33:02 +02:00
Compare commits
6 commits
f36028b869
...
394382884d
Author | SHA1 | Date | |
---|---|---|---|
|
394382884d |
||
|
2e1575863a |
||
|
547d4d9f88 |
||
|
b3a9cff497 |
||
|
ea8e80b2b7 |
||
|
a047cb600e |
27 changed files with 317 additions and 230 deletions
2
.envrc
2
.envrc
|
@ -2,6 +2,6 @@
|
|||
|
||||
dotenv_if_exists
|
||||
|
||||
use flake ".#${DIRENV_DEVSHELL:-default}"
|
||||
# use flake ".#${DIRENV_DEVSHELL:-default}"
|
||||
|
||||
PATH_add bin
|
||||
|
|
30
Cargo.lock
generated
30
Cargo.lock
generated
|
@ -963,6 +963,7 @@ dependencies = [
|
|||
"itertools 0.14.0",
|
||||
"libc",
|
||||
"libloading",
|
||||
"lock_api",
|
||||
"log",
|
||||
"maplit",
|
||||
"nix",
|
||||
|
@ -1658,6 +1659,12 @@ dependencies = [
|
|||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.1.2"
|
||||
|
@ -3219,10 +3226,13 @@ version = "0.9.11"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"petgraph",
|
||||
"redox_syscall",
|
||||
"smallvec",
|
||||
"thread-id",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
|
@ -3272,6 +3282,16 @@ version = "2.3.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
||||
|
||||
[[package]]
|
||||
name = "petgraph"
|
||||
version = "0.6.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
|
||||
dependencies = [
|
||||
"fixedbitset",
|
||||
"indexmap 2.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "phf"
|
||||
version = "0.11.3"
|
||||
|
@ -4893,6 +4913,16 @@ dependencies = [
|
|||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thread-id"
|
||||
version = "4.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cfe8f25bbdd100db7e1d34acf7fd2dc59c4bf8f7483f505eaa7d4f12f76cc0ea"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thread_local"
|
||||
version = "1.1.9"
|
||||
|
|
|
@ -517,10 +517,11 @@ version = "1.0"
|
|||
|
||||
[workspace.dependencies.parking_lot]
|
||||
version = "0.12.4"
|
||||
features = ["hardware-lock-elision", "deadlock_detection"] # TODO: Check if deadlock_detection has a perf impact, if it does only enable with debug_assertions
|
||||
|
||||
# Use this when extending with_lock::WithLock to parking_lot
|
||||
# [workspace.dependencies.lock_api]
|
||||
# version = "0.4.13"
|
||||
[workspace.dependencies.lock_api]
|
||||
version = "0.4.13"
|
||||
|
||||
[workspace.dependencies.bytesize]
|
||||
version = "2.0"
|
||||
|
|
|
@ -26,8 +26,7 @@ pub(super) async fn incoming_federation(&self) -> Result {
|
|||
.rooms
|
||||
.event_handler
|
||||
.federation_handletime
|
||||
.read()
|
||||
.expect("locked");
|
||||
.read();
|
||||
|
||||
let mut msg = format!("Handling {} incoming pdus:\n", map.len());
|
||||
for (r, (e, i)) in map.iter() {
|
||||
|
|
|
@ -37,11 +37,7 @@ pub use crate::admin::AdminCommand;
|
|||
|
||||
/// Install the admin command processor
|
||||
pub async fn init(admin_service: &service::admin::Service) {
|
||||
_ = admin_service
|
||||
.complete
|
||||
.write()
|
||||
.expect("locked for writing")
|
||||
.insert(processor::complete);
|
||||
_ = admin_service.complete.write().insert(processor::complete);
|
||||
_ = admin_service
|
||||
.handle
|
||||
.write()
|
||||
|
@ -52,9 +48,5 @@ pub async fn init(admin_service: &service::admin::Service) {
|
|||
/// Uninstall the admin command handler
|
||||
pub async fn fini(admin_service: &service::admin::Service) {
|
||||
_ = admin_service.handle.write().await.take();
|
||||
_ = admin_service
|
||||
.complete
|
||||
.write()
|
||||
.expect("locked for writing")
|
||||
.take();
|
||||
_ = admin_service.complete.write().take();
|
||||
}
|
||||
|
|
|
@ -111,6 +111,7 @@ tracing-subscriber.workspace = true
|
|||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
parking_lot.workspace = true
|
||||
lock_api.workspace = true
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
nix.workspace = true
|
||||
|
|
|
@ -4,7 +4,6 @@ use std::{
|
|||
cell::OnceCell,
|
||||
ffi::{CStr, c_char, c_void},
|
||||
fmt::Debug,
|
||||
sync::RwLock,
|
||||
};
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
|
@ -13,7 +12,7 @@ use tikv_jemalloc_sys as ffi;
|
|||
use tikv_jemallocator as jemalloc;
|
||||
|
||||
use crate::{
|
||||
Result, err, is_equal_to, is_nonzero,
|
||||
Result, SyncRwLock, err, is_equal_to, is_nonzero,
|
||||
utils::{math, math::Tried},
|
||||
};
|
||||
|
||||
|
@ -40,7 +39,7 @@ const MALLOC_CONF_PROF: &str = "";
|
|||
|
||||
#[global_allocator]
|
||||
static JEMALLOC: jemalloc::Jemalloc = jemalloc::Jemalloc;
|
||||
static CONTROL: RwLock<()> = RwLock::new(());
|
||||
static CONTROL: SyncRwLock<()> = SyncRwLock::new(());
|
||||
|
||||
type Name = ArrayVec<u8, NAME_MAX>;
|
||||
type Key = ArrayVec<usize, KEY_SEGS>;
|
||||
|
@ -332,7 +331,7 @@ fn set<T>(key: &Key, val: T) -> Result<T>
|
|||
where
|
||||
T: Copy + Debug,
|
||||
{
|
||||
let _lock = CONTROL.write()?;
|
||||
let _lock = CONTROL.write();
|
||||
let res = xchg(key, val)?;
|
||||
inc_epoch()?;
|
||||
|
||||
|
|
|
@ -1,65 +1,212 @@
|
|||
//! Traits for explicitly scoping the lifetime of locks.
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{
|
||||
future::Future,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
pub trait WithLock<T> {
|
||||
/// Acquires a lock and executes the given closure with the locked data.
|
||||
fn with_lock<F>(&self, f: F)
|
||||
pub trait WithLock<T: ?Sized> {
|
||||
/// Acquires a lock and executes the given closure with the locked data,
|
||||
/// returning the result.
|
||||
fn with_lock<R, F>(&self, f: F) -> R
|
||||
where
|
||||
F: FnMut(&mut T);
|
||||
F: FnMut(&mut T) -> R;
|
||||
}
|
||||
|
||||
impl<T> WithLock<T> for Mutex<T> {
|
||||
fn with_lock<F>(&self, mut f: F)
|
||||
fn with_lock<R, F>(&self, mut f: F) -> R
|
||||
where
|
||||
F: FnMut(&mut T),
|
||||
F: FnMut(&mut T) -> R,
|
||||
{
|
||||
// The locking and unlocking logic is hidden inside this function.
|
||||
let mut data_guard = self.lock().unwrap();
|
||||
f(&mut data_guard);
|
||||
f(&mut data_guard)
|
||||
// Lock is released here when `data_guard` goes out of scope.
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> WithLock<T> for Arc<Mutex<T>> {
|
||||
fn with_lock<F>(&self, mut f: F)
|
||||
fn with_lock<R, F>(&self, mut f: F) -> R
|
||||
where
|
||||
F: FnMut(&mut T),
|
||||
F: FnMut(&mut T) -> R,
|
||||
{
|
||||
// The locking and unlocking logic is hidden inside this function.
|
||||
let mut data_guard = self.lock().unwrap();
|
||||
f(&mut data_guard);
|
||||
f(&mut data_guard)
|
||||
// Lock is released here when `data_guard` goes out of scope.
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: lock_api::RawMutex, T: ?Sized> WithLock<T> for lock_api::Mutex<R, T> {
|
||||
fn with_lock<Ret, F>(&self, mut f: F) -> Ret
|
||||
where
|
||||
F: FnMut(&mut T) -> Ret,
|
||||
{
|
||||
// The locking and unlocking logic is hidden inside this function.
|
||||
let mut data_guard = self.lock();
|
||||
f(&mut data_guard)
|
||||
// Lock is released here when `data_guard` goes out of scope.
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: lock_api::RawMutex, T: ?Sized> WithLock<T> for Arc<lock_api::Mutex<R, T>> {
|
||||
fn with_lock<Ret, F>(&self, mut f: F) -> Ret
|
||||
where
|
||||
F: FnMut(&mut T) -> Ret,
|
||||
{
|
||||
// The locking and unlocking logic is hidden inside this function.
|
||||
let mut data_guard = self.lock();
|
||||
f(&mut data_guard)
|
||||
// Lock is released here when `data_guard` goes out of scope.
|
||||
}
|
||||
}
|
||||
|
||||
pub trait WithLockAsync<T> {
|
||||
/// Acquires a lock and executes the given closure with the locked data.
|
||||
fn with_lock<F>(&self, f: F) -> impl Future<Output = ()>
|
||||
/// Acquires a lock and executes the given closure with the locked data,
|
||||
/// returning the result.
|
||||
fn with_lock<R, F>(&self, f: F) -> impl Future<Output = R>
|
||||
where
|
||||
F: FnMut(&mut T);
|
||||
F: FnMut(&mut T) -> R;
|
||||
|
||||
/// Acquires a lock and executes the given async closure with the locked
|
||||
/// data.
|
||||
fn with_lock_async<R, F>(&self, f: F) -> impl std::future::Future<Output = R>
|
||||
where
|
||||
F: AsyncFnMut(&mut T) -> R;
|
||||
}
|
||||
|
||||
impl<T> WithLockAsync<T> for futures::lock::Mutex<T> {
|
||||
async fn with_lock<F>(&self, mut f: F)
|
||||
async fn with_lock<R, F>(&self, mut f: F) -> R
|
||||
where
|
||||
F: FnMut(&mut T),
|
||||
F: FnMut(&mut T) -> R,
|
||||
{
|
||||
// The locking and unlocking logic is hidden inside this function.
|
||||
let mut data_guard = self.lock().await;
|
||||
f(&mut data_guard);
|
||||
f(&mut data_guard)
|
||||
// Lock is released here when `data_guard` goes out of scope.
|
||||
}
|
||||
|
||||
async fn with_lock_async<R, F>(&self, mut f: F) -> R
|
||||
where
|
||||
F: AsyncFnMut(&mut T) -> R,
|
||||
{
|
||||
// The locking and unlocking logic is hidden inside this function.
|
||||
let mut data_guard = self.lock().await;
|
||||
f(&mut data_guard).await
|
||||
// Lock is released here when `data_guard` goes out of scope.
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> WithLockAsync<T> for Arc<futures::lock::Mutex<T>> {
|
||||
async fn with_lock<F>(&self, mut f: F)
|
||||
async fn with_lock<R, F>(&self, mut f: F) -> R
|
||||
where
|
||||
F: FnMut(&mut T),
|
||||
F: FnMut(&mut T) -> R,
|
||||
{
|
||||
// The locking and unlocking logic is hidden inside this function.
|
||||
let mut data_guard = self.lock().await;
|
||||
f(&mut data_guard);
|
||||
f(&mut data_guard)
|
||||
// Lock is released here when `data_guard` goes out of scope.
|
||||
}
|
||||
|
||||
async fn with_lock_async<R, F>(&self, mut f: F) -> R
|
||||
where
|
||||
F: AsyncFnMut(&mut T) -> R,
|
||||
{
|
||||
// The locking and unlocking logic is hidden inside this function.
|
||||
let mut data_guard = self.lock().await;
|
||||
f(&mut data_guard).await
|
||||
// Lock is released here when `data_guard` goes out of scope.
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_with_lock_return_value() {
|
||||
let mutex = Mutex::new(5);
|
||||
let result = mutex.with_lock(|v| {
|
||||
*v += 1;
|
||||
*v * 2
|
||||
});
|
||||
assert_eq!(result, 12);
|
||||
let value = mutex.lock().unwrap();
|
||||
assert_eq!(*value, 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_lock_unit_return() {
|
||||
let mutex = Mutex::new(10);
|
||||
mutex.with_lock(|v| {
|
||||
*v += 2;
|
||||
});
|
||||
let value = mutex.lock().unwrap();
|
||||
assert_eq!(*value, 12);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_lock_arc_mutex() {
|
||||
let mutex = Arc::new(Mutex::new(1));
|
||||
let result = mutex.with_lock(|v| {
|
||||
*v *= 10;
|
||||
*v
|
||||
});
|
||||
assert_eq!(result, 10);
|
||||
assert_eq!(*mutex.lock().unwrap(), 10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_lock_async_return_value() {
|
||||
use futures::lock::Mutex as AsyncMutex;
|
||||
let mutex = AsyncMutex::new(7);
|
||||
let result = mutex
|
||||
.with_lock(|v| {
|
||||
*v += 3;
|
||||
*v * 2
|
||||
})
|
||||
.await;
|
||||
assert_eq!(result, 20);
|
||||
let value = mutex.lock().await;
|
||||
assert_eq!(*value, 10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_lock_async_unit_return() {
|
||||
use futures::lock::Mutex as AsyncMutex;
|
||||
let mutex = AsyncMutex::new(100);
|
||||
mutex
|
||||
.with_lock(|v| {
|
||||
*v -= 50;
|
||||
})
|
||||
.await;
|
||||
let value = mutex.lock().await;
|
||||
assert_eq!(*value, 50);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_lock_async_closure() {
|
||||
use futures::lock::Mutex as AsyncMutex;
|
||||
let mutex = AsyncMutex::new(1);
|
||||
mutex
|
||||
.with_lock_async(async |v| {
|
||||
*v += 9;
|
||||
})
|
||||
.await;
|
||||
let value = mutex.lock().await;
|
||||
assert_eq!(*value, 10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_lock_async_arc_mutex() {
|
||||
use futures::lock::Mutex as AsyncMutex;
|
||||
let mutex = Arc::new(AsyncMutex::new(2));
|
||||
mutex
|
||||
.with_lock_async(async |v: &mut i32| {
|
||||
*v *= 5;
|
||||
})
|
||||
.await;
|
||||
let value = mutex.lock().await;
|
||||
assert_eq!(*value, 10);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ pub fn backup_count(&self) -> Result<usize> {
|
|||
fn backup_engine(&self) -> Result<BackupEngine> {
|
||||
let path = self.backup_path()?;
|
||||
let options = BackupEngineOptions::new(path).map_err(map_err)?;
|
||||
BackupEngine::open(&options, &*self.ctx.env.lock()?).map_err(map_err)
|
||||
BackupEngine::open(&options, &self.ctx.env.lock()).map_err(map_err)
|
||||
}
|
||||
|
||||
#[implement(Engine)]
|
||||
|
|
|
@ -232,7 +232,7 @@ fn get_cache(ctx: &Context, desc: &Descriptor) -> Option<Cache> {
|
|||
cache_opts.set_num_shard_bits(shard_bits);
|
||||
cache_opts.set_capacity(size);
|
||||
|
||||
let mut caches = ctx.col_cache.lock().expect("locked");
|
||||
let mut caches = ctx.col_cache.lock();
|
||||
match desc.cache_disp {
|
||||
| CacheDisp::Unique if desc.cache_size == 0 => None,
|
||||
| CacheDisp::Unique => {
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
use std::{
|
||||
collections::BTreeMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use conduwuit::{Result, Server, debug, utils::math::usize_from_f64};
|
||||
use conduwuit::{Result, Server, SyncMutex, debug, utils::math::usize_from_f64};
|
||||
use rocksdb::{Cache, Env, LruCacheOptions};
|
||||
|
||||
use crate::{or_else, pool::Pool};
|
||||
|
@ -14,9 +11,9 @@ use crate::{or_else, pool::Pool};
|
|||
/// These assets are housed in the shared Context.
|
||||
pub(crate) struct Context {
|
||||
pub(crate) pool: Arc<Pool>,
|
||||
pub(crate) col_cache: Mutex<BTreeMap<String, Cache>>,
|
||||
pub(crate) row_cache: Mutex<Cache>,
|
||||
pub(crate) env: Mutex<Env>,
|
||||
pub(crate) col_cache: SyncMutex<BTreeMap<String, Cache>>,
|
||||
pub(crate) row_cache: SyncMutex<Cache>,
|
||||
pub(crate) env: SyncMutex<Env>,
|
||||
pub(crate) server: Arc<Server>,
|
||||
}
|
||||
|
||||
|
@ -68,7 +65,7 @@ impl Drop for Context {
|
|||
debug!("Closing frontend pool");
|
||||
self.pool.close();
|
||||
|
||||
let mut env = self.env.lock().expect("locked");
|
||||
let mut env = self.env.lock();
|
||||
|
||||
debug!("Shutting down background threads");
|
||||
env.set_high_priority_background_threads(0);
|
||||
|
|
|
@ -9,7 +9,7 @@ use crate::or_else;
|
|||
#[implement(Engine)]
|
||||
pub fn memory_usage(&self) -> Result<String> {
|
||||
let mut res = String::new();
|
||||
let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()?]))
|
||||
let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()]))
|
||||
.or_else(or_else)?;
|
||||
let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0;
|
||||
writeln!(
|
||||
|
@ -19,10 +19,10 @@ pub fn memory_usage(&self) -> Result<String> {
|
|||
mibs(stats.mem_table_total),
|
||||
mibs(stats.mem_table_unflushed),
|
||||
mibs(stats.mem_table_readers_total),
|
||||
mibs(u64::try_from(self.ctx.row_cache.lock()?.get_usage())?),
|
||||
mibs(u64::try_from(self.ctx.row_cache.lock().get_usage())?),
|
||||
)?;
|
||||
|
||||
for (name, cache) in &*self.ctx.col_cache.lock()? {
|
||||
for (name, cache) in &*self.ctx.col_cache.lock() {
|
||||
writeln!(res, "{name} cache: {:.2} MiB", mibs(u64::try_from(cache.get_usage())?))?;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,11 +23,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
|||
let config = &server.config;
|
||||
let path = &config.database_path;
|
||||
|
||||
let db_opts = db_options(
|
||||
config,
|
||||
&ctx.env.lock().expect("environment locked"),
|
||||
&ctx.row_cache.lock().expect("row cache locked"),
|
||||
)?;
|
||||
let db_opts = db_options(config, &ctx.env.lock(), &ctx.row_cache.lock())?;
|
||||
|
||||
let cfds = Self::configure_cfds(&ctx, &db_opts, desc)?;
|
||||
let num_cfds = cfds.len();
|
||||
|
|
|
@ -3,7 +3,7 @@ mod configure;
|
|||
use std::{
|
||||
mem::take,
|
||||
sync::{
|
||||
Arc, Mutex,
|
||||
Arc,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
thread,
|
||||
|
@ -12,7 +12,7 @@ use std::{
|
|||
|
||||
use async_channel::{QueueStrategy, Receiver, RecvError, Sender};
|
||||
use conduwuit::{
|
||||
Error, Result, Server, debug, err, error, implement,
|
||||
Error, Result, Server, SyncMutex, debug, err, error, implement,
|
||||
result::DebugInspect,
|
||||
smallvec::SmallVec,
|
||||
trace,
|
||||
|
@ -31,7 +31,7 @@ use crate::{Handle, Map, keyval::KeyBuf, stream};
|
|||
pub(crate) struct Pool {
|
||||
server: Arc<Server>,
|
||||
queues: Vec<Sender<Cmd>>,
|
||||
workers: Mutex<Vec<JoinHandle<()>>>,
|
||||
workers: SyncMutex<Vec<JoinHandle<()>>>,
|
||||
topology: Vec<usize>,
|
||||
busy: AtomicUsize,
|
||||
queued_max: AtomicUsize,
|
||||
|
@ -115,7 +115,7 @@ impl Drop for Pool {
|
|||
#[implement(Pool)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) fn close(&self) {
|
||||
let workers = take(&mut *self.workers.lock().expect("locked"));
|
||||
let workers = take(&mut *self.workers.lock());
|
||||
|
||||
let senders = self.queues.iter().map(Sender::sender_count).sum::<usize>();
|
||||
|
||||
|
@ -154,7 +154,7 @@ pub(crate) fn close(&self) {
|
|||
|
||||
#[implement(Pool)]
|
||||
fn spawn_until(self: &Arc<Self>, recv: &[Receiver<Cmd>], count: usize) -> Result {
|
||||
let mut workers = self.workers.lock().expect("locked");
|
||||
let mut workers = self.workers.lock();
|
||||
while workers.len() < count {
|
||||
self.clone().spawn_one(&mut workers, recv)?;
|
||||
}
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
#![cfg(feature = "console")]
|
||||
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
|
||||
use conduwuit::{Server, debug, defer, error, log, log::is_systemd_mode};
|
||||
use conduwuit::{Server, SyncMutex, debug, defer, error, log, log::is_systemd_mode};
|
||||
use futures::future::{AbortHandle, Abortable};
|
||||
use ruma::events::room::message::RoomMessageEventContent;
|
||||
use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
|
||||
|
@ -17,10 +14,10 @@ use crate::{Dep, admin};
|
|||
pub struct Console {
|
||||
server: Arc<Server>,
|
||||
admin: Dep<admin::Service>,
|
||||
worker_join: Mutex<Option<JoinHandle<()>>>,
|
||||
input_abort: Mutex<Option<AbortHandle>>,
|
||||
command_abort: Mutex<Option<AbortHandle>>,
|
||||
history: Mutex<VecDeque<String>>,
|
||||
worker_join: SyncMutex<Option<JoinHandle<()>>>,
|
||||
input_abort: SyncMutex<Option<AbortHandle>>,
|
||||
command_abort: SyncMutex<Option<AbortHandle>>,
|
||||
history: SyncMutex<VecDeque<String>>,
|
||||
output: MadSkin,
|
||||
}
|
||||
|
||||
|
@ -50,7 +47,7 @@ impl Console {
|
|||
}
|
||||
|
||||
pub async fn start(self: &Arc<Self>) {
|
||||
let mut worker_join = self.worker_join.lock().expect("locked");
|
||||
let mut worker_join = self.worker_join.lock();
|
||||
if worker_join.is_none() {
|
||||
let self_ = Arc::clone(self);
|
||||
_ = worker_join.insert(self.server.runtime().spawn(self_.worker()));
|
||||
|
@ -60,7 +57,7 @@ impl Console {
|
|||
pub async fn close(self: &Arc<Self>) {
|
||||
self.interrupt();
|
||||
|
||||
let Some(worker_join) = self.worker_join.lock().expect("locked").take() else {
|
||||
let Some(worker_join) = self.worker_join.lock().take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
|
@ -70,22 +67,18 @@ impl Console {
|
|||
pub fn interrupt(self: &Arc<Self>) {
|
||||
self.interrupt_command();
|
||||
self.interrupt_readline();
|
||||
self.worker_join
|
||||
.lock()
|
||||
.expect("locked")
|
||||
.as_ref()
|
||||
.map(JoinHandle::abort);
|
||||
self.worker_join.lock().as_ref().map(JoinHandle::abort);
|
||||
}
|
||||
|
||||
pub fn interrupt_readline(self: &Arc<Self>) {
|
||||
if let Some(input_abort) = self.input_abort.lock().expect("locked").take() {
|
||||
if let Some(input_abort) = self.input_abort.lock().take() {
|
||||
debug!("Interrupting console readline...");
|
||||
input_abort.abort();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn interrupt_command(self: &Arc<Self>) {
|
||||
if let Some(command_abort) = self.command_abort.lock().expect("locked").take() {
|
||||
if let Some(command_abort) = self.command_abort.lock().take() {
|
||||
debug!("Interrupting console command...");
|
||||
command_abort.abort();
|
||||
}
|
||||
|
@ -120,7 +113,7 @@ impl Console {
|
|||
}
|
||||
|
||||
debug!("session ending");
|
||||
self.worker_join.lock().expect("locked").take();
|
||||
self.worker_join.lock().take();
|
||||
}
|
||||
|
||||
async fn readline(self: &Arc<Self>) -> Result<ReadlineEvent, ReadlineError> {
|
||||
|
@ -135,9 +128,9 @@ impl Console {
|
|||
|
||||
let (abort, abort_reg) = AbortHandle::new_pair();
|
||||
let future = Abortable::new(future, abort_reg);
|
||||
_ = self.input_abort.lock().expect("locked").insert(abort);
|
||||
_ = self.input_abort.lock().insert(abort);
|
||||
defer! {{
|
||||
_ = self.input_abort.lock().expect("locked").take();
|
||||
_ = self.input_abort.lock().take();
|
||||
}}
|
||||
|
||||
let Ok(result) = future.await else {
|
||||
|
@ -158,9 +151,9 @@ impl Console {
|
|||
|
||||
let (abort, abort_reg) = AbortHandle::new_pair();
|
||||
let future = Abortable::new(future, abort_reg);
|
||||
_ = self.command_abort.lock().expect("locked").insert(abort);
|
||||
_ = self.command_abort.lock().insert(abort);
|
||||
defer! {{
|
||||
_ = self.command_abort.lock().expect("locked").take();
|
||||
_ = self.command_abort.lock().take();
|
||||
}}
|
||||
|
||||
_ = future.await;
|
||||
|
@ -184,12 +177,7 @@ impl Console {
|
|||
}
|
||||
|
||||
fn set_history(&self, readline: &mut Readline) {
|
||||
self.history
|
||||
.lock()
|
||||
.expect("locked")
|
||||
.iter()
|
||||
.rev()
|
||||
.for_each(|entry| {
|
||||
self.history.lock().iter().rev().for_each(|entry| {
|
||||
readline
|
||||
.add_history_entry(entry.clone())
|
||||
.expect("added history entry");
|
||||
|
@ -197,7 +185,7 @@ impl Console {
|
|||
}
|
||||
|
||||
fn add_history(&self, line: String) {
|
||||
let mut history = self.history.lock().expect("locked");
|
||||
let mut history = self.history.lock();
|
||||
history.push_front(line);
|
||||
history.truncate(HISTORY_LIMIT);
|
||||
}
|
||||
|
|
|
@ -5,11 +5,11 @@ mod grant;
|
|||
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::{Arc, RwLock as StdRwLock, Weak},
|
||||
sync::{Arc, Weak},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{Err, utils};
|
||||
use conduwuit::{Err, SyncRwLock, utils};
|
||||
use conduwuit_core::{
|
||||
Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
|
||||
};
|
||||
|
@ -36,7 +36,7 @@ pub struct Service {
|
|||
services: Services,
|
||||
channel: (Sender<CommandInput>, Receiver<CommandInput>),
|
||||
pub handle: RwLock<Option<Processor>>,
|
||||
pub complete: StdRwLock<Option<Completer>>,
|
||||
pub complete: SyncRwLock<Option<Completer>>,
|
||||
#[cfg(feature = "console")]
|
||||
pub console: Arc<console::Console>,
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ struct Services {
|
|||
state_cache: Dep<rooms::state_cache::Service>,
|
||||
state_accessor: Dep<rooms::state_accessor::Service>,
|
||||
account_data: Dep<account_data::Service>,
|
||||
services: StdRwLock<Option<Weak<crate::Services>>>,
|
||||
services: SyncRwLock<Option<Weak<crate::Services>>>,
|
||||
media: Dep<crate::media::Service>,
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ impl crate::Service for Service {
|
|||
},
|
||||
channel: loole::bounded(COMMAND_QUEUE_LIMIT),
|
||||
handle: RwLock::new(None),
|
||||
complete: StdRwLock::new(None),
|
||||
complete: SyncRwLock::new(None),
|
||||
#[cfg(feature = "console")]
|
||||
console: console::Console::new(&args),
|
||||
}))
|
||||
|
@ -312,10 +312,7 @@ impl Service {
|
|||
/// Invokes the tab-completer to complete the command. When unavailable,
|
||||
/// None is returned.
|
||||
pub fn complete_command(&self, command: &str) -> Option<String> {
|
||||
self.complete
|
||||
.read()
|
||||
.expect("locked for reading")
|
||||
.map(|complete| complete(command))
|
||||
self.complete.read().map(|complete| complete(command))
|
||||
}
|
||||
|
||||
async fn handle_signal(&self, sig: &'static str) {
|
||||
|
@ -338,17 +335,13 @@ impl Service {
|
|||
}
|
||||
|
||||
async fn process_command(&self, command: CommandInput) -> ProcessorResult {
|
||||
let handle = &self
|
||||
.handle
|
||||
.read()
|
||||
.await
|
||||
.expect("Admin module is not loaded");
|
||||
let handle_guard = self.handle.read().await;
|
||||
let handle = handle_guard.as_ref().expect("Admin module is not loaded");
|
||||
|
||||
let services = self
|
||||
.services
|
||||
.services
|
||||
.read()
|
||||
.expect("locked")
|
||||
.as_ref()
|
||||
.and_then(Weak::upgrade)
|
||||
.expect("Services self-reference not initialized.");
|
||||
|
@ -523,7 +516,7 @@ impl Service {
|
|||
/// Sets the self-reference to crate::Services which will provide context to
|
||||
/// the admin commands.
|
||||
pub(super) fn set_services(&self, services: Option<&Arc<crate::Services>>) {
|
||||
let receiver = &mut *self.services.services.write().expect("locked for writing");
|
||||
let receiver = &mut *self.services.services.write();
|
||||
let weak = services.map(Arc::downgrade);
|
||||
*receiver = weak;
|
||||
}
|
||||
|
|
|
@ -1,14 +1,9 @@
|
|||
mod data;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Write,
|
||||
sync::{Arc, RwLock},
|
||||
time::Instant,
|
||||
};
|
||||
use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{Result, Server, error, utils::bytes::pretty};
|
||||
use conduwuit::{Result, Server, SyncRwLock, error, utils::bytes::pretty};
|
||||
use data::Data;
|
||||
use regex::RegexSet;
|
||||
use ruma::{OwnedEventId, OwnedRoomAliasId, OwnedServerName, OwnedUserId, ServerName, UserId};
|
||||
|
@ -19,7 +14,7 @@ pub struct Service {
|
|||
pub db: Data,
|
||||
server: Arc<Server>,
|
||||
|
||||
pub bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
|
||||
pub bad_event_ratelimiter: Arc<SyncRwLock<HashMap<OwnedEventId, RateLimitState>>>,
|
||||
pub server_user: OwnedUserId,
|
||||
pub admin_alias: OwnedRoomAliasId,
|
||||
pub turn_secret: String,
|
||||
|
@ -62,7 +57,7 @@ impl crate::Service for Service {
|
|||
Ok(Arc::new(Self {
|
||||
db,
|
||||
server: args.server.clone(),
|
||||
bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
|
||||
bad_event_ratelimiter: Arc::new(SyncRwLock::new(HashMap::new())),
|
||||
admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", &args.server.name))
|
||||
.expect("#admins:server_name is valid alias name"),
|
||||
server_user: UserId::parse_with_server_name(
|
||||
|
@ -76,7 +71,7 @@ impl crate::Service for Service {
|
|||
}
|
||||
|
||||
async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
|
||||
let (ber_count, ber_bytes) = self.bad_event_ratelimiter.read()?.iter().fold(
|
||||
let (ber_count, ber_bytes) = self.bad_event_ratelimiter.read().iter().fold(
|
||||
(0_usize, 0_usize),
|
||||
|(mut count, mut bytes), (event_id, _)| {
|
||||
bytes = bytes.saturating_add(event_id.capacity());
|
||||
|
@ -91,12 +86,7 @@ impl crate::Service for Service {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn clear_cache(&self) {
|
||||
self.bad_event_ratelimiter
|
||||
.write()
|
||||
.expect("locked for writing")
|
||||
.clear();
|
||||
}
|
||||
async fn clear_cache(&self) { self.bad_event_ratelimiter.write().clear(); }
|
||||
|
||||
fn name(&self) -> &str { service::make_name(std::module_path!()) }
|
||||
}
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
use std::{
|
||||
mem::size_of,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use std::{mem::size_of, sync::Arc};
|
||||
|
||||
use conduwuit::{Err, Result, err, utils, utils::math::usize_from_f64};
|
||||
use conduwuit::{Err, Result, SyncMutex, err, utils, utils::math::usize_from_f64};
|
||||
use database::Map;
|
||||
use lru_cache::LruCache;
|
||||
|
||||
|
@ -11,7 +8,7 @@ use crate::rooms::short::ShortEventId;
|
|||
|
||||
pub(super) struct Data {
|
||||
shorteventid_authchain: Arc<Map>,
|
||||
pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[ShortEventId]>>>,
|
||||
pub(super) auth_chain_cache: SyncMutex<LruCache<Vec<u64>, Arc<[ShortEventId]>>>,
|
||||
}
|
||||
|
||||
impl Data {
|
||||
|
@ -23,7 +20,7 @@ impl Data {
|
|||
.expect("valid cache size");
|
||||
Self {
|
||||
shorteventid_authchain: db["shorteventid_authchain"].clone(),
|
||||
auth_chain_cache: Mutex::new(LruCache::new(cache_size)),
|
||||
auth_chain_cache: SyncMutex::new(LruCache::new(cache_size)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,12 +31,7 @@ impl Data {
|
|||
debug_assert!(!key.is_empty(), "auth_chain key must not be empty");
|
||||
|
||||
// Check RAM cache
|
||||
if let Some(result) = self
|
||||
.auth_chain_cache
|
||||
.lock()
|
||||
.expect("cache locked")
|
||||
.get_mut(key)
|
||||
{
|
||||
if let Some(result) = self.auth_chain_cache.lock().get_mut(key) {
|
||||
return Ok(Arc::clone(result));
|
||||
}
|
||||
|
||||
|
@ -63,7 +55,6 @@ impl Data {
|
|||
// Cache in RAM
|
||||
self.auth_chain_cache
|
||||
.lock()
|
||||
.expect("cache locked")
|
||||
.insert(vec![key[0]], Arc::clone(&chain));
|
||||
|
||||
Ok(chain)
|
||||
|
@ -84,9 +75,6 @@ impl Data {
|
|||
}
|
||||
|
||||
// Cache in RAM
|
||||
self.auth_chain_cache
|
||||
.lock()
|
||||
.expect("cache locked")
|
||||
.insert(key, auth_chain);
|
||||
self.auth_chain_cache.lock().insert(key, auth_chain);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -248,10 +248,10 @@ pub fn cache_auth_chain_vec(&self, key: Vec<u64>, auth_chain: &[ShortEventId]) {
|
|||
|
||||
#[implement(Service)]
|
||||
pub fn get_cache_usage(&self) -> (usize, usize) {
|
||||
let cache = self.db.auth_chain_cache.lock().expect("locked");
|
||||
let cache = self.db.auth_chain_cache.lock();
|
||||
|
||||
(cache.len(), cache.capacity())
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().expect("locked").clear(); }
|
||||
pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().clear(); }
|
||||
|
|
|
@ -41,7 +41,6 @@ where
|
|||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.expect("locked")
|
||||
.entry(id)
|
||||
{
|
||||
| hash_map::Entry::Vacant(e) => {
|
||||
|
@ -76,7 +75,6 @@ where
|
|||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.expect("locked")
|
||||
.get(&*next_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
|
@ -187,7 +185,6 @@ where
|
|||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.expect("locked")
|
||||
.get(&*next_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
|
|
|
@ -160,7 +160,6 @@ pub async fn handle_incoming_pdu<'a>(
|
|||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.expect("locked")
|
||||
.entry(prev_id.into())
|
||||
{
|
||||
| hash_map::Entry::Vacant(e) => {
|
||||
|
@ -181,13 +180,11 @@ pub async fn handle_incoming_pdu<'a>(
|
|||
let start_time = Instant::now();
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.expect("locked")
|
||||
.insert(room_id.into(), (event_id.to_owned(), start_time));
|
||||
|
||||
defer! {{
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.expect("locked")
|
||||
.remove(room_id);
|
||||
}};
|
||||
|
||||
|
|
|
@ -42,7 +42,6 @@ where
|
|||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.expect("locked")
|
||||
.get(prev_id)
|
||||
{
|
||||
// Exponential backoff
|
||||
|
@ -70,13 +69,11 @@ where
|
|||
let start_time = Instant::now();
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.expect("locked")
|
||||
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
|
||||
|
||||
defer! {{
|
||||
self.federation_handletime
|
||||
.write()
|
||||
.expect("locked")
|
||||
.remove(room_id);
|
||||
}};
|
||||
|
||||
|
|
|
@ -10,15 +10,10 @@ mod resolve_state;
|
|||
mod state_at_incoming;
|
||||
mod upgrade_outlier_pdu;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Write,
|
||||
sync::{Arc, RwLock as StdRwLock},
|
||||
time::Instant,
|
||||
};
|
||||
use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{Err, Event, PduEvent, Result, RoomVersion, Server, utils::MutexMap};
|
||||
use conduwuit::{Err, Event, PduEvent, Result, RoomVersion, Server, SyncRwLock, utils::MutexMap};
|
||||
use ruma::{
|
||||
OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
|
||||
events::room::create::RoomCreateEventContent,
|
||||
|
@ -28,7 +23,7 @@ use crate::{Dep, globals, rooms, sending, server_keys};
|
|||
|
||||
pub struct Service {
|
||||
pub mutex_federation: RoomMutexMap,
|
||||
pub federation_handletime: StdRwLock<HandleTimeMap>,
|
||||
pub federation_handletime: SyncRwLock<HandleTimeMap>,
|
||||
services: Services,
|
||||
}
|
||||
|
||||
|
@ -81,11 +76,7 @@ impl crate::Service for Service {
|
|||
let mutex_federation = self.mutex_federation.len();
|
||||
writeln!(out, "federation_mutex: {mutex_federation}")?;
|
||||
|
||||
let federation_handletime = self
|
||||
.federation_handletime
|
||||
.read()
|
||||
.expect("locked for reading")
|
||||
.len();
|
||||
let federation_handletime = self.federation_handletime.read().len();
|
||||
writeln!(out, "federation_handletime: {federation_handletime}")?;
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
mod update;
|
||||
mod via;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use conduwuit::{
|
||||
Result, implement,
|
||||
Result, SyncRwLock, implement,
|
||||
result::LogErr,
|
||||
utils::{ReadyExt, stream::TryIgnore},
|
||||
warn,
|
||||
|
@ -54,14 +51,14 @@ struct Data {
|
|||
userroomid_knockedstate: Arc<Map>,
|
||||
}
|
||||
|
||||
type AppServiceInRoomCache = RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>;
|
||||
type AppServiceInRoomCache = SyncRwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>;
|
||||
type StrippedStateEventItem = (OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>);
|
||||
type SyncStateEventItem = (OwnedRoomId, Vec<Raw<AnySyncStateEvent>>);
|
||||
|
||||
impl crate::Service for Service {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
appservice_in_room_cache: RwLock::new(HashMap::new()),
|
||||
appservice_in_room_cache: SyncRwLock::new(HashMap::new()),
|
||||
services: Services {
|
||||
account_data: args.depend::<account_data::Service>("account_data"),
|
||||
config: args.depend::<config::Service>("config"),
|
||||
|
@ -99,7 +96,6 @@ pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &Registrati
|
|||
if let Some(cached) = self
|
||||
.appservice_in_room_cache
|
||||
.read()
|
||||
.expect("locked")
|
||||
.get(room_id)
|
||||
.and_then(|map| map.get(&appservice.registration.id))
|
||||
.copied()
|
||||
|
@ -124,7 +120,6 @@ pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &Registrati
|
|||
|
||||
self.appservice_in_room_cache
|
||||
.write()
|
||||
.expect("locked")
|
||||
.entry(room_id.into())
|
||||
.or_default()
|
||||
.insert(appservice.registration.id.clone(), in_room);
|
||||
|
@ -134,19 +129,14 @@ pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &Registrati
|
|||
|
||||
#[implement(Service)]
|
||||
pub fn get_appservice_in_room_cache_usage(&self) -> (usize, usize) {
|
||||
let cache = self.appservice_in_room_cache.read().expect("locked");
|
||||
let cache = self.appservice_in_room_cache.read();
|
||||
|
||||
(cache.len(), cache.capacity())
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub fn clear_appservice_in_room_cache(&self) {
|
||||
self.appservice_in_room_cache
|
||||
.write()
|
||||
.expect("locked")
|
||||
.clear();
|
||||
}
|
||||
pub fn clear_appservice_in_room_cache(&self) { self.appservice_in_room_cache.write().clear(); }
|
||||
|
||||
/// Returns an iterator of all servers participating in this room.
|
||||
#[implement(Service)]
|
||||
|
|
|
@ -211,10 +211,7 @@ pub async fn update_joined_count(&self, room_id: &RoomId) {
|
|||
self.db.serverroomids.put_raw(serverroom_id, []);
|
||||
}
|
||||
|
||||
self.appservice_in_room_cache
|
||||
.write()
|
||||
.expect("locked")
|
||||
.remove(room_id);
|
||||
self.appservice_in_room_cache.write().remove(room_id);
|
||||
}
|
||||
|
||||
/// Direct DB function to directly mark a user as joined. It is not
|
||||
|
|
|
@ -2,12 +2,12 @@ use std::{
|
|||
collections::{BTreeSet, HashMap},
|
||||
fmt::{Debug, Write},
|
||||
mem::size_of,
|
||||
sync::{Arc, Mutex},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use conduwuit::{
|
||||
Result,
|
||||
Result, SyncMutex,
|
||||
arrayvec::ArrayVec,
|
||||
at, checked, err, expected, implement, utils,
|
||||
utils::{bytes, math::usize_from_f64, stream::IterStream},
|
||||
|
@ -23,7 +23,7 @@ use crate::{
|
|||
};
|
||||
|
||||
pub struct Service {
|
||||
pub stateinfo_cache: Mutex<StateInfoLruCache>,
|
||||
pub stateinfo_cache: SyncMutex<StateInfoLruCache>,
|
||||
db: Data,
|
||||
services: Services,
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ impl crate::Service for Service {
|
|||
|
||||
async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
|
||||
let (cache_len, ents) = {
|
||||
let cache = self.stateinfo_cache.lock().expect("locked");
|
||||
let cache = self.stateinfo_cache.lock();
|
||||
let ents = cache.iter().map(at!(1)).flat_map(|vec| vec.iter()).fold(
|
||||
HashMap::new(),
|
||||
|mut ents, ssi| {
|
||||
|
@ -110,7 +110,7 @@ impl crate::Service for Service {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn clear_cache(&self) { self.stateinfo_cache.lock().expect("locked").clear(); }
|
||||
async fn clear_cache(&self) { self.stateinfo_cache.lock().clear(); }
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ pub async fn load_shortstatehash_info(
|
|||
&self,
|
||||
shortstatehash: ShortStateHash,
|
||||
) -> Result<ShortStateInfoVec> {
|
||||
if let Some(r) = self.stateinfo_cache.lock()?.get_mut(&shortstatehash) {
|
||||
if let Some(r) = self.stateinfo_cache.lock().get_mut(&shortstatehash) {
|
||||
return Ok(r.clone());
|
||||
}
|
||||
|
||||
|
@ -152,7 +152,7 @@ async fn cache_shortstatehash_info(
|
|||
shortstatehash: ShortStateHash,
|
||||
stack: ShortStateInfoVec,
|
||||
) -> Result {
|
||||
self.stateinfo_cache.lock()?.insert(shortstatehash, stack);
|
||||
self.stateinfo_cache.lock().insert(shortstatehash, stack);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -2,10 +2,10 @@ mod watch;
|
|||
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
sync::{Arc, Mutex, Mutex as StdMutex},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use conduwuit::{Result, Server};
|
||||
use conduwuit::{Result, Server, SyncMutex};
|
||||
use database::Map;
|
||||
use ruma::{
|
||||
OwnedDeviceId, OwnedRoomId, OwnedUserId,
|
||||
|
@ -62,11 +62,11 @@ struct SnakeSyncCache {
|
|||
extensions: v5::request::Extensions,
|
||||
}
|
||||
|
||||
type DbConnections<K, V> = Mutex<BTreeMap<K, V>>;
|
||||
type DbConnections<K, V> = SyncMutex<BTreeMap<K, V>>;
|
||||
type DbConnectionsKey = (OwnedUserId, OwnedDeviceId, String);
|
||||
type DbConnectionsVal = Arc<Mutex<SlidingSyncCache>>;
|
||||
type DbConnectionsVal = Arc<SyncMutex<SlidingSyncCache>>;
|
||||
type SnakeConnectionsKey = (OwnedUserId, OwnedDeviceId, Option<String>);
|
||||
type SnakeConnectionsVal = Arc<Mutex<SnakeSyncCache>>;
|
||||
type SnakeConnectionsVal = Arc<SyncMutex<SnakeSyncCache>>;
|
||||
|
||||
impl crate::Service for Service {
|
||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
|
@ -90,8 +90,8 @@ impl crate::Service for Service {
|
|||
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
|
||||
typing: args.depend::<rooms::typing::Service>("rooms::typing"),
|
||||
},
|
||||
connections: StdMutex::new(BTreeMap::new()),
|
||||
snake_connections: StdMutex::new(BTreeMap::new()),
|
||||
connections: SyncMutex::new(BTreeMap::new()),
|
||||
snake_connections: SyncMutex::new(BTreeMap::new()),
|
||||
}))
|
||||
}
|
||||
|
||||
|
@ -100,22 +100,19 @@ impl crate::Service for Service {
|
|||
|
||||
impl Service {
|
||||
pub fn snake_connection_cached(&self, key: &SnakeConnectionsKey) -> bool {
|
||||
self.snake_connections
|
||||
.lock()
|
||||
.expect("locked")
|
||||
.contains_key(key)
|
||||
self.snake_connections.lock().contains_key(key)
|
||||
}
|
||||
|
||||
pub fn forget_snake_sync_connection(&self, key: &SnakeConnectionsKey) {
|
||||
self.snake_connections.lock().expect("locked").remove(key);
|
||||
self.snake_connections.lock().remove(key);
|
||||
}
|
||||
|
||||
pub fn remembered(&self, key: &DbConnectionsKey) -> bool {
|
||||
self.connections.lock().expect("locked").contains_key(key)
|
||||
self.connections.lock().contains_key(key)
|
||||
}
|
||||
|
||||
pub fn forget_sync_request_connection(&self, key: &DbConnectionsKey) {
|
||||
self.connections.lock().expect("locked").remove(key);
|
||||
self.connections.lock().remove(key);
|
||||
}
|
||||
|
||||
pub fn update_snake_sync_request_with_cache(
|
||||
|
@ -123,13 +120,13 @@ impl Service {
|
|||
snake_key: &SnakeConnectionsKey,
|
||||
request: &mut v5::Request,
|
||||
) -> BTreeMap<String, BTreeMap<OwnedRoomId, u64>> {
|
||||
let mut cache = self.snake_connections.lock().expect("locked");
|
||||
let mut cache = self.snake_connections.lock();
|
||||
let cached = Arc::clone(
|
||||
cache
|
||||
.entry(snake_key.clone())
|
||||
.or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))),
|
||||
.or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))),
|
||||
);
|
||||
let cached = &mut cached.lock().expect("locked");
|
||||
let cached = &mut cached.lock();
|
||||
drop(cache);
|
||||
|
||||
//v5::Request::try_from_http_request(req, path_args);
|
||||
|
@ -232,16 +229,16 @@ impl Service {
|
|||
};
|
||||
|
||||
let key = into_db_key(key.0.clone(), key.1.clone(), conn_id);
|
||||
let mut cache = self.connections.lock().expect("locked");
|
||||
let mut cache = self.connections.lock();
|
||||
let cached = Arc::clone(cache.entry(key).or_insert_with(|| {
|
||||
Arc::new(Mutex::new(SlidingSyncCache {
|
||||
Arc::new(SyncMutex::new(SlidingSyncCache {
|
||||
lists: BTreeMap::new(),
|
||||
subscriptions: BTreeMap::new(),
|
||||
known_rooms: BTreeMap::new(),
|
||||
extensions: ExtensionsConfig::default(),
|
||||
}))
|
||||
}));
|
||||
let cached = &mut cached.lock().expect("locked");
|
||||
let cached = &mut cached.lock();
|
||||
drop(cache);
|
||||
|
||||
for (list_id, list) in &mut request.lists {
|
||||
|
@ -328,16 +325,16 @@ impl Service {
|
|||
key: &DbConnectionsKey,
|
||||
subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
|
||||
) {
|
||||
let mut cache = self.connections.lock().expect("locked");
|
||||
let mut cache = self.connections.lock();
|
||||
let cached = Arc::clone(cache.entry(key.clone()).or_insert_with(|| {
|
||||
Arc::new(Mutex::new(SlidingSyncCache {
|
||||
Arc::new(SyncMutex::new(SlidingSyncCache {
|
||||
lists: BTreeMap::new(),
|
||||
subscriptions: BTreeMap::new(),
|
||||
known_rooms: BTreeMap::new(),
|
||||
extensions: ExtensionsConfig::default(),
|
||||
}))
|
||||
}));
|
||||
let cached = &mut cached.lock().expect("locked");
|
||||
let cached = &mut cached.lock();
|
||||
drop(cache);
|
||||
|
||||
cached.subscriptions = subscriptions;
|
||||
|
@ -350,16 +347,16 @@ impl Service {
|
|||
new_cached_rooms: BTreeSet<OwnedRoomId>,
|
||||
globalsince: u64,
|
||||
) {
|
||||
let mut cache = self.connections.lock().expect("locked");
|
||||
let mut cache = self.connections.lock();
|
||||
let cached = Arc::clone(cache.entry(key.clone()).or_insert_with(|| {
|
||||
Arc::new(Mutex::new(SlidingSyncCache {
|
||||
Arc::new(SyncMutex::new(SlidingSyncCache {
|
||||
lists: BTreeMap::new(),
|
||||
subscriptions: BTreeMap::new(),
|
||||
known_rooms: BTreeMap::new(),
|
||||
extensions: ExtensionsConfig::default(),
|
||||
}))
|
||||
}));
|
||||
let cached = &mut cached.lock().expect("locked");
|
||||
let cached = &mut cached.lock();
|
||||
drop(cache);
|
||||
|
||||
for (room_id, lastsince) in cached
|
||||
|
@ -386,13 +383,13 @@ impl Service {
|
|||
globalsince: u64,
|
||||
) {
|
||||
assert!(key.2.is_some(), "Some(conn_id) required for this call");
|
||||
let mut cache = self.snake_connections.lock().expect("locked");
|
||||
let mut cache = self.snake_connections.lock();
|
||||
let cached = Arc::clone(
|
||||
cache
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))),
|
||||
.or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))),
|
||||
);
|
||||
let cached = &mut cached.lock().expect("locked");
|
||||
let cached = &mut cached.lock();
|
||||
drop(cache);
|
||||
|
||||
for (room_id, lastsince) in cached
|
||||
|
@ -416,13 +413,13 @@ impl Service {
|
|||
key: &SnakeConnectionsKey,
|
||||
subscriptions: BTreeMap<OwnedRoomId, v5::request::RoomSubscription>,
|
||||
) {
|
||||
let mut cache = self.snake_connections.lock().expect("locked");
|
||||
let mut cache = self.snake_connections.lock();
|
||||
let cached = Arc::clone(
|
||||
cache
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))),
|
||||
.or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))),
|
||||
);
|
||||
let cached = &mut cached.lock().expect("locked");
|
||||
let cached = &mut cached.lock();
|
||||
drop(cache);
|
||||
|
||||
cached.subscriptions = subscriptions;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue