Skip to content

Commit

Permalink
adding tests
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Dec 3, 2024
1 parent b1001e1 commit fb0a6c9
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
}
res := c.internalDeclarePublisher(streamName, producer)
if res.Err == nil {
producer.processMessages()
producer.processSendingMessages()
//producer.startPublishTask()
producer.startUnconfirmedMessagesTimeOutTask()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (coordinator *Coordinator) NewProducer(
unConfirmedMessages: map[int64]*ConfirmationStatus{},
status: open,
messageSequenceCh: make(chan messageSequence, size),
dynamicChannel: make(chan message.StreamMessage, adativeSize),
dynamicSendCh: make(chan message.StreamMessage, adativeSize),
pendingMessages: pendingMessagesSequence{
messages: make([]*messageSequence, 0),
size: initBufferPublishSize,
Expand Down
24 changes: 10 additions & 14 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Producer struct {
messageSequenceCh chan messageSequence
pendingMessages pendingMessagesSequence

dynamicChannel chan message.StreamMessage
dynamicSendCh chan message.StreamMessage
}

type FilterValue func(message message.StreamMessage) string
Expand Down Expand Up @@ -319,7 +319,7 @@ func (producer *Producer) startUnconfirmedMessagesTimeOutTask() {
}()

}
func (producer *Producer) processMessages() {
func (producer *Producer) processSendingMessages() {

// Define a channel to signal the batch of messages
// the channel doesn't have a buffer. It is needed to signal
Expand Down Expand Up @@ -369,14 +369,14 @@ func (producer *Producer) processMessages() {
/// accumulate the messages in a buffer
go func() {
waitGroup.Done()
for msg := range producer.dynamicChannel {
for msg := range producer.dynamicSendCh {
mutex.Lock()
batchMessages = append(batchMessages, msg)
mutex.Unlock()
chSignal <- struct{}{} // signal to send the messages
}
// close the local signal channel as soon the dynamic channel is closed
// producer.dynamicChannel is closed by the Close() function
// producer.dynamicSendCh is closed by the Close() function
close(chSignal)
}()

Expand Down Expand Up @@ -427,7 +427,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
if producer.getStatus() == closed {
return fmt.Errorf("can't sent message. The Producer id: %d closed", producer.id)
}
producer.dynamicChannel <- streamMessage
producer.dynamicSendCh <- streamMessage
return nil
}

Expand All @@ -452,8 +452,7 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) (*Bat
var messagesSequence = make([]*messageSequence, 0)
result := &BatchSendResult{}
totalBufferToSend := 0
var messagesToRemove = make([]*messageSequence, 0)
for i, batchMessage := range batchMessages {
for _, batchMessage := range batchMessages {
messageBytes, err := batchMessage.MarshalBinary()
if err != nil {
return result, err
Expand All @@ -468,7 +467,6 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) (*Bat
// When a single message is too large, the producer sends back a confirmation
// with the error FrameTooLarge
if len(messageBytes) > producer.options.client.tuneState.requestedMaxFrameSize {
messagesToRemove = append(messagesToRemove, messagesSequence[:i]...)
if producer.publishConfirm != nil {
unConfirmedMessage := &ConfirmationStatus{
inserted: time.Now(),
Expand All @@ -481,6 +479,7 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) (*Bat
}
producer.publishConfirm <- []*ConfirmationStatus{unConfirmedMessage}
}
continue
}

totalBufferToSend += len(messageBytes)
Expand All @@ -507,11 +506,8 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) (*Bat
})
}

// Here we remove the messages that are too large
// the messagesToRemove are sent back via ConfirmationStatus
//with confirmation status to false
for i := range messagesToRemove {
messagesSequence = append(messagesSequence[:i], messagesSequence[i+1:]...)
if messagesSequence == nil || len(messagesSequence) == 0 {

Check failure on line 509 in pkg/stream/producer.go

View workflow job for this annotation

GitHub Actions / test-win32 (1.22)

should omit nil check; len() for []*github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.messageSequence is defined as zero (S1009)

Check failure on line 509 in pkg/stream/producer.go

View workflow job for this annotation

GitHub Actions / test (1.22)

should omit nil check; len() for []*github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream.messageSequence is defined as zero (S1009)
return result, nil
}

producer.addUnConfirmedSequences(messagesSequence, producer.GetID())
Expand Down Expand Up @@ -716,7 +712,7 @@ func (producer *Producer) Close() error {
}

close(producer.messageSequenceCh)
close(producer.dynamicChannel)
close(producer.dynamicSendCh)
return nil
}

Expand Down
179 changes: 153 additions & 26 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,6 @@ var _ = Describe("Streaming Producers", func() {
}, 5*time.Second).Should(Equal(int32(2)),
"confirm should receive same messages Send by producer")

//By("Max frame Error")
//s := make([]byte, 1148576)
//Expect(producer.Send(amqp.NewMessage(s))).To(HaveOccurred())
//Expect(producer.lenUnConfirmed()).To(Equal(0))
Expect(producer.Close()).NotTo(HaveOccurred())

producer, err = testEnvironment.NewProducer(testProducerStream,
Expand Down Expand Up @@ -446,42 +442,173 @@ var _ = Describe("Streaming Producers", func() {
Expect(producer.Close()).NotTo(HaveOccurred())
})

It("Smart Send Send after BatchPublishingDelay", func() {
// this test is need to test "Send after BatchPublishingDelay"
// and the time check
producer, err := testEnvironment.NewProducer(testProducerStream,
NewProducerOptions().SetBatchPublishingDelay(50))
It("BatchSend should not a send a big message with sent to Zero", func() {
// 1.5 Milestone
// the batch send should not send a big message
// The message should be sed back to the client with an error
// FrameTooLarge and not confirmed
producer, err := testEnvironment.NewProducer(testProducerStream, nil)
Expect(err).NotTo(HaveOccurred())
var messagesReceived int32
var notConfirmedTooLarge int32
chConfirm := producer.NotifyPublishConfirmation()
go func(ch ChannelPublishConfirm) {
for ids := range ch {
atomic.AddInt32(&messagesReceived, int32(len(ids)))
for _, conf := range ids {
if !conf.IsConfirmed() {
Expect(conf.GetError()).To(Equal(FrameTooLarge))
atomic.AddInt32(&notConfirmedTooLarge, 1)
}
}
}
}(chConfirm)
result, err := producer.BatchSend([]message.StreamMessage{amqp.NewMessage(make([]byte, 1148001))})
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(0))
Expect(result.TotalFrames).To(Equal(0))
Eventually(func() int32 {
return atomic.LoadInt32(&notConfirmedTooLarge)
}).Should(Equal(int32(1)))
Expect(producer.Close()).NotTo(HaveOccurred())
})

for z := 0; z < 5; z++ {
s := make([]byte, 50)
err = producer.Send(amqp.NewMessage(s))
Expect(err).NotTo(HaveOccurred())
time.Sleep(60 * time.Millisecond)
It("Send should not a send a big message with sent to Zero", Focus, func() {
// 1.5 Milestone
// the Send() method should not send a big message
// The message should be sed back to the client with an error
// FrameTooLarge and not confirmed
producer, err := testEnvironment.NewProducer(testProducerStream, nil)
Expect(err).NotTo(HaveOccurred())
var notConfirmedTooLarge int32
chConfirm := producer.NotifyPublishConfirmation()
go func(ch ChannelPublishConfirm) {
for ids := range ch {
for _, conf := range ids {
if !conf.IsConfirmed() {
Expect(conf.GetError()).To(Equal(FrameTooLarge))
atomic.AddInt32(&notConfirmedTooLarge, 1)
}
}
}
}(chConfirm)
err = producer.Send(amqp.NewMessage(make([]byte, MessageBufferTooBig)))
Expect(err).NotTo(HaveOccurred())
Eventually(func() int32 {
return atomic.LoadInt32(&notConfirmedTooLarge)
}, 5*time.Second).Should(Equal(int32(1)))
Expect(producer.Close()).NotTo(HaveOccurred())
Expect(producer.Send(amqp.NewMessage(make([]byte, MessageBufferBigButLessTheFrame)))).To(HaveOccurred())
})

It("BatchSend should not send a big messages with inside other messages", func() {
// 1.5 Milestone
// the batch send should not send a big message
// The message should be sed back to the client with an error
// FrameTooLarge and not confirmed
// but the other messages should be sent
producer, err := testEnvironment.NewProducer(testProducerStream, nil)
Expect(err).NotTo(HaveOccurred())
var notConfirmedTooLarge int32
var confirmed int32

chConfirm := producer.NotifyPublishConfirmation()
go func(ch ChannelPublishConfirm) {
defer GinkgoRecover()
for ids := range ch {
for _, conf := range ids {
if !conf.IsConfirmed() {
Expect(conf.GetError()).To(Equal(FrameTooLarge))
Expect(conf.message.GetMessageProperties().MessageID).To(Equal(fmt.Sprintf("to-big-%d", atomic.LoadInt32(&notConfirmedTooLarge))))
atomic.AddInt32(&notConfirmedTooLarge, 1)
} else {
Expect(conf.GetError()).NotTo(HaveOccurred())
Expect(conf.message.GetMessageProperties().MessageID).To(Equal(fmt.Sprintf("ok-%d", atomic.LoadInt32(&confirmed))))
atomic.AddInt32(&confirmed, 1)
}

}
}
}(chConfirm)
var messages []message.StreamMessage
for i := 0; i < 3; i++ {
m := amqp.NewMessage(make([]byte, 1))
m.Properties = &amqp.MessageProperties{
MessageID: fmt.Sprintf("ok-%d", i),
}
messages = append(messages, m)
}

for z := 0; z < 5; z++ {
s := make([]byte, 50)
err = producer.Send(amqp.NewMessage(s))
Expect(err).NotTo(HaveOccurred())
time.Sleep(20 * time.Millisecond)
for i := 0; i < 2; i++ {
m := amqp.NewMessage(make([]byte, MessageBufferTooBig))
m.Properties = &amqp.MessageProperties{
MessageID: fmt.Sprintf("to-big-%d", i),
}
messages = append(messages, m)
}

result, err := producer.BatchSend(messages)
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(3))
Expect(result.TotalFrames).To(Equal(1))
Eventually(func() int32 {
return atomic.LoadInt32(&messagesReceived)
}, 5*time.Second).Should(Equal(int32(10)),
"confirm should receive same messages Send by producer")
return atomic.LoadInt32(&notConfirmedTooLarge)
}, 5*time.Second).Should(Equal(int32(2)))

Expect(producer.lenUnConfirmed()).To(Equal(0))
err = producer.Close()
Eventually(func() int32 {
return atomic.LoadInt32(&confirmed)
}, 5*time.Second).Should(Equal(int32(3)))
Expect(producer.Close()).NotTo(HaveOccurred())
})

It("BatchSend should not a send a big message and splits the send", func() {
// the batch send should not send a big message
// The message should be sed back to the client with an error
// FrameTooLarge and not confirmed
// but the other messages should be sent with two different frames
// because the messages are too big but the single can be sent
producer, err := testEnvironment.NewProducer(testProducerStream, nil)
Expect(err).NotTo(HaveOccurred())
var notConfirmedTooLarge int32
var confirmed int32
chConfirm := producer.NotifyPublishConfirmation()
go func(ch ChannelPublishConfirm) {
defer GinkgoRecover()
for ids := range ch {
for _, conf := range ids {
if !conf.IsConfirmed() {
Expect(conf.GetError()).To(Equal(FrameTooLarge))
Expect(conf.message.GetMessageProperties().MessageID).To(Equal(fmt.Sprintf("too-big-4")))

Check failure on line 579 in pkg/stream/producer_test.go

View workflow job for this annotation

GitHub Actions / test-win32 (1.22)

unnecessary use of fmt.Sprintf (S1039)

Check failure on line 579 in pkg/stream/producer_test.go

View workflow job for this annotation

GitHub Actions / test (1.22)

unnecessary use of fmt.Sprintf (S1039)
atomic.AddInt32(&notConfirmedTooLarge, 1)
} else {
Expect(conf.GetError()).NotTo(HaveOccurred())
atomic.AddInt32(&confirmed, 1)

}
}
}
}(chConfirm)
var messages []message.StreamMessage
for i := 0; i < 3; i++ {
// the message is big but can be sent with a single frame
m := amqp.NewMessage(make([]byte, MessageBufferBigButLessTheFrame))
m.Properties = &amqp.MessageProperties{
MessageID: fmt.Sprintf("ok-%d", i),
}
messages = append(messages, m)
}

m := amqp.NewMessage(make([]byte, MessageBufferTooBig))
m.Properties = &amqp.MessageProperties{
MessageID: fmt.Sprintf("too-big-4"),

Check failure on line 601 in pkg/stream/producer_test.go

View workflow job for this annotation

GitHub Actions / test-win32 (1.22)

unnecessary use of fmt.Sprintf (S1039)

Check failure on line 601 in pkg/stream/producer_test.go

View workflow job for this annotation

GitHub Actions / test (1.22)

unnecessary use of fmt.Sprintf (S1039)
}
messages = append(messages, m)

result, err := producer.BatchSend(messages)
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(3))
Expect(result.TotalFrames).To(Equal(2))
Eventually(func() int32 {
return atomic.LoadInt32(&notConfirmedTooLarge)
}).Should(Equal(int32(1)))
})

It("Already Closed/Limits", func() {
Expand Down
3 changes: 3 additions & 0 deletions pkg/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"strconv"
)

const MessageBufferTooBig = 1148001
const MessageBufferBigButLessTheFrame = 1048400

func CreateArrayMessagesForTesting(numberOfMessages int) []message.StreamMessage {
return CreateArrayMessagesForTestingWithPrefix("test_", numberOfMessages)

Expand Down

0 comments on commit fb0a6c9

Please sign in to comment.