Skip to content

Commit

Permalink
Fallback on all Redis errors (#16)
Browse files Browse the repository at this point in the history
* Fallback on all Redis errors

* Benchmark: Fail on test error (tee ate the pipe error)

* Disable fallback on benchmark test

* Tweak redis pool settings for benchmark

* Chill the defaults, we don't need that many connections

* Benchmark: Close redis client on each test iteration (-count=10)

* Update Redis in CI, lower #no of requests

* Remove benchmark from CI, as Github Actions limit us too much

#16 (comment)
  • Loading branch information
VojtechVitek authored Aug 8, 2024
1 parent 487f95a commit 1ab0ec2
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 100 deletions.
57 changes: 0 additions & 57 deletions .github/workflows/benchmark.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:

services:
redis:
image: redis:6
image: redis:7
ports:
- 6379:6379

Expand Down
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Config struct {
// Timeout for each Redis command after which we fall back to a local
// in-memory counter. If Redis does not respond within this duration,
// the system will use the local counter unless it is explicitly disabled.
FallbackTimeout time.Duration `toml:"fallback_timeout"` // default: 50ms
FallbackTimeout time.Duration `toml:"fallback_timeout"` // default: 100ms

// Client if supplied will be used and the below fields will be ignored.
//
Expand All @@ -31,6 +31,6 @@ type Config struct {
Port uint16 `toml:"port"`
Password string `toml:"password"` // optional
DBIndex int `toml:"db_index"` // default: 0
MaxIdle int `toml:"max_idle"` // default: 4
MaxActive int `toml:"max_active"` // default: 8
MaxIdle int `toml:"max_idle"` // default: 5
MaxActive int `toml:"max_active"` // default: 10
}
56 changes: 29 additions & 27 deletions httprateredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package httprateredis

import (
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -40,8 +38,13 @@ func NewRedisLimitCounter(cfg *Config) (*redisCounter, error) {
cfg.PrefixKey = "httprate"
}
if cfg.FallbackTimeout == 0 {
// Activate local in-memory fallback fairly quickly, as this would slow down all requests.
cfg.FallbackTimeout = 100 * time.Millisecond
if cfg.FallbackDisabled {
cfg.FallbackTimeout = time.Second
} else {
// Activate local in-memory fallback fairly quickly,
// so we don't slow down incoming requests too much.
cfg.FallbackTimeout = 100 * time.Millisecond
}
}

rc := &redisCounter{
Expand All @@ -54,10 +57,10 @@ func NewRedisLimitCounter(cfg *Config) (*redisCounter, error) {
if cfg.Client == nil {
maxIdle, maxActive := cfg.MaxIdle, cfg.MaxActive
if maxIdle < 1 {
maxIdle = 20
maxIdle = 5
}
if maxActive < 1 {
maxActive = 50
maxActive = 10
}

rc.client = redis.NewClient(&redis.Options{
Expand Down Expand Up @@ -107,13 +110,8 @@ func (c *redisCounter) IncrementBy(key string, currentWindow time.Time, amount i
return c.fallbackCounter.IncrementBy(key, currentWindow, amount)
}
defer func() {
if err != nil {
// On redis network error, fallback to local in-memory counter.
var netErr net.Error
if errors.As(err, &netErr) || errors.Is(err, redis.ErrClosed) {
c.fallback()
err = c.fallbackCounter.IncrementBy(key, currentWindow, amount)
}
if c.shouldFallback(err) {
err = c.fallbackCounter.IncrementBy(key, currentWindow, amount)
}
}()
}
Expand Down Expand Up @@ -147,13 +145,8 @@ func (c *redisCounter) Get(key string, currentWindow, previousWindow time.Time)
return c.fallbackCounter.Get(key, currentWindow, previousWindow)
}
defer func() {
if err != nil {
// On redis network error, fallback to local in-memory counter.
var netErr net.Error
if errors.As(err, &netErr) || errors.Is(err, redis.ErrClosed) {
c.fallback()
curr, prev, err = c.fallbackCounter.Get(key, currentWindow, previousWindow)
}
if c.shouldFallback(err) {
curr, prev, err = c.fallbackCounter.Get(key, currentWindow, previousWindow)
}
}()
}
Expand Down Expand Up @@ -189,25 +182,34 @@ func (c *redisCounter) IsFallbackActivated() bool {
return c.fallbackActivated.Load()
}

func (c *redisCounter) fallback() {
// Activate the in-memory counter fallback, unless activated by some other goroutine.
fallbackAlreadyActivated := c.fallbackActivated.Swap(true)
if fallbackAlreadyActivated {
return
func (c *redisCounter) Close() error {
return c.client.Close()
}

func (c *redisCounter) shouldFallback(err error) bool {
if err == nil {
return false
}

// Activate the local in-memory counter fallback, unless activated by some other goroutine.
alreadyActivated := c.fallbackActivated.Swap(true)
if !alreadyActivated {
go c.reconnect()
}

go c.reconnect()
return true
}

func (c *redisCounter) reconnect() {
// Try to re-connect to redis every 200ms.
for {
time.Sleep(200 * time.Millisecond)

err := c.client.Ping(context.Background()).Err()
if err == nil {
c.fallbackActivated.Store(false)
return
}
time.Sleep(200 * time.Millisecond)
}
}

Expand Down
30 changes: 18 additions & 12 deletions httprateredis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ func TestRedisCounter(t *testing.T) {
limitCounter, err := httprateredis.NewRedisLimitCounter(&httprateredis.Config{
Host: "localhost",
Port: 6379,
MaxIdle: 100,
MaxActive: 200,
MaxIdle: 0,
MaxActive: 2,
DBIndex: 0,
ClientName: "httprateredis_test",
PrefixKey: fmt.Sprintf("httprate:test:%v", rand.Int31n(100000)), // Unique Redis key for each test
Expand All @@ -26,6 +26,7 @@ func TestRedisCounter(t *testing.T) {
if err != nil {
t.Fatalf("redis not available: %v", err)
}
defer limitCounter.Close()

limitCounter.Config(1000, time.Minute)

Expand Down Expand Up @@ -156,23 +157,28 @@ func TestRedisCounter(t *testing.T) {

func BenchmarkLocalCounter(b *testing.B) {
limitCounter, err := httprateredis.NewRedisLimitCounter(&httprateredis.Config{
Host: "localhost",
Port: 6379,
MaxIdle: 500,
MaxActive: 500,
DBIndex: 0,
ClientName: "httprateredis_test",
PrefixKey: fmt.Sprintf("httprate:test:%v", rand.Int31n(100000)), // Unique key for each test
Host: "localhost",
Port: 6379,
DBIndex: 0,
ClientName: "httprateredis_test",
PrefixKey: fmt.Sprintf("httprate:test:%v", rand.Int31n(100000)), // Unique key for each test
MaxActive: 10,
MaxIdle: 0,
FallbackDisabled: true,
FallbackTimeout: 5 * time.Second,
})
if err != nil {
b.Fatalf("redis not available: %v", err)
}
defer limitCounter.Close()

limitCounter.Config(1000, time.Minute)

currentWindow := time.Now().UTC().Truncate(time.Minute)
previousWindow := currentWindow.Add(-time.Minute)

concurrentRequests := 100

b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -182,14 +188,14 @@ func BenchmarkLocalCounter(b *testing.B) {
previousWindow.Add(time.Duration(i) * time.Minute)

wg := sync.WaitGroup{}
wg.Add(1000)
for i := 0; i < 1000; i++ {
wg.Add(concurrentRequests)
for i := 0; i < concurrentRequests; i++ {
// Simulate concurrent requests with different rate-limit keys.
go func(i int) {
defer wg.Done()

_, _, _ = limitCounter.Get(fmt.Sprintf("key:%v", i), currentWindow, previousWindow)
_ = limitCounter.IncrementBy(fmt.Sprintf("key:%v", i), currentWindow, rand.Intn(100))
_ = limitCounter.IncrementBy(fmt.Sprintf("key:%v", i), currentWindow, rand.Intn(20))
}(i)
}
wg.Wait()
Expand Down

0 comments on commit 1ab0ec2

Please sign in to comment.