Skip to content

Commit

Permalink
events: Add filter watch function (#25147)
Browse files Browse the repository at this point in the history
This allows other parts of the code to be able to watch for changes
to the event filter map, i.e., so that they can know when the
global or local filters are changed.

This is going to be used in an upcoming enterprise PR for event
replication to performance secondaries.
  • Loading branch information
Christopher Swenson authored Feb 6, 2024
1 parent 89c75d3 commit b736b85
Show file tree
Hide file tree
Showing 4 changed files with 827 additions and 57 deletions.
101 changes: 90 additions & 11 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.Even
return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
}

func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) {
func NewEventBus(localClusterID string, logger hclog.Logger) (*EventBus, error) {
broker, err := eventlogger.NewBroker()
if err != nil {
return nil, err
Expand All @@ -180,7 +180,7 @@ func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) {
logger = hclog.Default().Named("events")
}

sourceUrl, err := url.Parse("vault://" + localNodeID)
sourceUrl, err := url.Parse("vault://" + localClusterID)
if err != nil {
return nil, err
}
Expand All @@ -198,7 +198,7 @@ func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) {
formatterNodeID: formatterNodeID,
timeout: defaultTimeout,
cloudEventsFormatterFilter: cloudEventsFormatterFilter,
filters: NewFilters(localNodeID),
filters: NewFilters(localClusterID),
}, nil
}

Expand All @@ -211,6 +211,13 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
// SubscribeMultipleNamespaces subscribes to events in the given namespace matching the event type
// pattern and after applying the optional go-bexpr filter.
func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.subscribeInternal(ctx, namespacePathPatterns, pattern, bexprFilter, nil)
}

// subscribeInternal creates the pipeline and connects it to the event bus to receive events.
// if the cluster is specified, then the namespacePathPatterns, pattern, and bexprFilter are ignored, and instead this
// subscription will be tied to the given cluster's filter.
func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string, cluster *string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
Expand All @@ -227,9 +234,15 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
return nil, nil, err
}

filterNode, err := newFilterNode(namespacePathPatterns, pattern, bexprFilter)
if err != nil {
return nil, nil, err
var filterNode *eventlogger.Filter
if cluster != nil {
filterNode, err = newClusterFilterNode(bus.filters, clusterID(*cluster))
} else {
filterNode, err = newFilterNode(namespacePathPatterns, pattern, bexprFilter)
if err != nil {
return nil, nil, err
}
bus.filters.addPattern(bus.filters.self, namespacePathPatterns, pattern)
}
err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode)
if err != nil {
Expand All @@ -242,11 +255,10 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
}

ctx, cancel := context.WithCancel(ctx)

bus.filters.addPattern(bus.filters.self, namespacePathPatterns, pattern)

asyncNode := newAsyncNode(ctx, bus.logger, bus.broker, func() {
bus.filters.removePattern(bus.filters.self, namespacePathPatterns, pattern)
if cluster == nil {
bus.filters.removePattern(bus.filters.self, namespacePathPatterns, pattern)
}
})
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
if err != nil {
Expand Down Expand Up @@ -281,6 +293,73 @@ func (bus *EventBus) SetSendTimeout(timeout time.Duration) {
bus.timeout = timeout
}

// GlobalMatch returns true if the given namespace and event type match the current global filter.
func (bus *EventBus) GlobalMatch(ns *namespace.Namespace, eventType logical.EventType) bool {
return bus.filters.globalMatch(ns, eventType)
}

// ApplyClusterFilterChanges applies the given filter changes to the cluster's filters.
func (bus *EventBus) ApplyClusterFilterChanges(c string, changes []FilterChange) {
bus.filters.applyChanges(clusterID(c), changes)
}

// ApplyGlobalFilterChanges applies the given filter changes to the global filters.
func (bus *EventBus) ApplyGlobalFilterChanges(changes []FilterChange) {
bus.filters.applyChanges(globalCluster, changes)
}

// ClearGlobalFilter removes all entries from the current global filter.
func (bus *EventBus) ClearGlobalFilter() {
bus.filters.clearGlobalPatterns()
}

// ClearClusterFilter removes all entries from the given cluster's filter.
func (bus *EventBus) ClearClusterFilter(id string) {
bus.filters.clearClusterPatterns(clusterID(id))
}

// NotifyOnGlobalFilterChanges returns a channel that receives changes to the global filter.
func (bus *EventBus) NotifyOnGlobalFilterChanges(ctx context.Context) (<-chan []FilterChange, context.CancelFunc, error) {
return bus.filters.watch(ctx, globalCluster)
}

// NotifyOnLocalFilterChanges returns a channel that receives changes to the filter for the current cluster.
func (bus *EventBus) NotifyOnLocalFilterChanges(ctx context.Context) (<-chan []FilterChange, context.CancelFunc, error) {
return bus.NotifyOnClusterFilterChanges(ctx, string(bus.filters.self))
}

// NotifyOnClusterFilterChanges returns a channel that receives changes to the filter for the given cluster.
func (bus *EventBus) NotifyOnClusterFilterChanges(ctx context.Context, cluster string) (<-chan []FilterChange, context.CancelFunc, error) {
return bus.filters.watch(ctx, clusterID(cluster))
}

// NewGlobalSubscription creates a new subscription to all events that match the global filter.
func (bus *EventBus) NewGlobalSubscription(ctx context.Context) (<-chan *eventlogger.Event, context.CancelFunc, error) {
g := globalCluster
return bus.subscribeInternal(ctx, nil, "", "", &g)
}

// NewClusterSubscription creates a new subscription to all events that match the given cluster's filter.
func (bus *EventBus) NewClusterSubscription(ctx context.Context, cluster string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.subscribeInternal(ctx, nil, "", "", &cluster)
}

// creates a new filter node that is tied to the filter for a given cluster
func newClusterFilterNode(filters *Filters, c clusterID) (*eventlogger.Filter, error) {
return &eventlogger.Filter{
Predicate: func(e *eventlogger.Event) (bool, error) {
eventRecv := e.Payload.(*logical.EventReceived)
eventNs := strings.Trim(eventRecv.Namespace, "/")
if filters.clusterMatch(c, &namespace.Namespace{
Path: eventNs,
}, logical.EventType(eventRecv.EventType)) {
return true, nil
}
return false, nil
},
}, nil
}

func newFilterNode(namespacePatterns []string, pattern string, bexprFilter string) (*eventlogger.Filter, error) {
var evaluator *bexpr.Evaluator
if bexprFilter != "" {
Expand Down Expand Up @@ -308,7 +387,7 @@ func newFilterNode(namespacePatterns []string, pattern string, bexprFilter strin
}
}

// NodeFilter for correct event type, including wildcards.
// ClusterFilter for correct event type, including wildcards.
if !glob.Glob(pattern, eventRecv.EventType) {
return false, nil
}
Expand Down
Loading

0 comments on commit b736b85

Please sign in to comment.