fix: cleaned up and fixed various bugs
gene-redpanda committed Jul 9, 2024
1 parent fb79c10 commit 7fbf1b8
Showing 3 changed files with 239 additions and 99 deletions.
1 change: 0 additions & 1 deletion Makefile
@@ -69,7 +69,6 @@ unit_tests:
@DEBUG=true \
REDPANDA_CLIENT_ID="dummy_id" \
REDPANDA_CLIENT_SECRET="dummy_secret" \
-TF_ACC=false \
RUN_CLUSTER_TESTS=false \
$(GOCMD) test -short ./...

118 changes: 77 additions & 41 deletions redpanda/cloud/ratelimiter.go
@@ -14,8 +14,8 @@ import (
)

const (
-limitPeriod = 60.0
-burstPeriod = 10.0
+limitPeriod = 60.0 // api resets rate limit at 60s
+burstPeriod = 10.0 // allows bursts of up to 50 requests to reduce hammering of the api
)

type rateLimiter struct {
@@ -28,67 +28,103 @@ func newRateLimiter(limit int) *rateLimiter {
}
}

+// parseRateLimit expects a header as defined in https://datatracker.ietf.org/doc/html/draft-ietf-httpapi-ratelimit-headers
func parseRateLimit(header string) (limit, remaining int, reset time.Duration, err error) {
-for _, part := range strings.Split(header, ",") {
-kv := strings.SplitN(strings.TrimSpace(part), "=", 2)
-if len(kv) != 2 {
+parts := strings.Split(header, ",")
+if len(parts) != 3 {
+return 0, 0, 0, fmt.Errorf("invalid rate limit header: %s", header) // we expect limit, remaining and reset
+}
+
+var limitSet, remainingSet, resetSet bool
+for _, part := range parts {
+kv := strings.Split(part, "=")
+if len(kv) != 2 { // invalid header contents skip example "limit=1=remaining"
continue
}
-switch kv[0] {

+key, value := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])
+intValue, err := strconv.Atoi(value)
+if err != nil {
+return 0, 0, 0, fmt.Errorf("invalid %s value: %v", key, err)
+}

+switch key {
case "limit":
-limit, err = strconv.Atoi(kv[1])
-if err != nil {
-return 0, 0, 0, fmt.Errorf("invalid limit value: %v", err)
-}
+limit = intValue
+limitSet = true
case "remaining":
-remaining, err = strconv.Atoi(kv[1])
-if err != nil {
-return 0, 0, 0, fmt.Errorf("invalid remaining value: %v", err)
-}
+remaining = intValue
+remainingSet = true
case "reset":
-seconds, err := strconv.Atoi(kv[1])
-if err != nil {
-return 0, 0, 0, fmt.Errorf("invalid reset value: %v", err)
-}
-reset = time.Duration(seconds) * time.Second
+reset = time.Duration(intValue) * time.Second
+resetSet = true
}
}

+if !limitSet || !remainingSet || !resetSet {
+return 0, 0, 0, fmt.Errorf("missing required rate limit information: %s", header)
+}

return limit, remaining, reset, nil
}
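
For context, a minimal sketch of how the reworked parser behaves, written as a test in the same package (assumed here to be `cloud`); the header values are invented for illustration and are not part of this commit:

```go
package cloud

import (
	"testing"
	"time"
)

// Illustrative only: exercises the reworked parseRateLimit with a header in the
// draft-ietf-httpapi-ratelimit-headers style; the header values are made up.
func TestParseRateLimitExample(t *testing.T) {
	limit, remaining, reset, err := parseRateLimit("limit=100, remaining=0, reset=7")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if limit != 100 || remaining != 0 || reset != 7*time.Second {
		t.Fatalf("got limit=%d remaining=%d reset=%s", limit, remaining, reset)
	}

	// A header missing one of the three expected keys now returns an error
	// instead of silently yielding zero values.
	if _, _, _, err := parseRateLimit("limit=100, remaining=0"); err == nil {
		t.Fatal("expected an error for a header without reset")
	}
}
```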

+// Limiter is a grpc.UnaryClientInterceptor that updates the rate limiter based on the rate limit headers returned by the server
+// malformed or otherwise incorrect headers are discarded with errors logged but non-halting
+// messages without rate limits are considered valid
func (r *rateLimiter) Limiter(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var header metadata.MD
if err := invoker(ctx, method, req, reply, cc, append(opts, grpc.Header(&header))...); err != nil {
return err
}

-if rateLimitHeader := header.Get("ratelimit"); len(rateLimitHeader) > 0 {
-tflog.Debug(ctx, "parsing rate limit headers")
-limit, remaining, reset, err := parseRateLimit(rateLimitHeader[0])
-if err != nil {
-return fmt.Errorf("failed to parse rate limit header: %v", err)
-}
+rateLimitHeader := header.Get("ratelimit")
+if len(rateLimitHeader) == 0 {
+return nil // no rate limit headers
+}

tflog.Debug(ctx, "setting limit and burst")
r.limiter.SetLimit(rate.Limit(limit / limitPeriod))
r.limiter.SetBurst(limit / burstPeriod)
if remaining == 0 && reset > 0 {
tflog.Warn(ctx, "rate limit exceeded", map[string]any{
"limit": limit,
"remaining": remaining,
"reset": reset,
})
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(reset + 1*time.Second):
}
}
tflog.Debug(ctx, "rate limit updated", map[string]any{
tflog.Debug(ctx, "parsing rate limit headers", map[string]any{
"header": rateLimitHeader[0],
})
limit, remaining, reset, err := parseRateLimit(rateLimitHeader[0])
if err != nil {
// if the parser returns an error we log it but otherwise treat it the same as not having a ratelimit header
tflog.Warn(ctx, "failed to parse rate limit header", map[string]any{
"error": err.Error(),
"header": rateLimitHeader[0],
})
// lint:ignore nilerr logging and not returning error as it is not fail worthy
return nil
}

+newLimit := rate.Limit(limit / limitPeriod)
+newBurst := limit / burstPeriod

+if r.limiter.Limit() != newLimit || r.limiter.Burst() != newBurst {
+tflog.Debug(ctx, "updating rate limiter", map[string]any{
+"new_limit": newLimit,
+"new_burst": newBurst,
+})
+r.limiter.SetLimit(newLimit)
+r.limiter.SetBurst(newBurst)
+}

+if remaining == 0 && reset > 0 {
+tflog.Warn(ctx, "rate limit exceeded", map[string]any{
+"limit": limit,
+"remaining": remaining,
+"reset": reset,
+})
+select {
+case <-ctx.Done():
+return ctx.Err()
+case <-time.After(reset + 1*time.Second):
+}
+}

tflog.Debug(ctx, "rate limit updated", map[string]any{
"limit": limit,
"remaining": remaining,
"reset": reset,
})
return r.limiter.Wait(ctx)
}
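
For context, a minimal sketch of how this interceptor could be attached to a client connection; the endpoint, the insecure credentials, and the seed limit of 600 are placeholder assumptions, not values taken from this repository:

```go
package cloud

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// Illustrative only: attaches the rate-limiting interceptor to a gRPC client.
// The endpoint, credentials, and seed limit of 600 requests per minute are
// placeholders, not values from this commit.
func dialWithRateLimiter(endpoint string) (*grpc.ClientConn, error) {
	rl := newRateLimiter(600) // initial limit until the first ratelimit header is seen
	return grpc.Dial(
		endpoint,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Limiter satisfies grpc.UnaryClientInterceptor: each unary call updates
		// the limiter from the server's ratelimit header and then waits on it.
		grpc.WithUnaryInterceptor(rl.Limiter),
	)
}
```

With limitPeriod = 60 and burstPeriod = 10 as defined above, a reported limit of 600 requests per minute would translate to a sustained rate of 10 requests per second with a burst of 60.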
