Skip to content

Commit

Permalink
http caching dotan just asked for
Browse files Browse the repository at this point in the history
  • Loading branch information
YassinEldeeb committed Jan 29, 2024
1 parent 6a6cd84 commit 952c476
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 54 deletions.
8 changes: 6 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@
"yaml.schemas": {
"./libs/config/conductor.schema.json": "*.yaml"
},
"rust-analyzer.linkedProjects": ["./Cargo.toml"]
}
"rust-analyzer.linkedProjects": [
"./Cargo.toml",
"./libs/common/Cargo.toml",
"./libs/common/Cargo.toml"
]
}
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions libs/common/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use std::sync::Arc;
use std::sync::Mutex;
use tracing::error;

#[async_trait(?Send)]
// #[async_trait(?Send)]
pub trait CacheStore {
async fn cache_get(&mut self, key: &str) -> Option<ConductorHttpResponse>;
async fn cache_set(&mut self, key: String, response: ConductorHttpResponse);
fn cache_get(&mut self, key: &str) -> Option<ConductorHttpResponse>;
fn cache_set(&mut self, key: String, response: ConductorHttpResponse);
fn id(&self) -> &String;
}

Expand Down Expand Up @@ -63,24 +63,24 @@ impl CacheStores {
pub async fn get_cached_response(&self, key: String, id: &str) -> Option<ConductorHttpResponse> {
let mut stores = self.caches.lock().unwrap();
if let Some(store) = stores.iter_mut().find(|store| store.id() == id) {
store.cache_get(&key).await
store.cache_get(&key)
} else {
None
}
}

pub async fn cache_response_if_absent(
pub fn cache_response_if_absent(
&self,
key: String,
response: ConductorHttpResponse,
id: &str,
) -> ConductorHttpResponse {
let mut stores = self.caches.lock().unwrap();
if let Some(store) = stores.iter_mut().find(|store| store.id() == id) {
if let Some(cached_response) = store.cache_get(&key).await {
if let Some(cached_response) = store.cache_get(&key) {
cached_response
} else {
store.cache_set(key.clone(), response.clone()).await;
store.cache_set(key.clone(), response.clone());
response
}
} else {
Expand Down
49 changes: 24 additions & 25 deletions libs/common/src/stores/cloudflare_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,30 @@ impl CloudflareKVCacheStore {
}
}

#[async_trait(?Send)]
impl CacheStore for CloudflareKVCacheStore {
async fn cache_get(&mut self, key: &str) -> Option<ConductorHttpResponse> {
let kv_store = self.kv_store.lock().unwrap();
match kv_store.get(key).text().await {
Ok(Some(data)) => serde_json::from_str(&data).ok(),
_ => None,
}
}
// impl CacheStore for CloudflareKVCacheStore {
// fn cache_get(&mut self, key: &str) -> Option<ConductorHttpResponse> {
// let kv_store = self.kv_store.lock().unwrap();
// match kv_store.get(key).text().await {
// Ok(Some(data)) => serde_json::from_str(&data).ok(),
// _ => None,
// }
// }

async fn cache_set(&mut self, key: String, response: ConductorHttpResponse) {
let response_str = serde_json::to_string(&response).ok().unwrap();
let ttl = self.cache_ttl_seconds.unwrap_or(3600);
// fn cache_set(&mut self, key: String, response: ConductorHttpResponse) {
// let response_str = serde_json::to_string(&response).ok().unwrap();
// let ttl = self.cache_ttl_seconds.unwrap_or(3600);

let mut kv_store = self.kv_store.lock().unwrap();
let _ = kv_store
.put(&key, &response_str)
.expect("Failed to create put request")
.expiration_ttl(ttl)
.execute()
.await
.unwrap();
}
// let mut kv_store = self.kv_store.lock().unwrap();
// let _ = kv_store
// .put(&key, &response_str)
// .expect("Failed to create put request")
// .expiration_ttl(ttl)
// .execute()
// .await
// .unwrap();
// }

fn id(&self) -> &String {
&self.id
}
}
// fn id(&self) -> &String {
// &self.id
// }
// }
4 changes: 2 additions & 2 deletions libs/common/src/stores/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ impl InMemoryCacheStore {
}
#[async_trait(?Send)]
impl CacheStore for InMemoryCacheStore {
async fn cache_get(&mut self, key: &str) -> Option<ConductorHttpResponse> {
fn cache_get(&mut self, key: &str) -> Option<ConductorHttpResponse> {
self.cache.cache_get(key).cloned()
}

async fn cache_set(&mut self, key: String, response: ConductorHttpResponse) {
fn cache_set(&mut self, key: String, response: ConductorHttpResponse) {
self.cache.cache_set(key, response);
}

Expand Down
5 changes: 2 additions & 3 deletions libs/common/src/stores/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ impl RedisCacheStore {
}
}

#[async_trait(?Send)]
impl CacheStore for RedisCacheStore {
async fn cache_get(&mut self, key: &str) -> Option<ConductorHttpResponse> {
fn cache_get(&mut self, key: &str) -> Option<ConductorHttpResponse> {
let mut con = match self.get_con() {
Ok(con) => con,
Err(_) => return None,
Expand All @@ -58,7 +57,7 @@ impl CacheStore for RedisCacheStore {
serde_json::from_str(&value).ok()
}

async fn cache_set(&mut self, key: String, response: ConductorHttpResponse) {
fn cache_set(&mut self, key: String, response: ConductorHttpResponse) {
let mut con = match self.get_con() {
Ok(con) => con,
Err(_) => return,
Expand Down
9 changes: 9 additions & 0 deletions libs/config/conductor.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@
"properties": {
"cache": {
"type": "string"
},
"max_age": {
"default": 60,
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0.0
}
}
},
Expand Down
2 changes: 2 additions & 0 deletions plugins/response_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ schemars = { workspace = true }
linked-hash-map = "0.5.6"
cached = "0.47.0"
redis = "0.24.0"
sha2 = "0.10.8"
hex = "0.4.3"

[dev-dependencies]
6 changes: 6 additions & 0 deletions plugins/response_cache/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,10 @@ use serde::{Deserialize, Serialize};
pub struct CachePluginConfig {
#[serde(rename = "cache")]
pub store_id: String,
#[serde(default = "defualt_max_age")]
pub max_age: Option<u64>,
}

fn defualt_max_age() -> Option<u64> {
Some(60)
}
88 changes: 78 additions & 10 deletions plugins/response_cache/src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
use std::borrow::Borrow;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::config::CachePluginConfig;
use conductor_common::{
execute::RequestExecutionContext,
http::{ConductorHttpRequest, ConductorHttpResponse},
http::{Bytes, ConductorHttpRequest, ConductorHttpResponse},
plugin::{CreatablePlugin, Plugin, PluginError},
};
use hex::encode as hex_encode;
use reqwest::{
header::{HeaderMap, CACHE_CONTROL, ETAG, IF_NONE_MATCH},
StatusCode,
};
use sha2::{Digest, Sha256};

#[derive(Debug)]
pub struct CachePlugin {
store_id: String,
max_age: u64,
}

impl CachePlugin {
pub fn new(config: CachePluginConfig) -> Self {
CachePlugin {
store_id: config.store_id,
// @expected: it's never gonna be a `None`, we have a default value
max_age: config.max_age.unwrap(),
}
}

Expand All @@ -38,13 +47,50 @@ impl CreatablePlugin for CachePlugin {
impl Plugin for CachePlugin {
async fn on_downstream_http_request(&self, ctx: &mut RequestExecutionContext) {
let key = Self::generate_cache_key(&ctx.downstream_http_request);
if let Some(cached_response) = ctx
.cache_stores
.as_ref()
.and_then(|stores| Some(stores.get_cached_response(key.clone(), &self.store_id)))
{
ctx.was_cached = true;
ctx.short_circuit(cached_response.await.unwrap());
if let Some(stores) = ctx.cache_stores {
if let Some(mut cached_response) = stores
.get_cached_response(key.clone(), &self.store_id)
.await
{
// check if the cached response is still fresh
if let Some(cache_timestamp) = cached_response.headers.get("Cache-Timestamp") {
if let Ok(timestamp) = cache_timestamp.to_str() {
if let Ok(cached_time) = timestamp.parse::<u64>() {
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();

if current_time - cached_time > self.max_age {
// cached response is stale, fetch a new one
return;
}
}
}
}

if let Some(etag_header) = ctx.downstream_http_request.headers.get(IF_NONE_MATCH) {
if let Some(cached_etag) = cached_response.headers.get(ETAG) {
if etag_header == cached_etag {
ctx.short_circuit(ConductorHttpResponse {
status: StatusCode::NOT_MODIFIED,
body: Bytes::new(),
headers: HeaderMap::new(),
});
return;
}
}
}

cached_response
.headers
.insert(ETAG, generate_etag(&cached_response.body).parse().unwrap());
cached_response.headers.insert(
CACHE_CONTROL,
format!("max-age={}", self.max_age).parse().unwrap(),
);
ctx.short_circuit(cached_response);
}
}
}

Expand All @@ -54,11 +100,33 @@ impl Plugin for CachePlugin {
response: &mut ConductorHttpResponse,
) {
if !ctx.was_cached {
// only cache if it was not already fetched from the cache
let etag = generate_etag(&response.body);
response.headers.insert(ETAG, etag.parse().unwrap());
response.headers.insert(
CACHE_CONTROL,
format!("max-age={}", self.max_age).parse().unwrap(),
);

let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time somehow went backwards")
.as_secs()
.to_string();
response
.headers
.insert("Cache-Timestamp", current_time.parse().unwrap());

let key = Self::generate_cache_key(&ctx.downstream_http_request);
if let Some(stores) = &ctx.cache_stores {
stores.cache_response_if_absent(key, response.clone(), &self.store_id);
}
}
}
}

fn generate_etag(body: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(body);
let result = hasher.finalize();
hex_encode(result)
}
11 changes: 6 additions & 5 deletions test_config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ endpoints:
max_age: 3600
- type: response_cache
config:
cache: hi
cache: hello
max_age: 10

- path: /vrl
from: countries
Expand Down Expand Up @@ -80,10 +81,10 @@ cache_stores:
eviction_policy: lru
max_size: 500

- type: redis
id: hi
config:
connection_string: redis://@localhost:6379
# - type: redis
# id: hi
# config:
# connection_string: redis://@localhost:6379

- type: cloudflare_kv
id: test
Expand Down

0 comments on commit 952c476

Please sign in to comment.