From f0c9b1c6a7c4623acff5f6e22446248a6dff001a Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 5 Aug 2024 18:03:02 +0530 Subject: [PATCH 01/29] Added separate struct for realtimeHosts and hostcache to handle realtime host fallbacks --- ably/hosts.go | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 ably/hosts.go diff --git a/ably/hosts.go b/ably/hosts.go new file mode 100644 index 00000000..8d21db65 --- /dev/null +++ b/ably/hosts.go @@ -0,0 +1,97 @@ +package ably + +import ( + "sync" + "time" + + "github.com/ably/ably-go/ably/internal/ablyutil" +) + +// realtimeHosts(RTN17) is used to retrieve realtime primaryHost/fallbackHosts +type realtimeHosts struct { + opts *clientOptions + visitedHosts ablyutil.HashSet + sync.Mutex +} + +func newRealtimeHosts(opts *clientOptions) *realtimeHosts { + return &realtimeHosts{ + opts: opts, + visitedHosts: ablyutil.NewHashSet(), + } +} + +func (realtimeHosts *realtimeHosts) getPrimaryHost() string { + return realtimeHosts.opts.getRealtimeHost() +} + +func (realtimeHosts *realtimeHosts) nextFallbackHost() string { + realtimeHosts.Lock() + defer realtimeHosts.Unlock() + + getNonVisitedHost := func() string { + visitedHosts := realtimeHosts.visitedHosts + hosts, _ := realtimeHosts.opts.getFallbackHosts() + shuffledFallbackHosts := ablyutil.Shuffle(hosts) + for _, host := range shuffledFallbackHosts { + if !visitedHosts.Has(host) { + return host + } + } + return "" + } + + nonVisitedHost := getNonVisitedHost() + if !ablyutil.Empty(nonVisitedHost) { + realtimeHosts.visitedHosts.Add(nonVisitedHost) + } + return nonVisitedHost +} + +func (realtimeHosts *realtimeHosts) resetVisitedFallbackHosts() { + realtimeHosts.Lock() + defer realtimeHosts.Unlock() + realtimeHosts.visitedHosts = ablyutil.NewHashSet() +} + +func (realtimeHosts *realtimeHosts) fallbackHostsRemaining() int { + realtimeHosts.Lock() + defer realtimeHosts.Unlock() + hosts, _ := realtimeHosts.opts.getFallbackHosts() + return len(hosts) + 1 - len(realtimeHosts.visitedHosts) +} + +// getPreferredHost - Used to retrieve primary realtime host +func (realtimeHosts *realtimeHosts) getPreferredHost() string { + realtimeHosts.Lock() + defer realtimeHosts.Unlock() + host := realtimeHosts.getPrimaryHost() // primary host is always preferred host/ fallback host in realtime + realtimeHosts.visitedHosts.Add(host) + return host +} + +// hostCache caches a successful fallback host for 10 minutes. +// Only used by REST client while making requests RSC15f +type hostCache struct { + duration time.Duration + + sync.RWMutex + deadline time.Time + host string +} + +func (c *hostCache) put(host string) { + c.Lock() + defer c.Unlock() + c.host = host + c.deadline = time.Now().Add(c.duration) +} + +func (c *hostCache) get() string { + c.RLock() + defer c.RUnlock() + if ablyutil.Empty(c.host) || time.Until(c.deadline) <= 0 { + return "" + } + return c.host +} From afe392aed7fc7655d2b59f62f6e32d65f5e48a34 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 5 Aug 2024 18:03:43 +0530 Subject: [PATCH 02/29] Added code to options to check for active internet connection --- ably/options.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/ably/options.go b/ably/options.go index 5e22a377..2a06a0c2 100644 --- a/ably/options.go +++ b/ably/options.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io/ioutil" "log" "net" "net/http" @@ -28,6 +29,9 @@ const ( Port = 80 TLSPort = 443 maxMessageSize = 65536 // 64kb, default value TO3l8 + + internetCheckUrl = "https://internet-up.ably-realtime.com/is-the-internet-up.txt" + internetCheckOk = "yes\n" ) var defaultOptions = clientOptions{ @@ -595,6 +599,19 @@ func (opts *clientOptions) idempotentRESTPublishing() bool { return opts.IdempotentRESTPublishing } +func (opts *clientOptions) hasActiveInternetConnection() bool { + res, err := opts.httpclient().Get(internetCheckUrl) + if err != nil { + return false + } + data, err := ioutil.ReadAll(res.Body) + res.Body.Close() + if err != nil { + return false + } + return string(data) == internetCheckOk +} + type ScopeParams struct { Start time.Time End time.Time From db45ce8ffff26c4d2f3ce8a76b681fcba6338fc5 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 6 Aug 2024 17:41:10 +0530 Subject: [PATCH 03/29] Removed REST specific (hostcache) implementation from hosts.go --- ably/hosts.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/ably/hosts.go b/ably/hosts.go index 8d21db65..24704c1a 100644 --- a/ably/hosts.go +++ b/ably/hosts.go @@ -2,7 +2,6 @@ package ably import ( "sync" - "time" "github.com/ably/ably-go/ably/internal/ablyutil" ) @@ -69,29 +68,3 @@ func (realtimeHosts *realtimeHosts) getPreferredHost() string { realtimeHosts.visitedHosts.Add(host) return host } - -// hostCache caches a successful fallback host for 10 minutes. -// Only used by REST client while making requests RSC15f -type hostCache struct { - duration time.Duration - - sync.RWMutex - deadline time.Time - host string -} - -func (c *hostCache) put(host string) { - c.Lock() - defer c.Unlock() - c.host = host - c.deadline = time.Now().Add(c.duration) -} - -func (c *hostCache) get() string { - c.RLock() - defer c.RUnlock() - if ablyutil.Empty(c.host) || time.Until(c.deadline) <= 0 { - return "" - } - return c.host -} From e04ed9d62f13268762deddfe18af1b9a9fe5793e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 6 Aug 2024 17:42:56 +0530 Subject: [PATCH 04/29] Updated export test methods for realtimeHost --- ably/export_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/ably/export_test.go b/ably/export_test.go index 7fcbd64e..7dc4c0ef 100644 --- a/ably/export_test.go +++ b/ably/export_test.go @@ -6,12 +6,46 @@ import ( "net/http/httptrace" "net/url" "time" + + "github.com/ably/ably-go/ably/internal/ablyutil" ) func NewClientOptions(os ...ClientOption) *clientOptions { return applyOptionsWithDefaults(os...) } +func NewRealtimeHosts(opts *clientOptions) *realtimeHosts { + return newRealtimeHosts(opts) +} + +func (realtimeHosts *realtimeHosts) NextFallbackHost() string { + return realtimeHosts.nextFallbackHost() +} + +func (realtimeHosts *realtimeHosts) GetAllRemainingFallbackHosts() []string { + var hosts []string + for true { + fallbackHost := realtimeHosts.NextFallbackHost() + if ablyutil.Empty(fallbackHost) { + break + } + hosts = append(hosts, fallbackHost) + } + return hosts +} + +func (realtimeHosts *realtimeHosts) ResetVisitedFallbackHosts() { + realtimeHosts.resetVisitedFallbackHosts() +} + +func (realtimeHosts *realtimeHosts) FallbackHostsRemaining() int { + return realtimeHosts.fallbackHostsRemaining() +} + +func (realtimeHosts *realtimeHosts) GetPreferredHost() string { + return realtimeHosts.getPreferredHost() +} + func GetEnvFallbackHosts(env string) []string { return getEnvFallbackHosts(env) } From 9e25796c482c7347ed2c6702b6c8932c3626de04 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 6 Aug 2024 17:43:21 +0530 Subject: [PATCH 05/29] Added unit tests for realtime host fallbacks --- ably/hosts_test.go | 120 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 ably/hosts_test.go diff --git a/ably/hosts_test.go b/ably/hosts_test.go new file mode 100644 index 00000000..26c5c17e --- /dev/null +++ b/ably/hosts_test.go @@ -0,0 +1,120 @@ +package ably_test + +import ( + "testing" + + "github.com/ably/ably-go/ably" + "github.com/stretchr/testify/assert" +) + +func Test_RTN17_RealtimeHostFallback(t *testing.T) { + t.Parallel() + t.Run("RTN17a: should always get primary host as pref. host", func(t *testing.T) { + clientOptions := ably.NewClientOptions() + realtimeHosts := ably.NewRealtimeHosts(clientOptions) + prefHost := realtimeHosts.GetPreferredHost() + assert.Equal(t, "realtime.ably.io", prefHost) + }) + + t.Run("RTN17a: should always get primary host as pref. host", func(t *testing.T) { + clientOptions := ably.NewClientOptions(ably.WithRealtimeHost("custom-realtime.ably.io")) + realtimeHosts := ably.NewRealtimeHosts(clientOptions) + prefHost := realtimeHosts.GetPreferredHost() + assert.Equal(t, "custom-realtime.ably.io", prefHost) + }) + + t.Run("RTN17c, RSC15g: should get fallback hosts in random order", func(t *testing.T) { + clientOptions := ably.NewClientOptions() + realtimeHosts := ably.NewRealtimeHosts(clientOptions) + // All expected hosts supposed to be tried upon + expectedHosts := []string{ + "realtime.ably.io", + "a.ably-realtime.com", + "b.ably-realtime.com", + "c.ably-realtime.com", + "d.ably-realtime.com", + "e.ably-realtime.com", + } + + // Get first preferred restHost + var actualHosts []string + prefHost := realtimeHosts.GetPreferredHost() + actualHosts = append(actualHosts, prefHost) + + // Get all fallback hosts in random order + actualHosts = append(actualHosts, realtimeHosts.GetAllRemainingFallbackHosts()...) + + assert.ElementsMatch(t, expectedHosts, actualHosts) + }) + + t.Run("RTN17c, RSC15g: should get all fallback hosts again, when visited hosts are cleared after reconnection", func(t *testing.T) { + clientOptions := ably.NewClientOptions() + realtimeHosts := ably.NewRealtimeHosts(clientOptions) + // All expected hosts supposed to be tried upon + expectedHosts := []string{ + "a.ably-realtime.com", + "b.ably-realtime.com", + "c.ably-realtime.com", + "d.ably-realtime.com", + "e.ably-realtime.com", + } + + // Get first preferred restHost + var actualHosts []string + realtimeHosts.GetPreferredHost() + + // Get some fallback hosts + realtimeHosts.NextFallbackHost() + realtimeHosts.NextFallbackHost() + + // Clear visited hosts, after reconnection + realtimeHosts.ResetVisitedFallbackHosts() + + // Get all fallback hosts in random order + actualHosts = append(actualHosts, realtimeHosts.GetAllRemainingFallbackHosts()...) + + assert.ElementsMatch(t, expectedHosts, actualHosts) + }) + + t.Run("RTN17c, RSC15g: should get all fallback hosts when preferred host is not requested", func(t *testing.T) { + clientOptions := ably.NewClientOptions() + realtimeHosts := ably.NewRealtimeHosts(clientOptions) + // All expected hosts supposed to be tried upon + expectedHosts := []string{ + "a.ably-realtime.com", + "b.ably-realtime.com", + "c.ably-realtime.com", + "d.ably-realtime.com", + "e.ably-realtime.com", + } + + var actualHosts []string + + // Get all fallback hosts in random order + actualHosts = append(actualHosts, realtimeHosts.GetAllRemainingFallbackHosts()...) + + assert.ElementsMatch(t, expectedHosts, actualHosts) + }) + + t.Run("should get remaining fallback hosts count", func(t *testing.T) { + clientOptions := ably.NewClientOptions() + realtimeHosts := ably.NewRealtimeHosts(clientOptions) + + // Get first preferred restHost + realtimeHosts.GetPreferredHost() + + assert.Equal(t, 5, realtimeHosts.FallbackHostsRemaining()) + // Get some fallback hosts + realtimeHosts.NextFallbackHost() + assert.Equal(t, 4, realtimeHosts.FallbackHostsRemaining()) + + realtimeHosts.NextFallbackHost() + assert.Equal(t, 3, realtimeHosts.FallbackHostsRemaining()) + + realtimeHosts.NextFallbackHost() + realtimeHosts.NextFallbackHost() + realtimeHosts.NextFallbackHost() + + assert.Equal(t, 0, realtimeHosts.FallbackHostsRemaining()) + }) +} From a5964df7fb9bc4de6fb4c67249a5bcfb2f12c031 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 7 Aug 2024 19:47:14 +0530 Subject: [PATCH 06/29] Moved timeoutOrDnsError check to error.go file --- ably/error.go | 11 +++++++++++ ably/rest_client.go | 13 ------------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/ably/error.go b/ably/error.go index d88ff902..623422ce 100644 --- a/ably/error.go +++ b/ably/error.go @@ -147,6 +147,17 @@ func errFromUnprocessableBody(resp *http.Response) error { return &ErrorInfo{Code: ErrBadRequest, StatusCode: resp.StatusCode, err: err} } +func isTimeoutOrDnsErr(err error) bool { + var netErr net.Error + if errors.As(err, &netErr) { + if netErr.Timeout() { // RSC15l2 + return true + } + } + var dnsErr *net.DNSError + return errors.As(err, &dnsErr) // RSC15l1 +} + func checkValidHTTPResponse(resp *http.Response) error { type errorBody struct { Error errorInfo `json:"error,omitempty" codec:"error,omitempty"` diff --git a/ably/rest_client.go b/ably/rest_client.go index 0698bf51..ad84ca7e 100644 --- a/ably/rest_client.go +++ b/ably/rest_client.go @@ -6,13 +6,11 @@ import ( _ "crypto/sha512" "encoding/base64" "encoding/json" - "errors" "fmt" "io" "io/ioutil" "math/rand" "mime" - "net" "net/http" "net/http/httptrace" "net/url" @@ -804,17 +802,6 @@ func canFallBack(err error, res *http.Response) bool { isTimeoutOrDnsErr(err) //RSC15l1, RSC15l2 } -func isTimeoutOrDnsErr(err error) bool { - var netErr net.Error - if errors.As(err, &netErr) { - if netErr.Timeout() { // RSC15l2 - return true - } - } - var dnsErr *net.DNSError - return errors.As(err, &dnsErr) // RSC15l1 -} - // RSC15l3 func isStatusCodeBetween500_504(res *http.Response) bool { return res != nil && From ef8bcee415805915adab33e828b2e4605b265c0b Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 7 Aug 2024 19:47:59 +0530 Subject: [PATCH 07/29] Added test for checking active internet connection --- ably/export_test.go | 4 ++++ ably/options_test.go | 46 +++++++++++++++++++++++--------------------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/ably/export_test.go b/ably/export_test.go index 7dc4c0ef..1f8ac150 100644 --- a/ably/export_test.go +++ b/ably/export_test.go @@ -159,6 +159,10 @@ func (opts *clientOptions) GetFallbackRetryTimeout() time.Duration { return opts.fallbackRetryTimeout() } +func (opts *clientOptions) HasActiveInternetConnection() bool { + return opts.hasActiveInternetConnection() +} + func NewErrorInfo(code ErrorCode, err error) *ErrorInfo { return newError(code, err) } diff --git a/ably/options_test.go b/ably/options_test.go index c622bb54..1b0c588d 100644 --- a/ably/options_test.go +++ b/ably/options_test.go @@ -14,31 +14,33 @@ import ( ) func TestDefaultFallbacks_RSC15h(t *testing.T) { - t.Run("with env should return environment fallback hosts", func(t *testing.T) { - expectedFallBackHosts := []string{ - "a.ably-realtime.com", - "b.ably-realtime.com", - "c.ably-realtime.com", - "d.ably-realtime.com", - "e.ably-realtime.com", - } - hosts := ably.DefaultFallbackHosts() - assert.Equal(t, expectedFallBackHosts, hosts) - }) + expectedFallBackHosts := []string{ + "a.ably-realtime.com", + "b.ably-realtime.com", + "c.ably-realtime.com", + "d.ably-realtime.com", + "e.ably-realtime.com", + } + hosts := ably.DefaultFallbackHosts() + assert.Equal(t, expectedFallBackHosts, hosts) } func TestEnvFallbackHosts_RSC15i(t *testing.T) { - t.Run("with env should return environment fallback hosts", func(t *testing.T) { - expectedFallBackHosts := []string{ - "sandbox-a-fallback.ably-realtime.com", - "sandbox-b-fallback.ably-realtime.com", - "sandbox-c-fallback.ably-realtime.com", - "sandbox-d-fallback.ably-realtime.com", - "sandbox-e-fallback.ably-realtime.com", - } - hosts := ably.GetEnvFallbackHosts("sandbox") - assert.Equal(t, expectedFallBackHosts, hosts) - }) + expectedFallBackHosts := []string{ + "sandbox-a-fallback.ably-realtime.com", + "sandbox-b-fallback.ably-realtime.com", + "sandbox-c-fallback.ably-realtime.com", + "sandbox-d-fallback.ably-realtime.com", + "sandbox-e-fallback.ably-realtime.com", + } + hosts := ably.GetEnvFallbackHosts("sandbox") + assert.Equal(t, expectedFallBackHosts, hosts) +} + +func TestInternetConnectionCheck_RTN17c(t *testing.T) { + t.Parallel() + clientOptions := ably.NewClientOptions() + assert.True(t, clientOptions.HasActiveInternetConnection()) } func TestFallbackHosts_RSC15b(t *testing.T) { From 796051543236fc84539bcdc1ca9d8c57b872f42b Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 9 Aug 2024 17:04:32 +0530 Subject: [PATCH 08/29] Added hosts as a separate property to connection struct, removed unused err property --- ably/realtime_conn.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index beb2a358..cf75b59e 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -62,7 +62,6 @@ type Connection struct { msgSerial int64 connStateTTL durationFromMsecs - err error conn conn opts *clientOptions pending pendingEmitter @@ -78,6 +77,7 @@ type Connection struct { // after a reauthorization, to avoid re-reauthorizing. reauthorizing bool arg connArgs + hosts *realtimeHosts client *Realtime readLimit int64 @@ -113,6 +113,7 @@ func newConn(opts *clientOptions, auth *Auth, callbacks connCallbacks, client *R } auth.onExplicitAuthorize = c.onClientAuthorize c.queue = newMsgQueue(c) + c.hosts = newRealtimeHosts(c.opts) if !opts.NoConnect { c.setState(ConnectionStateConnecting, nil, 0) go func() { From 06ceed1fbc543b8e8b3a843be997a577c383b4ef Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 12 Aug 2024 16:30:23 +0530 Subject: [PATCH 09/29] Updated websocketConn to include http resp field that stores handshake response --- ably/realtime_client_integration_test.go | 3 ++- ably/websocket.go | 12 +++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index 925918fa..ca48307b 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -68,7 +68,8 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { defer client.Close() expectedAgentHeaderValue := ably.AblySDKIdentifier + " " + ably.GoRuntimeIdentifier + " " + ably.GoOSIdentifier() - ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventDisconnected), nil) + err = ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventDisconnected), nil) + assert.NoError(t, err) assert.Equal(t, expectedAgentHeaderValue, agentHeaderValue) }) diff --git a/ably/websocket.go b/ably/websocket.go index febfc6ae..879db43c 100644 --- a/ably/websocket.go +++ b/ably/websocket.go @@ -22,6 +22,7 @@ const ( type websocketConn struct { conn *websocket.Conn proto proto + resp *http.Response } func (ws *websocketConn) Send(msg *protocolMessage) error { @@ -88,7 +89,8 @@ func dialWebsocket(proto string, u *url.URL, timeout time.Duration, agents map[s return nil, errors.New(`invalid protocol "` + proto + `"`) } // Starts a raw websocket connection with server - conn, err := dialWebsocketTimeout(u.String(), "https://"+u.Host, timeout, agents) + conn, resp, err := dialWebsocketTimeout(u.String(), "https://"+u.Host, timeout, agents) + ws.resp = resp if err != nil { return nil, err } @@ -97,7 +99,7 @@ func dialWebsocket(proto string, u *url.URL, timeout time.Duration, agents map[s } // dialWebsocketTimeout dials the websocket with a timeout. -func dialWebsocketTimeout(uri, origin string, timeout time.Duration, agents map[string]string) (*websocket.Conn, error) { +func dialWebsocketTimeout(uri, origin string, timeout time.Duration, agents map[string]string) (*websocket.Conn, *http.Response, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -105,13 +107,13 @@ func dialWebsocketTimeout(uri, origin string, timeout time.Duration, agents map[ ops.HTTPHeader = make(http.Header) ops.HTTPHeader.Add(ablyAgentHeader, ablyAgentIdentifier(agents)) - c, _, err := websocket.Dial(ctx, uri, &ops) + c, resp, err := websocket.Dial(ctx, uri, &ops) if err != nil { - return nil, err + return nil, resp, err } - return c, nil + return c, resp, nil } func unwrapConn(c conn) conn { From e4494751fa2afa8366a0e80e7ddae5dc9c0db7ed Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 12 Aug 2024 17:43:51 +0530 Subject: [PATCH 10/29] Added method to extract http response from conn interface --- ably/websocket.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ably/websocket.go b/ably/websocket.go index 879db43c..84d71ddd 100644 --- a/ably/websocket.go +++ b/ably/websocket.go @@ -126,6 +126,15 @@ func unwrapConn(c conn) conn { return unwrapConn(u.Unwrap()) } +func extractHttpResponseFromConn(c conn) *http.Response { + unwrappedConn := unwrapConn(c) + websocketConn, ok := unwrappedConn.(*websocketConn) + if ok { + return websocketConn.resp + } + return nil +} + func setConnectionReadLimit(c conn, readLimit int64) error { unwrappedConn := unwrapConn(c) websocketConn, ok := unwrappedConn.(*websocketConn) From b67165fe3c35dc4b9445ccb5e18d016ff85476a0 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 12 Aug 2024 17:51:40 +0530 Subject: [PATCH 11/29] Updated realtime implementation to connect using fallback hosts --- ably/realtime_conn.go | 52 ++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index cf75b59e..713428e9 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -368,16 +368,16 @@ func (c *Connection) connectWithRetryLoop(arg connArgs) (result, error) { } func (c *Connection) connectWith(arg connArgs) (result, error) { + defer c.hosts.resetVisitedFallbackHosts() + connectMode := c.getMode() + c.mtx.Lock() // set ably connection state to connecting, connecting state exists regardless of whether raw connection is successful or not if !c.isActive() { // check if already in connecting state c.lockSetState(ConnectionStateConnecting, nil, 0) } c.mtx.Unlock() - u, err := url.Parse(c.opts.realtimeURL()) - if err != nil { - return nil, err - } + var res result if arg.result { res = c.internalEmitter.listenResult( @@ -386,22 +386,38 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { ConnectionStateDisconnected, ) } - connectMode := c.getMode() - query, err := c.params(connectMode) - if err != nil { - return nil, err - } - u.RawQuery = query.Encode() - proto := c.opts.protocol() - if c.State() == ConnectionStateClosed { // RTN12d - if connection is closed by client, don't try to reconnect - return nopResult, nil - } + var conn conn + host := c.hosts.getPreferredHost() + for { + u, err := url.Parse(host) + if err != nil { + return nil, err + } + query, err := c.params(connectMode) + if err != nil { + return nil, err + } + u.RawQuery = query.Encode() + proto := c.opts.protocol() - // if err is nil, raw connection with server is successful - conn, err := c.dial(proto, u) - if err != nil { - return nil, err + if c.State() == ConnectionStateClosed { // RTN12d - if connection is closed by client, don't try to reconnect + return nopResult, nil + } + // if err is nil, raw connection with server is successful + conn, err = c.dial(proto, u) + if err == nil { // success + if host != c.hosts.getPrimaryHost() { // RTN17e + // set preferred rest host same as active realtime host + } + break + } + resp := extractHttpResponseFromConn(c.conn) + if c.hosts.fallbackHostsRemaining() > 0 && canFallBack(err, resp) && c.opts.hasActiveInternetConnection() { // RTN17d, RTN17c + host = c.hosts.nextFallbackHost() + } else { + return nil, err + } } c.mtx.Lock() From 823d0fc16cee88f582a83c2232899bcdbbcb2406 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 12 Aug 2024 19:01:26 +0530 Subject: [PATCH 12/29] Updated realtime url to return url based on given host --- ably/export_test.go | 4 ++-- ably/options.go | 10 +++------- ably/realtime_conn.go | 2 +- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/ably/export_test.go b/ably/export_test.go index 1f8ac150..e9531f90 100644 --- a/ably/export_test.go +++ b/ably/export_test.go @@ -70,8 +70,8 @@ func (opts *clientOptions) RestURL() string { return opts.restURL() } -func (opts *clientOptions) RealtimeURL() string { - return opts.realtimeURL() +func (opts *clientOptions) RealtimeURL(realtimeHost string) string { + return opts.realtimeURL(realtimeHost) } func (c *REST) Post(ctx context.Context, path string, in, out interface{}) (*http.Response, error) { diff --git a/ably/options.go b/ably/options.go index 2a06a0c2..16b43b8c 100644 --- a/ably/options.go +++ b/ably/options.go @@ -486,13 +486,9 @@ func (opts *clientOptions) restURL() (restUrl string) { return "https://" + baseUrl } -func (opts *clientOptions) realtimeURL() (realtimeUrl string) { - baseUrl := opts.getRealtimeHost() - _, _, err := net.SplitHostPort(baseUrl) - if err != nil { // set port if not set in baseUrl - port, _ := opts.activePort() - baseUrl = net.JoinHostPort(baseUrl, strconv.Itoa(port)) - } +func (opts *clientOptions) realtimeURL(realtimeHost string) (realtimeUrl string) { + port, _ := opts.activePort() + baseUrl := net.JoinHostPort(realtimeHost, strconv.Itoa(port)) if opts.NoTLS { return "ws://" + baseUrl } diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index 713428e9..bb08d812 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -390,7 +390,7 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { var conn conn host := c.hosts.getPreferredHost() for { - u, err := url.Parse(host) + u, err := url.Parse(c.opts.realtimeURL(host)) if err != nil { return nil, err } From 982e5cdd69300dae27102e08f0caebbc9cc759a3 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 12 Aug 2024 19:31:56 +0530 Subject: [PATCH 13/29] Fixed failing tests responsible for checking agent headers --- ably/realtime_client_integration_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index ca48307b..ab1a0706 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -61,6 +61,7 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { client, err := ably.NewRealtime( ably.WithEnvironment(ablytest.Environment), ably.WithTLS(false), + ably.WithFallbackHosts([]string{}), ably.WithToken("fake:token"), ably.WithUseTokenAuth(true), ably.WithRealtimeHost(serverURL.Host)) @@ -88,6 +89,7 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { ably.WithEnvironment(ablytest.Environment), ably.WithTLS(false), ably.WithToken("fake:token"), + ably.WithFallbackHosts([]string{}), ably.WithUseTokenAuth(true), ably.WithRealtimeHost(serverURL.Host), ably.WithAgents(map[string]string{ @@ -117,6 +119,7 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { ably.WithEnvironment(ablytest.Environment), ably.WithTLS(false), ably.WithToken("fake:token"), + ably.WithFallbackHosts([]string{}), ably.WithUseTokenAuth(true), ably.WithRealtimeHost(serverURL.Host), ably.WithAgents(map[string]string{ From edeb38db7f48c3606e27874f54cbb5d1a897ba11 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 13 Aug 2024 00:38:13 +0530 Subject: [PATCH 14/29] Fixed realtimeUrl method on clientOptions --- ably/options.go | 8 ++++++-- ably/realtime_client_integration_test.go | 3 +-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ably/options.go b/ably/options.go index 16b43b8c..213791a7 100644 --- a/ably/options.go +++ b/ably/options.go @@ -487,8 +487,12 @@ func (opts *clientOptions) restURL() (restUrl string) { } func (opts *clientOptions) realtimeURL(realtimeHost string) (realtimeUrl string) { - port, _ := opts.activePort() - baseUrl := net.JoinHostPort(realtimeHost, strconv.Itoa(port)) + baseUrl := realtimeHost + _, _, err := net.SplitHostPort(baseUrl) + if err != nil { // set port if not set in provided realtimeHost + port, _ := opts.activePort() + baseUrl = net.JoinHostPort(baseUrl, strconv.Itoa(port)) + } if opts.NoTLS { return "ws://" + baseUrl } diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index ab1a0706..6215ef87 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -69,8 +69,7 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { defer client.Close() expectedAgentHeaderValue := ably.AblySDKIdentifier + " " + ably.GoRuntimeIdentifier + " " + ably.GoOSIdentifier() - err = ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventDisconnected), nil) - assert.NoError(t, err) + ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventDisconnected), nil) assert.Equal(t, expectedAgentHeaderValue, agentHeaderValue) }) From a22202b202ed0701d52a3b352beb269c10ed1b76 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 13 Aug 2024 08:51:43 +0530 Subject: [PATCH 15/29] Added integration tests for realtime host fallback --- ably/realtime_client_integration_test.go | 132 +++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index 6215ef87..bbfbec76 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -135,6 +135,138 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { }) } +func TestRealtime_RTN17_HostFallback(t *testing.T) { + + getDNSErr := func() *net.DNSError { + return &net.DNSError{ + Err: "Can't resolve host", + Name: "Host unresolvable", + Server: "rest.ably.com", + IsTimeout: false, + IsTemporary: false, + IsNotFound: false, + } + } + + getTimeoutErr := func() error { + dnsErr := getDNSErr() + dnsErr.IsTimeout = true + return dnsErr + } + + setUpWithError := func(err error, opts ...ably.ClientOption) (visitedHosts []string) { + hostCh := make(chan string, 1) + errCh := make(chan error, 1) + defaultOptions := []ably.ClientOption{ + ably.WithAutoConnect(false), + ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { + hostCh <- u.Hostname() + return nil, <-errCh + }), + } + opts = append(opts, defaultOptions...) + _, client := ablytest.NewRealtime(opts...) + client.Connect() + connDisconnectedEventChan := make(chan struct{}) + // inject error to return after receiving host and break the loop once disconnect event is received + go func(err error) { + for { + select { + case host := <-hostCh: + visitedHosts = append(visitedHosts, host) + errCh <- err + case <-connDisconnectedEventChan: + return + } + } + }(err) + ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventDisconnected), nil) + connDisconnectedEventChan <- struct{}{} + return + } + + t.Run("RTN17a: First attempt should be on default host first", func(t *testing.T) { + t.Parallel() + visitedHosts := setUpWithError(fmt.Errorf("host url is wrong")) + expectedHost := "sandbox-realtime.ably.io" + + assert.Equal(t, expectedHost, visitedHosts[0]) + }) + + t.Run("RTN17b: Fallback behaviour", func(t *testing.T) { + t.Parallel() + + t.Run("apply when default realtime endpoint is not overridden, port/tlsport not set", func(t *testing.T) { + t.Parallel() + visitedHosts := setUpWithError(getTimeoutErr()) + expectedPrimaryHost := "sandbox-realtime.ably.io" + expectedFallbackHosts := ably.GetEnvFallbackHosts("sandbox") + + assert.Equal(t, 6, len(visitedHosts)) + assert.Equal(t, expectedPrimaryHost, visitedHosts[0]) + assert.ElementsMatch(t, expectedFallbackHosts, visitedHosts[1:]) + }) + + t.Run("does not apply when the custom realtime endpoint is used", func(t *testing.T) { + t.Parallel() + visitedHosts := setUpWithError(getTimeoutErr(), ably.WithRealtimeHost("custom-realtime.ably.io")) + expectedHost := "custom-realtime.ably.io" + + assert.Equal(t, 1, len(visitedHosts)) + assert.Equal(t, expectedHost, visitedHosts[0]) + }) + + t.Run("apply when fallbacks are provided", func(t *testing.T) { + t.Parallel() + fallbacks := []string{"fallback0", "fallback1", "fallback2"} + visitedHosts := setUpWithError(getTimeoutErr(), ably.WithFallbackHosts(fallbacks)) + expectedPrimaryHost := "sandbox-realtime.ably.io" + + assert.Equal(t, 4, len(visitedHosts)) + assert.Equal(t, expectedPrimaryHost, visitedHosts[0]) + assert.ElementsMatch(t, fallbacks, visitedHosts[1:]) + }) + + t.Run("apply when fallbackHostUseDefault is true, even if env. or host is set", func(t *testing.T) { + t.Parallel() + visitedHosts := setUpWithError( + getTimeoutErr(), + ably.WithFallbackHostsUseDefault(true), + ably.WithEnvironment("custom"), + ably.WithRealtimeHost("custom-ably.realtime.com")) + + expectedPrimaryHost := "custom-ably.realtime.com" + expectedFallbackHosts := ably.DefaultFallbackHosts() + + assert.Equal(t, 6, len(visitedHosts)) + assert.Equal(t, expectedPrimaryHost, visitedHosts[0]) + assert.ElementsMatch(t, expectedFallbackHosts, visitedHosts[1:]) + }) + }) + + t.Run("RTN17c: Verifies internet connection is active in case of error necessitating use of an alternative host", func(t *testing.T) { + t.Parallel() + const internetCheckUrl = "https://internet-up.ably-realtime.com/is-the-internet-up.txt" + rec, optn := recorder() + visitedHosts := setUpWithError(getDNSErr(), optn...) + assert.Equal(t, 6, len(visitedHosts)) // including primary host + assert.Equal(t, 5, len(rec.Requests())) + for _, request := range rec.Requests() { + assert.Equal(t, request.URL.String(), internetCheckUrl) + } + }) + + t.Run("RTN17d: Check for compatible errors before attempting to reconnect to a fallback host", func(t *testing.T) { + t.Parallel() + visitedHosts := setUpWithError(fmt.Errorf("host url is wrong")) // non-dns or non-timeout error + assert.Equal(t, 1, len(visitedHosts)) + visitedHosts = setUpWithError(getDNSErr()) + assert.Equal(t, 6, len(visitedHosts)) + visitedHosts = setUpWithError(getTimeoutErr()) + assert.Equal(t, 6, len(visitedHosts)) + }) +} + func checkUnique(ch chan string, typ string, n int) error { close(ch) uniq := make(map[string]struct{}, n) From 2168f831e3570a76fa8239216c3722580994653e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 14 Aug 2024 03:42:26 +0530 Subject: [PATCH 16/29] Added RTN17e host fallback implementation for active realtime host, Added integration test for the same --- ably/realtime_client_integration_test.go | 91 ++++++++++++------------ ably/realtime_conn.go | 4 +- ably/rest_client.go | 11 ++- ablytest/recorders.go | 7 ++ ablytest/sandbox.go | 21 +++--- 5 files changed, 75 insertions(+), 59 deletions(-) diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index bbfbec76..3cca4dd8 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/ably/ably-go/ably" + "github.com/ably/ably-go/ably/internal/ablyutil" "github.com/ably/ably-go/ablytest" "github.com/stretchr/testify/assert" @@ -136,59 +137,31 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { } func TestRealtime_RTN17_HostFallback(t *testing.T) { + t.Parallel() getDNSErr := func() *net.DNSError { return &net.DNSError{ - Err: "Can't resolve host", - Name: "Host unresolvable", - Server: "rest.ably.com", - IsTimeout: false, - IsTemporary: false, - IsNotFound: false, + IsTimeout: false, } } getTimeoutErr := func() error { - dnsErr := getDNSErr() - dnsErr.IsTimeout = true - return dnsErr + return &errTimeout{} } setUpWithError := func(err error, opts ...ably.ClientOption) (visitedHosts []string) { - hostCh := make(chan string, 1) - errCh := make(chan error, 1) - defaultOptions := []ably.ClientOption{ - ably.WithAutoConnect(false), + client, _ := ably.NewRealtime(append(opts, ably.WithAutoConnect(false), ably.WithKey("fake:key"), ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { - hostCh <- u.Hostname() - return nil, <-errCh - }), - } - opts = append(opts, defaultOptions...) - _, client := ablytest.NewRealtime(opts...) - client.Connect() - connDisconnectedEventChan := make(chan struct{}) - // inject error to return after receiving host and break the loop once disconnect event is received - go func(err error) { - for { - select { - case host := <-hostCh: - visitedHosts = append(visitedHosts, host) - errCh <- err - case <-connDisconnectedEventChan: - return - } - } - }(err) - ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventDisconnected), nil) - connDisconnectedEventChan <- struct{}{} + visitedHosts = append(visitedHosts, u.Hostname()) + return nil, err + }))...) + ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventDisconnected), nil) return } t.Run("RTN17a: First attempt should be on default host first", func(t *testing.T) { - t.Parallel() visitedHosts := setUpWithError(fmt.Errorf("host url is wrong")) - expectedHost := "sandbox-realtime.ably.io" + expectedHost := "realtime.ably.io" assert.Equal(t, expectedHost, visitedHosts[0]) }) @@ -197,10 +170,9 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { t.Parallel() t.Run("apply when default realtime endpoint is not overridden, port/tlsport not set", func(t *testing.T) { - t.Parallel() visitedHosts := setUpWithError(getTimeoutErr()) - expectedPrimaryHost := "sandbox-realtime.ably.io" - expectedFallbackHosts := ably.GetEnvFallbackHosts("sandbox") + expectedPrimaryHost := "realtime.ably.io" + expectedFallbackHosts := ably.DefaultFallbackHosts() assert.Equal(t, 6, len(visitedHosts)) assert.Equal(t, expectedPrimaryHost, visitedHosts[0]) @@ -208,7 +180,6 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) t.Run("does not apply when the custom realtime endpoint is used", func(t *testing.T) { - t.Parallel() visitedHosts := setUpWithError(getTimeoutErr(), ably.WithRealtimeHost("custom-realtime.ably.io")) expectedHost := "custom-realtime.ably.io" @@ -217,10 +188,9 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) t.Run("apply when fallbacks are provided", func(t *testing.T) { - t.Parallel() fallbacks := []string{"fallback0", "fallback1", "fallback2"} visitedHosts := setUpWithError(getTimeoutErr(), ably.WithFallbackHosts(fallbacks)) - expectedPrimaryHost := "sandbox-realtime.ably.io" + expectedPrimaryHost := "realtime.ably.io" assert.Equal(t, 4, len(visitedHosts)) assert.Equal(t, expectedPrimaryHost, visitedHosts[0]) @@ -228,7 +198,6 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) t.Run("apply when fallbackHostUseDefault is true, even if env. or host is set", func(t *testing.T) { - t.Parallel() visitedHosts := setUpWithError( getTimeoutErr(), ably.WithFallbackHostsUseDefault(true), @@ -247,7 +216,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { t.Run("RTN17c: Verifies internet connection is active in case of error necessitating use of an alternative host", func(t *testing.T) { t.Parallel() const internetCheckUrl = "https://internet-up.ably-realtime.com/is-the-internet-up.txt" - rec, optn := recorder() + rec, optn := ablytest.NewHttpRecorder() visitedHosts := setUpWithError(getDNSErr(), optn...) assert.Equal(t, 6, len(visitedHosts)) // including primary host assert.Equal(t, 5, len(rec.Requests())) @@ -257,7 +226,6 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) t.Run("RTN17d: Check for compatible errors before attempting to reconnect to a fallback host", func(t *testing.T) { - t.Parallel() visitedHosts := setUpWithError(fmt.Errorf("host url is wrong")) // non-dns or non-timeout error assert.Equal(t, 1, len(visitedHosts)) visitedHosts = setUpWithError(getDNSErr()) @@ -265,6 +233,37 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { visitedHosts = setUpWithError(getTimeoutErr()) assert.Equal(t, 6, len(visitedHosts)) }) + + t.Run("RTN17e: Same fallback host should be used for REST as Realtime Fallback Host for a given active connection", func(t *testing.T) { + errCh := make(chan error, 1) + errCh <- getTimeoutErr() + realtimeMsgRecorder := NewMessageRecorder() // websocket recorder + restMsgRecorder, optn := ablytest.NewHttpRecorder() // http recorder + _, client := ablytest.NewRealtime(ably.WithAutoConnect(false), + ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { + err, ok := <-errCh + if ok { + close(errCh) + return nil, err // return timeout error for primary host + } + return realtimeMsgRecorder.Dial(protocol, u, timeout) // return dial for subsequent dials + }), optn[0]) + defer client.Close() + + err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil) + if err != nil { + t.Fatalf("Error connecting host with error %v", err) + } + realtimeSuccessHost := realtimeMsgRecorder.URLs()[0].Hostname() + fallbackHosts := ably.GetEnvFallbackHosts("sandbox") + if !ablyutil.SliceContains(fallbackHosts, realtimeSuccessHost) { + t.Fatalf("realtime host must be one of fallback hosts, received %v", realtimeSuccessHost) + } + + client.Time(context.Background()) // make a rest request + restSuccessHost := restMsgRecorder.Request(1).URL.Hostname() // second request is to get the time, first for internet connection + assert.Equal(t, realtimeSuccessHost, restSuccessHost) + }) } func checkUnique(ch chan string, typ string, n int) error { diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index bb08d812..c2d73947 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -408,7 +408,9 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { conn, err = c.dial(proto, u) if err == nil { // success if host != c.hosts.getPrimaryHost() { // RTN17e - // set preferred rest host same as active realtime host + c.client.rest.setActiveRealtimeHost(host) + } else if !empty(c.client.rest.activeRealtimeHost) { + c.client.rest.setActiveRealtimeHost("") // reset to default } break } diff --git a/ably/rest_client.go b/ably/rest_client.go index ad84ca7e..38703ad6 100644 --- a/ably/rest_client.go +++ b/ably/rest_client.go @@ -134,6 +134,7 @@ type REST struct { opts *clientOptions successFallbackHost *fallbackCache + activeRealtimeHost string // RTN17e log logger } @@ -193,6 +194,10 @@ func (c *REST) Stats(o ...StatsOption) StatsRequest { return StatsRequest{r: c.newPaginatedRequest("/stats", "", params)} } +func (c *REST) setActiveRealtimeHost(realtimeHost string) { + c.activeRealtimeHost = realtimeHost +} + // A StatsOption configures a call to REST.Stats or Realtime.Stats. type StatsOption func(*statsOptions) @@ -702,8 +707,12 @@ func (c *REST) doWithHandle(ctx context.Context, r *request, handle func(*http.R } if h := c.successFallbackHost.get(); h != "" { req.URL.Host = h // RSC15f - c.log.Verbosef("RestClient: setting URL.Host=%q", h) + c.log.Verbosef("RestClient: setting cached URL.Host=%q", h) + } else if !empty(c.activeRealtimeHost) { // RTN17e + req.URL.Host = c.activeRealtimeHost + c.log.Verbosef("RestClient: setting activeRealtimeHost URL.Host=%q", c.activeRealtimeHost) } + if c.opts.Trace != nil { req = req.WithContext(httptrace.WithClientTrace(req.Context(), c.opts.Trace)) c.log.Verbose("RestClient: enabling httptrace") diff --git a/ablytest/recorders.go b/ablytest/recorders.go index e5af3dcc..2b91c76b 100644 --- a/ablytest/recorders.go +++ b/ablytest/recorders.go @@ -23,6 +23,13 @@ type RoundTripRecorder struct { stopped int32 } +func NewHttpRecorder() (*RoundTripRecorder, []ably.ClientOption) { + rec := &RoundTripRecorder{} + httpClient := &http.Client{Transport: &http.Transport{}} + httpClient.Transport = rec.Hijack(httpClient.Transport) + return rec, []ably.ClientOption{ably.WithHTTPClient(httpClient)} +} + var _ http.RoundTripper = (*RoundTripRecorder)(nil) // Len gives number of recorded request/response pairs. diff --git a/ablytest/sandbox.go b/ablytest/sandbox.go index 59a7f36c..8cc995a1 100644 --- a/ablytest/sandbox.go +++ b/ablytest/sandbox.go @@ -231,24 +231,23 @@ func (app *Sandbox) Options(opts ...ably.ClientOption) []ably.ClientOption { Hijack(http.RoundTripper) http.RoundTripper } appHTTPClient := NewHTTPClient() - appOpts := []ably.ClientOption{ - ably.WithKey(app.Key()), - ably.WithEnvironment(app.Environment), - ably.WithUseBinaryProtocol(!NoBinaryProtocol), - ably.WithHTTPClient(appHTTPClient), - ably.WithLogLevel(DefaultLogLevel), - } // If opts want to record round trips inject the recording transport // via TransportHijacker interface. - opt := MergeOptions(opts) - if httpClient := ClientOptionsInspector.HTTPClient(opt); httpClient != nil { + if httpClient := ClientOptionsInspector.HTTPClient(opts); httpClient != nil { if hijacker, ok := httpClient.Transport.(transportHijacker); ok { appHTTPClient.Transport = hijacker.Hijack(appHTTPClient.Transport) - opt = append(opt, ably.WithHTTPClient(appHTTPClient)) + opts = append(opts, ably.WithHTTPClient(appHTTPClient)) } } - appOpts = MergeOptions(appOpts, opt) + + appOpts := []ably.ClientOption{ + ably.WithKey(app.Key()), + ably.WithEnvironment(app.Environment), + ably.WithUseBinaryProtocol(!NoBinaryProtocol), + ably.WithLogLevel(DefaultLogLevel), + } + appOpts = MergeOptions(appOpts, opts) return appOpts } From 2a8d98e7b5ac31d068f6334348a660ca8caf5d87 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 14 Aug 2024 04:50:22 +0530 Subject: [PATCH 17/29] Simplified realtime fallbacks, Removed use of separate hosts struct and tests --- ably/export_test.go | 34 ------------ ably/hosts.go | 70 ------------------------ ably/hosts_test.go | 120 ------------------------------------------ ably/realtime_conn.go | 21 ++++---- 4 files changed, 11 insertions(+), 234 deletions(-) delete mode 100644 ably/hosts.go delete mode 100644 ably/hosts_test.go diff --git a/ably/export_test.go b/ably/export_test.go index e9531f90..11c001b7 100644 --- a/ably/export_test.go +++ b/ably/export_test.go @@ -6,46 +6,12 @@ import ( "net/http/httptrace" "net/url" "time" - - "github.com/ably/ably-go/ably/internal/ablyutil" ) func NewClientOptions(os ...ClientOption) *clientOptions { return applyOptionsWithDefaults(os...) } -func NewRealtimeHosts(opts *clientOptions) *realtimeHosts { - return newRealtimeHosts(opts) -} - -func (realtimeHosts *realtimeHosts) NextFallbackHost() string { - return realtimeHosts.nextFallbackHost() -} - -func (realtimeHosts *realtimeHosts) GetAllRemainingFallbackHosts() []string { - var hosts []string - for true { - fallbackHost := realtimeHosts.NextFallbackHost() - if ablyutil.Empty(fallbackHost) { - break - } - hosts = append(hosts, fallbackHost) - } - return hosts -} - -func (realtimeHosts *realtimeHosts) ResetVisitedFallbackHosts() { - realtimeHosts.resetVisitedFallbackHosts() -} - -func (realtimeHosts *realtimeHosts) FallbackHostsRemaining() int { - return realtimeHosts.fallbackHostsRemaining() -} - -func (realtimeHosts *realtimeHosts) GetPreferredHost() string { - return realtimeHosts.getPreferredHost() -} - func GetEnvFallbackHosts(env string) []string { return getEnvFallbackHosts(env) } diff --git a/ably/hosts.go b/ably/hosts.go deleted file mode 100644 index 24704c1a..00000000 --- a/ably/hosts.go +++ /dev/null @@ -1,70 +0,0 @@ -package ably - -import ( - "sync" - - "github.com/ably/ably-go/ably/internal/ablyutil" -) - -// realtimeHosts(RTN17) is used to retrieve realtime primaryHost/fallbackHosts -type realtimeHosts struct { - opts *clientOptions - visitedHosts ablyutil.HashSet - sync.Mutex -} - -func newRealtimeHosts(opts *clientOptions) *realtimeHosts { - return &realtimeHosts{ - opts: opts, - visitedHosts: ablyutil.NewHashSet(), - } -} - -func (realtimeHosts *realtimeHosts) getPrimaryHost() string { - return realtimeHosts.opts.getRealtimeHost() -} - -func (realtimeHosts *realtimeHosts) nextFallbackHost() string { - realtimeHosts.Lock() - defer realtimeHosts.Unlock() - - getNonVisitedHost := func() string { - visitedHosts := realtimeHosts.visitedHosts - hosts, _ := realtimeHosts.opts.getFallbackHosts() - shuffledFallbackHosts := ablyutil.Shuffle(hosts) - for _, host := range shuffledFallbackHosts { - if !visitedHosts.Has(host) { - return host - } - } - return "" - } - - nonVisitedHost := getNonVisitedHost() - if !ablyutil.Empty(nonVisitedHost) { - realtimeHosts.visitedHosts.Add(nonVisitedHost) - } - return nonVisitedHost -} - -func (realtimeHosts *realtimeHosts) resetVisitedFallbackHosts() { - realtimeHosts.Lock() - defer realtimeHosts.Unlock() - realtimeHosts.visitedHosts = ablyutil.NewHashSet() -} - -func (realtimeHosts *realtimeHosts) fallbackHostsRemaining() int { - realtimeHosts.Lock() - defer realtimeHosts.Unlock() - hosts, _ := realtimeHosts.opts.getFallbackHosts() - return len(hosts) + 1 - len(realtimeHosts.visitedHosts) -} - -// getPreferredHost - Used to retrieve primary realtime host -func (realtimeHosts *realtimeHosts) getPreferredHost() string { - realtimeHosts.Lock() - defer realtimeHosts.Unlock() - host := realtimeHosts.getPrimaryHost() // primary host is always preferred host/ fallback host in realtime - realtimeHosts.visitedHosts.Add(host) - return host -} diff --git a/ably/hosts_test.go b/ably/hosts_test.go deleted file mode 100644 index 26c5c17e..00000000 --- a/ably/hosts_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package ably_test - -import ( - "testing" - - "github.com/ably/ably-go/ably" - "github.com/stretchr/testify/assert" -) - -func Test_RTN17_RealtimeHostFallback(t *testing.T) { - t.Parallel() - t.Run("RTN17a: should always get primary host as pref. host", func(t *testing.T) { - clientOptions := ably.NewClientOptions() - realtimeHosts := ably.NewRealtimeHosts(clientOptions) - prefHost := realtimeHosts.GetPreferredHost() - assert.Equal(t, "realtime.ably.io", prefHost) - }) - - t.Run("RTN17a: should always get primary host as pref. host", func(t *testing.T) { - clientOptions := ably.NewClientOptions(ably.WithRealtimeHost("custom-realtime.ably.io")) - realtimeHosts := ably.NewRealtimeHosts(clientOptions) - prefHost := realtimeHosts.GetPreferredHost() - assert.Equal(t, "custom-realtime.ably.io", prefHost) - }) - - t.Run("RTN17c, RSC15g: should get fallback hosts in random order", func(t *testing.T) { - clientOptions := ably.NewClientOptions() - realtimeHosts := ably.NewRealtimeHosts(clientOptions) - // All expected hosts supposed to be tried upon - expectedHosts := []string{ - "realtime.ably.io", - "a.ably-realtime.com", - "b.ably-realtime.com", - "c.ably-realtime.com", - "d.ably-realtime.com", - "e.ably-realtime.com", - } - - // Get first preferred restHost - var actualHosts []string - prefHost := realtimeHosts.GetPreferredHost() - actualHosts = append(actualHosts, prefHost) - - // Get all fallback hosts in random order - actualHosts = append(actualHosts, realtimeHosts.GetAllRemainingFallbackHosts()...) - - assert.ElementsMatch(t, expectedHosts, actualHosts) - }) - - t.Run("RTN17c, RSC15g: should get all fallback hosts again, when visited hosts are cleared after reconnection", func(t *testing.T) { - clientOptions := ably.NewClientOptions() - realtimeHosts := ably.NewRealtimeHosts(clientOptions) - // All expected hosts supposed to be tried upon - expectedHosts := []string{ - "a.ably-realtime.com", - "b.ably-realtime.com", - "c.ably-realtime.com", - "d.ably-realtime.com", - "e.ably-realtime.com", - } - - // Get first preferred restHost - var actualHosts []string - realtimeHosts.GetPreferredHost() - - // Get some fallback hosts - realtimeHosts.NextFallbackHost() - realtimeHosts.NextFallbackHost() - - // Clear visited hosts, after reconnection - realtimeHosts.ResetVisitedFallbackHosts() - - // Get all fallback hosts in random order - actualHosts = append(actualHosts, realtimeHosts.GetAllRemainingFallbackHosts()...) - - assert.ElementsMatch(t, expectedHosts, actualHosts) - }) - - t.Run("RTN17c, RSC15g: should get all fallback hosts when preferred host is not requested", func(t *testing.T) { - clientOptions := ably.NewClientOptions() - realtimeHosts := ably.NewRealtimeHosts(clientOptions) - // All expected hosts supposed to be tried upon - expectedHosts := []string{ - "a.ably-realtime.com", - "b.ably-realtime.com", - "c.ably-realtime.com", - "d.ably-realtime.com", - "e.ably-realtime.com", - } - - var actualHosts []string - - // Get all fallback hosts in random order - actualHosts = append(actualHosts, realtimeHosts.GetAllRemainingFallbackHosts()...) - - assert.ElementsMatch(t, expectedHosts, actualHosts) - }) - - t.Run("should get remaining fallback hosts count", func(t *testing.T) { - clientOptions := ably.NewClientOptions() - realtimeHosts := ably.NewRealtimeHosts(clientOptions) - - // Get first preferred restHost - realtimeHosts.GetPreferredHost() - - assert.Equal(t, 5, realtimeHosts.FallbackHostsRemaining()) - // Get some fallback hosts - realtimeHosts.NextFallbackHost() - assert.Equal(t, 4, realtimeHosts.FallbackHostsRemaining()) - - realtimeHosts.NextFallbackHost() - assert.Equal(t, 3, realtimeHosts.FallbackHostsRemaining()) - - realtimeHosts.NextFallbackHost() - realtimeHosts.NextFallbackHost() - realtimeHosts.NextFallbackHost() - - assert.Equal(t, 0, realtimeHosts.FallbackHostsRemaining()) - }) -} diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index c2d73947..3abcab00 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -8,6 +8,8 @@ import ( "strconv" "sync" "time" + + "github.com/ably/ably-go/ably/internal/ablyutil" ) var ( @@ -77,7 +79,6 @@ type Connection struct { // after a reauthorization, to avoid re-reauthorizing. reauthorizing bool arg connArgs - hosts *realtimeHosts client *Realtime readLimit int64 @@ -113,7 +114,6 @@ func newConn(opts *clientOptions, auth *Auth, callbacks connCallbacks, client *R } auth.onExplicitAuthorize = c.onClientAuthorize c.queue = newMsgQueue(c) - c.hosts = newRealtimeHosts(c.opts) if !opts.NoConnect { c.setState(ConnectionStateConnecting, nil, 0) go func() { @@ -368,7 +368,6 @@ func (c *Connection) connectWithRetryLoop(arg connArgs) (result, error) { } func (c *Connection) connectWith(arg connArgs) (result, error) { - defer c.hosts.resetVisitedFallbackHosts() connectMode := c.getMode() c.mtx.Lock() @@ -388,8 +387,11 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { } var conn conn - host := c.hosts.getPreferredHost() - for { + primaryHost := c.opts.getRealtimeHost() + fallbackHosts, _ := c.opts.getFallbackHosts() + // Always try primary host first and then fallback hosts for realtime conn + hosts := append([]string{primaryHost}, ablyutil.Shuffle(fallbackHosts)...) + for hostCounter, host := range hosts { u, err := url.Parse(c.opts.realtimeURL(host)) if err != nil { return nil, err @@ -407,7 +409,7 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { // if err is nil, raw connection with server is successful conn, err = c.dial(proto, u) if err == nil { // success - if host != c.hosts.getPrimaryHost() { // RTN17e + if host != primaryHost { // RTN17e c.client.rest.setActiveRealtimeHost(host) } else if !empty(c.client.rest.activeRealtimeHost) { c.client.rest.setActiveRealtimeHost("") // reset to default @@ -415,11 +417,10 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { break } resp := extractHttpResponseFromConn(c.conn) - if c.hosts.fallbackHostsRemaining() > 0 && canFallBack(err, resp) && c.opts.hasActiveInternetConnection() { // RTN17d, RTN17c - host = c.hosts.nextFallbackHost() - } else { - return nil, err + if hostCounter < len(hosts)-1 && canFallBack(err, resp) && c.opts.hasActiveInternetConnection() { // RTN17d, RTN17c + continue } + return nil, err } c.mtx.Lock() From 15ad3b66cb3cc513f4ca1417a6d8477a0b74c635 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 15 Aug 2024 00:56:35 +0530 Subject: [PATCH 18/29] Added type for websocketErr, implemented passing integration test for the same --- ably/realtime_client_integration_test.go | 23 +++++++++++++++++++++ ably/realtime_conn.go | 2 +- ably/websocket.go | 26 +++++++++++++++++------- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index 3cca4dd8..c8e33b27 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -266,6 +266,29 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) } +func TestRealtime_RTN17_Integration_HostFallback(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + serverURL, err := url.Parse(server.URL) + assert.NoError(t, err) + + app, realtime := ablytest.NewRealtime( + ably.WithAutoConnect(false), + ably.WithTLS(false), + ably.WithUseTokenAuth(true), + ably.WithFallbackHosts(ably.GetEnvFallbackHosts(ablytest.Environment)), + ably.WithRealtimeHost(serverURL.Host)) + + defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app) + + err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil) + if err != nil { + t.Fatalf("Error connecting host with error %v", err) + } +} + func checkUnique(ch chan string, typ string, n int) error { close(ch) uniq := make(map[string]struct{}, n) diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index 3abcab00..72323f74 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -416,7 +416,7 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { } break } - resp := extractHttpResponseFromConn(c.conn) + resp := extractHttpResponseFromError(err) if hostCounter < len(hosts)-1 && canFallBack(err, resp) && c.opts.hasActiveInternetConnection() { // RTN17d, RTN17c continue } diff --git a/ably/websocket.go b/ably/websocket.go index 84d71ddd..0e8b7756 100644 --- a/ably/websocket.go +++ b/ably/websocket.go @@ -22,7 +22,21 @@ const ( type websocketConn struct { conn *websocket.Conn proto proto - resp *http.Response +} + +type websocketErr struct { + err error + resp *http.Response +} + +// websocketErr implements the builtin error interface. +func (e *websocketErr) Error() string { + return e.err.Error() +} + +// Unwrap implements the implicit interface that errors.Unwrap understands. +func (e *websocketErr) Unwrap() error { + return e.err } func (ws *websocketConn) Send(msg *protocolMessage) error { @@ -90,9 +104,8 @@ func dialWebsocket(proto string, u *url.URL, timeout time.Duration, agents map[s } // Starts a raw websocket connection with server conn, resp, err := dialWebsocketTimeout(u.String(), "https://"+u.Host, timeout, agents) - ws.resp = resp if err != nil { - return nil, err + return nil, &websocketErr{err: err, resp: resp} } ws.conn = conn return ws, nil @@ -126,11 +139,10 @@ func unwrapConn(c conn) conn { return unwrapConn(u.Unwrap()) } -func extractHttpResponseFromConn(c conn) *http.Response { - unwrappedConn := unwrapConn(c) - websocketConn, ok := unwrappedConn.(*websocketConn) +func extractHttpResponseFromError(err error) *http.Response { + wsErr, ok := err.(*websocketErr) if ok { - return websocketConn.resp + return wsErr.resp } return nil } From d240ee37dd3684698937fc3a70678c244403dd41 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 15 Aug 2024 12:39:34 +0530 Subject: [PATCH 19/29] Added integration test for host fallback timeout and server error --- ably/export_test.go | 12 +++++--- ably/options_test.go | 1 - ably/realtime_client_integration_test.go | 38 +++++++++++++++++++----- 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/ably/export_test.go b/ably/export_test.go index 11c001b7..75a03824 100644 --- a/ably/export_test.go +++ b/ably/export_test.go @@ -36,10 +36,6 @@ func (opts *clientOptions) RestURL() string { return opts.restURL() } -func (opts *clientOptions) RealtimeURL(realtimeHost string) string { - return opts.realtimeURL(realtimeHost) -} - func (c *REST) Post(ctx context.Context, path string, in, out interface{}) (*http.Response, error) { return c.post(ctx, path, in, out) } @@ -97,6 +93,10 @@ func (c *REST) GetCachedFallbackHost() string { return c.successFallbackHost.get() } +func (c *REST) ActiveRealtimeHost() string { + return c.activeRealtimeHost +} + func (c *RealtimeChannel) GetChannelSerial() string { c.mtx.Lock() defer c.mtx.Unlock() @@ -230,6 +230,10 @@ func (c *Connection) SetKey(key string) { c.key = key } +func (r *Realtime) Rest() *REST { + return r.rest +} + func (c *RealtimePresence) Members() map[string]*PresenceMessage { c.mtx.Lock() defer c.mtx.Unlock() diff --git a/ably/options_test.go b/ably/options_test.go index 1b0c588d..855076bd 100644 --- a/ably/options_test.go +++ b/ably/options_test.go @@ -38,7 +38,6 @@ func TestEnvFallbackHosts_RSC15i(t *testing.T) { } func TestInternetConnectionCheck_RTN17c(t *testing.T) { - t.Parallel() clientOptions := ably.NewClientOptions() assert.True(t, clientOptions.HasActiveInternetConnection()) } diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index c8e33b27..c2f99fb3 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -62,7 +62,6 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { client, err := ably.NewRealtime( ably.WithEnvironment(ablytest.Environment), ably.WithTLS(false), - ably.WithFallbackHosts([]string{}), ably.WithToken("fake:token"), ably.WithUseTokenAuth(true), ably.WithRealtimeHost(serverURL.Host)) @@ -89,7 +88,6 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { ably.WithEnvironment(ablytest.Environment), ably.WithTLS(false), ably.WithToken("fake:token"), - ably.WithFallbackHosts([]string{}), ably.WithUseTokenAuth(true), ably.WithRealtimeHost(serverURL.Host), ably.WithAgents(map[string]string{ @@ -119,7 +117,6 @@ func TestRealtime_RSC7_AblyAgent(t *testing.T) { ably.WithEnvironment(ablytest.Environment), ably.WithTLS(false), ably.WithToken("fake:token"), - ably.WithFallbackHosts([]string{}), ably.WithUseTokenAuth(true), ably.WithRealtimeHost(serverURL.Host), ably.WithAgents(map[string]string{ @@ -266,7 +263,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) } -func TestRealtime_RTN17_Integration_HostFallback(t *testing.T) { +func TestRealtime_RTN17_Integration_HostFallback_Internal_Server_Error(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) })) @@ -278,15 +275,40 @@ func TestRealtime_RTN17_Integration_HostFallback(t *testing.T) { ably.WithAutoConnect(false), ably.WithTLS(false), ably.WithUseTokenAuth(true), - ably.WithFallbackHosts(ably.GetEnvFallbackHosts(ablytest.Environment)), + ably.WithFallbackHosts([]string{"sandbox-a-fallback.ably-realtime.com"}), ably.WithRealtimeHost(serverURL.Host)) defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app) err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil) - if err != nil { - t.Fatalf("Error connecting host with error %v", err) - } + assert.Nil(t, err) + + assert.Equal(t, "sandbox-a-fallback.ably-realtime.com", realtime.Rest().ActiveRealtimeHost()) +} + +func TestRealtime_RTN17_Integration_HostFallback_Timeout(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(3 * time.Second) + w.WriteHeader(http.StatusSwitchingProtocols) + })) + defer server.Close() + serverURL, err := url.Parse(server.URL) + assert.NoError(t, err) + + app, realtime := ablytest.NewRealtime( + ably.WithAutoConnect(false), + ably.WithTLS(false), + ably.WithUseTokenAuth(true), + ably.WithFallbackHosts([]string{"sandbox-a-fallback.ably-realtime.com"}), + ably.WithRealtimeRequestTimeout(2*time.Second), + ably.WithRealtimeHost(serverURL.Host)) + + defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app) + + err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil) + assert.Nil(t, err) + + assert.Equal(t, "sandbox-a-fallback.ably-realtime.com", realtime.Rest().ActiveRealtimeHost()) } func checkUnique(ch chan string, typ string, n int) error { From d36803953ca749eccbae54710f170cbfa473d889 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 15 Aug 2024 12:42:10 +0530 Subject: [PATCH 20/29] Reverted sandbox clientOptions changes, refactored opts param --- ablytest/sandbox.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ablytest/sandbox.go b/ablytest/sandbox.go index 8cc995a1..b3a6a40b 100644 --- a/ablytest/sandbox.go +++ b/ablytest/sandbox.go @@ -231,6 +231,13 @@ func (app *Sandbox) Options(opts ...ably.ClientOption) []ably.ClientOption { Hijack(http.RoundTripper) http.RoundTripper } appHTTPClient := NewHTTPClient() + appOpts := []ably.ClientOption{ + ably.WithKey(app.Key()), + ably.WithEnvironment(app.Environment), + ably.WithUseBinaryProtocol(!NoBinaryProtocol), + ably.WithHTTPClient(appHTTPClient), + ably.WithLogLevel(DefaultLogLevel), + } // If opts want to record round trips inject the recording transport // via TransportHijacker interface. @@ -240,13 +247,6 @@ func (app *Sandbox) Options(opts ...ably.ClientOption) []ably.ClientOption { opts = append(opts, ably.WithHTTPClient(appHTTPClient)) } } - - appOpts := []ably.ClientOption{ - ably.WithKey(app.Key()), - ably.WithEnvironment(app.Environment), - ably.WithUseBinaryProtocol(!NoBinaryProtocol), - ably.WithLogLevel(DefaultLogLevel), - } appOpts = MergeOptions(appOpts, opts) return appOpts From b5b9f4b3b01018cfab322ad9277e061a1a18dd52 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 15 Aug 2024 18:22:08 +0530 Subject: [PATCH 21/29] Updated README, removed limitation for connection failure handling --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 64247ebf..b56e1c92 100644 --- a/README.md +++ b/README.md @@ -450,8 +450,6 @@ See [jwt auth issue](https://github.com/ably/ably-go/issues/569) for more detail - Inband reauthentication is not supported; expiring tokens will trigger a disconnection and resume of a realtime connection. See [server initiated auth](https://github.com/ably/ably-go/issues/228) for more details. -- Realtime connection failure handling is partially implemented. See [host fallback](https://github.com/ably/ably-go/issues/225) for more details. - - Channel suspended state is partially implemented. See [suspended channel state](https://github.com/ably/ably-go/issues/568). - Realtime Ping function is not implemented. From 992a9e321ea988e5cf13ce0900b3834d04d23f33 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 22 Aug 2024 12:51:52 +0530 Subject: [PATCH 22/29] Using errors.Is instead of errors.As for checking dns error --- ably/error.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ably/error.go b/ably/error.go index 623422ce..b467d7b3 100644 --- a/ably/error.go +++ b/ably/error.go @@ -155,7 +155,7 @@ func isTimeoutOrDnsErr(err error) bool { } } var dnsErr *net.DNSError - return errors.As(err, &dnsErr) // RSC15l1 + return errors.Is(err, dnsErr) // RSC15l1 } func checkValidHTTPResponse(resp *http.Response) error { From a0242d4878fcec855c168289fc82d8692380f107 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 22 Aug 2024 12:52:33 +0530 Subject: [PATCH 23/29] Refactored realtime host fallback tests as per review comment --- ably/realtime_client_integration_test.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index c2f99fb3..0f9b5c50 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -5,6 +5,7 @@ package ably_test import ( "context" + "errors" "fmt" "net" "net/http" @@ -19,6 +20,7 @@ import ( "github.com/ably/ably-go/ablytest" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestRealtime_RealtimeHost(t *testing.T) { @@ -147,20 +149,19 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { } setUpWithError := func(err error, opts ...ably.ClientOption) (visitedHosts []string) { - client, _ := ably.NewRealtime(append(opts, ably.WithAutoConnect(false), ably.WithKey("fake:key"), + client, err := ably.NewRealtime(append(opts, ably.WithAutoConnect(false), ably.WithKey("fake:key"), ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { visitedHosts = append(visitedHosts, u.Hostname()) return nil, err }))...) + require.NoError(t, err) ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventDisconnected), nil) return } - t.Run("RTN17a: First attempt should be on default host first", func(t *testing.T) { - visitedHosts := setUpWithError(fmt.Errorf("host url is wrong")) - expectedHost := "realtime.ably.io" - - assert.Equal(t, expectedHost, visitedHosts[0]) + t.Run("RTN17a: First attempt should be made on default primary host", func(t *testing.T) { + visitedHosts := setUpWithError(errors.New("host url is wrong")) + assert.Equal(t, "realtime.ably.io", visitedHosts[0]) }) t.Run("RTN17b: Fallback behaviour", func(t *testing.T) { @@ -180,7 +181,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { visitedHosts := setUpWithError(getTimeoutErr(), ably.WithRealtimeHost("custom-realtime.ably.io")) expectedHost := "custom-realtime.ably.io" - assert.Equal(t, 1, len(visitedHosts)) + require.Equal(t, 1, len(visitedHosts)) assert.Equal(t, expectedHost, visitedHosts[0]) }) @@ -281,7 +282,7 @@ func TestRealtime_RTN17_Integration_HostFallback_Internal_Server_Error(t *testin defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app) err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, "sandbox-a-fallback.ably-realtime.com", realtime.Rest().ActiveRealtimeHost()) } @@ -306,7 +307,7 @@ func TestRealtime_RTN17_Integration_HostFallback_Timeout(t *testing.T) { defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app) err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, "sandbox-a-fallback.ably-realtime.com", realtime.Rest().ActiveRealtimeHost()) } From f484445463658a23dfbaab3644a4f0dfb8409aa6 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 22 Aug 2024 12:53:08 +0530 Subject: [PATCH 24/29] Removed use of deprecated io/ioutil, replaced with io package instead --- ably/options.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ably/options.go b/ably/options.go index 213791a7..6fa4ba8a 100644 --- a/ably/options.go +++ b/ably/options.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "io/ioutil" + "io" "log" "net" "net/http" @@ -604,7 +604,7 @@ func (opts *clientOptions) hasActiveInternetConnection() bool { if err != nil { return false } - data, err := ioutil.ReadAll(res.Body) + data, err := io.ReadAll(res.Body) res.Body.Close() if err != nil { return false From a2021719522dacc8ccad5678081824fafe3eebd8 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 22 Aug 2024 13:52:58 +0530 Subject: [PATCH 25/29] Refactored implementation for realtime host fallback with proper error checking --- ably/realtime_client_integration_test.go | 4 ++-- ably/realtime_conn.go | 29 ++++++++++++++---------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index 0f9b5c50..db22ba67 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -148,11 +148,11 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { return &errTimeout{} } - setUpWithError := func(err error, opts ...ably.ClientOption) (visitedHosts []string) { + setUpWithError := func(customErr error, opts ...ably.ClientOption) (visitedHosts []string) { client, err := ably.NewRealtime(append(opts, ably.WithAutoConnect(false), ably.WithKey("fake:key"), ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { visitedHosts = append(visitedHosts, u.Hostname()) - return nil, err + return nil, customErr }))...) require.NoError(t, err) ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventDisconnected), nil) diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index 72323f74..03ba6d00 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -388,9 +388,14 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { var conn conn primaryHost := c.opts.getRealtimeHost() - fallbackHosts, _ := c.opts.getFallbackHosts() + hosts := []string{primaryHost} + fallbackHosts, err := c.opts.getFallbackHosts() + if err != nil { + c.log().Warn(err) + } else { + hosts = append(hosts, ablyutil.Shuffle(fallbackHosts)...) + } // Always try primary host first and then fallback hosts for realtime conn - hosts := append([]string{primaryHost}, ablyutil.Shuffle(fallbackHosts)...) for hostCounter, host := range hosts { u, err := url.Parse(c.opts.realtimeURL(host)) if err != nil { @@ -408,19 +413,19 @@ func (c *Connection) connectWith(arg connArgs) (result, error) { } // if err is nil, raw connection with server is successful conn, err = c.dial(proto, u) - if err == nil { // success - if host != primaryHost { // RTN17e - c.client.rest.setActiveRealtimeHost(host) - } else if !empty(c.client.rest.activeRealtimeHost) { - c.client.rest.setActiveRealtimeHost("") // reset to default + if err != nil { + resp := extractHttpResponseFromError(err) + if hostCounter < len(hosts)-1 && canFallBack(err, resp) && c.opts.hasActiveInternetConnection() { // RTN17d, RTN17c + continue } - break + return nil, err } - resp := extractHttpResponseFromError(err) - if hostCounter < len(hosts)-1 && canFallBack(err, resp) && c.opts.hasActiveInternetConnection() { // RTN17d, RTN17c - continue + if host != primaryHost { // RTN17e + c.client.rest.setActiveRealtimeHost(host) + } else if !empty(c.client.rest.activeRealtimeHost) { + c.client.rest.setActiveRealtimeHost("") // reset to default } - return nil, err + break } c.mtx.Lock() From 049151ce57dbe8d70b98e0963dc2baed42efd2ed Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 22 Aug 2024 13:53:10 +0530 Subject: [PATCH 26/29] Revert "Using errors.Is instead of errors.As for checking dns error" This reverts commit 992a9e321ea988e5cf13ce0900b3834d04d23f33. --- ably/error.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ably/error.go b/ably/error.go index b467d7b3..623422ce 100644 --- a/ably/error.go +++ b/ably/error.go @@ -155,7 +155,7 @@ func isTimeoutOrDnsErr(err error) bool { } } var dnsErr *net.DNSError - return errors.Is(err, dnsErr) // RSC15l1 + return errors.As(err, &dnsErr) // RSC15l1 } func checkValidHTTPResponse(resp *http.Response) error { From 5254cae93f1bc4843905bdc61fb5638360509299 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 22 Aug 2024 18:05:57 +0530 Subject: [PATCH 27/29] Refactored realtime host tests and internet check code as per review comments --- ably/options.go | 2 +- ably/realtime_client_integration_test.go | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ably/options.go b/ably/options.go index 6fa4ba8a..b0865dec 100644 --- a/ably/options.go +++ b/ably/options.go @@ -604,8 +604,8 @@ func (opts *clientOptions) hasActiveInternetConnection() bool { if err != nil { return false } + defer res.Body.Close() data, err := io.ReadAll(res.Body) - res.Body.Close() if err != nil { return false } diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index db22ba67..872e749d 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -148,7 +148,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { return &errTimeout{} } - setUpWithError := func(customErr error, opts ...ably.ClientOption) (visitedHosts []string) { + initClientWithConnError := func(customErr error, opts ...ably.ClientOption) (visitedHosts []string) { client, err := ably.NewRealtime(append(opts, ably.WithAutoConnect(false), ably.WithKey("fake:key"), ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { visitedHosts = append(visitedHosts, u.Hostname()) @@ -160,7 +160,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { } t.Run("RTN17a: First attempt should be made on default primary host", func(t *testing.T) { - visitedHosts := setUpWithError(errors.New("host url is wrong")) + visitedHosts := initClientWithConnError(errors.New("host url is wrong")) assert.Equal(t, "realtime.ably.io", visitedHosts[0]) }) @@ -168,7 +168,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { t.Parallel() t.Run("apply when default realtime endpoint is not overridden, port/tlsport not set", func(t *testing.T) { - visitedHosts := setUpWithError(getTimeoutErr()) + visitedHosts := initClientWithConnError(getTimeoutErr()) expectedPrimaryHost := "realtime.ably.io" expectedFallbackHosts := ably.DefaultFallbackHosts() @@ -178,7 +178,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) t.Run("does not apply when the custom realtime endpoint is used", func(t *testing.T) { - visitedHosts := setUpWithError(getTimeoutErr(), ably.WithRealtimeHost("custom-realtime.ably.io")) + visitedHosts := initClientWithConnError(getTimeoutErr(), ably.WithRealtimeHost("custom-realtime.ably.io")) expectedHost := "custom-realtime.ably.io" require.Equal(t, 1, len(visitedHosts)) @@ -187,7 +187,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { t.Run("apply when fallbacks are provided", func(t *testing.T) { fallbacks := []string{"fallback0", "fallback1", "fallback2"} - visitedHosts := setUpWithError(getTimeoutErr(), ably.WithFallbackHosts(fallbacks)) + visitedHosts := initClientWithConnError(getTimeoutErr(), ably.WithFallbackHosts(fallbacks)) expectedPrimaryHost := "realtime.ably.io" assert.Equal(t, 4, len(visitedHosts)) @@ -196,7 +196,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) t.Run("apply when fallbackHostUseDefault is true, even if env. or host is set", func(t *testing.T) { - visitedHosts := setUpWithError( + visitedHosts := initClientWithConnError( getTimeoutErr(), ably.WithFallbackHostsUseDefault(true), ably.WithEnvironment("custom"), @@ -215,7 +215,7 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { t.Parallel() const internetCheckUrl = "https://internet-up.ably-realtime.com/is-the-internet-up.txt" rec, optn := ablytest.NewHttpRecorder() - visitedHosts := setUpWithError(getDNSErr(), optn...) + visitedHosts := initClientWithConnError(getDNSErr(), optn...) assert.Equal(t, 6, len(visitedHosts)) // including primary host assert.Equal(t, 5, len(rec.Requests())) for _, request := range rec.Requests() { @@ -224,11 +224,11 @@ func TestRealtime_RTN17_HostFallback(t *testing.T) { }) t.Run("RTN17d: Check for compatible errors before attempting to reconnect to a fallback host", func(t *testing.T) { - visitedHosts := setUpWithError(fmt.Errorf("host url is wrong")) // non-dns or non-timeout error + visitedHosts := initClientWithConnError(fmt.Errorf("host url is wrong")) // non-dns or non-timeout error assert.Equal(t, 1, len(visitedHosts)) - visitedHosts = setUpWithError(getDNSErr()) + visitedHosts = initClientWithConnError(getDNSErr()) assert.Equal(t, 6, len(visitedHosts)) - visitedHosts = setUpWithError(getTimeoutErr()) + visitedHosts = initClientWithConnError(getTimeoutErr()) assert.Equal(t, 6, len(visitedHosts)) }) From 22d35c93e86ccad7745d23c1cf2936849b6cb3b0 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 23 Aug 2024 22:26:08 +0530 Subject: [PATCH 28/29] Updated code as per spec for checking active internet connection --- ably/options.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ably/options.go b/ably/options.go index b0865dec..1bf26783 100644 --- a/ably/options.go +++ b/ably/options.go @@ -1,6 +1,7 @@ package ably import ( + "bytes" "context" "errors" "fmt" @@ -30,8 +31,9 @@ const ( TLSPort = 443 maxMessageSize = 65536 // 64kb, default value TO3l8 + // RTN17c internetCheckUrl = "https://internet-up.ably-realtime.com/is-the-internet-up.txt" - internetCheckOk = "yes\n" + internetCheckOk = "yes" ) var defaultOptions = clientOptions{ @@ -599,9 +601,10 @@ func (opts *clientOptions) idempotentRESTPublishing() bool { return opts.IdempotentRESTPublishing } +// RTN17c func (opts *clientOptions) hasActiveInternetConnection() bool { res, err := opts.httpclient().Get(internetCheckUrl) - if err != nil { + if err != nil || res.StatusCode != 200 { return false } defer res.Body.Close() @@ -609,7 +612,7 @@ func (opts *clientOptions) hasActiveInternetConnection() bool { if err != nil { return false } - return string(data) == internetCheckOk + return bytes.Contains(data, []byte(internetCheckOk)) } type ScopeParams struct { From 63cb1105a24a7c9834954700e744ad83d0d267bd Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 24 Aug 2024 21:48:36 +0530 Subject: [PATCH 29/29] Updated realtime test for fallback hosts, removed code for possible flakiness --- ably/export_test.go | 5 +++ ably/realtime_client_integration_test.go | 52 +++++++++++++++++++++--- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/ably/export_test.go b/ably/export_test.go index ee1e6a1d..9c4482bc 100644 --- a/ably/export_test.go +++ b/ably/export_test.go @@ -280,6 +280,11 @@ type DurationFromMsecs = durationFromMsecs type ProtoErrorInfo = errorInfo type ProtoFlag = protoFlag type ProtocolMessage = protocolMessage +type WebsocketErr = websocketErr + +func (w *WebsocketErr) HttpResp() *http.Response { + return w.resp +} const ( DefaultCipherKeyLength = defaultCipherKeyLength diff --git a/ably/realtime_client_integration_test.go b/ably/realtime_client_integration_test.go index 872e749d..857a022e 100644 --- a/ably/realtime_client_integration_test.go +++ b/ably/realtime_client_integration_test.go @@ -272,11 +272,28 @@ func TestRealtime_RTN17_Integration_HostFallback_Internal_Server_Error(t *testin serverURL, err := url.Parse(server.URL) assert.NoError(t, err) + fallbackHost := "sandbox-a-fallback.ably-realtime.com" + connAttempts := 0 + app, realtime := ablytest.NewRealtime( ably.WithAutoConnect(false), ably.WithTLS(false), ably.WithUseTokenAuth(true), - ably.WithFallbackHosts([]string{"sandbox-a-fallback.ably-realtime.com"}), + ably.WithFallbackHosts([]string{fallbackHost}), + ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { + connAttempts += 1 + conn, err := ably.DialWebsocket(protocol, u, timeout) + if connAttempts == 1 { + assert.Equal(t, serverURL.Host, u.Host) + var websocketErr *ably.WebsocketErr + assert.ErrorAs(t, err, &websocketErr) + assert.Equal(t, http.StatusInternalServerError, websocketErr.HttpResp().StatusCode) + } else { + assert.NoError(t, err) + assert.Equal(t, fallbackHost, u.Hostname()) + } + return conn, err + }), ably.WithRealtimeHost(serverURL.Host)) defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app) @@ -284,24 +301,46 @@ func TestRealtime_RTN17_Integration_HostFallback_Internal_Server_Error(t *testin err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil) assert.NoError(t, err) - assert.Equal(t, "sandbox-a-fallback.ably-realtime.com", realtime.Rest().ActiveRealtimeHost()) + assert.Equal(t, 2, connAttempts) + assert.Equal(t, fallbackHost, realtime.Rest().ActiveRealtimeHost()) } func TestRealtime_RTN17_Integration_HostFallback_Timeout(t *testing.T) { + timedOut := make(chan bool) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - time.Sleep(3 * time.Second) + <-timedOut w.WriteHeader(http.StatusSwitchingProtocols) })) defer server.Close() serverURL, err := url.Parse(server.URL) assert.NoError(t, err) + fallbackHost := "sandbox-a-fallback.ably-realtime.com" + requestTimeout := 2 * time.Second + connAttempts := 0 + app, realtime := ablytest.NewRealtime( ably.WithAutoConnect(false), ably.WithTLS(false), ably.WithUseTokenAuth(true), - ably.WithFallbackHosts([]string{"sandbox-a-fallback.ably-realtime.com"}), - ably.WithRealtimeRequestTimeout(2*time.Second), + ably.WithFallbackHosts([]string{fallbackHost}), + ably.WithRealtimeRequestTimeout(requestTimeout), + ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { + connAttempts += 1 + assert.Equal(t, requestTimeout, timeout) + conn, err := ably.DialWebsocket(protocol, u, timeout) + if connAttempts == 1 { + assert.Equal(t, serverURL.Host, u.Host) + var timeoutError net.Error + assert.ErrorAs(t, err, &timeoutError) + assert.True(t, timeoutError.Timeout()) + timedOut <- true + } else { + assert.NoError(t, err) + assert.Equal(t, fallbackHost, u.Hostname()) + } + return conn, err + }), ably.WithRealtimeHost(serverURL.Host)) defer safeclose(t, ablytest.FullRealtimeCloser(realtime), app) @@ -309,7 +348,8 @@ func TestRealtime_RTN17_Integration_HostFallback_Timeout(t *testing.T) { err = ablytest.Wait(ablytest.ConnWaiter(realtime, realtime.Connect, ably.ConnectionEventConnected), nil) assert.NoError(t, err) - assert.Equal(t, "sandbox-a-fallback.ably-realtime.com", realtime.Rest().ActiveRealtimeHost()) + assert.Equal(t, 2, connAttempts) + assert.Equal(t, fallbackHost, realtime.Rest().ActiveRealtimeHost()) } func checkUnique(ch chan string, typ string, n int) error {