From 116c717b8f0dea4f43be63e1c9f47bd5634ae779 Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Wed, 1 Jul 2020 22:23:49 +0300 Subject: [PATCH] fix Close and presence timer, prepare v0.9.1 (#130) --- changelog.md | 16 ++++++++++++ client.go | 23 +++++++++-------- client_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 87 insertions(+), 20 deletions(-) diff --git a/changelog.md b/changelog.md index e29fa573..ba42adea 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,19 @@ +v0.9.1 +====== + +* fix `Close` method – do not use error channel since this leads to deadlock anyway, just close in goroutine. +* fix presence timer scheduling + +``` +gorelease -base=v0.9.0 -version=v0.9.1 +github.com/centrifugal/centrifuge +--------------------------------- +Incompatible changes: +- (*Client).Close: changed from func(*Disconnect) chan error to func(*Disconnect) error + +v0.9.1 is a valid semantic version for this release. +``` + v0.9.0 ====== diff --git a/client.go b/client.go index 8aade530..f1e01725 100644 --- a/client.go +++ b/client.go @@ -234,17 +234,20 @@ func (c *Client) scheduleNextTimer() { c.stopTimer() var minEventTime int64 var nextTimerOp timerOp + var needTimer bool if c.nextExpire > 0 { nextTimerOp = timerOpExpire minEventTime = c.nextExpire + needTimer = true } - if c.nextPresence > 0 && c.nextPresence < minEventTime { + if c.nextPresence > 0 && (minEventTime == 0 || c.nextPresence < minEventTime) { nextTimerOp = timerOpPresence minEventTime = c.nextPresence + needTimer = true } - if minEventTime > 0 { + if needTimer { c.timerOp = nextTimerOp - afterDuration := time.Duration(minEventTime-time.Now().Unix()) * time.Second + afterDuration := time.Duration(minEventTime-time.Now().UnixNano()) * time.Nanosecond c.timer = time.AfterFunc(afterDuration, c.onTimerOp) } } @@ -260,13 +263,13 @@ func (c *Client) stopTimer() { func (c *Client) addPresenceUpdate() { config := c.node.Config() presenceInterval := config.ClientPresenceUpdateInterval - c.nextPresence = time.Now().Add(presenceInterval).Unix() + c.nextPresence = time.Now().Add(presenceInterval).UnixNano() c.scheduleNextTimer() } // Lock must be held outside. func (c *Client) addExpireUpdate(after time.Duration) { - c.nextExpire = time.Now().Add(after).Unix() + c.nextExpire = time.Now().Add(after).UnixNano() c.scheduleNextTimer() } @@ -529,14 +532,12 @@ func (c *Client) sendUnsub(ch string, resubscribe bool) error { // Close client connection with specific disconnect reason. // This method internally creates a new goroutine at moment to do // closing stuff. An extra goroutine is required to solve disconnect -// and presence callback ordering problems. If you need to wait for -// close result for some reason then read an error from returned channel. -func (c *Client) Close(disconnect *Disconnect) chan error { - errCh := make(chan error, 1) +// and presence callback ordering problems. +func (c *Client) Close(disconnect *Disconnect) error { go func() { - errCh <- c.close(disconnect) + _ = c.close(disconnect) }() - return errCh + return nil } func (c *Client) close(disconnect *Disconnect) error { diff --git a/client_test.go b/client_test.go index 356ecc66..65a55272 100644 --- a/client_test.go +++ b/client_test.go @@ -85,8 +85,8 @@ func TestClientClosedState(t *testing.T) { defer func() { _ = node.Shutdown(context.Background()) }() transport := newTestTransport() client, _ := newClient(context.Background(), node, transport) - errCh := client.Close(nil) - require.NoError(t, <-errCh) + err := client.close(nil) + require.NoError(t, err) require.True(t, client.closed) } @@ -108,12 +108,14 @@ func TestClientTimerSchedule(t *testing.T) { defer func() { _ = node.Shutdown(context.Background()) }() transport := newTestTransport() client, _ := newClient(context.Background(), node, transport) - client.nextExpire = time.Now().Unix() + 5 - client.nextPresence = time.Now().Unix() + 10 + client.mu.Lock() + defer client.mu.Unlock() + client.nextExpire = time.Now().Add(5 * time.Second).UnixNano() + client.nextPresence = time.Now().Add(10 * time.Second).UnixNano() client.scheduleNextTimer() require.NotNil(t, client.timer) require.Equal(t, timerOpExpire, client.timerOp) - client.nextPresence = time.Now().Unix() + 1 + client.nextPresence = time.Now().Add(time.Second).UnixNano() client.scheduleNextTimer() require.NotNil(t, client.timer) require.Equal(t, timerOpPresence, client.timerOp) @@ -1403,8 +1405,8 @@ func TestClientSend(t *testing.T) { err := client.Send([]byte(`{}`)) require.NoError(t, err) - errCh := client.Close(nil) - require.NoError(t, <-errCh) + err = client.close(nil) + require.NoError(t, err) err = client.Send([]byte(`{}`)) require.Error(t, err) @@ -1422,8 +1424,8 @@ func TestClientClose(t *testing.T) { connectClient(t, client) - errCh := client.Close(DisconnectShutdown) - require.NoError(t, <-errCh) + err := client.close(DisconnectShutdown) + require.NoError(t, err) require.True(t, transport.closed) require.Equal(t, DisconnectShutdown, transport.disconnect) } @@ -1843,3 +1845,51 @@ func BenchmarkUUID(b *testing.B) { } b.ReportAllocs() } + +func TestClientPresenceHandler(t *testing.T) { + node := nodeWithMemoryEngine() + defer func() { _ = node.Shutdown(context.Background()) }() + + config := node.Config() + config.ClientPresenceUpdateInterval = time.Millisecond + _ = node.Reload(config) + + transport := newTestTransport() + ctx := context.Background() + newCtx := SetCredentials(ctx, &Credentials{UserID: "42"}) + client, _ := newClient(newCtx, node, transport) + + done := make(chan struct{}) + closed := false + disconnected := make(chan struct{}) + numCalls := 0 + + client.On().Presence(func(event PresenceEvent) PresenceReply { + numCalls++ + if numCalls >= 50 && !closed { + close(done) + closed = true + client.Close(DisconnectForceNoReconnect) + } + return PresenceReply{} + }) + + client.On().Disconnect(func(event DisconnectEvent) DisconnectReply { + close(disconnected) + return DisconnectReply{} + }) + + connectClient(t, client) + client.scheduleOnConnectTimers() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("alive handler not called") + } + select { + case <-disconnected: + case <-time.After(time.Second): + t.Fatal("disconnect handler not called") + } +}