Skip to content

Commit

Permalink
Merge pull request #423 from go-faster/fix/save-scope-attributes
Browse files Browse the repository at this point in the history
fix(chstorage): save metrics scope attributes
  • Loading branch information
tdakkota authored Jun 10, 2024
2 parents 7ab8c70 + e3116c5 commit ae7349d
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 55 deletions.
99 changes: 60 additions & 39 deletions internal/chstorage/columns_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type pointColumns struct {

flags proto.ColUInt8
attributes *Attributes
scope *Attributes
resource *Attributes
}

Expand All @@ -26,21 +27,27 @@ func newPointColumns() *pointColumns {
timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),

attributes: NewAttributes(colAttrs),
scope: NewAttributes(colScope),
resource: NewAttributes(colResource),
}
}

func (c *pointColumns) Columns() Columns {
return MergeColumns(Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "mapping", Data: proto.Wrap(&c.mapping, metricMappingDDL)},
{Name: "value", Data: &c.value},

{Name: "flags", Data: &c.flags},
}, c.attributes.Columns(), c.resource.Columns())
return MergeColumns(
Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "mapping", Data: proto.Wrap(&c.mapping, metricMappingDDL)},
{Name: "value", Data: &c.value},

{Name: "flags", Data: &c.flags},
},
c.attributes.Columns(),
c.scope.Columns(),
c.resource.Columns(),
)
}

func (c *pointColumns) Input() proto.Input { return c.Columns().Input() }
Expand All @@ -65,6 +72,7 @@ type expHistogramColumns struct {

flags proto.ColUInt8
attributes *Attributes
scope *Attributes
resource *Attributes
}

Expand All @@ -81,29 +89,35 @@ func newExpHistogramColumns() *expHistogramColumns {
negativeBucketCounts: new(proto.ColUInt64).Array(),

attributes: NewAttributes(colAttrs),
scope: NewAttributes(colScope),
resource: NewAttributes(colResource),
}
}

func (c *expHistogramColumns) Columns() Columns {
return MergeColumns(Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "exp_histogram_count", Data: &c.count},
{Name: "exp_histogram_sum", Data: c.sum},
{Name: "exp_histogram_min", Data: c.min},
{Name: "exp_histogram_max", Data: c.max},
{Name: "exp_histogram_scale", Data: &c.scale},
{Name: "exp_histogram_zerocount", Data: &c.zerocount},
{Name: "exp_histogram_positive_offset", Data: &c.positiveOffset},
{Name: "exp_histogram_positive_bucket_counts", Data: c.positiveBucketCounts},
{Name: "exp_histogram_negative_offset", Data: &c.negativeOffset},
{Name: "exp_histogram_negative_bucket_counts", Data: c.negativeBucketCounts},

{Name: "flags", Data: &c.flags},
}, c.attributes.Columns(), c.resource.Columns())
return MergeColumns(
Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "exp_histogram_count", Data: &c.count},
{Name: "exp_histogram_sum", Data: c.sum},
{Name: "exp_histogram_min", Data: c.min},
{Name: "exp_histogram_max", Data: c.max},
{Name: "exp_histogram_scale", Data: &c.scale},
{Name: "exp_histogram_zerocount", Data: &c.zerocount},
{Name: "exp_histogram_positive_offset", Data: &c.positiveOffset},
{Name: "exp_histogram_positive_bucket_counts", Data: c.positiveBucketCounts},
{Name: "exp_histogram_negative_offset", Data: &c.negativeOffset},
{Name: "exp_histogram_negative_bucket_counts", Data: c.negativeBucketCounts},

{Name: "flags", Data: &c.flags},
},
c.attributes.Columns(),
c.scope.Columns(),
c.resource.Columns(),
)
}

func (c *expHistogramColumns) Input() proto.Input { return c.Columns().Input() }
Expand Down Expand Up @@ -149,6 +163,7 @@ type exemplarColumns struct {
traceID proto.ColFixedStr16

attributes *Attributes
scope *Attributes
resource *Attributes
}

Expand All @@ -159,22 +174,28 @@ func newExemplarColumns() *exemplarColumns {
timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
exemplarTimestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
attributes: NewAttributes(colAttrs),
scope: NewAttributes(colScope),
resource: NewAttributes(colResource),
}
}

func (c *exemplarColumns) Columns() Columns {
return MergeColumns(Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "filtered_attributes", Data: &c.filteredAttributes},
{Name: "exemplar_timestamp", Data: c.exemplarTimestamp},
{Name: "value", Data: &c.value},
{Name: "span_id", Data: &c.spanID},
{Name: "trace_id", Data: &c.traceID},
}, c.attributes.Columns(), c.resource.Columns())
return MergeColumns(
Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "filtered_attributes", Data: &c.filteredAttributes},
{Name: "exemplar_timestamp", Data: c.exemplarTimestamp},
{Name: "value", Data: &c.value},
{Name: "span_id", Data: &c.spanID},
{Name: "trace_id", Data: &c.traceID},
},
c.attributes.Columns(),
c.scope.Columns(),
c.resource.Columns(),
)
}

func (c *exemplarColumns) Input() proto.Input { return c.Columns().Input() }
Expand Down
50 changes: 34 additions & 16 deletions internal/chstorage/inserter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (b *metricsBatch) Insert(ctx context.Context, tables Tables, client Clickho
return nil
}

func (b *metricsBatch) addPoints(name string, res lazyAttributes, slice pmetric.NumberDataPointSlice) error {
func (b *metricsBatch) addPoints(name string, res, scope lazyAttributes, slice pmetric.NumberDataPointSlice) error {
c := b.points

for i := 0; i < slice.Len(); i++ {
Expand Down Expand Up @@ -171,6 +171,7 @@ func (b *metricsBatch) addPoints(name string, res lazyAttributes, slice pmetric.
Name: name,
Timestamp: ts,
Attributes: attrs,
Scope: scope,
Resource: res,
},
point.Exemplars(),
Expand All @@ -184,12 +185,13 @@ func (b *metricsBatch) addPoints(name string, res lazyAttributes, slice pmetric.
c.value.Append(val)
c.flags.Append(uint8(flags))
c.attributes.Append(attrs.Attributes())
c.scope.Append(scope.Attributes())
c.resource.Append(res.Attributes())
}
return nil
}

func (b *metricsBatch) addHistogramPoints(name string, res lazyAttributes, slice pmetric.HistogramDataPointSlice) error {
func (b *metricsBatch) addHistogramPoints(name string, res, scope lazyAttributes, slice pmetric.HistogramDataPointSlice) error {
for i := 0; i < slice.Len(); i++ {
point := slice.At(i)
ts := point.Timestamp().AsTime()
Expand Down Expand Up @@ -221,6 +223,7 @@ func (b *metricsBatch) addHistogramPoints(name string, res lazyAttributes, slice
Timestamp: ts,
Flags: flags,
Attributes: attrs,
Scope: scope,
Resource: res,
}
if sum.Set {
Expand Down Expand Up @@ -358,7 +361,7 @@ type histogramBucketBounds struct {
bucketKey [2]string
}

func (b *metricsBatch) addExpHistogramPoints(name string, res lazyAttributes, slice pmetric.ExponentialHistogramDataPointSlice) error {
func (b *metricsBatch) addExpHistogramPoints(name string, res, scope lazyAttributes, slice pmetric.ExponentialHistogramDataPointSlice) error {
var (
c = b.expHistograms
mapBuckets = func(b pmetric.ExponentialHistogramDataPointBuckets) (offset int32, counts []uint64) {
Expand Down Expand Up @@ -391,6 +394,7 @@ func (b *metricsBatch) addExpHistogramPoints(name string, res lazyAttributes, sl
Name: name,
Timestamp: ts,
Attributes: attrs,
Scope: scope,
Resource: res,
},
point.Exemplars(),
Expand All @@ -412,12 +416,13 @@ func (b *metricsBatch) addExpHistogramPoints(name string, res lazyAttributes, sl
c.negativeBucketCounts.Append(negativeBucketCounts)
c.flags.Append(uint8(flags))
c.attributes.Append(attrs.Attributes())
c.scope.Append(scope.Attributes())
c.resource.Append(res.Attributes())
}
return nil
}

func (b *metricsBatch) addSummaryPoints(name string, res lazyAttributes, slice pmetric.SummaryDataPointSlice) error {
func (b *metricsBatch) addSummaryPoints(name string, res, scope lazyAttributes, slice pmetric.SummaryDataPointSlice) error {
for i := 0; i < slice.Len(); i++ {
var (
point = slice.At(i)
Expand Down Expand Up @@ -446,6 +451,7 @@ func (b *metricsBatch) addSummaryPoints(name string, res lazyAttributes, slice p
Timestamp: ts,
Flags: flags,
Attributes: attrs,
Scope: scope,
Resource: res,
}
b.addMappedSample(ms, name+"_count", summaryCount, float64(count))
Expand All @@ -469,6 +475,7 @@ type mappedSeries struct {
Timestamp time.Time
Flags pmetric.DataPointFlags
Attributes lazyAttributes
Scope lazyAttributes
Resource lazyAttributes
}

Expand All @@ -488,13 +495,15 @@ func (b *metricsBatch) addMappedSample(
c.value.Append(val)
c.flags.Append(uint8(series.Flags))
c.attributes.Append(series.Attributes.Attributes(bucketKey...))
c.resource.Append(series.Resource.Attributes(bucketKey...))
c.scope.Append(series.Scope.Attributes())
c.resource.Append(series.Resource.Attributes())
}

type exemplarSeries struct {
Name string
Timestamp time.Time
Attributes lazyAttributes
Scope lazyAttributes
Resource lazyAttributes
}

Expand Down Expand Up @@ -535,6 +544,7 @@ func (b *metricsBatch) addExemplar(p exemplarSeries, e pmetric.Exemplar, bucketK
c.traceID.Append(e.TraceID())

c.attributes.Append(p.Attributes.Attributes(bucketKey...))
c.scope.Append(p.Scope.Attributes())
c.resource.Append(p.Resource.Attributes())
return nil
}
Expand All @@ -558,45 +568,53 @@ func (b *metricsBatch) addLabels(attrs lazyAttributes) {
func (b *metricsBatch) mapMetrics(metrics pmetric.Metrics) error {
resMetrics := metrics.ResourceMetrics()
for i := 0; i < resMetrics.Len(); i++ {
resMetric := resMetrics.At(i)
resAttrs := lazyAttributes{
orig: resMetric.Resource().Attributes(),
}
var (
resMetric = resMetrics.At(i)
resAttrs = lazyAttributes{
orig: resMetric.Resource().Attributes(),
}
)
b.addLabels(resAttrs)

scopeMetrics := resMetric.ScopeMetrics()
for i := 0; i < scopeMetrics.Len(); i++ {
scopeLog := scopeMetrics.At(i)
var (
scopeMetric = scopeMetrics.At(i)
scopeAttrs = lazyAttributes{
orig: scopeMetric.Scope().Attributes(),
}
)
b.addLabels(scopeAttrs)

records := scopeLog.Metrics()
records := scopeMetric.Metrics()
for i := 0; i < records.Len(); i++ {
record := records.At(i)
name := record.Name()

switch typ := record.Type(); typ {
case pmetric.MetricTypeGauge:
gauge := record.Gauge()
if err := b.addPoints(name, resAttrs, gauge.DataPoints()); err != nil {
if err := b.addPoints(name, resAttrs, scopeAttrs, gauge.DataPoints()); err != nil {
return err
}
case pmetric.MetricTypeSum:
sum := record.Sum()
if err := b.addPoints(name, resAttrs, sum.DataPoints()); err != nil {
if err := b.addPoints(name, resAttrs, scopeAttrs, sum.DataPoints()); err != nil {
return err
}
case pmetric.MetricTypeHistogram:
hist := record.Histogram()
if err := b.addHistogramPoints(name, resAttrs, hist.DataPoints()); err != nil {
if err := b.addHistogramPoints(name, resAttrs, scopeAttrs, hist.DataPoints()); err != nil {
return err
}
case pmetric.MetricTypeExponentialHistogram:
hist := record.ExponentialHistogram()
if err := b.addExpHistogramPoints(name, resAttrs, hist.DataPoints()); err != nil {
if err := b.addExpHistogramPoints(name, resAttrs, scopeAttrs, hist.DataPoints()); err != nil {
return err
}
case pmetric.MetricTypeSummary:
summary := record.Summary()
if err := b.addSummaryPoints(name, resAttrs, summary.DataPoints()); err != nil {
if err := b.addSummaryPoints(name, resAttrs, scopeAttrs, summary.DataPoints()); err != nil {
return err
}
default:
Expand Down
1 change: 1 addition & 0 deletions internal/chstorage/querier_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func (p *promQuerier) buildQuery(
}
selectors = []chsql.Expr{
attrSelector(colAttrs, name),
attrSelector(colScope, name),
attrSelector(colResource, name),
}
}
Expand Down

0 comments on commit ae7349d

Please sign in to comment.