Skip to content

Commit

Permalink
Assigner only checks the health status for routed components.
Browse files Browse the repository at this point in the history
After #90, every babysitter retrieves routing information for
non-routed components. This means that the assigner is only
consulted to retrieve routing information for routed components.
  • Loading branch information
spetrovic77 committed Aug 4, 2023
1 parent d5c31d4 commit aeaf340
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 251 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ jobs:
- name: Cleanup the echo app
if: always()
run: ./cmd/weaver-gke/weaver-gke kill --force echo-${{env.VERSION}}
run:
./cmd/weaver-gke/weaver-gke kill --force echo-${{env.VERSION}} || true

59 changes: 42 additions & 17 deletions internal/nanny/assigner/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,27 @@ func (a *Assigner) registerApp(ctx context.Context, app string) error {

// registerReplicaSet registers the provided Kubernetes ReplicaSet in the
// appropriate app's state.
func (a *Assigner) registerReplicaSet(ctx context.Context, rid *ReplicaSetId) error {
appName := rid.Config.Deployment.App.Name
_, _, err := a.applyAppState(ctx, appName, func(state *AppState) bool {
for _, existing := range state.ReplicaSetIds {
if proto.Equal(rid, existing) {
return false
func (a *Assigner) registerReplicaSet(ctx context.Context, cfg *config.GKEConfig, replicaSet string, routed bool) (*ReplicaSetId, error) {
var rid *ReplicaSetId
_, _, err := a.applyAppState(ctx, cfg.Deployment.App.Name, func(state *AppState) bool {
for _, rid = range state.ReplicaSetIds {
if rid.Name == replicaSet && rid.Config.Deployment.Id == cfg.Deployment.Id {
if !rid.HasRouted && routed {
rid.HasRouted = true
return true // modified
}
return false // unmodified
}
}
rid = &ReplicaSetId{
Config: cfg,
Name: replicaSet,
HasRouted: routed,
}
state.ReplicaSetIds = append(state.ReplicaSetIds, rid)
return true
return true // modified
})
if err != nil {
return fmt.Errorf("register replica set %v: %v", rid, err)
}
return nil
return rid, err
}

// RegisterActiveComponent registers the given component has been activated. It
Expand Down Expand Up @@ -229,8 +235,8 @@ func (a *Assigner) RegisterActiveComponent(ctx context.Context, req *nanny.Activ
if err := a.registerApp(ctx, appName); err != nil {
return err
}
rid := &ReplicaSetId{Config: req.Config, Name: rsName}
if err := a.registerReplicaSet(ctx, rid); err != nil {
rid, err := a.registerReplicaSet(ctx, req.Config, rsName, req.Routed)
if err != nil {
return err
}

Expand All @@ -239,7 +245,9 @@ func (a *Assigner) RegisterActiveComponent(ctx context.Context, req *nanny.Activ
}

// Create an initial assignment for the component.
var cur *ReplicaSetInfo
if _, _, err := a.applyReplicaSetInfo(ctx, rid, func(rs *ReplicaSetInfo) bool {
cur = rs
if rs.Components == nil {
rs.Components = map[string]*Assignment{}
}
Expand All @@ -255,6 +263,13 @@ func (a *Assigner) RegisterActiveComponent(ctx context.Context, req *nanny.Activ
}); err != nil {
return err
}

// If this is a first routed component in a given ReplicaSet, we need
// to start checkers for replicas/Pods that have already been registered.
if err := a.updateCheckers(rid, cur, protos.HealthStatus_HEALTHY); err != nil {
return err
}

return a.mayGenerateNewRoutingInfo(ctx, rid)
}

Expand Down Expand Up @@ -290,8 +305,8 @@ func (a *Assigner) RegisterReplica(ctx context.Context, req *nanny.RegisterRepli
if err := a.registerApp(ctx, appName); err != nil {
return err
}
rid := &ReplicaSetId{Config: req.Config, Name: req.ReplicaSet}
if err := a.registerReplicaSet(ctx, rid); err != nil {
rid, err := a.registerReplicaSet(ctx, req.Config, req.ReplicaSet, false /*routed*/)
if err != nil {
return err
}

Expand All @@ -314,6 +329,10 @@ func (a *Assigner) RegisterReplica(ctx context.Context, req *nanny.RegisterRepli
return err
}

if !rid.HasRouted {
return nil
}

a.addChecker(rid, req.WeaveletAddress, req.PodName, req.BabysitterAddress, protos.HealthStatus_HEALTHY)
return a.mayGenerateNewRoutingInfo(ctx, rid)
}
Expand Down Expand Up @@ -347,8 +366,8 @@ func (a *Assigner) RegisterListener(ctx context.Context, req *nanny.ExportListen
if err := a.registerApp(ctx, req.Config.Deployment.App.Name); err != nil {
return err
}
rid := &ReplicaSetId{Config: req.Config, Name: req.ReplicaSet}
if err := a.registerReplicaSet(ctx, rid); err != nil {
rid, err := a.registerReplicaSet(ctx, req.Config, req.ReplicaSet, false /*routed*/)
if err != nil {
return err
}

Expand Down Expand Up @@ -868,6 +887,9 @@ func (a *Assigner) advanceAssignments(ctx context.Context) error {
}

for _, rid := range appState.ReplicaSetIds {
if !rid.HasRouted {
continue
}
if err := a.mayGenerateNewRoutingInfo(ctx, rid); err != nil {
errs = append(errs, err)
}
Expand Down Expand Up @@ -920,6 +942,9 @@ func (a *Assigner) annealCheckersForApp(ctx context.Context, app string) error {
// Add any new checkers.
var errs []error
for _, rid := range state.ReplicaSetIds {
if !rid.HasRouted {
continue
}
replicaSet, version, err := a.loadReplicaSetInfo(ctx, rid)
if err != nil {
errs = append(errs, err)
Expand Down
Loading

0 comments on commit aeaf340

Please sign in to comment.