Skip to content

Commit

Permalink
HostPoolHostPolicy was moved to separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
tengu-alt committed Nov 13, 2024
1 parent 34fdeeb commit 6c692f7
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 186 deletions.
150 changes: 150 additions & 0 deletions pkg/hostpool/hostpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package hostpool

import (
"sync"

"github.com/hailocab/go-hostpool"

"github.com/gocql/gocql"
)

// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
// to distribute queries between hosts and prevent sending queries to
// unresponsive hosts. When creating the host pool that is passed to the policy
// use an empty slice of hosts as the hostpool will be populated later by gocql.
// See below for examples of usage:
//
// // Create host selection policy using a simple host pool
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
//
// // Create host selection policy using an epsilon greedy pool
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
// )
func HostPoolHostPolicy(hp hostpool.HostPool) *hostPoolHostPolicy {
return &hostPoolHostPolicy{hostMap: map[string]*gocql.HostInfo{}, hp: hp}
}

type hostPoolHostPolicy struct {
hp hostpool.HostPool
mu sync.RWMutex
hostMap map[string]*gocql.HostInfo
}

func (r *hostPoolHostPolicy) Init(*gocql.Session) {}
func (r *hostPoolHostPolicy) KeyspaceChanged(gocql.KeyspaceUpdateEvent) {}
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) IsLocal(*gocql.HostInfo) bool { return true }

func (r *hostPoolHostPolicy) SetHosts(hosts []*gocql.HostInfo) {
peers := make([]string, len(hosts))
hostMap := make(map[string]*gocql.HostInfo, len(hosts))

for i, host := range hosts {
ip := host.ConnectAddress().String()
peers[i] = ip
hostMap[ip] = host
}

r.mu.Lock()
r.hp.SetHosts(peers)
r.hostMap = hostMap
r.mu.Unlock()
}

func (r *hostPoolHostPolicy) AddHost(host *gocql.HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

// If the host addr is present and isn't nil return
if h, ok := r.hostMap[ip]; ok && h != nil {
return
}
// otherwise, add the host to the map
r.hostMap[ip] = host
// and construct a new peer list to give to the HostPool
hosts := make([]string, 0, len(r.hostMap))
for addr := range r.hostMap {
hosts = append(hosts, addr)
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) RemoveHost(host *gocql.HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.hostMap[ip]; !ok {
return
}

delete(r.hostMap, ip)
hosts := make([]string, 0, len(r.hostMap))
for _, host := range r.hostMap {
hosts = append(hosts, host.ConnectAddress().String())
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) HostUp(host *gocql.HostInfo) {
r.AddHost(host)
}

func (r *hostPoolHostPolicy) HostDown(host *gocql.HostInfo) {
r.RemoveHost(host)
}

func (r *hostPoolHostPolicy) Pick(qry gocql.ExecutableQuery) gocql.NextHost {
return func() gocql.SelectedHost {
r.mu.RLock()
defer r.mu.RUnlock()

if len(r.hostMap) == 0 {
return nil
}

hostR := r.hp.Get()
host, ok := r.hostMap[hostR.Host()]
if !ok {
return nil
}

return selectedHostPoolHost{
policy: r,
info: host,
hostR: hostR,
}
}
}

// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
// implements the SelectedHost interface
type selectedHostPoolHost struct {
policy *hostPoolHostPolicy
info *gocql.HostInfo
hostR hostpool.HostPoolResponse
}

func (host selectedHostPoolHost) Info() *gocql.HostInfo {
return host.info
}

func (host selectedHostPoolHost) Mark(err error) {
ip := host.info.ConnectAddress().String()

host.policy.mu.RLock()
defer host.policy.mu.RUnlock()

if _, ok := host.policy.hostMap[ip]; !ok {
// host was removed between pick and mark
return
}

host.hostR.Mark(err)
}
57 changes: 57 additions & 0 deletions pkg/hostpool/hostpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package hostpool

import (
"fmt"
"net"
"testing"

"github.com/hailocab/go-hostpool"

"github.com/gocql/gocql"
)

func TestHostPolicy_HostPool(t *testing.T) {
policy := HostPoolHostPolicy(hostpool.New(nil))

//hosts := []*gocql.HostInfo{
// {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
// {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
//}
firstHost := &gocql.HostInfo{}
firstHost.SetHostID("0")
firstHost.SetConnectAddress(net.IPv4(10, 0, 0, 0))
secHost := &gocql.HostInfo{}
secHost.SetHostID("1")
secHost.SetConnectAddress(net.IPv4(10, 0, 0, 1))
hosts := []*gocql.HostInfo{firstHost, secHost}
// Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
// which will result in an unpredictable ordering
policy.SetHosts(hosts)

// the first host selected is actually at [1], but this is ok for RR
// interleaved iteration should always increment the host
iter := policy.Pick(nil)
actualA := iter()
if actualA.Info().HostID() != "0" {
t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID())
}
actualA.Mark(nil)

actualB := iter()
if actualB.Info().HostID() != "1" {
t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID())
}
actualB.Mark(fmt.Errorf("error"))

actualC := iter()
if actualC.Info().HostID() != "0" {
t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID())
}
actualC.Mark(nil)

actualD := iter()
if actualD.Info().HostID() != "0" {
t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID())
}
actualD.Mark(nil)
}
143 changes: 0 additions & 143 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/hailocab/go-hostpool"
)

// cowHostList implements a copy on write host list, its equivalent type is []*HostInfo
Expand Down Expand Up @@ -670,147 +668,6 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
}
}

// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library
// to distribute queries between hosts and prevent sending queries to
// unresponsive hosts. When creating the host pool that is passed to the policy
// use an empty slice of hosts as the hostpool will be populated later by gocql.
// See below for examples of usage:
//
// // Create host selection policy using a simple host pool
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
//
// // Create host selection policy using an epsilon greedy pool
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
// )
func HostPoolHostPolicy(hp hostpool.HostPool) HostSelectionPolicy {
return &hostPoolHostPolicy{hostMap: map[string]*HostInfo{}, hp: hp}
}

type hostPoolHostPolicy struct {
hp hostpool.HostPool
mu sync.RWMutex
hostMap map[string]*HostInfo
}

func (r *hostPoolHostPolicy) Init(*Session) {}
func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true }

func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
peers := make([]string, len(hosts))
hostMap := make(map[string]*HostInfo, len(hosts))

for i, host := range hosts {
ip := host.ConnectAddress().String()
peers[i] = ip
hostMap[ip] = host
}

r.mu.Lock()
r.hp.SetHosts(peers)
r.hostMap = hostMap
r.mu.Unlock()
}

func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

// If the host addr is present and isn't nil return
if h, ok := r.hostMap[ip]; ok && h != nil {
return
}
// otherwise, add the host to the map
r.hostMap[ip] = host
// and construct a new peer list to give to the HostPool
hosts := make([]string, 0, len(r.hostMap))
for addr := range r.hostMap {
hosts = append(hosts, addr)
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.hostMap[ip]; !ok {
return
}

delete(r.hostMap, ip)
hosts := make([]string, 0, len(r.hostMap))
for _, host := range r.hostMap {
hosts = append(hosts, host.ConnectAddress().String())
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) HostUp(host *HostInfo) {
r.AddHost(host)
}

func (r *hostPoolHostPolicy) HostDown(host *HostInfo) {
r.RemoveHost(host)
}

func (r *hostPoolHostPolicy) Pick(qry ExecutableQuery) NextHost {
return func() SelectedHost {
r.mu.RLock()
defer r.mu.RUnlock()

if len(r.hostMap) == 0 {
return nil
}

hostR := r.hp.Get()
host, ok := r.hostMap[hostR.Host()]
if !ok {
return nil
}

return selectedHostPoolHost{
policy: r,
info: host,
hostR: hostR,
}
}
}

// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
// implements the SelectedHost interface
type selectedHostPoolHost struct {
policy *hostPoolHostPolicy
info *HostInfo
hostR hostpool.HostPoolResponse
}

func (host selectedHostPoolHost) Info() *HostInfo {
return host.info
}

func (host selectedHostPoolHost) Mark(err error) {
ip := host.info.ConnectAddress().String()

host.policy.mu.RLock()
defer host.policy.mu.RUnlock()

if _, ok := host.policy.hostMap[ip]; !ok {
// host was removed between pick and mark
return
}

host.hostR.Mark(err)
}

type dcAwareRR struct {
local string
localHosts cowHostList
Expand Down
Loading

0 comments on commit 6c692f7

Please sign in to comment.