allocator: make assignment much much faster #351

Merged 2 commits on Oct 12, 2023
9 changes: 6 additions & 3 deletions allocator/alloc_state.go
@@ -100,15 +100,18 @@ func (s *State) observe() {
var slots = m.ItemLimit()
var zone = len(s.Zones) - 1

if len(s.Zones) == 0 || s.Zones[zone] < m.Zone {
if slots == 0 {
// Don't collect zones of members having no slots.
} else if len(s.Zones) == 0 || s.Zones[zone] < m.Zone {
s.Zones = append(s.Zones, m.Zone)
s.ZoneSlots = append(s.ZoneSlots, 0)
s.ZoneSlots = append(s.ZoneSlots, slots)
zone++
} else if s.Zones[zone] > m.Zone {
panic("invalid Member order")
} else {
s.ZoneSlots[zone] += slots
}

s.ZoneSlots[zone] += slots
s.MemberSlots += slots
s.NetworkHash = foldCRC(s.NetworkHash, s.Members[i].Raw.Key, slots)
}
89 changes: 45 additions & 44 deletions allocator/allocator.go
@@ -10,7 +10,6 @@ import (
log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.gazette.dev/core/allocator/push_relabel"
"go.gazette.dev/core/allocator/sparse_push_relabel"
"go.gazette.dev/core/keyspace"
)
@@ -30,17 +29,14 @@ type AllocateArgs struct {
// allocation of all Items to Members. Allocate exits on an unrecoverable
// error, or if:
//
// * The local Member has an ItemLimit of Zero, AND
// * No Assignments to the current Member remain.
// - The local Member has an ItemLimit of Zero, AND
// - No Assignments to the current Member remain.
//
// Eg, Allocate should be gracefully stopped by updating the ItemLimit of the
// Member identified by Allocator.LocalKey() to zero (perhaps as part of a
// SIGTERM signal handler) and then waiting for Allocate to return, which it
// will once all instance Assignments have been re-assigned to other Members.
func Allocate(args AllocateArgs) error {
// flowNetwork is local to a single pass of the scheduler, but we retain a
// single instance and re-use it each iteration to reduce allocation.
var fn = new(flowNetwork)
// The leader runs push/relabel to re-compute a |desired| network only when
// the State |NetworkHash| changes. Otherwise, it incrementally converges
// towards the previous solution, which is still a valid maximum assignment.
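
(Aside: the doc comment above describes the graceful-stop pattern in prose. A minimal sketch of what that could look like in a caller follows; the function name, the AllocateArgs field names, and the `{"R": 0}` member value — which matches this package's test decoder — are illustrative assumptions, not part of this PR.)

```go
// Illustrative sketch only (not in this PR): gracefully stopping Allocate
// on SIGTERM by zeroing the local Member's ItemLimit.
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	log "github.com/sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.gazette.dev/core/allocator"
)

func serveUntilSignaled(ctx context.Context, client *clientv3.Client, state *allocator.State) error {
	var sigCh = make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGTERM)

	go func() {
		<-sigCh
		// Zero the local Member's ItemLimit. Allocate re-assigns this
		// Member's Assignments to other Members, and then returns.
		// The `{"R": 0}` value shape matches this package's test decoder;
		// a real application would write its own MemberSpec serialization.
		if _, err := client.Put(ctx, state.LocalKey, `{"R": 0}`); err != nil {
			log.WithField("err", err).Error("failed to zero local ItemLimit")
		}
	}()

	// Returns once the local Member's ItemLimit is zero AND it holds no Assignments.
	return allocator.Allocate(allocator.AllocateArgs{
		Context: ctx,
		Etcd:    client,
		State:   state,
	})
}
```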
@@ -73,37 +69,22 @@ func Allocate(args AllocateArgs) error {
// Do we need to re-solve for a maximum assignment?
if state.NetworkHash != lastNetworkHash {
var startTime = time.Now()

// Build a prioritized flowNetwork and solve for maximum flow.
if useSparseSolver {
var fs = newSparseFlowNetwork(state)
var mf = sparse_push_relabel.FindMaxFlow(fs)

desired = fs.extractAssignments(mf, desired[:0])
} else {
fn.init(state)
push_relabel.FindMaxFlow(&fn.source, &fn.sink)

// Extract desired max-flow Assignments for each Item.
desired = desired[:0]
for item := range state.Items {
desired = extractItemFlow(state, fn, item, desired)
}
}

desired = solveDesiredAssignments(state, desired[:0])
var dur = time.Since(startTime)
allocatorMaxFlowRuntimeSeconds.Observe(dur.Seconds())

log.WithFields(log.Fields{
"root": state.KS.Root,
"rev": state.KS.Header.Revision,
"hash": state.NetworkHash,
"lastHash": lastNetworkHash,
"items": len(state.Items),
"members": len(state.Members),
"assignments": len(state.Assignments),
"desired": len(desired),
"dur": dur,
"dur": dur,
"hash": state.NetworkHash,
"itemSlots": state.ItemSlots,
"items": len(state.Items),
"lastHash": lastNetworkHash,
"memberSlots": state.MemberSlots,
"members": len(state.Members),
"nextAssignments": len(desired),
"prevAssignments": len(state.Assignments),
"rev": state.KS.Header.Revision,
"root": state.KS.Root,
}).Info("solved for maximum assignment")

if len(desired) < state.ItemSlots {
@@ -225,19 +206,44 @@ func removeDeadAssignments(txn checkpointTxn, ks *keyspace.KeySpace, asn keyspac
return nil
}

func solveDesiredAssignments(s *State, desired []Assignment) []Assignment {
// Number of items to lump into each invocation of push/relabel.
// This is an arbitrary number which is empirically fast to solve,
// but is large enough that we're unlikely to see further improvements
// from finding a global maximum flow.
// Splitting the problem in this way makes it big-O linear on items,
// instead of combinatorial over members and items.
const itemsPerNetwork = 10000

// NOTE(johnny): this could trivially be parallelized if needed.
Contributor

Just for my own understanding: I don't understand how this could be trivially parallelized while still respecting the item limit for each member. Wouldn't each process need to know the current number of assignments for each member in order to be able to respect their item limits?

Contributor Author

Answered below.

Then re: parallelization, State is unchanged when building / evaluating the max flow network, so multiple goroutines can work in parallel. I didn't do it yet because it's extra SLOC we don't appear to need, and I didn't know for sure that the memory impact would be negligible.
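
(For illustration only, a rough sketch of what that parallel variant could look like, assuming it lives in this package next to solveDesiredAssignments; nothing below is part of this PR.)

```go
// Hypothetical parallel form of solveDesiredAssignments (not in this PR).
// State is only read while building and solving each chunk's flow network,
// so chunks can be solved by separate goroutines, each appending into its
// own slice; results are concatenated in chunk order afterward.
// Requires "sync" in addition to this file's existing imports.
func solveDesiredAssignmentsParallel(s *State, desired []Assignment) []Assignment {
	const itemsPerNetwork = 10000
	var chunks = (len(s.Items) + itemsPerNetwork - 1) / itemsPerNetwork

	var results = make([][]Assignment, chunks)
	var wg sync.WaitGroup

	for i := 0; i != chunks; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			var end = (i + 1) * itemsPerNetwork
			if end > len(s.Items) {
				end = len(s.Items)
			}
			// Build and solve this chunk's network against the read-only State.
			var network = newSparseFlowNetwork(s, s.Items[i*itemsPerNetwork:end])
			var maxFlow = sparse_push_relabel.FindMaxFlow(network)
			results[i] = network.extractAssignments(maxFlow, nil)
		}(i)
	}
	wg.Wait()

	// Concatenate per-chunk results in chunk order.
	for _, r := range results {
		desired = append(desired, r...)
	}
	return desired
}
```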

for i := 0; i*itemsPerNetwork < len(s.Items); i++ {
var end = (i + 1) * itemsPerNetwork
if end > len(s.Items) {
end = len(s.Items)
}
var items = s.Items[i*itemsPerNetwork : end]

// Build a prioritized flow network and solve for maximum flow.
var network = newSparseFlowNetwork(s, items)
var maxFlow = sparse_push_relabel.FindMaxFlow(network)
desired = network.extractAssignments(maxFlow, desired)
}
return desired
}

// modRevisionUnchanged returns a Cmp which verifies the key has not changed
// from the current KeyValue.
func modRevisionUnchanged(kv keyspace.KeyValue) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(string(kv.Raw.Key)), "=", kv.Raw.ModRevision)
}

// checkpointTxn runs transactions. It's modeled on clientv3.Txn, but:
// * It introduces "checkpoints", whereby many checkpoints may be grouped into
// a smaller number of underlying Txns, while still providing a guarantee
// that If/Thens of a checkpoint will be issued together in one Txn.
// * It allows If and Then to be called multiple times.
// * It removes Else, as incompatible with the checkpoint model. As such,
// a Txn which does not succeed becomes an error.
// - It introduces "checkpoints", whereby many checkpoints may be grouped into
// a smaller number of underlying Txns, while still providing a guarantee
// that If/Thens of a checkpoint will be issued together in one Txn.
// - It allows If and Then to be called multiple times.
// - It removes Else, as incompatible with the checkpoint model. As such,
// a Txn which does not succeed becomes an error.
type checkpointTxn interface {
If(...clientv3.Cmp) checkpointTxn
Then(...clientv3.Op) checkpointTxn
@@ -381,8 +387,3 @@ func (b *batchedTxn) debugLogTxn(response *clientv3.TxnResponse, err error) {
// configuration at runtime with --max-txn-ops. We assume the default and will
// error if a smaller value is used.
var maxTxnOps = 128

// useSparseSolver is a developer toggle for comparative testing of the sparse
// vs dense push/relabel solvers & associated flow networks.
// Deprecated: to removed with the dense flow network implementation.
const useSparseSolver = true
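
(An illustrative usage sketch of the checkpoint model described by the checkpointTxn doc comment above. The Checkpoint and Flush methods, the helper name, and the choice of delete ops are assumptions made for the example; only If, Then, modRevisionUnchanged, and the 128-op maxTxnOps default appear in this diff.)

```go
// Sketch: each checkpoint's Cmps and Ops are guaranteed to be issued together
// in one underlying etcd Txn, while many checkpoints may be batched into a
// single Txn of up to maxTxnOps (assumed etcd default of 128) operations.
// Checkpoint() and Flush() are assumed methods of the concrete batchedTxn.
func deleteAssignments(txn checkpointTxn, asns keyspace.KeyValues) error {
	for _, kv := range asns {
		// Verify the Assignment hasn't changed, and delete it. The Cmp and
		// the Op of this checkpoint land in the same transaction.
		txn.If(modRevisionUnchanged(kv)).
			Then(clientv3.OpDelete(string(kv.Raw.Key)))

		// Close out this checkpoint; it may be buffered rather than flushed.
		if err := txn.Checkpoint(); err != nil {
			return err
		}
	}
	// Flush any checkpoints still buffered into a final Txn.
	return txn.Flush()
}
```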
67 changes: 38 additions & 29 deletions allocator/benchmark_test.go
@@ -3,6 +3,7 @@ package allocator
import (
"context"
"fmt"
"io"
"testing"

"github.com/prometheus/client_golang/prometheus"
@@ -35,15 +36,15 @@ func benchmarkSimulatedDeploy(b *testing.B) {
var client = etcdtest.TestClient()
defer etcdtest.Cleanup()

var ctx, _ = context.WithCancel(context.Background())
var ctx = context.Background()
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
var state = NewObservedState(ks, MemberKey(ks, "zone-b", "leader"), isConsistent)
var state = NewObservedState(ks, MemberKey(ks, "zone-a", "leader"), isConsistent)

// Each stage of the deployment will cycle |NMembers10| Members.
var NMembers10 = b.N
var NMembersHalf = b.N * 5
var NMembers = b.N * 10
var NItems = NMembers * 100
var NItems = NMembers * 200

b.Logf("Benchmarking with %d items, %d members (%d /2, %d /10)", NItems, NMembers, NMembersHalf, NMembers10)

@@ -68,59 +69,67 @@

// Announce half of Members...
fill(0, NMembersHalf, true, func(i int) (string, string) {
return benchMemberKey(ks, i), `{"R": 1000}`
return benchMemberKey(ks, i), `{"R": 1500}`
})
// And all Items.
fill(0, NItems, true, func(i int) (string, string) {
return ItemKey(ks, fmt.Sprintf("i%05d", i)), `{"R": 3}`
})

var testState = struct {
nextBlock int // Next block of Members to cycle down & up.
consistent bool // Whether we've marked Assignments as consistent.
nextBlock int // Next block of Members to cycle down & up.
nextDown bool // Are we next scaling down, or up?
}{}

var testHook = func(round int, idle bool) {
var begin = NMembers10 * testState.nextBlock
var end = NMembers10 * (testState.nextBlock + 1)

log.WithFields(log.Fields{
"round": round,
"idle": idle,
"state.nextBlock": testState.nextBlock,
"state.consistent": testState.consistent,
"begin": begin,
"end": end,
"round": round,
"idle": idle,
"begin": begin,
"end": end,
}).Info("ScheduleCallback")

if !idle {
return
} else if !testState.consistent {
// Mark any new Assignments as "consistent", which will typically
// unblock further convergence operations.
require.NoError(b, markAllConsistent(ctx, client, ks))
testState.consistent = true
} else if err := markAllConsistent(ctx, client, ks, ""); err == nil {
log.Info("marked some items as consistent")
return // We marked some items as consistent. Keep going.
} else if err == io.ErrNoProgress {
// Continue the next test step below.
} else {
log.WithField("err", err).Warn("failed to mark all consistent (will retry)")
return
}

log.WithFields(log.Fields{
"state.nextBlock": testState.nextBlock,
"state.nextDown": testState.nextDown,
}).Info("next test step")

if begin == NMembersHalf {
// We've cycled all Members. Gracefully exit by setting our ItemLimit to zero,
// and waiting for Serve to complete.
update(ctx, client, state.LocalKey, `{"R": 0}`)
testState.consistent = false
return
}

// Mark a block of Members as starting up, and shutting down.
fill(NMembersHalf+begin, NMembersHalf+end, true, func(i int) (string, string) {
return benchMemberKey(ks, i), `{"R": 1000}`
})
fill(begin, end, false, func(i int) (string, string) {
return benchMemberKey(ks, i), `{"R": 0}`
})

testState.nextBlock++
testState.consistent = false
if !testState.nextDown {
// Mark a block of Members as starting up.
fill(NMembersHalf+begin, NMembersHalf+end, true, func(i int) (string, string) {
return benchMemberKey(ks, i), `{"R": 1205}` // Less than before, but _just_ enough.
})
testState.nextDown = true
} else {
// Mark a block of Members as shutting down.
fill(begin, end, false, func(i int) (string, string) {
return benchMemberKey(ks, i), `{"R": 0}`
})
testState.nextBlock += 1
testState.nextDown = false
}
}

require.NoError(b, ks.Load(ctx, client, 0))
@@ -145,7 +154,7 @@ func benchMemberKey(ks *keyspace.KeySpace, i int) string {

switch i % 5 {
case 0, 2, 4:
zone = "zone-a"
zone = "zone-a" // Larger than zone-b.
case 1, 3:
zone = "zone-b"
}