Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester: Validate completed blocks #4256

Merged
merged 5 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. [#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137)
* [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero)
* [ENHANCEMENT] Send semver version in api/stattus/buildinfo for cloud deployments [#4110](https://github.com/grafana/tempo/pull/4110) [@Aki0x137]
* [ENHANCEMENT] Add completed block validation on startup.[#4256](https://github.com/grafana/tempo/pull/4256) (@joe-elliott)
* [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero)
* [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero)
* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
Expand Down
2 changes: 2 additions & 0 deletions modules/generator/processor/localblocks/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,5 @@ func (m *mockBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, tra
}

func (m *mockBlock) BlockMeta() *backend.BlockMeta { return m.meta }

func (m *mockBlock) Validate(context.Context) error { return nil }
92 changes: 92 additions & 0 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/grafana/tempo/tempodb/wal"
)

Expand Down Expand Up @@ -259,6 +261,96 @@ func TestSearchWAL(t *testing.T) {
require.Equal(t, uint32(1), results.Metrics.InspectedTraces)
}

func TestRediscoverLocalBlocks(t *testing.T) {
tmpDir := t.TempDir()

ctx := user.InjectOrgID(context.Background(), "test")
ingester, traces, traceIDs := defaultIngester(t, tmpDir)

// force cut all traces
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")
}

// force complete all blocks
for _, instance := range ingester.instances {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)
}

// create new ingester. this should rediscover local blocks
ingester, _, _ = defaultIngester(t, tmpDir)

// should be able to find old traces that were replayed
for i, traceID := range traceIDs {
foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
TraceID: traceID,
})
require.NoError(t, err, "unexpected error querying")
require.NotNil(t, foundTrace.Trace)
trace.SortTrace(foundTrace.Trace)
equal := proto.Equal(traces[i], foundTrace.Trace)
require.True(t, equal)
}
}

func TestRediscoverDropsInvalidBlocks(t *testing.T) {
tmpDir := t.TempDir()

ctx := user.InjectOrgID(context.Background(), "test")
ingester, _, _ := defaultIngester(t, tmpDir)

// force cut all traces
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")
}

// force complete all blocks
for _, instance := range ingester.instances {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)
}

// create new ingester. this should rediscover local blocks. there should be 1 block
ingester, _, _ = defaultIngester(t, tmpDir)

instance, ok := ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 1)

// now mangle a complete block
instance, ok = ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 1)

// this cheats by reaching into the internals of the block and overwriting the parquet file directly. if this test starts failing
// it could be b/c the block internals changed and this no longer breaks a block
block := instance.completeBlocks[0]
err := block.writer.Write(ctx, vparquet4.DataFileName, uuid.UUID(block.BlockMeta().BlockID), "test", []byte("mangled"), nil)
require.NoError(t, err)

// create new ingester. this should rediscover local blocks. there should be 0 blocks
ingester, _, _ = defaultIngester(t, tmpDir)

instance, ok = ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 0)
}

// TODO - This test is flaky and commented out until it's fixed
// TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed
// to the local disk, but before the wal is deleted. On startup both blocks exist, and the ingester now errs
Expand Down
16 changes: 15 additions & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,6 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er
var rediscoveredBlocks []*LocalBlock

for _, id := range ids {

// Ignore blocks that have a matching wal. The wal will be replayed and the local block recreated.
// NOTE - Wal replay must be done beforehand.
if hasWal(id) {
Expand Down Expand Up @@ -629,6 +628,21 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er
return nil, err
}

// validate the block before adding it to the list. if we drop a block here and its not in the wal this is data loss, but there is no way to recover. this is likely due to disk
// level corruption
err = b.Validate(ctx)
if err != nil && !errors.Is(err, common.ErrUnsupported) {
level.Error(log.Logger).Log("msg", "local block failed validation, dropping", "tenantID", i.instanceID, "block", id.String(), "error", err)
metricReplayErrorsTotal.WithLabelValues(i.instanceID).Inc()

err = i.local.ClearBlock(id, i.instanceID)
if err != nil {
return nil, fmt.Errorf("deleting invalid local block tenant %v block %v: %w", i.instanceID, id.String(), err)
}

continue
}

ib := NewLocalBlock(ctx, b, i.local)
rediscoveredBlocks = append(rediscoveredBlocks, ib)

Expand Down
1 change: 1 addition & 0 deletions tempodb/encoding/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type BackendBlock interface {
Searcher

BlockMeta() *backend.BlockMeta
Validate(ctx context.Context) error
}

type WALBlock interface {
Expand Down
4 changes: 4 additions & 0 deletions tempodb/encoding/v2/backend_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,7 @@ func (b *BackendBlock) FetchTagValues(context.Context, traceql.FetchTagValuesReq
func (b *BackendBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, traceql.FetchTagsCallback, common.MetricsCallback, common.SearchOptions) error {
return common.ErrUnsupported
}

func (b *BackendBlock) Validate(_ context.Context) error {
return common.ErrUnsupported
}
4 changes: 4 additions & 0 deletions tempodb/encoding/v2/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func (a *walBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, trac
return common.ErrUnsupported
}

func (a *walBlock) Validate(context.Context) error {
return common.ErrUnsupported
}

func (a *walBlock) fullFilename() string {
filename := a.fullFilenameSeparator("+")
_, e1 := os.Stat(filename)
Expand Down
4 changes: 4 additions & 0 deletions tempodb/encoding/vparquet2/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ func (b *backendBlock) FetchTagValues(context.Context, traceql.FetchTagValuesReq
func (b *backendBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, traceql.FetchTagsCallback, common.MetricsCallback, common.SearchOptions) error {
return common.ErrUnsupported
}

func (b *backendBlock) Validate(context.Context) error {
return common.ErrUnsupported
}
4 changes: 4 additions & 0 deletions tempodb/encoding/vparquet2/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui
return nil
}

func (b *walBlock) Validate(context.Context) error {
return common.ErrUnsupported
}

func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) {
now := time.Now()
startOfRange := uint32(now.Add(-b.ingestionSlack).Unix())
Expand Down
5 changes: 5 additions & 0 deletions tempodb/encoding/vparquet3/block.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vparquet3

import (
"context"
"sync"

"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -33,3 +34,7 @@ func newBackendBlock(meta *backend.BlockMeta, r backend.Reader) *backendBlock {
func (b *backendBlock) BlockMeta() *backend.BlockMeta {
return b.meta
}

func (b *backendBlock) Validate(context.Context) error {
return common.ErrUnsupported
}
4 changes: 4 additions & 0 deletions tempodb/encoding/vparquet3/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui
return nil
}

func (b *walBlock) Validate(context.Context) error {
return common.ErrUnsupported
}

func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) {
now := time.Now()
startOfRange := uint32(now.Add(-b.ingestionSlack).Unix())
Expand Down
42 changes: 42 additions & 0 deletions tempodb/encoding/vparquet4/block.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package vparquet4

import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -33,3 +38,40 @@ func newBackendBlock(meta *backend.BlockMeta, r backend.Reader) *backendBlock {
func (b *backendBlock) BlockMeta() *backend.BlockMeta {
return b.meta
}

// Validate will do a basic sanity check of the state of the parquet file. This can be extended to do more checks in the future.
// This method should lean towards being cost effective over complete.
func (b *backendBlock) Validate(ctx context.Context) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would not be simpler trying to open the file directly with parquet-go:

	o := []parquet.FileOption{
		parquet.SkipBloomFilters(true),
		parquet.SkipPageIndex(true),
		parquet.FileSchema(parquetSchema),
		parquet.FileReadMode(parquet.ReadModeAsync),
	}

	reader := NewBackendReaderAt(ctx, b.r, DataFileName, b.meta)
	_, err := parquet.OpenFile(reader, int64(b.meta.Size_), o...)
	if err != nil {
		return fmt.Errorf("failed to read parquet fike: %w", err)
	}

OpenFile seems to do essentially the same plus some other validations. Is this difference noticeable enough? Since this is at startup time maybe it isn't.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenFile also unmarshals the thrift footer which is more costly. I was trying to avoid paying those allocs on startup.

if b.meta == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to reassemble this information from data on disk and still avoid paying the allocs on startup like you mention above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly? what do you mean by "this information"?

return errors.New("block meta is nil")
}

// read last 8 bytes of the file to confirm its at least complete. the last 4 should be ascii "PAR1"
// and the 4 bytes before that should be the length of the footer
buff := make([]byte, 8)
err := b.r.ReadRange(ctx, DataFileName, uuid.UUID(b.meta.BlockID), b.meta.TenantID, b.meta.Size_-8, buff, nil)
if err != nil {
return fmt.Errorf("failed to read parquet magic footer: %w", err)
}

if string(buff[4:]) != "PAR1" {
return fmt.Errorf("invalid parquet magic footer: %x", buff[4:])
}

footerSize := int64(binary.LittleEndian.Uint32(buff[:4]))
if footerSize != int64(b.meta.FooterSize) {
return fmt.Errorf("unexpected parquet footer size: %d", footerSize)
}

// read the first byte from all blooms to confirm they exist
buff = make([]byte, 1)
for i := 0; i < int(b.meta.BloomShardCount); i++ {
bloomName := common.BloomName(i)
err = b.r.ReadRange(ctx, bloomName, uuid.UUID(b.meta.BlockID), b.meta.TenantID, 0, buff, nil)
if err != nil {
return fmt.Errorf("failed to read first byte of bloom(%d): %w", i, err)
}
}

return nil
}
81 changes: 81 additions & 0 deletions tempodb/encoding/vparquet4/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package vparquet4

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/stretchr/testify/require"
)

func TestValidateFailsOnCorruptParquetFile(t *testing.T) {
ctx := context.Background()
block, w := validBlock(t)
meta := block.meta

err := block.Validate(ctx)
require.NoError(t, err)

// Corrupt the file
err = w.Write(ctx, DataFileName, uuid.UUID(meta.BlockID), meta.TenantID, []byte{0, 0, 0, 0, 0, 0, 0, 0}, nil)
require.NoError(t, err)

err = block.Validate(ctx)
require.Error(t, err)
}

func TestValidateFailsOnMissingBloom(t *testing.T) {
ctx := context.Background()
block, w := validBlock(t)
meta := block.meta

err := block.Validate(ctx)
require.NoError(t, err)

// remove a bloom
err = w.Delete(ctx, common.BloomName(0), backend.KeyPathForBlock(uuid.UUID(meta.BlockID), meta.TenantID))
require.NoError(t, err)

err = block.Validate(ctx)
require.Error(t, err)
}

func validBlock(t *testing.T) (*backendBlock, backend.Writer) {
t.Helper()

ctx := context.Background()

rawR, rawW, _, err := local.New(&local.Config{
Path: t.TempDir(),
})
require.NoError(t, err)

r := backend.NewReader(rawR)
w := backend.NewWriter(rawW)

iter := newTestIterator()

iter.Add(test.MakeTrace(10, nil), 100, 401)
iter.Add(test.MakeTrace(10, nil), 101, 402)
iter.Add(test.MakeTrace(10, nil), 102, 403)

cfg := &common.BlockConfig{
BloomFP: 0.01,
BloomShardSizeBytes: 100 * 1024,
}

meta := backend.NewBlockMeta("fake", uuid.New(), VersionString, backend.EncNone, "")
meta.TotalObjects = 1
meta.StartTime = time.Unix(300, 0)
meta.EndTime = time.Unix(305, 0)

outMeta, err := CreateBlock(ctx, cfg, meta, iter, r, w)
require.NoError(t, err)

return newBackendBlock(outMeta, r), w
}
5 changes: 5 additions & 0 deletions tempodb/encoding/vparquet4/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui
return nil
}

// TODO: potentially add validation to wal blocks and use in the wal replay code in the ingester.
func (b *walBlock) Validate(context.Context) error {
return common.ErrUnsupported
}

// It controls the block start/end date as a sliding window.
func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) {
now := time.Now()
Expand Down