-
Notifications
You must be signed in to change notification settings - Fork 14
/
retry_handler.go
198 lines (175 loc) · 6.99 KB
/
retry_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package nethttplibrary
import (
"context"
"fmt"
"io"
"math"
nethttp "net/http"
"strconv"
"time"
abs "github.com/microsoft/kiota-abstractions-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// RetryHandler handles transient HTTP responses and retries the request given the retry options
type RetryHandler struct {
// default options to use when evaluating the response
options RetryHandlerOptions
}
// NewRetryHandler creates a new RetryHandler with default options
func NewRetryHandler() *RetryHandler {
return NewRetryHandlerWithOptions(RetryHandlerOptions{
ShouldRetry: func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool {
return true
},
})
}
// NewRetryHandlerWithOptions creates a new RetryHandler with the given options
func NewRetryHandlerWithOptions(options RetryHandlerOptions) *RetryHandler {
return &RetryHandler{options: options}
}
const defaultMaxRetries = 3
const absoluteMaxRetries = 10
const defaultDelaySeconds = 3
const absoluteMaxDelaySeconds = 180
// RetryHandlerOptions to apply when evaluating the response for retrial
type RetryHandlerOptions struct {
// Callback to determine if the request should be retried
ShouldRetry func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool
// The maximum number of times a request can be retried
MaxRetries int
// The delay in seconds between retries
DelaySeconds int
}
type retryHandlerOptionsInt interface {
abs.RequestOption
GetShouldRetry() func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool
GetDelaySeconds() int
GetMaxRetries() int
}
var retryKeyValue = abs.RequestOptionKey{
Key: "RetryHandler",
}
// GetKey returns the key value to be used when the option is added to the request context
func (options *RetryHandlerOptions) GetKey() abs.RequestOptionKey {
return retryKeyValue
}
// GetShouldRetry returns the should retry callback function which evaluates the response for retrial
func (options *RetryHandlerOptions) GetShouldRetry() func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool {
return options.ShouldRetry
}
// GetDelaySeconds returns the delays in seconds between retries
func (options *RetryHandlerOptions) GetDelaySeconds() int {
if options.DelaySeconds < 1 {
return defaultDelaySeconds
} else if options.DelaySeconds > absoluteMaxDelaySeconds {
return absoluteMaxDelaySeconds
} else {
return options.DelaySeconds
}
}
// GetMaxRetries returns the maximum number of times a request can be retried
func (options *RetryHandlerOptions) GetMaxRetries() int {
if options.MaxRetries < 1 {
return defaultMaxRetries
} else if options.MaxRetries > absoluteMaxRetries {
return absoluteMaxRetries
} else {
return options.MaxRetries
}
}
const retryAttemptHeader = "Retry-Attempt"
const retryAfterHeader = "Retry-After"
const tooManyRequests = 429
const serviceUnavailable = 503
const gatewayTimeout = 504
// Intercept implements the interface and evaluates whether to retry a failed request.
func (middleware RetryHandler) Intercept(pipeline Pipeline, middlewareIndex int, req *nethttp.Request) (*nethttp.Response, error) {
obsOptions := GetObservabilityOptionsFromRequest(req)
ctx := req.Context()
var span trace.Span
var observabilityName string
if obsOptions != nil {
observabilityName = obsOptions.GetTracerInstrumentationName()
ctx, span = otel.GetTracerProvider().Tracer(observabilityName).Start(ctx, "RetryHandler_Intercept")
span.SetAttributes(attribute.Bool("com.microsoft.kiota.handler.retry.enable", true))
defer span.End()
req = req.WithContext(ctx)
}
response, err := pipeline.Next(req, middlewareIndex)
if err != nil {
return response, err
}
reqOption, ok := req.Context().Value(retryKeyValue).(retryHandlerOptionsInt)
if !ok {
reqOption = &middleware.options
}
return middleware.retryRequest(ctx, pipeline, middlewareIndex, reqOption, req, response, 0, 0, observabilityName)
}
func (middleware RetryHandler) retryRequest(ctx context.Context, pipeline Pipeline, middlewareIndex int, options retryHandlerOptionsInt, req *nethttp.Request, resp *nethttp.Response, executionCount int, cumulativeDelay time.Duration, observabilityName string) (*nethttp.Response, error) {
if middleware.isRetriableErrorCode(resp.StatusCode) &&
middleware.isRetriableRequest(req) &&
executionCount < options.GetMaxRetries() &&
cumulativeDelay < time.Duration(absoluteMaxDelaySeconds)*time.Second &&
options.GetShouldRetry()(cumulativeDelay, executionCount, req, resp) {
executionCount++
delay := middleware.getRetryDelay(req, resp, options, executionCount)
cumulativeDelay += delay
req.Header.Set(retryAttemptHeader, strconv.Itoa(executionCount))
if req.Body != nil {
s, ok := req.Body.(io.Seeker)
if ok {
s.Seek(0, io.SeekStart)
}
}
if observabilityName != "" {
ctx, span := otel.GetTracerProvider().Tracer(observabilityName).Start(ctx, "RetryHandler_Intercept - attempt "+fmt.Sprint(executionCount))
span.SetAttributes(attribute.Int("http.request.resend_count", executionCount),
attribute.Int("http.status_code", resp.StatusCode),
attribute.Float64("http.request.resend_delay", delay.Seconds()),
)
defer span.End()
req = req.WithContext(ctx)
}
t := time.NewTimer(delay)
select {
case <-ctx.Done():
// Return without retrying if the context was cancelled.
return nil, ctx.Err()
// Leaving this case empty causes it to exit the switch-block.
case <-t.C:
}
response, err := pipeline.Next(req, middlewareIndex)
if err != nil {
return response, err
}
return middleware.retryRequest(ctx, pipeline, middlewareIndex, options, req, response, executionCount, cumulativeDelay, observabilityName)
}
return resp, nil
}
func (middleware RetryHandler) isRetriableErrorCode(code int) bool {
return code == tooManyRequests || code == serviceUnavailable || code == gatewayTimeout
}
func (middleware RetryHandler) isRetriableRequest(req *nethttp.Request) bool {
isBodiedMethod := req.Method == "POST" || req.Method == "PUT" || req.Method == "PATCH"
if isBodiedMethod && req.Body != nil {
return req.ContentLength != -1
}
return true
}
func (middleware RetryHandler) getRetryDelay(req *nethttp.Request, resp *nethttp.Response, options retryHandlerOptionsInt, executionCount int) time.Duration {
retryAfter := resp.Header.Get(retryAfterHeader)
if retryAfter != "" {
retryAfterDelay, err := strconv.ParseFloat(retryAfter, 64)
if err == nil {
return time.Duration(retryAfterDelay) * time.Second
}
// parse the header if it's a date
t, err := time.Parse(time.RFC1123, retryAfter)
if err == nil {
return t.Sub(time.Now())
}
}
return time.Duration(math.Pow(float64(options.GetDelaySeconds()), float64(executionCount))) * time.Second
}