Skip to content

Commit

Permalink
Cut patch release v0.35.1 (#7394)
Browse files Browse the repository at this point in the history
* compact: recover from panics (#7318)

For #6775, it would be useful
to know the exact block IDs to aid debugging.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* Sidecar: wait for prometheus on startup (#7323)

Signed-off-by: Michael Hoffmann <[email protected]>

* Receive: fix serverAsClient.Series goroutines leak (#6948)

* fix serverAsClient goroutines leak

Signed-off-by: Thibault Mange <[email protected]>

* fix lint

Signed-off-by: Thibault Mange <[email protected]>

* update changelog

Signed-off-by: Thibault Mange <[email protected]>

* delete invalid comment

Signed-off-by: Thibault Mange <[email protected]>

* remove temp dev test

Signed-off-by: Thibault Mange <[email protected]>

* remove timer channel drain

Signed-off-by: Thibault Mange <[email protected]>

---------

Signed-off-by: Thibault Mange <[email protected]>

* Receive: fix stats (#7373)

If we account stats for remote write and local writes we will count them
twice since the remote write will be counted locally again by the remote
receiver instance.

Signed-off-by: Michael Hoffmann <[email protected]>

* *: Ensure objstore flag values are masked & disable debug/pprof/cmdline (#7382)

* *: Ensure objstore flag values are masked & disable debug/pprof/cmdline

Signed-off-by: Saswata Mukherjee <[email protected]>

* small fix

Signed-off-by: Saswata Mukherjee <[email protected]>

---------

Signed-off-by: Saswata Mukherjee <[email protected]>

* Query: dont pass query hints to avoid triggering pushdown (#7392)

If we have a new querier it will create query hints even without the
pushdown feature being present anymore. Old sidecars will then trigger
query pushdown which leads to broken max,min,max_over_time and
min_over_time.

Signed-off-by: Michael Hoffmann <[email protected]>

* Cut patch release v0.35.1

Signed-off-by: Saswata Mukherjee <[email protected]>

---------

Signed-off-by: Giedrius Statkevičius <[email protected]>
Signed-off-by: Michael Hoffmann <[email protected]>
Signed-off-by: Thibault Mange <[email protected]>
Signed-off-by: Saswata Mukherjee <[email protected]>
Co-authored-by: Giedrius Statkevičius <[email protected]>
Co-authored-by: Michael Hoffmann <[email protected]>
Co-authored-by: Thibault Mange <[email protected]>
  • Loading branch information
4 people authored May 28, 2024
1 parent d9a0efa commit 086a698
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 177 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Removed

## [v0.35.1](https://github.com/thanos-io/thanos/tree/release-0.35) - 28.05.2024

### Fixed

- [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: wait for prometheus on startup
- [#6948](https://github.com/thanos-io/thanos/pull/6948) Receive: fix goroutines leak during series requests to thanos store api.
- [#7382](https://github.com/thanos-io/thanos/pull/7382) *: Ensure objstore flag values are masked & disable debug/pprof/cmdline
- [#7392](https://github.com/thanos-io/thanos/pull/7392) Query: fix broken min, max for pre 0.34.1 sidecars
- [#7373](https://github.com/thanos-io/thanos/pull/7373) Receive: Fix stats for remote write
- [#7318](https://github.com/thanos-io/thanos/pull/7318) Compactor: Recover from panic to log block ID

### Added

### Changed

### Removed

## [v0.35.0](https://github.com/thanos-io/thanos/tree/release-0.35) - 02.05.2024

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.35.0
0.35.1
5 changes: 5 additions & 0 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ func getFlagsMap(flags []*kingpin.FlagModel) map[string]string {
if boilerplateFlags.GetFlag(f.Name) != nil {
continue
}
// Mask inline objstore flag which can have credentials.
if f.Name == "objstore.config" || f.Name == "objstore.config-file" {
flagsMap[f.Name] = "<REDACTED>"
continue
}
flagsMap[f.Name] = f.Value.String()
}

Expand Down
99 changes: 61 additions & 38 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,64 +172,87 @@ func runSidecar(
Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.",
})

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure same sidecar flags.
if err := validatePrometheus(ctx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}
ctx := context.Background()
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure same sidecar flags.
// We retry infinitely until we validated prometheus flags
err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.BuildVersion(ctx); err != nil {
if err := validatePrometheus(iterCtx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
"msg", "failed to validate prometheus flags. Is Prometheus running? Retrying",
"err", err,
)
return err
}

level.Info(logger).Log(
"msg", "successfully loaded prometheus version",
"msg", "successfully validated prometheus flags",
)
return nil
})
if err != nil {
return errors.Wrap(err, "failed to get prometheus version")
return errors.Wrap(err, "failed to validate prometheus flags")
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
)
promUp.Set(0)
statusProber.NotReady(err)
return err
}
// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

level.Info(logger).Log(
"msg", "successfully loaded prometheus external labels",
"external_labels", m.Labels().String(),
if err := m.BuildVersion(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
"err", err,
)
promUp.Set(1)
statusProber.Ready()
return nil
})
if err != nil {
return errors.Wrap(err, "initial external labels query")
return err
}

if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
level.Info(logger).Log(
"msg", "successfully loaded prometheus version",
)
return nil
})
if err != nil {
return errors.Wrap(err, "failed to get prometheus version")
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err = runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
)
return err
}

level.Info(logger).Log(
"msg", "successfully loaded prometheus external labels",
"external_labels", m.Labels().String(),
)
return nil
})
if err != nil {
return errors.Wrap(err, "initial external labels query")
}

if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}
promUp.Set(1)
statusProber.Ready()

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
// the external labels we apply.
return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
Expand Down
16 changes: 16 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -871,6 +872,21 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
}

defer func() {
if p := recover(); p != nil {
var sb strings.Builder

cgIDs := cg.IDs()
for i, blid := range cgIDs {
_, _ = sb.WriteString(blid.String())
if i < len(cgIDs)-1 {
_, _ = sb.WriteString(",")
}
}
rerr = fmt.Errorf("paniced while compacting %s: %v", sb.String(), p)
}
}()

errChan := make(chan error, 1)
err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan)
Expand Down
15 changes: 0 additions & 15 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,20 +241,6 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

func storeHintsFromPromHints(hints *storage.SelectHints) *storepb.QueryHints {
return &storepb.QueryHints{
StepMillis: hints.Step,
Func: &storepb.Func{
Name: hints.Func,
},
Grouping: &storepb.Grouping{
By: hints.By,
Labels: hints.Grouping,
},
Range: &storepb.Range{Millis: hints.Range},
}
}

func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
Expand Down Expand Up @@ -351,7 +337,6 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
ShardInfo: q.shardInfo,
PartialResponseStrategy: q.partialResponseStrategy,
SkipChunks: q.skipChunks,
QueryHints: storeHintsFromPromHints(hints),
}
if q.isDedupEnabled() {
// Soft ask to sort without replica labels and push them at the end of labelset.
Expand Down
35 changes: 16 additions & 19 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,35 +681,32 @@ type remoteWriteParams struct {
alreadyReplicated bool
}

func (h *Handler) gatherWriteStats(writes ...map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
func (h *Handler) gatherWriteStats(localWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
var stats tenantRequestStats = make(tenantRequestStats)

for _, write := range writes {
for er := range write {
for tenant, series := range write[er] {
samples := 0
for er := range localWrites {
for tenant, series := range localWrites[er] {
samples := 0

for _, ts := range series.timeSeries {
samples += len(ts.Samples)
}
for _, ts := range series.timeSeries {
samples += len(ts.Samples)
}

if st, ok := stats[tenant]; ok {
st.timeseries += len(series.timeSeries)
st.totalSamples += samples
if st, ok := stats[tenant]; ok {
st.timeseries += len(series.timeSeries)
st.totalSamples += samples

stats[tenant] = st
} else {
stats[tenant] = requestStats{
timeseries: len(series.timeSeries),
totalSamples: samples,
}
stats[tenant] = st
} else {
stats[tenant] = requestStats{
timeseries: len(series.timeSeries),
totalSamples: samples,
}
}
}
}

return stats

}

func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (tenantRequestStats, error) {
Expand Down Expand Up @@ -739,7 +736,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
return stats, err
}

stats = h.gatherWriteStats(localWrites, remoteWrites)
stats = h.gatherWriteStats(localWrites)

// Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go
// asynchronously and with this capacity we will never block on writing to the channel.
Expand Down
1 change: 0 additions & 1 deletion pkg/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func (s *Server) Handle(pattern string, handler http.Handler) {

func registerProfiler(mux *http.ServeMux) {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
Expand Down
2 changes: 0 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
var resp respSet
if s.sortingStrategy == sortingStrategyStore {
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
Expand All @@ -1585,7 +1584,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
Expand Down
14 changes: 1 addition & 13 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto
// Don't ask for more than available time. This includes potential `minTime` flag limit.
availableMinTime, _ := p.timestamps()
if r.MinTime < availableMinTime {
// Align min time with the step to avoid missing data when it gets retrieved by the upper layer's PromQL engine.
// This also is necessary when Sidecar uploads a block and then availableMinTime
// becomes a fixed timestamp.
if r.QueryHints != nil && r.QueryHints.StepMillis != 0 {
diff := availableMinTime - r.MinTime
r.MinTime += (diff / r.QueryHints.StepMillis) * r.QueryHints.StepMillis
// Add one more to strictly fit within --min-time -> infinity.
if r.MinTime != availableMinTime {
r.MinTime += r.QueryHints.StepMillis
}
} else {
r.MinTime = availableMinTime
}
r.MinTime = availableMinTime
}

extLsetToRemove := map[string]struct{}{}
Expand Down
Loading

0 comments on commit 086a698

Please sign in to comment.