feature/traces improve #108

Merged 6 commits on Oct 22, 2024
13 changes: 11 additions & 2 deletions exporter/qrynexporter/README.md
@@ -8,9 +8,18 @@


# Configuration options:
- `dsn` (required): Data Source Name for Clickhouse.
- Example: `tcp://localhost:9000/qryn`

- `clustered_clickhouse` (required):
- Type: boolean
- Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`.

- `client_side_trace_processing` (required):
- Type: boolean
- Default: `true`
- Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. See the example configuration after this list.

- `dsn` (required): Clickhouse's dsn.
- `clustered_clickhouse` (required): true if clickhouse cluster is used
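A minimal sketch of a collector configuration using these options (illustrative values; assumes the exporter is registered under the `qryn` key):

```yaml
exporters:
  qryn:
    dsn: tcp://localhost:9000/qryn
    clustered_clickhouse: true
    client_side_trace_processing: true
```

Note that `traces.go` gates client-side processing on `cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing`, so the flag only takes effect when a cluster is configured.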

# Example:
## Simple Trace Data
3 changes: 3 additions & 0 deletions exporter/qrynexporter/config.go
@@ -30,6 +30,9 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

// ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side.
ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`

// DSN is the ClickHouse server Data Source Name.
2 changes: 1 addition & 1 deletion exporter/qrynexporter/logs.go
@@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
2 changes: 1 addition & 1 deletion exporter/qrynexporter/metrics.go
@@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
64 changes: 61 additions & 3 deletions exporter/qrynexporter/schema.go
@@ -20,7 +20,7 @@ import (
)

var (
tracesInputSQL = func(clustered bool) string {
tracesInputSQL = func(_ bool) string {
return `INSERT INTO traces_input (
trace_id,
span_id,
@@ -57,6 +57,40 @@
}
)

func TracesV2InputSQL(clustered bool) string {
dist := ""
if clustered {
dist = "_dist"
}
return fmt.Sprintf(`INSERT INTO tempo_traces%s (
oid,
trace_id,
span_id,
parent_id,
name,
timestamp_ns,
duration_ns,
service_name,
payload_type,
payload)`, dist)
}

func TracesTagsV2InputSQL(clustered bool) string {
dist := ""
if clustered {
dist = "_dist"
}
return fmt.Sprintf(`INSERT INTO tempo_traces_attrs_gin%s (
oid,
date,
key,
val,
trace_id,
span_id,
timestamp_ns,
duration)`, dist)
}
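The V2 helpers differ from the single-node path only in the table they target; a quick sketch of the statements they produce (comments illustrative, assuming the `_dist` distributed tables exist in the clustered schema):

```go
// Clustered mode only swaps each target table for its "_dist" variant.
TracesV2InputSQL(false)    // INSERT INTO tempo_traces (...)
TracesV2InputSQL(true)     // INSERT INTO tempo_traces_dist (...)
TracesTagsV2InputSQL(true) // INSERT INTO tempo_traces_attrs_gin_dist (...)
```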

// Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js
// We need to align with the schema here.
//
@@ -76,8 +76,8 @@ var (
//
// ) Engine=Null

// Trace represent trace model
type Trace struct {
// TraceInput represents the trace model
type TraceInput struct {
TraceID string `ch:"trace_id"`
SpanID string `ch:"span_id"`
ParentID string `ch:"parent_id"`
@@ -90,6 +90,30 @@
Tags [][]string `ch:"tags"`
}

type TempoTrace struct {
OID string `ch:"oid"`
TraceID []byte `ch:"trace_id"`
SpanID []byte `ch:"span_id"`
ParentID []byte `ch:"parent_id"`
Name string `ch:"name"`
TimestampNs int64 `ch:"timestamp_ns"`
DurationNs int64 `ch:"duration_ns"`
ServiceName string `ch:"service_name"`
PayloadType int8 `ch:"payload_type"`
Payload string `ch:"payload"`
}

type TempoTraceTag struct {
OID string `ch:"oid"`
Date time.Time `ch:"date"`
Key string `ch:"key"`
Val string `ch:"val"`
TraceID []byte `ch:"trace_id"`
SpanID []byte `ch:"span_id"`
TimestampNs int64 `ch:"timestamp_ns"`
DurationNs int64 `ch:"duration"`
}

// Sample represents sample data
// `CREATE TABLE IF NOT EXISTS samples_v3
// (
103 changes: 103 additions & 0 deletions exporter/qrynexporter/trace_batch_processor.go
@@ -0,0 +1,103 @@
package qrynexporter

import (
"encoding/hex"
"fmt"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

type traceWithTagsBatch struct {
driver.Batch
tagsBatch driver.Batch
}

func (b *traceWithTagsBatch) AppendStruct(v any) error {
ti, ok := v.(*TraceInput)
if !ok {
return fmt.Errorf("invalid data type, expected *TraceInput, got %T", v)
}
binTraceId, err := unhexAndPad(ti.TraceID, 16)
if err != nil {
return err
}
binParentID, err := unhexAndPad(ti.ParentID, 8)
if err != nil {
return err
}
binSpanID, err := unhexAndPad(ti.SpanID, 8)
if err != nil {
return err
}
trace := &TempoTrace{
OID: "0",
TraceID: binTraceId,
SpanID: binSpanID,
ParentID: binParentID,
Name: ti.Name,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
ServiceName: ti.ServiceName,
PayloadType: ti.PayloadType,
Payload: ti.Payload,
}
err = b.Batch.AppendStruct(trace)
if err != nil {
return err
}
for _, tag := range ti.Tags {
attr := &TempoTraceTag{
OID: "0",
Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24),
Key: tag[0],
Val: tag[1],
TraceID: binTraceId,
SpanID: binSpanID,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
}
err = b.tagsBatch.AppendStruct(attr)
if err != nil {
return err
}
}
return nil
}

func (b *traceWithTagsBatch) Abort() error {
var errs [2]error
errs[0] = b.Batch.Abort()
errs[1] = b.tagsBatch.Abort()
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}

func (b *traceWithTagsBatch) Send() error {
var errs [2]error
errs[0] = b.Batch.Send()
errs[1] = b.tagsBatch.Send()
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}

// unhexAndPad decodes a hex string and left-pads the result with zeros
// to exactly size bytes; longer values keep only their trailing size bytes.
func unhexAndPad(s string, size int) ([]byte, error) {
bStr, err := hex.DecodeString(s)
if err != nil {
return nil, err
}
if len(bStr) < size {
res := make([]byte, size)
copy(res[size-len(bStr):], bStr)
return res, nil
}
return bStr[len(bStr)-size:], nil
}
52 changes: 40 additions & 12 deletions exporter/qrynexporter/traces.go
@@ -34,8 +34,11 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

type contextKey string

const (
spanLinkDataFormat = "%s|%s|%s|%s|%d"
spanLinkDataFormat = "%s|%s|%s|%s|%d"
clusterKey contextKey = "cluster"
)
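Side note on the `clusterKey` change: an unexported typed key cannot collide with context values that other packages store under the plain string `"cluster"`, which is the usual reason `go vet` warns against built-in types as context keys. A minimal sketch of the pattern (assumed usage):

```go
ctx := context.WithValue(context.Background(), clusterKey, true)
if isCluster, ok := ctx.Value(clusterKey).(bool); ok && isCluster {
	// clustered path; the comma-ok form avoids a panic if the key is absent
}
```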

var delegate = &protojson.MarshalOptions{
Expand All @@ -48,8 +51,9 @@ type tracesExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn
cluster bool
db clickhouse.Conn
cluster bool
clientSide bool
}

// newTracesExporter returns a SpanWriter for the database
@@ -63,10 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings)
return nil, err
}
exp := &tracesExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
clientSide: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing,
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
@@ -163,8 +168,15 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) {
}

func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error {
isCluster := ctx.Value("cluster").(bool)
batch, err := e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
isCluster := ctx.Value(clusterKey).(bool)
var batch driver.Batch
var err error
if e.clientSide {
batch, err = e.prepareBatchClientSide(ctx)
} else {
batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
}

if err != nil {
return err
}
@@ -187,9 +199,25 @@
return nil
}

func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) {
batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster))
if err != nil {
return nil, err
}
subBatch, err := e.db.PrepareBatch(ctx, TracesTagsV2InputSQL(e.cluster))
if err != nil {
batch.Abort()
return nil, err
}
return &traceWithTagsBatch{
Batch: batch,
tagsBatch: subBatch,
}, nil
}

// traceDataPusher implements OTEL exporterhelper.traceDataPusher
func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
_ctx := context.WithValue(ctx, "cluster", e.cluster)
_ctx := context.WithValue(ctx, clusterKey, e.cluster)
start := time.Now()
if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces)))
@@ -362,7 +390,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte,
return delegate.Marshal(otlpSpan)
}

func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) {
func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) {
durationNano := uint64(span.EndTimestamp() - span.StartTimestamp())
tags = aggregateSpanTags(span, tags)
tags["service.name"] = serviceName
Expand All @@ -379,7 +407,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName
return nil, fmt.Errorf("failed to marshal span: %w", err)
}

trace := &Trace{
trace := &TraceInput{
TraceID: span.TraceID().String(),
SpanID: span.SpanID().String(),
ParentID: span.ParentSpanID().String(),