From 3c1dfcdf3e9e42daf6e2d4ff79331236845af1ef Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Wed, 16 Oct 2024 13:09:35 +0530 Subject: [PATCH 01/14] feature/redis-circuit-breaker --- go.mod | 6 +++++- go.sum | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 4367bed..f33f655 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,8 @@ module github.com/sony/gobreaker go 1.12 -require github.com/stretchr/testify v1.3.0 +require ( + github.com/alicebob/miniredis/v2 v2.33.0 + github.com/redis/go-redis/v9 v9.6.2 + github.com/stretchr/testify v1.3.0 +) diff --git a/go.sum b/go.sum index 4347755..ef08208 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,26 @@ +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= +github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk= +github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From 1704c10514158f1ab0bd02a156184718b5ffa8fd Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Wed, 16 Oct 2024 13:16:31 +0530 Subject: [PATCH 02/14] feature/redis-circuit-breaker --- redis_circuit_breaker.go | 240 ++++++++++++++++++++++++++++++++ redis_circuit_breaker_test.go | 250 ++++++++++++++++++++++++++++++++++ 2 files changed, 490 insertions(+) create mode 100644 redis_circuit_breaker.go create mode 100644 redis_circuit_breaker_test.go diff --git a/redis_circuit_breaker.go b/redis_circuit_breaker.go new file mode 100644 index 0000000..61940ca --- /dev/null +++ b/redis_circuit_breaker.go @@ -0,0 +1,240 @@ +package gobreaker + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/redis/go-redis/v9" +) + +// RedisCircuitBreaker extends CircuitBreaker with Redis-based state storage +type RedisCircuitBreaker struct { + *CircuitBreaker + redisClient RedisClientInterface + redisKey string +} + +type RedisClientInterface interface { + Get(ctx context.Context, key string) *redis.StringCmd + Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd +} + +// RedisSettings extends Settings with Redis configuration +type RedisSettings struct { + Settings + RedisClient RedisClientInterface + RedisKey string +} + +// NewRedisCircuitBreaker returns a new RedisCircuitBreaker configured with the given RedisSettings +func NewRedisCircuitBreaker(st RedisSettings) *RedisCircuitBreaker { + cb := NewCircuitBreaker(st.Settings) + return &RedisCircuitBreaker{ + CircuitBreaker: cb, + redisClient: st.RedisClient, + redisKey: st.RedisKey, + } +} + +// RedisState represents the CircuitBreaker state stored in Redis +type RedisState struct { + State State `json:"state"` + Generation uint64 `json:"generation"` + Counts Counts `json:"counts"` + Expiry time.Time `json:"expiry"` +} + +func (rcb *RedisCircuitBreaker) State() State { + if rcb.redisClient == nil { + return rcb.CircuitBreaker.State() + } + + ctx := context.Background() + state, err := rcb.getRedisState(ctx) + if err != nil { + // Fallback to in-memory state if Redis fails + return rcb.CircuitBreaker.State() + } + + now := time.Now() + currentState, _ := rcb.currentState(state, now) + + // Update the state in Redis if it has changed + if currentState != state.State { + state.State = currentState + if err := rcb.setRedisState(ctx, state); err != nil { + // Log the error, but continue with the current state + fmt.Printf("Failed to update state in Redis: %v\n", err) + } + } + + return currentState +} + +// Execute runs the given request if the RedisCircuitBreaker accepts it +func (rcb *RedisCircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) { + if rcb.redisClient == nil { + return rcb.CircuitBreaker.Execute(req) + } + + generation, err := rcb.beforeRequest() + if err != nil { + return nil, err + } + + defer func() { + e := recover() + if e != nil { + rcb.afterRequest(generation, false) + panic(e) + } + }() + + result, err := req() + rcb.afterRequest(generation, rcb.isSuccessful(err)) + return result, err +} + +func (rcb *RedisCircuitBreaker) beforeRequest() (uint64, error) { + ctx := context.Background() + state, err := rcb.getRedisState(ctx) + if err != nil { + return 0, err + } + + now := time.Now() + currentState, generation := rcb.currentState(state, now) + + if currentState == StateOpen { + return generation, ErrOpenState + } else if currentState == StateHalfOpen && state.Counts.Requests >= rcb.maxRequests { + return generation, ErrTooManyRequests + } + + state.Counts.onRequest() + err = rcb.setRedisState(ctx, state) + if err != nil { + return 0, err + } + + return generation, nil +} + +func (rcb *RedisCircuitBreaker) afterRequest(before uint64, success bool) { + ctx := context.Background() + state, err := rcb.getRedisState(ctx) + if err != nil { + return + } + + now := time.Now() + currentState, generation := rcb.currentState(state, now) + if generation != before { + return + } + + if success { + rcb.onSuccess(&state, currentState, now) + } else { + rcb.onFailure(&state, currentState, now) + } + + rcb.setRedisState(ctx, state) +} + +func (rcb *RedisCircuitBreaker) onSuccess(state *RedisState, currentState State, now time.Time) { + switch currentState { + case StateClosed: + state.Counts.onSuccess() + case StateHalfOpen: + state.Counts.onSuccess() + if state.Counts.ConsecutiveSuccesses >= rcb.maxRequests { + rcb.setState(state, StateClosed, now) + } + } +} + +func (rcb *RedisCircuitBreaker) onFailure(state *RedisState, currentState State, now time.Time) { + switch currentState { + case StateClosed: + state.Counts.onFailure() + if rcb.readyToTrip(state.Counts) { + rcb.setState(state, StateOpen, now) + } + case StateHalfOpen: + rcb.setState(state, StateOpen, now) + } +} + +func (rcb *RedisCircuitBreaker) currentState(state RedisState, now time.Time) (State, uint64) { + switch state.State { + case StateClosed: + if !state.Expiry.IsZero() && state.Expiry.Before(now) { + rcb.toNewGeneration(&state, now) + } + case StateOpen: + if state.Expiry.Before(now) { + rcb.setState(&state, StateHalfOpen, now) + } + } + return state.State, state.Generation +} + +func (rcb *RedisCircuitBreaker) setState(state *RedisState, newState State, now time.Time) { + if state.State == newState { + return + } + + prev := state.State + state.State = newState + + rcb.toNewGeneration(state, now) + + if rcb.onStateChange != nil { + rcb.onStateChange(rcb.name, prev, newState) + } +} + +func (rcb *RedisCircuitBreaker) toNewGeneration(state *RedisState, now time.Time) { + state.Generation++ + state.Counts.clear() + + var zero time.Time + switch state.State { + case StateClosed: + if rcb.interval == 0 { + state.Expiry = zero + } else { + state.Expiry = now.Add(rcb.interval) + } + case StateOpen: + state.Expiry = now.Add(rcb.timeout) + default: // StateHalfOpen + state.Expiry = zero + } +} + +func (rcb *RedisCircuitBreaker) getRedisState(ctx context.Context) (RedisState, error) { + var state RedisState + data, err := rcb.redisClient.Get(ctx, rcb.redisKey).Bytes() + if err == redis.Nil { + // Key doesn't exist, return default state + return RedisState{State: StateClosed}, nil + } else if err != nil { + return state, err + } + + err = json.Unmarshal(data, &state) + return state, err +} + +func (rcb *RedisCircuitBreaker) setRedisState(ctx context.Context, state RedisState) error { + data, err := json.Marshal(state) + if err != nil { + return err + } + + return rcb.redisClient.Set(ctx, rcb.redisKey, data, 0).Err() +} diff --git a/redis_circuit_breaker_test.go b/redis_circuit_breaker_test.go new file mode 100644 index 0000000..6e0355d --- /dev/null +++ b/redis_circuit_breaker_test.go @@ -0,0 +1,250 @@ +package gobreaker + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" +) + +var defaultRCB *RedisCircuitBreaker +var customRCB *RedisCircuitBreaker + +func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redis.Client) { + mr, err := miniredis.Run() + if err != nil { + panic(err) + } + + client := redis.NewClient(&redis.Options{ + Addr: mr.Addr(), + }) + + return NewRedisCircuitBreaker(RedisSettings{ + Settings: Settings{ + Name: "TestBreaker", + MaxRequests: 3, + Interval: time.Second, + Timeout: time.Second * 2, + ReadyToTrip: func(counts Counts) bool { + return counts.ConsecutiveFailures > 5 + }, + }, + RedisClient: client, + RedisKey: "test_breaker", + }), mr, client +} + +func pseudoSleepRedis(rcb *RedisCircuitBreaker, period time.Duration) { + ctx := context.Background() + state, _ := rcb.getRedisState(ctx) + + state.Expiry = state.Expiry.Add(-period) + // Reset counts if the interval has passed + if time.Now().After(state.Expiry) { + state.Counts = Counts{} + } + rcb.setRedisState(ctx, state) + +} + +func successRequest(rcb *RedisCircuitBreaker) error { + _, err := rcb.Execute(func() (interface{}, error) { return nil, nil }) + return err +} + +func failRequest(rcb *RedisCircuitBreaker) error { + _, err := rcb.Execute(func() (interface{}, error) { return nil, errors.New("fail") }) + if err != nil && err.Error() == "fail" { + return nil + } + return err +} + +func TestRedisCircuitBreakerInitialization(t *testing.T) { + rcb, mr, _ := setupTestWithMiniredis() + defer mr.Close() + + assert.Equal(t, "TestBreaker", rcb.Name()) + assert.Equal(t, uint32(3), rcb.maxRequests) + assert.Equal(t, time.Second, rcb.interval) + assert.Equal(t, time.Second*2, rcb.timeout) + assert.NotNil(t, rcb.readyToTrip) + + state := rcb.State() + assert.Equal(t, StateClosed, state) +} + +func TestRedisCircuitBreakerStateTransitions(t *testing.T) { + rcb, mr, _ := setupTestWithMiniredis() + defer mr.Close() + + // Check if initial state is closed + assert.Equal(t, StateClosed, rcb.State()) + + // StateClosed to StateOpen + for i := 0; i < 6; i++ { + assert.NoError(t, failRequest(rcb)) + } + + assert.Equal(t, StateOpen, rcb.State()) + + // Ensure requests fail when circuit is open + err := failRequest(rcb) + assert.Error(t, err) + assert.Equal(t, ErrOpenState, err) + + // Wait for timeout to transition to half-open + pseudoSleepRedis(rcb, rcb.timeout) + assert.Equal(t, StateHalfOpen, rcb.State()) + + // StateHalfOpen to StateClosed + for i := 0; i < int(rcb.maxRequests); i++ { + assert.NoError(t, successRequest(rcb)) + } + assert.Equal(t, StateClosed, rcb.State()) + + // StateClosed to StateOpen (again) + for i := 0; i < 6; i++ { + assert.NoError(t, failRequest(rcb)) + } + assert.Equal(t, StateOpen, rcb.State()) +} + +func TestRedisCircuitBreakerExecution(t *testing.T) { + rcb, mr, _ := setupTestWithMiniredis() + defer mr.Close() + + // Test successful execution + result, err := rcb.Execute(func() (interface{}, error) { + return "success", nil + }) + assert.NoError(t, err) + assert.Equal(t, "success", result) + + // Test failed execution + _, err = rcb.Execute(func() (interface{}, error) { + return nil, errors.New("test error") + }) + assert.Error(t, err) + assert.Equal(t, "test error", err.Error()) +} + +func TestRedisCircuitBreakerCounts(t *testing.T) { + rcb, mr, _ := setupTestWithMiniredis() + defer mr.Close() + + for i := 0; i < 5; i++ { + assert.Nil(t, successRequest(rcb)) + } + + ctx := context.Background() + state, _ := rcb.getRedisState(ctx) + assert.Equal(t, Counts{5, 5, 0, 5, 0}, state.Counts) + + assert.Nil(t, failRequest(rcb)) + state, _ = rcb.getRedisState(ctx) + assert.Equal(t, Counts{6, 5, 1, 0, 1}, state.Counts) +} + +func TestRedisCircuitBreakerFallback(t *testing.T) { + rcb, mr, _ := setupTestWithMiniredis() + defer mr.Close() + + // Test when Redis is unavailable + mr.Close() // Simulate Redis being unavailable + + rcb.redisClient = nil + + state := rcb.State() + assert.Equal(t, StateClosed, state, "Should fallback to in-memory state when Redis is unavailable") + + // Ensure operations still work without Redis + assert.Nil(t, successRequest(rcb)) + assert.Nil(t, failRequest(rcb)) +} + +func TestCustomRedisCircuitBreaker(t *testing.T) { + mr, err := miniredis.Run() + if err != nil { + panic(err) + } + defer mr.Close() + + client := redis.NewClient(&redis.Options{ + Addr: mr.Addr(), + }) + + customRCB = NewRedisCircuitBreaker(RedisSettings{ + Settings: Settings{ + Name: "CustomBreaker", + MaxRequests: 3, + Interval: time.Second * 30, + Timeout: time.Second * 90, + ReadyToTrip: func(counts Counts) bool { + numReqs := counts.Requests + failureRatio := float64(counts.TotalFailures) / float64(numReqs) + return numReqs >= 3 && failureRatio >= 0.6 + }, + }, + RedisClient: client, + RedisKey: "custom_breaker", + }) + + t.Run("Initialization", func(t *testing.T) { + assert.Equal(t, "CustomBreaker", customRCB.Name()) + assert.Equal(t, StateClosed, customRCB.State()) + }) + + t.Run("Counts and State Transitions", func(t *testing.T) { + ctx := context.Background() + + // Perform 5 successful and 5 failed requests + for i := 0; i < 5; i++ { + assert.NoError(t, successRequest(customRCB)) + assert.NoError(t, failRequest(customRCB)) + } + + state, err := customRCB.getRedisState(ctx) + assert.NoError(t, err) + assert.Equal(t, StateClosed, state.State) + assert.Equal(t, Counts{10, 5, 5, 0, 1}, state.Counts) + + // Perform one more successful request + assert.NoError(t, successRequest(customRCB)) + state, err = customRCB.getRedisState(ctx) + assert.NoError(t, err) + assert.Equal(t, Counts{11, 6, 5, 1, 0}, state.Counts) + + // Simulate time passing to reset counts + pseudoSleepRedis(customRCB, time.Second*30) + + // Perform requests to trigger StateOpen + assert.NoError(t, successRequest(customRCB)) + assert.NoError(t, failRequest(customRCB)) + assert.NoError(t, failRequest(customRCB)) + + // Check if the circuit breaker is now open + assert.Equal(t, StateOpen, customRCB.State()) + + state, err = customRCB.getRedisState(ctx) + assert.NoError(t, err) + assert.Equal(t, Counts{0, 0, 0, 0, 0}, state.Counts) + }) + + t.Run("Timeout and Half-Open State", func(t *testing.T) { + // Simulate timeout to transition to half-open state + pseudoSleepRedis(customRCB, time.Second*90) + assert.Equal(t, StateHalfOpen, customRCB.State()) + + // Successful requests in half-open state should close the circuit + for i := 0; i < 3; i++ { + assert.NoError(t, successRequest(customRCB)) + } + assert.Equal(t, StateClosed, customRCB.State()) + }) +} From 0c110f0f38b2d50c30e78f584ec9dc86676debd5 Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Thu, 17 Oct 2024 16:29:28 +0530 Subject: [PATCH 03/14] Refactor --- redis_circuit_breaker.go | 31 ++++++++++++++++--------------- redis_circuit_breaker_test.go | 8 ++------ 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/redis_circuit_breaker.go b/redis_circuit_breaker.go index 61940ca..ef114f9 100644 --- a/redis_circuit_breaker.go +++ b/redis_circuit_breaker.go @@ -9,32 +9,29 @@ import ( "github.com/redis/go-redis/v9" ) +type CacheClient interface { + Get(ctx context.Context, key string) *redis.StringCmd + Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd +} + // RedisCircuitBreaker extends CircuitBreaker with Redis-based state storage type RedisCircuitBreaker struct { *CircuitBreaker - redisClient RedisClientInterface - redisKey string -} - -type RedisClientInterface interface { - Get(ctx context.Context, key string) *redis.StringCmd - Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd + redisClient CacheClient } // RedisSettings extends Settings with Redis configuration type RedisSettings struct { Settings - RedisClient RedisClientInterface - RedisKey string + RedisKey string } // NewRedisCircuitBreaker returns a new RedisCircuitBreaker configured with the given RedisSettings -func NewRedisCircuitBreaker(st RedisSettings) *RedisCircuitBreaker { - cb := NewCircuitBreaker(st.Settings) +func NewRedisCircuitBreaker(redisClient CacheClient, settings RedisSettings) *RedisCircuitBreaker { + cb := NewCircuitBreaker(settings.Settings) return &RedisCircuitBreaker{ CircuitBreaker: cb, - redisClient: st.RedisClient, - redisKey: st.RedisKey, + redisClient: redisClient, } } @@ -216,9 +213,13 @@ func (rcb *RedisCircuitBreaker) toNewGeneration(state *RedisState, now time.Time } } +func (rcb *RedisCircuitBreaker) getRedisKey() string { + return "cb:" + rcb.name +} + func (rcb *RedisCircuitBreaker) getRedisState(ctx context.Context) (RedisState, error) { var state RedisState - data, err := rcb.redisClient.Get(ctx, rcb.redisKey).Bytes() + data, err := rcb.redisClient.Get(ctx, rcb.getRedisKey()).Bytes() if err == redis.Nil { // Key doesn't exist, return default state return RedisState{State: StateClosed}, nil @@ -236,5 +237,5 @@ func (rcb *RedisCircuitBreaker) setRedisState(ctx context.Context, state RedisSt return err } - return rcb.redisClient.Set(ctx, rcb.redisKey, data, 0).Err() + return rcb.redisClient.Set(ctx, rcb.getRedisKey(), data, 0).Err() } diff --git a/redis_circuit_breaker_test.go b/redis_circuit_breaker_test.go index 6e0355d..a2c6f59 100644 --- a/redis_circuit_breaker_test.go +++ b/redis_circuit_breaker_test.go @@ -24,7 +24,7 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redi Addr: mr.Addr(), }) - return NewRedisCircuitBreaker(RedisSettings{ + return NewRedisCircuitBreaker(client, RedisSettings{ Settings: Settings{ Name: "TestBreaker", MaxRequests: 3, @@ -34,8 +34,6 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redi return counts.ConsecutiveFailures > 5 }, }, - RedisClient: client, - RedisKey: "test_breaker", }), mr, client } @@ -179,7 +177,7 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { Addr: mr.Addr(), }) - customRCB = NewRedisCircuitBreaker(RedisSettings{ + customRCB = NewRedisCircuitBreaker(client, RedisSettings{ Settings: Settings{ Name: "CustomBreaker", MaxRequests: 3, @@ -191,8 +189,6 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { return numReqs >= 3 && failureRatio >= 0.6 }, }, - RedisClient: client, - RedisKey: "custom_breaker", }) t.Run("Initialization", func(t *testing.T) { From 1bd941ceaccff18aa781500759000866c4c468b1 Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Mon, 21 Oct 2024 17:44:20 +0530 Subject: [PATCH 04/14] save state --- redis_circuit_breaker.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/redis_circuit_breaker.go b/redis_circuit_breaker.go index ef114f9..e6711d5 100644 --- a/redis_circuit_breaker.go +++ b/redis_circuit_breaker.go @@ -189,6 +189,13 @@ func (rcb *RedisCircuitBreaker) setState(state *RedisState, newState State, now rcb.toNewGeneration(state, now) + // Save the updated state to Redis + ctx := context.Background() + if err := rcb.setRedisState(ctx, *state); err != nil { + // Log the error, but continue with the current state + fmt.Printf("Failed to update state in Redis: %v\n", err) + } + if rcb.onStateChange != nil { rcb.onStateChange(rcb.name, prev, newState) } From 88df094bc0a96ab48e5bf056cc18f0d621cba44d Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 22 Oct 2024 09:44:45 +0530 Subject: [PATCH 05/14] Saving half-open state also --- redis_circuit_breaker.go | 39 +++++++++++++++++---------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/redis_circuit_breaker.go b/redis_circuit_breaker.go index e6711d5..b3dd9ca 100644 --- a/redis_circuit_breaker.go +++ b/redis_circuit_breaker.go @@ -3,7 +3,6 @@ package gobreaker import ( "context" "encoding/json" - "fmt" "time" "github.com/redis/go-redis/v9" @@ -55,19 +54,19 @@ func (rcb *RedisCircuitBreaker) State() State { return rcb.CircuitBreaker.State() } - now := time.Now() - currentState, _ := rcb.currentState(state, now) + // now := time.Now() + // currentState, _ := rcb.currentState(state, now) - // Update the state in Redis if it has changed - if currentState != state.State { - state.State = currentState - if err := rcb.setRedisState(ctx, state); err != nil { - // Log the error, but continue with the current state - fmt.Printf("Failed to update state in Redis: %v\n", err) - } - } + // // Update the state in Redis if it has changed + // if currentState != state.State { + // state.State = currentState + // if err := rcb.setRedisState(ctx, state); err != nil { + // // Log the error, but continue with the current state + // fmt.Printf("Failed to update state in Redis: %v\n", err) + // } + // } - return currentState + return state.State } // Execute runs the given request if the RedisCircuitBreaker accepts it @@ -75,7 +74,6 @@ func (rcb *RedisCircuitBreaker) Execute(req func() (interface{}, error)) (interf if rcb.redisClient == nil { return rcb.CircuitBreaker.Execute(req) } - generation, err := rcb.beforeRequest() if err != nil { return nil, err @@ -91,6 +89,7 @@ func (rcb *RedisCircuitBreaker) Execute(req func() (interface{}, error)) (interf result, err := req() rcb.afterRequest(generation, rcb.isSuccessful(err)) + return result, err } @@ -100,7 +99,6 @@ func (rcb *RedisCircuitBreaker) beforeRequest() (uint64, error) { if err != nil { return 0, err } - now := time.Now() currentState, generation := rcb.currentState(state, now) @@ -125,7 +123,6 @@ func (rcb *RedisCircuitBreaker) afterRequest(before uint64, success bool) { if err != nil { return } - now := time.Now() currentState, generation := rcb.currentState(state, now) if generation != before { @@ -142,6 +139,10 @@ func (rcb *RedisCircuitBreaker) afterRequest(before uint64, success bool) { } func (rcb *RedisCircuitBreaker) onSuccess(state *RedisState, currentState State, now time.Time) { + if state.State == StateOpen { + state.State = currentState + } + switch currentState { case StateClosed: state.Counts.onSuccess() @@ -189,19 +190,13 @@ func (rcb *RedisCircuitBreaker) setState(state *RedisState, newState State, now rcb.toNewGeneration(state, now) - // Save the updated state to Redis - ctx := context.Background() - if err := rcb.setRedisState(ctx, *state); err != nil { - // Log the error, but continue with the current state - fmt.Printf("Failed to update state in Redis: %v\n", err) - } - if rcb.onStateChange != nil { rcb.onStateChange(rcb.name, prev, newState) } } func (rcb *RedisCircuitBreaker) toNewGeneration(state *RedisState, now time.Time) { + state.Generation++ state.Counts.clear() From 4de08e36f4d9e9e933975c4105eaeaa5ab1046f7 Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 22 Oct 2024 09:48:20 +0530 Subject: [PATCH 06/14] Saving half-open state also --- redis_circuit_breaker.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/redis_circuit_breaker.go b/redis_circuit_breaker.go index b3dd9ca..884d161 100644 --- a/redis_circuit_breaker.go +++ b/redis_circuit_breaker.go @@ -3,6 +3,7 @@ package gobreaker import ( "context" "encoding/json" + "fmt" "time" "github.com/redis/go-redis/v9" @@ -54,17 +55,17 @@ func (rcb *RedisCircuitBreaker) State() State { return rcb.CircuitBreaker.State() } - // now := time.Now() - // currentState, _ := rcb.currentState(state, now) + now := time.Now() + currentState, _ := rcb.currentState(state, now) - // // Update the state in Redis if it has changed - // if currentState != state.State { - // state.State = currentState - // if err := rcb.setRedisState(ctx, state); err != nil { - // // Log the error, but continue with the current state - // fmt.Printf("Failed to update state in Redis: %v\n", err) - // } - // } + // Update the state in Redis if it has changed + if currentState != state.State { + state.State = currentState + if err := rcb.setRedisState(ctx, state); err != nil { + // Log the error, but continue with the current state + fmt.Printf("Failed to update state in Redis: %v\n", err) + } + } return state.State } From cb05588c7dcb6b7c5f9e89e8b158e2bc94e87b33 Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 22 Oct 2024 09:53:11 +0530 Subject: [PATCH 07/14] Added test case --- redis_circuit_breaker_test.go | 66 +++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/redis_circuit_breaker_test.go b/redis_circuit_breaker_test.go index a2c6f59..3ea947f 100644 --- a/redis_circuit_breaker_test.go +++ b/redis_circuit_breaker_test.go @@ -244,3 +244,69 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { assert.Equal(t, StateClosed, customRCB.State()) }) } + +func TestCustomRedisCircuitBreakerStateTransitions(t *testing.T) { + // Setup + var stateChange StateChange + customSt := Settings{ + Name: "cb", + MaxRequests: 3, + Interval: 5 * time.Second, + Timeout: 5 * time.Second, + ReadyToTrip: func(counts Counts) bool { + return counts.ConsecutiveFailures >= 2 + }, + OnStateChange: func(name string, from State, to State) { + stateChange = StateChange{name, from, to} + }, + } + + mr, err := miniredis.Run() + if err != nil { + t.Fatalf("Failed to start miniredis: %v", err) + } + defer mr.Close() + + client := redis.NewClient(&redis.Options{ + Addr: mr.Addr(), + }) + + cb := NewRedisCircuitBreaker(client, RedisSettings{Settings: customSt}) + + // Test case + t.Run("Circuit Breaker State Transitions", func(t *testing.T) { + // Initial state should be Closed + assert.Equal(t, StateClosed, cb.State()) + + // Cause two consecutive failures to trip the circuit + for i := 0; i < 2; i++ { + err := failRequest(cb) + assert.NoError(t, err, "Fail request should not return an error") + } + + // Circuit should now be Open + assert.Equal(t, StateOpen, cb.State()) + assert.Equal(t, StateChange{"cb", StateClosed, StateOpen}, stateChange) + + // Requests should fail immediately when circuit is Open + err := successRequest(cb) + assert.Error(t, err) + assert.Equal(t, ErrOpenState, err) + + // Simulate timeout to transition to Half-Open + pseudoSleepRedis(cb, 6*time.Second) + assert.Equal(t, StateHalfOpen, cb.State()) + assert.Equal(t, StateChange{"cb", StateOpen, StateHalfOpen}, stateChange) + + // Successful requests in Half-Open state should close the circuit + for i := 0; i < int(cb.maxRequests); i++ { + err := successRequest(cb) + assert.NoError(t, err) + } + + // Circuit should now be Closed + assert.Equal(t, StateClosed, cb.State()) + assert.Equal(t, StateChange{"cb", StateHalfOpen, StateClosed}, stateChange) + + }) +} From 6bdaed6f4b2162723542c249928032c2993b76ad Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 22 Oct 2024 10:53:21 +0530 Subject: [PATCH 08/14] Saving state transition --- redis_circuit_breaker.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/redis_circuit_breaker.go b/redis_circuit_breaker.go index 884d161..43a0720 100644 --- a/redis_circuit_breaker.go +++ b/redis_circuit_breaker.go @@ -103,6 +103,14 @@ func (rcb *RedisCircuitBreaker) beforeRequest() (uint64, error) { now := time.Now() currentState, generation := rcb.currentState(state, now) + if currentState != state.State { + rcb.setState(&state, currentState, now) + err = rcb.setRedisState(ctx, state) + if err != nil { + return 0, err + } + } + if currentState == StateOpen { return generation, ErrOpenState } else if currentState == StateHalfOpen && state.Counts.Requests >= rcb.maxRequests { From 04880044be9c19664317099b41042855bd8478f8 Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 22 Oct 2024 12:29:58 +0530 Subject: [PATCH 09/14] Pass context --- redis_circuit_breaker.go | 17 +++--- redis_circuit_breaker_test.go | 108 ++++++++++++++++++---------------- 2 files changed, 65 insertions(+), 60 deletions(-) diff --git a/redis_circuit_breaker.go b/redis_circuit_breaker.go index 43a0720..5daa011 100644 --- a/redis_circuit_breaker.go +++ b/redis_circuit_breaker.go @@ -43,12 +43,11 @@ type RedisState struct { Expiry time.Time `json:"expiry"` } -func (rcb *RedisCircuitBreaker) State() State { +func (rcb *RedisCircuitBreaker) State(ctx context.Context) State { if rcb.redisClient == nil { return rcb.CircuitBreaker.State() } - ctx := context.Background() state, err := rcb.getRedisState(ctx) if err != nil { // Fallback to in-memory state if Redis fails @@ -71,11 +70,11 @@ func (rcb *RedisCircuitBreaker) State() State { } // Execute runs the given request if the RedisCircuitBreaker accepts it -func (rcb *RedisCircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) { +func (rcb *RedisCircuitBreaker) Execute(ctx context.Context, req func() (interface{}, error)) (interface{}, error) { if rcb.redisClient == nil { return rcb.CircuitBreaker.Execute(req) } - generation, err := rcb.beforeRequest() + generation, err := rcb.beforeRequest(ctx) if err != nil { return nil, err } @@ -83,19 +82,18 @@ func (rcb *RedisCircuitBreaker) Execute(req func() (interface{}, error)) (interf defer func() { e := recover() if e != nil { - rcb.afterRequest(generation, false) + rcb.afterRequest(ctx, generation, false) panic(e) } }() result, err := req() - rcb.afterRequest(generation, rcb.isSuccessful(err)) + rcb.afterRequest(ctx, generation, rcb.isSuccessful(err)) return result, err } -func (rcb *RedisCircuitBreaker) beforeRequest() (uint64, error) { - ctx := context.Background() +func (rcb *RedisCircuitBreaker) beforeRequest(ctx context.Context) (uint64, error) { state, err := rcb.getRedisState(ctx) if err != nil { return 0, err @@ -126,8 +124,7 @@ func (rcb *RedisCircuitBreaker) beforeRequest() (uint64, error) { return generation, nil } -func (rcb *RedisCircuitBreaker) afterRequest(before uint64, success bool) { - ctx := context.Background() +func (rcb *RedisCircuitBreaker) afterRequest(ctx context.Context, before uint64, success bool) { state, err := rcb.getRedisState(ctx) if err != nil { return diff --git a/redis_circuit_breaker_test.go b/redis_circuit_breaker_test.go index 3ea947f..da37b45 100644 --- a/redis_circuit_breaker_test.go +++ b/redis_circuit_breaker_test.go @@ -37,8 +37,7 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redi }), mr, client } -func pseudoSleepRedis(rcb *RedisCircuitBreaker, period time.Duration) { - ctx := context.Background() +func pseudoSleepRedis(ctx context.Context, rcb *RedisCircuitBreaker, period time.Duration) { state, _ := rcb.getRedisState(ctx) state.Expiry = state.Expiry.Add(-period) @@ -47,16 +46,15 @@ func pseudoSleepRedis(rcb *RedisCircuitBreaker, period time.Duration) { state.Counts = Counts{} } rcb.setRedisState(ctx, state) - } -func successRequest(rcb *RedisCircuitBreaker) error { - _, err := rcb.Execute(func() (interface{}, error) { return nil, nil }) +func successRequest(ctx context.Context, rcb *RedisCircuitBreaker) error { + _, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, nil }) return err } -func failRequest(rcb *RedisCircuitBreaker) error { - _, err := rcb.Execute(func() (interface{}, error) { return nil, errors.New("fail") }) +func failRequest(ctx context.Context, rcb *RedisCircuitBreaker) error { + _, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, errors.New("fail") }) if err != nil && err.Error() == "fail" { return nil } @@ -67,13 +65,15 @@ func TestRedisCircuitBreakerInitialization(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() + ctx := context.Background() + assert.Equal(t, "TestBreaker", rcb.Name()) assert.Equal(t, uint32(3), rcb.maxRequests) assert.Equal(t, time.Second, rcb.interval) assert.Equal(t, time.Second*2, rcb.timeout) assert.NotNil(t, rcb.readyToTrip) - state := rcb.State() + state := rcb.State(ctx) assert.Equal(t, StateClosed, state) } @@ -81,51 +81,55 @@ func TestRedisCircuitBreakerStateTransitions(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() + ctx := context.Background() + // Check if initial state is closed - assert.Equal(t, StateClosed, rcb.State()) + assert.Equal(t, StateClosed, rcb.State(ctx)) // StateClosed to StateOpen for i := 0; i < 6; i++ { - assert.NoError(t, failRequest(rcb)) + assert.NoError(t, failRequest(ctx, rcb)) } - assert.Equal(t, StateOpen, rcb.State()) + assert.Equal(t, StateOpen, rcb.State(ctx)) // Ensure requests fail when circuit is open - err := failRequest(rcb) + err := failRequest(ctx, rcb) assert.Error(t, err) assert.Equal(t, ErrOpenState, err) // Wait for timeout to transition to half-open - pseudoSleepRedis(rcb, rcb.timeout) - assert.Equal(t, StateHalfOpen, rcb.State()) + pseudoSleepRedis(ctx, rcb, rcb.timeout) + assert.Equal(t, StateHalfOpen, rcb.State(ctx)) // StateHalfOpen to StateClosed for i := 0; i < int(rcb.maxRequests); i++ { - assert.NoError(t, successRequest(rcb)) + assert.NoError(t, successRequest(ctx, rcb)) } - assert.Equal(t, StateClosed, rcb.State()) + assert.Equal(t, StateClosed, rcb.State(ctx)) // StateClosed to StateOpen (again) for i := 0; i < 6; i++ { - assert.NoError(t, failRequest(rcb)) + assert.NoError(t, failRequest(ctx, rcb)) } - assert.Equal(t, StateOpen, rcb.State()) + assert.Equal(t, StateOpen, rcb.State(ctx)) } func TestRedisCircuitBreakerExecution(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() + ctx := context.Background() + // Test successful execution - result, err := rcb.Execute(func() (interface{}, error) { + result, err := rcb.Execute(ctx, func() (interface{}, error) { return "success", nil }) assert.NoError(t, err) assert.Equal(t, "success", result) // Test failed execution - _, err = rcb.Execute(func() (interface{}, error) { + _, err = rcb.Execute(ctx, func() (interface{}, error) { return nil, errors.New("test error") }) assert.Error(t, err) @@ -136,15 +140,16 @@ func TestRedisCircuitBreakerCounts(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() + ctx := context.Background() + for i := 0; i < 5; i++ { - assert.Nil(t, successRequest(rcb)) + assert.Nil(t, successRequest(ctx, rcb)) } - ctx := context.Background() state, _ := rcb.getRedisState(ctx) assert.Equal(t, Counts{5, 5, 0, 5, 0}, state.Counts) - assert.Nil(t, failRequest(rcb)) + assert.Nil(t, failRequest(ctx, rcb)) state, _ = rcb.getRedisState(ctx) assert.Equal(t, Counts{6, 5, 1, 0, 1}, state.Counts) } @@ -153,17 +158,19 @@ func TestRedisCircuitBreakerFallback(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() + ctx := context.Background() + // Test when Redis is unavailable mr.Close() // Simulate Redis being unavailable rcb.redisClient = nil - state := rcb.State() + state := rcb.State(ctx) assert.Equal(t, StateClosed, state, "Should fallback to in-memory state when Redis is unavailable") // Ensure operations still work without Redis - assert.Nil(t, successRequest(rcb)) - assert.Nil(t, failRequest(rcb)) + assert.Nil(t, successRequest(ctx, rcb)) + assert.Nil(t, failRequest(ctx, rcb)) } func TestCustomRedisCircuitBreaker(t *testing.T) { @@ -191,18 +198,18 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { }, }) + ctx := context.Background() + t.Run("Initialization", func(t *testing.T) { assert.Equal(t, "CustomBreaker", customRCB.Name()) - assert.Equal(t, StateClosed, customRCB.State()) + assert.Equal(t, StateClosed, customRCB.State(ctx)) }) t.Run("Counts and State Transitions", func(t *testing.T) { - ctx := context.Background() - // Perform 5 successful and 5 failed requests for i := 0; i < 5; i++ { - assert.NoError(t, successRequest(customRCB)) - assert.NoError(t, failRequest(customRCB)) + assert.NoError(t, successRequest(ctx, customRCB)) + assert.NoError(t, failRequest(ctx, customRCB)) } state, err := customRCB.getRedisState(ctx) @@ -211,21 +218,21 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { assert.Equal(t, Counts{10, 5, 5, 0, 1}, state.Counts) // Perform one more successful request - assert.NoError(t, successRequest(customRCB)) + assert.NoError(t, successRequest(ctx, customRCB)) state, err = customRCB.getRedisState(ctx) assert.NoError(t, err) assert.Equal(t, Counts{11, 6, 5, 1, 0}, state.Counts) // Simulate time passing to reset counts - pseudoSleepRedis(customRCB, time.Second*30) + pseudoSleepRedis(ctx, customRCB, time.Second*30) // Perform requests to trigger StateOpen - assert.NoError(t, successRequest(customRCB)) - assert.NoError(t, failRequest(customRCB)) - assert.NoError(t, failRequest(customRCB)) + assert.NoError(t, successRequest(ctx, customRCB)) + assert.NoError(t, failRequest(ctx, customRCB)) + assert.NoError(t, failRequest(ctx, customRCB)) // Check if the circuit breaker is now open - assert.Equal(t, StateOpen, customRCB.State()) + assert.Equal(t, StateOpen, customRCB.State(ctx)) state, err = customRCB.getRedisState(ctx) assert.NoError(t, err) @@ -234,14 +241,14 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { t.Run("Timeout and Half-Open State", func(t *testing.T) { // Simulate timeout to transition to half-open state - pseudoSleepRedis(customRCB, time.Second*90) - assert.Equal(t, StateHalfOpen, customRCB.State()) + pseudoSleepRedis(ctx, customRCB, time.Second*90) + assert.Equal(t, StateHalfOpen, customRCB.State(ctx)) // Successful requests in half-open state should close the circuit for i := 0; i < 3; i++ { - assert.NoError(t, successRequest(customRCB)) + assert.NoError(t, successRequest(ctx, customRCB)) } - assert.Equal(t, StateClosed, customRCB.State()) + assert.Equal(t, StateClosed, customRCB.State(ctx)) }) } @@ -273,40 +280,41 @@ func TestCustomRedisCircuitBreakerStateTransitions(t *testing.T) { cb := NewRedisCircuitBreaker(client, RedisSettings{Settings: customSt}) + ctx := context.Background() + // Test case t.Run("Circuit Breaker State Transitions", func(t *testing.T) { // Initial state should be Closed - assert.Equal(t, StateClosed, cb.State()) + assert.Equal(t, StateClosed, cb.State(ctx)) // Cause two consecutive failures to trip the circuit for i := 0; i < 2; i++ { - err := failRequest(cb) + err := failRequest(ctx, cb) assert.NoError(t, err, "Fail request should not return an error") } // Circuit should now be Open - assert.Equal(t, StateOpen, cb.State()) + assert.Equal(t, StateOpen, cb.State(ctx)) assert.Equal(t, StateChange{"cb", StateClosed, StateOpen}, stateChange) // Requests should fail immediately when circuit is Open - err := successRequest(cb) + err := successRequest(ctx, cb) assert.Error(t, err) assert.Equal(t, ErrOpenState, err) // Simulate timeout to transition to Half-Open - pseudoSleepRedis(cb, 6*time.Second) - assert.Equal(t, StateHalfOpen, cb.State()) + pseudoSleepRedis(ctx, cb, 6*time.Second) + assert.Equal(t, StateHalfOpen, cb.State(ctx)) assert.Equal(t, StateChange{"cb", StateOpen, StateHalfOpen}, stateChange) // Successful requests in Half-Open state should close the circuit for i := 0; i < int(cb.maxRequests); i++ { - err := successRequest(cb) + err := successRequest(ctx, cb) assert.NoError(t, err) } // Circuit should now be Closed - assert.Equal(t, StateClosed, cb.State()) + assert.Equal(t, StateClosed, cb.State(ctx)) assert.Equal(t, StateChange{"cb", StateHalfOpen, StateClosed}, stateChange) - }) } From 36b0f9e8c0f6d5b2adb0f77f193dd7abd4fccf3c Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 29 Oct 2024 21:50:06 +0530 Subject: [PATCH 10/14] Moved redis circuit breaker to v2 --- v2/go.mod | 9 +++++ v2/go.sum | 12 ++++++ .../redis_circuit_breaker.go | 37 ++++++++++--------- .../redis_circuit_breaker_test.go | 18 ++++----- 4 files changed, 49 insertions(+), 27 deletions(-) rename redis_circuit_breaker.go => v2/redis_circuit_breaker.go (77%) rename redis_circuit_breaker_test.go => v2/redis_circuit_breaker_test.go (94%) diff --git a/v2/go.mod b/v2/go.mod index 9e1537a..eb1204f 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -5,7 +5,16 @@ go 1.21 require github.com/stretchr/testify v1.8.4 require ( + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect +) + +require ( + github.com/alicebob/miniredis/v2 v2.33.0 github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/redis/go-redis/v9 v9.7.0 gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/v2/go.sum b/v2/go.sum index fa4b6e6..f36dd60 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -1,9 +1,21 @@ +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= +github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/redis_circuit_breaker.go b/v2/redis_circuit_breaker.go similarity index 77% rename from redis_circuit_breaker.go rename to v2/redis_circuit_breaker.go index 5daa011..318a7b9 100644 --- a/redis_circuit_breaker.go +++ b/v2/redis_circuit_breaker.go @@ -15,8 +15,8 @@ type CacheClient interface { } // RedisCircuitBreaker extends CircuitBreaker with Redis-based state storage -type RedisCircuitBreaker struct { - *CircuitBreaker +type RedisCircuitBreaker[T any] struct { + *CircuitBreaker[T] redisClient CacheClient } @@ -27,9 +27,9 @@ type RedisSettings struct { } // NewRedisCircuitBreaker returns a new RedisCircuitBreaker configured with the given RedisSettings -func NewRedisCircuitBreaker(redisClient CacheClient, settings RedisSettings) *RedisCircuitBreaker { - cb := NewCircuitBreaker(settings.Settings) - return &RedisCircuitBreaker{ +func NewRedisCircuitBreaker[T any](redisClient CacheClient, settings RedisSettings) *RedisCircuitBreaker[T] { + cb := NewCircuitBreaker[T](settings.Settings) + return &RedisCircuitBreaker[T]{ CircuitBreaker: cb, redisClient: redisClient, } @@ -43,7 +43,7 @@ type RedisState struct { Expiry time.Time `json:"expiry"` } -func (rcb *RedisCircuitBreaker) State(ctx context.Context) State { +func (rcb *RedisCircuitBreaker[T]) State(ctx context.Context) State { if rcb.redisClient == nil { return rcb.CircuitBreaker.State() } @@ -70,13 +70,14 @@ func (rcb *RedisCircuitBreaker) State(ctx context.Context) State { } // Execute runs the given request if the RedisCircuitBreaker accepts it -func (rcb *RedisCircuitBreaker) Execute(ctx context.Context, req func() (interface{}, error)) (interface{}, error) { +func (rcb *RedisCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) { if rcb.redisClient == nil { return rcb.CircuitBreaker.Execute(req) } generation, err := rcb.beforeRequest(ctx) if err != nil { - return nil, err + var zero T + return zero, err } defer func() { @@ -93,7 +94,7 @@ func (rcb *RedisCircuitBreaker) Execute(ctx context.Context, req func() (interfa return result, err } -func (rcb *RedisCircuitBreaker) beforeRequest(ctx context.Context) (uint64, error) { +func (rcb *RedisCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) { state, err := rcb.getRedisState(ctx) if err != nil { return 0, err @@ -124,7 +125,7 @@ func (rcb *RedisCircuitBreaker) beforeRequest(ctx context.Context) (uint64, erro return generation, nil } -func (rcb *RedisCircuitBreaker) afterRequest(ctx context.Context, before uint64, success bool) { +func (rcb *RedisCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) { state, err := rcb.getRedisState(ctx) if err != nil { return @@ -144,7 +145,7 @@ func (rcb *RedisCircuitBreaker) afterRequest(ctx context.Context, before uint64, rcb.setRedisState(ctx, state) } -func (rcb *RedisCircuitBreaker) onSuccess(state *RedisState, currentState State, now time.Time) { +func (rcb *RedisCircuitBreaker[T]) onSuccess(state *RedisState, currentState State, now time.Time) { if state.State == StateOpen { state.State = currentState } @@ -160,7 +161,7 @@ func (rcb *RedisCircuitBreaker) onSuccess(state *RedisState, currentState State, } } -func (rcb *RedisCircuitBreaker) onFailure(state *RedisState, currentState State, now time.Time) { +func (rcb *RedisCircuitBreaker[T]) onFailure(state *RedisState, currentState State, now time.Time) { switch currentState { case StateClosed: state.Counts.onFailure() @@ -172,7 +173,7 @@ func (rcb *RedisCircuitBreaker) onFailure(state *RedisState, currentState State, } } -func (rcb *RedisCircuitBreaker) currentState(state RedisState, now time.Time) (State, uint64) { +func (rcb *RedisCircuitBreaker[T]) currentState(state RedisState, now time.Time) (State, uint64) { switch state.State { case StateClosed: if !state.Expiry.IsZero() && state.Expiry.Before(now) { @@ -186,7 +187,7 @@ func (rcb *RedisCircuitBreaker) currentState(state RedisState, now time.Time) (S return state.State, state.Generation } -func (rcb *RedisCircuitBreaker) setState(state *RedisState, newState State, now time.Time) { +func (rcb *RedisCircuitBreaker[T]) setState(state *RedisState, newState State, now time.Time) { if state.State == newState { return } @@ -201,7 +202,7 @@ func (rcb *RedisCircuitBreaker) setState(state *RedisState, newState State, now } } -func (rcb *RedisCircuitBreaker) toNewGeneration(state *RedisState, now time.Time) { +func (rcb *RedisCircuitBreaker[T]) toNewGeneration(state *RedisState, now time.Time) { state.Generation++ state.Counts.clear() @@ -221,11 +222,11 @@ func (rcb *RedisCircuitBreaker) toNewGeneration(state *RedisState, now time.Time } } -func (rcb *RedisCircuitBreaker) getRedisKey() string { +func (rcb *RedisCircuitBreaker[T]) getRedisKey() string { return "cb:" + rcb.name } -func (rcb *RedisCircuitBreaker) getRedisState(ctx context.Context) (RedisState, error) { +func (rcb *RedisCircuitBreaker[T]) getRedisState(ctx context.Context) (RedisState, error) { var state RedisState data, err := rcb.redisClient.Get(ctx, rcb.getRedisKey()).Bytes() if err == redis.Nil { @@ -239,7 +240,7 @@ func (rcb *RedisCircuitBreaker) getRedisState(ctx context.Context) (RedisState, return state, err } -func (rcb *RedisCircuitBreaker) setRedisState(ctx context.Context, state RedisState) error { +func (rcb *RedisCircuitBreaker[T]) setRedisState(ctx context.Context, state RedisState) error { data, err := json.Marshal(state) if err != nil { return err diff --git a/redis_circuit_breaker_test.go b/v2/redis_circuit_breaker_test.go similarity index 94% rename from redis_circuit_breaker_test.go rename to v2/redis_circuit_breaker_test.go index da37b45..b015431 100644 --- a/redis_circuit_breaker_test.go +++ b/v2/redis_circuit_breaker_test.go @@ -11,10 +11,10 @@ import ( "github.com/stretchr/testify/assert" ) -var defaultRCB *RedisCircuitBreaker -var customRCB *RedisCircuitBreaker +var defaultRCB *RedisCircuitBreaker[any] +var customRCB *RedisCircuitBreaker[any] -func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redis.Client) { +func setupTestWithMiniredis() (*RedisCircuitBreaker[any], *miniredis.Miniredis, *redis.Client) { mr, err := miniredis.Run() if err != nil { panic(err) @@ -24,7 +24,7 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redi Addr: mr.Addr(), }) - return NewRedisCircuitBreaker(client, RedisSettings{ + return NewRedisCircuitBreaker[any](client, RedisSettings{ Settings: Settings{ Name: "TestBreaker", MaxRequests: 3, @@ -37,7 +37,7 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker, *miniredis.Miniredis, *redi }), mr, client } -func pseudoSleepRedis(ctx context.Context, rcb *RedisCircuitBreaker, period time.Duration) { +func pseudoSleepRedis(ctx context.Context, rcb *RedisCircuitBreaker[any], period time.Duration) { state, _ := rcb.getRedisState(ctx) state.Expiry = state.Expiry.Add(-period) @@ -48,12 +48,12 @@ func pseudoSleepRedis(ctx context.Context, rcb *RedisCircuitBreaker, period time rcb.setRedisState(ctx, state) } -func successRequest(ctx context.Context, rcb *RedisCircuitBreaker) error { +func successRequest(ctx context.Context, rcb *RedisCircuitBreaker[any]) error { _, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, nil }) return err } -func failRequest(ctx context.Context, rcb *RedisCircuitBreaker) error { +func failRequest(ctx context.Context, rcb *RedisCircuitBreaker[any]) error { _, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, errors.New("fail") }) if err != nil && err.Error() == "fail" { return nil @@ -184,7 +184,7 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { Addr: mr.Addr(), }) - customRCB = NewRedisCircuitBreaker(client, RedisSettings{ + customRCB = NewRedisCircuitBreaker[any](client, RedisSettings{ Settings: Settings{ Name: "CustomBreaker", MaxRequests: 3, @@ -278,7 +278,7 @@ func TestCustomRedisCircuitBreakerStateTransitions(t *testing.T) { Addr: mr.Addr(), }) - cb := NewRedisCircuitBreaker(client, RedisSettings{Settings: customSt}) + cb := NewRedisCircuitBreaker[any](client, RedisSettings{Settings: customSt}) ctx := context.Background() From 164badd96060c5384ce7d3fe2a3bcfa172f665ba Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 5 Nov 2024 22:09:10 +0530 Subject: [PATCH 11/14] Revert go.mod and go.sum --- go.mod | 6 +----- go.sum | 19 ------------------- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/go.mod b/go.mod index f33f655..4367bed 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,4 @@ module github.com/sony/gobreaker go 1.12 -require ( - github.com/alicebob/miniredis/v2 v2.33.0 - github.com/redis/go-redis/v9 v9.6.2 - github.com/stretchr/testify v1.3.0 -) +require github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index ef08208..4347755 100644 --- a/go.sum +++ b/go.sum @@ -1,26 +1,7 @@ -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= -github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= -github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= -github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk= -github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= -github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= -golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From fffe01391617764a38fc6c35fb65c402a24f4648 Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 5 Nov 2024 22:38:05 +0530 Subject: [PATCH 12/14] Acked review comments --- ...aker.go => distributed_circuit_breaker.go} | 89 +++++++++---------- ...go => distributed_circuit_breaker_test.go} | 52 +++++++---- 2 files changed, 78 insertions(+), 63 deletions(-) rename v2/{redis_circuit_breaker.go => distributed_circuit_breaker.go} (56%) rename v2/{redis_circuit_breaker_test.go => distributed_circuit_breaker_test.go} (83%) diff --git a/v2/redis_circuit_breaker.go b/v2/distributed_circuit_breaker.go similarity index 56% rename from v2/redis_circuit_breaker.go rename to v2/distributed_circuit_breaker.go index 318a7b9..66619cf 100644 --- a/v2/redis_circuit_breaker.go +++ b/v2/distributed_circuit_breaker.go @@ -5,73 +5,70 @@ import ( "encoding/json" "fmt" "time" - - "github.com/redis/go-redis/v9" ) type CacheClient interface { - Get(ctx context.Context, key string) *redis.StringCmd - Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd + Get(ctx context.Context, key string) ([]byte, error) + Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error } -// RedisCircuitBreaker extends CircuitBreaker with Redis-based state storage -type RedisCircuitBreaker[T any] struct { +// DistributedCircuitBreaker extends CircuitBreaker with distributed state storage +type DistributedCircuitBreaker[T any] struct { *CircuitBreaker[T] - redisClient CacheClient + cacheClient CacheClient } -// RedisSettings extends Settings with Redis configuration -type RedisSettings struct { +// StorageSettings extends Settings +type StorageSettings struct { Settings - RedisKey string } -// NewRedisCircuitBreaker returns a new RedisCircuitBreaker configured with the given RedisSettings -func NewRedisCircuitBreaker[T any](redisClient CacheClient, settings RedisSettings) *RedisCircuitBreaker[T] { +// NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker configured with the given StorageSettings +func NewDistributedCircuitBreaker[T any](storageClient CacheClient, settings StorageSettings) *DistributedCircuitBreaker[T] { cb := NewCircuitBreaker[T](settings.Settings) - return &RedisCircuitBreaker[T]{ + return &DistributedCircuitBreaker[T]{ CircuitBreaker: cb, - redisClient: redisClient, + cacheClient: storageClient, } } -// RedisState represents the CircuitBreaker state stored in Redis -type RedisState struct { +// StoredState represents the CircuitBreaker state stored in Distributed Storage +type StoredState struct { State State `json:"state"` Generation uint64 `json:"generation"` Counts Counts `json:"counts"` Expiry time.Time `json:"expiry"` } -func (rcb *RedisCircuitBreaker[T]) State(ctx context.Context) State { - if rcb.redisClient == nil { +func (rcb *DistributedCircuitBreaker[T]) State(ctx context.Context) State { + if rcb.cacheClient == nil { return rcb.CircuitBreaker.State() } - state, err := rcb.getRedisState(ctx) + state, err := rcb.getStoredState(ctx) if err != nil { - // Fallback to in-memory state if Redis fails + // Fallback to in-memory state if Storage fails return rcb.CircuitBreaker.State() } now := time.Now() currentState, _ := rcb.currentState(state, now) - // Update the state in Redis if it has changed + // Update the state in Storage if it has changed if currentState != state.State { state.State = currentState - if err := rcb.setRedisState(ctx, state); err != nil { + if err := rcb.setStoredState(ctx, state); err != nil { // Log the error, but continue with the current state - fmt.Printf("Failed to update state in Redis: %v\n", err) + fmt.Printf("Failed to update state in storage: %v\n", err) } } return state.State } -// Execute runs the given request if the RedisCircuitBreaker accepts it -func (rcb *RedisCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) { - if rcb.redisClient == nil { +// Execute runs the given request if the DistributedCircuitBreaker accepts it +func (rcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) { + if rcb.cacheClient == nil { return rcb.CircuitBreaker.Execute(req) } generation, err := rcb.beforeRequest(ctx) @@ -94,8 +91,8 @@ func (rcb *RedisCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, e return result, err } -func (rcb *RedisCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) { - state, err := rcb.getRedisState(ctx) +func (rcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) { + state, err := rcb.getStoredState(ctx) if err != nil { return 0, err } @@ -104,7 +101,7 @@ func (rcb *RedisCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, e if currentState != state.State { rcb.setState(&state, currentState, now) - err = rcb.setRedisState(ctx, state) + err = rcb.setStoredState(ctx, state) if err != nil { return 0, err } @@ -117,7 +114,7 @@ func (rcb *RedisCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, e } state.Counts.onRequest() - err = rcb.setRedisState(ctx, state) + err = rcb.setStoredState(ctx, state) if err != nil { return 0, err } @@ -125,8 +122,8 @@ func (rcb *RedisCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, e return generation, nil } -func (rcb *RedisCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) { - state, err := rcb.getRedisState(ctx) +func (rcb *DistributedCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) { + state, err := rcb.getStoredState(ctx) if err != nil { return } @@ -142,10 +139,10 @@ func (rcb *RedisCircuitBreaker[T]) afterRequest(ctx context.Context, before uint rcb.onFailure(&state, currentState, now) } - rcb.setRedisState(ctx, state) + rcb.setStoredState(ctx, state) } -func (rcb *RedisCircuitBreaker[T]) onSuccess(state *RedisState, currentState State, now time.Time) { +func (rcb *DistributedCircuitBreaker[T]) onSuccess(state *StoredState, currentState State, now time.Time) { if state.State == StateOpen { state.State = currentState } @@ -161,7 +158,7 @@ func (rcb *RedisCircuitBreaker[T]) onSuccess(state *RedisState, currentState Sta } } -func (rcb *RedisCircuitBreaker[T]) onFailure(state *RedisState, currentState State, now time.Time) { +func (rcb *DistributedCircuitBreaker[T]) onFailure(state *StoredState, currentState State, now time.Time) { switch currentState { case StateClosed: state.Counts.onFailure() @@ -173,7 +170,7 @@ func (rcb *RedisCircuitBreaker[T]) onFailure(state *RedisState, currentState Sta } } -func (rcb *RedisCircuitBreaker[T]) currentState(state RedisState, now time.Time) (State, uint64) { +func (rcb *DistributedCircuitBreaker[T]) currentState(state StoredState, now time.Time) (State, uint64) { switch state.State { case StateClosed: if !state.Expiry.IsZero() && state.Expiry.Before(now) { @@ -187,7 +184,7 @@ func (rcb *RedisCircuitBreaker[T]) currentState(state RedisState, now time.Time) return state.State, state.Generation } -func (rcb *RedisCircuitBreaker[T]) setState(state *RedisState, newState State, now time.Time) { +func (rcb *DistributedCircuitBreaker[T]) setState(state *StoredState, newState State, now time.Time) { if state.State == newState { return } @@ -202,7 +199,7 @@ func (rcb *RedisCircuitBreaker[T]) setState(state *RedisState, newState State, n } } -func (rcb *RedisCircuitBreaker[T]) toNewGeneration(state *RedisState, now time.Time) { +func (rcb *DistributedCircuitBreaker[T]) toNewGeneration(state *StoredState, now time.Time) { state.Generation++ state.Counts.clear() @@ -222,16 +219,16 @@ func (rcb *RedisCircuitBreaker[T]) toNewGeneration(state *RedisState, now time.T } } -func (rcb *RedisCircuitBreaker[T]) getRedisKey() string { +func (rcb *DistributedCircuitBreaker[T]) getStorageKey() string { return "cb:" + rcb.name } -func (rcb *RedisCircuitBreaker[T]) getRedisState(ctx context.Context) (RedisState, error) { - var state RedisState - data, err := rcb.redisClient.Get(ctx, rcb.getRedisKey()).Bytes() - if err == redis.Nil { +func (rcb *DistributedCircuitBreaker[T]) getStoredState(ctx context.Context) (StoredState, error) { + var state StoredState + data, err := rcb.cacheClient.Get(ctx, rcb.getStorageKey()) + if len(data) == 0 { // Key doesn't exist, return default state - return RedisState{State: StateClosed}, nil + return StoredState{State: StateClosed}, nil } else if err != nil { return state, err } @@ -240,11 +237,11 @@ func (rcb *RedisCircuitBreaker[T]) getRedisState(ctx context.Context) (RedisStat return state, err } -func (rcb *RedisCircuitBreaker[T]) setRedisState(ctx context.Context, state RedisState) error { +func (rcb *DistributedCircuitBreaker[T]) setStoredState(ctx context.Context, state StoredState) error { data, err := json.Marshal(state) if err != nil { return err } - return rcb.redisClient.Set(ctx, rcb.getRedisKey(), data, 0).Err() + return rcb.cacheClient.Set(ctx, rcb.getStorageKey(), data, 0) } diff --git a/v2/redis_circuit_breaker_test.go b/v2/distributed_circuit_breaker_test.go similarity index 83% rename from v2/redis_circuit_breaker_test.go rename to v2/distributed_circuit_breaker_test.go index b015431..f8eecfb 100644 --- a/v2/redis_circuit_breaker_test.go +++ b/v2/distributed_circuit_breaker_test.go @@ -11,10 +11,22 @@ import ( "github.com/stretchr/testify/assert" ) -var defaultRCB *RedisCircuitBreaker[any] -var customRCB *RedisCircuitBreaker[any] +var defaultRCB *DistributedCircuitBreaker[any] +var customRCB *DistributedCircuitBreaker[any] -func setupTestWithMiniredis() (*RedisCircuitBreaker[any], *miniredis.Miniredis, *redis.Client) { +type storageAdapter struct { + client *redis.Client +} + +func (r *storageAdapter) Get(ctx context.Context, key string) ([]byte, error) { + return r.client.Get(ctx, key).Bytes() +} + +func (r *storageAdapter) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error { + return r.client.Set(ctx, key, value, expiration).Err() +} + +func setupTestWithMiniredis() (*DistributedCircuitBreaker[any], *miniredis.Miniredis, *redis.Client) { mr, err := miniredis.Run() if err != nil { panic(err) @@ -24,7 +36,9 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker[any], *miniredis.Miniredis, Addr: mr.Addr(), }) - return NewRedisCircuitBreaker[any](client, RedisSettings{ + storageClient := &storageAdapter{client: client} + + return NewDistributedCircuitBreaker[any](storageClient, StorageSettings{ Settings: Settings{ Name: "TestBreaker", MaxRequests: 3, @@ -37,23 +51,23 @@ func setupTestWithMiniredis() (*RedisCircuitBreaker[any], *miniredis.Miniredis, }), mr, client } -func pseudoSleepRedis(ctx context.Context, rcb *RedisCircuitBreaker[any], period time.Duration) { - state, _ := rcb.getRedisState(ctx) +func pseudoSleepRedis(ctx context.Context, rcb *DistributedCircuitBreaker[any], period time.Duration) { + state, _ := rcb.getStoredState(ctx) state.Expiry = state.Expiry.Add(-period) // Reset counts if the interval has passed if time.Now().After(state.Expiry) { state.Counts = Counts{} } - rcb.setRedisState(ctx, state) + rcb.setStoredState(ctx, state) } -func successRequest(ctx context.Context, rcb *RedisCircuitBreaker[any]) error { +func successRequest(ctx context.Context, rcb *DistributedCircuitBreaker[any]) error { _, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, nil }) return err } -func failRequest(ctx context.Context, rcb *RedisCircuitBreaker[any]) error { +func failRequest(ctx context.Context, rcb *DistributedCircuitBreaker[any]) error { _, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, errors.New("fail") }) if err != nil && err.Error() == "fail" { return nil @@ -146,11 +160,11 @@ func TestRedisCircuitBreakerCounts(t *testing.T) { assert.Nil(t, successRequest(ctx, rcb)) } - state, _ := rcb.getRedisState(ctx) + state, _ := rcb.getStoredState(ctx) assert.Equal(t, Counts{5, 5, 0, 5, 0}, state.Counts) assert.Nil(t, failRequest(ctx, rcb)) - state, _ = rcb.getRedisState(ctx) + state, _ = rcb.getStoredState(ctx) assert.Equal(t, Counts{6, 5, 1, 0, 1}, state.Counts) } @@ -163,7 +177,7 @@ func TestRedisCircuitBreakerFallback(t *testing.T) { // Test when Redis is unavailable mr.Close() // Simulate Redis being unavailable - rcb.redisClient = nil + rcb.cacheClient = nil state := rcb.State(ctx) assert.Equal(t, StateClosed, state, "Should fallback to in-memory state when Redis is unavailable") @@ -184,7 +198,9 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { Addr: mr.Addr(), }) - customRCB = NewRedisCircuitBreaker[any](client, RedisSettings{ + storageClient := &storageAdapter{client: client} + + customRCB = NewDistributedCircuitBreaker[any](storageClient, StorageSettings{ Settings: Settings{ Name: "CustomBreaker", MaxRequests: 3, @@ -212,14 +228,14 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { assert.NoError(t, failRequest(ctx, customRCB)) } - state, err := customRCB.getRedisState(ctx) + state, err := customRCB.getStoredState(ctx) assert.NoError(t, err) assert.Equal(t, StateClosed, state.State) assert.Equal(t, Counts{10, 5, 5, 0, 1}, state.Counts) // Perform one more successful request assert.NoError(t, successRequest(ctx, customRCB)) - state, err = customRCB.getRedisState(ctx) + state, err = customRCB.getStoredState(ctx) assert.NoError(t, err) assert.Equal(t, Counts{11, 6, 5, 1, 0}, state.Counts) @@ -234,7 +250,7 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { // Check if the circuit breaker is now open assert.Equal(t, StateOpen, customRCB.State(ctx)) - state, err = customRCB.getRedisState(ctx) + state, err = customRCB.getStoredState(ctx) assert.NoError(t, err) assert.Equal(t, Counts{0, 0, 0, 0, 0}, state.Counts) }) @@ -278,7 +294,9 @@ func TestCustomRedisCircuitBreakerStateTransitions(t *testing.T) { Addr: mr.Addr(), }) - cb := NewRedisCircuitBreaker[any](client, RedisSettings{Settings: customSt}) + storageClient := &storageAdapter{client: client} + + cb := NewDistributedCircuitBreaker[any](storageClient, StorageSettings{Settings: customSt}) ctx := context.Background() From aace9906184b548871d03859c4f2fc3aaf1d4bf9 Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 5 Nov 2024 22:42:45 +0530 Subject: [PATCH 13/14] Refactor --- v2/distributed_circuit_breaker_test.go | 32 +++++++++++++------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/v2/distributed_circuit_breaker_test.go b/v2/distributed_circuit_breaker_test.go index f8eecfb..7666b7b 100644 --- a/v2/distributed_circuit_breaker_test.go +++ b/v2/distributed_circuit_breaker_test.go @@ -51,7 +51,7 @@ func setupTestWithMiniredis() (*DistributedCircuitBreaker[any], *miniredis.Minir }), mr, client } -func pseudoSleepRedis(ctx context.Context, rcb *DistributedCircuitBreaker[any], period time.Duration) { +func pseudoSleepStorage(ctx context.Context, rcb *DistributedCircuitBreaker[any], period time.Duration) { state, _ := rcb.getStoredState(ctx) state.Expiry = state.Expiry.Add(-period) @@ -75,7 +75,7 @@ func failRequest(ctx context.Context, rcb *DistributedCircuitBreaker[any]) error return err } -func TestRedisCircuitBreakerInitialization(t *testing.T) { +func TestDistributedCircuitBreakerInitialization(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() @@ -91,7 +91,7 @@ func TestRedisCircuitBreakerInitialization(t *testing.T) { assert.Equal(t, StateClosed, state) } -func TestRedisCircuitBreakerStateTransitions(t *testing.T) { +func TestDistributedCircuitBreakerStateTransitions(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() @@ -113,7 +113,7 @@ func TestRedisCircuitBreakerStateTransitions(t *testing.T) { assert.Equal(t, ErrOpenState, err) // Wait for timeout to transition to half-open - pseudoSleepRedis(ctx, rcb, rcb.timeout) + pseudoSleepStorage(ctx, rcb, rcb.timeout) assert.Equal(t, StateHalfOpen, rcb.State(ctx)) // StateHalfOpen to StateClosed @@ -129,7 +129,7 @@ func TestRedisCircuitBreakerStateTransitions(t *testing.T) { assert.Equal(t, StateOpen, rcb.State(ctx)) } -func TestRedisCircuitBreakerExecution(t *testing.T) { +func TestDistributedCircuitBreakerExecution(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() @@ -150,7 +150,7 @@ func TestRedisCircuitBreakerExecution(t *testing.T) { assert.Equal(t, "test error", err.Error()) } -func TestRedisCircuitBreakerCounts(t *testing.T) { +func TestDistributedCircuitBreakerCounts(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() @@ -168,26 +168,26 @@ func TestRedisCircuitBreakerCounts(t *testing.T) { assert.Equal(t, Counts{6, 5, 1, 0, 1}, state.Counts) } -func TestRedisCircuitBreakerFallback(t *testing.T) { +func TestDistributedCircuitBreakerFallback(t *testing.T) { rcb, mr, _ := setupTestWithMiniredis() defer mr.Close() ctx := context.Background() - // Test when Redis is unavailable - mr.Close() // Simulate Redis being unavailable + // Test when Storage is unavailable + mr.Close() // Simulate Storage being unavailable rcb.cacheClient = nil state := rcb.State(ctx) - assert.Equal(t, StateClosed, state, "Should fallback to in-memory state when Redis is unavailable") + assert.Equal(t, StateClosed, state, "Should fallback to in-memory state when Storage is unavailable") - // Ensure operations still work without Redis + // Ensure operations still work without Storage assert.Nil(t, successRequest(ctx, rcb)) assert.Nil(t, failRequest(ctx, rcb)) } -func TestCustomRedisCircuitBreaker(t *testing.T) { +func TestCustomDistributedCircuitBreaker(t *testing.T) { mr, err := miniredis.Run() if err != nil { panic(err) @@ -240,7 +240,7 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { assert.Equal(t, Counts{11, 6, 5, 1, 0}, state.Counts) // Simulate time passing to reset counts - pseudoSleepRedis(ctx, customRCB, time.Second*30) + pseudoSleepStorage(ctx, customRCB, time.Second*30) // Perform requests to trigger StateOpen assert.NoError(t, successRequest(ctx, customRCB)) @@ -257,7 +257,7 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { t.Run("Timeout and Half-Open State", func(t *testing.T) { // Simulate timeout to transition to half-open state - pseudoSleepRedis(ctx, customRCB, time.Second*90) + pseudoSleepStorage(ctx, customRCB, time.Second*90) assert.Equal(t, StateHalfOpen, customRCB.State(ctx)) // Successful requests in half-open state should close the circuit @@ -268,7 +268,7 @@ func TestCustomRedisCircuitBreaker(t *testing.T) { }) } -func TestCustomRedisCircuitBreakerStateTransitions(t *testing.T) { +func TestCustomDistributedCircuitBreakerStateTransitions(t *testing.T) { // Setup var stateChange StateChange customSt := Settings{ @@ -321,7 +321,7 @@ func TestCustomRedisCircuitBreakerStateTransitions(t *testing.T) { assert.Equal(t, ErrOpenState, err) // Simulate timeout to transition to Half-Open - pseudoSleepRedis(ctx, cb, 6*time.Second) + pseudoSleepStorage(ctx, cb, 6*time.Second) assert.Equal(t, StateHalfOpen, cb.State(ctx)) assert.Equal(t, StateChange{"cb", StateOpen, StateHalfOpen}, stateChange) From 69ff782a40f5ee68b73985b978f28d15ea8b61dc Mon Sep 17 00:00:00 2001 From: Kalpit Pant Date: Tue, 5 Nov 2024 22:48:25 +0530 Subject: [PATCH 14/14] Refactor --- v2/distributed_circuit_breaker.go | 8 ++++---- v2/distributed_circuit_breaker_test.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/v2/distributed_circuit_breaker.go b/v2/distributed_circuit_breaker.go index 66619cf..f3b6472 100644 --- a/v2/distributed_circuit_breaker.go +++ b/v2/distributed_circuit_breaker.go @@ -8,8 +8,8 @@ import ( ) type CacheClient interface { - Get(ctx context.Context, key string) ([]byte, error) - Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error + GetState(ctx context.Context, key string) ([]byte, error) + SetState(ctx context.Context, key string, value interface{}, expiration time.Duration) error } // DistributedCircuitBreaker extends CircuitBreaker with distributed state storage @@ -225,7 +225,7 @@ func (rcb *DistributedCircuitBreaker[T]) getStorageKey() string { func (rcb *DistributedCircuitBreaker[T]) getStoredState(ctx context.Context) (StoredState, error) { var state StoredState - data, err := rcb.cacheClient.Get(ctx, rcb.getStorageKey()) + data, err := rcb.cacheClient.GetState(ctx, rcb.getStorageKey()) if len(data) == 0 { // Key doesn't exist, return default state return StoredState{State: StateClosed}, nil @@ -243,5 +243,5 @@ func (rcb *DistributedCircuitBreaker[T]) setStoredState(ctx context.Context, sta return err } - return rcb.cacheClient.Set(ctx, rcb.getStorageKey(), data, 0) + return rcb.cacheClient.SetState(ctx, rcb.getStorageKey(), data, 0) } diff --git a/v2/distributed_circuit_breaker_test.go b/v2/distributed_circuit_breaker_test.go index 7666b7b..6371d3d 100644 --- a/v2/distributed_circuit_breaker_test.go +++ b/v2/distributed_circuit_breaker_test.go @@ -18,11 +18,11 @@ type storageAdapter struct { client *redis.Client } -func (r *storageAdapter) Get(ctx context.Context, key string) ([]byte, error) { +func (r *storageAdapter) GetState(ctx context.Context, key string) ([]byte, error) { return r.client.Get(ctx, key).Bytes() } -func (r *storageAdapter) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error { +func (r *storageAdapter) SetState(ctx context.Context, key string, value interface{}, expiration time.Duration) error { return r.client.Set(ctx, key, value, expiration).Err() }