Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancelation panic #207

Merged
merged 7 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ examples/**/pulumi-resource-*

/.vscode

**/testdata/rapid/**
**/testdata/rapid/**

go.work
go.work.sum
15 changes: 9 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
.PHONY: build build_examples install_examples lint lint-copyright lint-golang

GO_TEST_FLAGS=-race
GO_TEST=go test ${GO_TEST_FLAGS}

build:
go build ./...

Expand All @@ -8,13 +11,13 @@ test: test_unit test_examples

.PHONY: test_unit
test_unit: build
go test ./...
cd infer/tests && go test ./...
cd integration && go test ./...
cd resourcex && go test ./...
cd tests && go test ./...
${GO_TEST} ./...
cd infer/tests && ${GO_TEST} ./...
cd integration && ${GO_TEST} ./...
cd resourcex && ${GO_TEST} ./...
cd tests && ${GO_TEST} ./...
for d in examples/*; do if [ -d $$d ]; then \
cd $$d; go test ./... || exit $$?; \
cd $$d; ${GO_TEST} ./... || exit $$?; \
cd -; fi; done

lint: lint-golang lint-copyright
Expand Down
249 changes: 76 additions & 173 deletions middleware/cancel/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Cancel ensures that contexts are canceled when their associated tasks are completed.
// There are two parts of this middleware:
// 1. Tying Provider.Cancel to all associated contexts.
// 2. Applying timeout information when available.
// The `cancel` package provides a middle-ware that ties the Cancel gRPC call from Pulumi
// to Go's `context.Context` cancellation system.
//
// Wrapping a provider in `cancel.Wrap` ensures 2 things:
//
// 1. When a resource operation times out, the associated context is canceled.
//
// 2. When `Cancel` is called, all outstanding gRPC methods have their associated contexts
// canceled.
//
// A `cancel.Wrap`ed provider will still call the `Cancel` method on the underlying
// provider. If NotImplemented is returned, it will be swallowed.
Comment on lines +15 to +26
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some docs clarifying what cancel currently does. This behavior is unchanged before and after this PR. The PR only prevents panics on concurrent creations and cancelations.

Since this PR fixes a P1 (panic) and doesn't change the design of cancel.Wrap, I suggest that we merge as is unless there are suggestions on how to better implement the existing semantic.

I can tell that there is disagreement on the desired behavior of cancel.Wrap. I'm happy to have that discussion, but I think it should be a separate discussion, outside the scope of this PR.

CC @blampe @t0yv0

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #208 to track a design discussion on what cancel should do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me!

package cancel

import (
"context"
"sync"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

p "github.com/pulumi/pulumi-go-provider"
"github.com/pulumi/pulumi-go-provider/middleware/cancel/internal/evict"
)

func Wrap(provider p.Provider) p.Provider {
var canceled bool
var cancelFuncs inOutCache[context.CancelFunc]
cancelFuncs := evict.Pool[context.CancelFunc]{
OnEvict: func(f context.CancelFunc) { f() },
}
cancel := func(ctx p.Context, timeout float64) (p.Context, func()) {
var cancel context.CancelFunc
if timeout == noTimeout {
ctx, cancel = p.CtxWithCancel(ctx)
} else {
ctx, cancel = p.CtxWithTimeout(ctx, time.Second*time.Duration(timeout))
}
if canceled {
cancel()
return ctx, func() {}
}
evict := cancelFuncs.insert(cancel)
return ctx, func() {
if !evict() {
cancel()
}
}

handle := cancelFuncs.Insert(cancel)
return ctx, handle.Evict
}
wrapper := provider
wrapper.Cancel = func(ctx p.Context) error {
canceled = true
for _, f := range cancelFuncs.drain() {
f()
}
cancelFuncs.Close()

// We consider this a valid implementation of the Cancel RPC request. We still pass on
// the request so downstream provides *may* rely on the Cancel call, but we catch an
Expand All @@ -70,165 +69,69 @@ func Wrap(provider p.Provider) p.Provider {
}
return err
}
if provider.GetSchema != nil {
wrapper.GetSchema = func(ctx p.Context, req p.GetSchemaRequest) (p.GetSchemaResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.GetSchema(ctx, req)
}
}
if provider.CheckConfig != nil {
wrapper.CheckConfig = func(ctx p.Context, req p.CheckRequest) (p.CheckResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.CheckConfig(ctx, req)
}
}
if provider.DiffConfig != nil {
wrapper.DiffConfig = func(ctx p.Context, req p.DiffRequest) (p.DiffResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.DiffConfig(ctx, req)
}
}
if provider.Configure != nil {
wrapper.Configure = func(ctx p.Context, req p.ConfigureRequest) error {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Configure(ctx, req)
}
}
if provider.Invoke != nil {
wrapper.Invoke = func(ctx p.Context, req p.InvokeRequest) (p.InvokeResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Invoke(ctx, req)
}
}
if provider.Check != nil {
wrapper.Check = func(ctx p.Context, req p.CheckRequest) (p.CheckResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Check(ctx, req)
}
}
if provider.Diff != nil {
wrapper.Diff = func(ctx p.Context, req p.DiffRequest) (p.DiffResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Diff(ctx, req)
}
}
if provider.Create != nil {
wrapper.Create = func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) {
ctx, end := cancel(ctx, req.Timeout)
defer end()
return provider.Create(ctx, req)
}
}
if provider.Read != nil {
wrapper.Read = func(ctx p.Context, req p.ReadRequest) (p.ReadResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Read(ctx, req)
}
}
if provider.Update != nil {
wrapper.Update = func(ctx p.Context, req p.UpdateRequest) (p.UpdateResponse, error) {
ctx, end := cancel(ctx, req.Timeout)
defer end()
return provider.Update(ctx, req)
}
}
if provider.Delete != nil {
wrapper.Delete = func(ctx p.Context, req p.DeleteRequest) error {
ctx, end := cancel(ctx, req.Timeout)
defer end()
return provider.Delete(ctx, req)
}
}
if provider.Construct != nil {
wrapper.Construct = func(ctx p.Context, req p.ConstructRequest) (p.ConstructResponse, error) {
ctx, end := cancel(ctx, noTimeout)
defer end()
return provider.Construct(ctx, req)
}
}
return wrapper
}

const noTimeout float64 = 0

// A data structure which provides amortized O(1) insertion, removal, and draining.
type inOutCache[T any] struct {
values []*entry[T] // An unorderd list of stored values or tombstone (nil) entries.
tombstones []int // An unordered list of empty slots in values
m sync.Mutex
inDrain bool // Wheither the cache is currently being drained. inDrain=true implies m can be ignored.
}

type entry[T any] struct {
evict func() bool
value T
// Wrap each gRPC method to transform a cancel call into a cancel on
// context.Cancel.
wrapper.GetSchema = setCancel2(cancel, provider.GetSchema, nil)
wrapper.CheckConfig = setCancel2(cancel, provider.CheckConfig, nil)
wrapper.DiffConfig = setCancel2(cancel, provider.DiffConfig, nil)
wrapper.Configure = setCancel1(cancel, provider.Configure, nil)
wrapper.Invoke = setCancel2(cancel, provider.Invoke, nil)
wrapper.Check = setCancel2(cancel, provider.Check, nil)
wrapper.Diff = setCancel2(cancel, provider.Diff, nil)
wrapper.Create = setCancel2(cancel, provider.Create, func(r p.CreateRequest) float64 {
return r.Timeout
})
wrapper.Read = setCancel2(cancel, provider.Read, nil)
wrapper.Update = setCancel2(cancel, provider.Update, func(r p.UpdateRequest) float64 {
return r.Timeout
})
wrapper.Delete = setCancel1(cancel, provider.Delete, func(r p.DeleteRequest) float64 {
return r.Timeout
})
wrapper.Construct = setCancel2(cancel, provider.Construct, nil)
return wrapper
}

// Insert a new element into the inOutCahce. The new element can be ejected by calling
// `evict`. If the element was already drained or if `evict` was already called, then
// `evict` will return true. Otherwise it returns false.
func (h *inOutCache[T]) insert(t T) (evict func() (missing bool)) {
h.m.Lock()
defer h.m.Unlock()
var i int // The index in values of the new entry.
if len(h.tombstones) == 0 {
i = len(h.values) // We extend values.
} else {
// There is an empty slot in values, so use that.
i = h.tombstones[len(h.tombstones)-1]
h.tombstones = h.tombstones[:len(h.tombstones)-1]
func setCancel1[
Req any,
F func(p.Context, Req) error,
Cancel func(ctx p.Context, timeout float64) (p.Context, func()),
GetTimeout func(Req) float64,
](cancel Cancel, f F, getTimeout GetTimeout) F {
if f == nil {
return nil
}
return func(ctx p.Context, req Req) error {
var timeout float64
if getTimeout != nil {
timeout = getTimeout(req)
}
ctx, end := cancel(ctx, timeout)
defer end()
return f(ctx, req)
}
}

el := &entry[T]{
value: t,
}
el.evict = func() bool {
if !h.inDrain {
h.m.Lock()
defer h.m.Unlock()
}
gone := el.evict == nil
if gone {
return true
func setCancel2[
Req any, Resp any,
F func(p.Context, Req) (Resp, error),
Cancel func(ctx p.Context, timeout float64) (p.Context, func()),
GetTimeout func(Req) float64,
](cancel Cancel, f F, getTimeout GetTimeout) F {
if f == nil {
return nil
}
return func(ctx p.Context, req Req) (Resp, error) {
var timeout float64
if getTimeout != nil {
timeout = getTimeout(req)
}
el.evict = nil
h.values[i] = nil
h.tombstones = append(h.tombstones, i)
return gone

ctx, end := cancel(ctx, timeout)
defer end()
return f(ctx, req)
}

// Push the value
if len(h.tombstones) == 0 {
h.values = append(h.values, el)
} else {
h.values[i] = el
}
return el.evict
}

// Remove all values from the inOutCache, and return them.
func (h *inOutCache[T]) drain() []T {
h.m.Lock()
defer h.m.Unlock()
// Setting inDrain indicates a trusted actor holds the mutex, indicating that evict
// functions don't need to grab the mutex before executing.
h.inDrain = true
defer func() { h.inDrain = false }()
values := []T{} // Values currently in the cache.
for _, v := range h.values {
if v != nil {
v.evict()
values = append(values, v.value)
}
}
return values
}
const noTimeout float64 = 0
57 changes: 0 additions & 57 deletions middleware/cancel/cancel_test.go

This file was deleted.

Loading
Loading