feat(zbchaos): add command for broker scaling (#464)
Adds a new command that makes an API call to Zeebe to scale to the given
number of brokers and then scales the statefulset accordingly. Depending
on whether it is scaling up or down, the command scales the statefulset
first or last. It always waits for the change to complete, because the
statefulset can only be scaled down after the change has been applied.

Usage:

```
$ zbchaos cluster scale --brokers 5
```

Part 2/X of #458
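
For context, the scaling request the command issues is a plain POST against the gateway's management endpoint, which the diff below port-forwards to 9600. The following is a minimal, self-contained sketch of that call, mirroring `requestBrokerScaling` from the diff; the hard-coded port, endpoint path, and response field are taken from the diff, and this is illustrative rather than a supported client.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// requestScaling asks the cluster to scale to `brokers` brokers by posting the
// full list of desired broker ids to the management endpoint. It returns the
// id of the resulting topology change, which `zbchaos cluster wait` can poll.
func requestScaling(port, brokers int) (int64, error) {
	ids := make([]int32, brokers)
	for i := range ids {
		ids[i] = int32(i)
	}
	body, err := json.Marshal(ids)
	if err != nil {
		return 0, err
	}
	url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers", port)
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 400 {
		return 0, fmt.Errorf("scaling failed with code %d", resp.StatusCode)
	}
	// The response carries the change id of the pending topology change.
	var change struct{ ChangeId int64 }
	if err := json.NewDecoder(resp.Body).Decode(&change); err != nil {
		return 0, err
	}
	return change.ChangeId, nil
}

func main() {
	// Assumes the gateway management port is already forwarded to localhost:9600.
	changeId, err := requestScaling(9600, 5)
	if err != nil {
		panic(err)
	}
	fmt.Printf("scaling started, change id %d\n", changeId)
}
```

On scale-down, the command issues this request first and only shrinks the statefulset once the change completes, which is why it always waits.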
lenaschoenburg authored Dec 19, 2023
2 parents cc64d58 + bf6d01c commit a6694f3
Showing 3 changed files with 127 additions and 24 deletions.
144 changes: 121 additions & 23 deletions go-chaos/cmd/cluster.go
@@ -15,6 +15,7 @@
package cmd

import (
"bytes"
"encoding/json"
"fmt"
"io"
@@ -42,25 +43,115 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) {
Use: "wait",
Short: "Waits for a topology change to complete",
RunE: func(cmd *cobra.Command, args []string) error {
return waitForChange(flags)
return portForwardAndWaitForChange(flags)
},
}
var scaleCommand = &cobra.Command{
Use: "scale",
Short: "Scales the cluster to the given size",
RunE: func(cmd *cobra.Command, args []string) error {
return scaleCluster(flags)
},
}

rootCmd.AddCommand(clusterCommand)
clusterCommand.AddCommand(statusCommand)
clusterCommand.AddCommand(waitCommand)
waitCommand.Flags().IntVar(&flags.changeId, "changeId", 0, "The id of the change to wait for")
waitCommand.Flags().Int64Var(&flags.changeId, "changeId", 0, "The id of the change to wait for")
waitCommand.MarkFlagRequired("changeId")
clusterCommand.AddCommand(scaleCommand)
scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The amount of brokers to scale to")
scaleCommand.MarkFlagRequired("brokers")
}

func printCurrentTopology(flags *Flags) error {
func scaleCluster(flags *Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

err = k8Client.AwaitReadiness()
ensureNoError(err)

port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()
currentTopology, err := QueryTopology(port)
ensureNoError(err)
if currentTopology.PendingChange != nil {
return fmt.Errorf("cluster is already scaling")
}

if len(currentTopology.Brokers) > flags.brokers {
_, err = scaleDownBrokers(k8Client, port, flags.brokers)
} else if len(currentTopology.Brokers) < flags.brokers {
_, err = scaleUpBrokers(k8Client, port, flags.brokers)
} else {
return fmt.Errorf("cluster is already at size %d", flags.brokers)
}
ensureNoError(err)

return nil
}

func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
changeResponse, err := requestBrokerScaling(port, brokers)
ensureNoError(err)
_, err = k8Client.ScaleZeebeCluster(brokers)
ensureNoError(err)
waitForChange(port, changeResponse.ChangeId)
return changeResponse, nil
}

func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
changeResponse, err := requestBrokerScaling(port, brokers)
ensureNoError(err)

// Wait for brokers to leave before scaling down
err = waitForChange(port, changeResponse.ChangeId)
ensureNoError(err)
_, err = k8Client.ScaleZeebeCluster(brokers)

ensureNoError(err)
return changeResponse, nil
}

func requestBrokerScaling(port int, brokers int) (*ChangeResponse, error) {
brokerIds := make([]int32, brokers)
for i := 0; i < brokers; i++ {
brokerIds[i] = int32(i)
}
url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers", port)
request, err := json.Marshal(brokerIds)
if err != nil {
panic(err)
return nil, err
}
resp, err := http.Post(url, "application/json", bytes.NewReader(request))
if err != nil {
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("scaling failed with code %d", resp.StatusCode)
}

err = k8Client.AwaitReadiness()
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)

response, err := io.ReadAll(resp.Body)
if err != nil {
return err
return nil, err
}

var changeResponse ChangeResponse
err = json.Unmarshal(response, &changeResponse)
if err != nil {
return nil, err
}
return &changeResponse, nil
}

func printCurrentTopology(flags *Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}

port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
@@ -74,56 +165,56 @@ func printCurrentTopology(flags *Flags) error {
if err != nil {
return err
}
fmt.Println(string(formatted))
internal.LogInfo("Current topology: %s", string(formatted))
return nil
}

func waitForChange(flags *Flags) error {
func portForwardAndWaitForChange(flags *Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}

err = k8Client.AwaitReadiness()
if err != nil {
return err
}

port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()

return waitForChange(port, flags.changeId)
}

func waitForChange(port int, changeId int64) error {
interval := time.Second * 5
timeout := (time.Minute * 25)
iterations := int(timeout / interval)
for i := 0; i < int(iterations); i++ {
topology, err := QueryTopology(port)
if err != nil {
return err
internal.LogInfo("Failed to query topology: %s", err)
continue
}
changeStatus := describeChangeStatus(topology, int64(flags.changeId))
changeStatus := describeChangeStatus(topology, int64(changeId))
switch changeStatus {
case ChangeStatusCompleted:
internal.LogInfo("Change %d completed successfully", flags.changeId)
internal.LogInfo("Change %d completed successfully", changeId)
return nil
case ChangeStatusFailed:
internal.LogInfo("Change %d failed with status %s", flags.changeId, topology.LastChange.Status)
return fmt.Errorf("change %d failed with status %s", flags.changeId, topology.LastChange.Status)
internal.LogInfo("Change %d failed with status %s", changeId, topology.LastChange.Status)
return fmt.Errorf("change %d failed with status %s", changeId, topology.LastChange.Status)
case ChangeStatusOutdated:
internal.LogInfo("Change %d is outdated but was most likely completed successfully, latest change is %d", flags.changeId, topology.LastChange.Id)
internal.LogInfo("Change %d is outdated but was most likely completed successfully, latest change is %d", changeId, topology.LastChange.Id)
return nil
case ChangeStatusPending:
competed := len(topology.PendingChange.Completed)
pending := len(topology.PendingChange.Pending)
total := competed + pending
internal.LogInfo("Change %d is %s with %d/%d operations complete", flags.changeId, topology.PendingChange.Status, competed, total)
internal.LogInfo("Change %d is %s with %d/%d operations complete", changeId, topology.PendingChange.Status, competed, total)
case ChangeStatusUnknown:
internal.LogInfo("Change %d not yet started", flags.changeId)
internal.LogInfo("Change %d not yet started", changeId)
}
internal.LogInfo("Waiting %s before checking again. Iteration %d out of %d", interval, i, iterations)
internal.LogVerbose("Waiting %s before checking again. Iteration %d out of %d", interval, i, iterations)
time.Sleep(interval)
}

return fmt.Errorf("change %d did not complete within 25 minutes", flags.changeId)
return fmt.Errorf("change %d did not complete within 25 minutes", changeId)
}

type ChangeStatus string
@@ -177,6 +268,13 @@ func QueryTopology(port int) (*CurrentTopology, error) {
return &topology, nil
}

type ChangeResponse struct {
ChangeId int64
CurrentTopology []BrokerState
PlannedChanges []Operation
ExpectedTopology []BrokerState
}

type CurrentTopology struct {
Version int32
Brokers []BrokerState
3 changes: 2 additions & 1 deletion go-chaos/cmd/root.go
@@ -78,7 +78,8 @@ type Flags struct {
jobType string

// cluster
changeId int
changeId int64
brokers int
}

var Version = "development"
4 changes: 4 additions & 0 deletions go-chaos/cmd/topology.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io"
"sort"
"strings"
"text/tabwriter"

@@ -67,6 +68,9 @@ func writeTopologyToOutput(output io.Writer, response *pb.TopologyResponse) {
}

func writeTopology(response *pb.TopologyResponse, writer *tabwriter.Writer) {
sort.Slice(response.Brokers, func(i, j int) bool {
return response.Brokers[i].NodeId < response.Brokers[j].NodeId
})
for _, broker := range response.Brokers {
addLineToWriter(writer, createBrokerTopologyString(response.PartitionsCount, broker))
}
