Skip to content

Commit

Permalink
Feat: started cleaning up code
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoog committed Mar 11, 2024
1 parent dbc652f commit fec5472
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 16 deletions.
25 changes: 25 additions & 0 deletions ds/reactive/wait_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package reactive

// WaitGroup is a reactive Event that waits for a set of elements to be done and that allows to inspect the pending
// elements.
type WaitGroup[T comparable] interface {
// Event returns the event that is triggered when all elements are done.
Event

// Add adds the given elements to the wait group.
Add(elements ...T)

// Done marks the given elements as done and triggers the wait group if all elements are done.
Done(elements ...T)

// Wait waits until all elements are done.
Wait()

// PendingElements returns the currently pending elements.
PendingElements() ReadableSet[T]
}

// NewWaitGroup creates a new WaitGroup.
func NewWaitGroup[T comparable](elements ...T) WaitGroup[T] {
return newWaitGroup(elements...)
}
66 changes: 66 additions & 0 deletions ds/reactive/wait_group_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package reactive

import (
"sync"
"sync/atomic"
)

// waitGroup is the default implementation of the WaitGroup interface.
type waitGroup[T comparable] struct {
// Event embeds the Event that is triggered when all elements are done.
Event

// pendingElements contains the currently pending elements.
pendingElements Set[T]

// pendingElementsCounter is the thread-safe counter that keeps track of the number of pending elements.
pendingElementsCounter atomic.Int32
}

// newWaitGroup creates a new wait group.
func newWaitGroup[T comparable](elements ...T) *waitGroup[T] {
w := &waitGroup[T]{
Event: NewEvent(),
pendingElements: NewSet[T](),
}

w.Add(elements...)

return w
}

// Add adds the given elements to the wait group.
func (w *waitGroup[T]) Add(elements ...T) {
// first increase the counter so that the trigger is not executed before all elements are added
w.pendingElementsCounter.Add(int32(len(elements)))

// then add the elements (and correct the counter if the elements are already present)
for _, element := range elements {
if !w.pendingElements.Add(element) {
w.pendingElementsCounter.Add(-1)
}
}
}

// Done marks the given elements as done and triggers the wait group if all elements are done.
func (w *waitGroup[T]) Done(elements ...T) {
for _, element := range elements {
if w.pendingElements.Delete(element) && w.pendingElementsCounter.Add(-1) == 0 {
w.Trigger()
}
}
}

// Wait waits until all elements are done.
func (w *waitGroup[T]) Wait() {
var wg sync.WaitGroup

wg.Add(1)
w.OnTrigger(wg.Done)
wg.Wait()
}

// PendingElements returns the currently pending elements.
func (w *waitGroup[T]) PendingElements() ReadableSet[T] {
return w.pendingElements
}
39 changes: 39 additions & 0 deletions ds/reactive/wait_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package reactive

import (
"testing"

"github.com/stretchr/testify/require"
)

func Test(t *testing.T) {
wg := NewWaitGroup(1, 2, 3)

require.True(t, wg.PendingElements().HasAll(NewSet(1, 2, 3)))
require.False(t, wg.WasTriggered())

wg.Done(1, 2)

require.True(t, wg.PendingElements().HasAll(NewSet(3)))
require.False(t, wg.WasTriggered())

wg.Done(1, 2)

require.True(t, wg.PendingElements().HasAll(NewSet(3)))
require.False(t, wg.WasTriggered())

wg.Add(4)

require.True(t, wg.PendingElements().HasAll(NewSet(3, 4)))
require.False(t, wg.WasTriggered())

wg.Add(4)

require.True(t, wg.PendingElements().HasAll(NewSet(3, 4)))
require.False(t, wg.WasTriggered())

wg.Done(3, 4)

require.True(t, wg.PendingElements().IsEmpty())
require.True(t, wg.WasTriggered())
}
77 changes: 77 additions & 0 deletions runtime/module/debug_waiting_process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package module

Check failure on line 1 in runtime/module/debug_waiting_process.go

View workflow job for this annotation

GitHub Actions / golangci-runtime

[golangci-runtime] runtime/module/debug_waiting_process.go#L1

: # github.com/iotaledger/hive.go/runtime/module [github.com/iotaledger/hive.go/runtime/module.test]
Raw output
module/debug_waiting_process.go:1: : # github.com/iotaledger/hive.go/runtime/module [github.com/iotaledger/hive.go/runtime/module.test]

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/iotaledger/hive.go/ds"
"github.com/iotaledger/hive.go/ds/reactive"
"github.com/iotaledger/hive.go/lo"
)

func DebugWaitingProcess(reportInterval time.Duration, handler ...func(ds.Set[Module])) {
debugReportInterval.Store(&reportInterval)

if len(handler) > 0 {
debugReportHandler.Store(&handler[0])
} else {
defaultDebugWaitAllHandler := func(pendingModules ds.Set[Module]) {
fmt.Println("Waiting for: " + strings.Join(lo.Map(pendingModules.ToSlice(), Module.LogName), ", "))
}

debugReportHandler.Store(&defaultDebugWaitAllHandler)
}
}

type PendingModules interface {
WaitGroup[Module]

Check failure on line 30 in runtime/module/debug_waiting_process.go

View workflow job for this annotation

GitHub Actions / golangci-runtime

[golangci-runtime] runtime/module/debug_waiting_process.go#L30

undefined: WaitGroup
Raw output
module/debug_waiting_process.go:30:2: undefined: WaitGroup

MarkDone(module Module)

ds.ReadableSet[Module]

reactive.Event
}

type pendingModules struct {
ds.ReadableSet[Module]

wg sync.WaitGroup
pendingModules ds.Set[Module]
}

func reportPendingModules(modules ...Module) (pendingModules ds.Set[Module]) {
reportInterval := debugReportInterval.Load()
reportHandler := debugReportHandler.Load()
if reportInterval == nil || reportHandler == nil {
return
}

pendingModules = ds.NewSet[Module]()
for _, module := range modules {
pendingModules.Add(module)
}

go func() {
ticker := time.NewTicker(*reportInterval)
defer ticker.Stop()

for range ticker.C {
if pendingModules.IsEmpty() {
break
}

(*reportHandler)(pendingModules)
}
}()

return
}

var (
debugReportInterval atomic.Pointer[time.Duration]
debugReportHandler atomic.Pointer[func(ds.Set[Module])]
)
17 changes: 1 addition & 16 deletions runtime/module/utils.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package module

import (
"fmt"
"sync"
"sync/atomic"

"github.com/iotaledger/hive.go/ds/reactive"
Expand All @@ -29,22 +27,9 @@ func OnAllStopped(callback func(), modules ...Module) (unsubscribe func()) {
return onModulesTriggered(Module.StoppedEvent, callback, modules...)
}

// WaitAll waits until all given modules have triggered the given event.
func WaitAll(event func(Module) reactive.Event, modules ...Module) {
var wg sync.WaitGroup

wg.Add(len(modules))
for _, module := range modules {
event(module).OnTrigger(wg.Done)
}

wg.Wait()
}

// TriggerAll triggers the given event on all given modules.
func TriggerAll(event func(Module) reactive.Event, modules ...Module) {
for i, module := range modules {
fmt.Println(i, "TriggerAll", module)
for _, module := range modules {
event(module).Trigger()
}
}
Expand Down
27 changes: 27 additions & 0 deletions runtime/module/wait_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package module

import (
"sync"

"github.com/iotaledger/hive.go/ds/reactive"
)

// WaitAll waits until all given modules have triggered the given event.
func WaitAll(event func(Module) reactive.Event, modules ...Module) reactive.WaitGroup[T] {

Check failure on line 10 in runtime/module/wait_all.go

View workflow job for this annotation

GitHub Actions / golangci-runtime

[golangci-runtime] runtime/module/wait_all.go#L10

undefined: reactive.WaitGroup
Raw output
module/wait_all.go:10:77: undefined: reactive.WaitGroup
pendingModules := reportPendingModules(modules...)

var wg sync.WaitGroup

wg.Add(len(modules))
for _, module := range modules {
event(module).OnTrigger(func() {
if pendingModules != nil {
pendingModules.Delete(module)
}

wg.Done()
})
}

wg.Wait()
}

Check failure on line 27 in runtime/module/wait_all.go

View workflow job for this annotation

GitHub Actions / golangci-runtime

[golangci-runtime] runtime/module/wait_all.go#L27

missing return (typecheck)
Raw output
module/wait_all.go:27:1: missing return (typecheck)
package module
35 changes: 35 additions & 0 deletions runtime/module/wait_all_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package module

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/iotaledger/hive.go/log"
)

func Test(t *testing.T) {
module1 := New(log.NewLogger(log.WithName("module1")))
module2 := New(log.NewLogger(log.WithName("module2")))
module3 := New(log.NewLogger(log.WithName("module3")))

DebugWaitingProcess(1 * time.Second)

go func() {
time.Sleep(2 * time.Second)
module1.ConstructedEvent().Trigger()

time.Sleep(2 * time.Second)
module2.ConstructedEvent().Trigger()

time.Sleep(2 * time.Second)
module3.ConstructedEvent().Trigger()
}()

WaitAll(Module.ConstructedEvent, module1, module2, module3)

require.True(t, module1.ConstructedEvent().WasTriggered())
require.True(t, module2.ConstructedEvent().WasTriggered())
require.True(t, module3.ConstructedEvent().WasTriggered())
}

0 comments on commit fec5472

Please sign in to comment.