|
1 | 1 | use anyhow::Context as AContext; |
| 2 | +use lru_time_cache::LruCache; |
| 3 | +use rusqlite::Connection; |
2 | 4 | use rusqlite::{functions::Context, params}; |
3 | | -use std::sync::{Arc, RwLock}; |
| 5 | +use std::sync::LazyLock; |
| 6 | +use std::sync::{Arc, Mutex}; |
4 | 7 | use std::time::Duration; |
5 | 8 |
|
6 | 9 | use zstd::dict::{DecoderDictionary, EncoderDictionary}; |
7 | 10 |
|
| 11 | +// we cache the instantiated encoder dictionaries keyed by (DbConnection, dict_id, compression_level) |
| 12 | +// DbConnection would ideally be db.path() because it's the same for multiple connections to the same db, but that would be less robust (e.g. in-memory databases) |
| 13 | +// we use a Mutex and not a RwLock because even the .get() methods on LruCache need to write (to update expiry and least recently used time) |
| 14 | +static ENCODER_DICTS: LazyLock< |
| 15 | + Mutex<LruCache<(usize, i32, i32), Arc<EncoderDictionary<'static>>>>, |
| 16 | +> = LazyLock::new(|| Mutex::new(LruCache::with_expiry_duration(Duration::from_secs(10)))); |
| 17 | + |
| 18 | +static DECODER_DICTS: LazyLock<Mutex<LruCache<(usize, i32), Arc<DecoderDictionary<'static>>>>> = |
| 19 | + LazyLock::new(|| Mutex::new(LruCache::with_expiry_duration(Duration::from_secs(10)))); |
| 20 | + |
| 21 | +/// when we open a new connection, it may reuse the same pointer location as an old connection, so we need to invalidate parts of the dict cache |
| 22 | +pub(crate) fn invalidate_caches(_db: &Connection) { |
| 23 | + // (theoretically we only need to clear caches with key db_handle_pointer but it likely doesn't matter much, |
| 24 | + // how often are you going to open a new connection?) |
| 25 | + // let db_handle_pointer = unsafe { db.handle() } as usize; |
| 26 | + log::debug!("Invalidating dict caches"); |
| 27 | + { |
| 28 | + let mut cache = ENCODER_DICTS.lock().unwrap(); |
| 29 | + cache.clear(); |
| 30 | + } |
| 31 | + { |
| 32 | + let mut cache = DECODER_DICTS.lock().unwrap(); |
| 33 | + cache.clear(); |
| 34 | + } |
| 35 | +} |
8 | 36 | // TODO: the rust interface currently requires a level when preparing a dictionary, but the zstd interface (ZSTD_CCtx_loadDictionary) does not. |
9 | 37 | // TODO: Using LruCache here isn't very smart |
10 | 38 | pub fn encoder_dict_from_ctx( |
11 | 39 | ctx: &Context, |
12 | 40 | arg_index: usize, |
13 | 41 | level: i32, |
14 | 42 | ) -> anyhow::Result<Arc<EncoderDictionary<'static>>> { |
15 | | - use lru_time_cache::LruCache; |
16 | | - // we cache the instantiated encoder dictionaries keyed by (DbConnection, dict_id, compression_level) |
17 | | - // DbConnection would ideally be db.path() because it's the same for multiple connections to the same db, but that would be less robust (e.g. in-memory databases) |
18 | | - lazy_static::lazy_static! { |
19 | | - static ref DICTS: RwLock<LruCache<(usize, i32, i32), Arc<EncoderDictionary<'static>>>> = RwLock::new(LruCache::with_expiry_duration(Duration::from_secs(10))); |
20 | | - } |
21 | 43 | let id: i32 = ctx.get(arg_index)?; |
22 | 44 | let db = unsafe { ctx.get_connection()? }; // SAFETY: This might be unsafe depending on how the connection is used. See https://github.com/rusqlite/rusqlite/issues/643#issuecomment-640181213 |
23 | 45 | let db_handle_pointer = unsafe { db.handle() } as usize; // SAFETY: We're only getting the pointer as an int, not using the raw connection |
24 | 46 |
|
25 | | - let mut dicts_write = DICTS.write().unwrap(); |
| 47 | + let mut dicts_write = ENCODER_DICTS.lock().unwrap(); |
26 | 48 | let entry = dicts_write.entry((db_handle_pointer, id, level)); |
27 | 49 | let res = match entry { |
28 | 50 | lru_time_cache::Entry::Vacant(e) => e.insert({ |
@@ -52,17 +74,17 @@ pub fn decoder_dict_from_ctx( |
52 | 74 | ctx: &Context, |
53 | 75 | arg_index: usize, |
54 | 76 | ) -> anyhow::Result<Arc<DecoderDictionary<'static>>> { |
55 | | - use lru_time_cache::LruCache; |
56 | 77 | // we cache the instantiated decoder dictionaries keyed by (DbConnection, dict_id) |
57 | 78 | // DbConnection would ideally be db.path() because it's the same for multiple connections to the same db, but that would be less robust (e.g. in-memory databases) |
58 | | - lazy_static::lazy_static! { |
59 | | - static ref DICTS: RwLock<LruCache<(usize, i32), Arc<DecoderDictionary<'static>>>> = RwLock::new(LruCache::with_expiry_duration(Duration::from_secs(10))); |
60 | | - } |
61 | 79 | let id: i32 = ctx.get(arg_index)?; |
62 | 80 | let db = unsafe { ctx.get_connection()? }; // SAFETY: This might be unsafe depending on how the connection is used. See https://github.com/rusqlite/rusqlite/issues/643#issuecomment-640181213 |
63 | 81 | let db_handle_pointer = unsafe { db.handle() } as usize; // SAFETY: We're only getting the pointer as an int, not using the raw connection |
64 | | - let mut dicts_write = DICTS.write().unwrap(); |
65 | | - let entry = dicts_write.entry((db_handle_pointer, id)); |
| 82 | + log::trace!("Using DB Handle pointer {db_handle_pointer} as cache key"); |
| 83 | + let cache_key = (db_handle_pointer, id); |
| 84 | + // since the get() function on lru cache also writes (updates last used time and expiry), |
| 85 | + // we can not use DICTS.read() (RwLock) for perf |
| 86 | + let mut dicts_write = DECODER_DICTS.lock().unwrap(); |
| 87 | + let entry = dicts_write.entry(cache_key); |
66 | 88 | let res = match entry { |
67 | 89 | lru_time_cache::Entry::Vacant(e) => e.insert({ |
68 | 90 | log::debug!( |
|
0 commit comments