diff --git a/Makefile b/Makefile
index 2ceba37..06fa9c1 100644
--- a/Makefile
+++ b/Makefile
@@ -29,6 +29,9 @@ deps: # @HELP install dependencies
 	@echo "getting dependencies"
 	go get -t -d -v ./...
 
+benchmark: # @HELP run benchmark and generate files for pprof
+	go test -bench=. -run=^$$ -benchmem -cpuprofile profile.out -memprofile memprofile.out ./test/...
+
 build: # @HELP build the packages
 	sh $(PWD)/scripts/build.sh
 
@@ -37,4 +40,4 @@ ci: deps test_rule dep-linter lint
 
 all: deps test_rule lint
 
-.PHONY: all build
\ No newline at end of file
+.PHONY: all build
diff --git a/README.md b/README.md
index a03cadc..87b3c46 100644
--- a/README.md
+++ b/README.md
@@ -2,13 +2,13 @@
 
 ![Go](https://github.com/jabolina/relt/workflows/Go/badge.svg)
 
-Primitives for Reliable Transport. At this point, the "reliable transport" is the AMQP protocol implemented
-by the RabbitMQ and that's it.
+A primitive for reliable communication, backed by an atomic broadcast protocol. An example can be found in the
+`_examples` folder. The current atomic broadcast implementation is the [etcd](https://github.com/etcd-io/etcd)
+version, which means that an etcd server is needed.
 
 Since this will be used primarily on the [Generic Atomic Multicast](https://github.com/jabolina/go-mcast), to create
-a simple structure, this Relt project will only by a RabbitMQ client with a high level API for sending messages to
-fan-out exchanges.
+a simple structure, this Relt project provides a high level API for sending messages to a group of processes.
 
 When the [Generic Atomic Multicast](https://github.com/jabolina/go-mcast) contains the basic structure, this reliable
-transport will turn into a new whole project where will be implemented a generic atomic broadcast, to be used as a
-communication primitive.
+transport will turn into a whole new project implementing a generic atomic broadcast, to be used as the
+default reliable communication primitive.
diff --git a/_examples/peer.go b/_examples/peer.go
index 027bad5..3efc836 100644
--- a/_examples/peer.go
+++ b/_examples/peer.go
@@ -49,15 +49,15 @@ func consume(r *relt.Relt, ctx context.Context) {
 func main() {
 	conf := relt.DefaultReltConfiguration()
 	conf.Name = "local-test"
-	relt, _ := relt.NewRelt(*conf)
+	r, _ := relt.NewRelt(*conf)
 	ctx, done := context.WithCancel(context.Background())
 
 	go func() {
-		produce(relt, os.Stdin, ctx)
+		produce(r, os.Stdin, ctx)
 	}()
 
 	go func() {
-		consume(relt, ctx)
+		consume(r, ctx)
 	}()
 
 	c := make(chan os.Signal, 1)
@@ -69,5 +69,5 @@ func main() {
 	}()
 
 	<-ctx.Done()
-	relt.Close()
+	r.Close()
 }
diff --git a/internal/coordinator.go b/internal/coordinator.go
index 9131229..41e5143 100644
--- a/internal/coordinator.go
+++ b/internal/coordinator.go
@@ -2,23 +2,9 @@ package internal
 
 import (
 	"context"
-	"github.com/coreos/etcd/clientv3"
 	"io"
-	"time"
 )
 
-// A single write requests to be applied to etcd.
-type request struct {
-	// Issuer writer context.
-	ctx context.Context
-
-	// Event to be sent to etcd.
-	event Event
-
-	// Channel to send response back.
-	response chan error
-}
-
 // Configuration for the coordinator.
 type CoordinatorConfiguration struct {
 	// Each Coordinator will handle only a single partition.
@@ -38,7 +24,7 @@ type CoordinatorConfiguration struct {
 // Coordinator interface that should be implemented by the
 // atomic broadcast handler.
 // Commands should be issued through the coordinator to be delivered
-// to other peers
+// to other peers.
 type Coordinator interface {
 	io.Closer
 
@@ -48,130 +34,13 @@ type Coordinator interface {
 	Watch(received chan<- Event) error
 
 	// Issues an Event.
-	Write(ctx context.Context, event Event) <-chan error
+	// This will have the same effect as broadcasting a message
+	// to every participant on the destination.
+	Write(ctx context.Context, event Event) error
 }
 
 // Create a new Coordinator using the given configuration.
 // The current implementation is the EtcdCoordinator, backed by etcd.
 func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
-	cli, err := clientv3.New(clientv3.Config{
-		DialTimeout: 30 * time.Second,
-		Endpoints:   []string{configuration.Server},
-	})
-	if err != nil {
-		return nil, err
-	}
-	kv := clientv3.NewKV(cli)
-	ctx, cancel := context.WithCancel(configuration.Ctx)
-	coord := &EtcdCoordinator{
-		configuration: configuration,
-		cli:           cli,
-		kv:            kv,
-		ctx:           ctx,
-		cancel:        cancel,
-		writeChan:     make(chan request),
-	}
-	configuration.Handler.Spawn(coord.writer)
-	return coord, nil
-}
-
-// EtcdCoordinator will use etcd for atomic broadcast.
-type EtcdCoordinator struct {
-	// Configuration parameters.
-	configuration CoordinatorConfiguration
-
-	// Current Coordinator context, created from the parent context.
-	ctx context.Context
-
-	// Function to cancel the current context.
-	cancel context.CancelFunc
-
-	// A client for the etcd server.
-	cli *clientv3.Client
-
-	// The key-value entry point for issuing requests.
-	kv clientv3.KV
-
-	// Channel to receive write requests.
-	writeChan chan request
-}
-
-// Listen and apply write requests.
-// This will keep running while the application context is available.
-// Receiving commands through the channel will ensure that they are
-// applied synchronously to the etcd.
-func (e *EtcdCoordinator) writer() {
-	for {
-		select {
-		case <-e.ctx.Done():
-			return
-		case req := <-e.writeChan:
-			_, err := e.kv.Put(req.ctx, req.event.Key, string(req.event.Value))
-			req.response <- err
-		}
-	}
-}
-
-// Starts a new coroutine for watching the Coordinator partition.
-// All received information will be published back through the channel
-// received as parameter.
-//
-// After calling a routine will run bounded to the application lifetime.
-func (e *EtcdCoordinator) Watch(received chan<- Event) error {
-	watchChan := e.cli.Watch(e.ctx, e.configuration.Partition)
-	watchChanges := func() {
-		for response := range watchChan {
-			select {
-			case <-e.ctx.Done():
-				return
-			default:
-				e.handleResponse(response, received)
-			}
-		}
-	}
-	e.configuration.Handler.Spawn(watchChanges)
-	return nil
-}
-
-// Write the given event using the KV interface.
-func (e *EtcdCoordinator) Write(ctx context.Context, event Event) <-chan error {
-	res := make(chan error)
-	e.writeChan <- request{
-		ctx:      ctx,
-		event:    event,
-		response: res,
-	}
-	return res
-}
-
-// Stop the etcd client connection.
-func (e *EtcdCoordinator) Close() error {
-	e.cancel()
-	return e.cli.Close()
-}
-
-// This method is responsible for handling events from the etcd client.
-//
-// This method will transform each received event into Event object and
-// publish it back using the given channel. A buffered channel will be created
-// and a goroutine will be spawned, so we can publish the received messages
-// asynchronously without blocking. This can cause the Close to hold, if there
-// exists pending messages to be consumed by the channel, this method can cause a deadlock.
-func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) {
-	buffered := make(chan Event, len(response.Events))
-	defer close(buffered)
-
-	e.configuration.Handler.Spawn(func() {
-		for ev := range buffered {
-			received <- ev
-		}
-	})
-
-	for _, event := range response.Events {
-		buffered <- Event{
-			Key:   string(event.Kv.Key),
-			Value: event.Kv.Value,
-			Error: nil,
-		}
-	}
+	return newEtcdCoordinator(configuration)
 }
diff --git a/internal/core.go b/internal/core.go
index 1ea1723..f37dc7b 100644
--- a/internal/core.go
+++ b/internal/core.go
@@ -16,6 +16,7 @@ var (
 	coreAlreadyWatching = errors.New("already watching partition")
 )
 
+// Configuration for the Core interface.
 type CoreConfiguration struct {
 	// Partition the Coordinator will work with.
 	Partition string
@@ -23,11 +24,14 @@ type CoreConfiguration struct {
 	// Server address for the Coordinator.
 	Server string
 
-	// Default timeout to be applied when handling channels.
+	// Default timeout to be applied when handling channels and
+	// asynchronous operations.
 	DefaultTimeout time.Duration
 }
 
 // Holds all flags used to manage the Core state.
+// This is the same as an AtomicBoolean and is used internally
+// to manage the core state.
type CoreFlags struct {
 	// Flag for the shutdown state.
 	shutdown Flag
@@ -36,13 +40,19 @@ type CoreFlags struct {
 	watching Flag
 }
 
-// Core is the interface that will hold the Relt connection to the Coordinator.
+// Core is the interface that will create the link between Relt requests
+// and the Coordinator.
 // Every command issued will be parsed here, and every command received should
 // be handled here before going back to the client.
+// Everything after this stage should care only about the atomic broadcast protocol,
+// and everything before should be abstracted as a simple communication primitive.
+// This means that any parsing or state handling for the client should be done here.
 type Core interface {
 	io.Closer
 
 	// Start listening for new messages.
+	// This will receive messages from the atomic broadcast protocol
+	// and parse them into an object the client can handle.
 	Listen() (<-chan Message, error)
 
 	// Send a message asynchronously for the given partition.
@@ -66,12 +76,14 @@ type ReltCore struct {
 	handler *GoRoutineHandler
 
 	// Coordinator to issues commands and receive Event.
+	// The coordinator is the interface to reach the atomic
+	// broadcast protocol.
 	coord Coordinator
 
 	// Channel for sending Message to the client.
 	output chan Message
 
-	// Flags for handling state.
+	// Flags for handling internal state.
 	flags CoreFlags
 
 	// Core configuration parameters.
@@ -79,8 +91,8 @@ type ReltCore struct {
 }
 
 // Create a new ReltCore using the given configuration.
-// As an effect, this will instantiate a Coordinator a failures
-// can happen while handling connections between the peers.
+// As an effect, this will instantiate a Coordinator, and a failure
+// can happen while handling the connection to the atomic broadcast server.
 func NewCore(configuration CoreConfiguration) (Core, error) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	handler := NewRoutineHandler()
@@ -136,7 +148,7 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
 	}
 
 	if r.flags.watching.Inactivate() {
-		events := make(chan Event, 100)
+		events := make(chan Event)
 		if err := r.coord.Watch(events); err != nil {
 			return nil, err
 		}
@@ -162,15 +174,14 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
 // This is a broadcast message, which means that if _N_ nodes are
 // subscribed for a partition, every node will receive the message.
 func (r *ReltCore) Send(ctx context.Context, dest string, data []byte) <-chan error {
-	response := make(chan error, 1)
+	response := make(chan error)
 	writeRequest := func() {
 		if r.flags.shutdown.IsActive() {
 			event := Event{
 				Key:   dest,
 				Value: data,
 			}
-			err := <-r.coord.Write(ctx, event)
-			response <- err
+			response <- r.coord.Write(ctx, event)
 		} else {
 			response <- coreWasShutdown
 		}
@@ -192,6 +203,7 @@ func (r *ReltCore) Close() error {
 		}
 		r.finish()
 		r.handler.Close()
+		return nil
 	}
 	return coreWasShutdown
 }
diff --git a/internal/etcd_coordinator.go b/internal/etcd_coordinator.go
new file mode 100644
index 0000000..93885c4
--- /dev/null
+++ b/internal/etcd_coordinator.go
@@ -0,0 +1,92 @@
+package internal
+
+import (
+	"context"
+	"github.com/coreos/etcd/clientv3"
+	"time"
+)
+
+// EtcdCoordinator will use etcd for atomic broadcast.
+type EtcdCoordinator struct {
+	// Configuration parameters.
+	configuration CoordinatorConfiguration
+
+	// Current Coordinator context, created from the parent context.
+	ctx context.Context
+
+	// Function to cancel the current context.
+	cancel context.CancelFunc
+
+	// A client for interacting with the etcd server.
+	cli *clientv3.Client
+}
+
+// Creates a new coordinator that is backed by the etcd atomic broadcast.
+// This method will connect to the configured etcd server, so a failure
+// can happen at this step.
+// Only a simple configuration is available here.
+func newEtcdCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
+	cli, err := clientv3.New(clientv3.Config{
+		DialTimeout: 5 * time.Second,
+		Endpoints:   []string{configuration.Server},
+	})
+	if err != nil {
+		return nil, err
+	}
+	ctx, cancel := context.WithCancel(configuration.Ctx)
+	coord := &EtcdCoordinator{
+		configuration: configuration,
+		cli:           cli,
+		ctx:           ctx,
+		cancel:        cancel,
+	}
+	return coord, nil
+}
+
+// Starts a new coroutine for watching the Coordinator partition.
+// All received information will be published back through the channel
+// received as parameter.
+//
+// After this call, a routine will run bound to the application lifetime.
+func (e *EtcdCoordinator) Watch(received chan<- Event) error {
+	watchChan := e.cli.Watch(e.ctx, e.configuration.Partition)
+	watchChanges := func() {
+		for response := range watchChan {
+			select {
+			case <-e.ctx.Done():
+				return
+			default:
+				e.handleResponse(response, received)
+			}
+		}
+	}
+	e.configuration.Handler.Spawn(watchChanges)
+	return nil
+}
+
+// Write the given event by issuing a PUT request through the client.
+func (e *EtcdCoordinator) Write(ctx context.Context, event Event) error {
+	_, err := e.cli.Put(ctx, event.Key, string(event.Value))
+	return err
+}
+
+// Stop the etcd client connection and stop the goroutines.
+func (e *EtcdCoordinator) Close() error {
+	e.cancel()
+	return e.cli.Close()
+}
+
+// This method is responsible for handling events from the etcd client.
+//
+// This method will transform each received event into an Event object and
+// publish it back through the given channel. This method blocks while
+// the messages are not consumed.
+func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) {
+	for _, event := range response.Events {
+		received <- Event{
+			Key:   string(event.Kv.Key),
+			Value: event.Kv.Value,
+			Error: nil,
+		}
+	}
+}
diff --git a/internal/event.go b/internal/event.go
index de917b1..e6bc96a 100644
--- a/internal/event.go
+++ b/internal/event.go
@@ -3,12 +3,20 @@ package internal
 // Event is a structure handled by the Coordinator.
 // Events are received and issued through the atomic broadcast.
 type Event struct {
-	Key   string
+	// Key affected by the event.
+	Key string
+
+	// Value to be applied when sending the event, or the
+	// applied value when the event was received.
 	Value []byte
+
+	// Only used when the event was received from the atomic broadcast;
+	// transports any error that happened.
 	Error error
 }
 
-func (e Event) isError() bool {
+// Verify if the event is an error event.
+func (e Event) IsError() bool {
 	return e.Error != nil
 }
 
@@ -24,7 +32,14 @@ func (e Event) toMessage() Message {
 // Message is the structure handled by the Core.
 // Messages are the available data sent to the client.
 type Message struct {
+	// The group address the message was sent to.
+	// If a client `A` sends a message to group `B`,
+	// this value will be `B`.
 	Origin string
-	Data   []byte
-	Error  error
+
+	// Data transported.
+	Data []byte
+
+	// If an error happened.
+	Error error
 }
diff --git a/pkg/relt/configuration.go b/pkg/relt/configuration.go
index 964a9a3..2d7cc13 100644
--- a/pkg/relt/configuration.go
+++ b/pkg/relt/configuration.go
@@ -13,24 +13,22 @@ var (
 // Configuration used for creating a new instance
 // of the Relt.
 type Configuration struct {
-	// The Relt name. Is not required, if empty a
-	// random string will be generated to be used.
-	// This must be unique, since it will be used to declare
-	// the peer queue for consumption.
+	// The Relt name. It is not required; if empty, a random string will
+	// be generated to be used.
 	Name string
 
-	// Only plain auth is supported. The username + password
-	// will be passed in the connection URL.
+	// URL used for connecting to the atomic broadcast protocol.
 	Url string
 
-	// This will be used to create an exchange on the RabbitMQ
-	// broker. If the user wants to declare its own name for the
-	// exchange, if none is passed, the value 'relt' will be used.
+	// This is the partition where the transport will act. The client
+	// will only listen for messages whose destination is the configured
+	// partition.
 	//
 	// When declaring multiple partitions, this must be configured
 	// properly, since this will dictate which peers received the
 	// messages. If all peers are using the same exchange then
-	// is the same as all peers are a single partition.
+	// the clients will act as a single unit, where every peer
+	// will receive all messages in the same order.
 	Exchange GroupAddress
 
 	// Default timeout to be applied when handling asynchronous methods.
@@ -38,10 +36,6 @@ type Configuration struct {
 }
 
 // Creates the default configuration for the Relt.
-// The peer, thus the queue name will be randomly generated,
-// the connection Url will connect to a local broker using
-// the user `guest` and password `guest`.
-// The default exchange will fallback to `relt`.
 func DefaultReltConfiguration() *Configuration {
 	return &Configuration{
 		Name: GenerateUID(),
diff --git a/pkg/relt/relt.go b/pkg/relt/relt.go
index f0fa03b..db2855d 100644
--- a/pkg/relt/relt.go
+++ b/pkg/relt/relt.go
@@ -15,12 +15,17 @@ var (
 
 // The implementation for the Transport interface
 // providing reliable communication between hosts.
+//
+// Every command must be issued through this struct,
+// where a single object instance represents a peer that
+// participates in the atomic broadcast protocol.
 type Relt struct {
-	// Holds the configuration about the core
-	// and the Relt transport.
+	// Holds the configuration.
 	configuration Configuration
 
 	// Holds the Core structure.
+	// Every command received will be prepared and sent
+	// through the core structure.
 	core internal.Core
 }
 
@@ -30,6 +35,9 @@ func (r *Relt) Consume() (<-chan internal.Message, error) {
 }
 
 // Implements the Transport interface.
+// Will broadcast a message to all peers that listen to the destination.
+// This method is bounded by the given context's lifetime and by the
+// configured timeout.
 func (r *Relt) Broadcast(ctx context.Context, message Send) error {
 	if len(message.Address) == 0 {
 		return ErrInvalidGroupAddress
diff --git a/pkg/relt/transport.go b/pkg/relt/transport.go
index 765aa54..ebcea92 100644
--- a/pkg/relt/transport.go
+++ b/pkg/relt/transport.go
@@ -6,28 +6,18 @@ import (
 	"io"
 )
 
-// When broadcasting or multicasting a message must provide
-// the group address.
+// When broadcasting, a message must provide the group address.
 type GroupAddress string
 
-// Denotes a received information or an error
-// that occurred in the channel.
-type Recv struct {
-	// Received data or nil if is an error event.
-	Data []byte
-
-	// Returns an error back to the listener.
-	Error error
-}
-
 // Denotes an information that will be sent.
-// By design, the group address cannot be empty
-// and the Data to be sent cannot be nil, must be at least
-// and empty slice.
+// By design, the group address cannot be empty and the Data to be
+// sent cannot be nil; it must be at least an empty slice.
 type Send struct {
 	// Which group the message will be sent.
-	// This is the name of the exchange that must receive
-	// the message and not an actual IP address.
+	// This is the name of the exchange that must receive the message
+	// and not an actual IP address.
+	// If multiple peers are listening, all of them will receive the
+	// message in the same order.
 	Address GroupAddress
 
 	// Data to be sent to the group.
@@ -35,11 +25,6 @@ type Send struct {
 }
 
 // A interface to offer a high level API for transport.
-// This transport is backed by a RabbitMQ using the quorum queues,
-// that uses the Raft Protocol for consistency.
-// Everything sent by this transport will not receive a direct
-// response back, everything is just message passing simulating
-// events that can occur.
 type Transport interface {
 	io.Closer
 
@@ -56,7 +41,6 @@ type Transport interface {
 	//
 	// A goroutine will be spawned and the message will be sent
 	// through a channel, this channel is only closed when the
-	// Relt transport is closed, so if the the transport is already
-	// closed this function will panic.
+	// Relt transport is closed.
 	Broadcast(ctx context.Context, message Send) error
 }
diff --git a/pkg/relt/util.go b/pkg/relt/util.go
index ae9454e..bd1b544 100644
--- a/pkg/relt/util.go
+++ b/pkg/relt/util.go
@@ -33,6 +33,7 @@ func GenerateRandomIP() (string, error) {
 	return listener.Addr().String(), nil
 }
 
+// Verify if the given value is a valid URL.
 func IsUrl(value string) bool {
 	parsed, err := url.Parse(value)
 	return err == nil && parsed.Scheme != ""
diff --git a/test/etcd_test.go b/test/etcd_test.go
new file mode 100644
index 0000000..e76b62d
--- /dev/null
+++ b/test/etcd_test.go
@@ -0,0 +1,202 @@
+package test
+
+import (
+	"context"
+	"fmt"
+	"github.com/coreos/etcd/clientv3"
+	"github.com/jabolina/relt/internal"
+	"go.uber.org/goleak"
+	"sync"
+	"testing"
+	"time"
+)
+
+func Test_ShouldReceiveAllCommands(t *testing.T) {
+	defer goleak.VerifyNone(t)
+	partition := "etcd-coordinator"
+	testSize := 100
+	clusterSize := 30
+	ctx, cancel := context.WithCancel(context.Background())
+	listenersGroup := &sync.WaitGroup{}
+	writerGroup := &sync.WaitGroup{}
+	initializeReplica := func(c internal.Coordinator, history *MessageHist) {
+		msgChan := make(chan internal.Event)
+		err := c.Watch(msgChan)
+		if err != nil {
+			t.Fatalf("failed starting to listen. %#v", err)
+		}
+
+		go func() {
+			defer listenersGroup.Done()
+			for {
+				select {
+				case recv := <-msgChan:
+					if len(recv.Value) == 0 {
+						t.Errorf("received wrong data")
+					}
+
+					if recv.Error != nil {
+						t.Errorf("error on consumed response. %v", recv.Error)
+					}
+
+					history.insert(string(recv.Value))
+				case <-ctx.Done():
+					return
+				}
+			}
+		}()
+	}
+	initializeCluster := func(size int) ([]internal.Coordinator, []*MessageHist) {
+		var coordinators []internal.Coordinator
+		var history []*MessageHist
+		for i := 0; i < size; i++ {
+			conf := internal.CoordinatorConfiguration{
+				Partition: partition,
+				Server:    "localhost:2379",
+				Ctx:       ctx,
+				Handler:   internal.NewRoutineHandler(),
+			}
+			coord, err := internal.NewCoordinator(conf)
+			if err != nil {
+				t.Fatalf("failed starting coordinator. %#v", err)
+			}
+			h := NewHistory()
+			initializeReplica(coord, h)
+
+			coordinators = append(coordinators, coord)
+			history = append(history, h)
+		}
+		return coordinators, history
+	}
+
+	listenersGroup.Add(clusterSize)
+	replicas, history := initializeCluster(clusterSize)
+
+	entry := replicas[0]
+	writerGroup.Add(testSize)
+	for i := 0; i < testSize; i++ {
+		write := func(data []byte) {
+			defer writerGroup.Done()
+			err := entry.Write(ctx, internal.Event{
+				Key:   partition,
+				Value: data,
+				Error: nil,
+			})
+
+			if err != nil {
+				t.Errorf("failed broadcasting. %#v", err)
+			}
+		}
+		go write([]byte(fmt.Sprintf("%d", i)))
+	}
+
+	writerGroup.Wait()
+	time.Sleep(10 * time.Second)
+	cancel()
+	listenersGroup.Wait()
+
+	truth := history[0]
+	if truth.size() != testSize {
+		t.Errorf("should have size %d, found %d", testSize, truth.size())
+	}
+
+	for i, messageHist := range history {
+		diff := truth.compare(*messageHist)
+		if diff != 0 {
+			t.Errorf("history differ at %d with %d different commands", i, diff)
+		}
+	}
+
+	for _, replica := range replicas {
+		err := replica.Close()
+		if err != nil {
+			t.Errorf("failed closing replica. %#v", err)
+		}
+	}
+}
+
+func Test_ShouldHaveEventsWhenDirectConnection(t *testing.T) {
+	defer goleak.VerifyNone(t)
+	partition := "etcd-coordinator"
+	testSize := 100
+	clusterSize := 30
+	ctx, cancel := context.WithCancel(context.Background())
+	listenersGroup := &sync.WaitGroup{}
+	writerGroup := &sync.WaitGroup{}
+	initializeReplica := func(c *clientv3.Client, history *MessageHist) {
+		watchChan := c.Watch(ctx, partition)
+		go func() {
+			defer listenersGroup.Done()
+			for {
+				select {
+				case res := <-watchChan:
+					for _, event := range res.Events {
+						history.insert(string(event.Kv.Value))
+					}
+				case <-ctx.Done():
+					return
+				}
+			}
+		}()
+	}
+
+	initializeCluster := func(size int) ([]*clientv3.Client, []*MessageHist) {
+		var replicas []*clientv3.Client
+		var history []*MessageHist
+		for i := 0; i < size; i++ {
+			e, err := clientv3.New(clientv3.Config{
+				Endpoints:   []string{"localhost:2379"},
+				DialTimeout: 5 * time.Second,
+			})
+			if err != nil {
+				t.Fatalf("failed connecting. %#v", err)
+			}
+			h := NewHistory()
+			initializeReplica(e, h)
+
+			replicas = append(replicas, e)
+			history = append(history, h)
+		}
+		return replicas, history
+	}
+
+	listenersGroup.Add(clusterSize)
+	replicas, history := initializeCluster(clusterSize)
+
+	entry := replicas[0]
+	writerGroup.Add(testSize)
+	for i := 0; i < testSize; i++ {
+		write := func(data string) {
+			defer writerGroup.Done()
+			_, err := entry.Put(ctx, partition, data)
+			if err != nil {
+				t.Errorf("failed broadcasting. %v", err)
+			}
+		}
+		go write(fmt.Sprintf("%d", i))
+	}
+
+	writerGroup.Wait()
+	time.Sleep(5 * time.Second)
+	cancel()
+	listenersGroup.Wait()
+
+	truth := history[0]
+	if truth.size() != testSize {
+		t.Errorf("should have size %d, found %d", testSize, truth.size())
+	}
+
+	for i, messageHist := range history {
+		diff := truth.compare(*messageHist)
+		if diff != 0 {
+			t.Errorf("history differ at %d with %d different commands", i, diff)
+		}
+	}
+
+	for _, replica := range replicas {
+		err := replica.Close()
+		if err != nil {
+			t.Errorf("failed closing replica. %#v", err)
+		}
+	}
+}
diff --git a/test/helper.go b/test/helper.go
index 473c63c..7e51647 100644
--- a/test/helper.go
+++ b/test/helper.go
@@ -30,7 +30,18 @@ func (m *MessageHist) values() []string {
 	return copied
 }
 
+func (m *MessageHist) size() int {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	return len(m.history)
+}
+
 func (m *MessageHist) compare(other MessageHist) int {
+	// If both objects hold the same mutex, a deadlock will be created.
+	if m.mutex == other.mutex {
+		return 0
+	}
+
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
 	different := 0
diff --git a/test/load_test.go b/test/load_test.go
new file mode 100644
index 0000000..638c960
--- /dev/null
+++ b/test/load_test.go
@@ -0,0 +1,109 @@
+package test
+
+import (
+	"context"
+	"fmt"
+	"github.com/google/uuid"
+	"github.com/jabolina/relt/pkg/relt"
+	"sync"
+	"testing"
+	"time"
+)
+
+func Benchmark_LoadTestMultipleReplicas(b *testing.B) {
+	partition := "bench-replicas-" + uuid.New().String()
+	testSize := 200
+	clusterSize := 50
+	ctx, cancel := context.WithCancel(context.TODO())
+	listenersGroup := &sync.WaitGroup{}
+	writerGroup := &sync.WaitGroup{}
+	initializeReplica := func(r *relt.Relt, history *MessageHist) {
+		listener, err := r.Consume()
+		if err != nil {
+			b.Fatalf("failed starting consumer. %#v", err)
+		}
+
+		go func() {
+			defer listenersGroup.Done()
+			for {
+				select {
+				case recv := <-listener:
+					if len(recv.Data) == 0 {
+						b.Errorf("received wrong data")
+					}
+
+					if recv.Error != nil {
+						b.Errorf("error on consumed response. %v", recv.Error)
+					}
+
+					history.insert(string(recv.Data))
+				case <-ctx.Done():
+					return
+				}
+			}
+		}()
+	}
+	initializeCluster := func(size int) ([]*relt.Relt, []*MessageHist) {
+		var replicas []*relt.Relt
+		var history []*MessageHist
+		for i := 0; i < size; i++ {
+			conf := relt.DefaultReltConfiguration()
+			conf.Name = partition + fmt.Sprintf("%d", i)
+			conf.Exchange = relt.GroupAddress(partition)
+			r, err := relt.NewRelt(*conf)
+			if err != nil {
+				b.Fatalf("failed connecting. %v", err)
+			}
+			h := NewHistory()
+			initializeReplica(r, h)
+
+			replicas = append(replicas, r)
+			history = append(history, h)
+		}
+		return replicas, history
+	}
+
+	listenersGroup.Add(clusterSize)
+	replicas, history := initializeCluster(clusterSize)
+
+	entry := replicas[0]
+	writerGroup.Add(testSize)
+	for i := 0; i < testSize; i++ {
+		write := func(data []byte) {
+			defer writerGroup.Done()
+			err := entry.Broadcast(ctx, relt.Send{
+				Address: relt.GroupAddress(partition),
+				Data:    data,
+			})
+
+			if err != nil {
+				b.Errorf("failed broadcasting. %v", err)
+			}
+		}
+		go write([]byte(fmt.Sprintf("%d", i)))
+	}
+
+	writerGroup.Wait()
+	time.Sleep(10 * time.Second)
+	cancel()
+	listenersGroup.Wait()
+
+	truth := history[0]
+	if truth.size() != testSize {
+		b.Errorf("should have size %d, found %d", testSize, truth.size())
+	}
+
+	for i, messageHist := range history {
+		diff := truth.compare(*messageHist)
+		if diff != 0 {
+			b.Errorf("history differ at %d with %d different commands", i, diff)
+		}
+	}
+
+	for _, replica := range replicas {
+		err := replica.Close()
+		if err != nil {
+			b.Errorf("failed closing replica. %#v", err)
+		}
+	}
+}
diff --git a/test/synchronization_test.go b/test/synchronization_test.go
index d3399b0..98b1a11 100644
--- a/test/synchronization_test.go
+++ b/test/synchronization_test.go
@@ -97,3 +97,102 @@
 		t.Fatalf("message history do not match. %d messages do not match", diff)
 	}
 }
+
+func Test_MassiveNumberOfReplicasShouldBeSynchronized(t *testing.T) {
+	defer goleak.VerifyNone(t)
+	partition := "massive-synchronized-replicas-" + uuid.New().String()
+	testSize := 100
+	clusterSize := 30
+	ctx, cancel := context.WithCancel(context.TODO())
+	listenersGroup := &sync.WaitGroup{}
+	writerGroup := &sync.WaitGroup{}
+	initializeReplica := func(r *relt.Relt, history *MessageHist) {
+		listener, err := r.Consume()
+		if err != nil {
+			t.Fatalf("failed starting consumer. %#v", err)
+		}
+
+		go func() {
+			defer listenersGroup.Done()
+			for {
+				select {
+				case recv := <-listener:
+					if len(recv.Data) == 0 {
+						t.Errorf("received wrong data")
+					}
+
+					if recv.Error != nil {
+						t.Errorf("error on consumed response. %v", recv.Error)
+					}
+
+					history.insert(string(recv.Data))
+				case <-ctx.Done():
+					return
+				}
+			}
+		}()
+	}
+	initializeCluster := func(size int) ([]*relt.Relt, []*MessageHist) {
+		var replicas []*relt.Relt
+		var history []*MessageHist
+		for i := 0; i < size; i++ {
+			conf := relt.DefaultReltConfiguration()
+			conf.Name = partition + fmt.Sprintf("%d", i)
+			conf.Exchange = relt.GroupAddress(partition)
+			r, err := relt.NewRelt(*conf)
+			if err != nil {
+				t.Fatalf("failed connecting. %v", err)
+			}
+			h := NewHistory()
+			initializeReplica(r, h)
+
+			replicas = append(replicas, r)
+			history = append(history, h)
+		}
+		return replicas, history
+	}
+
+	listenersGroup.Add(clusterSize)
+	replicas, history := initializeCluster(clusterSize)
+
+	entry := replicas[0]
+	writerGroup.Add(testSize)
+	for i := 0; i < testSize; i++ {
+		write := func(data []byte) {
+			defer writerGroup.Done()
+			err := entry.Broadcast(ctx, relt.Send{
+				Address: relt.GroupAddress(partition),
+				Data:    data,
+			})
+
+			if err != nil {
+				t.Errorf("failed broadcasting. %v", err)
+			}
+		}
+		go write([]byte(fmt.Sprintf("%d", i)))
+	}
+
+	writerGroup.Wait()
+	time.Sleep(10 * time.Second)
+	cancel()
+	listenersGroup.Wait()
+
+	truth := history[0]
+	if truth.size() != testSize {
+		t.Errorf("should have size %d, found %d", testSize, truth.size())
+	}
+
+	for i, messageHist := range history {
+		diff := truth.compare(*messageHist)
+		if diff != 0 {
+			t.Errorf("history differ at %d with %d different commands", i, diff)
+		}
+	}
+
+	for _, replica := range replicas {
+		err := replica.Close()
+		if err != nil {
+			t.Errorf("failed closing replica. %#v", err)
+		}
+	}
+}
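
A note on the Flag type: the CoreFlags comment in internal/core.go describes it as an AtomicBoolean, but Flag itself is not part of this diff. A minimal compare-and-swap sketch that would satisfy the calls made in core.go (IsActive, Inactivate) might look like the following; the shape is an assumption, not the repository's actual implementation:

```go
package internal

import "sync/atomic"

// Flag is a sketch of an atomic boolean. The calls in core.go suggest
// flags start in the active state, so a constructor is provided.
type Flag struct {
	state int32 // 1 = active, 0 = inactive
}

// NewActiveFlag returns a flag already in the active state.
func NewActiveFlag() Flag {
	return Flag{state: 1}
}

// IsActive reports whether the flag is still active.
func (f *Flag) IsActive() bool {
	return atomic.LoadInt32(&f.state) == 1
}

// Inactivate flips the flag from active to inactive exactly once and
// returns true only for the caller that performed the transition.
func (f *Flag) Inactivate() bool {
	return atomic.CompareAndSwapInt32(&f.state, 1, 0)
}
```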
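
ReltCore.Send stays asynchronous, but its response channel is now unbuffered, so the returned channel must always be drained. Assuming the Core interface exposes the same Send signature as ReltCore, a hypothetical caller-side helper bounding the wait with the configured DefaultTimeout could look like this; sendWithTimeout is illustrative only, and giving up on the timeout leaves the spawned writer blocked on the unbuffered channel, which is the trade-off of dropping the buffer:

```go
package internal

import (
	"context"
	"errors"
	"time"
)

// sendWithTimeout waits for the asynchronous Send result, giving up
// after the given timeout. Hypothetical helper, not part of the patch.
func sendWithTimeout(ctx context.Context, core Core, dest string, data []byte, timeout time.Duration) error {
	select {
	case err := <-core.Send(ctx, dest, data):
		return err
	case <-time.After(timeout):
		// The writer goroutine stays blocked on the unbuffered
		// response channel in this path; see ReltCore.Send above.
		return errors.New("relt: send timed out")
	case <-ctx.Done():
		return ctx.Err()
	}
}
```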
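
With the buffered relay removed, handleResponse now publishes straight into the received channel and blocks until each Event is consumed, which keeps delivery ordered per watcher and avoids the Close deadlock the old comment warned about. The draining side lives in ReltCore.Listen, only partially visible in this diff; an illustrative reduction of such a loop, reusing the toMessage conversion from internal/event.go:

```go
package internal

import "context"

// drainEvents relays coordinator events to the client-facing output
// channel until the context is cancelled. Illustrative reduction only;
// the real loop is inside ReltCore.Listen.
func drainEvents(ctx context.Context, events <-chan Event, output chan<- Message) {
	for {
		select {
		case ev := <-events:
			output <- ev.toMessage()
		case <-ctx.Done():
			return
		}
	}
}
```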