Skip to content

Commit

Permalink
Fix cancelation panic (#207)
Browse files Browse the repository at this point in the history
Fixes #186 

This PR is best reviewed by commit:

- 0ecbbcd is a pure refactor to improve
the readability of the `cancel.Wrap` method.
- 997d21b replaces `inOutCache` with
`evict.Pool`, a data structure better suited to the purpose. `cancel` uses
`evict.Pool` to keep track of the set of current requests, ensuring that
the request associated with each context is canceled if one of three
events happens:
  - The request returns.
  - The request times out (only for `Create`, `Update` and `Delete`).
  - The provider receives a `Cancel` request.
- 6d6b8aa improves the integration
testing around the `cancel` middleware, especially under heavy
concurrency. I'm pleased to report that
6d6b8aa without
997d21b regularly panics, showing that
the improved testing does cover #186.
- 9a69c1f sets the `-race` flag for
tests. This would have also caught #186.
  • Loading branch information
iwahbe authored Apr 1, 2024
1 parent 9fbffb6 commit e49130e
Show file tree
Hide file tree
Showing 7 changed files with 478 additions and 272 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ examples/**/pulumi-resource-*

/.vscode

**/testdata/rapid/**
**/testdata/rapid/**

go.work
go.work.sum
15 changes: 9 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
.PHONY: build build_examples install_examples lint lint-copyright lint-golang

GO_TEST_FLAGS=-race
GO_TEST=go test ${GO_TEST_FLAGS}

build:
go build ./...

Expand All @@ -8,13 +11,13 @@ test: test_unit test_examples

.PHONY: test_unit
test_unit: build
go test ./...
cd infer/tests && go test ./...
cd integration && go test ./...
cd resourcex && go test ./...
cd tests && go test ./...
${GO_TEST} ./...
cd infer/tests && ${GO_TEST} ./...
cd integration && ${GO_TEST} ./...
cd resourcex && ${GO_TEST} ./...
cd tests && ${GO_TEST} ./...
for d in examples/*; do if [ -d $$d ]; then \
cd $$d; go test ./... || exit $$?; \
cd $$d; ${GO_TEST} ./... || exit $$?; \
cd -; fi; done

lint: lint-golang lint-copyright
Expand Down
249 changes: 76 additions & 173 deletions middleware/cancel/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Cancel ensures that contexts are canceled when their associated tasks are completed.
// There are two parts of this middleware:
// 1. Tying Provider.Cancel to all associated contexts.
// 2. Applying timeout information when available.
// Package cancel provides a middleware that ties the Cancel gRPC call from Pulumi
// to Go's `context.Context` cancellation system.
//
// Wrapping a provider in `cancel.Wrap` ensures 2 things:
//
// 1. When a resource operation times out, the associated context is canceled.
//
// 2. When `Cancel` is called, all outstanding gRPC methods have their associated contexts
// canceled.
//
// A `cancel.Wrap`ed provider will still call the `Cancel` method on the underlying
// provider. If NotImplemented is returned, it will be swallowed.
package cancel

import (
"context"
"sync"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

p "github.com/pulumi/pulumi-go-provider"
"github.com/pulumi/pulumi-go-provider/middleware/cancel/internal/evict"
)

func Wrap(provider p.Provider) p.Provider {
var canceled bool
var cancelFuncs inOutCache[context.CancelFunc]
cancelFuncs := evict.Pool[context.CancelFunc]{
OnEvict: func(f context.CancelFunc) { f() },
}
cancel := func(ctx p.Context, timeout float64) (p.Context, func()) {
var cancel context.CancelFunc
if timeout == noTimeout {
ctx, cancel = p.CtxWithCancel(ctx)
} else {
ctx, cancel = p.CtxWithTimeout(ctx, time.Second*time.Duration(timeout))
}
if canceled {
cancel()
return ctx, func() {}
}
evict := cancelFuncs.insert(cancel)
return ctx, func() {
if !evict() {
cancel()
}
}

handle := cancelFuncs.Insert(cancel)
return ctx, handle.Evict
}
wrapper := provider
wrapper.Cancel = func(ctx p.Context) error {
canceled = true
for _, f := range cancelFuncs.drain() {
f()
}
cancelFuncs.Close()

// We consider this a valid implementation of the Cancel RPC request. We still pass on
// the request so downstream providers *may* rely on the Cancel call, but we catch an
Expand All @@ -70,165 +69,69 @@ func Wrap(provider p.Provider) p.Provider {
}
return err
}
if provider.GetSchema != nil {
wrapper.GetSchema = func(ctx p.Context, req p.GetSchemaRequest) (p.GetSchemaResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.GetSchema(ctx, req)
}
}
if provider.CheckConfig != nil {
wrapper.CheckConfig = func(ctx p.Context, req p.CheckRequest) (p.CheckResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.CheckConfig(ctx, req)
}
}
if provider.DiffConfig != nil {
wrapper.DiffConfig = func(ctx p.Context, req p.DiffRequest) (p.DiffResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.DiffConfig(ctx, req)
}
}
if provider.Configure != nil {
wrapper.Configure = func(ctx p.Context, req p.ConfigureRequest) error {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Configure(ctx, req)
}
}
if provider.Invoke != nil {
wrapper.Invoke = func(ctx p.Context, req p.InvokeRequest) (p.InvokeResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Invoke(ctx, req)
}
}
if provider.Check != nil {
wrapper.Check = func(ctx p.Context, req p.CheckRequest) (p.CheckResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Check(ctx, req)
}
}
if provider.Diff != nil {
wrapper.Diff = func(ctx p.Context, req p.DiffRequest) (p.DiffResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Diff(ctx, req)
}
}
if provider.Create != nil {
wrapper.Create = func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) {
ctx, end := cancel(ctx, req.Timeout)
defer end()
return provider.Create(ctx, req)
}
}
if provider.Read != nil {
wrapper.Read = func(ctx p.Context, req p.ReadRequest) (p.ReadResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Read(ctx, req)
}
}
if provider.Update != nil {
wrapper.Update = func(ctx p.Context, req p.UpdateRequest) (p.UpdateResponse, error) {
ctx, end := cancel(ctx, req.Timeout)
defer end()
return provider.Update(ctx, req)
}
}
if provider.Delete != nil {
wrapper.Delete = func(ctx p.Context, req p.DeleteRequest) error {
ctx, end := cancel(ctx, req.Timeout)
defer end()
return provider.Delete(ctx, req)
}
}
if provider.Construct != nil {
wrapper.Construct = func(ctx p.Context, req p.ConstructRequest) (p.ConstructResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Construct(ctx, req)
}
}
return wrapper
}

const noTimeout float64 = 0

// A data structure which provides amortized O(1) insertion, removal, and draining.
//
// Values are added with insert, removed individually through the evict function that
// insert returns, and removed in bulk with drain.
type inOutCache[T any] struct {
values []*entry[T] // An unordered list of stored values or tombstone (nil) entries.
tombstones []int // An unordered list of empty slots in values
// m is locked by insert, by drain, and by evict functions when inDrain is false.
m sync.Mutex
inDrain bool // Whether the cache is currently being drained. inDrain=true implies m can be ignored.
}

type entry[T any] struct {
evict func() bool
value T
// Wrap each gRPC method to transform a cancel call into a cancel on
// context.Cancel.
wrapper.GetSchema = setCancel2(cancel, provider.GetSchema, nil)
wrapper.CheckConfig = setCancel2(cancel, provider.CheckConfig, nil)
wrapper.DiffConfig = setCancel2(cancel, provider.DiffConfig, nil)
wrapper.Configure = setCancel1(cancel, provider.Configure, nil)
wrapper.Invoke = setCancel2(cancel, provider.Invoke, nil)
wrapper.Check = setCancel2(cancel, provider.Check, nil)
wrapper.Diff = setCancel2(cancel, provider.Diff, nil)
wrapper.Create = setCancel2(cancel, provider.Create, func(r p.CreateRequest) float64 {
return r.Timeout
})
wrapper.Read = setCancel2(cancel, provider.Read, nil)
wrapper.Update = setCancel2(cancel, provider.Update, func(r p.UpdateRequest) float64 {
return r.Timeout
})
wrapper.Delete = setCancel1(cancel, provider.Delete, func(r p.DeleteRequest) float64 {
return r.Timeout
})
wrapper.Construct = setCancel2(cancel, provider.Construct, nil)
return wrapper
}

// Insert a new element into the inOutCache. The new element can be ejected by calling
// `evict`. If the element was already drained or if `evict` was already called, then
// `evict` will return true. Otherwise it returns false.
func (h *inOutCache[T]) insert(t T) (evict func() (missing bool)) {
h.m.Lock()
defer h.m.Unlock()
var i int // The index in values of the new entry.
if len(h.tombstones) == 0 {
i = len(h.values) // We extend values.
} else {
// There is an empty slot in values, so use that.
i = h.tombstones[len(h.tombstones)-1]
h.tombstones = h.tombstones[:len(h.tombstones)-1]
// setCancel1 wraps f, a provider method that returns only an error, so that every call
// runs with the context produced by cancel.
//
// A nil f yields nil, leaving an unimplemented method unimplemented. When getTimeout is
// nil the timeout handed to cancel is zero; otherwise it is getTimeout(req). The cleanup
// function returned by cancel is deferred, so it runs as soon as f returns.
func setCancel1[
	Req any,
	F func(p.Context, Req) error,
	Cancel func(ctx p.Context, timeout float64) (p.Context, func()),
	GetTimeout func(Req) float64,
](cancel Cancel, f F, getTimeout GetTimeout) F {
	if f == nil {
		return nil
	}

	// timeoutFor resolves the timeout for a single request, defaulting to zero when no
	// extractor was supplied.
	timeoutFor := func(req Req) float64 {
		if getTimeout == nil {
			return 0
		}
		return getTimeout(req)
	}

	return func(ctx p.Context, req Req) error {
		scoped, done := cancel(ctx, timeoutFor(req))
		defer done()
		return f(scoped, req)
	}
}

el := &entry[T]{
value: t,
}
el.evict = func() bool {
if !h.inDrain {
h.m.Lock()
defer h.m.Unlock()
}
gone := el.evict == nil
if gone {
return true
func setCancel2[
Req any, Resp any,
F func(p.Context, Req) (Resp, error),
Cancel func(ctx p.Context, timeout float64) (p.Context, func()),
GetTimeout func(Req) float64,
](cancel Cancel, f F, getTimeout GetTimeout) F {
if f == nil {
return nil
}
return func(ctx p.Context, req Req) (Resp, error) {
var timeout float64
if getTimeout != nil {
timeout = getTimeout(req)
}
el.evict = nil
h.values[i] = nil
h.tombstones = append(h.tombstones, i)
return gone

ctx, end := cancel(ctx, timeout)
defer end()
return f(ctx, req)
}

// Push the value
if len(h.tombstones) == 0 {
h.values = append(h.values, el)
} else {
h.values[i] = el
}
return el.evict
}

// Remove all values from the inOutCache, and return them.
//
// Tombstone (nil) slots are skipped. Each live entry's evict function is called before
// its value is collected, so a later call to that evict function reports the entry as
// already gone.
//
// NOTE(review): the evict functions created by insert read h.inDrain without holding
// h.m, so an evict call racing with drain looks unsynchronized — confirm.
func (h *inOutCache[T]) drain() []T {
h.m.Lock()
defer h.m.Unlock()
// Setting inDrain indicates a trusted actor holds the mutex, indicating that evict
// functions don't need to grab the mutex before executing.
h.inDrain = true
defer func() { h.inDrain = false }()
values := []T{} // Values currently in the cache.
for _, v := range h.values {
if v != nil {
// evict clears the slot in h.values and records it as a tombstone.
v.evict()
values = append(values, v.value)
}
}
return values
}
const noTimeout float64 = 0
57 changes: 0 additions & 57 deletions middleware/cancel/cancel_test.go

This file was deleted.

Loading

0 comments on commit e49130e

Please sign in to comment.