Skip to content

Commit

Permalink
Updated implementation for realtime_conn as per RTN15h, updated tests…
Browse files Browse the repository at this point in the history
… for the same
  • Loading branch information
sacOO7 committed Aug 23, 2024
1 parent b27c6ae commit 2b6d28e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 18 deletions.
30 changes: 17 additions & 13 deletions ably/realtime_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ func (c *Connection) eventloop() {
c.mtx.Unlock()

c.failedConnSideEffects(msg.Error)
return
case actionConnected:
c.mtx.Lock()

Expand Down Expand Up @@ -862,26 +863,29 @@ func (c *Connection) eventloop() {
c.callbacks.onReconnected(failedResumeOrRecover)
}
c.queue.Flush()
case actionDisconnected:
if !isTokenError(msg.Error) {
// The spec doesn't say what to do in this case, so do nothing.
// Ably is supposed to then close the transport, which will
// trigger a transition to DISCONNECTED.
continue
}

if !c.auth.isTokenRenewable() {
case actionDisconnected: // RTN15h
if isTokenError(msg.Error) {
// RTN15h1
c.failedConnSideEffects(msg.Error)
if !c.auth.isTokenRenewable() {
c.failedConnSideEffects(msg.Error)
return
}
// RTN15h2, RTN22a
c.setState(ConnectionStateConnecting, newErrorFromProto(msg.Error), 0)
c.reauthorize(connArgs{
lastActivityAt: lastActivityAt,
connDetails: connDetails,
})
return
}

// RTN15h2, RTN22a
c.reauthorize(connArgs{
// RTN15h3
c.setState(ConnectionStateConnecting, newErrorFromProto(msg.Error), 0)
c.reconnect(connArgs{
lastActivityAt: lastActivityAt,
connDetails: connDetails,
})
return

case actionClosed:
c.mtx.Lock()
c.lockSetState(ConnectionStateClosed, nil, 0)
Expand Down
92 changes: 87 additions & 5 deletions ably/realtime_conn_spec_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,9 +1589,10 @@ func TestRealtimeConn_RTN15h2_ReauthWithBadToken(t *testing.T) {
},
}

// No state change expected before a reauthorization and reconnection
// attempt.
ablytest.Instantly.NoRecv(t, nil, stateChanges, t.Fatalf)
// Connecting state expected before a reauthorization and reconnection attempt.
var stateChange ably.ConnectionStateChange
ablytest.Instantly.Recv(t, &stateChange, stateChanges, t.Fatalf)
assert.Equal(t, ably.ConnectionStateConnecting, stateChange.Current)

// The DISCONNECTED causes a reauth, and dial again with the new
// token.
Expand Down Expand Up @@ -1669,6 +1670,12 @@ func TestRealtimeConn_RTN15h2_Success(t *testing.T) {
},
}

// Connecting state expected before a reauthorization and reconnection attempt.
var stateChange ably.ConnectionStateChange
ablytest.Instantly.Recv(t, &stateChange, stateChanges, t.Fatalf)
assert.Equal(t, ably.ConnectionStateConnecting, stateChange.Current)
assert.Equal(t, ably.ConnectionStateConnecting, c.Connection.State())

// The DISCONNECTED causes a reauth, and dial again with the new
// token.
var dialURL *url.URL
Expand All @@ -1684,12 +1691,87 @@ func TestRealtimeConn_RTN15h2_Success(t *testing.T) {
ConnectionDetails: &ably.ConnectionDetails{},
}

// Expect a UPDATED event.
// Expect a CONNECTED event from previous CONNECTING state

var change ably.ConnectionStateChange
ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf)

assert.Equal(t, ably.ConnectionEventConnected, change.Event,
"expected UPDATED event; got %v", change)

// Expect no further events.break
ablytest.Instantly.NoRecv(t, nil, stateChanges, t.Fatalf)
}

func TestRealtimeConn_RTN15h3_Success(t *testing.T) {
in := make(chan *ably.ProtocolMessage, 1)
out := make(chan *ably.ProtocolMessage, 16)
dials := make(chan *url.URL, 1)

c, _ := ably.NewRealtime(
ably.WithToken("fake:token"),
ably.WithAutoConnect(false),
ably.WithAuthCallback(func(context.Context, ably.TokenParams) (ably.Tokener, error) {
return ably.TokenString("good:token"), nil
}),
ably.WithDial(func(proto string, u *url.URL, timeout time.Duration) (ably.Conn, error) {
dials <- u
return MessagePipe(in, out)(proto, u, timeout)
}))

in <- &ably.ProtocolMessage{
Action: ably.ActionConnected,
ConnectionID: "connection-id",
ConnectionDetails: &ably.ConnectionDetails{},
}

err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil)
assert.NoError(t, err)

ablytest.Instantly.Recv(t, nil, dials, t.Fatalf)

stateChanges := make(chan ably.ConnectionStateChange, 1)

off := c.Connection.OnAll(func(change ably.ConnectionStateChange) {
stateChanges <- change
})
defer off()

in <- &ably.ProtocolMessage{
Action: ably.ActionDisconnected,
Error: &ably.ProtoErrorInfo{
StatusCode: 506,
Code: 50600,
Message: "server error",
},
}

// Connecting state expected before reconnection attempt.
var stateChange ably.ConnectionStateChange
ablytest.Instantly.Recv(t, &stateChange, stateChanges, t.Fatalf)
assert.Equal(t, ably.ConnectionStateConnecting, stateChange.Current)
assert.Equal(t, ably.ConnectionStateConnecting, c.Connection.State())

// The DISCONNECTED causes a reauth, and dial again with the existing token
var dialURL *url.URL
ablytest.Instantly.Recv(t, &dialURL, dials, t.Fatalf)

assert.Equal(t, "fake:token", dialURL.Query().Get("access_token"),
"expected fake:token; got %q", dialURL.Query().Get("access_token"))

// Simulate a successful reconnection.
in <- &ably.ProtocolMessage{
Action: ably.ActionConnected,
ConnectionID: "new-connection-id",
ConnectionDetails: &ably.ConnectionDetails{},
}

// Expect a CONNECTED event from previous CONNECTING state

var change ably.ConnectionStateChange
ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf)

assert.Equal(t, ably.ConnectionEventUpdate, change.Event,
assert.Equal(t, ably.ConnectionEventConnected, change.Event,
"expected UPDATED event; got %v", change)

// Expect no further events.break
Expand Down

0 comments on commit 2b6d28e

Please sign in to comment.