Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4330] Feature - server initiated auth #661

Merged
merged 14 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,6 @@ See [jwt auth issue](https://github.com/ably/ably-go/issues/569) for more detail

### Realtime API

- Inband reauthentication is not supported; expiring tokens will trigger a disconnection and resume of a realtime
connection. See [server initiated auth](https://github.com/ably/ably-go/issues/228) for more details.

- Channel suspended state is partially implemented. See [suspended channel state](https://github.com/ably/ably-go/issues/568).

- Realtime Ping function is not implemented.
Expand Down
18 changes: 18 additions & 0 deletions ably/ably_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,24 @@ func (rec *MessageRecorder) CheckIfSent(action ably.ProtoAction, times int) func
}
}

func (rec *MessageRecorder) CheckIfReceived(action ably.ProtoAction, times int) func() bool {
return func() bool {
counter := 0
for _, m := range rec.Received() {
if m.Action == action {
counter++
if counter == times {
return true
}
}
}
if times == 0 && times == counter {
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
return true
}
return false
}
}

func (rec *MessageRecorder) FindFirst(action ably.ProtoAction) *ably.ProtocolMessage {
for _, m := range rec.Sent() {
if m.Action == action {
Expand Down
9 changes: 6 additions & 3 deletions ably/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Auth struct {

// onExplicitAuthorize is the callback that Realtime sets to reauthorize with the
// server when Authorize is explicitly called.
onExplicitAuthorize func(context.Context, *TokenDetails)
onExplicitAuthorize func(context.Context, *TokenDetails) error

serverTimeOffset time.Duration

Expand All @@ -92,7 +92,7 @@ type Auth struct {
func newAuth(client *REST) (*Auth, error) {
a := &Auth{
client: client,
onExplicitAuthorize: func(context.Context, *TokenDetails) {},
onExplicitAuthorize: func(context.Context, *TokenDetails) error { return nil },
}
method, err := detectAuthMethod(a.opts())
if err != nil {
Expand Down Expand Up @@ -313,7 +313,10 @@ func (a *Auth) Authorize(ctx context.Context, params *TokenParams, setOpts ...Au
if err != nil {
return nil, err
}
a.onExplicitAuthorize(ctx, token)
err = a.onExplicitAuthorize(ctx, token)
if err != nil {
return nil, err
}
return token, nil
}

Expand Down
5 changes: 5 additions & 0 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (c *Connection) ConnectionStateTTL() time.Duration {
return c.connectionStateTTL()
}

func (r *Realtime) Logger() logger {
return r.log()
}

func NewInternalLogger(l Logger) logger {
return logger{l: l}
}
Expand Down Expand Up @@ -319,6 +323,7 @@ const (
ActionPresence = actionPresence
ActionMessage = actionMessage
ActionSync = actionSync
ActionAuth = actionAuth

FlagHasPresence = flagHasPresence
FlagHasBacklog = flagHasBacklog
Expand Down
3 changes: 3 additions & 0 deletions ably/proto_protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ func (msg *protocolMessage) String() string {
case actionMessage:
return fmt.Sprintf("(action=%q, id=%q, messages=%v)", msg.Action,
msg.ConnectionID, msg.Messages)
case actionAuth:
return fmt.Sprintf("(action=%q, id=%q, auth=%v)", msg.Action,
msg.ConnectionID, msg.Auth)
default:
return fmt.Sprintf("%#v", msg)
}
Expand Down
95 changes: 77 additions & 18 deletions ably/realtime_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func newConn(opts *clientOptions, auth *Auth, callbacks connCallbacks, client *R
readLimit: maxMessageSize,
recover: opts.Recover,
}
auth.onExplicitAuthorize = c.onClientAuthorize
auth.onExplicitAuthorize = c.onExplicitAuthorize
c.queue = newMsgQueue(c)
if !opts.NoConnect {
c.setState(ConnectionStateConnecting, nil, 0)
Expand Down Expand Up @@ -780,7 +780,7 @@ func (c *Connection) eventloop() {
c.mtx.Unlock()
return
}
// RTN23a
// RTN23a, RTN15a
c.lockSetState(ConnectionStateDisconnected, err, 0)
c.mtx.Unlock()
arg := connArgs{
Expand Down Expand Up @@ -830,6 +830,7 @@ func (c *Connection) eventloop() {
c.mtx.Unlock()

c.failedConnSideEffects(msg.Error)
return
case actionConnected:
c.mtx.Lock()

Expand Down Expand Up @@ -887,33 +888,40 @@ func (c *Connection) eventloop() {
c.callbacks.onReconnected(failedResumeOrRecover)
}
c.queue.Flush()
case actionDisconnected:
if !isTokenError(msg.Error) {
// The spec doesn't say what to do in this case, so do nothing.
// Ably is supposed to then close the transport, which will
// trigger a transition to DISCONNECTED.
continue
}

if !c.auth.isTokenRenewable() {
case actionDisconnected: // RTN15h
if isTokenError(msg.Error) {
// RTN15h1
c.failedConnSideEffects(msg.Error)
if !c.auth.isTokenRenewable() {
c.failedConnSideEffects(msg.Error)
return
}
// RTN15h2, RTN22a
c.setState(ConnectionStateDisconnected, newErrorFromProto(msg.Error), 0)
c.reauthorize(connArgs{
lastActivityAt: lastActivityAt,
connDetails: connDetails,
})
return
}

// RTN15h2
c.reauthorize(connArgs{
// RTN15h3
c.setState(ConnectionStateDisconnected, newErrorFromProto(msg.Error), 0)
c.reconnect(connArgs{
lastActivityAt: lastActivityAt,
connDetails: connDetails,
})
return

case actionClosed:
c.mtx.Lock()
c.lockSetState(ConnectionStateClosed, nil, 0)
c.mtx.Unlock()
if c.conn != nil {
c.conn.Close()
}
case actionAuth: // RTN22
canceledCtx, cancel := context.WithCancel(context.Background())
cancel() // Cancel context to unblock current eventloop to receieve new messages
c.auth.Authorize(canceledCtx, c.auth.params)
default:
c.callbacks.onChannelMsg(msg)
}
Expand Down Expand Up @@ -952,8 +960,30 @@ func (c *Connection) reauthorize(arg connArgs) {
c.reconnect(arg)
}

func (c *Connection) onClientAuthorize(ctx context.Context, token *TokenDetails) {
switch c.State() {
func (c *Connection) onExplicitAuthorize(ctx context.Context, token *TokenDetails) error {
switch state := c.State(); state {
case ConnectionStateConnecting:
// RTC8b says: "all current connection attempts should be halted, and
// after obtaining a new token the library should immediately initiate a
// connection attempt using the new token". But the WebSocket library
// doesn't really allow us to halt the connection attempt. Instead, once
// the connection transitions out of CONNECTING (either to CONNECTED or
// to a failure state), we attempt to connect again, which will use
// the new token.
c.log().Info("client-requested authorization while CONNECTING. Will reconnect with new token.")
done := make(chan error)

c.internalEmitter.OnceAll(func(_ ConnectionStateChange) {
done <- c.onExplicitAuthorize(ctx, token)
})

select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
return err
}

case ConnectionStateConnected:
c.log().Verbosef("starting client-requested reauthorization with token: %+v", token)

Expand All @@ -974,9 +1004,38 @@ func (c *Connection) onClientAuthorize(ctx context.Context, token *TokenDetails)

select {
case <-ctx.Done():
case <-changes:
return ctx.Err()
case change := <-changes:
return change.Reason.unwrapNil()
}

case
ConnectionStateDisconnected,
ConnectionStateSuspended,
ConnectionStateFailed,
ConnectionStateClosed:
c.log().Infof("client-requested authorization while %s: connecting with new token", state)

done := make(chan error)
c.internalEmitter.OnceAll(func(change ConnectionStateChange) {
if change.Current == ConnectionStateConnecting {
done <- c.onExplicitAuthorize(ctx, token)
} else {
done <- change.Reason.unwrapNil()
}
})

c.Connect()

select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
return err
}
}

return nil
}

func (c *Connection) lockedReauthorizationFailed(err error) {
Expand Down
Loading
Loading