From e32d66a22b4927ad0273087530916f346b25086e Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 21 Oct 2024 16:26:54 +0300 Subject: [PATCH 1/6] traces ingestion V2 --- exporter/qrynexporter/README.md | 1 + exporter/qrynexporter/config.go | 2 + exporter/qrynexporter/schema.go | 58 ++++++++++++++++ exporter/qrynexporter/traces.go | 27 +++++++- exporter/qrynexporter/traces_v2.go | 102 +++++++++++++++++++++++++++++ 5 files changed, 189 insertions(+), 1 deletion(-) create mode 100644 exporter/qrynexporter/traces_v2.go diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 7daa99c..35527da 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -11,6 +11,7 @@ - `dsn` (required): Clickhouse's dsn. - `clustered_clickhouse` (required): true if clickhouse cluster is used +- `traces_distributed_export_v2`: use improved ingestion algorythm for traces. Data ingestion is sess performant but more evenly distributed # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index c71651b..4075e71 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,8 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` + TracesDitstibutedExportV2 bool `mapstructure:"traces_distributed_export_v2"` + ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` // DSN is the ClickHouse server Data Source Name. diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index fed1546..34dce51 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -57,6 +57,40 @@ var ( } ) +func TracesV2InputSQL(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO tempo_traces%s ( + oid, + trace_id, + span_id, + parent_id, + name, + timestamp_ns, + duration_ns, + service_name, + payload_type, + payload)`, dist) +} + +func TracesTagsV2InputSQL(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO tempo_traces_attrs_gin%s ( + oid, + date, + key, + val, + trace_id, + span_id, + timestamp_ns, + duration)`, dist) +} + // Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js // We need to align with the schema here. // @@ -90,6 +124,30 @@ type Trace struct { Tags [][]string `ch:"tags"` } +type TraceV2 struct { + OID string `ch:"oid"` + TraceID []byte `ch:"trace_id"` + SpanID []byte `ch:"span_id"` + ParentID []byte `ch:"parent_id"` + Name string `ch:"name"` + TimestampNs int64 `ch:"timestamp_ns"` + DurationNs int64 `ch:"duration_ns"` + ServiceName string `ch:"service_name"` + PayloadType int8 `ch:"payload_type"` + Payload string `ch:"payload"` +} + +type TraceTagsV2 struct { + OID string `ch:"oid"` + Date time.Time `ch:"date"` + Key string `ch:"key"` + Val string `ch:"val"` + TraceID []byte `ch:"trace_id"` + SpanID []byte `ch:"span_id"` + TimestampNs int64 `ch:"timestamp_ns"` + DurationNs int64 `ch:"duration"` +} + // Sample represent sample data // `CREATE TABLE IF NOT EXISTS samples_v3 // ( diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index 86dec6d..e33979c 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -50,6 +50,7 @@ type tracesExporter struct { db clickhouse.Conn cluster bool + v2 bool } // newTracesExporter returns a SpanWriter for the database @@ -67,6 +68,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, + v2: cfg.ClusteredClickhouse && cfg.TracesDitstibutedExportV2, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -164,7 +166,14 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { isCluster := ctx.Value("cluster").(bool) - batch, err := e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) + var batch driver.Batch + var err error + if e.v2 { + batch, err = e.prepareBatchV2(ctx) + } else { + batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) + } + if err != nil { return err } @@ -187,6 +196,22 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } +func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) { + batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) + if err != nil { + return nil, err + } + subBatch, err := e.db.PrepareBatch(ctx, TracesTagsV2InputSQL(e.cluster)) + if err != nil { + batch.Abort() + return nil, err + } + return &batchV2{ + Batch: batch, + subBatch: subBatch, + }, nil +} + // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { _ctx := context.WithValue(ctx, "cluster", e.cluster) diff --git a/exporter/qrynexporter/traces_v2.go b/exporter/qrynexporter/traces_v2.go new file mode 100644 index 0000000..37da8a9 --- /dev/null +++ b/exporter/qrynexporter/traces_v2.go @@ -0,0 +1,102 @@ +package qrynexporter + +import ( + "encoding/hex" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "time" +) + +type batchV2 struct { + driver.Batch + subBatch driver.Batch +} + +func (b *batchV2) AppendStruct(data any) error { + _data, ok := data.(*Trace) + if !ok { + return fmt.Errorf("invalid data type, expected *Trace, got %T", data) + } + binTraceId, err := unhexAndPad(_data.TraceID, 16) + if err != nil { + return err + } + binParentID, err := unhexAndPad(_data.ParentID, 8) + if err != nil { + return err + } + binSpanID, err := unhexAndPad(_data.SpanID, 8) + if err != nil { + return err + } + trace := &TraceV2{ + OID: "0", + TraceID: binTraceId, + SpanID: binSpanID, + ParentID: binParentID, + Name: _data.Name, + TimestampNs: _data.TimestampNs, + DurationNs: _data.DurationNs, + ServiceName: _data.ServiceName, + PayloadType: _data.PayloadType, + Payload: _data.Payload, + } + err = b.Batch.AppendStruct(trace) + if err != nil { + return err + } + for _, tag := range _data.Tags { + attr := &TraceTagsV2{ + OID: "0", + Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), + Key: tag[0], + Val: tag[1], + TraceID: binTraceId, + SpanID: binSpanID, + TimestampNs: _data.TimestampNs, + DurationNs: _data.DurationNs, + } + err = b.subBatch.AppendStruct(attr) + if err != nil { + return err + } + } + return nil +} + +func (b *batchV2) Abort() error { + var errs [2]error + errs[0] = b.Batch.Abort() + errs[1] = b.subBatch.Abort() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +func (b *batchV2) Send() error { + var errs [2]error + errs[0] = b.Batch.Send() + errs[1] = b.subBatch.Send() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +func unhexAndPad(s string, size int) ([]byte, error) { + bStr, err := hex.DecodeString(s) + if err != nil { + return nil, err + } + if len(bStr) < size { + res := make([]byte, size) + copy(res[size-len(bStr):], bStr) + return res, nil + } + return bStr[size-len(bStr):], nil +} From d6fc0ff4aa240b33006d01193b4518d90ba699f4 Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 21 Oct 2024 17:08:00 +0300 Subject: [PATCH 2/6] config rename --- exporter/qrynexporter/README.md | 3 ++- exporter/qrynexporter/config.go | 2 +- exporter/qrynexporter/traces.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 35527da..b2b0b4f 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -11,7 +11,8 @@ - `dsn` (required): Clickhouse's dsn. - `clustered_clickhouse` (required): true if clickhouse cluster is used -- `traces_distributed_export_v2`: use improved ingestion algorythm for traces. Data ingestion is sess performant but more evenly distributed +- `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. +Data ingestion is sess performant but more evenly distributed # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index 4075e71..b025e7a 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,7 +30,7 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` - TracesDitstibutedExportV2 bool `mapstructure:"traces_distributed_export_v2"` + ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index e33979c..b1e71f2 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -68,7 +68,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, - v2: cfg.ClusteredClickhouse && cfg.TracesDitstibutedExportV2, + v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) From 8ab9005be38df95758b5520a83615b47044c58a1 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:14:43 +0800 Subject: [PATCH 3/6] refactor: improve the code structure and documentation of qrynexporter. Rename the batchV2 struct to traceWithTagsBatch to improve code readability. Update the names of struct fields to make them more descriptive. Rename the traces_v2.go file to trace_batch_processor.go. Use a custom contextKey type in the pushTraceData function to resolve the SA1029 warning. Optimize README.md to provide more detailed configuration instructions. These changes are aimed at improving code quality, maintainability, and documentation clarity. --- exporter/qrynexporter/README.md | 14 +++++ exporter/qrynexporter/config.go | 4 ++ exporter/qrynexporter/logs.go | 2 +- exporter/qrynexporter/metrics.go | 2 +- exporter/qrynexporter/schema.go | 10 ++-- ...{traces_v2.go => trace_batch_processor.go} | 51 ++++++++++--------- exporter/qrynexporter/traces.go | 39 +++++++++----- 7 files changed, 76 insertions(+), 46 deletions(-) rename exporter/qrynexporter/{traces_v2.go => trace_batch_processor.go} (61%) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index b2b0b4f..ce0a325 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -8,11 +8,25 @@ # Configuration options: +- `dsn` (required): Data Source Name for Clickhouse. + - Example: `tcp://localhost:9000/?database=cloki` +- `clustered_clickhouse` (required): + - Type: boolean + - Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`. + +- `client_side_trace_processing` (required): + - Type: boolean + - Default: `true` + - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. + +<<<<<<< HEAD - `dsn` (required): Clickhouse's dsn. - `clustered_clickhouse` (required): true if clickhouse cluster is used - `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. Data ingestion is sess performant but more evenly distributed +======= +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index b025e7a..176c28c 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,10 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` +<<<<<<< HEAD +======= + // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index cc8eafc..7791965 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index c2f0fd4..d1c4618 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index 34dce51..c84c745 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -20,7 +20,7 @@ import ( ) var ( - tracesInputSQL = func(clustered bool) string { + tracesInputSQL = func(_ bool) string { return `INSERT INTO traces_input ( trace_id, span_id, @@ -110,8 +110,8 @@ func TracesTagsV2InputSQL(clustered bool) string { // // ) Engine=Null -// Trace represent trace model -type Trace struct { +// TraceInput represent trace model +type TraceInput struct { TraceID string `ch:"trace_id"` SpanID string `ch:"span_id"` ParentID string `ch:"parent_id"` @@ -124,7 +124,7 @@ type Trace struct { Tags [][]string `ch:"tags"` } -type TraceV2 struct { +type TempoTrace struct { OID string `ch:"oid"` TraceID []byte `ch:"trace_id"` SpanID []byte `ch:"span_id"` @@ -137,7 +137,7 @@ type TraceV2 struct { Payload string `ch:"payload"` } -type TraceTagsV2 struct { +type TempoTraceTag struct { OID string `ch:"oid"` Date time.Time `ch:"date"` Key string `ch:"key"` diff --git a/exporter/qrynexporter/traces_v2.go b/exporter/qrynexporter/trace_batch_processor.go similarity index 61% rename from exporter/qrynexporter/traces_v2.go rename to exporter/qrynexporter/trace_batch_processor.go index 37da8a9..6fcde08 100644 --- a/exporter/qrynexporter/traces_v2.go +++ b/exporter/qrynexporter/trace_batch_processor.go @@ -3,60 +3,61 @@ package qrynexporter import ( "encoding/hex" "fmt" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "time" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) -type batchV2 struct { +type traceWithTagsBatch struct { driver.Batch - subBatch driver.Batch + tagsBatch driver.Batch } -func (b *batchV2) AppendStruct(data any) error { - _data, ok := data.(*Trace) +func (b *traceWithTagsBatch) AppendStruct(v any) error { + ti, ok := v.(*TraceInput) if !ok { - return fmt.Errorf("invalid data type, expected *Trace, got %T", data) + return fmt.Errorf("invalid data type, expected *Trace, got %T", v) } - binTraceId, err := unhexAndPad(_data.TraceID, 16) + binTraceId, err := unhexAndPad(ti.TraceID, 16) if err != nil { return err } - binParentID, err := unhexAndPad(_data.ParentID, 8) + binParentID, err := unhexAndPad(ti.ParentID, 8) if err != nil { return err } - binSpanID, err := unhexAndPad(_data.SpanID, 8) + binSpanID, err := unhexAndPad(ti.SpanID, 8) if err != nil { return err } - trace := &TraceV2{ + trace := &TempoTrace{ OID: "0", TraceID: binTraceId, SpanID: binSpanID, ParentID: binParentID, - Name: _data.Name, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, - ServiceName: _data.ServiceName, - PayloadType: _data.PayloadType, - Payload: _data.Payload, + Name: ti.Name, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, + ServiceName: ti.ServiceName, + PayloadType: ti.PayloadType, + Payload: ti.Payload, } err = b.Batch.AppendStruct(trace) if err != nil { return err } - for _, tag := range _data.Tags { - attr := &TraceTagsV2{ + for _, tag := range ti.Tags { + attr := &TempoTraceTag{ OID: "0", Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), Key: tag[0], Val: tag[1], TraceID: binTraceId, SpanID: binSpanID, - TimestampNs: _data.TimestampNs, - DurationNs: _data.DurationNs, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, } - err = b.subBatch.AppendStruct(attr) + err = b.tagsBatch.AppendStruct(attr) if err != nil { return err } @@ -64,10 +65,10 @@ func (b *batchV2) AppendStruct(data any) error { return nil } -func (b *batchV2) Abort() error { +func (b *traceWithTagsBatch) Abort() error { var errs [2]error errs[0] = b.Batch.Abort() - errs[1] = b.subBatch.Abort() + errs[1] = b.tagsBatch.Abort() for _, err := range errs { if err != nil { return err @@ -76,10 +77,10 @@ func (b *batchV2) Abort() error { return nil } -func (b *batchV2) Send() error { +func (b *traceWithTagsBatch) Send() error { var errs [2]error errs[0] = b.Batch.Send() - errs[1] = b.subBatch.Send() + errs[1] = b.tagsBatch.Send() for _, err := range errs { if err != nil { return err diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index b1e71f2..a63e27e 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -34,8 +34,11 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) +type contextKey string + const ( - spanLinkDataFormat = "%s|%s|%s|%s|%d" + spanLinkDataFormat = "%s|%s|%s|%s|%d" + clusterKey contextKey = "cluster" ) var delegate = &protojson.MarshalOptions{ @@ -48,9 +51,9 @@ type tracesExporter struct { logger *zap.Logger meter metric.Meter - db clickhouse.Conn - cluster bool - v2 bool + db clickhouse.Conn + cluster bool + clientSide bool } // newTracesExporter returns a SpanWriter for the database @@ -64,11 +67,19 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ +<<<<<<< HEAD logger: logger, meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, +======= + logger: logger, + meter: set.MeterProvider.Meter(typeStr), + db: db, + cluster: cfg.ClusteredClickhouse, + clientSide: cfg.ClientSideTraceProcessing, +>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -165,11 +176,11 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { } func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { - isCluster := ctx.Value("cluster").(bool) + isCluster := ctx.Value(clusterKey).(bool) var batch driver.Batch var err error - if e.v2 { - batch, err = e.prepareBatchV2(ctx) + if e.clientSide { + batch, err = e.prepareBatchClientSide(ctx) } else { batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) } @@ -196,7 +207,7 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } -func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) { +func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) { batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) if err != nil { return nil, err @@ -206,15 +217,15 @@ func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, erro batch.Abort() return nil, err } - return &batchV2{ - Batch: batch, - subBatch: subBatch, + return &traceWithTagsBatch{ + Batch: batch, + tagsBatch: subBatch, }, nil } // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { - _ctx := context.WithValue(ctx, "cluster", e.cluster) + _ctx := context.WithValue(ctx, clusterKey, e.cluster) start := time.Now() if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces))) @@ -387,7 +398,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte, return delegate.Marshal(otlpSpan) } -func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) { +func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) { durationNano := uint64(span.EndTimestamp() - span.StartTimestamp()) tags = aggregateSpanTags(span, tags) tags["service.name"] = serviceName @@ -404,7 +415,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName return nil, fmt.Errorf("failed to marshal span: %w", err) } - trace := &Trace{ + trace := &TraceInput{ TraceID: span.TraceID().String(), SpanID: span.SpanID().String(), ParentID: span.ParentSpanID().String(), From 3c3653e57529046f790126971bb58d203d40c67d Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:31:46 +0800 Subject: [PATCH 4/6] fix: rebase --- exporter/qrynexporter/README.md | 7 ------- exporter/qrynexporter/config.go | 3 --- exporter/qrynexporter/traces.go | 8 -------- 3 files changed, 18 deletions(-) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index ce0a325..a844bb0 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -20,13 +20,6 @@ - Default: `true` - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. -<<<<<<< HEAD -- `dsn` (required): Clickhouse's dsn. -- `clustered_clickhouse` (required): true if clickhouse cluster is used -- `client_side_trace_processing`: use improved traces ingestion algorythm for clickhouse clusters. -Data ingestion is sess performant but more evenly distributed -======= ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index 176c28c..d0e2477 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,10 +30,7 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` -<<<<<<< HEAD -======= // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index a63e27e..fd56330 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -67,19 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ -<<<<<<< HEAD - logger: logger, - meter: set.MeterProvider.Meter(typeStr), - db: db, - cluster: cfg.ClusteredClickhouse, - v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, -======= logger: logger, meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, clientSide: cfg.ClientSideTraceProcessing, ->>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.) } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) From 3efb32b5bd38edf0a3f9e6759143575fff6ebac0 Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:33:47 +0800 Subject: [PATCH 5/6] docs: fix --- exporter/qrynexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index a844bb0..f22c2c9 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -9,7 +9,7 @@ # Configuration options: - `dsn` (required): Data Source Name for Clickhouse. - - Example: `tcp://localhost:9000/?database=cloki` + - Example: `tcp://localhost:9000/qryn` - `clustered_clickhouse` (required): - Type: boolean From f774a149b0e5120719aad579a54af0273d89662d Mon Sep 17 00:00:00 2001 From: Cluas Date: Mon, 21 Oct 2024 22:37:37 +0800 Subject: [PATCH 6/6] feat: need cfg.ClusteredClickhouse --- exporter/qrynexporter/traces.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index fd56330..dfe6d7e 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -71,7 +71,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) meter: set.MeterProvider.Meter(typeStr), db: db, cluster: cfg.ClusteredClickhouse, - clientSide: cfg.ClientSideTraceProcessing, + clientSide: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))