Skip to content

Commit

Permalink
Merge pull request #660 from ably/fix/rest-host-cache
Browse files Browse the repository at this point in the history
Fix/rest host cache
  • Loading branch information
sacOO7 authored Aug 16, 2024
2 parents 4f29522 + 4ae44b4 commit 26dbcf1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 76 deletions.
4 changes: 2 additions & 2 deletions ably/auth_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ func TestRequestToken(t *testing.T) {
auth: &Auth{
clientID: "aClientID",
client: &REST{
successFallbackHost: &fallbackCache{},
log: logger{l: &stdLogger{mocklogger}},
hostCache: &hostCache{},
log: logger{l: &stdLogger{mocklogger}},
opts: &clientOptions{authOptions: authOptions{
AuthURL: "foo.com",
Key: "abc:def",
Expand Down
6 changes: 1 addition & 5 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,8 @@ func (a *Auth) SetServerTimeFunc(st func() (time.Time, error)) {
a.serverTimeHandler = st
}

func (c *REST) SetSuccessFallbackHost(duration time.Duration) {
c.successFallbackHost = &fallbackCache{duration: duration}
}

func (c *REST) GetCachedFallbackHost() string {
return c.successFallbackHost.get()
return c.hostCache.get()
}

func (c *RealtimeChannel) GetChannelSerial() string {
Expand Down
101 changes: 32 additions & 69 deletions ably/rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ type REST struct {
//Channels is a [ably.RESTChannels] object (RSN1).
Channels *RESTChannels

opts *clientOptions
successFallbackHost *fallbackCache
log logger
opts *clientOptions
hostCache *hostCache
log logger
}

// NewREST construct a RestClient object using an [ably.ClientOption] object to configure
Expand All @@ -158,7 +158,7 @@ func NewREST(options ...ClientOption) (*REST, error) {
chans: make(map[string]*RESTChannel),
client: c,
}
c.successFallbackHost = &fallbackCache{
c.hostCache = &hostCache{
duration: c.opts.fallbackRetryTimeout(),
}
return c, nil
Expand Down Expand Up @@ -634,75 +634,12 @@ func (c *REST) do(ctx context.Context, r *request) (*http.Response, error) {
return c.doWithHandle(ctx, r, c.handleResponse)
}

// fallbackCache caches a successful fallback host for 10 minutes.
type fallbackCache struct {
running bool
host string
duration time.Duration
cancel func()
mu sync.RWMutex
}

func (f *fallbackCache) get() string {
if f.isRunning() {
f.mu.RLock()
h := f.host
f.mu.RUnlock()
return h
}
return ""
}

func (f *fallbackCache) isRunning() bool {
f.mu.RLock()
v := f.running
f.mu.RUnlock()
return v
}

func (f *fallbackCache) run(host string) {
f.mu.Lock()
now := time.Now()
duration := defaultOptions.FallbackRetryTimeout // spec RSC15f
if f.duration != 0 {
duration = f.duration
}
ctx, cancel := context.WithDeadline(context.Background(), now.Add(duration))
f.running = true
f.host = host
f.cancel = cancel
f.mu.Unlock()
<-ctx.Done()
f.mu.Lock()
f.running = false
f.mu.Unlock()
}

func (f *fallbackCache) stop() {
f.cancel()
// we make sure we have stopped
for {
if !f.isRunning() {
return
}
}
}

func (f *fallbackCache) put(host string) {
if f.get() != host {
if f.isRunning() {
f.stop()
}
go f.run(host)
}
}

func (c *REST) doWithHandle(ctx context.Context, r *request, handle func(*http.Response, interface{}) (*http.Response, error)) (*http.Response, error) {
req, err := c.newHTTPRequest(ctx, r)
if err != nil {
return nil, err
}
if h := c.successFallbackHost.get(); h != "" {
if h := c.hostCache.get(); h != "" {
req.URL.Host = h // RSC15f
c.log.Verbosef("RestClient: setting URL.Host=%q", h)
}
Expand Down Expand Up @@ -775,7 +712,7 @@ func (c *REST) doWithHandle(ctx context.Context, r *request, handle func(*http.R
}
return nil, err
}
c.successFallbackHost.put(h)
c.hostCache.put(h)
return resp, nil
}
}
Expand Down Expand Up @@ -931,3 +868,29 @@ func decodeResp(resp *http.Response, out interface{}) error {

return decode(typ, bytes.NewReader(b), out)
}

// hostCache caches a successful fallback host for 10 minutes.
// Only used by REST client while making requests RSC15f
type hostCache struct {
duration time.Duration

sync.RWMutex
deadline time.Time
host string
}

func (c *hostCache) put(host string) {
c.Lock()
defer c.Unlock()
c.host = host
c.deadline = time.Now().Add(c.duration)
}

func (c *hostCache) get() string {
c.RLock()
defer c.RUnlock()
if ablyutil.Empty(c.host) || time.Until(c.deadline) <= 0 {
return ""
}
return c.host
}

0 comments on commit 26dbcf1

Please sign in to comment.