From 993f6444073340027276fa119a47440ba915771c Mon Sep 17 00:00:00 2001 From: Maksim Konovalov Date: Thu, 18 Jul 2024 19:53:08 +0300 Subject: [PATCH 1/3] feature/balancing-faces: add external load balancing methods support I added the ability to use custom balancing methods when connecting with a pool. --- CHANGELOG.md | 1 + pool/balancer.go | 19 +++++++++++++++++++ pool/connection_pool.go | 27 +++++++++++++++++---------- pool/round_robin.go | 28 ++++++++++++++++++---------- pool/round_robin_test.go | 12 ++++++------ 5 files changed, 61 insertions(+), 26 deletions(-) create mode 100644 pool/balancer.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ef3cb4e..c8c7dd37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. also added logs for error case of `ConnectionPool.tryConnect()` calls in `ConnectionPool.controller()` and `ConnectionPool.reconnect()` - Methods that are implemented but not included in the pooler interface (#395). +- Add support for external load balancing methods when connecting via Pool (#400). ### Changed diff --git a/pool/balancer.go b/pool/balancer.go new file mode 100644 index 00000000..07448340 --- /dev/null +++ b/pool/balancer.go @@ -0,0 +1,19 @@ +package pool + +import "github.com/tarantool/go-tarantool/v2" + +type BalancerFactory interface { + Create(size int) BalancingPool +} + +type BalancingPool interface { + GetConnection(string) *tarantool.Connection + DeleteConnection(string) *tarantool.Connection + AddConnection(id string, conn *tarantool.Connection) + GetNextConnection() *tarantool.Connection + GetConnections() map[string]*tarantool.Connection +} + +func IsEmpty(pool BalancingPool) bool { + return len(pool.GetConnections()) == 0 +} diff --git a/pool/connection_pool.go b/pool/connection_pool.go index 1c9d85f4..f6f7b934 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -81,6 +81,9 @@ type Opts struct { CheckTimeout time.Duration // ConnectionHandler provides an ability to handle connection updates. ConnectionHandler ConnectionHandler + // BalancerFactory - a factory for creating balancing, + // contains the pool size as well as the connections for which it is used. + BalancerFactory BalancerFactory } /* @@ -110,9 +113,9 @@ type ConnectionPool struct { state state done chan struct{} - roPool *roundRobinStrategy - rwPool *roundRobinStrategy - anyPool *roundRobinStrategy + roPool BalancingPool + rwPool BalancingPool + anyPool BalancingPool poolsMutex sync.RWMutex watcherContainer watcherContainer } @@ -153,6 +156,10 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end // opts. Instances must have unique names. func ConnectWithOpts(ctx context.Context, instances []Instance, opts Opts) (*ConnectionPool, error) { + if opts.BalancerFactory == nil { + opts.BalancerFactory = &RoundRobinFactory{} + } + unique := make(map[string]bool) for _, instance := range instances { if _, ok := unique[instance.Name]; ok { @@ -166,9 +173,9 @@ func ConnectWithOpts(ctx context.Context, instances []Instance, } size := len(instances) - rwPool := newRoundRobinStrategy(size) - roPool := newRoundRobinStrategy(size) - anyPool := newRoundRobinStrategy(size) + rwPool := opts.BalancerFactory.Create(size) + roPool := opts.BalancerFactory.Create(size) + anyPool := opts.BalancerFactory.Create(size) connPool := &ConnectionPool{ ends: make(map[string]*endpoint), @@ -218,15 +225,15 @@ func (p *ConnectionPool) ConnectedNow(mode Mode) (bool, error) { } switch mode { case ANY: - return !p.anyPool.IsEmpty(), nil + return !IsEmpty(p.anyPool), nil case RW: - return !p.rwPool.IsEmpty(), nil + return !IsEmpty(p.rwPool), nil case RO: - return !p.roPool.IsEmpty(), nil + return !IsEmpty(p.roPool), nil case PreferRW: fallthrough case PreferRO: - return !p.rwPool.IsEmpty() || !p.roPool.IsEmpty(), nil + return !IsEmpty(p.rwPool) || !IsEmpty(p.roPool), nil default: return false, ErrNoHealthyInstance } diff --git a/pool/round_robin.go b/pool/round_robin.go index 82cf26f3..ad8b60b8 100644 --- a/pool/round_robin.go +++ b/pool/round_robin.go @@ -7,7 +7,9 @@ import ( "github.com/tarantool/go-tarantool/v2" ) -type roundRobinStrategy struct { +var _ BalancingPool = (*RoundRobinStrategy)(nil) + +type RoundRobinStrategy struct { conns []*tarantool.Connection indexById map[string]uint mutex sync.RWMutex @@ -15,8 +17,14 @@ type roundRobinStrategy struct { current uint64 } -func newRoundRobinStrategy(size int) *roundRobinStrategy { - return &roundRobinStrategy{ +type RoundRobinFactory struct{} + +func (r *RoundRobinFactory) Create(size int) BalancingPool { + return NewRoundRobinStrategy(size) +} + +func NewRoundRobinStrategy(size int) BalancingPool { + return &RoundRobinStrategy{ conns: make([]*tarantool.Connection, 0, size), indexById: make(map[string]uint, size), size: 0, @@ -24,7 +32,7 @@ func newRoundRobinStrategy(size int) *roundRobinStrategy { } } -func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection { +func (r *RoundRobinStrategy) GetConnection(id string) *tarantool.Connection { r.mutex.RLock() defer r.mutex.RUnlock() @@ -36,7 +44,7 @@ func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection { return r.conns[index] } -func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection { +func (r *RoundRobinStrategy) DeleteConnection(id string) *tarantool.Connection { r.mutex.Lock() defer r.mutex.Unlock() @@ -64,14 +72,14 @@ func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection { return conn } -func (r *roundRobinStrategy) IsEmpty() bool { +func (r *RoundRobinStrategy) IsEmpty() bool { r.mutex.RLock() defer r.mutex.RUnlock() return r.size == 0 } -func (r *roundRobinStrategy) GetNextConnection() *tarantool.Connection { +func (r *RoundRobinStrategy) GetNextConnection() *tarantool.Connection { r.mutex.RLock() defer r.mutex.RUnlock() @@ -81,7 +89,7 @@ func (r *roundRobinStrategy) GetNextConnection() *tarantool.Connection { return r.conns[r.nextIndex()] } -func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection { +func (r *RoundRobinStrategy) GetConnections() map[string]*tarantool.Connection { r.mutex.RLock() defer r.mutex.RUnlock() @@ -93,7 +101,7 @@ func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection { return conns } -func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) { +func (r *RoundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) { r.mutex.Lock() defer r.mutex.Unlock() @@ -106,7 +114,7 @@ func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection } } -func (r *roundRobinStrategy) nextIndex() uint64 { +func (r *RoundRobinStrategy) nextIndex() uint64 { next := atomic.AddUint64(&r.current, 1) return (next - 1) % r.size } diff --git a/pool/round_robin_test.go b/pool/round_robin_test.go index 6f028f2d..3682e22d 100644 --- a/pool/round_robin_test.go +++ b/pool/round_robin_test.go @@ -12,7 +12,7 @@ const ( ) func TestRoundRobinAddDelete(t *testing.T) { - rr := newRoundRobinStrategy(10) + rr := NewRoundRobinStrategy(10) addrs := []string{validAddr1, validAddr2} conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} @@ -26,13 +26,13 @@ func TestRoundRobinAddDelete(t *testing.T) { t.Errorf("Unexpected connection on address %s", addr) } } - if !rr.IsEmpty() { + if !IsEmpty(rr) { t.Errorf("RoundRobin does not empty") } } func TestRoundRobinAddDuplicateDelete(t *testing.T) { - rr := newRoundRobinStrategy(10) + rr := NewRoundRobinStrategy(10) conn1 := &tarantool.Connection{} conn2 := &tarantool.Connection{} @@ -43,7 +43,7 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) { if rr.DeleteConnection(validAddr1) != conn2 { t.Errorf("Unexpected deleted connection") } - if !rr.IsEmpty() { + if !IsEmpty(rr) { t.Errorf("RoundRobin does not empty") } if rr.DeleteConnection(validAddr1) != nil { @@ -52,7 +52,7 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) { } func TestRoundRobinGetNextConnection(t *testing.T) { - rr := newRoundRobinStrategy(10) + rr := NewRoundRobinStrategy(10) addrs := []string{validAddr1, validAddr2} conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} @@ -70,7 +70,7 @@ func TestRoundRobinGetNextConnection(t *testing.T) { } func TestRoundRobinStrategy_GetConnections(t *testing.T) { - rr := newRoundRobinStrategy(10) + rr := NewRoundRobinStrategy(10) addrs := []string{validAddr1, validAddr2} conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} From c1ef16261ef0091dac33a1b3f6d92b1275090d6b Mon Sep 17 00:00:00 2001 From: Maksim Konovalov Date: Tue, 1 Oct 2024 17:01:23 +0300 Subject: [PATCH 2/3] pool: make round-robin tests for public API --- pool/round_robin_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pool/round_robin_test.go b/pool/round_robin_test.go index 3682e22d..970bd11e 100644 --- a/pool/round_robin_test.go +++ b/pool/round_robin_test.go @@ -1,9 +1,10 @@ -package pool +package pool_test import ( "testing" "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/pool" ) const ( @@ -12,7 +13,7 @@ const ( ) func TestRoundRobinAddDelete(t *testing.T) { - rr := NewRoundRobinStrategy(10) + rr := pool.NewRoundRobinStrategy(10) addrs := []string{validAddr1, validAddr2} conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} @@ -26,13 +27,13 @@ func TestRoundRobinAddDelete(t *testing.T) { t.Errorf("Unexpected connection on address %s", addr) } } - if !IsEmpty(rr) { + if !pool.IsEmpty(rr) { t.Errorf("RoundRobin does not empty") } } func TestRoundRobinAddDuplicateDelete(t *testing.T) { - rr := NewRoundRobinStrategy(10) + rr := pool.NewRoundRobinStrategy(10) conn1 := &tarantool.Connection{} conn2 := &tarantool.Connection{} @@ -43,7 +44,7 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) { if rr.DeleteConnection(validAddr1) != conn2 { t.Errorf("Unexpected deleted connection") } - if !IsEmpty(rr) { + if !pool.IsEmpty(rr) { t.Errorf("RoundRobin does not empty") } if rr.DeleteConnection(validAddr1) != nil { @@ -52,7 +53,7 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) { } func TestRoundRobinGetNextConnection(t *testing.T) { - rr := NewRoundRobinStrategy(10) + rr := pool.NewRoundRobinStrategy(10) addrs := []string{validAddr1, validAddr2} conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} @@ -70,7 +71,7 @@ func TestRoundRobinGetNextConnection(t *testing.T) { } func TestRoundRobinStrategy_GetConnections(t *testing.T) { - rr := NewRoundRobinStrategy(10) + rr := pool.NewRoundRobinStrategy(10) addrs := []string{validAddr1, validAddr2} conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} From 011c58ce865a27e29ae921948dc3e4603e780ce7 Mon Sep 17 00:00:00 2001 From: Maksim Konovalov Date: Tue, 1 Oct 2024 17:09:09 +0300 Subject: [PATCH 3/3] pool: add comments for balancer factory --- pool/balancer.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pool/balancer.go b/pool/balancer.go index 07448340..6bb161cd 100644 --- a/pool/balancer.go +++ b/pool/balancer.go @@ -2,15 +2,34 @@ package pool import "github.com/tarantool/go-tarantool/v2" +// BalancerFactory is an interface for creating a balancing pool of connections. type BalancerFactory interface { + // Create initializes a new BalancingPool with the specified size. + // The size parameter indicates the intended number of connections to manage within the pool. Create(size int) BalancingPool } +// BalancingPool represents a connection pool with load balancing. type BalancingPool interface { + // GetConnection returns the connection associated with the specified identifier. + // If no connection with the given identifier is found, it returns nil. GetConnection(string) *tarantool.Connection + + // DeleteConnection removes the connection with the specified identifier from the pool + // and returns the removed connection. If no connection is found, it returns nil. DeleteConnection(string) *tarantool.Connection + + // AddConnection adds a new connection to the pool under the specified identifier. + // If a connection with that identifier already exists, the behavior may depend + // on the implementation (e.g., it may overwrite the existing connection). AddConnection(id string, conn *tarantool.Connection) + + // GetNextConnection returns the next available connection from the pool. + // The implementation may use load balancing algorithms to select the connection. GetNextConnection() *tarantool.Connection + + // GetConnections returns the current map of all connections in the pool, + // where the key is the connection identifier and the value is a pointer to the connection object. GetConnections() map[string]*tarantool.Connection }