Skip to content

Commit

Permalink
Merge branch 'main' into http-grpc-pr
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelHollands authored Mar 15, 2024
2 parents ebbe299 + 4ce5fa8 commit fb81edf
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 47 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@

#### LogCLI

* [11852](https://github.com/grafana/loki/pull/11852) **MichelHollands**: feat: update logcli so it tries to load the latest version of the schemaconfig

#### Mixins

* [11087](https://github.com/grafana/loki/pull/11087) **JoaoBraveCoding**: Adds structured metadata panels for ingested data
Expand Down
101 changes: 63 additions & 38 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package query

import (
"context"
stdErrors "errors"
"flag"
"fmt"
"io"
Expand All @@ -10,7 +11,6 @@ import (
"sync"
"time"

"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -395,6 +395,41 @@ func maxTime(t1, t2 time.Time) time.Time {
return t2
}

func getLatestConfig(client chunk.ObjectClient, orgID string) (*config.SchemaConfig, error) {
// Get the latest
iteration := 0
searchFor := fmt.Sprintf("%s-%s.yaml", orgID, schemaConfigFilename) // schemaconfig-tenant.yaml
var loadedSchema *config.SchemaConfig
for {
if iteration != 0 {
searchFor = fmt.Sprintf("%s-%s-%d.yaml", orgID, schemaConfigFilename, iteration) // tenant-schemaconfig-1.yaml
}
tempSchema, err := LoadSchemaUsingObjectClient(client, searchFor)
if err == errNotExists {
break
}
if err != nil {
return nil, err
}

loadedSchema = tempSchema
iteration++
}
if loadedSchema != nil {
return loadedSchema, nil
}

searchFor = fmt.Sprintf("%s.yaml", schemaConfigFilename) // schemaconfig.yaml for backwards compatibility
loadedSchema, err := LoadSchemaUsingObjectClient(client, searchFor)
if err == nil {
return loadedSchema, nil
}
if err != errNotExists {
return nil, err
}
return nil, errNotExists
}

// DoLocalQuery executes the query against the local store using a Loki configuration file.
func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string, useRemoteSchema bool) error {
var conf loki.Config
Expand All @@ -417,15 +452,10 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}

objects := []string{
fmt.Sprintf("%s-%s.yaml", orgID, schemaConfigFilename), // schemaconfig-tenant.yaml
fmt.Sprintf("%s.yaml", schemaConfigFilename), // schemaconfig.yaml for backwards compatibility
}
loadedSchema, err := LoadSchemaUsingObjectClient(client, objects...)
loadedSchema, err := getLatestConfig(client, orgID)
if err != nil {
return err
}

conf.SchemaConfig = *loadedSchema
}

Expand Down Expand Up @@ -484,10 +514,6 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
query = eng.Query(params)
}

if err != nil {
return err
}

// execute the query
ctx := user.InjectOrgID(context.Background(), orgID)
result, err := query.Exec(ctx)
Expand Down Expand Up @@ -521,41 +547,40 @@ func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (
return oc, nil
}

var errNotExists = stdErrors.New("doesn't exist")

type schemaConfigSection struct {
config.SchemaConfig `yaml:"schema_config"`
}

// LoadSchemaUsingObjectClient returns the loaded schema from the first found object
func LoadSchemaUsingObjectClient(oc chunk.ObjectClient, names ...string) (*config.SchemaConfig, error) {
errs := multierror.New()
for _, name := range names {
schema, err := func(name string) (*config.SchemaConfig, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute))
defer cancel()
rdr, _, err := oc.GetObject(ctx, name)
if err != nil {
return nil, errors.Wrapf(err, "failed to load schema object '%s'", name)
}
defer rdr.Close()
// LoadSchemaUsingObjectClient returns the loaded schema from the object with the given name
func LoadSchemaUsingObjectClient(oc chunk.ObjectClient, name string) (*config.SchemaConfig, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute))
defer cancel()

decoder := yaml.NewDecoder(rdr)
decoder.SetStrict(true)
section := schemaConfigSection{}
err = decoder.Decode(&section)
if err != nil {
return nil, err
}
ok, err := oc.ObjectExists(ctx, name)
if !ok {
return nil, errNotExists
}
if err != nil {
return nil, err
}

return &section.SchemaConfig, nil
}(name)
rdr, _, err := oc.GetObject(ctx, name)
if err != nil {
return nil, errors.Wrapf(err, "failed to load schema object '%s'", name)
}
defer rdr.Close()

if err != nil {
errs = append(errs, err)
continue
}
return schema, nil
decoder := yaml.NewDecoder(rdr)
decoder.SetStrict(true)
section := schemaConfigSection{}
err = decoder.Decode(&section)
if err != nil {
return nil, err
}
return nil, errs.Err()

return &section.SchemaConfig, nil
}

// SetInstant makes the Query an instant type
Expand Down
132 changes: 125 additions & 7 deletions pkg/logcli/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package query
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util/marshal"
Expand Down Expand Up @@ -406,7 +408,6 @@ func Test_batch(t *testing.T) {
type testQueryClient struct {
engine *logql.Engine
queryRangeCalls int
orgID string
}

func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
Expand Down Expand Up @@ -484,6 +485,17 @@ func (t *testQueryClient) GetVolumeRange(_ *volume.Query) (*loghttp.QueryRespons
panic("not implemented")
}

var legacySchemaConfigContents = `schema_config:
configs:
- from: 2020-05-15
store: boltdb-shipper
object_store: gcs
schema: v10
index:
prefix: index_
period: 168h
`

var schemaConfigContents = `schema_config:
configs:
- from: 2020-05-15
Expand All @@ -501,10 +513,35 @@ var schemaConfigContents = `schema_config:
prefix: index_
period: 24h
`
var schemaConfigContents2 = `schema_config:
configs:
- from: 2020-05-15
store: boltdb-shipper
object_store: gcs
schema: v10
index:
prefix: index_
period: 168h
- from: 2020-07-31
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: index_
period: 24h
- from: 2020-09-30
store: boltdb-shipper
object_store: gcs
schema: v12
index:
prefix: index_
period: 24h
`
var cm = storage.NewClientMetrics()

func TestLoadFromURL(t *testing.T) {
func setupTestEnv(t *testing.T) (string, client.ObjectClient) {
t.Helper()
tmpDir := t.TempDir()

conf := loki.Config{
StorageConfig: storage.Config{
FSConfig: local.FSConfig{
Expand All @@ -513,11 +550,19 @@ func TestLoadFromURL(t *testing.T) {
},
}

cm := storage.NewClientMetrics()
client, err := GetObjectClient(config.StorageTypeFileSystem, conf, cm)
require.NoError(t, err)
require.NotNil(t, client)

_, err = getLatestConfig(client, "456")
require.Error(t, err)
require.True(t, errors.Is(err, errNotExists))

return tmpDir, client
}

func TestLoadFromURL(t *testing.T) {
tmpDir, client := setupTestEnv(t)
filename := "schemaconfig.yaml"

// Missing schemaconfig.yaml file should error
Expand All @@ -537,12 +582,85 @@ func TestLoadFromURL(t *testing.T) {

require.NoError(t, err)
require.NotNil(t, schemaConfig)
}

// Load multiple schemaconfig files
schemaConfig, err = LoadSchemaUsingObjectClient(client, "foo.yaml", filename, "bar.yaml")
func TestMultipleConfigs(t *testing.T) {
tmpDir, client := setupTestEnv(t)

err := os.WriteFile(
filepath.Join(tmpDir, "456-schemaconfig.yaml"),
[]byte(schemaConfigContents),
0666,
)
require.NoError(t, err)
require.NotNil(t, schemaConfig)

config, err := getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 2)

err = os.WriteFile(
filepath.Join(tmpDir, "456-schemaconfig-1.yaml"),
[]byte(schemaConfigContents2),
0666,
)
require.NoError(t, err)

config, err = getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 3)
}

func TestMultipleConfigsIncludingLegacy(t *testing.T) {
tmpDir, client := setupTestEnv(t)

err := os.WriteFile(
filepath.Join(tmpDir, "schemaconfig.yaml"),
[]byte(legacySchemaConfigContents),
0666,
)
require.NoError(t, err)

err = os.WriteFile(
filepath.Join(tmpDir, "456-schemaconfig.yaml"),
[]byte(schemaConfigContents),
0666,
)
require.NoError(t, err)

config, err := getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 2)

err = os.WriteFile(
filepath.Join(tmpDir, "456-schemaconfig-1.yaml"),
[]byte(schemaConfigContents2),
0666,
)
require.NoError(t, err)

config, err = getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 3)
}

func TestLegacyConfigOnly(t *testing.T) {
tmpDir, client := setupTestEnv(t)

err := os.WriteFile(
filepath.Join(tmpDir, "schemaconfig.yaml"),
[]byte(legacySchemaConfigContents),
0666,
)
require.NoError(t, err)

config, err := getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 1)
}

func TestDurationCeilDiv(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/client/gcp/gcs_object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func fakeHTTPRespondingServer(t *testing.T, code int) *httptest.Server {
}

func fakeSleepingServer(t *testing.T, responseSleep, connectSleep time.Duration, closeOnNew, closeOnActive bool) *httptest.Server {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// sleep on response to mimic server overload
time.Sleep(responseSleep)
}))
Expand All @@ -264,6 +264,6 @@ func fakeSleepingServer(t *testing.T, responseSleep, connectSleep time.Duration,
}
}
t.Cleanup(server.Close)

server.Start()
return server
}

0 comments on commit fb81edf

Please sign in to comment.