diff --git a/prometheus/atomic_update.go b/prometheus/atomic_update.go new file mode 100644 index 000000000..b65896a31 --- /dev/null +++ b/prometheus/atomic_update.go @@ -0,0 +1,50 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "math" + "sync/atomic" + "time" +) + +// atomicUpdateFloat atomically updates the float64 value pointed to by bits +// using the provided updateFunc, with an exponential backoff on contention. +func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) { + const ( + // both numbers are derived from empirical observations + // documented in this PR: https://github.com/prometheus/client_golang/pull/1661 + maxBackoff = 320 * time.Millisecond + initialBackoff = 10 * time.Millisecond + ) + backoff := initialBackoff + + for { + loadedBits := atomic.LoadUint64(bits) + oldFloat := math.Float64frombits(loadedBits) + newFloat := updateFunc(oldFloat) + newBits := math.Float64bits(newFloat) + + if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { + break + } else { + // Exponential backoff with sleep and cap to avoid infinite wait + time.Sleep(backoff) + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} diff --git a/prometheus/atomic_update_test.go b/prometheus/atomic_update_test.go new file mode 100644 index 000000000..0233bc71f --- /dev/null +++ b/prometheus/atomic_update_test.go @@ -0,0 +1,167 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "math" + "sync" + "sync/atomic" + "testing" + "unsafe" +) + +var output float64 + +func TestAtomicUpdateFloat(t *testing.T) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + var wg sync.WaitGroup + numGoroutines := 100000 + increment := 1.0 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + atomicUpdateFloat(bits, func(f float64) float64 { + return f + increment + }) + }() + } + + wg.Wait() + expected := float64(numGoroutines) * increment + if val != expected { + t.Errorf("Expected %f, got %f", expected, val) + } +} + +// Benchmark for atomicUpdateFloat with single goroutine (no contention). +func BenchmarkAtomicUpdateFloat_SingleGoroutine(b *testing.B) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + + for i := 0; i < b.N; i++ { + atomicUpdateFloat(bits, func(f float64) float64 { + return f + 1.0 + }) + } + + output = val +} + +// Benchmark for old implementation with single goroutine (no contention) -> to check overhead of backoff +func BenchmarkAtomicNoBackoff_SingleGoroutine(b *testing.B) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + + for i := 0; i < b.N; i++ { + for { + loadedBits := atomic.LoadUint64(bits) + newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0) + if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { + break + } + } + } + + output = val +} + +// Benchmark varying the number of goroutines. +func benchmarkAtomicUpdateFloatConcurrency(b *testing.B, numGoroutines int) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + b.SetParallelism(numGoroutines) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + atomicUpdateFloat(bits, func(f float64) float64 { + return f + 1.0 + }) + } + }) + + output = val +} + +func benchmarkAtomicNoBackoffFloatConcurrency(b *testing.B, numGoroutines int) { + var val float64 = 0.0 + bits := (*uint64)(unsafe.Pointer(&val)) + b.SetParallelism(numGoroutines) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for { + loadedBits := atomic.LoadUint64(bits) + newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0) + if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { + break + } + } + } + }) + + output = val +} + +func BenchmarkAtomicUpdateFloat_1Goroutine(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 1) +} + +func BenchmarkAtomicNoBackoff_1Goroutine(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 1) +} + +func BenchmarkAtomicUpdateFloat_2Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 2) +} + +func BenchmarkAtomicNoBackoff_2Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 2) +} + +func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 4) +} + +func BenchmarkAtomicNoBackoff_4Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 4) +} + +func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 8) +} + +func BenchmarkAtomicNoBackoff_8Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 8) +} + +func BenchmarkAtomicUpdateFloat_16Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 16) +} + +func BenchmarkAtomicNoBackoff_16Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 16) +} + +func BenchmarkAtomicUpdateFloat_32Goroutines(b *testing.B) { + benchmarkAtomicUpdateFloatConcurrency(b, 32) +} + +func BenchmarkAtomicNoBackoff_32Goroutines(b *testing.B) { + benchmarkAtomicNoBackoffFloatConcurrency(b, 32) +} diff --git a/prometheus/counter.go b/prometheus/counter.go index 4ce84e7a8..2996aef6a 100644 --- a/prometheus/counter.go +++ b/prometheus/counter.go @@ -134,13 +134,9 @@ func (c *counter) Add(v float64) { return } - for { - oldBits := atomic.LoadUint64(&c.valBits) - newBits := math.Float64bits(math.Float64frombits(oldBits) + v) - if atomic.CompareAndSwapUint64(&c.valBits, oldBits, newBits) { - return - } - } + atomicUpdateFloat(&c.valBits, func(oldVal float64) float64 { + return oldVal + v + }) } func (c *counter) AddWithExemplar(v float64, e Labels) { diff --git a/prometheus/gauge.go b/prometheus/gauge.go index dd2eac940..aa1846365 100644 --- a/prometheus/gauge.go +++ b/prometheus/gauge.go @@ -120,13 +120,9 @@ func (g *gauge) Dec() { } func (g *gauge) Add(val float64) { - for { - oldBits := atomic.LoadUint64(&g.valBits) - newBits := math.Float64bits(math.Float64frombits(oldBits) + val) - if atomic.CompareAndSwapUint64(&g.valBits, oldBits, newBits) { - return - } - } + atomicUpdateFloat(&g.valBits, func(oldVal float64) float64 { + return oldVal + val + }) } func (g *gauge) Sub(val float64) { diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 08bef3d87..24fcab3ab 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -1621,13 +1621,9 @@ func waitForCooldown(count uint64, counts *histogramCounts) { // atomicAddFloat adds the provided float atomically to another float // represented by the bit pattern the bits pointer is pointing to. func atomicAddFloat(bits *uint64, v float64) { - for { - loadedBits := atomic.LoadUint64(bits) - newBits := math.Float64bits(math.Float64frombits(loadedBits) + v) - if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { - break - } - } + atomicUpdateFloat(bits, func(oldVal float64) float64 { + return oldVal + v + }) } // atomicDecUint32 atomically decrements the uint32 p points to. See diff --git a/prometheus/process_collector_cgo_darwin.go b/prometheus/process_collector_cgo_darwin.go index 6f48e5845..fc4c24713 100644 --- a/prometheus/process_collector_cgo_darwin.go +++ b/prometheus/process_collector_cgo_darwin.go @@ -22,9 +22,7 @@ import "C" import "fmt" func getMemory() (*memoryInfo, error) { - var ( - rss, vsize C.ulonglong - ) + var rss, vsize C.ulonglong if err := C.get_memory_info(&rss, &vsize); err != 0 { return nil, fmt.Errorf("task_info() failed with 0x%x", int(err)) diff --git a/prometheus/summary.go b/prometheus/summary.go index 1ab0e4796..fb2bf2b25 100644 --- a/prometheus/summary.go +++ b/prometheus/summary.go @@ -468,13 +468,9 @@ func (s *noObjectivesSummary) Observe(v float64) { n := atomic.AddUint64(&s.countAndHotIdx, 1) hotCounts := s.counts[n>>63] - for { - oldBits := atomic.LoadUint64(&hotCounts.sumBits) - newBits := math.Float64bits(math.Float64frombits(oldBits) + v) - if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { - break - } - } + atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 { + return oldVal + v + }) // Increment count last as we take it as a signal that the observation // is complete. atomic.AddUint64(&hotCounts.count, 1) @@ -516,14 +512,13 @@ func (s *noObjectivesSummary) Write(out *dto.Metric) error { // Finally add all the cold counts to the new hot counts and reset the cold counts. atomic.AddUint64(&hotCounts.count, count) atomic.StoreUint64(&coldCounts.count, 0) - for { - oldBits := atomic.LoadUint64(&hotCounts.sumBits) - newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum()) - if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { - atomic.StoreUint64(&coldCounts.sumBits, 0) - break - } - } + + // Use atomicUpdateFloat to update hotCounts.sumBits atomically. + atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 { + return oldVal + sum.GetSampleSum() + }) + atomic.StoreUint64(&coldCounts.sumBits, 0) + return nil }