From b08a5c00c713d800d1aadaa9f012f1fe5824a6f3 Mon Sep 17 00:00:00 2001 From: vicanso Date: Wed, 8 Jan 2025 21:24:16 +0800 Subject: [PATCH] refactor: adjust cache storage trait --- src/cache/http_cache.rs | 50 +++++++++++++++++++++++++---------------- src/plugin/cache.rs | 40 +++++---------------------------- 2 files changed, 37 insertions(+), 53 deletions(-) diff --git a/src/cache/http_cache.rs b/src/cache/http_cache.rs index 794938d..240cbb1 100644 --- a/src/cache/http_cache.rs +++ b/src/cache/http_cache.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::{file, Error, Result, PAGE_SIZE}; -use crate::config::get_current_config; +use super::get_cache_backend; +use super::{Error, Result, PAGE_SIZE}; use crate::service::Error as ServiceError; use crate::service::SimpleServiceTaskFuture; use async_trait::async_trait; @@ -174,20 +174,27 @@ pub trait HttpCacheStorage: Sync + Send { fn stats(&self) -> Option { None } + + /// Returns whether this storage implementation supports the clear operation + /// + /// # Returns + /// * `bool` - Default implementation returns false, indicating no clear support + /// Implementations should override this to return true if they + /// support the clear operation + fn support_clear(&self) -> bool { + false + } } async fn do_file_storage_clear( count: u32, - dir: String, + cache: Arc, ) -> Result { // Add 1 every loop let offset = 60; if count % offset != 0 { return Ok(false); } - let Ok(storage) = file::new_file_cache(&dir) else { - return Ok(false); - }; let Some(access_before) = SystemTime::now().checked_sub(Duration::from_secs(24 * 3600)) @@ -195,22 +202,27 @@ async fn do_file_storage_clear( return Ok(false); }; - let Ok((success, fail)) = storage.clear(access_before).await else { + let Ok((success, fail)) = cache.clear(access_before).await else { return Ok(true); }; if success < 0 { return Ok(true); } - info!(dir, success, fail, "cache storage clear"); + info!(success, fail, "cache storage clear"); Ok(true) } -pub fn new_file_storage_clear_service( -) -> Option<(String, SimpleServiceTaskFuture)> { - let dir = get_current_config().basic.cache_directory.as_ref()?.clone(); +pub fn new_storage_clear_service() -> Option<(String, SimpleServiceTaskFuture)> +{ + let Ok(backend) = get_cache_backend() else { + return None; + }; + if !backend.cache.support_clear() { + return None; + } let task: SimpleServiceTaskFuture = Box::new(move |count: u32| { Box::pin({ - let value = dir.clone(); + let value = backend.cache.clone(); async move { let value = value.clone(); do_file_storage_clear(count, value).await @@ -222,13 +234,13 @@ pub fn new_file_storage_clear_service( pub struct HttpCache { pub directory: Option, - pub(crate) cached: Arc, + pub(crate) cache: Arc, } impl HttpCache { #[inline] pub fn stats(&self) -> Option { - self.cached.stats() + self.cache.stats() } } @@ -358,7 +370,7 @@ impl Storage for HttpCache { ) -> pingora::Result> { let namespace = key.namespace(); let hash = key.combined(); - if let Some(obj) = self.cached.get(&hash, namespace).await? { + if let Some(obj) = self.cache.get(&hash, namespace).await? { let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?; let size = obj.body.len(); let hit_handler = CompleteHit { @@ -398,7 +410,7 @@ impl Storage for HttpCache { meta, key: hash, namespace: key.namespace().to_string(), - cache: self.cached.clone(), + cache: self.cache.clone(), body: BytesMut::with_capacity(size), }; Ok(Box::new(miss_handler)) @@ -415,7 +427,7 @@ impl Storage for HttpCache { let hash = key.combined(); // TODO get namespace of cache key let cache_removed = - if let Ok(result) = self.cached.remove(&hash, "").await { + if let Ok(result) = self.cache.remove(&hash, "").await { result.is_some() } else { false @@ -431,9 +443,9 @@ impl Storage for HttpCache { ) -> pingora::Result { let namespace = key.namespace(); let hash = key.combined(); - if let Some(mut obj) = self.cached.get(&hash, namespace).await? { + if let Some(mut obj) = self.cache.get(&hash, namespace).await? { obj.meta = meta.serialize()?; - let _ = self.cached.put(&hash, namespace, obj).await?; + let _ = self.cache.put(&hash, namespace, obj).await?; Ok(true) } else { Err(Error::Invalid { diff --git a/src/plugin/cache.rs b/src/plugin/cache.rs index ebc63eb..46698ab 100644 --- a/src/plugin/cache.rs +++ b/src/plugin/cache.rs @@ -16,7 +16,7 @@ use super::{ get_bool_conf, get_hash_key, get_step_conf, get_str_conf, get_str_slice_conf, Error, Plugin, Result, }; -use crate::cache::{new_file_cache, new_tiny_ufo_cache, HttpCache}; +use crate::cache::{get_cache_backend, HttpCache}; use crate::config::{ get_current_config, PluginCategory, PluginConf, PluginStep, }; @@ -29,7 +29,6 @@ use bytesize::ByteSize; use fancy_regex::Regex; use http::{Method, StatusCode}; use humantime::parse_duration; -use memory_stats::memory_stats; use once_cell::sync::{Lazy, OnceCell}; use pingora::cache::eviction::simple_lru::Manager; use pingora::cache::eviction::EvictionManager; @@ -43,7 +42,6 @@ use tracing::{debug, error}; // memory limit size const MAX_MEMORY_SIZE: usize = 100 * 1024 * 1024; -static CACHE_BACKEND: OnceCell = OnceCell::new(); static PREDICTOR: OnceCell> = OnceCell::new(); static EVICTION_MANAGER: OnceCell = OnceCell::new(); static CACHE_LOCK_ONE_SECOND: OnceCell = OnceCell::new(); @@ -64,35 +62,6 @@ pub struct Cache { hash_value: String, } -fn get_cache_backend() -> Result<&'static HttpCache> { - // get global cache backend - CACHE_BACKEND.get_or_try_init(|| { - let basic_conf = &get_current_config().basic; - let size = if let Some(cache_max_size) = basic_conf.cache_max_size { - cache_max_size.as_u64() as usize - } else { - MAX_MEMORY_SIZE - }; - // file cache - let cache = if let Some(dir) = &basic_conf.cache_directory { - new_file_cache(dir.as_str()).map_err(|e| Error::Invalid { - category: "cache_backend".to_string(), - message: e.to_string(), - })? - } else { - // max memory - let max_memory = if let Some(value) = memory_stats() { - value.physical_mem / 2 - } else { - ByteSize::gb(4).as_u64() as usize - }; - // tiny ufo cache - new_tiny_ufo_cache(size.min(max_memory)) - }; - Ok(cache) - }) -} - fn get_eviction_manager() -> &'static Manager { EVICTION_MANAGER.get_or_init(|| { let size = if let Some(cache_max_size) = @@ -132,7 +101,10 @@ impl TryFrom<&PluginConf> for Cache { type Error = Error; fn try_from(value: &PluginConf) -> Result { let hash_value = get_hash_key(value); - let cache = get_cache_backend()?; + let cache = get_cache_backend().map_err(|e| Error::Invalid { + category: "cache_backend".to_string(), + message: e.to_string(), + })?; let eviction = if value.contains_key("eviction") { let eviction = get_eviction_manager(); Some(eviction as &'static (dyn EvictionManager + Sync)) @@ -326,7 +298,7 @@ impl Plugin for Cache { &session.req_header().uri, ); self.http_cache - .cached + .cache .remove(&key.combined(), key.namespace()) .await?; return Ok(Some(HttpResponse::no_content()));