Skip to content

Commit

Permalink
fix Close and presence timer, prepare v0.9.1 (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Jul 1, 2020
1 parent 6105771 commit 116c717
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 20 deletions.
16 changes: 16 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
v0.9.1
======

* fix `Close` method – do not use error channel since this leads to deadlock anyway, just close in goroutine.
* fix presence timer scheduling

```
gorelease -base=v0.9.0 -version=v0.9.1
github.com/centrifugal/centrifuge
---------------------------------
Incompatible changes:
- (*Client).Close: changed from func(*Disconnect) chan error to func(*Disconnect) error
v0.9.1 is a valid semantic version for this release.
```

v0.9.0
======

Expand Down
23 changes: 12 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,20 @@ func (c *Client) scheduleNextTimer() {
c.stopTimer()
var minEventTime int64
var nextTimerOp timerOp
var needTimer bool
if c.nextExpire > 0 {
nextTimerOp = timerOpExpire
minEventTime = c.nextExpire
needTimer = true
}
if c.nextPresence > 0 && c.nextPresence < minEventTime {
if c.nextPresence > 0 && (minEventTime == 0 || c.nextPresence < minEventTime) {
nextTimerOp = timerOpPresence
minEventTime = c.nextPresence
needTimer = true
}
if minEventTime > 0 {
if needTimer {
c.timerOp = nextTimerOp
afterDuration := time.Duration(minEventTime-time.Now().Unix()) * time.Second
afterDuration := time.Duration(minEventTime-time.Now().UnixNano()) * time.Nanosecond
c.timer = time.AfterFunc(afterDuration, c.onTimerOp)
}
}
Expand All @@ -260,13 +263,13 @@ func (c *Client) stopTimer() {
func (c *Client) addPresenceUpdate() {
config := c.node.Config()
presenceInterval := config.ClientPresenceUpdateInterval
c.nextPresence = time.Now().Add(presenceInterval).Unix()
c.nextPresence = time.Now().Add(presenceInterval).UnixNano()
c.scheduleNextTimer()
}

// Lock must be held outside.
func (c *Client) addExpireUpdate(after time.Duration) {
c.nextExpire = time.Now().Add(after).Unix()
c.nextExpire = time.Now().Add(after).UnixNano()
c.scheduleNextTimer()
}

Expand Down Expand Up @@ -529,14 +532,12 @@ func (c *Client) sendUnsub(ch string, resubscribe bool) error {
// Close client connection with specific disconnect reason.
// This method internally creates a new goroutine at moment to do
// closing stuff. An extra goroutine is required to solve disconnect
// and presence callback ordering problems. If you need to wait for
// close result for some reason then read an error from returned channel.
func (c *Client) Close(disconnect *Disconnect) chan error {
errCh := make(chan error, 1)
// and presence callback ordering problems.
func (c *Client) Close(disconnect *Disconnect) error {
go func() {
errCh <- c.close(disconnect)
_ = c.close(disconnect)
}()
return errCh
return nil
}

func (c *Client) close(disconnect *Disconnect) error {
Expand Down
68 changes: 59 additions & 9 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func TestClientClosedState(t *testing.T) {
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
client, _ := newClient(context.Background(), node, transport)
errCh := client.Close(nil)
require.NoError(t, <-errCh)
err := client.close(nil)
require.NoError(t, err)
require.True(t, client.closed)
}

Expand All @@ -108,12 +108,14 @@ func TestClientTimerSchedule(t *testing.T) {
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
client, _ := newClient(context.Background(), node, transport)
client.nextExpire = time.Now().Unix() + 5
client.nextPresence = time.Now().Unix() + 10
client.mu.Lock()
defer client.mu.Unlock()
client.nextExpire = time.Now().Add(5 * time.Second).UnixNano()
client.nextPresence = time.Now().Add(10 * time.Second).UnixNano()
client.scheduleNextTimer()
require.NotNil(t, client.timer)
require.Equal(t, timerOpExpire, client.timerOp)
client.nextPresence = time.Now().Unix() + 1
client.nextPresence = time.Now().Add(time.Second).UnixNano()
client.scheduleNextTimer()
require.NotNil(t, client.timer)
require.Equal(t, timerOpPresence, client.timerOp)
Expand Down Expand Up @@ -1403,8 +1405,8 @@ func TestClientSend(t *testing.T) {
err := client.Send([]byte(`{}`))
require.NoError(t, err)

errCh := client.Close(nil)
require.NoError(t, <-errCh)
err = client.close(nil)
require.NoError(t, err)

err = client.Send([]byte(`{}`))
require.Error(t, err)
Expand All @@ -1422,8 +1424,8 @@ func TestClientClose(t *testing.T) {

connectClient(t, client)

errCh := client.Close(DisconnectShutdown)
require.NoError(t, <-errCh)
err := client.close(DisconnectShutdown)
require.NoError(t, err)
require.True(t, transport.closed)
require.Equal(t, DisconnectShutdown, transport.disconnect)
}
Expand Down Expand Up @@ -1843,3 +1845,51 @@ func BenchmarkUUID(b *testing.B) {
}
b.ReportAllocs()
}

func TestClientPresenceHandler(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()

config := node.Config()
config.ClientPresenceUpdateInterval = time.Millisecond
_ = node.Reload(config)

transport := newTestTransport()
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, _ := newClient(newCtx, node, transport)

done := make(chan struct{})
closed := false
disconnected := make(chan struct{})
numCalls := 0

client.On().Presence(func(event PresenceEvent) PresenceReply {
numCalls++
if numCalls >= 50 && !closed {
close(done)
closed = true
client.Close(DisconnectForceNoReconnect)
}
return PresenceReply{}
})

client.On().Disconnect(func(event DisconnectEvent) DisconnectReply {
close(disconnected)
return DisconnectReply{}
})

connectClient(t, client)
client.scheduleOnConnectTimers()

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("alive handler not called")
}
select {
case <-disconnected:
case <-time.After(time.Second):
t.Fatal("disconnect handler not called")
}
}

0 comments on commit 116c717

Please sign in to comment.