Skip to content

Commit

Permalink
idea for leaf reactors
Browse files Browse the repository at this point in the history
  • Loading branch information
DanG100 committed Dec 6, 2023
1 parent e5dde15 commit b0c82a6
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 47 deletions.
8 changes: 8 additions & 0 deletions ygnmi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func NewSingletonQuery[T any](goStructName string, state, shadowpath, leaf, scal
goStructFn,
subPaths,
compressInfo,
nil,
},
}
}
Expand All @@ -79,6 +80,7 @@ func NewConfigQuery[T any](goStructName string, state, shadowpath, leaf, scalar,
goStructFn,
subPaths,
compressInfo,
nil,
},
}
}
Expand All @@ -100,6 +102,7 @@ func NewWildcardQuery[T any](goStructName string, state, shadowpath, leaf, scala
goStructFn,
subPaths,
compressInfo,
nil,
},
}
}
Expand Down Expand Up @@ -176,6 +179,7 @@ type baseQuery[T any] struct {
// compInfo stores compression information when the node points to a
// path that's compressed out in the generated code.
compInfo *CompressionInfo
reactors []*reactorPair[T]
}

// dirName returns the YANG schema name of the GoStruct containing this node.
Expand Down Expand Up @@ -279,3 +283,7 @@ func (q *baseQuery[T]) isScalar() bool {
func (q *baseQuery[T]) compressInfo() *CompressionInfo {
return q.compInfo
}

func (q *baseQuery[T]) reactorFns() []*reactorPair[T] {
return q.reactors
}
18 changes: 9 additions & 9 deletions ygnmi/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,16 @@ func (c *ComplianceErrors) String() string {
// NOTE: The datapoints are applied in order as they are in the input slice,
// *NOT* in order of their timestamps. As such, in order to correctly support
// Collect calls, the input data must be sorted in order of timestamps.
func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot.ValidatedGoStruct, opts *opt) (*Value[T], error) {
func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot.ValidatedGoStruct, opts *opt) (*Value[T], []*DataPoint, error) {
queryPath, err := resolvePath(q.PathStruct())
if err != nil {
return nil, err
return nil, nil, err
}
ret := &Value[T]{
Path: queryPath,
}
if len(data) == 0 {
return ret, nil
return ret, nil, nil
}
schema := q.schema()

Expand All @@ -158,24 +158,24 @@ func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot.

delete, err := unmarshalSchemaless(data, setVal)
if err != nil {
return ret, err
return ret, nil, err
}
ret.Timestamp = data[0].Timestamp
ret.RecvTimestamp = data[0].RecvTimestamp
ret.Path = proto.Clone(data[0].Path).(*gpb.Path)
if !delete {
ret.SetVal(val)
}
return ret, nil
return ret, nil, nil
}

unmarshalledData, complianceErrs, err := unmarshal(data, schema.SchemaTree[q.dirName()], goStruct, queryPath, schema, q.isLeaf(), q.isShadowPath(), q.compressInfo(), opts)
ret.ComplianceErrors = complianceErrs
if err != nil {
return ret, err
return ret, unmarshalledData, err
}
if len(unmarshalledData) == 0 {
return ret, nil
return ret, unmarshalledData, nil
}

path := unmarshalledData[0].Path
Expand All @@ -201,13 +201,13 @@ func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot.
if q.isCompressedSchema() && !q.isLeaf() && !q.IsState() {
err := ygot.PruneConfigFalse(q.schema().SchemaTree[q.dirName()], goStruct)
if err != nil {
return ret, err
return ret, unmarshalledData, err
}
}
if val, ok := q.extract(goStruct); ok {
ret.SetVal(val)
}
return ret, nil
return ret, unmarshalledData, nil
}

// unmarshalSchemaless unmarshals the datapoint into the value, returning whether the datapoint was a delete.
Expand Down
172 changes: 134 additions & 38 deletions ygnmi/ygnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type AnyQuery[T any] interface {
// extract is used for leaves to return the field from the parent GoStruct.
// For non-leaves, this casts the GoStruct to the concrete type.
extract(ygot.ValidatedGoStruct) (T, bool)
reactorFns() []*reactorPair[T]
}

// SingletonQuery is a non-wildcard gNMI query.
Expand Down Expand Up @@ -275,7 +276,7 @@ func Lookup[T any](ctx context.Context, c *Client, q SingletonQuery[T], opts ...
if err != nil {
return nil, fmt.Errorf("failed to receive to data: %w", err)
}
val, err := unmarshalAndExtract[T](data, q, q.goStruct(), resolvedOpts)
val, _, err := unmarshalAndExtract[T](data, q, q.goStruct(), resolvedOpts)
if err != nil {
return val, fmt.Errorf("failed to unmarshal data: %w", err)
}
Expand Down Expand Up @@ -360,7 +361,7 @@ func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func
w.errCh <- ctx.Err()
return
case data := <-dataCh:
val, err := unmarshalAndExtract[T](data, q, gs, resolvedOpts)
val, unmarshaledData, err := unmarshalAndExtract[T](data, q, gs, resolvedOpts)
if err != nil {
w.errCh <- err
return
Expand All @@ -370,6 +371,29 @@ func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func
w.errCh <- err
return
}
for _, r := range q.reactorFns() {
for _, d := range unmarshaledData {
rPath, err := resolvePath(r.ps)
if err != nil {
w.errCh <- err
return
}
if util.PathMatchesQuery(d.Path, rPath) {
v := &Value[T]{
val: val.val,
present: val.present,
Path: d.Path, // Use the datapoint path, (leaf)
Timestamp: val.Timestamp,
RecvTimestamp: val.RecvTimestamp,
ComplianceErrors: val.ComplianceErrors,
}
if err := r.reactor(v); err != nil {
w.errCh <- err
return
}
}
}
}
case err := <-errCh:
w.errCh <- err
return
Expand Down Expand Up @@ -450,7 +474,7 @@ func LookupAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], opts .
var vals []*Value[T]
for _, prefix := range sortedPrefixes {
goStruct := q.goStruct()
v, err := unmarshalAndExtract[T](datapointGroups[prefix], q, goStruct, resolvedOpts)
v, _, err := unmarshalAndExtract[T](datapointGroups[prefix], q, goStruct, resolvedOpts)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data: %w", err)
}
Expand Down Expand Up @@ -533,7 +557,7 @@ func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred fu
if _, ok := structs[pre]; !ok {
structs[pre] = q.goStruct()
}
val, err := unmarshalAndExtract[T](datapointGroups[pre], q, structs[pre], resolvedOpts)
val, unmarshaledData, err := unmarshalAndExtract[T](datapointGroups[pre], q, structs[pre], resolvedOpts)
if err != nil {
w.errCh <- err
return
Expand All @@ -543,6 +567,29 @@ func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred fu
w.errCh <- err
return
}
for _, r := range q.reactorFns() {
for _, d := range unmarshaledData {
rPath, err := resolvePath(r.ps)
if err != nil {
w.errCh <- err
return
}
if util.PathMatchesQuery(d.Path, rPath) {
v := &Value[T]{
val: val.val,
present: val.present,
Path: d.Path, // Use the datapoint path, (leaf)
Timestamp: val.Timestamp,
RecvTimestamp: val.RecvTimestamp,
ComplianceErrors: val.ComplianceErrors,
}
if err := r.reactor(v); err != nil {
w.errCh <- err
return
}
}
}
}
}
case err := <-errCh:
w.errCh <- err
Expand Down Expand Up @@ -721,12 +768,18 @@ func BatchDelete[T any](sb *SetBatch, q ConfigQuery[T]) {
})
}

type reactorPair[T any] struct {
ps PathStruct
reactor func(*Value[T]) error
}

// Batch contains a collection of paths.
// Calling State() or Config() on the batch returns a query
// that can be used to Lookup, Watch, etc on multiple paths at once.
type Batch[T any] struct {
root SingletonQuery[T]
paths []PathStruct
root SingletonQuery[T]
paths []PathStruct
reactors []*reactorPair[T]
}

// NewBatch creates a batch object. All paths in the batch must be children of the root query.
Expand Down Expand Up @@ -758,34 +811,56 @@ func (b *Batch[T]) AddPaths(paths ...UntypedQuery) error {
return nil
}

func (b *Batch[T]) AddReactor(path UntypedQuery, reactor func(*Value[T]) error) error {

Check warning on line 814 in ygnmi/ygnmi.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method Batch.AddReactor should have comment or be unexported (revive)
root, err := resolvePath(b.root.PathStruct())
if err != nil {
return err
}
ps := path.PathStruct()
p, err := resolvePath(ps)
if err != nil {
return err
}
if !util.PathMatchesQuery(p, root) {
return fmt.Errorf("root path %v is not a prefix of %v", root, p)
}
b.paths = append(b.paths, ps)
b.reactors = append(b.reactors, &reactorPair[T]{ps: ps, reactor: reactor})
return nil
}

// Query returns a Query that can be used in gNMI operations.
// The returned query is immutable, adding paths does not modify existing queries.
func (b *Batch[T]) Query() SingletonQuery[T] {
queryPaths := make([]PathStruct, len(b.paths))
copy(queryPaths, b.paths)
return NewSingletonQuery[T](
b.root.dirName(),
b.root.IsState(),
b.root.isShadowPath(),
b.root.isLeaf(),
b.root.isScalar(),
b.root.isCompressedSchema(),
b.root.isListContainer(),
b.root.PathStruct(),
b.root.extract,
b.root.goStruct,
b.root.schema,
queryPaths,
b.root.compressInfo(),
)
return &SingletonQueryStruct[T]{
baseQuery: baseQuery[T]{
goStructName: b.root.dirName(),
state: b.root.IsState(),
shadowpath: b.root.isShadowPath(),
ps: b.root.PathStruct(),
leaf: b.root.isLeaf(),
scalar: b.root.isScalar(),
compressedSchema: b.root.isCompressedSchema(),
listContainer: b.root.isListContainer(),
extractFn: b.root.extract,
goStructFn: b.root.goStruct,
yschemaFn: b.root.schema,
queryPathStructs: queryPaths,
compInfo: b.root.compressInfo(),
reactors: b.reactors,
},
}
}

// WildcardBatch contains a collection of paths.
// Calling Query() on the batch returns a query
// that can be used in LookupAll, WatchAll, etc on select paths within the root path.
type WildcardBatch[T any] struct {
root WildcardQuery[T]
paths []PathStruct
root WildcardQuery[T]
paths []PathStruct
reactors []*reactorPair[T]
}

// NewWildcardBatch creates a batch object. All paths in the batch must be children of the root query.
Expand Down Expand Up @@ -817,24 +892,45 @@ func (b *WildcardBatch[T]) AddPaths(paths ...UntypedQuery) error {
return nil
}

func (b *WildcardBatch[T]) AddReactor(path UntypedQuery, reactor func(*Value[T]) error) error {

Check warning on line 895 in ygnmi/ygnmi.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method WildcardBatch.AddReactor should have comment or be unexported (revive)
root, err := resolvePath(b.root.PathStruct())
if err != nil {
return err
}
ps := path.PathStruct()
p, err := resolvePath(ps)
if err != nil {
return err
}
if !util.PathMatchesQuery(p, root) {
return fmt.Errorf("root path %v is not a prefix of %v", root, p)
}
b.paths = append(b.paths, ps)
b.reactors = append(b.reactors, &reactorPair[T]{ps: ps, reactor: reactor})
return nil
}

// Query returns a Query that can be used in gNMI operations.
// The returned query is immutable, adding paths does not modify existing queries.
func (b *WildcardBatch[T]) Query() WildcardQuery[T] {
queryPaths := make([]PathStruct, len(b.paths))
copy(queryPaths, b.paths)
return NewWildcardQuery[T](
b.root.dirName(),
b.root.IsState(),
b.root.isShadowPath(),
b.root.isLeaf(),
b.root.isScalar(),
b.root.isCompressedSchema(),
b.root.isListContainer(),
b.root.PathStruct(),
b.root.extract,
b.root.goStruct,
b.root.schema,
queryPaths,
b.root.compressInfo(),
)
return &WildcardQueryStruct[T]{
baseQuery: baseQuery[T]{
goStructName: b.root.dirName(),
state: b.root.IsState(),
shadowpath: b.root.isShadowPath(),
ps: b.root.PathStruct(),
leaf: b.root.isLeaf(),
scalar: b.root.isScalar(),
compressedSchema: b.root.isCompressedSchema(),
listContainer: b.root.isListContainer(),
extractFn: b.root.extract,
goStructFn: b.root.goStruct,
yschemaFn: b.root.schema,
queryPathStructs: queryPaths,
compInfo: b.root.compressInfo(),
reactors: b.reactors,
},
}
}

0 comments on commit b0c82a6

Please sign in to comment.