Skip to content

Commit

Permalink
Merge pull request #166 from rockwotj/fatal
Browse files Browse the repository at this point in the history
pure: add crash processor
  • Loading branch information
rockwotj authored Jan 21, 2025
2 parents e649c40 + 2446c20 commit fb074b5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.

## 4.44.0 - TBD

### Added

- A `crash` processor for FATAL logging. (@rockwotj)

## 4.43.0 - 2025-01-13

### Added
Expand Down
51 changes: 51 additions & 0 deletions internal/impl/pure/processor_crash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2025 Redpanda Data, Inc.

package pure

import (
"context"

"github.com/redpanda-data/benthos/v4/internal/component/interop"
"github.com/redpanda-data/benthos/v4/internal/log"
"github.com/redpanda-data/benthos/v4/public/service"
)

func init() {
spec := service.NewConfigSpec().
Categories("Utility").
Beta().
Summary(`Crashes the process using a fatal log message. The log message can be set using function interpolations described in xref:configuration:interpolation.adoc#bloblang-queries[Bloblang queries] which allows you to log the contents and metadata of messages.`).
Field(service.NewInterpolatedStringField(""))
err := service.RegisterProcessor(
"crash", spec,
func(conf *service.ParsedConfig, res *service.Resources) (service.Processor, error) {
messageStr, err := conf.FieldInterpolatedString("")
if err != nil {
return nil, err
}
mgr := interop.UnwrapManagement(res)
return &crashProcessor{mgr.Logger(), messageStr}, nil
})
if err != nil {
panic(err)
}
}

type crashProcessor struct {
logger log.Modular
message *service.InterpolatedString
}

func (l *crashProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
m, err := l.message.TryString(msg)
if err != nil {
l.logger.Fatal("failed to interpolate crash message: %v", err)
} else {
l.logger.Fatal("%s", m)
}
return nil, nil
}

func (l *crashProcessor) Close(ctx context.Context) error {
return nil
}

0 comments on commit fb074b5

Please sign in to comment.