Skip to content

Commit

Permalink
Cleanup attribute bag usage in mixer/pkg/api. (istio#4228)
Browse files Browse the repository at this point in the history
* Cleanup attribute bag usage in mixer/pkg/api.

- Use snapshotting feature to fix bug where Check was not returning the correct set of
referenced attributes for quota results. It wasn't included attributes referenced from
the APA.

- Get rid of the compatibility bag goo from the gRPC server code. This is no longer
needed.

* Convert remaining uses of target.* attributes to destination.* instead.
  • Loading branch information
geeknoid authored Mar 14, 2018
1 parent 4edcc81 commit 5e56399
Show file tree
Hide file tree
Showing 32 changed files with 129 additions and 273 deletions.
13 changes: 0 additions & 13 deletions mixer/adapter/circonus/operatorconfig/attributes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ spec:
valueType: STRING
source.user:
valueType: STRING
target.uid:
valueType: STRING
destination.uid:
valueType: STRING
connection.id:
Expand Down Expand Up @@ -89,17 +87,6 @@ spec:
valueType: STRING
source.serviceAccount:
valueType: STRING
target.ip:
valueType: IP_ADDRESS
target.labels:
valueType: STRING_MAP
target.name:
valueType: STRING
target.namespace:
valueType: STRING
target.service:
valueType: STRING
target.serviceAccount:
valueType: STRING
destination.ip:
valueType: IP_ADDRESS
Expand Down
4 changes: 2 additions & 2 deletions mixer/adapter/opa/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ spec:
subject:
user: source.uid | ""
action:
namespace: target.namespace | "default"
service: target.service | ""
namespace: destination.namespace | "default"
service: destination.service | ""
method: request.method | ""
path: request.path | ""

Expand Down
27 changes: 11 additions & 16 deletions mixer/adapter/opa/opa_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ spec:
value_type: STRING
source.groups:
value_type: STRING
target.namespace:
destination.namespace:
value_type: STRING
target.service:
destination.service:
value_type: STRING
request.method:
value_type: STRING
request.path:
value_type: STRING
source.service:
value_type: STRING
target.service:
destination.service:
value_type: STRING
---
Expand Down Expand Up @@ -81,13 +81,13 @@ spec:
user: source.uid | ""
groups: source.groups | ""
action:
namespace: target.namespace | "default"
service: target.service | ""
namespace: destination.namespace | "default"
service: destination.service | ""
method: request.method | ""
path: request.path | ""
properties:
source: source.service | ""
target: target.service | ""
target: destination.service | ""
---
apiVersion: "config.istio.io/v1alpha2"
Expand Down Expand Up @@ -210,50 +210,45 @@ func TestServer(t *testing.T) {
}{
"Not important API": {
attrs: map[string]interface{}{
"destination.service": "svc.cluster.local",
"source.uid": "janet",
"request.path": "/detail/alice",
"target.service": "landing_page",
"destination.service": "landing_page",
"source.service": "details",
},
expectedStatusCode: 0,
},
"Self permission": {
attrs: map[string]interface{}{
"destination.service": "svc.cluster.local",
"source.uid": "janet",
"request.path": "/detail/janet",
"target.service": "landing_page",
"destination.service": "landing_page",
"source.service": "details",
},
expectedStatusCode: 0,
},
"Manager permission": {
attrs: map[string]interface{}{
"destination.service": "svc.cluster.local",
"source.uid": "janet",
"request.path": "/reviews/alice",
"target.service": "landing_page",
"destination.service": "landing_page",
"source.service": "details",
},
expectedStatusCode: 0,
},
"HR permission": {
attrs: map[string]interface{}{
"destination.service": "svc.cluster.local",
"source.uid": "ken",
"request.path": "/reviews/janet",
"target.service": "landing_page",
"destination.service": "landing_page",
"source.service": "details",
},
expectedStatusCode: 0,
},
"Denied request": {
attrs: map[string]interface{}{
"destination.service": "svc.cluster.local",
"source.uid": "janet",
"request.path": "/detail/ken",
"target.service": "landing_pages",
"destination.service": "landing_pages",
"source.service": "invalid",
},
expectedStatusCode: 7,
Expand Down
8 changes: 4 additions & 4 deletions mixer/doc/adapter-development-walkthrough.md
Original file line number Diff line number Diff line change
Expand Up @@ -455,10 +455,10 @@ spec:
value: "1"
dimensions:
source: source.labels["app"] | "unknown"
target: target.service | "unknown"
service: target.labels["app"] | "unknown"
target: destination.service | "unknown"
service: destination.labels["app"] | "unknown"
method: request.path | "unknown"
version: target.labels["version"] | "unknown"
version: destination.labels["version"] | "unknown"
response_code: response.code | 200
monitored_resource_type: '"UNSPECIFIED"'
---
Expand Down Expand Up @@ -552,7 +552,7 @@ You can even try passing other attributes to mixer server and inspect your out.t
the adapter changes. For example

```bash
pushd $MIXER_REPO && go install ./... && mixc report -s="destination.service=svc.cluster.local,target.service=mySrvc" -i="response.code=400" --stringmap_attributes="target.labels=app:dummyapp"
pushd $MIXER_REPO && go install ./... && mixc report -s="destination.service=svc.cluster.local,destination.service=mySrvc" -i="response.code=400" --stringmap_attributes="destination.labels=app:dummyapp"
```

**If you have reached this far, congratulate yourself !!**. You have successfully created a Mixer adapter. You can
Expand Down
4 changes: 2 additions & 2 deletions mixer/doc/running-local-mixer.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ Note that `source.ip` is an ip address specified as 4 `:` separated bytes.
`192.0.0.2` is encoded as `c0:0:0:2` in the example.

```shell
bazel-bin/mixer/cmd/mixc/mixc check --string_attributes destination.service=abc.ns.svc.cluster.local,source.name=myservice,target.port=8080 --stringmap_attributes "request.headers=clnt:abcd;source:abcd,destination.labels=app:ratings,source.labels=version:v2" --timestamp_attributes request.time="2017-07-04T00:01:10Z" --bytes_attributes source.ip=c0:0:0:2
bazel-bin/mixer/cmd/mixc/mixc check --string_attributes destination.service=abc.ns.svc.cluster.local,source.name=myservice,destination.port=8080 --stringmap_attributes "request.headers=clnt:abcd;source:abcd,destination.labels=app:ratings,source.labels=version:v2" --timestamp_attributes request.time="2017-07-04T00:01:10Z" --bytes_attributes source.ip=c0:0:0:2

Check RPC completed successfully. Check status was OK
Valid use count: 10000, valid duration: 5m0s
```

The following command sends a `report` request to Mixer.
```shell
bazel-bin/mixer/cmd/mixc/mixc report --string_attributes destination.service=abc.ns.svc.cluster.local,source.name=myservice,target.port=8080 --stringmap_attributes "request.headers=clnt:abc;source:abcd,destination.labels=app:ratings,source.labels=version:v2" --int64_attributes response.duration=2003,response.size=1024 --timestamp_attributes request.time="2017-07-04T00:01:10Z" --bytes_attributes source.ip=c0:0:0:2
bazel-bin/mixer/cmd/mixc/mixc report --string_attributes destination.service=abc.ns.svc.cluster.local,source.name=myservice,destination.port=8080 --stringmap_attributes "request.headers=clnt:abc;source:abcd,destination.labels=app:ratings,source.labels=version:v2" --int64_attributes response.duration=2003,response.size=1024 --timestamp_attributes request.time="2017-07-04T00:01:10Z" --bytes_attributes source.ip=c0:0:0:2

Report RPC returned OK
```
106 changes: 30 additions & 76 deletions mixer/pkg/api/grpcServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package api

import (
"fmt"
"strings"
"time"

opentracing "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -80,40 +79,6 @@ func NewGRPCServer(dispatcher runtime.Dispatcher, gp *pool.GoroutinePool) mixerp
}
}

// compatBag implements compatibility between destination.* and target.* attributes.
type compatBag struct {
parent attribute.Bag
}

func (c *compatBag) String() string {
return c.parent.String()
}

// if a destination.* attribute is missing, check the corresponding target.* attribute.
func (c *compatBag) Get(name string) (v interface{}, found bool) {
v, found = c.parent.Get(name)
if found {
return
}
if !strings.HasPrefix(name, "destination.") {
return
}
compatAttr := strings.Replace(name, "destination.", "target.", 1)
v, found = c.parent.Get(compatAttr)
if found {
log.Warna("Deprecated attribute found: ", compatAttr)
}
return
}

func (c *compatBag) Names() []string {
return c.parent.Names()
}

func (c *compatBag) Done() {
c.parent.Done()
}

// Check is the entry point for the external Check method
func (s *grpcServer) Check(legacyCtx legacyContext.Context, req *mixerpb.CheckRequest) (*mixerpb.CheckResponse, error) {
// TODO: this code doesn't distinguish between RPC failures when communicating with adapters and
Expand All @@ -122,42 +87,35 @@ func (s *grpcServer) Check(legacyCtx legacyContext.Context, req *mixerpb.CheckRe
// request was denied? This will need to be addressed in the new adapter model. In the meantime,
// RPC failure is treated as a semantic denial.

requestBag := attribute.NewProtoBag(&req.Attributes, s.globalDict, s.globalWordList)
log.Debug("Dispatching Preprocess Check")

globalWordCount := int(req.GlobalWordCount)

// compatReqBag ensures that preprocessor input handles deprecated attributes gracefully.
compatReqBag := &compatBag{requestBag}
preprocResponseBag := attribute.GetMutableBag(nil)

log.Debuga("Dispatching Preprocess Check")
protoBag := attribute.NewProtoBag(&req.Attributes, s.globalDict, s.globalWordList)
checkBag := attribute.GetMutableBag(protoBag)

mutableBag := attribute.GetMutableBag(requestBag)
var out rpc.Status
if err := s.dispatcher.Preprocess(legacyCtx, compatReqBag, preprocResponseBag); err != nil {
if err := s.dispatcher.Preprocess(legacyCtx, protoBag, checkBag); err != nil {
out = status.WithError(err)
}
if err := mutableBag.Merge(preprocResponseBag); err != nil {
out = status.WithError(fmt.Errorf("could not merge preprocess attributes into request attributes: %v", err))
}

compatRespBag := &compatBag{mutableBag}

if !status.IsOK(out) {
log.Errora("Preprocess Check returned with: ", status.String(out))
requestBag.Done()
preprocResponseBag.Done()
protoBag.Done()
checkBag.Done()
return nil, makeGRPCError(out)
}

if log.DebugEnabled() {
log.Debuga("Preprocess Check returned with: ", status.String(out))
log.Debug("Dispatching to main adapters after running processors")
log.Debuga("Attribute Bag: \n", mutableBag)
log.Debuga("Attribute Bag: \n", checkBag)
log.Debug("Dispatching Check")
}

cr, err := s.dispatcher.Check(legacyCtx, compatRespBag)
snap := protoBag.SnapshotReferencedAttributes()

cr, err := s.dispatcher.Check(legacyCtx, checkBag)
if err != nil {
out = status.WithError(err)
}
Expand All @@ -174,7 +132,7 @@ func (s *grpcServer) Check(legacyCtx legacyContext.Context, req *mixerpb.CheckRe
ValidDuration: cr.ValidDuration,
ValidUseCount: cr.ValidUseCount,
Status: out,
ReferencedAttributes: requestBag.GetReferencedAttributes(s.globalDict, globalWordCount),
ReferencedAttributes: protoBag.GetReferencedAttributes(s.globalDict, globalWordCount),
},
}

Expand All @@ -183,7 +141,6 @@ func (s *grpcServer) Check(legacyCtx legacyContext.Context, req *mixerpb.CheckRe
} else {
log.Errora("Check returned with error: ", status.String(out))
}
requestBag.ClearReferencedAttributes()

if status.IsOK(resp.Precondition.Status) && len(req.Quotas) > 0 {
resp.Quotas = make(map[string]mixerpb.CheckResponse_QuotaResult, len(req.Quotas))
Expand All @@ -202,11 +159,13 @@ func (s *grpcServer) Check(legacyCtx legacyContext.Context, req *mixerpb.CheckRe
}
var err error

qr, err = quota(legacyCtx, s.dispatcher, compatRespBag, qma)
// restore to the post-APA state
protoBag.ResetReferencedAttributes(snap)

qr, err = quota(legacyCtx, s.dispatcher, checkBag, qma)
// if quota check fails, set status for the entire request and stop processing.
if err != nil {
resp.Precondition.Status = status.WithError(err)
requestBag.ClearReferencedAttributes()
break
}

Expand All @@ -219,14 +178,13 @@ func (s *grpcServer) Check(legacyCtx legacyContext.Context, req *mixerpb.CheckRe
}
}

qr.ReferencedAttributes = requestBag.GetReferencedAttributes(s.globalDict, globalWordCount)
qr.ReferencedAttributes = protoBag.GetReferencedAttributes(s.globalDict, globalWordCount)
resp.Quotas[name] = *qr
requestBag.ClearReferencedAttributes()
}
}

requestBag.Done()
preprocResponseBag.Done()
checkBag.Done()
protoBag.Done()

return resp, nil
}
Expand Down Expand Up @@ -273,18 +231,17 @@ func (s *grpcServer) Report(legacyCtx legacyContext.Context, req *mixerpb.Report
}

protoBag := attribute.NewProtoBag(&req.Attributes[0], s.globalDict, s.globalWordList)
requestBag := attribute.GetMutableBag(protoBag)
compatReqBag := &compatBag{requestBag}
mutableBag := attribute.GetMutableBag(requestBag)
preprocResponseBag := attribute.GetMutableBag(nil)
accumBag := attribute.GetMutableBag(protoBag)
reportBag := attribute.GetMutableBag(accumBag)

var err error
for i := 0; i < len(req.Attributes); i++ {
span, newctx := opentracing.StartSpanFromContext(legacyCtx, fmt.Sprintf("Attributes %d", i))

// the first attribute block is handled by the protoBag as a foundation,
// deltas are applied to the child bag (i.e. requestBag)
if i > 0 {
err = requestBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList)
err = accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList)
if err != nil {
msg := "Request could not be processed due to invalid attributes."
log.Errora(msg, "\n", err)
Expand All @@ -296,14 +253,10 @@ func (s *grpcServer) Report(legacyCtx legacyContext.Context, req *mixerpb.Report

log.Debug("Dispatching Preprocess")
var out rpc.Status
if err = s.dispatcher.Preprocess(newctx, compatReqBag, preprocResponseBag); err != nil {
if err = s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil {
out = status.WithError(err)
}
if err = mutableBag.Merge(preprocResponseBag); err != nil {
out = status.WithError(fmt.Errorf("could not merge preprocess attributes into request attributes: %v", err))
}

compatRespBag := &compatBag{mutableBag}
if !status.IsOK(out) {
log.Errora("Preprocess returned with: ", status.String(out))
err = makeGRPCError(out)
Expand All @@ -314,11 +267,12 @@ func (s *grpcServer) Report(legacyCtx legacyContext.Context, req *mixerpb.Report

if log.DebugEnabled() {
log.Debuga("Preprocess returned with: ", status.String(out))
log.Debug("Dispatching to main adapters after running processors")
log.Debuga("Attribute Bag: \n", mutableBag)
log.Debug("Dispatching to main adapters after running preprocessors")
log.Debuga("Attribute Bag: \n", reportBag)
log.Debugf("Dispatching Report %d out of %d", i, len(req.Attributes))
}
err = s.dispatcher.Report(legacyCtx, compatRespBag)

err = s.dispatcher.Report(legacyCtx, reportBag)
if err != nil {
out = status.WithError(err)
log.Warnf("Report returned %v", err)
Expand All @@ -338,11 +292,11 @@ func (s *grpcServer) Report(legacyCtx legacyContext.Context, req *mixerpb.Report

span.LogFields(otlog.String("success", "finished Report for attribute bag "+string(i)))
span.Finish()
preprocResponseBag.Reset()
reportBag.Reset()
}

preprocResponseBag.Done()
requestBag.Done()
reportBag.Done()
accumBag.Done()
protoBag.Done()

if err != nil {
Expand Down
Loading

0 comments on commit 5e56399

Please sign in to comment.