From 0ecbbcd98ecc4f24a3c6dbc04099f4a3aa545f4b Mon Sep 17 00:00:00 2001 From: Ian Wahbe Date: Thu, 28 Mar 2024 13:10:28 +0100 Subject: [PATCH 1/7] refactor: Improve readability of cancelation wrapper --- middleware/cancel/cancel.go | 142 +++++++++++++++--------------------- 1 file changed, 60 insertions(+), 82 deletions(-) diff --git a/middleware/cancel/cancel.go b/middleware/cancel/cancel.go index f1bbfabd..75b7ebef 100644 --- a/middleware/cancel/cancel.go +++ b/middleware/cancel/cancel.go @@ -70,91 +70,69 @@ func Wrap(provider p.Provider) p.Provider { } return err } - if provider.GetSchema != nil { - wrapper.GetSchema = func(ctx p.Context, req p.GetSchemaRequest) (p.GetSchemaResponse, error) { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.GetSchema(ctx, req) - } - } - if provider.CheckConfig != nil { - wrapper.CheckConfig = func(ctx p.Context, req p.CheckRequest) (p.CheckResponse, error) { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.CheckConfig(ctx, req) - } - } - if provider.DiffConfig != nil { - wrapper.DiffConfig = func(ctx p.Context, req p.DiffRequest) (p.DiffResponse, error) { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.DiffConfig(ctx, req) - } - } - if provider.Configure != nil { - wrapper.Configure = func(ctx p.Context, req p.ConfigureRequest) error { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.Configure(ctx, req) - } - } - if provider.Invoke != nil { - wrapper.Invoke = func(ctx p.Context, req p.InvokeRequest) (p.InvokeResponse, error) { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.Invoke(ctx, req) - } - } - if provider.Check != nil { - wrapper.Check = func(ctx p.Context, req p.CheckRequest) (p.CheckResponse, error) { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.Check(ctx, req) - } - } - if provider.Diff != nil { - wrapper.Diff = func(ctx p.Context, req p.DiffRequest) (p.DiffResponse, error) { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.Diff(ctx, req) - } - } - if provider.Create != nil { - wrapper.Create = func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) { - ctx, end := cancel(ctx, req.Timeout) - defer end() - return provider.Create(ctx, req) - } - } - if provider.Read != nil { - wrapper.Read = func(ctx p.Context, req p.ReadRequest) (p.ReadResponse, error) { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.Read(ctx, req) - } - } - if provider.Update != nil { - wrapper.Update = func(ctx p.Context, req p.UpdateRequest) (p.UpdateResponse, error) { - ctx, end := cancel(ctx, req.Timeout) - defer end() - return provider.Update(ctx, req) - } - } - if provider.Delete != nil { - wrapper.Delete = func(ctx p.Context, req p.DeleteRequest) error { - ctx, end := cancel(ctx, req.Timeout) - defer end() - return provider.Delete(ctx, req) - } + + // Wrap each gRPC method to transform a cancel call into a cancel on + // context.Cancel. + wrapper.GetSchema = setCancel2(cancel, provider.GetSchema, nil) + wrapper.CheckConfig = setCancel2(cancel, provider.CheckConfig, nil) + wrapper.DiffConfig = setCancel2(cancel, provider.DiffConfig, nil) + wrapper.Configure = setCancel1(cancel, provider.Configure, nil) + wrapper.Invoke = setCancel2(cancel, provider.Invoke, nil) + wrapper.Check = setCancel2(cancel, provider.Check, nil) + wrapper.Diff = setCancel2(cancel, provider.Diff, nil) + wrapper.Create = setCancel2(cancel, provider.Create, func(r p.CreateRequest) float64 { + return r.Timeout + }) + wrapper.Read = setCancel2(cancel, provider.Read, nil) + wrapper.Update = setCancel2(cancel, provider.Update, func(r p.UpdateRequest) float64 { + return r.Timeout + }) + wrapper.Delete = setCancel1(cancel, provider.Delete, func(r p.DeleteRequest) float64 { + return r.Timeout + }) + wrapper.Construct = setCancel2(cancel, provider.Construct, nil) + return wrapper +} + +func setCancel1[ + Req any, + F func(p.Context, Req) error, + Cancel func(ctx p.Context, timeout float64) (p.Context, func()), + GetTimeout func(Req) float64, +](cancel Cancel, f F, getTimeout GetTimeout) F { + if f == nil { + return nil + } + return func(ctx p.Context, req Req) error { + var timeout float64 + if getTimeout != nil { + timeout = getTimeout(req) + } + ctx, end := cancel(ctx, timeout) + defer end() + return f(ctx, req) } - if provider.Construct != nil { - wrapper.Construct = func(ctx p.Context, req p.ConstructRequest) (p.ConstructResponse, error) { - ctx, end := cancel(ctx, noTimeout) - defer end() - return provider.Construct(ctx, req) +} + +func setCancel2[ + Req any, Resp any, + F func(p.Context, Req) (Resp, error), + Cancel func(ctx p.Context, timeout float64) (p.Context, func()), + GetTimeout func(Req) float64, +](cancel Cancel, f F, getTimeout GetTimeout) F { + if f == nil { + return nil + } + return func(ctx p.Context, req Req) (Resp, error) { + var timeout float64 + if getTimeout != nil { + timeout = getTimeout(req) } + + ctx, end := cancel(ctx, timeout) + defer end() + return f(ctx, req) } - return wrapper } const noTimeout float64 = 0 From 997d21b48a4f929889594e1c0b1c992eaacb4184 Mon Sep 17 00:00:00 2001 From: Ian Wahbe Date: Thu, 28 Mar 2024 14:29:59 +0100 Subject: [PATCH 2/7] Replace inOutCache with evict.Pool The new implementation is simpler to understand and consequentially less racy. --- middleware/cancel/cancel.go | 99 ++------------ middleware/cancel/cancel_test.go | 44 +----- middleware/cancel/internal/evict/pool.go | 127 ++++++++++++++++++ middleware/cancel/internal/evict/pool_test.go | 111 +++++++++++++++ 4 files changed, 247 insertions(+), 134 deletions(-) create mode 100644 middleware/cancel/internal/evict/pool.go create mode 100644 middleware/cancel/internal/evict/pool_test.go diff --git a/middleware/cancel/cancel.go b/middleware/cancel/cancel.go index 75b7ebef..e295324b 100644 --- a/middleware/cancel/cancel.go +++ b/middleware/cancel/cancel.go @@ -20,18 +20,19 @@ package cancel import ( "context" - "sync" "time" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" p "github.com/pulumi/pulumi-go-provider" + "github.com/pulumi/pulumi-go-provider/middleware/cancel/internal/evict" ) func Wrap(provider p.Provider) p.Provider { - var canceled bool - var cancelFuncs inOutCache[context.CancelFunc] + cancelFuncs := evict.Pool[context.CancelFunc]{ + OnEvict: func(f context.CancelFunc) { f() }, + } cancel := func(ctx p.Context, timeout float64) (p.Context, func()) { var cancel context.CancelFunc if timeout == noTimeout { @@ -39,23 +40,13 @@ func Wrap(provider p.Provider) p.Provider { } else { ctx, cancel = p.CtxWithTimeout(ctx, time.Second*time.Duration(timeout)) } - if canceled { - cancel() - return ctx, func() {} - } - evict := cancelFuncs.insert(cancel) - return ctx, func() { - if !evict() { - cancel() - } - } + + handle := cancelFuncs.Insert(cancel) + return ctx, handle.Evict } wrapper := provider wrapper.Cancel = func(ctx p.Context) error { - canceled = true - for _, f := range cancelFuncs.drain() { - f() - } + cancelFuncs.Close() // We consider this a valid implementation of the Cancel RPC request. We still pass on // the request so downstream provides *may* rely on the Cancel call, but we catch an @@ -136,77 +127,3 @@ func setCancel2[ } const noTimeout float64 = 0 - -// A data structure which provides amortized O(1) insertion, removal, and draining. -type inOutCache[T any] struct { - values []*entry[T] // An unorderd list of stored values or tombstone (nil) entries. - tombstones []int // An unordered list of empty slots in values - m sync.Mutex - inDrain bool // Wheither the cache is currently being drained. inDrain=true implies m can be ignored. -} - -type entry[T any] struct { - evict func() bool - value T -} - -// Insert a new element into the inOutCahce. The new element can be ejected by calling -// `evict`. If the element was already drained or if `evict` was already called, then -// `evict` will return true. Otherwise it returns false. -func (h *inOutCache[T]) insert(t T) (evict func() (missing bool)) { - h.m.Lock() - defer h.m.Unlock() - var i int // The index in values of the new entry. - if len(h.tombstones) == 0 { - i = len(h.values) // We extend values. - } else { - // There is an empty slot in values, so use that. - i = h.tombstones[len(h.tombstones)-1] - h.tombstones = h.tombstones[:len(h.tombstones)-1] - } - - el := &entry[T]{ - value: t, - } - el.evict = func() bool { - if !h.inDrain { - h.m.Lock() - defer h.m.Unlock() - } - gone := el.evict == nil - if gone { - return true - } - el.evict = nil - h.values[i] = nil - h.tombstones = append(h.tombstones, i) - return gone - - } - - // Push the value - if len(h.tombstones) == 0 { - h.values = append(h.values, el) - } else { - h.values[i] = el - } - return el.evict -} - -// Remove all values from the inOutCache, and return them. -func (h *inOutCache[T]) drain() []T { - h.m.Lock() - defer h.m.Unlock() - // Setting inDrain indicates a trusted actor holds the mutex, indicating that evict - // functions don't need to grab the mutex before executing. - h.inDrain = true - defer func() { h.inDrain = false }() - values := []T{} // Values currently in the cache. - for _, v := range h.values { - if v != nil { - v.evict() - values = append(values, v.value) - } - } - return values -} diff --git a/middleware/cancel/cancel_test.go b/middleware/cancel/cancel_test.go index 29a779b2..8d086e6a 100644 --- a/middleware/cancel/cancel_test.go +++ b/middleware/cancel/cancel_test.go @@ -1,4 +1,4 @@ -// Copyright 2022, Pulumi Corporation. +// Copyright 2024, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,45 +13,3 @@ // limitations under the License. package cancel - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestInOutCache(t *testing.T) { - t.Parallel() - cache := inOutCache[int]{} - evicts := make([]func() bool, 1000) - for i := 0; i < 1000; i++ { - evicts[i] = cache.insert(i) - } - for i := 0; i < 1000; i += 2 { - assert.False(t, evicts[i]()) - } - - for i := 1000; i < 1500; i++ { - evicts = append(evicts, cache.insert(i)) - } - - assert.Len(t, cache.values, 1001) - - expected := make([]int, 0, 1000) - for i := 1; i < 1000; i += 2 { - expected = append(expected, i) - } - for i := 1000; i < 1500; i++ { - expected = append(expected, i) - } - elements := cache.drain() - assert.ElementsMatch(t, elements, expected) - - // Refill the cache - for i := 0; i < 1000; i++ { - cache.insert(i + 2000) - } - for _, f := range evicts { - assert.True(t, f(), "cache element was evicted") - } -} diff --git a/middleware/cancel/internal/evict/pool.go b/middleware/cancel/internal/evict/pool.go new file mode 100644 index 00000000..11718811 --- /dev/null +++ b/middleware/cancel/internal/evict/pool.go @@ -0,0 +1,127 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package evict + +import ( + "sync" +) + +// A data structure which provides amortized O(1) insertion, removal, and draining. +type Pool[T any] struct { + entries []entry[T] + + m sync.Mutex + + closed bool + + OnEvict func(T) +} + +type Handle[T any] struct { + cache *Pool[T] + idx int + revision int +} + +func (h Handle[T]) Evict() { + h.cache.m.Lock() + defer h.cache.m.Unlock() + + h.threadUnsafeEvict() +} + +func (h Handle[T]) threadUnsafeEvict() { + entry := &h.cache.entries[h.idx] + if !entry.has(h.revision) { + return + } + + h.cache.OnEvict(entry.value) + entry.markEmpty() +} + +type entry[T any] struct { + revision int + empty bool + value T +} + +func (e *entry[T]) markEmpty() { + if e.empty { + return + } + e.revision++ + e.empty = true +} + +func (e *entry[T]) has(revision int) bool { + return !e.empty && e.revision == revision +} + +// Insert a new element into the inOutCahce. The new element can be ejected by calling +// `evict`. If the element was already drained or if `evict` was already called, then +// `evict` will return true. Otherwise it returns false. +func (c *Pool[T]) Insert(t T) (ret Handle[T]) { + c.m.Lock() + defer c.m.Unlock() + + // If we are finished, immediately evict the returned handle. + if c.closed { + defer func() { ret.threadUnsafeEvict() }() + } + + // Check if an existing cell is empty + for i, entry := range c.entries { + if entry.empty { + entry.empty = false + entry.value = t + + c.entries[i] = entry + + return Handle[T]{ + cache: c, + idx: i, + revision: entry.revision, + } + } + } + + // No existing cells are empty, so create a new cell + i := len(c.entries) + c.entries = append(c.entries, entry[T]{ + value: t, + }) + return Handle[T]{cache: c, idx: i} + +} + +// Close the cache, evicting all elements and evicting future elements on insertion. +func (c *Pool[T]) Close() { + c.m.Lock() + defer c.m.Unlock() + + c.closed = true + + for i, e := range c.entries { + // Construct a handle to the maybe-empty slot in entries. + handle := Handle[T]{ + cache: c, + idx: i, + revision: e.revision, + } + + handle.threadUnsafeEvict() + } +} diff --git a/middleware/cancel/internal/evict/pool_test.go b/middleware/cancel/internal/evict/pool_test.go new file mode 100644 index 00000000..081beb80 --- /dev/null +++ b/middleware/cancel/internal/evict/pool_test.go @@ -0,0 +1,111 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package evict + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPool(t *testing.T) { + t.Parallel() + + evicted := map[int]struct{}{} + + cache := Pool[int]{ + OnEvict: func(i int) { + _, alreadyEvicted := evicted[i] + assert.False(t, alreadyEvicted) + evicted[i] = struct{}{} + }, + } + + h1 := cache.Insert(1) + cache.Insert(2) + + h1.Evict() + assert.Contains(t, evicted, 1) + + cache.Insert(3) + + cache.Close() + + assert.Equal(t, map[int]struct{}{ + 1: {}, + 2: {}, + 3: {}, + }, evicted) +} + +func TestPoolParallel(t *testing.T) { + t.Parallel() + + evicted := new(sync.Map) + + cache := Pool[int]{ + OnEvict: func(i int) { + _, alreadyEvicted := evicted.LoadOrStore(i, struct{}{}) + assert.False(t, alreadyEvicted) + }, + } + + var wait sync.WaitGroup + wait.Add(5) + + for i := 1; i <= 5; i++ { + go func(i int) { + min, max := i*100, (i+1)*100 + localEvict := make(map[Handle[int]]struct{}, 50) + for i := min; i < max; i++ { + h := cache.Insert(i) + if i%2 == 0 { + localEvict[h] = struct{}{} + } + } + + for h := range localEvict { + h.Evict() + } + + for i := min; i < max; i += 2 { + _, has := evicted.Load(i) + assert.True(t, has) + } + + wait.Done() + }(i) + } + + wait.Wait() + + lenSyncMap := func(m *sync.Map) int { + i := 0 + m.Range(func(any, any) bool { + i++ + return true + }) + return i + } + + assert.Equal(t, 250, lenSyncMap(evicted), + "Check that the 250 evens are present, and no other numbers are.") + + cache.Close() + + assert.Equal(t, 500, lenSyncMap(evicted), + "Check that all 500 numbers are now present") +} From 6d6b8aabff9bc4a3b8febc287635b88151d0b41b Mon Sep 17 00:00:00 2001 From: Ian Wahbe Date: Thu, 28 Mar 2024 15:35:44 +0100 Subject: [PATCH 3/7] Improve testing of concurrent systems --- .gitignore | 5 +- middleware/cancel/cancel_test.go | 15 --- tests/cancel_test.go | 175 ++++++++++++++++++++++++------- 3 files changed, 144 insertions(+), 51 deletions(-) delete mode 100644 middleware/cancel/cancel_test.go diff --git a/.gitignore b/.gitignore index 050cea08..5eb14448 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,7 @@ examples/**/pulumi-resource-* /.vscode -**/testdata/rapid/** \ No newline at end of file +**/testdata/rapid/** + +go.work +go.work.sum \ No newline at end of file diff --git a/middleware/cancel/cancel_test.go b/middleware/cancel/cancel_test.go deleted file mode 100644 index 8d086e6a..00000000 --- a/middleware/cancel/cancel_test.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2024, Pulumi Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cancel diff --git a/tests/cancel_test.go b/tests/cancel_test.go index 35887b99..65a07e97 100644 --- a/tests/cancel_test.go +++ b/tests/cancel_test.go @@ -1,4 +1,4 @@ -// Copyright 2022, Pulumi Corporation. +// Copyright 2024, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,64 +15,169 @@ package tests import ( + "context" "sync" "testing" "github.com/blang/semver" + "github.com/pulumi/pulumi/sdk/v3/go/common/resource" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + p "github.com/pulumi/pulumi-go-provider" "github.com/pulumi/pulumi-go-provider/integration" "github.com/pulumi/pulumi-go-provider/middleware/cancel" - "github.com/stretchr/testify/assert" ) func TestGlobalCancel(t *testing.T) { t.Parallel() - wg := new(sync.WaitGroup) - wg.Add(4) - s := integration.NewServer("cancel", semver.MustParse("1.2.3"), + + const testSize = 5000 + require.True(t, testSize%2 == 0) + + noWaitCounter := new(sync.WaitGroup) + noWaitCounter.Add(testSize / 2) + + provider := integration.NewServer("cancel", semver.MustParse("1.2.3"), cancel.Wrap(p.Provider{ Create: func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) { - select { - case <-ctx.Done(): - wg.Done() - return p.CreateResponse{ - ID: "cancled", - Properties: req.Properties, - }, nil + + // If a request is set to wait, then it pauses until it is canceled. + if req.Properties["wait"].BoolValue() { + <-ctx.Done() + + return p.CreateResponse{}, ctx.Err() } + + noWaitCounter.Done() + + return p.CreateResponse{}, nil }, })) - go func() { _, err := s.Create(p.CreateRequest{}); assert.NoError(t, err) }() - go func() { _, err := s.Create(p.CreateRequest{}); assert.NoError(t, err) }() - go func() { _, err := s.Create(p.CreateRequest{}); assert.NoError(t, err) }() - assert.NoError(t, s.Cancel()) - go func() { _, err := s.Create(p.CreateRequest{}); assert.NoError(t, err) }() - wg.Wait() + + finished := new(sync.WaitGroup) + finished.Add(testSize + (testSize / 2)) + + go func() { + // Make sure that all requests that should not be canceled have already gone through. + noWaitCounter.Wait() + + // Now cancel remaining requests. + err := provider.Cancel() + assert.NoError(t, err) + + // As a sanity check, send another testSize/2 requests. Check that they are immediately + // canceled. + for i := 0; i < testSize/2; i++ { + go func() { + _, err := provider.Create(p.CreateRequest{ + Properties: resource.PropertyMap{ + "wait": resource.NewProperty(true), + }, + }) + assert.ErrorIs(t, err, context.Canceled) + finished.Done() + }() + } + }() + + // create testSize requests. + // + // Half are configured to wait, while the other half are set to return immediately. + for i := 0; i < testSize; i++ { + shouldWait := i%2 == 0 + go func() { + _, err := provider.Create(p.CreateRequest{ + Properties: resource.PropertyMap{ + "wait": resource.NewProperty(shouldWait), + }, + }) + if shouldWait { + assert.ErrorIs(t, err, context.Canceled) + } else { + assert.NoError(t, err) + } + finished.Done() + }() + } + finished.Wait() } -func TestTimeoutApplication(t *testing.T) { +// TestCancelCreate checks that a Cancel that occurs during a concurrent operation +// (Create) cancels the context associated with the operation. +func TestCancelCreate(t *testing.T) { t.Parallel() - wg := new(sync.WaitGroup) - wg.Add(1) + + createCheck := make(chan bool) + + provider := integration.NewServer("cancel", semver.MustParse("1.2.3"), cancel.Wrap(p.Provider{ + Create: func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) { + // The context should not be canceled yes + assert.NoError(t, ctx.Err()) + createCheck <- true + <-createCheck + + return p.CreateResponse{}, ctx.Err() + }, + })) + + go func() { + <-createCheck + assert.NoError(t, provider.Cancel()) + createCheck <- true + }() + + _, err := provider.Create(p.CreateRequest{}) + assert.ErrorIs(t, err, context.Canceled) +} + +// TestCancelTimeout checks that timeouts are applied. +// +// Note: if the timeout is not applied, the test will hang instead of fail. +func TestCancelTimeout(t *testing.T) { + t.Parallel() + + checkDeadline := func(ctx p.Context) error { + _, ok := ctx.Deadline() + assert.True(t, ok) + <-ctx.Done() + return ctx.Err() + } + s := integration.NewServer("cancel", semver.MustParse("1.2.3"), cancel.Wrap(p.Provider{ - Create: func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) { - select { - case <-ctx.Done(): - wg.Done() - return p.CreateResponse{ - ID: "cancled", - Properties: req.Properties, - }, nil - } + Create: func(ctx p.Context, _ p.CreateRequest) (p.CreateResponse, error) { + return p.CreateResponse{}, checkDeadline(ctx) + }, + Update: func(ctx p.Context, _ p.UpdateRequest) (p.UpdateResponse, error) { + return p.UpdateResponse{}, checkDeadline(ctx) + }, + Delete: func(ctx p.Context, _ p.DeleteRequest) error { + return checkDeadline(ctx) }, })) - go func() { + t.Run("create", func(t *testing.T) { + t.Parallel() _, err := s.Create(p.CreateRequest{ - Timeout: 0.5, + Timeout: 0.1, }) - assert.NoError(t, err) - }() - wg.Wait() + assert.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("update", func(t *testing.T) { + t.Parallel() + _, err := s.Update(p.UpdateRequest{ + Timeout: 0.1, + }) + assert.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("delete", func(t *testing.T) { + t.Parallel() + err := s.Delete(p.DeleteRequest{ + Timeout: 0.1, + }) + assert.ErrorIs(t, err, context.DeadlineExceeded) + }) } From 9a69c1fefb85581d7715a43f9519a88a219215b1 Mon Sep 17 00:00:00 2001 From: Ian Wahbe Date: Thu, 28 Mar 2024 15:50:39 +0100 Subject: [PATCH 4/7] Set the -race flag for go tests --- Makefile | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 98144b21..f22bff74 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,8 @@ .PHONY: build build_examples install_examples lint lint-copyright lint-golang +GO_TEST_FLAGS=-race +GO_TEST=go test ${GO_TEST_FLAGS} + build: go build ./... @@ -8,13 +11,13 @@ test: test_unit test_examples .PHONY: test_unit test_unit: build - go test ./... - cd infer/tests && go test ./... - cd integration && go test ./... - cd resourcex && go test ./... - cd tests && go test ./... + ${GO_TEST} ./... + cd infer/tests && ${GO_TEST} ./... + cd integration && ${GO_TEST} ./... + cd resourcex && ${GO_TEST} ./... + cd tests && ${GO_TEST} ./... for d in examples/*; do if [ -d $$d ]; then \ - cd $$d; go test ./... || exit $$?; \ + cd $$d; ${GO_TEST} ./... || exit $$?; \ cd -; fi; done lint: lint-golang lint-copyright From 7cf4c3e641a0075b2d0409301cdaf61c66eb520e Mon Sep 17 00:00:00 2001 From: Ian Wahbe Date: Thu, 28 Mar 2024 15:56:57 +0100 Subject: [PATCH 5/7] Fix comment --- middleware/cancel/internal/evict/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/middleware/cancel/internal/evict/pool.go b/middleware/cancel/internal/evict/pool.go index 11718811..4e76d049 100644 --- a/middleware/cancel/internal/evict/pool.go +++ b/middleware/cancel/internal/evict/pool.go @@ -70,7 +70,7 @@ func (e *entry[T]) has(revision int) bool { return !e.empty && e.revision == revision } -// Insert a new element into the inOutCahce. The new element can be ejected by calling +// Insert a new element into the Pool. The new element can be ejected by calling // `evict`. If the element was already drained or if `evict` was already called, then // `evict` will return true. Otherwise it returns false. func (c *Pool[T]) Insert(t T) (ret Handle[T]) { From 55b0110926521321b4fe78c356f424c4d853c563 Mon Sep 17 00:00:00 2001 From: Ian Wahbe Date: Fri, 29 Mar 2024 23:03:24 +0100 Subject: [PATCH 6/7] Expand on what `cancel` does --- middleware/cancel/cancel.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/middleware/cancel/cancel.go b/middleware/cancel/cancel.go index e295324b..eb82cb23 100644 --- a/middleware/cancel/cancel.go +++ b/middleware/cancel/cancel.go @@ -12,10 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Cancel ensures that contexts are canceled when their associated tasks are completed. -// There are two parts of this middleware: -// 1. Tying Provider.Cancel to all associated contexts. -// 2. Applying timeout information when available. +// The `cancel` package provides a middle-ware that ties the Cancel gRPC call from Pulumi +// to Go's `context.Context` cancellation system. +// +// Wrapping a provider in `cancel.Wrap` ensures 2 things: +// +// 1. When a resource operation times out, the associated context is canceled. +// +// 2. When `Cancel` is called, all outstanding gRPC methods have their associated contexts +// canceled. +// +// A `cancel.Wrap`ed provider will still call the `Cancel` method on the underlying +// provider. If NotImplemented is returned, it will be swallowed. package cancel import ( From d75230c983ee3d71351f23dc7bbb78731e36f2fe Mon Sep 17 00:00:00 2001 From: Ian Wahbe Date: Sat, 30 Mar 2024 11:39:07 +0100 Subject: [PATCH 7/7] O(1) amortized insertion This is accomplished by replacing a linear search of `c.entries` for empty cells with a maintained list of empty cells. --- middleware/cancel/internal/evict/pool.go | 35 ++++++++++++++++-------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/middleware/cancel/internal/evict/pool.go b/middleware/cancel/internal/evict/pool.go index 4e76d049..9978fa3a 100644 --- a/middleware/cancel/internal/evict/pool.go +++ b/middleware/cancel/internal/evict/pool.go @@ -16,12 +16,17 @@ package evict import ( "sync" + + "github.com/pulumi/pulumi/sdk/v3/go/common/util/contract" ) // A data structure which provides amortized O(1) insertion, removal, and draining. type Pool[T any] struct { entries []entry[T] + // emptyEntries holds the indexes of empty cells in entries. + emptyEntries []int + m sync.Mutex closed bool @@ -50,6 +55,7 @@ func (h Handle[T]) threadUnsafeEvict() { h.cache.OnEvict(entry.value) entry.markEmpty() + h.cache.emptyEntries = append(h.cache.emptyEntries, h.idx) } type entry[T any] struct { @@ -83,18 +89,23 @@ func (c *Pool[T]) Insert(t T) (ret Handle[T]) { } // Check if an existing cell is empty - for i, entry := range c.entries { - if entry.empty { - entry.empty = false - entry.value = t - - c.entries[i] = entry - - return Handle[T]{ - cache: c, - idx: i, - revision: entry.revision, - } + if l := len(c.emptyEntries); l > 0 { + // Pop the last empty entry + i := c.emptyEntries[l-1] + c.emptyEntries = c.emptyEntries[:l-1] + entry := &c.entries[i] + + contract.Assertf(entry.empty, "emptyEntries must contain only evicted slots") + + // Set the entry to hold t + entry.value = t + entry.empty = false + + // Return a handle to the new entry + return Handle[T]{ + cache: c, + idx: i, + revision: entry.revision, } }