Add a LatencyAwarePolicy that prioritizes the host with smaller average latency #1831

Open · wants to merge 2 commits into trunk
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Add a LatencyAwarePolicy that prioritizes hosts with smaller average latencies

### Changed

284 changes: 281 additions & 3 deletions policies.go
@@ -313,7 +313,7 @@ type HostSelectionPolicy interface {
// selection policy.
type SelectedHost interface {
Info() *HostInfo
Mark(error)
Mark(error, uint64)
}

type selectedHost HostInfo
@@ -322,7 +322,7 @@ func (host *selectedHost) Info() *HostInfo {
return (*HostInfo)(host)
}

func (host *selectedHost) Mark(err error) {}
func (host *selectedHost) Mark(err error, latency uint64) {}

// NextHost is an iteration function over picked hosts
type NextHost func() SelectedHost
@@ -817,7 +817,7 @@ func (host selectedHostPoolHost) Info() *HostInfo {
return host.info
}

func (host selectedHostPoolHost) Mark(err error) {
func (host selectedHostPoolHost) Mark(err error, latency uint64) {
ip := host.info.ConnectAddress().String()

host.policy.mu.RLock()
@@ -981,6 +981,284 @@ func (d *rackAwareRR) Pick(q ExecutableQuery) NextHost {
return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get(), d.hosts[2].get())
}

const (
// minMeasures is the minimum number of recorded latencies before a host's
// average is trusted for exclusion decisions.
minMeasures = 50
// exclusionThreshold deprioritizes hosts whose average latency is more than
// this multiple of the best host's average.
exclusionThreshold = 2
)

var (
// scaleInNano (100ms) controls how quickly the weight of older measurements
// decays in the per-host average.
scaleInNano = uint64(time.Millisecond.Nanoseconds() * 100)
// retryPeriodNanos (10s) is how long a host may go without fresh measurements
// before its average is no longer used to exclude it.
retryPeriodNanos = uint64(time.Second.Nanoseconds() * 10)
// updateMinAvgRate is how often the minimum average latency is recomputed.
updateMinAvgRate = time.Millisecond * 100

// hostLatencyThresholdToAccount is the number of initial measurements per
// host that are discarded before an average is kept (30% of minMeasures).
hostLatencyThresholdToAccount = uint64(30 * minMeasures / 100)
)

// LatencyAwarePolicy returns a host selection policy that prioritizes hosts
// with smaller average query latencies. It records the latency of each query
// to every Cassandra node and maintains a per-node, exponentially decaying
// average latency score. Nodes that are slower than the best-performing node
// by more than a configurable threshold are deprioritized in the query plan.
// A usage sketch follows the constructor.
func LatencyAwarePolicy(fallback HostSelectionPolicy) HostSelectionPolicy {
if fallback == nil {
panic("LatencyAwarePolicy should have a fallback HostSelectionPolicy")
}
return &latencyAwarePolicy{latencies: make(map[string]*hostLatencyStat), fallback: fallback,
stopUpdateMinAvgChan: make(chan struct{})}
}
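
// Usage sketch (illustration only, not part of this change): wrap an existing
// fallback such as RoundRobinHostPolicy and install the result on a
// ClusterConfig. newLatencyAwareCluster is a hypothetical helper, not an API
// added by this change.
func newLatencyAwareCluster(hosts ...string) *ClusterConfig {
cluster := NewCluster(hosts...)
cluster.PoolConfig.HostSelectionPolicy = LatencyAwarePolicy(RoundRobinHostPolicy())
return cluster
}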

type latencyAwarePolicy struct {
latencies map[string]*hostLatencyStat
mu sync.RWMutex

minAvg uint64
maMu sync.RWMutex

fallback HostSelectionPolicy

startUpdateMinAvgOnce sync.Once
stopUpdateMinAvgChan chan struct{}
}

func (l *latencyAwarePolicy) IsLocal(host *HostInfo) bool { return l.fallback.IsLocal(host) }
func (l *latencyAwarePolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
l.fallback.KeyspaceChanged(update)
}
func (l *latencyAwarePolicy) Init(session *Session) {
l.fallback.Init(session)
}
func (l *latencyAwarePolicy) SetPartitioner(partitioner string) {
l.fallback.SetPartitioner(partitioner)
}

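// hostLatencyStat tracks an exponentially decaying average of the latencies
// observed for a single host. The first thresholdToAccount measurements are
// discarded before an average is kept.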
type hostLatencyStat struct {
mu sync.Mutex

thresholdToAccount uint64
scale uint64
timestamp int64
average uint64
numMeasure uint64

host *HostInfo
}

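// addNewLatency folds a new measurement into the host's running average using
// a time-weighted decay: the longer the gap since the previous measurement,
// the less weight the old average carries (see the sketch after this function).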
func (h *hostLatencyStat) addNewLatency(latency uint64) {
h.mu.Lock()
defer h.mu.Unlock()

curTimestamp := time.Now().UnixNano()

h.numMeasure = h.numMeasure + 1
if h.numMeasure < h.thresholdToAccount {
h.timestamp = curTimestamp
h.average = 0
return
}

if h.average <= 0 {
h.timestamp = curTimestamp
h.average = latency
return
}

delay := curTimestamp - h.timestamp
// This should be rare and just discard the new latency
if delay <= 0 {
return
}

h.timestamp = curTimestamp
scaledDelay := float64(delay) / float64(h.scale)
prevWeight := math.Log(scaledDelay+1) / scaledDelay
newAverage := (1.0-prevWeight)*float64(latency) + prevWeight*float64(h.average)
h.average = uint64(newAverage)
}
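
// Sketch of the decay behaviour (illustration only, not part of this change):
// decayWeight is a hypothetical helper that mirrors the formula above. With
// scale = 100ms, a 100ms gap keeps ln(2)/1 ≈ 0.69 of the previous average,
// while a 1s gap keeps only ln(11)/10 ≈ 0.24 of it, so stale averages fade
// quickly in favour of fresh measurements. A zero delay is guarded against by
// the caller, as in addNewLatency.
func decayWeight(delayNanos, scaleNanos uint64) float64 {
scaledDelay := float64(delayNanos) / float64(scaleNanos)
return math.Log(scaledDelay+1) / scaledDelay
}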

func (l *latencyAwarePolicy) AddHost(host *HostInfo) {
ip := host.ConnectAddress().String()

l.mu.Lock()
if h, ok := l.latencies[ip]; ok && h != nil {
l.mu.Unlock()
return
}
l.latencies[ip] = &hostLatencyStat{thresholdToAccount: hostLatencyThresholdToAccount,
scale: scaleInNano, timestamp: time.Now().UnixNano(), average: 0, host: host}
l.mu.Unlock()

l.fallback.AddHost(host)
}

func (l *latencyAwarePolicy) RemoveHost(host *HostInfo) {
ip := host.ConnectAddress().String()

l.mu.RLock()
if _, ok := l.latencies[ip]; !ok {
l.mu.RUnlock()
return
}
l.mu.RUnlock()

l.mu.Lock()
delete(l.latencies, ip)
l.mu.Unlock()

l.fallback.RemoveHost(host)
}

func (l *latencyAwarePolicy) HostUp(host *HostInfo) {
l.AddHost(host)
}

func (l *latencyAwarePolicy) HostDown(host *HostInfo) {
l.RemoveHost(host)
}

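// Pick builds a query plan from the fallback policy, returning hosts whose
// average latency is within exclusionThreshold times the current best average
// (or hosts with too few or stale measurements) in fallback order. Hosts that
// exceed the threshold are deferred and only used once every faster candidate
// has been exhausted.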
func (l *latencyAwarePolicy) Pick(qry ExecutableQuery) NextHost {
l.mu.RLock()
empty := len(l.latencies) == 0
l.mu.RUnlock()
if empty {
return l.fallback.Pick(qry)
}
// Start the goroutine that keeps minAvg up to date.
l.startUpdateMinAvgOnce.Do(func() {
go l.updateMinAvgLatency()
})

return func() SelectedHost {
fallbackIter := l.fallback.Pick(qry)

l.mu.RLock()
defer l.mu.RUnlock()

skipped := make([]*HostInfo, 0, len(l.latencies))
now := uint64(time.Now().UnixNano())

for fallbackHost := fallbackIter(); fallbackHost != nil; fallbackHost = fallbackIter() {
ip := fallbackHost.Info().ConnectAddress().String()

stat, ok := l.latencies[ip]
if !ok || stat == nil {
// No latency data for this host yet, use it as-is.
return selectedLatencyAwareHost{policy: l, info: fallbackHost.Info()}
}

l.maMu.RLock()
minAvg := l.minAvg
l.maMu.RUnlock()

elapsedTime := now - uint64(stat.timestamp)
if minAvg == 0 || stat.numMeasure < minMeasures || elapsedTime > retryPeriodNanos {
// Not enough (or stale) data to judge this host, use it as-is.
return selectedLatencyAwareHost{policy: l, info: stat.host}
}

// Hosts no slower than exclusionThreshold times the best average are
// returned in fallback order.
exclusionThresholdTime := minAvg * exclusionThreshold
if stat.average > 0 && stat.average <= exclusionThresholdTime {
return selectedLatencyAwareHost{policy: l, info: stat.host}
}
skipped = append(skipped, stat.host)
}

// Every candidate was deprioritized, fall back to one of them at random.
if len(skipped) == 0 {
return nil
}
return selectedLatencyAwareHost{policy: l, info: skipped[rand.Intn(len(skipped))]}
}
}
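
// Exclusion sketch (illustration only, not part of this change):
// isDeprioritized is a hypothetical helper mirroring the check in Pick. With
// exclusionThreshold = 2 and a best average of 4ms, a host averaging 7ms is
// returned in fallback order, while one averaging 10ms is deferred until the
// faster candidates have been exhausted.
func isDeprioritized(avg, minAvg uint64) bool {
return avg > minAvg*exclusionThreshold
}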

func (l *latencyAwarePolicy) updateHostLatency(ip string, latency uint64) {
l.mu.RLock()
defer l.mu.RUnlock()

if _, ok := l.latencies[ip]; !ok {
return
}

l.latencies[ip].addNewLatency(latency)
}

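// updateMinAvgLatency periodically recomputes the smallest per-host average
// latency, which Pick uses as the baseline for the exclusion threshold. It
// runs until stopUpdateMinAvgLatency is called.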
func (l *latencyAwarePolicy) updateMinAvgLatency() {
// Recompute the minimum average latency at the configured rate.
updateTicker := time.NewTicker(updateMinAvgRate)
defer updateTicker.Stop()

for {
select {
case <-updateTicker.C:
maxUint64 := uint64(math.MaxUint64)
minLatency := maxUint64
l.mu.RLock()
now := time.Now().UnixNano()
for _, stat := range l.latencies {
if stat == nil {
continue
}
elapsedTime := uint64(now - stat.timestamp)
if stat.average > 0 && stat.numMeasure >= minMeasures && minLatency > stat.average &&
elapsedTime <= retryPeriodNanos {
minLatency = stat.average
}
}
l.mu.RUnlock()

if minLatency != maxUint64 {
l.maMu.Lock()
l.minAvg = minLatency
l.maMu.Unlock()
}
case <-l.stopUpdateMinAvgChan:
return
}
}
}

func (l *latencyAwarePolicy) stopUpdateMinAvgLatency() {
// Closing the channel stops updateMinAvgLatency without blocking, even if
// the updater goroutine was never started.
close(l.stopUpdateMinAvgChan)
}

// selectedLatencyAwareHost is a host returned by the latencyAwarePolicy and
// implements the SelectedHost interface
type selectedLatencyAwareHost struct {
policy *latencyAwarePolicy
info *HostInfo
}

func (host selectedLatencyAwareHost) Info() *HostInfo {
return host.info
}

func (host selectedLatencyAwareHost) Mark(err error, latency uint64) {
if !host.shouldConsiderNewLatency(err) {
return
}

ip := host.info.ConnectAddress().String()
// updateHostLatency ignores hosts removed between pick and mark, so no extra
// existence check (and no nested read lock on policy.mu) is needed here.
host.policy.updateHostLatency(ip, latency)
}

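// shouldConsiderNewLatency reports whether the latency of this attempt should
// feed the host's average. Attempts that failed with a server, overloaded,
// bootstrapping, unprepared or invalid error code are excluded, since such
// responses are not representative of the host's normal query latency.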
func (host selectedLatencyAwareHost) shouldConsiderNewLatency(err error) bool {
if err == nil {
return true
}
var errFrame errorFrame
if errors.As(err, &errFrame) {
if errFrame.code == ErrCodeServer || errFrame.code == ErrCodeOverloaded ||
errFrame.code == ErrCodeBootstrapping || errFrame.code == ErrCodeUnprepared ||
errFrame.code == ErrCodeInvalid {
return false
}
}
return true
}

// ReadyPolicy defines a policy for when a HostSelectionPolicy can be used. After
// each host connects during session initialization, the Ready method will be
// called. If you only need a single Host to be up you can wrap a