Skip to content

Commit

Permalink
Add and use extensions from configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Cattermole <[email protected]>
  • Loading branch information
adam-cattermole committed Aug 29, 2024
1 parent 37d3eaa commit abd2ff8
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 59 deletions.
119 changes: 85 additions & 34 deletions src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cell::OnceCell;
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::rc::Rc;
use std::sync::Arc;
Expand Down Expand Up @@ -441,17 +442,14 @@ pub fn type_of(path: &str) -> Option<ValueType> {

pub struct FilterConfig {
pub index: PolicyIndex,
// Deny/Allow request when faced with an irrecoverable failure.
pub failure_mode: FailureMode,
pub service: Rc<GrpcService>,
pub services: Rc<HashMap<String, Rc<GrpcService>>>,
}

impl Default for FilterConfig {
fn default() -> Self {
Self {
index: PolicyIndex::new(),
failure_mode: FailureMode::Deny,
service: Rc::new(GrpcService::default()),
services: Rc::new(HashMap::new()),
}
}
}
Expand Down Expand Up @@ -484,21 +482,30 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
}
}

// todo(adam-cattermole): retrieve from config
let rl_service =
GrpcService::new(ExtensionType::RateLimit, config.policies[0].service.clone());
// configure grpc services from the extensions in config
let mut services = HashMap::new();
for (name, ext) in config.extensions {
services.insert(
name,
Rc::new(GrpcService::new(
ext.extension_type,
ext.endpoint,
ext.failure_mode,
)),
);
}

Ok(Self {
index,
failure_mode: config.failure_mode,
service: Rc::new(rl_service),
services: Rc::new(services),
})
}
}

#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase")]
pub enum FailureMode {
#[default]
Deny,
Allow,
}
Expand All @@ -513,8 +520,16 @@ pub enum ExtensionType {
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct PluginConfiguration {
#[serde(rename = "rateLimitPolicies")]
pub extensions: HashMap<String, Extension>,
pub policies: Vec<Policy>,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Extension {
#[serde(rename = "type")]
pub extension_type: ExtensionType,
pub endpoint: String,
// Deny/Allow request when faced with an irrecoverable failure.
pub failure_mode: FailureMode,
}
Expand All @@ -524,12 +539,17 @@ mod test {
use super::*;

const CONFIG: &str = r#"{
"failureMode": "deny",
"rateLimitPolicies": [
"extensions": {
"limitador": {
"type": "ratelimit",
"endpoint": "limitador-cluster",
"failureMode": "deny"
}
},
"policies": [
{
"name": "rlp-ns-A/rlp-name-A",
"domain": "rlp-ns-A/rlp-name-A",
"service": "limitador-cluster",
"hostnames": ["*.toystore.com", "example.com"],
"rules": [
{
Expand Down Expand Up @@ -621,8 +641,8 @@ mod test {
#[test]
fn parse_config_min() {
let config = r#"{
"failureMode": "deny",
"rateLimitPolicies": []
"extensions": {},
"policies": []
}"#;
let res = serde_json::from_str::<PluginConfiguration>(config);
if let Err(ref e) = res {
Expand All @@ -637,12 +657,17 @@ mod test {
#[test]
fn parse_config_data_selector() {
let config = r#"{
"failureMode": "deny",
"rateLimitPolicies": [
"extensions": {
"limitador": {
"type": "ratelimit",
"endpoint": "limitador-cluster",
"failureMode": "deny"
}
},
"policies": [
{
"name": "rlp-ns-A/rlp-name-A",
"domain": "rlp-ns-A/rlp-name-A",
"service": "limitador-cluster",
"hostnames": ["*.toystore.com", "example.com"],
"rules": [
{
Expand Down Expand Up @@ -687,12 +712,17 @@ mod test {
#[test]
fn parse_config_condition_selector_operators() {
let config = r#"{
"failureMode": "deny",
"rateLimitPolicies": [
"extensions": {
"limitador": {
"type": "ratelimit",
"endpoint": "limitador-cluster",
"failureMode": "deny"
}
},
"policies": [
{
"name": "rlp-ns-A/rlp-name-A",
"domain": "rlp-ns-A/rlp-name-A",
"service": "limitador-cluster",
"hostnames": ["*.toystore.com", "example.com"],
"rules": [
{
Expand Down Expand Up @@ -766,12 +796,17 @@ mod test {
#[test]
fn parse_config_conditions_optional() {
let config = r#"{
"failureMode": "deny",
"rateLimitPolicies": [
"extensions": {
"limitador": {
"type": "ratelimit",
"endpoint": "limitador-cluster",
"failureMode": "deny"
}
},
"policies": [
{
"name": "rlp-ns-A/rlp-name-A",
"domain": "rlp-ns-A/rlp-name-A",
"service": "limitador-cluster",
"hostnames": ["*.toystore.com", "example.com"],
"rules": [
{
Expand Down Expand Up @@ -810,12 +845,17 @@ mod test {
fn parse_config_invalid_data() {
// data item fields are mutually exclusive
let bad_config = r#"{
"failureMode": "deny",
"rateLimitPolicies": [
"extensions": {
"limitador": {
"type": "ratelimit",
"endpoint": "limitador-cluster",
"failureMode": "deny"
}
},
"policies": [
{
"name": "rlp-ns-A/rlp-name-A",
"domain": "rlp-ns-A/rlp-name-A",
"service": "limitador-cluster",
"hostnames": ["*.toystore.com", "example.com"],
"rules": [
{
Expand All @@ -837,8 +877,14 @@ mod test {

// data item unknown fields are forbidden
let bad_config = r#"{
"failureMode": "deny",
"rateLimitPolicies": [
"extensions": {
"limitador": {
"type": "ratelimit",
"endpoint": "limitador-cluster",
"failureMode": "deny"
}
},
"policies": [
{
"name": "rlp-ns-A/rlp-name-A",
"domain": "rlp-ns-A/rlp-name-A",
Expand All @@ -861,12 +907,17 @@ mod test {

// condition selector operator unknown
let bad_config = r#"{
"failureMode": "deny",
"rateLimitPolicies": [
"extensions": {
"limitador": {
"type": "ratelimit",
"endpoint": "limitador-cluster",
"failureMode": "deny"
}
},
"policies": [
{
"name": "rlp-ns-A/rlp-name-A",
"domain": "rlp-ns-A/rlp-name-A",
"service": "limitador-cluster",
"hostnames": ["*.toystore.com", "example.com"],
"rules": [
{
Expand Down
28 changes: 21 additions & 7 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,19 @@ impl Filter {
return Action::Continue;
}

let rls = GrpcServiceHandler::new(
Rc::clone(&self.config.service),
Rc::clone(&self.header_resolver),
);
// todo(adam-cattermole): For now we just get the first GrpcService but we expect to have
// an action which links to the service that should be used
let rls = self
.config
.services
.values()
.next()
.expect("expect a value");

let handler = GrpcServiceHandler::new(Rc::clone(rls), Rc::clone(&self.header_resolver));
let message = RateLimitService::message(rlp.domain.clone(), descriptors);

match rls.send(message) {
match handler.send(message) {
Ok(call_id) => {
debug!(
"#{} initiated gRPC call (id# {}) to Limitador",
Expand All @@ -56,7 +62,7 @@ impl Filter {
}
Err(e) => {
warn!("gRPC call to Limitador failed! {e:?}");
if let FailureMode::Deny = self.config.failure_mode {
if let FailureMode::Deny = rls.failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
Action::Continue
Expand All @@ -65,7 +71,15 @@ impl Filter {
}

fn handle_error_on_grpc_response(&self) {
match &self.config.failure_mode {
// todo(adam-cattermole): We need a method of knowing which service is the one currently
// being used (the current action) so that we can get the failure mode
let rls = self
.config
.services
.values()
.next()
.expect("expect a value");
match rls.failure_mode() {
FailureMode::Deny => {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
Expand Down
10 changes: 1 addition & 9 deletions src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,16 @@ pub struct Rule {
pub struct Policy {
pub name: String,
pub domain: String,
pub service: String,
pub hostnames: Vec<String>,
pub rules: Vec<Rule>,
}

impl Policy {
#[cfg(test)]
pub fn new(
name: String,
domain: String,
service: String,
hostnames: Vec<String>,
rules: Vec<Rule>,
) -> Self {
pub fn new(name: String, domain: String, hostnames: Vec<String>, rules: Vec<Rule>) -> Self {
Policy {
name,
domain,
service,
hostnames,
rules,
}
Expand Down
8 changes: 1 addition & 7 deletions src/policy_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,7 @@ mod tests {
use crate::policy_index::PolicyIndex;

fn build_ratelimit_policy(name: &str) -> Policy {
Policy::new(
name.to_owned(),
"".to_owned(),
"".to_owned(),
Vec::new(),
Vec::new(),
)
Policy::new(name.to_owned(), "".to_owned(), Vec::new(), Vec::new())
}

#[test]
Expand Down
17 changes: 15 additions & 2 deletions src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub(crate) mod auth;
pub(crate) mod rate_limit;

use crate::configuration::ExtensionType;
use crate::configuration::{ExtensionType, FailureMode};
use crate::service::auth::{AUTH_METHOD_NAME, AUTH_SERVICE_NAME};
use crate::service::rate_limit::{RATELIMIT_METHOD_NAME, RATELIMIT_SERVICE_NAME};
use crate::service::TracingHeader::{Baggage, Traceparent, Tracestate};
Expand All @@ -18,20 +18,23 @@ pub struct GrpcService {
endpoint: String,
name: &'static str,
method: &'static str,
failure_mode: FailureMode,
}

impl GrpcService {
pub fn new(extension_type: ExtensionType, endpoint: String) -> Self {
pub fn new(extension_type: ExtensionType, endpoint: String, failure_mode: FailureMode) -> Self {
match extension_type {
ExtensionType::Auth => Self {
endpoint,
name: AUTH_SERVICE_NAME,
method: AUTH_METHOD_NAME,
failure_mode,
},
ExtensionType::RateLimit => Self {
endpoint,
name: RATELIMIT_SERVICE_NAME,
method: RATELIMIT_METHOD_NAME,
failure_mode,
},
}
}
Expand All @@ -44,8 +47,12 @@ impl GrpcService {
fn method(&self) -> &str {
self.method
}
pub fn failure_mode(&self) -> &FailureMode {
&self.failure_mode
}
}

#[derive(Default)]
pub struct GrpcServiceHandler {
service: Rc<GrpcService>,
header_resolver: Rc<HeaderResolver>,
Expand Down Expand Up @@ -83,6 +90,12 @@ pub struct HeaderResolver {
headers: OnceCell<Vec<(&'static str, Bytes)>>,
}

impl Default for HeaderResolver {
fn default() -> Self {
Self::new()
}
}

impl HeaderResolver {
pub fn new() -> Self {
Self {
Expand Down

0 comments on commit abd2ff8

Please sign in to comment.