Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable direct error handling for bundle plugin trigger method #7143

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions cmd/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -11,6 +12,7 @@ import (
"testing"
"time"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/cmd/internal/exec"
"github.com/open-policy-agent/opa/internal/file/archive"
loggingtest "github.com/open-policy-agent/opa/logging/test"
Expand Down Expand Up @@ -321,15 +323,32 @@ main contains "hello" if {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
go func(expectedErrors []string) {
err := runExecWithContext(ctx, params)
// Note(philipc): Catch the expected cancellation
// errors, allowing unexpected test failures through.
if err != context.Canceled {
t.Error(err)
return
var errs ast.Errors
if errors.As(err, &errs) {
for _, expErr := range expectedErrors {
found := false
for _, e := range errs {
if strings.Contains(e.Error(), expErr) {
found = true
break
}
}
if !found {
t.Errorf("Could not find expected error: %s in %v", expErr, errs)
return
}
}
} else {
t.Error(err)
return
}
}
}()
}(tc.expErrs)

if !test.Eventually(t, 5*time.Second, func() bool {
for _, expErr := range tc.expErrs {
Expand Down
53 changes: 53 additions & 0 deletions plugins/bundle/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package bundle

import (
"errors"
"fmt"

"github.com/open-policy-agent/opa/download"
)

// Errors represents a list of errors that occurred during a bundle load enriched by the bundle name.
type Errors []Error

func (e Errors) Unwrap() []error {
output := make([]error, len(e))
for i := range e {
output[i] = e[i]
}
return output
}
func (e Errors) Error() string {
err := errors.Join(e.Unwrap()...)
return err.Error()
}

type Error struct {
BundleName string
Code string
HTTPCode int
Message string
Err error
}

func NewBundleError(bundleName string, cause error) Error {
var (
httpError download.HTTPError
)
switch {
case cause == nil:
return Error{BundleName: bundleName, Code: "", HTTPCode: -1, Message: "", Err: nil}
case errors.As(cause, &httpError):
return Error{BundleName: bundleName, Code: errCode, HTTPCode: httpError.StatusCode, Message: httpError.Error(), Err: cause}
default:
return Error{BundleName: bundleName, Code: errCode, HTTPCode: -1, Message: cause.Error(), Err: cause}
}
}

func (e Error) Error() string {
return fmt.Sprintf("Bundle name: %s, Code: %s, HTTPCode: %d, Message: %s", e.BundleName, errCode, e.HTTPCode, e.Message)
}

func (e Error) Unwrap() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is there a reason to define this method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will allow using errors.As for retrieving the wrapped error. Something like this:

errs := Errors{NewBundleError("example", download.HTTPError{StatusCode: 500})}

var httpError download.HTTPError
errors.As(errs, &httpError) 

if httpError.StatusCode == 500 { ...

There is a test in errors_test.go (TestUnwrap) that will start to fail if we remove the function.

return e.Err
}
145 changes: 145 additions & 0 deletions plugins/bundle/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package bundle

import (
"errors"
"fmt"
"testing"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/download"
)

func TestErrors(t *testing.T) {
errs := Errors{
NewBundleError("foo", fmt.Errorf("foo error")),
NewBundleError("bar", fmt.Errorf("bar error")),
}

expected := "Bundle name: foo, Code: bundle_error, HTTPCode: -1, Message: foo error\nBundle name: bar, Code: bundle_error, HTTPCode: -1, Message: bar error"
result := errs.Error()

if result != expected {
t.Errorf("Expected: %v \nbut got: %v", expected, result)
}
}

func TestUnwrapSlice(t *testing.T) {
fooErr := NewBundleError("foo", fmt.Errorf("foo error"))
barErr := NewBundleError("bar", fmt.Errorf("bar error"))

errs := Errors{fooErr, barErr}

result := errs.Unwrap()

if result[0].Error() != fooErr.Error() {
t.Fatalf("expected %v \nbut got: %v", fooErr, result[0])
}
if result[1].Error() != barErr.Error() {
t.Fatalf("expected %v \nbut got: %v", barErr, result[1])
}
}

func TestUnwrap(t *testing.T) {
serverHTTPError := NewBundleError("server", download.HTTPError{StatusCode: 500})
clientHTTPError := NewBundleError("client", download.HTTPError{StatusCode: 400})
astErrors := ast.Errors{ast.NewError(ast.ParseErr, ast.NewLocation(nil, "foo.rego", 100, 2), "blarg")}

errs := Errors{serverHTTPError, clientHTTPError, NewBundleError("ast", astErrors)}

// unwrap first bundle.Error
var bundleError Error
if !errors.As(errs, &bundleError) {
t.Fatal("failed to unwrap Error")
}
if bundleError.Error() != serverHTTPError.Error() {
t.Fatalf("expected: %v \ngot: %v", serverHTTPError, bundleError)
}

// unwrap first HTTPError
var httpError download.HTTPError
if !errors.As(errs, &httpError) {
t.Fatal("failed to unwrap Error")
}
if httpError.Error() != serverHTTPError.Err.Error() {
t.Fatalf("expected: %v \ngot: %v", serverHTTPError.Err, httpError)
}

// unwrap HTTPError from bundle.Error
if !errors.As(bundleError, &httpError) {
t.Fatal("failed to unwrap HTTPError")
}
if httpError.Error() != serverHTTPError.Err.Error() {
t.Fatalf("expected: %v \nbgot: %v", serverHTTPError.Err, httpError)
}

var unwrappedAstErrors ast.Errors
if !errors.As(errs, &unwrappedAstErrors) {
t.Fatal("failed to unwrap ast.Errors")
}
if unwrappedAstErrors.Error() != astErrors.Error() {
t.Fatalf("expected: %v \ngot: %v", astErrors, unwrappedAstErrors)
}
}

func TestHTTPErrorWrapping(t *testing.T) {
err := download.HTTPError{StatusCode: 500}
bundleErr := NewBundleError("foo", err)

if bundleErr.BundleName != "foo" {
t.Fatalf("BundleName: expected: %v \ngot: %v", "foo", bundleErr.BundleName)
}
if bundleErr.HTTPCode != err.StatusCode {
t.Fatalf("HTTPCode: expected: %v \ngot: %v", err.StatusCode, bundleErr.HTTPCode)
}
if bundleErr.Message != err.Error() {
t.Fatalf("Message: expected: %v \ngot: %v", err.Error(), bundleErr.Message)
}
if bundleErr.Code != errCode {
t.Fatalf("Code: expected: %v \ngot: %v", errCode, bundleErr.Code)
}
if bundleErr.Err != err {
t.Fatalf("Err: expected: %v \ngot: %v", err, bundleErr.Err)
}
}

func TestASTErrorsWrapping(t *testing.T) {
err := ast.Errors{ast.NewError(ast.ParseErr, ast.NewLocation(nil, "foo.rego", 100, 2), "blarg")}
bundleErr := NewBundleError("foo", err)

if bundleErr.BundleName != "foo" {
t.Fatalf("BundleName: expected: %v \ngot: %v", "foo", bundleErr.BundleName)
}
if bundleErr.HTTPCode != -1 {
t.Fatalf("HTTPCode: expected: %v \ngot: %v", -1, bundleErr.HTTPCode)
}
if bundleErr.Message != err.Error() {
t.Fatalf("Message: expected: %v \ngot: %v", err.Error(), bundleErr.Message)
}
if bundleErr.Code != errCode {
t.Fatalf("Code: expected: %v \ngot: %v", errCode, bundleErr.Code)
}
if bundleErr.Err.Error() != err.Error() {
t.Fatalf("Err: expected: %v \ngot: %v", err.Error(), bundleErr.Err.Error())
}
}

func TestGenericErrorWrapping(t *testing.T) {
err := fmt.Errorf("foo error")
bundleErr := NewBundleError("foo", err)

if bundleErr.BundleName != "foo" {
t.Fatalf("BundleName: expected: %v \ngot: %v", "foo", bundleErr.BundleName)
}
if bundleErr.HTTPCode != -1 {
t.Fatalf("HTTPCode: expected: %v \ngot: %v", -1, bundleErr.HTTPCode)
}
if bundleErr.Message != err.Error() {
t.Fatalf("Message: expected: %v \ngot: %v", err.Error(), bundleErr.Message)
}
if bundleErr.Code != errCode {
t.Fatalf("Code: expected: %v \ngot: %v", errCode, bundleErr.Code)
}
if bundleErr.Err.Error() != err.Error() {
t.Fatalf("Err: expected: %v \ngot: %v", err.Error(), bundleErr.Err.Error())
}
}
15 changes: 12 additions & 3 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,25 @@ func (p *Plugin) Loaders() map[string]Loader {

// Trigger triggers a bundle download on all configured bundles.
func (p *Plugin) Trigger(ctx context.Context) error {
var errs Errors

p.mtx.Lock()
downloaders := map[string]Loader{}
for name, dl := range p.downloaders {
downloaders[name] = dl
}
p.mtx.Unlock()

for _, d := range downloaders {
// plugin callback will log the trigger error and include it in the bundle status
_ = d.Trigger(ctx)
for name, d := range downloaders {
// plugin callback will also log the trigger error and include it in the bundle status
err := d.Trigger(ctx)
// only return errors for TriggerMode manual as periodic bundles will be retried
if err != nil && *p.config.Bundles[name].Trigger == plugins.TriggerManual {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the Config() method on the plugin to retrieve the config.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashutosh-narkar do you think it makes sense to only return the errors for bundles in manual trigger mode?
It felt sensible to me as periodic triggermode will be retried by the bundle plugin but might also just create confusion.
it also creates this additional if statement which will then not be tested.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Manual trigger mode can currently only be configured via a library integration. So to keep parity with the discovery plugin, returning errors in the manual mode makes sense.

errs = append(errs, NewBundleError(name, err))
}
}
if len(errs) > 0 {
return errs
}
return nil
}
Expand Down
62 changes: 62 additions & 0 deletions plugins/bundle/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6522,6 +6522,68 @@ func TestPluginManualTriggerWithTimeout(t *testing.T) {
}
}

func TestPluginManualTriggerWithServerError(t *testing.T) {
Copy link
Author

@torwunder torwunder Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test triggers the race condition.
My investigation led to the conclusion that this is triggered by having the bundle with the "periodic" trigger mode in the test.
The downloader.oneShot function will be triggered by the explicitly trigger method and also by the downloader.loop function.
The access to the shared variables (f.ex. etag, client) in Downloader is not synchronized and leads to the race condition.
A possible solution is to only trigger bundle dowloads with "manual" trigger mode in the trigger method.
F.ex. like this (in bundle.Plugin.Trigger)

func (p *Plugin) Trigger(ctx context.Context) error {
[...]
    for name, dl := range p.downloaders {
        downloaders[name] = dl
        p.cfgMtx.RLock()
        triggerMode := p.config.Bundles[name].Trigger
        p.cfgMtx.RUnlock()
        
        if triggerMode != nil && *triggerMode == plugins.TriggerManual {
	        downloaders[name] = dl
        }
    }
[...]

This would also solve the question if Trigger should return errors for bundles with "periodic" trigger mode.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashutosh-narkar
Do you have a recommendation how to proceed with this?
To make the test green, I could remove the configuration for the bundle in "periodic" trigger mode but I think this might be a problem in the bundle plugin that should be addressed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having only a manual trigger for the test should be fine.


ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

s := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, _ *http.Request) {
resp.WriteHeader(500)
}))

// setup plugin pointing at fake server
manager := getTestManagerWithOpts([]byte(fmt.Sprintf(`{
"services": {
"default": {
"url": %q
}
}
}`, s.URL)))

var manual plugins.TriggerMode = "manual"
var periodic plugins.TriggerMode = "periodic"
var delay int64 = 10
polling := download.PollingConfig{MinDelaySeconds: &delay, MaxDelaySeconds: &delay}

plugin := New(&Config{
Bundles: map[string]*Source{
"manual": {
Service: "default",
SizeLimitBytes: int64(bundle.DefaultSizeLimitBytes),
Config: download.Config{Trigger: &manual},
},
"periodic": {
Service: "default",
SizeLimitBytes: int64(bundle.DefaultSizeLimitBytes),
Config: download.Config{Trigger: &periodic, Polling: polling},
},
},
}, manager)

err := plugin.Start(ctx)
if err != nil {
t.Fatal(err)
}
// manually trigger bundle download
err = plugin.Trigger(ctx)

plugin.Stop(ctx)

var bundleErrors Errors
if errors.As(err, &bundleErrors) {
if len(bundleErrors) != 1 {
t.Fatalf("expected exactly one error, got %d", len(bundleErrors))
}
for _, e := range bundleErrors {
if e.BundleName != "manual" {
t.Fatalf("expect error only for bundle with manual trigger mode")
}
}
} else {
t.Fatalf("expected type of error to be %s but got %s", reflect.TypeOf(bundleErrors), reflect.TypeOf(err))
}
}

func TestGetNormalizedBundleName(t *testing.T) {
cases := []struct {
input string
Expand Down
Loading