Skip to content

Commit

Permalink
stream refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
huseyinbabal committed Feb 23, 2023
1 parent 986b781 commit 8ac2cd9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 51 deletions.
12 changes: 6 additions & 6 deletions internal/source/kubernetes/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ type registration struct {
mappedEvent config.EventType
}

func (r registration) handleEvent(ctx context.Context, resource string, eventType config.EventType, routes []route, fn eventHandler) {
func (r registration) handleEvent(s Source, resource string, eventType config.EventType, routes []route, fn eventHandler) {
handleFunc := func(oldObj, newObj interface{}) {
logger := r.log.WithFields(logrus.Fields{
"eventHandler": eventType,
"resource": resource,
"object": newObj,
})

event, err := r.eventForObj(ctx, newObj, eventType, resource)
event, err := r.eventForObj(s.ctx, newObj, eventType, resource)
if err != nil {
logger.Errorf("while creating new event: %s", err.Error())
return
Expand All @@ -52,7 +52,7 @@ func (r registration) handleEvent(ctx context.Context, resource string, eventTyp
if !ok {
return
}
fn(ctx, event, diffs)
fn(s, event, diffs)
}

var resourceEventHandlerFuncs cache.ResourceEventHandlerFuncs
Expand All @@ -68,7 +68,7 @@ func (r registration) handleEvent(ctx context.Context, resource string, eventTyp
r.informer.AddEventHandler(resourceEventHandlerFuncs)
}

func (r registration) handleMapped(ctx context.Context, eventType config.EventType, routeTable map[string][]entry, fn eventHandler) {
func (r registration) handleMapped(s Source, eventType config.EventType, routeTable map[string][]entry, fn eventHandler) {
r.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
var eventObj coreV1.Event
Expand Down Expand Up @@ -99,7 +99,7 @@ func (r registration) handleMapped(ctx context.Context, eventType config.EventTy
return
}

event, err := r.eventForObj(ctx, obj, eventType, gvrString)
event, err := r.eventForObj(s.ctx, obj, eventType, gvrString)
if err != nil {
r.log.Errorf("while creating new event: %s", err.Error())
return
Expand All @@ -114,7 +114,7 @@ func (r registration) handleMapped(ctx context.Context, eventType config.EventTy
if !ok {
return
}
fn(ctx, event, nil)
fn(s, event, nil)
},
})
}
Expand Down
12 changes: 5 additions & 7 deletions internal/source/kubernetes/router.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package kubernetes

import (
"context"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
Expand All @@ -17,7 +15,7 @@ const eventsResource = "v1/events"

type mergedEvents map[string]map[config.EventType]struct{}
type registrationHandler func(resource string) (cache.SharedIndexInformer, error)
type eventHandler func(ctx context.Context, event event.Event, updateDiffs []string)
type eventHandler func(source Source, event event.Event, updateDiffs []string)

type route struct {
resourceName config.RegexConstraints
Expand Down Expand Up @@ -123,21 +121,21 @@ func (r *Router) MapWithEventsInformer(srcEvent config.EventType, dstEvent confi

// RegisterEventHandler allows router clients to create handlers that are
// triggered for a target event.
func (r *Router) RegisterEventHandler(ctx context.Context, eventType config.EventType, handlerFn eventHandler) {
func (r *Router) RegisterEventHandler(s Source, eventType config.EventType, handlerFn func(s Source, e event.Event, updateDiffs []string)) {
for resource, reg := range r.registrations {
if !reg.canHandleEvent(eventType.String()) {
continue
}
sourceRoutes := r.getSourceRoutes(resource, eventType)
reg.handleEvent(ctx, resource, eventType, sourceRoutes, handlerFn)
reg.handleEvent(s, resource, eventType, sourceRoutes, handlerFn)
}
}

// HandleMappedEvent allows router clients to create handlers that are
// triggered for a target mapped event.
func (r *Router) HandleMappedEvent(ctx context.Context, targetEvent config.EventType, handlerFn eventHandler) {
func (r *Router) HandleMappedEvent(s Source, targetEvent config.EventType, handlerFn eventHandler) {
if informer, ok := r.mappedInformer(targetEvent); ok {
informer.handleMapped(ctx, targetEvent, r.table, handlerFn)
informer.handleMapped(s, targetEvent, r.table, handlerFn)
}
}

Expand Down
79 changes: 41 additions & 38 deletions internal/source/kubernetes/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type RecommendationFactory interface {

// Source Kubernetes source plugin data structure
type Source struct {
ctx context.Context
pluginVersion string
config config2.Config
logger logrus.FieldLogger
Expand All @@ -69,7 +70,8 @@ func NewSource(version string) *Source {
}

// Stream streams Kubernetes events
func (s *Source) Stream(ctx context.Context, input source.StreamInput) (source.StreamOutput, error) {
func (*Source) Stream(ctx context.Context, input source.StreamInput) (source.StreamOutput, error) {
s := Source{}
s.messageCh = make(chan source.Message)
out := source.StreamOutput{Message: s.messageCh}
cfg, err := config2.MergeConfigs(input.Configs)
Expand All @@ -80,7 +82,8 @@ func (s *Source) Stream(ctx context.Context, input source.StreamInput) (source.S
s.logger = loggerx.New(loggerx.Config{
Level: cfg.Log.Level,
})
go s.consumeEvents(ctx)
s.ctx = ctx
go consumeEvents(s)
return out, nil
}

Expand All @@ -93,7 +96,7 @@ func (s *Source) Metadata(_ context.Context) (api.MetadataOutput, error) {
}, nil
}

func (s *Source) consumeEvents(ctx context.Context) {
func consumeEvents(s Source) {
client, err := NewClient(s.config.KubeConfig)
exitOnError(err, s.logger)

Expand All @@ -110,7 +113,7 @@ func (s *Source) consumeEvents(ctx context.Context) {
config.UpdateEvent,
config.DeleteEvent,
}, func(resource string) (cache.SharedIndexInformer, error) {
gvr, err := s.parseResourceArg(resource, client.mapper)
gvr, err := parseResourceArg(resource, client.mapper)
if err != nil {
s.logger.Infof("Unable to parse resource: %s to register with informer\n", resource)
return nil, err
Expand All @@ -132,7 +135,7 @@ func (s *Source) consumeEvents(ctx context.Context) {
config.ErrorEvent,
config.WarningEvent,
func(resource string) (cache.SharedIndexInformer, error) {
gvr, err := s.parseResourceArg(resource, client.mapper)
gvr, err := parseResourceArg(resource, client.mapper)
if err != nil {
s.logger.Infof("Unable to parse resource: %s to register with informer\n", resource)
return nil, err
Expand All @@ -154,25 +157,25 @@ func (s *Source) consumeEvents(ctx context.Context) {
}
for _, eventType := range eventTypes {
router.RegisterEventHandler(
ctx,
s,
eventType,
s.handleEvent,
handleEvent,
)
}

router.HandleMappedEvent(
ctx,
s,
config.ErrorEvent,
s.handleEvent,
handleEvent,
)

stopCh := ctx.Done()
stopCh := s.ctx.Done()
dynamicKubeInformerFactory.Start(stopCh)
}

func (s *Source) handleEvent(ctx context.Context, e event.Event, updateDiffs []string) {
func handleEvent(s Source, e event.Event, updateDiffs []string) {
s.logger.Debugf("Processing %s to %s/%v in %s namespace", e.Type, e.Resource, e.Name, e.Namespace)
s.enrichEventWithAdditionalMetadata(&e)
enrichEventWithAdditionalMetadata(s, &e)

// Skip older events
if !e.TimeStamp.IsZero() && e.TimeStamp.Before(s.startTime) {
Expand All @@ -195,7 +198,7 @@ func (s *Source) handleEvent(ctx context.Context, e event.Event, updateDiffs []s
}

// Filter events
e = s.filterEngine.Run(ctx, e)
e = s.filterEngine.Run(s.ctx, e)
if e.Skip {
s.logger.Debugf("Skipping e: %#v", e)
return
Expand All @@ -207,7 +210,7 @@ func (s *Source) handleEvent(ctx context.Context, e event.Event, updateDiffs []s
}

recRunner, recCfg := s.recommFactory.New(s.config)
err := recRunner.Do(ctx, &e)
err := recRunner.Do(s.ctx, &e)
if err != nil {
s.logger.Errorf("while running recommendations: %w", err)
}
Expand All @@ -218,19 +221,19 @@ func (s *Source) handleEvent(ctx context.Context, e event.Event, updateDiffs []s
}

message := source.Message{
Data: s.messageFrom(e),
Data: messageFrom(s, e),
Metadata: e,
Telemetry: event.AnonymizedEventDetailsFrom(e),
}
s.messageCh <- message
}

func (s *Source) enrichEventWithAdditionalMetadata(event *event.Event) {
func enrichEventWithAdditionalMetadata(s Source, event *event.Event) {
event.Cluster = s.config.ClusterName
}

func (s *Source) parseResourceArg(arg string, mapper meta.RESTMapper) (schema.GroupVersionResource, error) {
gvr, err := s.strToGVR(arg)
func parseResourceArg(arg string, mapper meta.RESTMapper) (schema.GroupVersionResource, error) {
gvr, err := strToGVR(arg)
if err != nil {
return schema.GroupVersionResource{}, fmt.Errorf("while converting string to GroupVersionReference: %w", err)
}
Expand All @@ -242,7 +245,7 @@ func (s *Source) parseResourceArg(arg string, mapper meta.RESTMapper) (schema.Gr
return gvr, nil
}

func (s *Source) strToGVR(arg string) (schema.GroupVersionResource, error) {
func strToGVR(arg string) (schema.GroupVersionResource, error) {
const separator = "/"
gvrStrParts := strings.Split(arg, separator)
switch len(gvrStrParts) {
Expand All @@ -255,27 +258,27 @@ func (s *Source) strToGVR(arg string) (schema.GroupVersionResource, error) {
}
}

func (s *Source) messageFrom(event event.Event, additionalSections ...api.Section) api.Message {
func messageFrom(s Source, event event.Event, additionalSections ...api.Section) api.Message {
var sections []api.Section
section := s.baseNotificationSection(event)
section := baseNotificationSection(event)
section.TextFields = api.TextFields{
{Text: fmt.Sprintf("*Kind:* %s", event.Kind)},
{Text: fmt.Sprintf("*Name:* %s", event.Name)},
}
section.TextFields = s.appendTextFieldIfNotEmpty(section.TextFields, "Namespace", event.Namespace)
section.TextFields = s.appendTextFieldIfNotEmpty(section.TextFields, "Reason", event.Reason)
section.TextFields = s.appendTextFieldIfNotEmpty(section.TextFields, "Action", event.Action)
section.TextFields = s.appendTextFieldIfNotEmpty(section.TextFields, "Cluster", event.Cluster)
section.TextFields = appendTextFieldIfNotEmpty(section.TextFields, "Namespace", event.Namespace)
section.TextFields = appendTextFieldIfNotEmpty(section.TextFields, "Reason", event.Reason)
section.TextFields = appendTextFieldIfNotEmpty(section.TextFields, "Action", event.Action)
section.TextFields = appendTextFieldIfNotEmpty(section.TextFields, "Cluster", event.Cluster)

// Messages, Recommendations and Warnings formatted as bullet point lists.
section.Body.Plaintext = s.bulletPointEventAttachments(event)
section.Body.Plaintext = bulletPointEventAttachments(event)

sections = append(sections, section)
if len(additionalSections) > 0 {
sections = append(sections, additionalSections...)
}

cmdSection := s.getInteractiveEventSectionIfShould(event)
cmdSection := getInteractiveEventSectionIfShould(s, event)

if cmdSection != nil {
sections = append(sections, *cmdSection)
Expand All @@ -285,7 +288,7 @@ func (s *Source) messageFrom(event event.Event, additionalSections ...api.Sectio
}
}

func (s *Source) getInteractiveEventSectionIfShould(event event.Event) *api.Section {
func getInteractiveEventSectionIfShould(s Source, event event.Event) *api.Section {
commands, err := s.commander.GetCommandsForEvent(event)
if err != nil {
s.logger.Errorf("while getting commands for event: %w", err)
Expand All @@ -308,7 +311,7 @@ func (s *Source) getInteractiveEventSectionIfShould(event event.Event) *api.Sect
return &section
}

func (s *Source) baseNotificationSection(event event.Event) api.Section {
func baseNotificationSection(event event.Event) api.Section {
emoji := emojiForLevel[event.Level]
section := api.Section{
Base: api.Base{
Expand All @@ -327,7 +330,7 @@ func (s *Source) baseNotificationSection(event event.Event) api.Section {
return section
}

func (s *Source) appendTextFieldIfNotEmpty(fields []api.TextField, title, in string) []api.TextField {
func appendTextFieldIfNotEmpty(fields []api.TextField, title, in string) []api.TextField {
if in == "" {
return fields
}
Expand All @@ -336,15 +339,15 @@ func (s *Source) appendTextFieldIfNotEmpty(fields []api.TextField, title, in str
})
}

func (s *Source) bulletPointEventAttachments(event event.Event) string {
func bulletPointEventAttachments(event event.Event) string {
strBuilder := strings.Builder{}
s.writeStringIfNotEmpty(&strBuilder, "Messages", s.bulletPointListFromMessages(event.Messages))
s.writeStringIfNotEmpty(&strBuilder, "Recommendations", s.bulletPointListFromMessages(event.Recommendations))
s.writeStringIfNotEmpty(&strBuilder, "Warnings", s.bulletPointListFromMessages(event.Warnings))
writeStringIfNotEmpty(&strBuilder, "Messages", bulletPointListFromMessages(event.Messages))
writeStringIfNotEmpty(&strBuilder, "Recommendations", bulletPointListFromMessages(event.Recommendations))
writeStringIfNotEmpty(&strBuilder, "Warnings", bulletPointListFromMessages(event.Warnings))
return strBuilder.String()
}

func (s *Source) writeStringIfNotEmpty(strBuilder *strings.Builder, title, in string) {
func writeStringIfNotEmpty(strBuilder *strings.Builder, title, in string) {
if in == "" {
return
}
Expand All @@ -354,11 +357,11 @@ func (s *Source) writeStringIfNotEmpty(strBuilder *strings.Builder, title, in st

// bulletPointListFromMessages creates a Markdown bullet-point list from messages.
// See https://api.slack.com/reference/surfaces/formatting#block-formatting
func (s *Source) bulletPointListFromMessages(msgs []string) string {
return s.joinMessages(msgs, "• ")
func bulletPointListFromMessages(msgs []string) string {
return joinMessages(msgs, "• ")
}

func (s *Source) joinMessages(msgs []string, msgPrefix string) string {
func joinMessages(msgs []string, msgPrefix string) string {
if len(msgs) == 0 {
return ""
}
Expand Down

0 comments on commit 8ac2cd9

Please sign in to comment.