Skip to content

Commit

Permalink
improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
alphadose committed Jun 17, 2022
1 parent f963644 commit 3fb829d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 58 deletions.
48 changes: 24 additions & 24 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"unsafe"
)

// a single slot for a worker in the thread pool
// a single slot for a worker in Pool
type slot struct {
threadPtr unsafe.Pointer
task func()
Expand All @@ -32,79 +32,79 @@ func NewPool(size uint64) *Pool {
// it first tries to use already existing goroutines
// if all existing goroutines are busy, it tries to add a new goroutine to the pool, provided the pool capacity is not exceeded
// if the pool capacity would be exceeded, this function yields the processor to other goroutines and loops again to find an available worker
func (p *Pool) Submit(task func()) {
func (self *Pool) Submit(task func()) {
var s *slot
for {
if s = p.pop(); s != nil {
if s = self.pop(); s != nil {
s.task = task
safe_ready(s.threadPtr)
return
} else if atomic.AddUint64(&p.currSize, 1) <= p.maxSize {
} else if atomic.AddUint64(&self.currSize, 1) <= self.maxSize {
s = &slot{task: task}
go p.loopQ(s)
go self.loopQ(s)
return
} else {
atomic.AddUint64(&p.currSize, uint64SubtractionConstant)
atomic.AddUint64(&self.currSize, uint64SubtractionConstant)
mcall(gosched_m)
}
}
}

// loopQ is the looping function for every worker goroutine
func (p *Pool) loopQ(s *slot) {
func (self *Pool) loopQ(s *slot) {
// store self goroutine pointer
s.threadPtr = GetG()
for {
// exec task
s.task()
// notify availability by pushing self reference into stack
p.push(s)
self.push(s)
// park and wait for call
mcall(fast_park)
}
}

// global memory pool for all items used in Pool
var itemPool = sync.Pool{New: func() any { return &directItem{next: nil, value: nil} }}
var itemPool = sync.Pool{New: func() any { return new(node) }}

// internal lock-free stack implementation for parking and waking up goroutines
// Credits -> https://github.com/golang-design/lockfree

// a single item in this stack
type directItem struct {
// a single node in this stack
type node struct {
next unsafe.Pointer
value *slot
}

// Pop pops value from the top of the stack
func (s *Pool) pop() (value *slot) {
// pop pops value from the top of the stack
func (self *Pool) pop() (value *slot) {
var top, next unsafe.Pointer
for {
top = atomic.LoadPointer(&s.top)
top = atomic.LoadPointer(&self.top)
if top == nil {
return
}
next = atomic.LoadPointer(&(*directItem)(top).next)
if atomic.CompareAndSwapPointer(&s.top, top, next) {
value = (*directItem)(top).value
(*directItem)(top).next, (*directItem)(top).value = nil, nil
itemPool.Put((*directItem)(top))
next = atomic.LoadPointer(&(*node)(top).next)
if atomic.CompareAndSwapPointer(&self.top, top, next) {
value = (*node)(top).value
(*node)(top).next, (*node)(top).value = nil, nil
itemPool.Put((*node)(top))
return
}
}
}

// Push pushes a value on top of the stack
func (s *Pool) push(v *slot) {
// push pushes a value on top of the stack
func (self *Pool) push(v *slot) {
var (
top unsafe.Pointer
item = itemPool.Get().(*directItem)
item = itemPool.Get().(*node)
)
item.value = v
for {
top = atomic.LoadPointer(&s.top)
top = atomic.LoadPointer(&self.top)
item.next = top
if atomic.CompareAndSwapPointer(&s.top, top, unsafe.Pointer(item)) {
if atomic.CompareAndSwapPointer(&self.top, top, unsafe.Pointer(item)) {
return
}
}
Expand Down
71 changes: 37 additions & 34 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (
"unsafe"
)

// a single point in PoolWithFunc
type dataPoint[T any] struct {
// a single slot for a worker in PoolWithFunc
type slotFunc[T any] struct {
threadPtr unsafe.Pointer
data T
}

// PoolWithFunc is used for spawning workers for a single pre-defined function with myriad inputs
// useful for throughput bound cases
// has lower memory usage and allocs per op than the default Pool
// ( type -> func(T) {} ) where T is a generic parameter
type PoolWithFunc[T any] struct {
currSize uint64
_p1 [cacheLinePadSize - unsafe.Sizeof(uint64(0))]byte
Expand All @@ -23,82 +25,83 @@ type PoolWithFunc[T any] struct {
_p3 [cacheLinePadSize - unsafe.Sizeof(func(T) {})]byte
top unsafe.Pointer
_p4 [cacheLinePadSize - unsafe.Sizeof(unsafe.Pointer(nil))]byte
sync.Pool
}

// NewPoolWithFunc returns a new PoolWithFunc
func NewPoolWithFunc[T any](size uint64, task func(T)) *PoolWithFunc[T] {
return &PoolWithFunc[T]{maxSize: size, task: task}
return &PoolWithFunc[T]{maxSize: size, task: task, Pool: sync.Pool{
New: func() any { return new(dataItem[T]) },
}}
}

// Invoke invokes the pre-defined method in PoolWithFunc by assigning the data to an already existing worker
// or by spawning a new worker if the pool size is still within its limit
func (p *PoolWithFunc[T]) Invoke(value T) {
var s unsafe.Pointer
func (self *PoolWithFunc[T]) Invoke(value T) {
var s *slotFunc[T]
for {
if s = p.pop(); s != nil {
(*dataPoint[T])(s).data = value
safe_ready((*dataPoint[T])(s).threadPtr)
if s = self.pop(); s != nil {
s.data = value
safe_ready(s.threadPtr)
return
} else if atomic.AddUint64(&p.currSize, 1) <= p.maxSize {
go p.loopQ(&dataPoint[T]{data: value})
} else if atomic.AddUint64(&self.currSize, 1) <= self.maxSize {
s = &slotFunc[T]{data: value}
go self.loopQ(s)
return
} else {
atomic.AddUint64(&p.currSize, uint64SubtractionConstant)
atomic.AddUint64(&self.currSize, uint64SubtractionConstant)
mcall(gosched_m)
}
}
}

// represents the infinite loop for a worker goroutine
func (p *PoolWithFunc[T]) loopQ(d *dataPoint[T]) {
func (self *PoolWithFunc[T]) loopQ(d *slotFunc[T]) {
d.threadPtr = GetG()
for {
p.task(d.data)
p.push(unsafe.Pointer(d))
self.task(d.data)
self.push(d)
mcall(fast_park)
}
}

// global memory pool for all items used in PoolWithFunc
var dataPool = sync.Pool{New: func() any { return &dataItem{next: nil, value: nil} }}
// Stack implementation below for storing goroutine references

// Stack implementation below

//
type dataItem struct {
// a single node in the stack
type dataItem[T any] struct {
next unsafe.Pointer
value unsafe.Pointer
value *slotFunc[T]
}

// Pop pops value from the top of the stack
func (s *PoolWithFunc[T]) pop() (value unsafe.Pointer) {
// pop pops value from the top of the stack
func (self *PoolWithFunc[T]) pop() (value *slotFunc[T]) {
var top, next unsafe.Pointer
for {
top = atomic.LoadPointer(&s.top)
top = atomic.LoadPointer(&self.top)
if top == nil {
return
}
next = atomic.LoadPointer(&(*dataItem)(top).next)
if atomic.CompareAndSwapPointer(&s.top, top, next) {
value = (*dataItem)(top).value
(*dataItem)(top).next, (*dataItem)(top).value = nil, nil
dataPool.Put((*dataItem)(top))
next = atomic.LoadPointer(&(*dataItem[T])(top).next)
if atomic.CompareAndSwapPointer(&self.top, top, next) {
value = (*dataItem[T])(top).value
(*dataItem[T])(top).next, (*dataItem[T])(top).value = nil, nil
self.Put((*dataItem[T])(top))
return
}
}
}

// Push pushes a value on top of the stack
func (s *PoolWithFunc[T]) push(v unsafe.Pointer) {
// push pushes a value on top of the stack
func (self *PoolWithFunc[T]) push(v *slotFunc[T]) {
var (
top unsafe.Pointer
item = dataPool.Get().(*dataItem)
item = self.Get().(*dataItem[T])
)
item.value = v
for {
top = atomic.LoadPointer(&s.top)
top = atomic.LoadPointer(&self.top)
item.next = top
if atomic.CompareAndSwapPointer(&s.top, top, unsafe.Pointer(item)) {
if atomic.CompareAndSwapPointer(&self.top, top, unsafe.Pointer(item)) {
return
}
}
Expand Down

0 comments on commit 3fb829d

Please sign in to comment.