diff --git a/ygnmi/types.go b/ygnmi/types.go index d25b146..72031d1 100644 --- a/ygnmi/types.go +++ b/ygnmi/types.go @@ -58,6 +58,7 @@ func NewSingletonQuery[T any](goStructName string, state, shadowpath, leaf, scal goStructFn, subPaths, compressInfo, + nil, }, } } @@ -79,6 +80,7 @@ func NewConfigQuery[T any](goStructName string, state, shadowpath, leaf, scalar, goStructFn, subPaths, compressInfo, + nil, }, } } @@ -100,6 +102,7 @@ func NewWildcardQuery[T any](goStructName string, state, shadowpath, leaf, scala goStructFn, subPaths, compressInfo, + nil, }, } } @@ -176,6 +179,7 @@ type baseQuery[T any] struct { // compInfo stores compression information when the node points to a // path that's compressed out in the generated code. compInfo *CompressionInfo + reactors []*reactorPair[T] } // dirName returns the YANG schema name of the GoStruct containing this node. @@ -279,3 +283,7 @@ func (q *baseQuery[T]) isScalar() bool { func (q *baseQuery[T]) compressInfo() *CompressionInfo { return q.compInfo } + +func (q *baseQuery[T]) reactorFns() []*reactorPair[T] { + return q.reactors +} diff --git a/ygnmi/unmarshal.go b/ygnmi/unmarshal.go index b48d7f2..5961ad0 100644 --- a/ygnmi/unmarshal.go +++ b/ygnmi/unmarshal.go @@ -136,16 +136,16 @@ func (c *ComplianceErrors) String() string { // NOTE: The datapoints are applied in order as they are in the input slice, // *NOT* in order of their timestamps. As such, in order to correctly support // Collect calls, the input data must be sorted in order of timestamps. -func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot.ValidatedGoStruct, opts *opt) (*Value[T], error) { +func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot.ValidatedGoStruct, opts *opt) (*Value[T], []*DataPoint, error) { queryPath, err := resolvePath(q.PathStruct()) if err != nil { - return nil, err + return nil, nil, err } ret := &Value[T]{ Path: queryPath, } if len(data) == 0 { - return ret, nil + return ret, nil, nil } schema := q.schema() @@ -158,7 +158,7 @@ func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot. delete, err := unmarshalSchemaless(data, setVal) if err != nil { - return ret, err + return ret, nil, err } ret.Timestamp = data[0].Timestamp ret.RecvTimestamp = data[0].RecvTimestamp @@ -166,16 +166,16 @@ func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot. if !delete { ret.SetVal(val) } - return ret, nil + return ret, nil, nil } unmarshalledData, complianceErrs, err := unmarshal(data, schema.SchemaTree[q.dirName()], goStruct, queryPath, schema, q.isLeaf(), q.isShadowPath(), q.compressInfo(), opts) ret.ComplianceErrors = complianceErrs if err != nil { - return ret, err + return ret, unmarshalledData, err } if len(unmarshalledData) == 0 { - return ret, nil + return ret, unmarshalledData, nil } path := unmarshalledData[0].Path @@ -201,13 +201,13 @@ func unmarshalAndExtract[T any](data []*DataPoint, q AnyQuery[T], goStruct ygot. if q.isCompressedSchema() && !q.isLeaf() && !q.IsState() { err := ygot.PruneConfigFalse(q.schema().SchemaTree[q.dirName()], goStruct) if err != nil { - return ret, err + return ret, unmarshalledData, err } } if val, ok := q.extract(goStruct); ok { ret.SetVal(val) } - return ret, nil + return ret, unmarshalledData, nil } // unmarshalSchemaless unmarshals the datapoint into the value, returning whether the datapoint was a delete. diff --git a/ygnmi/ygnmi.go b/ygnmi/ygnmi.go index 8e5cc64..66e53ed 100644 --- a/ygnmi/ygnmi.go +++ b/ygnmi/ygnmi.go @@ -74,6 +74,7 @@ type AnyQuery[T any] interface { // extract is used for leaves to return the field from the parent GoStruct. // For non-leaves, this casts the GoStruct to the concrete type. extract(ygot.ValidatedGoStruct) (T, bool) + reactorFns() []*reactorPair[T] } // SingletonQuery is a non-wildcard gNMI query. @@ -275,7 +276,7 @@ func Lookup[T any](ctx context.Context, c *Client, q SingletonQuery[T], opts ... if err != nil { return nil, fmt.Errorf("failed to receive to data: %w", err) } - val, err := unmarshalAndExtract[T](data, q, q.goStruct(), resolvedOpts) + val, _, err := unmarshalAndExtract[T](data, q, q.goStruct(), resolvedOpts) if err != nil { return val, fmt.Errorf("failed to unmarshal data: %w", err) } @@ -360,7 +361,7 @@ func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func w.errCh <- ctx.Err() return case data := <-dataCh: - val, err := unmarshalAndExtract[T](data, q, gs, resolvedOpts) + val, unmarshaledData, err := unmarshalAndExtract[T](data, q, gs, resolvedOpts) if err != nil { w.errCh <- err return @@ -370,6 +371,29 @@ func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func w.errCh <- err return } + for _, r := range q.reactorFns() { + for _, d := range unmarshaledData { + rPath, err := resolvePath(r.ps) + if err != nil { + w.errCh <- err + return + } + if util.PathMatchesQuery(d.Path, rPath) { + v := &Value[T]{ + val: val.val, + present: val.present, + Path: d.Path, // Use the datapoint path, (leaf) + Timestamp: val.Timestamp, + RecvTimestamp: val.RecvTimestamp, + ComplianceErrors: val.ComplianceErrors, + } + if err := r.reactor(v); err != nil { + w.errCh <- err + return + } + } + } + } case err := <-errCh: w.errCh <- err return @@ -450,7 +474,7 @@ func LookupAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], opts . var vals []*Value[T] for _, prefix := range sortedPrefixes { goStruct := q.goStruct() - v, err := unmarshalAndExtract[T](datapointGroups[prefix], q, goStruct, resolvedOpts) + v, _, err := unmarshalAndExtract[T](datapointGroups[prefix], q, goStruct, resolvedOpts) if err != nil { return nil, fmt.Errorf("failed to unmarshal data: %w", err) } @@ -533,7 +557,7 @@ func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred fu if _, ok := structs[pre]; !ok { structs[pre] = q.goStruct() } - val, err := unmarshalAndExtract[T](datapointGroups[pre], q, structs[pre], resolvedOpts) + val, unmarshaledData, err := unmarshalAndExtract[T](datapointGroups[pre], q, structs[pre], resolvedOpts) if err != nil { w.errCh <- err return @@ -543,6 +567,29 @@ func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred fu w.errCh <- err return } + for _, r := range q.reactorFns() { + for _, d := range unmarshaledData { + rPath, err := resolvePath(r.ps) + if err != nil { + w.errCh <- err + return + } + if util.PathMatchesQuery(d.Path, rPath) { + v := &Value[T]{ + val: val.val, + present: val.present, + Path: d.Path, // Use the datapoint path, (leaf) + Timestamp: val.Timestamp, + RecvTimestamp: val.RecvTimestamp, + ComplianceErrors: val.ComplianceErrors, + } + if err := r.reactor(v); err != nil { + w.errCh <- err + return + } + } + } + } } case err := <-errCh: w.errCh <- err @@ -721,12 +768,18 @@ func BatchDelete[T any](sb *SetBatch, q ConfigQuery[T]) { }) } +type reactorPair[T any] struct { + ps PathStruct + reactor func(*Value[T]) error +} + // Batch contains a collection of paths. // Calling State() or Config() on the batch returns a query // that can be used to Lookup, Watch, etc on multiple paths at once. type Batch[T any] struct { - root SingletonQuery[T] - paths []PathStruct + root SingletonQuery[T] + paths []PathStruct + reactors []*reactorPair[T] } // NewBatch creates a batch object. All paths in the batch must be children of the root query. @@ -758,34 +811,56 @@ func (b *Batch[T]) AddPaths(paths ...UntypedQuery) error { return nil } +func (b *Batch[T]) AddReactor(path UntypedQuery, reactor func(*Value[T]) error) error { + root, err := resolvePath(b.root.PathStruct()) + if err != nil { + return err + } + ps := path.PathStruct() + p, err := resolvePath(ps) + if err != nil { + return err + } + if !util.PathMatchesQuery(p, root) { + return fmt.Errorf("root path %v is not a prefix of %v", root, p) + } + b.paths = append(b.paths, ps) + b.reactors = append(b.reactors, &reactorPair[T]{ps: ps, reactor: reactor}) + return nil +} + // Query returns a Query that can be used in gNMI operations. // The returned query is immutable, adding paths does not modify existing queries. func (b *Batch[T]) Query() SingletonQuery[T] { queryPaths := make([]PathStruct, len(b.paths)) copy(queryPaths, b.paths) - return NewSingletonQuery[T]( - b.root.dirName(), - b.root.IsState(), - b.root.isShadowPath(), - b.root.isLeaf(), - b.root.isScalar(), - b.root.isCompressedSchema(), - b.root.isListContainer(), - b.root.PathStruct(), - b.root.extract, - b.root.goStruct, - b.root.schema, - queryPaths, - b.root.compressInfo(), - ) + return &SingletonQueryStruct[T]{ + baseQuery: baseQuery[T]{ + goStructName: b.root.dirName(), + state: b.root.IsState(), + shadowpath: b.root.isShadowPath(), + ps: b.root.PathStruct(), + leaf: b.root.isLeaf(), + scalar: b.root.isScalar(), + compressedSchema: b.root.isCompressedSchema(), + listContainer: b.root.isListContainer(), + extractFn: b.root.extract, + goStructFn: b.root.goStruct, + yschemaFn: b.root.schema, + queryPathStructs: queryPaths, + compInfo: b.root.compressInfo(), + reactors: b.reactors, + }, + } } // WildcardBatch contains a collection of paths. // Calling Query() on the batch returns a query // that can be used in LookupAll, WatchAll, etc on select paths within the root path. type WildcardBatch[T any] struct { - root WildcardQuery[T] - paths []PathStruct + root WildcardQuery[T] + paths []PathStruct + reactors []*reactorPair[T] } // NewWildcardBatch creates a batch object. All paths in the batch must be children of the root query. @@ -817,24 +892,45 @@ func (b *WildcardBatch[T]) AddPaths(paths ...UntypedQuery) error { return nil } +func (b *WildcardBatch[T]) AddReactor(path UntypedQuery, reactor func(*Value[T]) error) error { + root, err := resolvePath(b.root.PathStruct()) + if err != nil { + return err + } + ps := path.PathStruct() + p, err := resolvePath(ps) + if err != nil { + return err + } + if !util.PathMatchesQuery(p, root) { + return fmt.Errorf("root path %v is not a prefix of %v", root, p) + } + b.paths = append(b.paths, ps) + b.reactors = append(b.reactors, &reactorPair[T]{ps: ps, reactor: reactor}) + return nil +} + // Query returns a Query that can be used in gNMI operations. // The returned query is immutable, adding paths does not modify existing queries. func (b *WildcardBatch[T]) Query() WildcardQuery[T] { queryPaths := make([]PathStruct, len(b.paths)) copy(queryPaths, b.paths) - return NewWildcardQuery[T]( - b.root.dirName(), - b.root.IsState(), - b.root.isShadowPath(), - b.root.isLeaf(), - b.root.isScalar(), - b.root.isCompressedSchema(), - b.root.isListContainer(), - b.root.PathStruct(), - b.root.extract, - b.root.goStruct, - b.root.schema, - queryPaths, - b.root.compressInfo(), - ) + return &WildcardQueryStruct[T]{ + baseQuery: baseQuery[T]{ + goStructName: b.root.dirName(), + state: b.root.IsState(), + shadowpath: b.root.isShadowPath(), + ps: b.root.PathStruct(), + leaf: b.root.isLeaf(), + scalar: b.root.isScalar(), + compressedSchema: b.root.isCompressedSchema(), + listContainer: b.root.isListContainer(), + extractFn: b.root.extract, + goStructFn: b.root.goStruct, + yschemaFn: b.root.schema, + queryPathStructs: queryPaths, + compInfo: b.root.compressInfo(), + reactors: b.reactors, + }, + } }