Skip to content

Commit

Permalink
Add seek logic for reader (apache#356)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolong.ran <[email protected]>

### Motivation


Follow apache#222 and add the seek logic for reader 

### Modifications

- Add `seek by msgID` interface
- Add `seek by time` interface
- Add test case

### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
wolfstudy authored Aug 24, 2020
1 parent a7e7239 commit f9b3c0f
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 10 deletions.
8 changes: 4 additions & 4 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
if msgID.entryID != noMessageEntry {
pc.startMessageID = msgID

err = pc.requestSeek(msgID)
err = pc.requestSeek(msgID.messageID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
req.msgID, req.err = pc.requestGetLastMessageID()
}

func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) {
requestID := pc.client.rpcClient.NewRequestID()
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
RequestId: proto.Uint64(requestID),
Expand All @@ -286,7 +286,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id")
return messageID{}, err
return trackingMessageID{}, err
}
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
return convertToMessageID(id), nil
Expand Down Expand Up @@ -365,7 +365,7 @@ func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {

func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
defer close(seek.doneCh)
seek.err = pc.requestSeek(seek.msgID)
seek.err = pc.requestSeek(seek.msgID.messageID)
}

func (pc *partitionConsumer) requestSeek(msgID messageID) error {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,8 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
}

func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
consumerID := closeConsumer.GetConsumerId()
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)

c.Lock()
defer c.Unlock()
Expand Down
22 changes: 21 additions & 1 deletion pulsar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package pulsar

import "context"
import (
"context"
"time"
)

// ReaderMessage package Reader and Message as a struct to use
type ReaderMessage struct {
Expand Down Expand Up @@ -88,4 +91,21 @@ type Reader interface {

// Close the reader and stop the broker to push more messages
Close()

// Reset the subscription associated with this reader to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(MessageID) error

// Reset the subscription associated with this reader to a specific message publish time.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
// the individual partitions.
//
// @param timestamp
// the message publish time where to reposition the subscription
//
SeekByTime(time time.Time) error
}
38 changes: 38 additions & 0 deletions pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pulsar
import (
"context"
"fmt"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -45,6 +46,7 @@ var (
)

type reader struct {
sync.Mutex
pc *partitionConsumer
messageCh chan ConsumerMessage
lastMessageInBroker trackingMessageID
Expand Down Expand Up @@ -187,3 +189,39 @@ func (r *reader) Close() {
r.pc.Close()
readersClosed.Inc()
}

func (r *reader) messageID(msgID MessageID) (trackingMessageID, bool) {
mid, ok := toTrackingMessageID(msgID)
if !ok {
r.log.Warnf("invalid message id type %T", msgID)
return trackingMessageID{}, false
}

partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 {
r.log.Warnf("invalid partition index %d expected", partition)
return trackingMessageID{}, false
}

return mid, true
}

func (r *reader) Seek(msgID MessageID) error {
r.Lock()
defer r.Unlock()

mid, ok := r.messageID(msgID)
if !ok {
return nil
}

return r.pc.Seek(mid)
}

func (r *reader) SeekByTime(time time.Time) error {
r.Lock()
defer r.Unlock()

return r.pc.SeekByTime(time)
}
63 changes: 59 additions & 4 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,65 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
}
}

func TestReaderSeek(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()

topicName := newTopicName()
ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
defer producer.Close()

reader, err := client.CreateReader(ReaderOptions{
Topic: topicName,
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)
defer reader.Close()

const N = 10
var seekID MessageID
for i := 0; i < N; i++ {
id, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)

if i == 4 {
seekID = id
}
}
err = producer.Flush()
assert.NoError(t, err)

for i := 0; i < N; i++ {
msg, err := reader.Next(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
}

err = reader.Seek(seekID)
assert.Nil(t, err)

readerOfSeek, err := client.CreateReader(ReaderOptions{
Topic: topicName,
StartMessageID: seekID,
StartMessageIDInclusive: true,
})
assert.Nil(t, err)

msg, err := readerOfSeek.Next(ctx)
assert.Nil(t, err)
assert.Equal(t, "hello-4", string(msg.Payload()))
}

func TestReaderLatestInclusiveHasNext(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down Expand Up @@ -498,14 +557,10 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
assert.Nil(t, err)
defer reader.Close()

var msgID MessageID
if reader.HasNext() {
msg, err := reader.Next(context.Background())
assert.NoError(t, err)

assert.Equal(t, []byte("hello-9"), msg.Payload())
msgID = msg.ID()
}

assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
}

0 comments on commit f9b3c0f

Please sign in to comment.