Skip to content

Commit

Permalink
Change to the observer pattern (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
ders authored May 23, 2024
1 parent f730838 commit 759842d
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 290 deletions.
186 changes: 106 additions & 80 deletions collection.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,94 @@
package nested

import (
"math/rand"
"strconv"
"errors"
"fmt"
"sort"
"strings"
"sync"
)

// A Collection monitors multiple services and keeps track of the overall state. The overall state is defined as:
// - Ready if all of the services are ready.
// - Stopped if ANY of the services are stopped.
// - Not Ready otherwise.
// - Ready if ALL of the services are ready.
// - Stopped if ALL of the services are stopped.
// - Error if ANY of the services are erroring.
// - Error if some (but not all) of the services are stopped.
// - Initializing of ANY of the services are initializing (and none are erroring).
//
// A Collection implements the Service interface but does not set the error states.
// A Collection implements the Service interface.
//
// Services to be monitored are added using the Add() method. Services cannot be removed once added.
//
// An empty Collection is ready to use and in the Not Ready state. A Collection must not be copied after first use.
// To start monitoring, the caller must invoke the Run() method. Only when Run has been called AND all of the services
// have finished initialization will the collection change its state. Services should not be added after calling Run().
//
// An empty Collection is ready to use and in the Initializing state. A Collection must not be copied after first use.
type Collection struct {
Monitor
sync.Mutex
services map[string]Service
id string
updates chan Notification
running bool
}

// Verifies that a Monitor implements the Service interface.
// Verifies that a Collection implements the Service interface.
var _ Service = &Collection{}

// Add adds a service to be monitored. Panics if the service has already been added. Panics if the label has been
// used already for another service.
// A CollectionError is returned by the collections Err() method when any of the services are erroring. It can be
// inspected for details of the errors from each service.
type CollectionError struct {
// Errors contains the error descriptions from each erroring service, indexed by label. Only erroring services are included.
Errors map[string]error
}

// An ErrStoppedServices error is returned by the collections Err() method when no services are erroring and some (but
// not all) of the monitored services are stopped. It normally indicates that we're in the process of shutting down.
var ErrStoppedServices = errors.New("there are stopped services")

// Error returns the error descriptions from all erroring services in a multi-line string.
func (ce CollectionError) Error() string {
msgs := make([]string, 0, len(ce.Errors))
for id, err := range ce.Errors {
msgs = append(msgs, id+": "+err.Error())
}
sort.Strings(msgs)
return strings.Join(msgs, "\n")
}

// Add adds a service to be monitored. Panics if the label has already been used in this collection.
func (c *Collection) Add(label string, s Service) {
c.Lock()
defer c.Unlock()

// Initialize the update channel if this is the first service to be added.
// Initialize the maps if this is the first service to be added.
if c.services == nil {
c.services = make(map[string]Service)
c.updates = make(chan Notification)
go func() {
for range c.updates {
c.Monitor.SetState(c.getOverallState(), nil)
}
}()
// Using the same ID to subscribe to all monitored services means that Subscribe will panic below if a service
// is added twice.
c.id = "collection-" + strconv.Itoa(rand.Int())
} else {
// Otherwise check that we're not reusing a label.
if _, ok := c.services[label]; ok {
panic("add: label " + label + " already in use")
panic(fmt.Sprintf("add: label %q already in use", label))
}
}

c.services[label] = s
s.Subscribe(c.id, c.updates)

// Trigger an update to include the state of the newly added service.
go func() {
c.updates <- Notification{}
}()
// Just in case someone adds a service to a running collection, make sure we get its events. The alternative would
// be to disallow adding the service in the first place, but we don't want to do that.
if c.running {
s.Register(c)
}
}

// StateCount returns the number of monitored services currently in the given state.
func (c *Collection) StateCount(state State) int {
// Run starts monitoring the added services. The collection remains in the Initializing state until all of the
// monitored services are finished initializing.
//
// Calling Run on an already running collection has no effect.
func (c *Collection) Run() {
defer c.OnNotify(Event{})
c.Lock()
defer c.Unlock()

var n int
for _, service := range c.services {
if service.GetState() == state {
n++
}
for _, s := range c.services {
s.Register(c)
}
return n
}

// Up returns a map whose keys are the labels of all the currently monitored services and whose values are true if
Expand All @@ -91,58 +107,68 @@ func (c *Collection) Up() map[string]bool {
// any of the services should be used after calling stop.
func (c *Collection) Stop() {

// Start stopping all of the member services, and then release the lock.
u := func() chan Notification {

// Initialize the wait group first so that wg.Wait() runs after the lock is released. That way, if we block
// on any of the Stop() calls, we do so without holding the lock.
wg := sync.WaitGroup{}
defer wg.Wait()

c.Lock()
defer c.Unlock()

wg.Add(len(c.services))
for _, service := range c.services {
// Unsubscribe first so that we can close the notifications channel. Note that a side effect of
// unsubscribing here is that we need to explicitly set the monitor to stopped when we're done.
service.Unsubscribe(c.id)
go func(s Service) {
s.Stop()
wg.Done()
}(service)
}
c.services = nil
// Initialize the wait group first so that wg.Wait() runs after the lock is released. That way, if we block
// on any of the Stop() calls, we do so without holding the lock.
wg := sync.WaitGroup{}
defer wg.Wait()

// Return the update channel so that we don't have to grab the lock again to get it.
return c.updates
}()
c.Lock()
defer c.Unlock()

// Close the update channel to release the goroutine in Add() above. If u is nil, that means that this collection
// hasn't been used, which is unexpected but not our concern.
if u != nil {
close(u)
wg.Add(len(c.services))
for _, service := range c.services {
go func(s Service) {
s.Stop()
wg.Done()
}(service)
}

// Need to explicitly set the monitor to stopped, since we unsubscribed already above.
c.Monitor.Stop()
}

// getOverallState computes the overall state of the collection: ready if all of the services are ready, stopped
// if any of the services are stopped, and not ready otherwise. getOverallState should not be called on an empty
// collection, as it will give the incorrect state.
func (c *Collection) getOverallState() State {
// OnNotify updates the state of the collection according to the states of all of the monitored services. No update is
// done if any of the services are still initializing.
//
// OnNotify is used internally as a callback when any monitored service changes state. It is not normally called directly.
func (c *Collection) OnNotify(_ Event) {

c.Lock()
defer c.Unlock()

we := State(Ready)
for _, service := range c.services {
switch service.GetState() {
allStopped := true
anyStopped := false
errs := make(map[string]error)

if len(c.services) == 0 {
return
}

for id, s := range c.services {
switch s.GetState() {
case Initializing:
return
case Ready:
allStopped = false
case Error:
errs[id] = s.Err()
allStopped = false // not actually needed, since we check for errors first
case Stopped:
return Stopped
case NotReady:
we = NotReady
anyStopped = true
}
}
return we

if len(errs) > 0 {
c.Monitor.SetError(CollectionError{Errors: errs})
return
}

if allStopped {
c.Monitor.Stop()
return
}

if anyStopped {
c.Monitor.SetError(ErrStoppedServices)
return
}

c.Monitor.SetReady()
}
87 changes: 22 additions & 65 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,97 +10,54 @@ func TestCollection(t *testing.T) {

co := Collection{}

// A new collection is not ready.
s, e := co.GetFullState()
assertEqual(t, NotReady, s)
assertEqual(t, nil, e)
assertEqual(t, s, co.GetState())
// A new collection is initializing
assertEqual(t, Initializing, co.GetState())
assertEqual(t, nil, co.Err())
assertEqual(t, map[string]bool{}, co.Up())

// Add services.
s0, s1 := &Monitor{}, &Monitor{}
co.Add("service 0", s0)
co.Add("service 1", s1)
co.Run()
time.Sleep(10 * time.Millisecond)
s, e = co.GetFullState()
assertEqual(t, NotReady, s)
assertEqual(t, nil, e)
assertEqual(t, s, co.GetState())
assertEqual(t, Initializing, co.GetState())
assertEqual(t, nil, co.Err())
assertEqual(t, map[string]bool{"service 0": false, "service 1": false}, co.Up())

// Can't add another service 0.
assertPanic(t, func() { co.Add("service 0", s0) }, `add: label "service 0" already in use`)

// One service is ready.
s0.SetState(Ready, nil)
s0.SetReady()
time.Sleep(10 * time.Millisecond)
s, e = co.GetFullState()
assertEqual(t, NotReady, s)
assertEqual(t, nil, e)
assertEqual(t, s, co.GetState())
assertEqual(t, Initializing, co.GetState())
assertEqual(t, nil, co.Err())
assertEqual(t, map[string]bool{"service 0": true, "service 1": false}, co.Up())

// Both services are ready.
s1.SetState(Ready, nil)
s1.SetReady()
time.Sleep(10 * time.Millisecond)
s, e = co.GetFullState()
assertEqual(t, Ready, s)
assertEqual(t, nil, e)
assertEqual(t, s, co.GetState())
assertEqual(t, Ready, co.GetState())
assertEqual(t, nil, co.Err())
assertEqual(t, map[string]bool{"service 0": true, "service 1": true}, co.Up())

assertEqual(t, 2, co.StateCount(Ready))
assertEqual(t, 0, co.StateCount(NotReady))
assertEqual(t, 0, co.StateCount(Stopped))

// One service is stopped.
s0.Stop()
time.Sleep(10 * time.Millisecond)
s, e = co.GetFullState()
assertEqual(t, Stopped, s)
assertEqual(t, nil, e)
assertEqual(t, s, co.GetState())
assertEqual(t, Error, co.GetState())
assertEqual(t, ErrStoppedServices, co.Err())
assertEqual(t, map[string]bool{"service 0": false, "service 1": true}, co.Up())

assertEqual(t, 1, co.StateCount(Ready))
assertEqual(t, 0, co.StateCount(NotReady))
assertEqual(t, 1, co.StateCount(Stopped))

// One service is stopped, and the other is not ready.
s1.SetState(NotReady, nil)
nr := errors.New("not ready")
s1.SetError(nr)
time.Sleep(10 * time.Millisecond)
s, e = co.GetFullState()
assertEqual(t, Stopped, s)
assertEqual(t, nil, e)
assertEqual(t, s, co.GetState())
assertEqual(t, Error, co.GetState())
assertEqual(t, error(CollectionError{Errors: map[string]error{"service 1": nr}}), co.Err())
assertEqual(t, map[string]bool{"service 0": false, "service 1": false}, co.Up())

// Stop all services.
co.Stop()
assertEqual(t, 0, co.StateCount(Ready))
assertEqual(t, 0, co.StateCount(NotReady))
assertEqual(t, map[string]bool{}, co.Up())

// We also have no stopped services because the service list has been emptied.
assertEqual(t, 0, co.StateCount(Stopped))
}

func TestCollection2(t *testing.T) {

co := Collection{}

// A new collection is not ready.
s, e := co.GetFullState()
assertEqual(t, NotReady, s)
assertEqual(t, nil, e)

// Add two services; one is ready, one isn't.
s0, s1 := &Monitor{}, &Monitor{}
s0.SetState(Ready, nil)
co.Add("service 0", s0)
s1.SetState(NotReady, errors.New("oh, no!"))
co.Add("service 1", s1)
time.Sleep(10 * time.Millisecond)
s, e = co.GetFullState()
assertEqual(t, NotReady, s)
assertEqual(t, nil, e)
assertEqual(t, s, co.GetState())
assertEqual(t, map[string]bool{"service 0": true, "service 1": false}, co.Up())
assertEqual(t, map[string]bool{"service 0": false, "service 1": false}, co.Up())
}
21 changes: 15 additions & 6 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,26 @@
// should implement.
//
// The state machine has the following states:
// - Initializing. The service is not ready yet.
// - Ready. The service is running normally.
// - Not ready. The service is temporarily unavailable.
// - Stopped. The service is permanently unavailable.
//
// Additionally, an error state is exposed.
// - When ready, the error state should always be nil.
// - When not ready, the error state may indicate a reason for being not ready. Not ready with a nil error state
// implies that the service is initializing.
// - When stopped, the error state may indicate a reason for being stopped. Stopped with a nil error state implies
// that the service was stopped by the calling process with Stop().
// The state machine begins in the initializing state. Once it transitions to one of the other states, it can never
// return to the initializing state.
//
// A state machine in the stopped state cannot change states.
//
// This package also provides a Monitor type, which implements the state machine. A Monitor can be embedded in
// any service to make it a nested service.
//
// A common pattern is to include a Monitor in the struct that defines the nested service, e.g.
//
// type MyService struct {
// nested.Monitor
// ...
// }
//
// The MyService constructor may either return an initializing service or a fully initialized service. The MyService
// Stop() method, however, should always wait until the service has stopped completely before returning.
package nested
Loading

0 comments on commit 759842d

Please sign in to comment.