Skip to content

Commit

Permalink
draft up a way to translate otel loki receiver to flow
Browse files Browse the repository at this point in the history
Signed-off-by: erikbaranowski <[email protected]>
  • Loading branch information
erikbaranowski committed Mar 27, 2024
1 parent 5e037eb commit 0701953
Show file tree
Hide file tree
Showing 24 changed files with 201 additions and 3 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ require (
github.com/natefinch/atomic v1.0.1
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.96.0
go.opentelemetry.io/collector/config/configretry v0.96.0
Expand Down Expand Up @@ -644,6 +645,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/shield v1.24.0 // indirect
github.com/aws/aws-sdk-go-v2/service/storagegateway v1.26.0 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20240124082744-24bca3a5b39b // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/channelmeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
github.com/containerd/log v0.1.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/boynux/squid-exporter v1.10.5-0.20230618153315-c1fae094e18e h1:C1vYe728vM2FpXaICJuDRt5zgGyRdMmUGYnVfM7WcLY=
github.com/boynux/squid-exporter v1.10.5-0.20230618153315-c1fae094e18e/go.mod h1:8NpZERGK+R9DGuZqqsKfnf2qI/rh7yBT8End29IvgNA=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/burningalchemist/sql_exporter v0.0.0-20240103092044-466b38b6abc4 h1:dgjwrjeVe90AeMhrx04TmDKjZe7xqKKEUxT3QKNx9RU=
github.com/burningalchemist/sql_exporter v0.0.0-20240103092044-466b38b6abc4/go.mod h1:aRr7CZ/KleZpcDkQVsNeXE1BFT3xRG8baUHJ7J+j8NI=
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
Expand Down Expand Up @@ -1806,6 +1808,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceive
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.96.0/go.mod h1:yk9+s0wSHn8WKzvBSa63puaPhCrjr+rmkfJ4/4NVyeQ=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.96.0 h1:V3DvS2g8qPp2Pr0i39iS37iByUlk7JvE6iEA6Ia1F58=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.96.0/go.mod h1:SpDMTfNxJhLoh90tzVbFVR6jBznomtSSfv1+mKR1s9I=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver v0.96.0 h1:wbZBeLXUSDldDQb7AEYjOw0sLd+pB78P0ene1bGwQs0=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver v0.96.0/go.mod h1:+ufo/ZG65nRRmmJpeKqVlD3r26JUtwItlCmd39fhr6A=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.96.0 h1:gK3nBuj0qhtt8HT4MuiW60KfNcnAA1hjdqnwdIbxHaU=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.96.0/go.mod h1:xc2JC4VmYfGsjaH834h0O+nCTHcddAGZkt5fJxQF7LE=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.96.0 h1:SK1GpgAte9WhTSeY6NiO6vHB+BhFF7akPlK7fyMO+ps=
Expand Down
6 changes: 6 additions & 0 deletions internal/converter/internal/common/river_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,15 @@ func getValueOverrideHook() builder.ValueOverrideHook {
return ConvertTargets{
Targets: value,
}
case flow_relabel.Rules:
if len(value) == 0 {
return CustomTokenizer{Expr: "null"}
}
default:
return val
}

return val
}
}

Expand Down
143 changes: 143 additions & 0 deletions internal/converter/internal/otelcolconvert/converter_lokireceiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/agent/internal/component/common/loki"
"github.com/grafana/agent/internal/component/loki/source/api"
"github.com/grafana/agent/internal/component/otelcol"
otel_loki "github.com/grafana/agent/internal/component/otelcol/receiver/loki"
"github.com/grafana/agent/internal/converter/diag"
"github.com/grafana/agent/internal/converter/internal/common"
"github.com/grafana/agent/internal/converter/internal/promtailconvert/build"
"github.com/grafana/dskit/server"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/lokireceiver"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, lokiReceiverConverter{})
}

type lokiReceiverConverter struct{}

func (lokiReceiverConverter) Factory() component.Factory { return lokireceiver.NewFactory() }

func (lokiReceiverConverter) InputComponentName() string { return "" }

func (lokiReceiverConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()

otelArgs := toOtelcolReceiverLoki(state, id)
otelBlock := common.NewBlockWithOverride([]string{"otelcol", "receiver", "loki"}, label, otelArgs)

logsReceivers := []loki.LogsReceiver{common.ConvertLogsReceiver{
Expr: StringifyBlock(otelBlock) + ".receiver",
}}
apiArgs := toLokiSourceApi(logsReceivers, cfg.(*lokireceiver.Config))
apiBlock := common.NewBlockWithOverride([]string{"loki", "source", "api"}, label, apiArgs)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(otelBlock)),
)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(apiBlock)),
)

// Do this at the end in reverse order so the pipeline looks a little better
state.Body().AppendBlock(apiBlock)
state.Body().AppendBlock(otelBlock)
return diags
}

func toOtelcolReceiverLoki(state *State, id component.InstanceID) *otel_loki.Arguments {
var (
nextLogs = state.Next(id, component.DataTypeLogs)
)

return &otel_loki.Arguments{
Output: &otelcol.ConsumerArguments{
Logs: ToTokenizedConsumers(nextLogs),
},
}
}

func toLokiSourceApi(logsReceivers []loki.LogsReceiver, cfg *lokireceiver.Config) *api.Arguments {
ptc := &scrapeconfig.PushTargetConfig{
Server: toServer(&cfg.Protocols),
Labels: nil,
KeepTimestamp: cfg.KeepTimestamp,
}

args := build.ToLokiApiArguments(ptc, logsReceivers)
return &args
}

func toServer(cfg *lokireceiver.Protocols) server.Config {
return server.Config{
HTTPListenAddress: "",
HTTPListenPort: 0,
HTTPConnLimit: 0,
HTTPServerReadTimeout: 0,
HTTPServerWriteTimeout: 0,
HTTPServerIdleTimeout: 0,
GRPCListenAddress: "",
GRPCListenPort: 0,
GRPCConnLimit: 0,
GRPCServerMaxConnectionAge: 0,
GRPCServerMaxConnectionAgeGrace: 0,
GRPCServerMaxRecvMsgSize: 0,
GRPCServerMaxSendMsgSize: 0,
GRPCServerMaxConcurrentStreams: uint(cfg.GRPC.MaxConcurrentStreams),
GRPCServerMaxConnectionIdle: 0,
ServerGracefulShutdownTimeout: cfg.GRPC.NetAddr.DialerConfig.Timeout,

// PROMTAIL CONVERTER IGNORES ALL OF THESE
//
// HTTPListenNetwork: "",
// HTTPServerReadHeaderTimeout: 0,
// GRPCListenNetwork: "",
// CipherSuites: "",
// MinVersion: "",
// HTTPTLSConfig: server.TLSConfig{},
// GRPCTLSConfig: server.TLSConfig{},
// RegisterInstrumentation: false,
// ReportGRPCCodesInInstrumentationLabel: false,
// ReportHTTP4XXCodesInInstrumentationLabel: false,
// ExcludeRequestInLog: false,
// DisableRequestSuccessLog: false,
// HTTPLogClosedConnectionsWithoutResponse: false,
// GRPCOptions: []grpc.ServerOption{},
// GRPCMiddleware: []grpc.UnaryServerInterceptor{},
// GRPCStreamMiddleware: []grpc.StreamServerInterceptor{},
// HTTPMiddleware: []middleware.Interface{},
// Router: &mux.Router{},
// DoNotAddDefaultHTTPMiddleware: false,
// RouteHTTPToGRPC: false,
// GRPCServerTime: 0,
// GRPCServerTimeout: cfg.GRPC.NetAddr.DialerConfig.Timeout,
// GRPCServerMinTimeBetweenPings: 0,
// GRPCServerPingWithoutStreamAllowed: false,
// GRPCServerNumWorkers: 0,
// LogFormat: "",
// LogLevel: log.Level{},
// Log: nil,
// LogSourceIPs: false,
// LogSourceIPsHeader: "",
// LogSourceIPsRegex: "",
// LogRequestHeaders: false,
// LogRequestAtInfoLevel: false,
// LogRequestExcludeHeadersList: "",
// SignalHandler: nil,
// Registerer: nil,
// Gatherer: nil,
// PathPrefix: "",
// GrpcMethodLimiter: nil,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
loki.source.api "default" {
http { }

grpc { }
graceful_shutdown_timeout = "10s"
forward_to = [otelcol.receiver.loki.default.receiver]
labels = {}
relabel_rules = null
use_incoming_timestamp = true
}

otelcol.receiver.loki "default" {
output {
logs = [otelcol.exporter.otlp.default.input]
}
}

otelcol.exporter.otlp "default" {
client {
endpoint = "database:4317"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
receivers:
loki:
protocols:
http:
endpoint: 0.0.0.0:3500
grpc:
endpoint: 0.0.0.0:3600
dialer:
timeout: 10s
use_incoming_timestamp: true

exporters:
otlp:
endpoint: database:4317

service:
pipelines:
logs:
receivers: [loki]
processors: []
exporters: [otlp]
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (s *ScrapeConfigBuilder) AppendPushAPI() {
return
}
s.diags.AddAll(common.ValidateWeaveWorksServerCfg(s.cfg.PushConfig.Server))
args := toLokiApiArguments(s.cfg.PushConfig, s.getOrNewProcessStageReceivers())
args := ToLokiApiArguments(s.cfg.PushConfig, s.getOrNewProcessStageReceivers())
override := func(val interface{}) interface{} {
switch val.(type) {
case relabel.Rules:
Expand All @@ -32,7 +32,7 @@ func (s *ScrapeConfigBuilder) AppendPushAPI() {
))
}

func toLokiApiArguments(config *scrapeconfig.PushTargetConfig, forwardTo []loki.LogsReceiver) api.Arguments {
func ToLokiApiArguments(config *scrapeconfig.PushTargetConfig, forwardTo []loki.LogsReceiver) api.Arguments {
return api.Arguments{
ForwardTo: forwardTo,
RelabelRules: make(relabel.Rules, 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/grafana/agent/internal/component/common/loki"
"github.com/grafana/agent/internal/converter/diag"
"github.com/grafana/agent/internal/converter/internal/common"
"github.com/grafana/agent/internal/converter/internal/promtailconvert/internal/build"
"github.com/grafana/agent/internal/converter/internal/promtailconvert/build"
"github.com/grafana/dskit/flagext"
promtailcfg "github.com/grafana/loki/clients/pkg/promtail/config"
"github.com/grafana/loki/clients/pkg/promtail/limit"
Expand Down

0 comments on commit 0701953

Please sign in to comment.