Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: exponential backoff for CAS operations on floats #1661

Merged
merged 5 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions prometheus/atomic_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheus

import (
"math"
"sync/atomic"
"time"
)

// atomicUpdateFloat atomically updates the float64 value pointed to by bits
// using the provided updateFunc, with an exponential backoff on contention.
func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) {
const (
maxBackoff = 320 * time.Millisecond
initialBackoff = 10 * time.Millisecond
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a comment explaining that those numbers were chosen based on empirical testing in one particular environment (AWS c7i.2xlarge) ?

backoff := initialBackoff

for {
loadedBits := atomic.LoadUint64(bits)
oldFloat := math.Float64frombits(loadedBits)
newFloat := updateFunc(oldFloat)
newBits := math.Float64bits(newFloat)

if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
} else {
// Exponential backoff with sleep and cap to avoid infinite wait
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}
167 changes: 167 additions & 0 deletions prometheus/atomic_update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheus

import (
"math"
"sync"
"sync/atomic"
"testing"
"unsafe"
)

var output float64

func TestAtomicUpdateFloat(t *testing.T) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))
var wg sync.WaitGroup
numGoroutines := 100000
increment := 1.0

for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomicUpdateFloat(bits, func(f float64) float64 {
return f + increment
})
}()
}

wg.Wait()
expected := float64(numGoroutines) * increment
if val != expected {
t.Errorf("Expected %f, got %f", expected, val)
}
}

// Benchmark for atomicUpdateFloat with single goroutine (no contention).
func BenchmarkAtomicUpdateFloat_SingleGoroutine(b *testing.B) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))

for i := 0; i < b.N; i++ {
atomicUpdateFloat(bits, func(f float64) float64 {
return f + 1.0
})
}

output = val
}

// Benchmark for old implementation with single goroutine (no contention) -> to check overhead of backoff
func BenchmarkAtomicNoBackoff_SingleGoroutine(b *testing.B) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))

for i := 0; i < b.N; i++ {
for {
loadedBits := atomic.LoadUint64(bits)
newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0)
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
}
}
}

output = val
}

// Benchmark varying the number of goroutines.
func benchmarkAtomicUpdateFloatConcurrency(b *testing.B, numGoroutines int) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))
b.SetParallelism(numGoroutines)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomicUpdateFloat(bits, func(f float64) float64 {
return f + 1.0
})
}
})

output = val
}

func benchmarkAtomicNoBackoffFloatConcurrency(b *testing.B, numGoroutines int) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))
b.SetParallelism(numGoroutines)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for {
loadedBits := atomic.LoadUint64(bits)
newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0)
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
}
}
}
})

output = val
}

func BenchmarkAtomicUpdateFloat_1Goroutine(b *testing.B) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels we could use b.Run here as I was proposing in https://www.bwplotka.dev/2024/go-microbenchmarks-benchstat/

Any good reasons to NOT do it? (e.g. "goroutines=%v")

Nevertheless we can do later, not a blocker for this PR, thanks!

benchmarkAtomicUpdateFloatConcurrency(b, 1)
}

func BenchmarkAtomicNoBackoff_1Goroutine(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 1)
}

func BenchmarkAtomicUpdateFloat_2Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 2)
}

func BenchmarkAtomicNoBackoff_2Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 2)
}

func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 4)
}

func BenchmarkAtomicNoBackoff_4Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 4)
}

func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 8)
}

func BenchmarkAtomicNoBackoff_8Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 8)
}

func BenchmarkAtomicUpdateFloat_16Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 16)
}

func BenchmarkAtomicNoBackoff_16Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 16)
}

func BenchmarkAtomicUpdateFloat_32Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 32)
}

func BenchmarkAtomicNoBackoff_32Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 32)
}
10 changes: 3 additions & 7 deletions prometheus/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,9 @@ func (c *counter) Add(v float64) {
return
}

for {
oldBits := atomic.LoadUint64(&c.valBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
if atomic.CompareAndSwapUint64(&c.valBits, oldBits, newBits) {
return
}
}
atomicUpdateFloat(&c.valBits, func(oldVal float64) float64 {
return oldVal + v
})
}

func (c *counter) AddWithExemplar(v float64, e Labels) {
Expand Down
10 changes: 3 additions & 7 deletions prometheus/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,9 @@ func (g *gauge) Dec() {
}

func (g *gauge) Add(val float64) {
for {
oldBits := atomic.LoadUint64(&g.valBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + val)
if atomic.CompareAndSwapUint64(&g.valBits, oldBits, newBits) {
return
}
}
atomicUpdateFloat(&g.valBits, func(oldVal float64) float64 {
return oldVal + val
})
}

func (g *gauge) Sub(val float64) {
Expand Down
10 changes: 3 additions & 7 deletions prometheus/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,13 +1621,9 @@ func waitForCooldown(count uint64, counts *histogramCounts) {
// atomicAddFloat adds the provided float atomically to another float
// represented by the bit pattern the bits pointer is pointing to.
func atomicAddFloat(bits *uint64, v float64) {
for {
loadedBits := atomic.LoadUint64(bits)
newBits := math.Float64bits(math.Float64frombits(loadedBits) + v)
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
}
}
atomicUpdateFloat(bits, func(oldVal float64) float64 {
return oldVal + v
})
}

// atomicDecUint32 atomically decrements the uint32 p points to. See
Expand Down
4 changes: 1 addition & 3 deletions prometheus/process_collector_cgo_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import "C"
import "fmt"

func getMemory() (*memoryInfo, error) {
var (
rss, vsize C.ulonglong
)
var rss, vsize C.ulonglong

if err := C.get_memory_info(&rss, &vsize); err != 0 {
return nil, fmt.Errorf("task_info() failed with 0x%x", int(err))
Expand Down
25 changes: 10 additions & 15 deletions prometheus/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,9 @@ func (s *noObjectivesSummary) Observe(v float64) {
n := atomic.AddUint64(&s.countAndHotIdx, 1)
hotCounts := s.counts[n>>63]

for {
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
break
}
}
atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 {
return oldVal + v
})
// Increment count last as we take it as a signal that the observation
// is complete.
atomic.AddUint64(&hotCounts.count, 1)
Expand Down Expand Up @@ -516,14 +512,13 @@ func (s *noObjectivesSummary) Write(out *dto.Metric) error {
// Finally add all the cold counts to the new hot counts and reset the cold counts.
atomic.AddUint64(&hotCounts.count, count)
atomic.StoreUint64(&coldCounts.count, 0)
for {
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum())
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
atomic.StoreUint64(&coldCounts.sumBits, 0)
break
}
}

// Use atomicUpdateFloat to update hotCounts.sumBits atomically.
atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 {
return oldVal + sum.GetSampleSum()
})
atomic.StoreUint64(&coldCounts.sumBits, 0)

return nil
}

Expand Down
Loading