Skip to content

Commit

Permalink
refactor: adjust cache storage trait
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Jan 8, 2025
1 parent bbd1faf commit b08a5c0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 53 deletions.
50 changes: 31 additions & 19 deletions src/cache/http_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{file, Error, Result, PAGE_SIZE};
use crate::config::get_current_config;
use super::get_cache_backend;
use super::{Error, Result, PAGE_SIZE};
use crate::service::Error as ServiceError;
use crate::service::SimpleServiceTaskFuture;
use async_trait::async_trait;
Expand Down Expand Up @@ -174,43 +174,55 @@ pub trait HttpCacheStorage: Sync + Send {
fn stats(&self) -> Option<HttpCacheStats> {
None
}

/// Returns whether this storage implementation supports the clear operation
///
/// # Returns
/// * `bool` - Default implementation returns false, indicating no clear support
/// Implementations should override this to return true if they
/// support the clear operation
fn support_clear(&self) -> bool {
false
}
}

async fn do_file_storage_clear(
count: u32,
dir: String,
cache: Arc<dyn HttpCacheStorage>,
) -> Result<bool, ServiceError> {
// Add 1 every loop
let offset = 60;
if count % offset != 0 {
return Ok(false);
}
let Ok(storage) = file::new_file_cache(&dir) else {
return Ok(false);
};

let Some(access_before) =
SystemTime::now().checked_sub(Duration::from_secs(24 * 3600))
else {
return Ok(false);
};

let Ok((success, fail)) = storage.clear(access_before).await else {
let Ok((success, fail)) = cache.clear(access_before).await else {
return Ok(true);
};
if success < 0 {
return Ok(true);
}
info!(dir, success, fail, "cache storage clear");
info!(success, fail, "cache storage clear");
Ok(true)
}

pub fn new_file_storage_clear_service(
) -> Option<(String, SimpleServiceTaskFuture)> {
let dir = get_current_config().basic.cache_directory.as_ref()?.clone();
pub fn new_storage_clear_service() -> Option<(String, SimpleServiceTaskFuture)>
{
let Ok(backend) = get_cache_backend() else {
return None;
};
if !backend.cache.support_clear() {
return None;
}
let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
Box::pin({
let value = dir.clone();
let value = backend.cache.clone();
async move {
let value = value.clone();
do_file_storage_clear(count, value).await
Expand All @@ -222,13 +234,13 @@ pub fn new_file_storage_clear_service(

pub struct HttpCache {
pub directory: Option<String>,
pub(crate) cached: Arc<dyn HttpCacheStorage>,
pub(crate) cache: Arc<dyn HttpCacheStorage>,
}

impl HttpCache {
#[inline]
pub fn stats(&self) -> Option<HttpCacheStats> {
self.cached.stats()
self.cache.stats()
}
}

Expand Down Expand Up @@ -358,7 +370,7 @@ impl Storage for HttpCache {
) -> pingora::Result<Option<(CacheMeta, HitHandler)>> {
let namespace = key.namespace();
let hash = key.combined();
if let Some(obj) = self.cached.get(&hash, namespace).await? {
if let Some(obj) = self.cache.get(&hash, namespace).await? {
let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
let size = obj.body.len();
let hit_handler = CompleteHit {
Expand Down Expand Up @@ -398,7 +410,7 @@ impl Storage for HttpCache {
meta,
key: hash,
namespace: key.namespace().to_string(),
cache: self.cached.clone(),
cache: self.cache.clone(),
body: BytesMut::with_capacity(size),
};
Ok(Box::new(miss_handler))
Expand All @@ -415,7 +427,7 @@ impl Storage for HttpCache {
let hash = key.combined();
// TODO get namespace of cache key
let cache_removed =
if let Ok(result) = self.cached.remove(&hash, "").await {
if let Ok(result) = self.cache.remove(&hash, "").await {
result.is_some()
} else {
false
Expand All @@ -431,9 +443,9 @@ impl Storage for HttpCache {
) -> pingora::Result<bool> {
let namespace = key.namespace();
let hash = key.combined();
if let Some(mut obj) = self.cached.get(&hash, namespace).await? {
if let Some(mut obj) = self.cache.get(&hash, namespace).await? {
obj.meta = meta.serialize()?;
let _ = self.cached.put(&hash, namespace, obj).await?;
let _ = self.cache.put(&hash, namespace, obj).await?;
Ok(true)
} else {
Err(Error::Invalid {
Expand Down
40 changes: 6 additions & 34 deletions src/plugin/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{
get_bool_conf, get_hash_key, get_step_conf, get_str_conf,
get_str_slice_conf, Error, Plugin, Result,
};
use crate::cache::{new_file_cache, new_tiny_ufo_cache, HttpCache};
use crate::cache::{get_cache_backend, HttpCache};
use crate::config::{
get_current_config, PluginCategory, PluginConf, PluginStep,
};
Expand All @@ -29,7 +29,6 @@ use bytesize::ByteSize;
use fancy_regex::Regex;
use http::{Method, StatusCode};
use humantime::parse_duration;
use memory_stats::memory_stats;
use once_cell::sync::{Lazy, OnceCell};
use pingora::cache::eviction::simple_lru::Manager;
use pingora::cache::eviction::EvictionManager;
Expand All @@ -43,7 +42,6 @@ use tracing::{debug, error};

// memory limit size
const MAX_MEMORY_SIZE: usize = 100 * 1024 * 1024;
static CACHE_BACKEND: OnceCell<HttpCache> = OnceCell::new();
static PREDICTOR: OnceCell<Predictor<32>> = OnceCell::new();
static EVICTION_MANAGER: OnceCell<Manager> = OnceCell::new();
static CACHE_LOCK_ONE_SECOND: OnceCell<CacheLock> = OnceCell::new();
Expand All @@ -64,35 +62,6 @@ pub struct Cache {
hash_value: String,
}

fn get_cache_backend() -> Result<&'static HttpCache> {
// get global cache backend
CACHE_BACKEND.get_or_try_init(|| {
let basic_conf = &get_current_config().basic;
let size = if let Some(cache_max_size) = basic_conf.cache_max_size {
cache_max_size.as_u64() as usize
} else {
MAX_MEMORY_SIZE
};
// file cache
let cache = if let Some(dir) = &basic_conf.cache_directory {
new_file_cache(dir.as_str()).map_err(|e| Error::Invalid {
category: "cache_backend".to_string(),
message: e.to_string(),
})?
} else {
// max memory
let max_memory = if let Some(value) = memory_stats() {
value.physical_mem / 2
} else {
ByteSize::gb(4).as_u64() as usize
};
// tiny ufo cache
new_tiny_ufo_cache(size.min(max_memory))
};
Ok(cache)
})
}

fn get_eviction_manager() -> &'static Manager {
EVICTION_MANAGER.get_or_init(|| {
let size = if let Some(cache_max_size) =
Expand Down Expand Up @@ -132,7 +101,10 @@ impl TryFrom<&PluginConf> for Cache {
type Error = Error;
fn try_from(value: &PluginConf) -> Result<Self> {
let hash_value = get_hash_key(value);
let cache = get_cache_backend()?;
let cache = get_cache_backend().map_err(|e| Error::Invalid {
category: "cache_backend".to_string(),
message: e.to_string(),
})?;
let eviction = if value.contains_key("eviction") {
let eviction = get_eviction_manager();
Some(eviction as &'static (dyn EvictionManager + Sync))
Expand Down Expand Up @@ -326,7 +298,7 @@ impl Plugin for Cache {
&session.req_header().uri,
);
self.http_cache
.cached
.cache
.remove(&key.combined(), key.namespace())
.await?;
return Ok(Some(HttpResponse::no_content()));
Expand Down

0 comments on commit b08a5c0

Please sign in to comment.