From 98e604c0f379e901e1c086b690e59c281f6a8ba5 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Mon, 18 Mar 2024 14:05:52 +0530 Subject: [PATCH 01/14] chore(image-tags): exclude helm release tags (#12240) --- tools/image-tag | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/image-tag b/tools/image-tag index a5d06861571a..0793c8fb229f 100755 --- a/tools/image-tag +++ b/tools/image-tag @@ -13,7 +13,7 @@ SHA=$(git rev-parse --short=7 HEAD | head -c7) # If not a tag, use branch-hash else use tag TAG=$((git describe --tags --exact-match 2> /dev/null || echo "") | sed 's/[operator\/]*v//g') -if [ -z "$TAG" ] +if [ -z "$TAG" ] || [[ "${TAG}" == helm-loki-* ]] then echo "${BRANCH}"-"${SHA}""${WIP}" else From 71d4d3723ac89cb45503c5fbef3bcb1ecc16693e Mon Sep 17 00:00:00 2001 From: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> Date: Mon, 18 Mar 2024 14:34:34 +0000 Subject: [PATCH 02/14] fix: Helm fix usage of http_listen_port and grpc_listen_port fields in templates (#12242) Signed-off-by: Michel Hollands --- production/helm/loki/CHANGELOG.md | 4 ++++ production/helm/loki/Chart.yaml | 2 +- production/helm/loki/README.md | 2 +- .../helm/loki/templates/monitoring/_helpers-monitoring.tpl | 4 ++-- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index d6259a7fd60a..16db7683d0ee 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -13,6 +13,10 @@ Entries should include a reference to the pull request that introduced the chang [//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.) +## 5.44.2 + +- [BUGFIX] Fix usage of `http_listen_port` and `grpc_listen_port` field in template. + ## 5.44.1 - [BUGFIX] Fix `compactorAddress` field: add protocol and port. diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index 14a7d07e4559..00bb148d34a2 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -3,7 +3,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.9.4 -version: 5.44.1 +version: 5.44.2 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index bf3fa5589484..e3eb07ccd396 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 5.44.1](https://img.shields.io/badge/Version-5.44.1-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) +![Version: 5.44.2](https://img.shields.io/badge/Version-5.44.2-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode diff --git a/production/helm/loki/templates/monitoring/_helpers-monitoring.tpl b/production/helm/loki/templates/monitoring/_helpers-monitoring.tpl index 5a8e8b2ce417..cb693e4f888b 100644 --- a/production/helm/loki/templates/monitoring/_helpers-monitoring.tpl +++ b/production/helm/loki/templates/monitoring/_helpers-monitoring.tpl @@ -3,9 +3,9 @@ Client definition for LogsInstance */}} {{- define "loki.logsInstanceClient" -}} {{- $isSingleBinary := eq (include "loki.deployment.isSingleBinary" .) "true" -}} -{{- $url := printf "http://%s.%s.svc.%s:%s/loki/api/v1/push" (include "loki.writeFullname" .) .Release.Namespace .Values.global.clusterDomain .Values.loki.server.http_listen_port }} +{{- $url := printf "http://%s.%s.svc.%s:%s/loki/api/v1/push" (include "loki.writeFullname" .) .Release.Namespace .Values.global.clusterDomain ( .Values.loki.server.http_listen_port | toString ) }} {{- if $isSingleBinary }} - {{- $url = printf "http://%s.%s.svc.%s:%s/loki/api/v1/push" (include "loki.singleBinaryFullname" .) .Release.Namespace .Values.global.clusterDomain .Values.loki.server.http_listen_port }} + {{- $url = printf "http://%s.%s.svc.%s:%s/loki/api/v1/push" (include "loki.singleBinaryFullname" .) .Release.Namespace .Values.global.clusterDomain ( .Values.loki.server.http_listen_port | toString ) }} {{- else if .Values.gateway.enabled -}} {{- $url = printf "http://%s.%s.svc.%s/loki/api/v1/push" (include "loki.gatewayFullname" .) .Release.Namespace .Values.global.clusterDomain }} {{- end -}} From 06e5dad7167c9ab2a82e4095195bcc54aca4e5fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan-Otto=20Kr=C3=B6pke?= Date: Mon, 18 Mar 2024 15:58:33 +0100 Subject: [PATCH 03/14] fix: helm template error if ingress is enabled (#12241) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jan-Otto Kröpke --- production/helm/loki/CHANGELOG.md | 4 ++++ production/helm/loki/Chart.yaml | 2 +- production/helm/loki/README.md | 2 +- production/helm/loki/templates/_helpers.tpl | 8 ++++---- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index 16db7683d0ee..d38b738e0419 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -13,6 +13,10 @@ Entries should include a reference to the pull request that introduced the chang [//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.) +## 5.44.3 + +- [BUGFIX] Fix template error: `<.Values.loki.server.http_listen_port>: can't evaluate field Values in type interface {}` + ## 5.44.2 - [BUGFIX] Fix usage of `http_listen_port` and `grpc_listen_port` field in template. diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index 00bb148d34a2..3126ae0f8758 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -3,7 +3,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.9.4 -version: 5.44.2 +version: 5.44.3 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index e3eb07ccd396..43e2509f218a 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 5.44.2](https://img.shields.io/badge/Version-5.44.2-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) +![Version: 5.44.3](https://img.shields.io/badge/Version-5.44.3-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode diff --git a/production/helm/loki/templates/_helpers.tpl b/production/helm/loki/templates/_helpers.tpl index 1e858fba62a0..4d1218016dc4 100644 --- a/production/helm/loki/templates/_helpers.tpl +++ b/production/helm/loki/templates/_helpers.tpl @@ -562,16 +562,16 @@ Params: pathType: Prefix {{- end }} backend: - {{- if $ingressApiIsStable }} {{- $serviceName := include "loki.ingress.serviceName" (dict "ctx" $.ctx "svcName" $.svcName) }} + {{- if $ingressApiIsStable }} service: name: {{ $serviceName }} port: - number: {{ .Values.loki.server.http_listen_port }} + number: {{ $.ctx.Values.loki.server.http_listen_port }} {{- else }} serviceName: {{ $serviceName }} - servicePort: {{ .Values.loki.server.http_listen_port }} -{{- end -}} + servicePort: {{ $.ctx.Values.loki.server.http_listen_port }} + {{- end -}} {{- end -}} {{- end -}} From c235fbfaadd8ca0b37ec33b8932740db711c782c Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 18 Mar 2024 17:18:46 +0100 Subject: [PATCH 04/14] fix(bloom-compactor): Correctly disable the bloom-compactor. (#12196) Co-authored-by: Salva Corts --- pkg/loki/loki.go | 3 +++ pkg/loki/modules.go | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index f44c079e51df..96814966b01a 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -664,6 +664,9 @@ func (t *Loki) setupModuleManager() error { Write: {Ingester, Distributor}, Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway, BloomCompactor}, + // TODO(salvacorts): We added the BloomCompactor component to the `all` target to ease testing. + // We should remove it before releasing the feature since we don’t think any user running + // the single binary will benefit from the blooms given their scale in terms of ingested data All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor, BloomCompactor}, } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 0761331f6b68..f9d67ef255be 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1325,6 +1325,9 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler { } func (t *Loki) initBloomGateway() (services.Service, error) { + if !t.Cfg.BloomGateway.Enabled { + return nil, nil + } logger := log.With(util_log.Logger, "component", "bloom-gateway") gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.BloomStore, logger, prometheus.DefaultRegisterer) @@ -1336,6 +1339,9 @@ func (t *Loki) initBloomGateway() (services.Service, error) { } func (t *Loki) initBloomGatewayRing() (services.Service, error) { + if !t.Cfg.BloomGateway.Enabled { + return nil, nil + } // Inherit ring listen port from gRPC config t.Cfg.BloomGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort @@ -1469,6 +1475,9 @@ func (t *Loki) initIndexGatewayInterceptors() (services.Service, error) { } func (t *Loki) initBloomCompactor() (services.Service, error) { + if !t.Cfg.BloomCompactor.Enabled { + return nil, nil + } logger := log.With(util_log.Logger, "component", "bloom-compactor") shuffleSharding := util_ring.NewTenantShuffleSharding(t.bloomCompactorRingManager.Ring, t.bloomCompactorRingManager.RingLifecycler, t.Overrides.BloomCompactorShardSize) @@ -1488,6 +1497,9 @@ func (t *Loki) initBloomCompactor() (services.Service, error) { } func (t *Loki) initBloomCompactorRing() (services.Service, error) { + if !t.Cfg.BloomCompactor.Enabled { + return nil, nil + } t.Cfg.BloomCompactor.Ring.ListenPort = t.Cfg.Server.GRPCListenPort // is LegacyMode needed? From 6c572b6797c9050b765b3cca7ee2944fb16e14f6 Mon Sep 17 00:00:00 2001 From: ND Tai <49815011+taind772@users.noreply.github.com> Date: Mon, 18 Mar 2024 23:20:04 +0700 Subject: [PATCH 05/14] feat: use http_listen_port for compactorAddress (#12230) Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> --- production/helm/loki/CHANGELOG.md | 4 ++++ production/helm/loki/Chart.yaml | 2 +- production/helm/loki/README.md | 2 +- production/helm/loki/templates/_helpers.tpl | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index d38b738e0419..08110f3834a7 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -13,6 +13,10 @@ Entries should include a reference to the pull request that introduced the chang [//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.) +## 5.44.4 + +- [ENHANCEMENT] Use http_listen_port for `compactorAddress`. + ## 5.44.3 - [BUGFIX] Fix template error: `<.Values.loki.server.http_listen_port>: can't evaluate field Values in type interface {}` diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index 3126ae0f8758..b086fea7c0c0 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -3,7 +3,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.9.4 -version: 5.44.3 +version: 5.44.4 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index 43e2509f218a..6ae189fd6645 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 5.44.3](https://img.shields.io/badge/Version-5.44.3-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) +![Version: 5.44.4](https://img.shields.io/badge/Version-5.44.4-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode diff --git a/production/helm/loki/templates/_helpers.tpl b/production/helm/loki/templates/_helpers.tpl index 4d1218016dc4..14fe80006608 100644 --- a/production/helm/loki/templates/_helpers.tpl +++ b/production/helm/loki/templates/_helpers.tpl @@ -888,7 +888,7 @@ enableServiceLinks: false {{/* single binary */}} {{- $compactorAddress = include "loki.singleBinaryFullname" . -}} {{- end -}} -{{- printf "http://%s:3100" $compactorAddress }} +{{- printf "http://%s:%s" $compactorAddress (.Values.loki.server.http_listen_port | toString) }} {{- end }} {{/* Determine query-scheduler address */}} From 4d38ff79358c33c0e0aadaa1e6fa09b609f62032 Mon Sep 17 00:00:00 2001 From: J Stickler Date: Mon, 18 Mar 2024 14:40:42 -0400 Subject: [PATCH 06/14] docs: Update structured metadata docs (#12187) --- docs/sources/get-started/labels/structured-metadata.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/sources/get-started/labels/structured-metadata.md b/docs/sources/get-started/labels/structured-metadata.md index e199402e0b00..587306b2d852 100644 --- a/docs/sources/get-started/labels/structured-metadata.md +++ b/docs/sources/get-started/labels/structured-metadata.md @@ -15,8 +15,16 @@ Structured metadata is a way to attach metadata to logs without indexing them or kubernetes pod names, process ID's, or any other label that is often used in queries but has high cardinality and is expensive to extract at query time. -Structured metadata can also be used to query commonly needed metadata from log lines without needing to apply a parser at query time. Large json blobs or a poorly written query using complex regex patterns, for example, come with a high performance cost. Examples of useful metadata include trace IDs or user IDs. +Structured metadata can also be used to query commonly needed metadata from log lines without needing to apply a parser at query time. Large json blobs or a poorly written query using complex regex patterns, for example, come with a high performance cost. Examples of useful metadata include container_IDs or user IDs. +## When to use structured metadata + +You should only use structured metadata in the following situations: + + • If you are ingesting data in OpenTelemetry format, using the Grafana Agent or an OpenTelemetry Collector. Structured metadata was designed to support native ingestion of OpenTelemetry data. + • If you have high cardinality metadata that should not be used as a label and does not exist in the log line. Some examples might include `process_id` or `thread_id` or Kubernetes pod names. + +It is an antipattern to extract information that already exists in your log lines and put it into structured metadata. ## Attaching structured metadata to log lines From 0bf894b7047125cbf5fbb632bfc83220329a5ecc Mon Sep 17 00:00:00 2001 From: Edson C Date: Mon, 18 Mar 2024 19:39:15 -0300 Subject: [PATCH 07/14] docs: Update Kubernetes terminology in operations pages (#12235) Co-authored-by: J Stickler Co-authored-by: Jack Baldry --- docs/sources/operations/loki-canary/_index.md | 2 +- docs/sources/operations/storage/wal.md | 14 ++++++++------ docs/sources/operations/troubleshooting.md | 16 ++++++++-------- docs/sources/operations/zone-ingesters.md | 2 +- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/docs/sources/operations/loki-canary/_index.md b/docs/sources/operations/loki-canary/_index.md index d1b68726ee33..cf2a1075d3c0 100644 --- a/docs/sources/operations/loki-canary/_index.md +++ b/docs/sources/operations/loki-canary/_index.md @@ -289,7 +289,7 @@ The `-labelname` and `-labelvalue` flags should also be provided, as these are used by Loki Canary to filter the log stream to only process logs for the current instance of the canary. Ensure that the values provided to the flags are unique to each instance of Loki Canary. Grafana Labs' Tanka config -accomplishes this by passing in the pod name as the label value. +accomplishes this by passing in the Pod name as the label value. If Loki Canary reports a high number of `unexpected_entries`, Loki Canary may not be waiting long enough and the value for the `-wait` flag should be diff --git a/docs/sources/operations/storage/wal.md b/docs/sources/operations/storage/wal.md index 6baf78adc5f4..5f7110f2f182 100644 --- a/docs/sources/operations/storage/wal.md +++ b/docs/sources/operations/storage/wal.md @@ -38,7 +38,7 @@ The WAL also includes a backpressure mechanism to allow a large WAL to be replay ## Changes to deployment -1. Since ingesters need to have the same persistent volume across restarts/rollout, all the ingesters should be run on [statefulset](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) with fixed volumes. +1. Since ingesters need to have the same persistent volume across restarts/rollout, all the ingesters should be run on [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) with fixed volumes. 2. Following flags needs to be set * `--ingester.wal-enabled` to `true` which enables writing to WAL during ingestion. @@ -48,7 +48,7 @@ The WAL also includes a backpressure mechanism to allow a large WAL to be replay ## Changes in lifecycle when WAL is enabled -1. Flushing of data to chunk store during rollouts or scale down is disabled. This is because during a rollout of statefulset there are no ingesters that are simultaneously leaving and joining, rather the same ingester is shut down and brought back again with updated config. Hence flushing is skipped and the data is recovered from the WAL. +1. Flushing of data to chunk store during rollouts or scale down is disabled. This is because during a rollout of StatefulSet there are no ingesters that are simultaneously leaving and joining, rather the same ingester is shut down and brought back again with updated config. Hence flushing is skipped and the data is recovered from the WAL. ## Disk space requirements @@ -62,7 +62,7 @@ You should not target 100% disk utilisation. ## Migrating from stateless deployments -The ingester _deployment without WAL_ and _statefulset with WAL_ should be scaled down and up respectively in sync without transfer of data between them to ensure that any ingestion after migration is reliable immediately. +The ingester _Deployment without WAL_ and _StatefulSet with WAL_ should be scaled down and up respectively in sync without transfer of data between them to ensure that any ingestion after migration is reliable immediately. Let's take an example of 4 ingesters. The migration would look something like this: @@ -83,7 +83,7 @@ Scaling up is same as what you would do without WAL or statefulsets. Nothing to When scaling down, we must ensure existing data on the leaving ingesters are flushed to storage instead of just the WAL. This is because we won't be replaying the WAL on an ingester that will no longer exist and we need to make sure the data is not orphaned. -Consider you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters, the ingesters which will be shutdown according to statefulset rules are `ingester-3` and then `ingester-2`. +Consider you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters, the ingesters which will be shut down according to StatefulSet rules are `ingester-3` and then `ingester-2`. Hence before actually scaling down in Kubernetes, port forward those ingesters and hit the [`/ingester/shutdown?flush=true`]({{< relref "../../reference/api#flush-in-memory-chunks-and-shut-down" >}}) endpoint. This will flush the chunks and remove itself from the ring, after which it will register as unready and may be deleted. @@ -95,13 +95,15 @@ After hitting the endpoint for `ingester-2 ingester-3`, scale down the ingesters Statefulsets are significantly more cumbersome to work with, upgrade, and so on. Much of this stems from immutable fields on the specification. For example, if one wants to start using the WAL with single store Loki and wants separate volume mounts for the WAL and the boltdb-shipper, you may see immutability errors when attempting updates the Kubernetes statefulsets. -In this case, try `kubectl -n delete sts ingester --cascade=false`. This will leave the pods alive but delete the statefulset. Then you may recreate the (updated) statefulset and one-by-one start deleting the `ingester-0` through `ingester-n` pods _in that order_, allowing the statefulset to spin up new pods to replace them. +In this case, try `kubectl -n delete sts ingester --cascade=false`. +This will leave the Pods alive but delete the StatefulSet. +Then you may recreate the (updated) StatefulSet and one-by-one start deleting the `ingester-0` through `ingester-n` Pods _in that order_, allowing the StatefulSet to spin up new pods to replace them. #### Scaling Down Using `/flush_shutdown` Endpoint and Lifecycle Hook 1. **StatefulSets for Ordered Scaling Down**: Loki's ingesters should be scaled down one by one, which is efficiently handled by Kubernetes StatefulSets. This ensures an ordered and reliable scaling process, as described in the [Deployment and Scaling Guarantees](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#deployment-and-scaling-guarantees) documentation. -2. **Using PreStop Lifecycle Hook**: During the pod scaling down process, the PreStop [lifecycle hook](https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/) triggers the `/flush_shutdown` endpoint on the ingester. This action flushes the chunks and removes the ingester from the ring, allowing it to register as unready and become eligible for deletion. +2. **Using PreStop Lifecycle Hook**: During the Pod scaling down process, the PreStop [lifecycle hook](https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/) triggers the `/flush_shutdown` endpoint on the ingester. This action flushes the chunks and removes the ingester from the ring, allowing it to register as unready and become eligible for deletion. 3. **Using terminationGracePeriodSeconds**: Provides time for the ingester to flush its data before being deleted, if flushing data takes more than 30 minutes, you may need to increase it. diff --git a/docs/sources/operations/troubleshooting.md b/docs/sources/operations/troubleshooting.md index 914a432f3068..68127a222891 100644 --- a/docs/sources/operations/troubleshooting.md +++ b/docs/sources/operations/troubleshooting.md @@ -123,14 +123,14 @@ promtail -log.level=debug The Promtail configuration contains a `__path__` entry to a directory that Promtail cannot find. -## Connecting to a Promtail pod to troubleshoot +## Connecting to a Promtail Pod to troubleshoot First check [Troubleshooting targets](#troubleshooting-targets) section above. -If that doesn't help answer your questions, you can connect to the Promtail pod +If that doesn't help answer your questions, you can connect to the Promtail Pod to investigate further. If you are running Promtail as a DaemonSet in your cluster, you will have a -Promtail pod on each node, so figure out which Promtail you need to debug first: +Promtail Pod on each node, so figure out which Promtail you need to debug first: ```shell @@ -145,10 +145,10 @@ promtail-bth9q 1/1 Running 0 3h 10.56. That output is truncated to highlight just the two pods we are interested in, you can see with the `-o wide` flag the NODE on which they are running. -You'll want to match the node for the pod you are interested in, in this example +You'll want to match the node for the Pod you are interested in, in this example NGINX, to the Promtail running on the same node. -To debug you can connect to the Promtail pod: +To debug you can connect to the Promtail Pod: ```shell kubectl exec -it promtail-bth9q -- /bin/sh @@ -182,12 +182,12 @@ $ helm upgrade --install loki loki/loki --set "loki.tracing.enabled=true" ## Running Loki with Istio Sidecars -An Istio sidecar runs alongside a pod. It intercepts all traffic to and from the pod. -When a pod tries to communicate with another pod using a given protocol, Istio inspects the destination's service using [Protocol Selection](https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/). +An Istio sidecar runs alongside a Pod. It intercepts all traffic to and from the Pod. +When a Pod tries to communicate with another Pod using a given protocol, Istio inspects the destination's service using [Protocol Selection](https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/). This mechanism uses a convention on the port name (for example, `http-my-port` or `grpc-my-port`) to determine how to handle this outgoing traffic. Istio can then do operations such as authorization and smart routing. -This works fine when one pod communicates with another pod using a hostname. But, +This works fine when one Pod communicates with another Pod using a hostname. But, Istio does not allow pods to communicate with other pods using IP addresses, unless the traffic type is `tcp`. diff --git a/docs/sources/operations/zone-ingesters.md b/docs/sources/operations/zone-ingesters.md index 63c6c843fde8..f2a88ff2dc2b 100644 --- a/docs/sources/operations/zone-ingesters.md +++ b/docs/sources/operations/zone-ingesters.md @@ -105,7 +105,7 @@ These instructions assume you are using the zone aware ingester jsonnet deployme 1. if you're using an automated reconcilliation/deployment system like flux, disable it now (for example using flux ignore), if possible for just the default ingester StatefulSet -1. Shutdown flush the default ingesters, unregistering them from the ring, you can do this by port-forwarding each ingester pod and using the endpoint: `"http://url:PORT/ingester/shutdown?flush=true&delete_ring_tokens=true&terminate=false"` +1. Shutdown flush the default ingesters, unregistering them from the ring, you can do this by port-forwarding each ingester Pod and using the endpoint: `"http://url:PORT/ingester/shutdown?flush=true&delete_ring_tokens=true&terminate=false"` 1. manually scale down the default ingester StatefulSet to 0 replicas, we do this via `tk apply` but you could do it via modifying the yaml From 9621d5c5ee3975df8be36df9a637e315e9a5c9d2 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 19 Mar 2024 09:05:16 +0100 Subject: [PATCH 08/14] test: Fix race condition in LogQL test (#12247) --- pkg/iter/sample_iterator.go | 11 +++++------ pkg/logql/engine_test.go | 32 +++++++++++++++++++------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/pkg/iter/sample_iterator.go b/pkg/iter/sample_iterator.go index 4c17c473e9f0..632ed9106df1 100644 --- a/pkg/iter/sample_iterator.go +++ b/pkg/iter/sample_iterator.go @@ -3,7 +3,6 @@ package iter import ( "container/heap" "context" - "go.uber.org/atomic" "io" "sync" @@ -522,7 +521,7 @@ func NewSampleQueryResponseIterator(resp *logproto.SampleQueryResponse) SampleIt } type seriesIterator struct { - i *atomic.Int32 + i int series logproto.Series } @@ -568,14 +567,14 @@ func NewMultiSeriesIterator(series []logproto.Series) SampleIterator { // NewSeriesIterator iterates over sample in a series. func NewSeriesIterator(series logproto.Series) SampleIterator { return &seriesIterator{ - i: atomic.NewInt32(-1), + i: -1, series: series, } } func (i *seriesIterator) Next() bool { - tmp := i.i.Add(1) - return int(tmp) < len(i.series.Samples) + i.i++ + return i.i < len(i.series.Samples) } func (i *seriesIterator) Error() error { @@ -591,7 +590,7 @@ func (i *seriesIterator) StreamHash() uint64 { } func (i *seriesIterator) Sample() logproto.Sample { - return i.series.Samples[i.i.Load()] + return i.series.Samples[i.i] } func (i *seriesIterator) Close() error { diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 2dce4ba57ed4..1391b40ff424 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -2383,16 +2383,16 @@ func TestEngine_LogsInstantQuery_Vector(t *testing.T) { } type errorIteratorQuerier struct { - samples []iter.SampleIterator - entries []iter.EntryIterator + samples func() []iter.SampleIterator + entries func() []iter.EntryIterator } func (e errorIteratorQuerier) SelectLogs(_ context.Context, p SelectLogParams) (iter.EntryIterator, error) { - return iter.NewSortEntryIterator(e.entries, p.Direction), nil + return iter.NewSortEntryIterator(e.entries(), p.Direction), nil } func (e errorIteratorQuerier) SelectSamples(_ context.Context, _ SelectSampleParams) (iter.SampleIterator, error) { - return iter.NewSortSampleIterator(e.samples), nil + return iter.NewSortSampleIterator(e.samples()), nil } func TestStepEvaluator_Error(t *testing.T) { @@ -2406,9 +2406,11 @@ func TestStepEvaluator_Error(t *testing.T) { "rangeAggEvaluator", `count_over_time({app="foo"}[1m])`, &errorIteratorQuerier{ - samples: []iter.SampleIterator{ - iter.NewSeriesIterator(newSeries(testSize, identity, `{app="foo"}`)), - NewErrorSampleIterator(), + samples: func() []iter.SampleIterator { + return []iter.SampleIterator{ + iter.NewSeriesIterator(newSeries(testSize, identity, `{app="foo"}`)), + NewErrorSampleIterator(), + } }, }, ErrMock, @@ -2417,9 +2419,11 @@ func TestStepEvaluator_Error(t *testing.T) { "stream", `{app="foo"}`, &errorIteratorQuerier{ - entries: []iter.EntryIterator{ - iter.NewStreamIterator(newStream(testSize, identity, `{app="foo"}`)), - NewErrorEntryIterator(), + entries: func() []iter.EntryIterator { + return []iter.EntryIterator{ + iter.NewStreamIterator(newStream(testSize, identity, `{app="foo"}`)), + NewErrorEntryIterator(), + } }, }, ErrMock, @@ -2428,9 +2432,11 @@ func TestStepEvaluator_Error(t *testing.T) { "binOpStepEvaluator", `count_over_time({app="foo"}[1m]) / count_over_time({app="foo"}[1m])`, &errorIteratorQuerier{ - samples: []iter.SampleIterator{ - iter.NewSeriesIterator(newSeries(testSize, identity, `{app="foo"}`)), - NewErrorSampleIterator(), + samples: func() []iter.SampleIterator { + return []iter.SampleIterator{ + iter.NewSeriesIterator(newSeries(testSize, identity, `{app="foo"}`)), + NewErrorSampleIterator(), + } }, }, ErrMockMultiple, From 4aec20f36b035c52a7186b580e31fd43d21b0ac1 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 19 Mar 2024 10:35:40 +0100 Subject: [PATCH 09/14] perf(blooms): Recycle bloom page buffers (#12256) Every time when reading a bloom page into memory, a new byte slice with the size of the decoded page was allocated. This PR changes bloom page decoder to use a byte slice from the `BlockPool`. The caller (`LazyBloomIter` of the block querier) is responsible for closing the page whenever a new one is loaded, so the byte slice is put back to its pool. Additionally, this PR changes the max bucket of the `BlockPool` byte slice pool to 64MB so that pages bigger than 16MB are correctly put back to the pool, since [slices exceeding the capacity of the largest bucket are not put back](https://github.com/grafana/loki/blob/main/vendor/github.com/prometheus/prometheus/util/pool/pool.go#L74-L87). --- Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/bloom.go | 19 +++++++++++++------ pkg/storage/bloom/v1/bloom_querier.go | 24 +++++++++++++++++------- pkg/storage/bloom/v1/util.go | 4 ++-- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index bae444094a01..6a6c2610e82e 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -86,7 +86,7 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe } defer pool.PutReader(decompressor) - b := make([]byte, page.DecompressedLen) + b := BlockPool.Get(page.DecompressedLen)[:page.DecompressedLen] if _, err = io.ReadFull(decompressor, b); err != nil { return nil, errors.Wrap(err, "decompressing bloom page") @@ -97,6 +97,10 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe return decoder, nil } +// NewBloomPageDecoder returns a decoder for a bloom page. +// If the byte slice passed in the constructor is from the BlockPool pool, then +// the caller needs to ensure that Close() is called to put the slice back to +// its pool. func NewBloomPageDecoder(data []byte) *BloomPageDecoder { // last 8 bytes are the number of blooms in this page dec := encoding.DecWith(data[len(data)-8:]) @@ -117,11 +121,6 @@ func NewBloomPageDecoder(data []byte) *BloomPageDecoder { } // Decoder is a seekable, reset-able iterator -// TODO(owen-d): use buffer pools. The reason we don't currently -// do this is because the `data` slice currently escapes the decoder -// via the returned bloom, so we can't know when it's safe to return it to the pool. -// This happens via `data ([]byte) -> dec (*encoding.Decbuf) -> bloom (Bloom)` where -// the final Bloom has a reference to the data slice. // We could optimize this by encoding the mode (read, write) into our structs // and doing copy-on-write shenannigans, but I'm avoiding this for now. type BloomPageDecoder struct { @@ -167,6 +166,14 @@ func (d *BloomPageDecoder) Err() error { return d.err } +func (d *BloomPageDecoder) Close() { + d.err = nil + d.cur = nil + BlockPool.Put(d.data) + d.data = nil + d.dec.B = nil +} + type BloomPageHeader struct { N, Offset, Len, DecompressedLen int } diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go index d0dbdc1b3b55..a6b475607eb1 100644 --- a/pkg/storage/bloom/v1/bloom_querier.go +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -32,6 +32,13 @@ func (it *LazyBloomIter) ensureInit() { } } +func (it *LazyBloomIter) setPage(dec *BloomPageDecoder) { + if it.curPage != nil { + it.curPage.Close() + } + it.curPage = dec +} + func (it *LazyBloomIter) Seek(offset BloomOffset) { it.ensureInit() @@ -42,17 +49,18 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) { r, err := it.b.reader.Blooms() if err != nil { it.err = errors.Wrap(err, "getting blooms reader") + it.setPage(nil) return } decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page) if err != nil { it.err = errors.Wrap(err, "loading bloom page") + it.setPage(nil) return } it.curPageIndex = offset.Page - it.curPage = decoder - + it.setPage(decoder) } it.curPage.Seek(offset.ByteOffset) @@ -80,25 +88,26 @@ func (it *LazyBloomIter) next() bool { return false } - it.curPage, err = it.b.blooms.BloomPageDecoder( - r, - it.curPageIndex, - ) + decoder, err := it.b.blooms.BloomPageDecoder(r, it.curPageIndex) if err != nil { it.err = err + it.setPage(nil) return false } + it.setPage(decoder) continue } if !it.curPage.Next() { // there was an error if it.curPage.Err() != nil { + it.err = it.curPage.Err() + it.setPage(nil) return false } // we've exhausted the current page, progress to next it.curPageIndex++ - it.curPage = nil + it.setPage(nil) continue } @@ -106,6 +115,7 @@ func (it *LazyBloomIter) next() bool { } // finished last page + it.setPage(nil) return false } diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index e96af779cc18..e6ad69a248fe 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -32,10 +32,10 @@ var ( }, } - // 1KB -> 8MB + // 4KB -> 64MB BlockPool = BytePool{ pool: pool.New( - 1<<10, 1<<24, 4, + 4<<10, 64<<20, 4, func(size int) interface{} { return make([]byte, size) }), From 36e3d7ea61b77d88ea63a8e237fd013043f0411b Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Tue, 19 Mar 2024 14:20:32 +0400 Subject: [PATCH 10/14] feat: Helm: Add extraContainers to read pods (#12243) --- CHANGELOG.md | 1 + docs/sources/setup/install/helm/reference.md | 9 +++++++++ production/helm/loki/CHANGELOG.md | 4 ++++ production/helm/loki/Chart.yaml | 2 +- production/helm/loki/README.md | 2 +- production/helm/loki/templates/read/deployment-read.yaml | 3 +++ .../helm/loki/templates/read/statefulset-read.yaml | 3 +++ production/helm/loki/values.yaml | 2 ++ 8 files changed, 24 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c19c74a065a..3ff3d00daed5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [12243](https://github.com/grafana/loki/pull/12243) **lllamnyp**: Helm: Add extraContainers to read pods. * [11840](https://github.com/grafana/loki/pull/11840) **jeschkies**: Allow custom usage trackers for ingested and discarded bytes metric. * [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results * [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods. diff --git a/docs/sources/setup/install/helm/reference.md b/docs/sources/setup/install/helm/reference.md index 1038a6e9d353..28ff765bf3a1 100644 --- a/docs/sources/setup/install/helm/reference.md +++ b/docs/sources/setup/install/helm/reference.md @@ -3345,6 +3345,15 @@ null
 []
 
+ + + + read.extraContainers + list + Containers to add to the read pods +
+[]
+
diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index 08110f3834a7..d7725e6969b3 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -13,6 +13,10 @@ Entries should include a reference to the pull request that introduced the chang [//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.) +## 5.45.0 + +- [CHANGE] Add extraContainers parameter for the read pod + ## 5.44.4 - [ENHANCEMENT] Use http_listen_port for `compactorAddress`. diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index b086fea7c0c0..2e9e6334fa4b 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -3,7 +3,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.9.4 -version: 5.44.4 +version: 5.45.0 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index 6ae189fd6645..1494f5ead16f 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 5.44.4](https://img.shields.io/badge/Version-5.44.4-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) +![Version: 5.45.0](https://img.shields.io/badge/Version-5.45.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode diff --git a/production/helm/loki/templates/read/deployment-read.yaml b/production/helm/loki/templates/read/deployment-read.yaml index dbe8f531ed18..ee9a15108a80 100644 --- a/production/helm/loki/templates/read/deployment-read.yaml +++ b/production/helm/loki/templates/read/deployment-read.yaml @@ -115,6 +115,9 @@ spec: {{- end }} resources: {{- toYaml .Values.read.resources | nindent 12 }} + {{- with .Values.read.extraContainers }} + {{- toYaml . | nindent 8}} + {{- end }} {{- with .Values.read.affinity }} affinity: {{- tpl . $ | nindent 8 }} diff --git a/production/helm/loki/templates/read/statefulset-read.yaml b/production/helm/loki/templates/read/statefulset-read.yaml index e0fd2c102975..6efa0ad5594c 100644 --- a/production/helm/loki/templates/read/statefulset-read.yaml +++ b/production/helm/loki/templates/read/statefulset-read.yaml @@ -128,6 +128,9 @@ spec: {{- end }} resources: {{- toYaml .Values.read.resources | nindent 12 }} + {{- with .Values.read.extraContainers }} + {{- toYaml . | nindent 8}} + {{- end }} {{- with .Values.read.affinity }} affinity: {{- tpl . $ | nindent 8 }} diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml index 8fd771fbe51c..f4bf083dcf77 100644 --- a/production/helm/loki/values.yaml +++ b/production/helm/loki/values.yaml @@ -993,6 +993,8 @@ read: legacyReadTarget: false # -- Additional CLI args for the read extraArgs: [] + # -- Containers to add to the read pods + extraContainers: [] # -- Environment variables to add to the read pods extraEnv: [] # -- Environment variables from secrets or configmaps to add to the read pods From ebdf8fef3fe7ef172ceb3b7ac1b712c7b45a2289 Mon Sep 17 00:00:00 2001 From: Robert Jacob Date: Tue, 19 Mar 2024 12:42:41 +0100 Subject: [PATCH 11/14] feat(operator): Restructure LokiStack metrics (#12228) --- operator/CHANGELOG.md | 1 + .../handlers/lokistack_create_or_update.go | 7 - .../internal/alerts/prometheus-alerts.yaml | 14 +- operator/internal/metrics/lokistack.go | 96 ++++++++++ operator/internal/metrics/lokistack_test.go | 175 +++++++++++++++++ operator/internal/metrics/metrics.go | 181 ------------------ operator/main.go | 7 +- 7 files changed, 288 insertions(+), 193 deletions(-) create mode 100644 operator/internal/metrics/lokistack.go create mode 100644 operator/internal/metrics/lokistack_test.go delete mode 100644 operator/internal/metrics/metrics.go diff --git a/operator/CHANGELOG.md b/operator/CHANGELOG.md index b4aca5f3c444..5d21171aa6a2 100644 --- a/operator/CHANGELOG.md +++ b/operator/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +- [12228](https://github.com/grafana/loki/pull/12228) **xperimental**: Restructure LokiStack metrics - [12164](https://github.com/grafana/loki/pull/12164) **periklis**: Use safe bearer token authentication to scrape operator metrics - [12216](https://github.com/grafana/loki/pull/12216) **xperimental**: Fix duplicate operator metrics due to ServiceMonitor selector - [12212](https://github.com/grafana/loki/pull/12212) **xperimental**: Keep credentialMode in status when updating schemas diff --git a/operator/internal/handlers/lokistack_create_or_update.go b/operator/internal/handlers/lokistack_create_or_update.go index dbb5a110d3b0..ee0eea1513af 100644 --- a/operator/internal/handlers/lokistack_create_or_update.go +++ b/operator/internal/handlers/lokistack_create_or_update.go @@ -24,7 +24,6 @@ import ( "github.com/grafana/loki/operator/internal/handlers/internal/storage" "github.com/grafana/loki/operator/internal/handlers/internal/tlsprofile" "github.com/grafana/loki/operator/internal/manifests" - "github.com/grafana/loki/operator/internal/metrics" "github.com/grafana/loki/operator/internal/status" ) @@ -208,12 +207,6 @@ func CreateOrUpdateLokiStack( return "", kverrors.New("failed to configure lokistack resources", "name", req.NamespacedName) } - // 1x.demo is used only for development, so the metrics will not - // be collected. - if opts.Stack.Size != lokiv1.SizeOneXDemo { - metrics.Collect(&opts.Stack, opts.Name) - } - return objStore.CredentialMode, nil } diff --git a/operator/internal/manifests/internal/alerts/prometheus-alerts.yaml b/operator/internal/manifests/internal/alerts/prometheus-alerts.yaml index 6d2d978843dd..15cc42462f15 100644 --- a/operator/internal/manifests/internal/alerts/prometheus-alerts.yaml +++ b/operator/internal/manifests/internal/alerts/prometheus-alerts.yaml @@ -178,10 +178,16 @@ groups: - alert: LokistackSchemaUpgradesRequired annotations: message: |- - Object storage schema needs upgrade. - summary: "The applied storage schema config is old and should be upgraded." + The LokiStack "{{ $labels.stack_name }}" in namespace "{{ $labels.stack_namespace }}" is using a storage schema + configuration that does not contain the latest schema version. It is recommended to update the schema + configuration to update the schema version to the latest version in the future. + summary: "One or more of the deployed LokiStacks contains an outdated storage schema configuration." runbook_url: "[[ .RunbookURL ]]#Lokistack-Schema-Upgrades-Required" - expr: sum by(stack_id) (lokistack_warnings_count) > 0 + expr: | + sum ( + lokistack_status_condition{reason="StorageNeedsSchemaUpdate",status="true"} + ) by (stack_namespace, stack_name) + > 0 + for: 1m labels: severity: warning - resource: '{{ $labels.stack_id}}' diff --git a/operator/internal/metrics/lokistack.go b/operator/internal/metrics/lokistack.go new file mode 100644 index 000000000000..7bdc96602495 --- /dev/null +++ b/operator/internal/metrics/lokistack.go @@ -0,0 +1,96 @@ +package metrics + +import ( + "context" + + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" +) + +const ( + metricsPrefix = "lokistack_" +) + +var ( + metricsCommonLabels = []string{ + "stack_namespace", + "stack_name", + "size", + } + + lokiStackInfoDesc = prometheus.NewDesc( + metricsPrefix+"info", + "Information about deployed LokiStack instances. Value is always 1.", + metricsCommonLabels, nil, + ) + + lokiStackConditionsCountDesc = prometheus.NewDesc( + metricsPrefix+"status_condition", + "Counts the current status conditions of the LokiStack.", + append(metricsCommonLabels, "condition", "reason", "status"), nil, + ) +) + +func RegisterLokiStackCollector(log logr.Logger, k8sClient client.Client, registry prometheus.Registerer) error { + metrics := &lokiStackCollector{ + log: log, + k8sClient: k8sClient, + } + + return registry.Register(metrics) +} + +type lokiStackCollector struct { + log logr.Logger + k8sClient client.Client +} + +func (l *lokiStackCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- lokiStackInfoDesc + ch <- lokiStackConditionsCountDesc +} + +func (l *lokiStackCollector) Collect(m chan<- prometheus.Metric) { + ctx := context.TODO() + + stackList := &lokiv1.LokiStackList{} + err := l.k8sClient.List(ctx, stackList) + if err != nil { + l.log.Error(err, "failed to get list of LokiStacks for metrics") + return + } + + for _, stack := range stackList.Items { + labels := []string{ + stack.Namespace, + stack.Name, + string(stack.Spec.Size), + } + + m <- prometheus.MustNewConstMetric(lokiStackInfoDesc, prometheus.GaugeValue, 1.0, labels...) + + for _, c := range stack.Status.Conditions { + activeValue := 0.0 + if c.Status == metav1.ConditionTrue { + activeValue = 1.0 + } + + // This mirrors the behavior of kube_state_metrics, which creates two metrics for each condition, + // one for each status (true/false). + m <- prometheus.MustNewConstMetric( + lokiStackConditionsCountDesc, + prometheus.GaugeValue, activeValue, + append(labels, c.Type, c.Reason, "true")..., + ) + m <- prometheus.MustNewConstMetric( + lokiStackConditionsCountDesc, + prometheus.GaugeValue, 1.0-activeValue, + append(labels, c.Type, c.Reason, "false")..., + ) + } + } +} diff --git a/operator/internal/metrics/lokistack_test.go b/operator/internal/metrics/lokistack_test.go new file mode 100644 index 000000000000..5bf3fcc18404 --- /dev/null +++ b/operator/internal/metrics/lokistack_test.go @@ -0,0 +1,175 @@ +package metrics + +import ( + "context" + "io" + "strings" + "testing" + + "github.com/ViaQ/logerr/v2/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" + "github.com/grafana/loki/operator/internal/external/k8s/k8sfakes" +) + +func TestRegisterLokiStackMetrics(t *testing.T) { + logger := log.NewLogger("test", log.WithOutput(io.Discard)) + client := &k8sfakes.FakeClient{} + registry := prometheus.NewPedanticRegistry() + + err := RegisterLokiStackCollector(logger, client, registry) + require.NoError(t, err) +} + +func TestLokiStackMetricsCollect(t *testing.T) { + tt := []struct { + desc string + k8sError error + stacks *lokiv1.LokiStackList + wantMetrics string + }{ + { + desc: "no stacks", + k8sError: nil, + stacks: &lokiv1.LokiStackList{}, + wantMetrics: "", + }, + { + desc: "one demo", + k8sError: nil, + stacks: &lokiv1.LokiStackList{ + Items: []lokiv1.LokiStack{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-stack", + Namespace: "test-namespace", + }, + Spec: lokiv1.LokiStackSpec{ + Size: lokiv1.SizeOneXDemo, + }, + }, + }, + }, + wantMetrics: `# HELP lokistack_info Information about deployed LokiStack instances. Value is always 1. +# TYPE lokistack_info gauge +lokistack_info{size="1x.demo",stack_name="test-stack",stack_namespace="test-namespace"} 1 +`, + }, + { + desc: "one small with warning", + k8sError: nil, + stacks: &lokiv1.LokiStackList{ + Items: []lokiv1.LokiStack{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-stack", + Namespace: "test-namespace", + }, + Spec: lokiv1.LokiStackSpec{ + Size: lokiv1.SizeOneXSmall, + }, + Status: lokiv1.LokiStackStatus{ + Conditions: []metav1.Condition{ + { + Type: string(lokiv1.ConditionWarning), + Status: metav1.ConditionTrue, + Reason: string(lokiv1.ReasonStorageNeedsSchemaUpdate), + }, + }, + }, + }, + }, + }, + wantMetrics: `# HELP lokistack_info Information about deployed LokiStack instances. Value is always 1. +# TYPE lokistack_info gauge +lokistack_info{size="1x.small",stack_name="test-stack",stack_namespace="test-namespace"} 1 +# HELP lokistack_status_condition Counts the current status conditions of the LokiStack. +# TYPE lokistack_status_condition gauge +lokistack_status_condition{condition="Warning",reason="StorageNeedsSchemaUpdate",size="1x.small",stack_name="test-stack",stack_namespace="test-namespace",status="false"} 0 +lokistack_status_condition{condition="Warning",reason="StorageNeedsSchemaUpdate",size="1x.small",stack_name="test-stack",stack_namespace="test-namespace",status="true"} 1 +`, + }, + { + desc: "multiple conditions, inactive warning", + k8sError: nil, + stacks: &lokiv1.LokiStackList{ + Items: []lokiv1.LokiStack{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-stack", + Namespace: "test-namespace", + }, + Spec: lokiv1.LokiStackSpec{ + Size: lokiv1.SizeOneXSmall, + }, + Status: lokiv1.LokiStackStatus{ + Conditions: []metav1.Condition{ + { + Type: string(lokiv1.ConditionReady), + Status: metav1.ConditionTrue, + Reason: string(lokiv1.ReasonReadyComponents), + }, + { + Type: string(lokiv1.ConditionPending), + Status: metav1.ConditionFalse, + Reason: string(lokiv1.ReasonPendingComponents), + }, + { + Type: string(lokiv1.ConditionWarning), + Status: metav1.ConditionFalse, + Reason: string(lokiv1.ReasonStorageNeedsSchemaUpdate), + }, + }, + }, + }, + }, + }, + wantMetrics: `# HELP lokistack_info Information about deployed LokiStack instances. Value is always 1. +# TYPE lokistack_info gauge +lokistack_info{size="1x.small",stack_name="test-stack",stack_namespace="test-namespace"} 1 +# HELP lokistack_status_condition Counts the current status conditions of the LokiStack. +# TYPE lokistack_status_condition gauge +lokistack_status_condition{condition="Pending",reason="PendingComponents",size="1x.small",stack_name="test-stack",stack_namespace="test-namespace",status="false"} 1 +lokistack_status_condition{condition="Pending",reason="PendingComponents",size="1x.small",stack_name="test-stack",stack_namespace="test-namespace",status="true"} 0 +lokistack_status_condition{condition="Ready",reason="ReadyComponents",size="1x.small",stack_name="test-stack",stack_namespace="test-namespace",status="false"} 0 +lokistack_status_condition{condition="Ready",reason="ReadyComponents",size="1x.small",stack_name="test-stack",stack_namespace="test-namespace",status="true"} 1 +lokistack_status_condition{condition="Warning",reason="StorageNeedsSchemaUpdate",size="1x.small",stack_name="test-stack",stack_namespace="test-namespace",status="false"} 1 +lokistack_status_condition{condition="Warning",reason="StorageNeedsSchemaUpdate",size="1x.small",stack_name="test-stack",stack_namespace="test-namespace",status="true"} 0 +`, + }, + } + + for _, tc := range tt { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + logger := log.NewLogger("test", log.WithOutput(io.Discard)) + k := &k8sfakes.FakeClient{} + k.ListStub = func(_ context.Context, list client.ObjectList, _ ...client.ListOption) error { + if tc.k8sError != nil { + return tc.k8sError + } + + k.SetClientObjectList(list, tc.stacks) + return nil + } + + expected := strings.NewReader(tc.wantMetrics) + + c := &lokiStackCollector{ + log: logger, + k8sClient: k, + } + + if err := testutil.CollectAndCompare(c, expected); err != nil { + t.Error(err) + } + }) + } +} diff --git a/operator/internal/metrics/metrics.go b/operator/internal/metrics/metrics.go deleted file mode 100644 index fc87b76c6fa9..000000000000 --- a/operator/internal/metrics/metrics.go +++ /dev/null @@ -1,181 +0,0 @@ -package metrics - -import ( - "reflect" - - "github.com/prometheus/client_golang/prometheus" - "sigs.k8s.io/controller-runtime/pkg/metrics" - - lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" - "github.com/grafana/loki/operator/internal/manifests" -) - -// UserDefinedLimitsType defines a label that describes the type of limits -// imposed on the cluster -type UserDefinedLimitsType string - -const ( - labelGlobal UserDefinedLimitsType = "global" - labelTenant UserDefinedLimitsType = "tenant" -) - -var ( - deploymentMetric = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "lokistack_deployments", - Help: "Number of clusters that are deployed", - }, - []string{"size", "stack_id"}, - ) - - userDefinedLimitsMetric = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "lokistack_user_defined_limits", - Help: "Number of clusters that are using user defined limits", - }, - []string{"size", "stack_id", "type"}, - ) - - globalStreamLimitMetric = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "lokistack_global_stream_limit", - Help: "Sum of stream limits used globally by the ingesters", - }, - []string{"size", "stack_id"}, - ) - - averageTenantStreamLimitMetric = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "lokistack_avg_stream_limit_per_tenant", - Help: "Sum of stream limits used for defined tenants by the ingesters", - }, - []string{"size", "stack_id"}, - ) - - lokistackWarningsCount = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "lokistack_warnings_count", - Help: "Counts the number of warnings set on a LokiStack.", - }, - []string{"reason", "stack_id"}, - ) -) - -// RegisterMetricCollectors registers the prometheus collectors with the k8 default metrics -func RegisterMetricCollectors() { - metricCollectors := []prometheus.Collector{ - deploymentMetric, - userDefinedLimitsMetric, - globalStreamLimitMetric, - averageTenantStreamLimitMetric, - lokistackWarningsCount, - } - - for _, collector := range metricCollectors { - metrics.Registry.MustRegister(collector) - } -} - -// Collect takes metrics based on the spec -func Collect(spec *lokiv1.LokiStackSpec, stackName string) { - defaultSpec := manifests.DefaultLokiStackSpec(spec.Size) - sizes := []lokiv1.LokiStackSizeType{lokiv1.SizeOneXSmall, lokiv1.SizeOneXMedium} - - for _, size := range sizes { - var ( - globalRate float64 - tenantRate float64 - isUsingSize = false - isUsingTenantLimits = false - isUsingCustomGlobalLimits = false - ) - - if spec.Size == size { - isUsingSize = true - - if !reflect.DeepEqual(spec.Limits.Global, defaultSpec.Limits.Global) { - isUsingCustomGlobalLimits = true - } - - if len(spec.Limits.Tenants) != 0 { - isUsingTenantLimits = true - } - - if ingesters := spec.Template.Ingester.Replicas; ingesters > 0 { - tenantRate = streamRate(spec.Limits.Tenants, ingesters) - globalRate = float64(spec.Limits.Global.IngestionLimits.MaxGlobalStreamsPerTenant / ingesters) - } - } - - setDeploymentMetric(size, stackName, isUsingSize) - setUserDefinedLimitsMetric(size, stackName, labelGlobal, isUsingCustomGlobalLimits) - setUserDefinedLimitsMetric(size, stackName, labelTenant, isUsingTenantLimits) - setGlobalStreamLimitMetric(size, stackName, globalRate) - setAverageTenantStreamLimitMetric(size, stackName, tenantRate) - } - - if len(spec.Storage.Schemas) > 0 && spec.Storage.Schemas[len(spec.Storage.Schemas)-1].Version != lokiv1.ObjectStorageSchemaV13 { - setLokistackSchemaUpgradesRequired(stackName, true) - } -} - -func setLokistackSchemaUpgradesRequired(identifier string, active bool) { - lokistackWarningsCount.With(prometheus.Labels{ - "reason": string(lokiv1.ReasonStorageNeedsSchemaUpdate), - "stack_id": identifier, - }).Set(boolValue(active)) -} - -func setDeploymentMetric(size lokiv1.LokiStackSizeType, identifier string, active bool) { - deploymentMetric.With(prometheus.Labels{ - "size": string(size), - "stack_id": identifier, - }).Set(boolValue(active)) -} - -func setUserDefinedLimitsMetric(size lokiv1.LokiStackSizeType, identifier string, limitType UserDefinedLimitsType, active bool) { - userDefinedLimitsMetric.With(prometheus.Labels{ - "size": string(size), - "stack_id": identifier, - "type": string(limitType), - }).Set(boolValue(active)) -} - -func setGlobalStreamLimitMetric(size lokiv1.LokiStackSizeType, identifier string, rate float64) { - globalStreamLimitMetric.With(prometheus.Labels{ - "size": string(size), - "stack_id": identifier, - }).Set(rate) -} - -func setAverageTenantStreamLimitMetric(size lokiv1.LokiStackSizeType, identifier string, rate float64) { - averageTenantStreamLimitMetric.With(prometheus.Labels{ - "size": string(size), - "stack_id": identifier, - }).Set(rate) -} - -func boolValue(value bool) float64 { - if value { - return 1 - } - return 0 -} - -func streamRate(tenantLimits map[string]lokiv1.PerTenantLimitsTemplateSpec, ingesters int32) float64 { - var tenants, tenantStreamLimit int32 = 0, 0 - - for _, tenant := range tenantLimits { - if tenant.IngestionLimits == nil || tenant.IngestionLimits.MaxGlobalStreamsPerTenant == 0 { - continue - } - - tenants++ - tenantStreamLimit += tenant.IngestionLimits.MaxGlobalStreamsPerTenant - } - - if tenants == 0 || ingesters == 0 { - return 0 - } - return float64(tenantStreamLimit / ingesters / tenants) -} diff --git a/operator/main.go b/operator/main.go index 4dc501f8be33..fdefa6f663a0 100644 --- a/operator/main.go +++ b/operator/main.go @@ -15,6 +15,7 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" + runtimemetrics "sigs.k8s.io/controller-runtime/pkg/metrics" ctrlconfigv1 "github.com/grafana/loki/operator/apis/config/v1" lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" @@ -218,7 +219,11 @@ func main() { } logger.Info("registering metrics") - metrics.RegisterMetricCollectors() + err = metrics.RegisterLokiStackCollector(logger, mgr.GetClient(), runtimemetrics.Registry) + if err != nil { + logger.Error(err, "failed to register metrics") + os.Exit(1) + } logger.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { From cdb934ccee38375bdb9cc2263e398e1a7ac56db6 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 19 Mar 2024 14:20:33 +0100 Subject: [PATCH 12/14] feat(blooms): Dedupe download queue items to reduce queue size (#12222) --- go.mod | 2 + go.sum | 4 + .../stores/shipper/bloomshipper/fetcher.go | 53 ++- vendor/github.com/dolthub/maphash/.gitignore | 2 + vendor/github.com/dolthub/maphash/LICENSE | 201 ++++++++++ vendor/github.com/dolthub/maphash/README.md | 4 + vendor/github.com/dolthub/maphash/hasher.go | 48 +++ vendor/github.com/dolthub/maphash/runtime.go | 111 ++++++ vendor/github.com/dolthub/swiss/.gitignore | 5 + vendor/github.com/dolthub/swiss/LICENSE | 201 ++++++++++ vendor/github.com/dolthub/swiss/README.md | 54 +++ vendor/github.com/dolthub/swiss/bits.go | 58 +++ vendor/github.com/dolthub/swiss/bits_amd64.go | 50 +++ vendor/github.com/dolthub/swiss/map.go | 359 ++++++++++++++++++ vendor/github.com/dolthub/swiss/simd/match.s | 19 + .../dolthub/swiss/simd/match_amd64.go | 9 + vendor/modules.txt | 7 + 17 files changed, 1174 insertions(+), 13 deletions(-) create mode 100644 vendor/github.com/dolthub/maphash/.gitignore create mode 100644 vendor/github.com/dolthub/maphash/LICENSE create mode 100644 vendor/github.com/dolthub/maphash/README.md create mode 100644 vendor/github.com/dolthub/maphash/hasher.go create mode 100644 vendor/github.com/dolthub/maphash/runtime.go create mode 100644 vendor/github.com/dolthub/swiss/.gitignore create mode 100644 vendor/github.com/dolthub/swiss/LICENSE create mode 100644 vendor/github.com/dolthub/swiss/README.md create mode 100644 vendor/github.com/dolthub/swiss/bits.go create mode 100644 vendor/github.com/dolthub/swiss/bits_amd64.go create mode 100644 vendor/github.com/dolthub/swiss/map.go create mode 100644 vendor/github.com/dolthub/swiss/simd/match.s create mode 100644 vendor/github.com/dolthub/swiss/simd/match_amd64.go diff --git a/go.mod b/go.mod index 5b4c32ddb214..00383c41e0c4 100644 --- a/go.mod +++ b/go.mod @@ -120,6 +120,7 @@ require ( github.com/IBM/ibm-cos-sdk-go v1.10.0 github.com/axiomhq/hyperloglog v0.0.0-20240124082744-24bca3a5b39b github.com/d4l3k/messagediff v1.2.1 + github.com/dolthub/swiss v0.2.1 github.com/efficientgo/core v1.0.0-rc.2 github.com/fsnotify/fsnotify v1.6.0 github.com/gogo/googleapis v1.4.0 @@ -199,6 +200,7 @@ require ( github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dolthub/maphash v0.1.0 // indirect github.com/eapache/go-resiliency v1.3.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect github.com/eapache/queue v1.1.0 // indirect diff --git a/go.sum b/go.sum index 744c904e823c..1d7597a61ea9 100644 --- a/go.sum +++ b/go.sum @@ -523,6 +523,10 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/libnetwork v0.8.0-dev.2.0.20181012153825-d7b61745d166/go.mod h1:93m0aTqz6z+g32wla4l4WxTrdtvBRmVzYRkYvasA5Z8= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= +github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ= +github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4= +github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw= +github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8cUrh0= github.com/drone/envsubst v1.0.3 h1:PCIBwNDYjs50AsLZPYdfhSATKaRg/FJmDc2D6+C2x8g= github.com/drone/envsubst v1.0.3/go.mod h1:N2jZmlMufstn1KEqvbHjw40h1KyTmnVzHcSc9bFiJ2g= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index bf0d200f4471..8a5a1f528fa8 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/dolthub/swiss" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -20,7 +21,7 @@ import ( "github.com/grafana/loki/pkg/util/constants" ) -var downloadQueueCapacity = 100000 +var downloadQueueCapacity = 10000 type options struct { ignoreNotFound bool // ignore 404s from object storage; default=true @@ -205,6 +206,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc item: refs[i], key: key, idx: i, + async: cfg.fetchAsync, results: responses, errors: errors, }) @@ -397,6 +399,7 @@ type downloadRequest[T any, R any] struct { item T key string idx int + async bool results chan<- downloadResponse[R] errors chan<- error } @@ -408,12 +411,14 @@ type downloadResponse[R any] struct { } type downloadQueue[T any, R any] struct { - queue chan downloadRequest[T, R] - mu keymutex.KeyMutex - wg sync.WaitGroup - done chan struct{} - process processFunc[T, R] - logger log.Logger + queue chan downloadRequest[T, R] + enqueued *swiss.Map[string, struct{}] + enqueuedMutex sync.Mutex + mu keymutex.KeyMutex + wg sync.WaitGroup + done chan struct{} + process processFunc[T, R] + logger log.Logger } func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R], logger log.Logger) (*downloadQueue[T, R], error) { @@ -424,11 +429,12 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R] return nil, errors.New("queue requires at least 1 worker") } q := &downloadQueue[T, R]{ - queue: make(chan downloadRequest[T, R], size), - mu: keymutex.NewHashed(workers), - done: make(chan struct{}), - process: process, - logger: logger, + queue: make(chan downloadRequest[T, R], size), + enqueued: swiss.NewMap[string, struct{}](uint32(size)), + mu: keymutex.NewHashed(workers), + done: make(chan struct{}), + process: process, + logger: logger, } for i := 0; i < workers; i++ { q.wg.Add(1) @@ -438,7 +444,23 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R] } func (q *downloadQueue[T, R]) enqueue(t downloadRequest[T, R]) { - q.queue <- t + if !t.async { + q.queue <- t + } + // for async task we attempt to dedupe task already in progress. + q.enqueuedMutex.Lock() + defer q.enqueuedMutex.Unlock() + if q.enqueued.Has(t.key) { + return + } + select { + case q.queue <- t: + q.enqueued.Put(t.key, struct{}{}) + default: + // todo we probably want a metric on dropped items + level.Warn(q.logger).Log("msg", "download queue is full, dropping item", "key", t.key) + return + } } func (q *downloadQueue[T, R]) runWorker() { @@ -464,6 +486,11 @@ func (q *downloadQueue[T, R]) do(ctx context.Context, task downloadRequest[T, R] if err != nil { level.Error(q.logger).Log("msg", "failed to unlock key in block lock", "key", task.key, "err", err) } + if task.async { + q.enqueuedMutex.Lock() + _ = q.enqueued.Delete(task.key) + q.enqueuedMutex.Unlock() + } }() q.process(ctx, task) diff --git a/vendor/github.com/dolthub/maphash/.gitignore b/vendor/github.com/dolthub/maphash/.gitignore new file mode 100644 index 000000000000..977a7cadfa20 --- /dev/null +++ b/vendor/github.com/dolthub/maphash/.gitignore @@ -0,0 +1,2 @@ +*.idea +*.test \ No newline at end of file diff --git a/vendor/github.com/dolthub/maphash/LICENSE b/vendor/github.com/dolthub/maphash/LICENSE new file mode 100644 index 000000000000..261eeb9e9f8b --- /dev/null +++ b/vendor/github.com/dolthub/maphash/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/dolthub/maphash/README.md b/vendor/github.com/dolthub/maphash/README.md new file mode 100644 index 000000000000..d91530f99f91 --- /dev/null +++ b/vendor/github.com/dolthub/maphash/README.md @@ -0,0 +1,4 @@ +# maphash + +Hash any `comparable` type using Golang's fast runtime hash. +Uses [AES](https://en.wikipedia.org/wiki/AES_instruction_set) instructions when available. \ No newline at end of file diff --git a/vendor/github.com/dolthub/maphash/hasher.go b/vendor/github.com/dolthub/maphash/hasher.go new file mode 100644 index 000000000000..ef53596a2420 --- /dev/null +++ b/vendor/github.com/dolthub/maphash/hasher.go @@ -0,0 +1,48 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package maphash + +import "unsafe" + +// Hasher hashes values of type K. +// Uses runtime AES-based hashing. +type Hasher[K comparable] struct { + hash hashfn + seed uintptr +} + +// NewHasher creates a new Hasher[K] with a random seed. +func NewHasher[K comparable]() Hasher[K] { + return Hasher[K]{ + hash: getRuntimeHasher[K](), + seed: newHashSeed(), + } +} + +// NewSeed returns a copy of |h| with a new hash seed. +func NewSeed[K comparable](h Hasher[K]) Hasher[K] { + return Hasher[K]{ + hash: h.hash, + seed: newHashSeed(), + } +} + +// Hash hashes |key|. +func (h Hasher[K]) Hash(key K) uint64 { + // promise to the compiler that pointer + // |p| does not escape the stack. + p := noescape(unsafe.Pointer(&key)) + return uint64(h.hash(p, h.seed)) +} diff --git a/vendor/github.com/dolthub/maphash/runtime.go b/vendor/github.com/dolthub/maphash/runtime.go new file mode 100644 index 000000000000..29cd6a8edf88 --- /dev/null +++ b/vendor/github.com/dolthub/maphash/runtime.go @@ -0,0 +1,111 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// This file incorporates work covered by the following copyright and +// permission notice: +// +// Copyright 2022 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.18 || go1.19 +// +build go1.18 go1.19 + +package maphash + +import ( + "math/rand" + "unsafe" +) + +type hashfn func(unsafe.Pointer, uintptr) uintptr + +func getRuntimeHasher[K comparable]() (h hashfn) { + a := any(make(map[K]struct{})) + i := (*mapiface)(unsafe.Pointer(&a)) + h = i.typ.hasher + return +} + +func newHashSeed() uintptr { + return uintptr(rand.Int()) +} + +// noescape hides a pointer from escape analysis. It is the identity function +// but escape analysis doesn't think the output depends on the input. +// noescape is inlined and currently compiles down to zero instructions. +// USE CAREFULLY! +// This was copied from the runtime (via pkg "strings"); see issues 23382 and 7921. +// +//go:nosplit +//go:nocheckptr +func noescape(p unsafe.Pointer) unsafe.Pointer { + x := uintptr(p) + return unsafe.Pointer(x ^ 0) +} + +type mapiface struct { + typ *maptype + val *hmap +} + +// go/src/runtime/type.go +type maptype struct { + typ _type + key *_type + elem *_type + bucket *_type + // function for hashing keys (ptr to key, seed) -> hash + hasher func(unsafe.Pointer, uintptr) uintptr + keysize uint8 + elemsize uint8 + bucketsize uint16 + flags uint32 +} + +// go/src/runtime/map.go +type hmap struct { + count int + flags uint8 + B uint8 + noverflow uint16 + // hash seed + hash0 uint32 + buckets unsafe.Pointer + oldbuckets unsafe.Pointer + nevacuate uintptr + // true type is *mapextra + // but we don't need this data + extra unsafe.Pointer +} + +// go/src/runtime/type.go +type tflag uint8 +type nameOff int32 +type typeOff int32 + +// go/src/runtime/type.go +type _type struct { + size uintptr + ptrdata uintptr + hash uint32 + tflag tflag + align uint8 + fieldAlign uint8 + kind uint8 + equal func(unsafe.Pointer, unsafe.Pointer) bool + gcdata *byte + str nameOff + ptrToThis typeOff +} diff --git a/vendor/github.com/dolthub/swiss/.gitignore b/vendor/github.com/dolthub/swiss/.gitignore new file mode 100644 index 000000000000..1f9adf93b65e --- /dev/null +++ b/vendor/github.com/dolthub/swiss/.gitignore @@ -0,0 +1,5 @@ +**/.idea/ +.vscode +.run +venv +.DS_Store \ No newline at end of file diff --git a/vendor/github.com/dolthub/swiss/LICENSE b/vendor/github.com/dolthub/swiss/LICENSE new file mode 100644 index 000000000000..261eeb9e9f8b --- /dev/null +++ b/vendor/github.com/dolthub/swiss/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/dolthub/swiss/README.md b/vendor/github.com/dolthub/swiss/README.md new file mode 100644 index 000000000000..71c6f7dd08c1 --- /dev/null +++ b/vendor/github.com/dolthub/swiss/README.md @@ -0,0 +1,54 @@ +# SwissMap + +SwissMap is a hash table adapated from the "SwissTable" family of hash tables from [Abseil](https://abseil.io/blog/20180927-swisstables). It uses [AES](https://github.com/dolthub/maphash) instructions for fast-hashing and performs key lookups in parallel using [SSE](https://en.wikipedia.org/wiki/Streaming_SIMD_Extensions) instructions. Because of these optimizations, SwissMap is faster and more memory efficient than Golang's built-in `map`. If you'd like to learn more about its design and implementation, check out this [blog post](https://www.dolthub.com/blog/2023-03-28-swiss-map/) announcing its release. + + +## Example + +SwissMap exposes the same interface as the built-in `map`. Give it a try using this [Go playground](https://go.dev/play/p/JPDC5WhYN7g). + +```go +package main + +import "github.com/dolthub/swiss" + +func main() { + m := swiss.NewMap[string, int](42) + + m.Put("foo", 1) + m.Put("bar", 2) + + m.Iter(func(k string, v int) (stop bool) { + println("iter", k, v) + return false // continue + }) + + if x, ok := m.Get("foo"); ok { + println(x) + } + if m.Has("bar") { + x, _ := m.Get("bar") + println(x) + } + + m.Put("foo", -1) + m.Delete("bar") + + if x, ok := m.Get("foo"); ok { + println(x) + } + if m.Has("bar") { + x, _ := m.Get("bar") + println(x) + } + + m.Clear() + + // Output: + // iter foo 1 + // iter bar 2 + // 1 + // 2 + // -1 +} +``` diff --git a/vendor/github.com/dolthub/swiss/bits.go b/vendor/github.com/dolthub/swiss/bits.go new file mode 100644 index 000000000000..f435b6dc914c --- /dev/null +++ b/vendor/github.com/dolthub/swiss/bits.go @@ -0,0 +1,58 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !amd64 || nosimd + +package swiss + +import ( + "math/bits" + "unsafe" +) + +const ( + groupSize = 8 + maxAvgGroupLoad = 7 + + loBits uint64 = 0x0101010101010101 + hiBits uint64 = 0x8080808080808080 +) + +type bitset uint64 + +func metaMatchH2(m *metadata, h h2) bitset { + // https://graphics.stanford.edu/~seander/bithacks.html##ValueInWord + return hasZeroByte(castUint64(m) ^ (loBits * uint64(h))) +} + +func metaMatchEmpty(m *metadata) bitset { + return hasZeroByte(castUint64(m) ^ hiBits) +} + +func nextMatch(b *bitset) uint32 { + s := uint32(bits.TrailingZeros64(uint64(*b))) + *b &= ^(1 << s) // clear bit |s| + return s >> 3 // div by 8 +} + +func hasZeroByte(x uint64) bitset { + return bitset(((x - loBits) & ^(x)) & hiBits) +} + +func castUint64(m *metadata) uint64 { + return *(*uint64)((unsafe.Pointer)(m)) +} + +//go:linkname fastrand runtime.fastrand +func fastrand() uint32 diff --git a/vendor/github.com/dolthub/swiss/bits_amd64.go b/vendor/github.com/dolthub/swiss/bits_amd64.go new file mode 100644 index 000000000000..8b91f57cf272 --- /dev/null +++ b/vendor/github.com/dolthub/swiss/bits_amd64.go @@ -0,0 +1,50 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build amd64 && !nosimd + +package swiss + +import ( + "math/bits" + _ "unsafe" + + "github.com/dolthub/swiss/simd" +) + +const ( + groupSize = 16 + maxAvgGroupLoad = 14 +) + +type bitset uint16 + +func metaMatchH2(m *metadata, h h2) bitset { + b := simd.MatchMetadata((*[16]int8)(m), int8(h)) + return bitset(b) +} + +func metaMatchEmpty(m *metadata) bitset { + b := simd.MatchMetadata((*[16]int8)(m), empty) + return bitset(b) +} + +func nextMatch(b *bitset) (s uint32) { + s = uint32(bits.TrailingZeros16(uint16(*b))) + *b &= ^(1 << s) // clear bit |s| + return +} + +//go:linkname fastrand runtime.fastrand +func fastrand() uint32 diff --git a/vendor/github.com/dolthub/swiss/map.go b/vendor/github.com/dolthub/swiss/map.go new file mode 100644 index 000000000000..e5ad203866bd --- /dev/null +++ b/vendor/github.com/dolthub/swiss/map.go @@ -0,0 +1,359 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package swiss + +import ( + "github.com/dolthub/maphash" +) + +const ( + maxLoadFactor = float32(maxAvgGroupLoad) / float32(groupSize) +) + +// Map is an open-addressing hash map +// based on Abseil's flat_hash_map. +type Map[K comparable, V any] struct { + ctrl []metadata + groups []group[K, V] + hash maphash.Hasher[K] + resident uint32 + dead uint32 + limit uint32 +} + +// metadata is the h2 metadata array for a group. +// find operations first probe the controls bytes +// to filter candidates before matching keys +type metadata [groupSize]int8 + +// group is a group of 16 key-value pairs +type group[K comparable, V any] struct { + keys [groupSize]K + values [groupSize]V +} + +const ( + h1Mask uint64 = 0xffff_ffff_ffff_ff80 + h2Mask uint64 = 0x0000_0000_0000_007f + empty int8 = -128 // 0b1000_0000 + tombstone int8 = -2 // 0b1111_1110 +) + +// h1 is a 57 bit hash prefix +type h1 uint64 + +// h2 is a 7 bit hash suffix +type h2 int8 + +// NewMap constructs a Map. +func NewMap[K comparable, V any](sz uint32) (m *Map[K, V]) { + groups := numGroups(sz) + m = &Map[K, V]{ + ctrl: make([]metadata, groups), + groups: make([]group[K, V], groups), + hash: maphash.NewHasher[K](), + limit: groups * maxAvgGroupLoad, + } + for i := range m.ctrl { + m.ctrl[i] = newEmptyMetadata() + } + return +} + +// Has returns true if |key| is present in |m|. +func (m *Map[K, V]) Has(key K) (ok bool) { + hi, lo := splitHash(m.hash.Hash(key)) + g := probeStart(hi, len(m.groups)) + for { // inlined find loop + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s := nextMatch(&matches) + if key == m.groups[g].keys[s] { + ok = true + return + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { + ok = false + return + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +// Get returns the |value| mapped by |key| if one exists. +func (m *Map[K, V]) Get(key K) (value V, ok bool) { + hi, lo := splitHash(m.hash.Hash(key)) + g := probeStart(hi, len(m.groups)) + for { // inlined find loop + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s := nextMatch(&matches) + if key == m.groups[g].keys[s] { + value, ok = m.groups[g].values[s], true + return + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { + ok = false + return + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +// Put attempts to insert |key| and |value| +func (m *Map[K, V]) Put(key K, value V) { + if m.resident >= m.limit { + m.rehash(m.nextSize()) + } + hi, lo := splitHash(m.hash.Hash(key)) + g := probeStart(hi, len(m.groups)) + for { // inlined find loop + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s := nextMatch(&matches) + if key == m.groups[g].keys[s] { // update + m.groups[g].keys[s] = key + m.groups[g].values[s] = value + return + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { // insert + s := nextMatch(&matches) + m.groups[g].keys[s] = key + m.groups[g].values[s] = value + m.ctrl[g][s] = int8(lo) + m.resident++ + return + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +// Delete attempts to remove |key|, returns true successful. +func (m *Map[K, V]) Delete(key K) (ok bool) { + hi, lo := splitHash(m.hash.Hash(key)) + g := probeStart(hi, len(m.groups)) + for { + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s := nextMatch(&matches) + if key == m.groups[g].keys[s] { + ok = true + // optimization: if |m.ctrl[g]| contains any empty + // metadata bytes, we can physically delete |key| + // rather than placing a tombstone. + // The observation is that any probes into group |g| + // would already be terminated by the existing empty + // slot, and therefore reclaiming slot |s| will not + // cause premature termination of probes into |g|. + if metaMatchEmpty(&m.ctrl[g]) != 0 { + m.ctrl[g][s] = empty + m.resident-- + } else { + m.ctrl[g][s] = tombstone + m.dead++ + } + var k K + var v V + m.groups[g].keys[s] = k + m.groups[g].values[s] = v + return + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { // |key| absent + ok = false + return + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +// Iter iterates the elements of the Map, passing them to the callback. +// It guarantees that any key in the Map will be visited only once, and +// for un-mutated Maps, every key will be visited once. If the Map is +// Mutated during iteration, mutations will be reflected on return from +// Iter, but the set of keys visited by Iter is non-deterministic. +func (m *Map[K, V]) Iter(cb func(k K, v V) (stop bool)) { + // take a consistent view of the table in case + // we rehash during iteration + ctrl, groups := m.ctrl, m.groups + // pick a random starting group + g := randIntN(len(groups)) + for n := 0; n < len(groups); n++ { + for s, c := range ctrl[g] { + if c == empty || c == tombstone { + continue + } + k, v := groups[g].keys[s], groups[g].values[s] + if stop := cb(k, v); stop { + return + } + } + g++ + if g >= uint32(len(groups)) { + g = 0 + } + } +} + +// Clear removes all elements from the Map. +func (m *Map[K, V]) Clear() { + for i, c := range m.ctrl { + for j := range c { + m.ctrl[i][j] = empty + } + } + var k K + var v V + for i := range m.groups { + g := &m.groups[i] + for i := range g.keys { + g.keys[i] = k + g.values[i] = v + } + } + m.resident, m.dead = 0, 0 +} + +// Count returns the number of elements in the Map. +func (m *Map[K, V]) Count() int { + return int(m.resident - m.dead) +} + +// Capacity returns the number of additional elements +// the can be added to the Map before resizing. +func (m *Map[K, V]) Capacity() int { + return int(m.limit - m.resident) +} + +// find returns the location of |key| if present, or its insertion location if absent. +// for performance, find is manually inlined into public methods. +func (m *Map[K, V]) find(key K, hi h1, lo h2) (g, s uint32, ok bool) { + g = probeStart(hi, len(m.groups)) + for { + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s = nextMatch(&matches) + if key == m.groups[g].keys[s] { + return g, s, true + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { + s = nextMatch(&matches) + return g, s, false + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +func (m *Map[K, V]) nextSize() (n uint32) { + n = uint32(len(m.groups)) * 2 + if m.dead >= (m.resident / 2) { + n = uint32(len(m.groups)) + } + return +} + +func (m *Map[K, V]) rehash(n uint32) { + groups, ctrl := m.groups, m.ctrl + m.groups = make([]group[K, V], n) + m.ctrl = make([]metadata, n) + for i := range m.ctrl { + m.ctrl[i] = newEmptyMetadata() + } + m.hash = maphash.NewSeed(m.hash) + m.limit = n * maxAvgGroupLoad + m.resident, m.dead = 0, 0 + for g := range ctrl { + for s := range ctrl[g] { + c := ctrl[g][s] + if c == empty || c == tombstone { + continue + } + m.Put(groups[g].keys[s], groups[g].values[s]) + } + } +} + +func (m *Map[K, V]) loadFactor() float32 { + slots := float32(len(m.groups) * groupSize) + return float32(m.resident-m.dead) / slots +} + +// numGroups returns the minimum number of groups needed to store |n| elems. +func numGroups(n uint32) (groups uint32) { + groups = (n + maxAvgGroupLoad - 1) / maxAvgGroupLoad + if groups == 0 { + groups = 1 + } + return +} + +func newEmptyMetadata() (meta metadata) { + for i := range meta { + meta[i] = empty + } + return +} + +func splitHash(h uint64) (h1, h2) { + return h1((h & h1Mask) >> 7), h2(h & h2Mask) +} + +func probeStart(hi h1, groups int) uint32 { + return fastModN(uint32(hi), uint32(groups)) +} + +// lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ +func fastModN(x, n uint32) uint32 { + return uint32((uint64(x) * uint64(n)) >> 32) +} + +// randIntN returns a random number in the interval [0, n). +func randIntN(n int) uint32 { + return fastModN(fastrand(), uint32(n)) +} diff --git a/vendor/github.com/dolthub/swiss/simd/match.s b/vendor/github.com/dolthub/swiss/simd/match.s new file mode 100644 index 000000000000..4ae29e77b9ab --- /dev/null +++ b/vendor/github.com/dolthub/swiss/simd/match.s @@ -0,0 +1,19 @@ +// Code generated by command: go run asm.go -out match.s -stubs match_amd64.go. DO NOT EDIT. + +//go:build amd64 + +#include "textflag.h" + +// func MatchMetadata(metadata *[16]int8, hash int8) uint16 +// Requires: SSE2, SSSE3 +TEXT ·MatchMetadata(SB), NOSPLIT, $0-18 + MOVQ metadata+0(FP), AX + MOVBLSX hash+8(FP), CX + MOVD CX, X0 + PXOR X1, X1 + PSHUFB X1, X0 + MOVOU (AX), X1 + PCMPEQB X1, X0 + PMOVMSKB X0, AX + MOVW AX, ret+16(FP) + RET diff --git a/vendor/github.com/dolthub/swiss/simd/match_amd64.go b/vendor/github.com/dolthub/swiss/simd/match_amd64.go new file mode 100644 index 000000000000..538c8e1248fb --- /dev/null +++ b/vendor/github.com/dolthub/swiss/simd/match_amd64.go @@ -0,0 +1,9 @@ +// Code generated by command: go run asm.go -out match.s -stubs match_amd64.go. DO NOT EDIT. + +//go:build amd64 + +package simd + +// MatchMetadata performs a 16-way probe of |metadata| using SSE instructions +// nb: |metadata| must be an aligned pointer +func MatchMetadata(metadata *[16]int8, hash int8) uint16 diff --git a/vendor/modules.txt b/vendor/modules.txt index 2992d1e44075..d9c9bd146e7b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -545,6 +545,13 @@ github.com/docker/go-plugins-helpers/sdk # github.com/docker/go-units v0.5.0 ## explicit github.com/docker/go-units +# github.com/dolthub/maphash v0.1.0 +## explicit; go 1.18 +github.com/dolthub/maphash +# github.com/dolthub/swiss v0.2.1 +## explicit; go 1.18 +github.com/dolthub/swiss +github.com/dolthub/swiss/simd # github.com/drone/envsubst v1.0.3 ## explicit; go 1.13 github.com/drone/envsubst From 9a2e00bc170669512d1910d71d8eddb091636060 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 19 Mar 2024 14:34:00 +0100 Subject: [PATCH 13/14] refactor: Replace TenantConfig method with interface. (#12260) **What this PR does / why we need it**: This change allows the retrieval of the `TenantConfig` or the `DefaultConfig`. It also defines an interface instead of a tenant config function which makes it a little easier to grok the code. Eventually this will enable us to introduce GEL specific configurations. **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --- pkg/distributor/writefailures/manager_test.go | 44 ++++++++++++------- pkg/loki/modules.go | 2 +- pkg/loki/runtime_config.go | 31 ++++++++----- pkg/loki/runtime_config_test.go | 10 ++--- pkg/runtime/config.go | 26 +++++------ 5 files changed, 67 insertions(+), 46 deletions(-) diff --git a/pkg/distributor/writefailures/manager_test.go b/pkg/distributor/writefailures/manager_test.go index 27028fc21c5e..6f5f1eee3841 100644 --- a/pkg/distributor/writefailures/manager_test.go +++ b/pkg/distributor/writefailures/manager_test.go @@ -20,18 +20,20 @@ func TestWriteFailuresLogging(t *testing.T) { buf := bytes.NewBuffer(nil) logger := log.NewLogfmtLogger(buf) - f := func(tenantID string) *runtime.Config { - if tenantID == "good-tenant" { - return &runtime.Config{ - LimitedLogPushErrors: true, + f := &providerMock{ + tenantConfig: func(tenantID string) *runtime.Config { + if tenantID == "good-tenant" { + return &runtime.Config{ + LimitedLogPushErrors: true, + } } - } - if tenantID == "bad-tenant" { - return &runtime.Config{ - LimitedLogPushErrors: false, + if tenantID == "bad-tenant" { + return &runtime.Config{ + LimitedLogPushErrors: false, + } } - } - return &runtime.Config{} + return &runtime.Config{} + }, } runtimeCfg, err := runtime.NewTenantConfigs(f) @@ -55,12 +57,14 @@ func TestWriteFailuresRateLimiting(t *testing.T) { buf := bytes.NewBuffer(nil) logger := log.NewLogfmtLogger(buf) - f := func(tenantID string) *runtime.Config { - return &runtime.Config{ - LimitedLogPushErrors: true, - } + provider := &providerMock{ + tenantConfig: func(tenantID string) *runtime.Config { + return &runtime.Config{ + LimitedLogPushErrors: true, + } + }, } - runtimeCfg, err := runtime.NewTenantConfigs(f) + runtimeCfg, err := runtime.NewTenantConfigs(provider) require.NoError(t, err) t.Run("with zero rate limiting", func(t *testing.T) { @@ -126,7 +130,7 @@ func TestWriteFailuresRateLimiting(t *testing.T) { }) t.Run("limit is per-tenant", func(t *testing.T) { - runtimeCfg, err := runtime.NewTenantConfigs(f) + runtimeCfg, err := runtime.NewTenantConfigs(provider) require.NoError(t, err) manager := NewManager(logger, prometheus.NewRegistry(), Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg, "ingester") @@ -161,3 +165,11 @@ func TestWriteFailuresRateLimiting(t *testing.T) { require.Contains(t, content, "3z") }) } + +type providerMock struct { + tenantConfig func(string) *runtime.Config +} + +func (m *providerMock) TenantConfig(userID string) *runtime.Config { + return m.tenantConfig(userID) +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index f9d67ef255be..7dec8946c71d 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -306,7 +306,7 @@ func (t *Loki) initOverridesExporter() (services.Service, error) { } func (t *Loki) initTenantConfigs() (_ services.Service, err error) { - t.tenantConfigs, err = runtime.NewTenantConfigs(tenantConfigFromRuntimeConfig(t.runtimeConfig)) + t.tenantConfigs, err = runtime.NewTenantConfigs(newTenantConfigProvider(t.runtimeConfig)) // tenantConfigs are not a service, since they don't have any operational state. return nil, err } diff --git a/pkg/loki/runtime_config.go b/pkg/loki/runtime_config.go index 4b76bae70e85..3432ee1b68b8 100644 --- a/pkg/loki/runtime_config.go +++ b/pkg/loki/runtime_config.go @@ -55,6 +55,7 @@ func loadRuntimeConfig(r io.Reader) (interface{}, error) { return overrides, nil } +// tenantLimitsFromRuntimeConfig implements validation.Limits type tenantLimitsFromRuntimeConfig struct { c *runtimeconfig.Manager } @@ -85,20 +86,28 @@ func newtenantLimitsFromRuntimeConfig(c *runtimeconfig.Manager) validation.Tenan return &tenantLimitsFromRuntimeConfig{c: c} } -func tenantConfigFromRuntimeConfig(c *runtimeconfig.Manager) runtime.TenantConfig { - if c == nil { +type tenantConfigProvider struct { + c *runtimeconfig.Manager +} + +func newTenantConfigProvider(c *runtimeconfig.Manager) runtime.TenantConfigProvider { + return &tenantConfigProvider{c: c} +} + +// TenantConfig returns the user config or default config if none was defined. +func (t *tenantConfigProvider) TenantConfig(userID string) *runtime.Config { + if t.c == nil { return nil } - return func(userID string) *runtime.Config { - cfg, ok := c.GetConfig().(*runtimeConfigValues) - if !ok || cfg == nil { - return nil - } - if tenantCfg, ok := cfg.TenantConfig[userID]; ok { - return tenantCfg - } - return cfg.DefaultConfig + + cfg, ok := t.c.GetConfig().(*runtimeConfigValues) + if !ok || cfg == nil { + return nil + } + if tenantCfg, ok := cfg.TenantConfig[userID]; ok { + return tenantCfg } + return cfg.DefaultConfig } func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-chan kv.MultiRuntimeConfig { diff --git a/pkg/loki/runtime_config_test.go b/pkg/loki/runtime_config_test.go index 26b018318a37..d0fd2ffa4103 100644 --- a/pkg/loki/runtime_config_test.go +++ b/pkg/loki/runtime_config_test.go @@ -98,9 +98,9 @@ configs: log_push_request: true `) - user1 := runtimeGetter("1") - user2 := runtimeGetter("2") - user3 := runtimeGetter("3") + user1 := runtimeGetter.TenantConfig("1") + user2 := runtimeGetter.TenantConfig("2") + user3 := runtimeGetter.TenantConfig("3") require.Equal(t, false, user1.LogPushRequest) require.Equal(t, false, user1.LimitedLogPushErrors) @@ -110,7 +110,7 @@ configs: require.Equal(t, true, user3.LogPushRequest) } -func newTestRuntimeconfig(t *testing.T, yaml string) runtime.TenantConfig { +func newTestRuntimeconfig(t *testing.T, yaml string) runtime.TenantConfigProvider { t.Helper() f, err := os.CreateTemp(t.TempDir(), "bar") require.NoError(t, err) @@ -140,7 +140,7 @@ func newTestRuntimeconfig(t *testing.T, yaml string) runtime.TenantConfig { require.NoError(t, runtimeConfig.AwaitTerminated(context.Background())) }() - return tenantConfigFromRuntimeConfig(runtimeConfig) + return newTenantConfigProvider(runtimeConfig) } func newTestOverrides(t *testing.T, yaml string) *validation.Overrides { diff --git a/pkg/runtime/config.go b/pkg/runtime/config.go index 9795277fbf13..234b5c0c5e2b 100644 --- a/pkg/runtime/config.go +++ b/pkg/runtime/config.go @@ -9,41 +9,41 @@ type Config struct { LimitedLogPushErrors bool `yaml:"limited_log_push_errors"` } -// TenantConfig is a function that returns configs for given tenant, or -// nil, if there are no tenant-specific configs. -type TenantConfig func(userID string) *Config +var EmptyConfig = &Config{} + +// TenantConfigProvider serves a tenant or default config. +type TenantConfigProvider interface { + TenantConfig(userID string) *Config +} // TenantConfigs periodically fetch a set of per-user configs, and provides convenience // functions for fetching the correct value. type TenantConfigs struct { - defaultConfig *Config - tenantConfig TenantConfig + TenantConfigProvider } // DefaultTenantConfigs creates and returns a new TenantConfigs with the defaults populated. func DefaultTenantConfigs() *TenantConfigs { return &TenantConfigs{ - defaultConfig: &Config{}, - tenantConfig: nil, + TenantConfigProvider: nil, } } // NewTenantConfig makes a new TenantConfigs -func NewTenantConfigs(tenantConfig TenantConfig) (*TenantConfigs, error) { +func NewTenantConfigs(configProvider TenantConfigProvider) (*TenantConfigs, error) { return &TenantConfigs{ - defaultConfig: DefaultTenantConfigs().defaultConfig, - tenantConfig: tenantConfig, + TenantConfigProvider: configProvider, }, nil } func (o *TenantConfigs) getOverridesForUser(userID string) *Config { - if o.tenantConfig != nil { - l := o.tenantConfig(userID) + if o.TenantConfigProvider != nil { + l := o.TenantConfigProvider.TenantConfig(userID) if l != nil { return l } } - return o.defaultConfig + return EmptyConfig } func (o *TenantConfigs) LogStreamCreation(userID string) bool { From 2e7dbe6da3bc2b85d5582c6c98194987c542d499 Mon Sep 17 00:00:00 2001 From: Pieter <110168856+Pionerd@users.noreply.github.com> Date: Tue, 19 Mar 2024 18:58:50 +0100 Subject: [PATCH 14/14] fix: Remove lost character in ciliumnetworkpolicy.yaml (#12263) --- production/helm/loki/templates/ciliumnetworkpolicy.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/production/helm/loki/templates/ciliumnetworkpolicy.yaml b/production/helm/loki/templates/ciliumnetworkpolicy.yaml index fb2ce12fc6c8..fbd2619d807b 100644 --- a/production/helm/loki/templates/ciliumnetworkpolicy.yaml +++ b/production/helm/loki/templates/ciliumnetworkpolicy.yaml @@ -149,7 +149,7 @@ spec: {{- range $port := .Values.networkPolicy.externalStorage.ports }} - port: "{{ $port }}" protocol: TCP - {{- end }}à + {{- end }} {{- if .Values.networkPolicy.externalStorage.cidrs }} {{- range $cidr := .Values.networkPolicy.externalStorage.cidrs }} toCIDR: