From a10dc6a6b700217d06d8e3dc817851a44feaa8cd Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Sat, 7 Sep 2024 03:18:47 +0200 Subject: [PATCH] (2.11) ADR-44: JetStream Asset Versioning in Metadata (#5850) Implements [ADR-44](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-44.md) Initial addition of JetStream Asset Versioning, containing: - generating stream and consumer metadata - created server version: `_nats.created.server.version` - created server API level: `_nats.created.server.api_level` - required server API level: `_nats.server.require.api_level` - logging supported API level upon startup: `[INF] API Level: 1`
Note that stream and consumer metadata is only set/updated upon: - creating a new stream/consumer - updating a stream/consumer _(created metadata will not be set for pre-existing assets, only required level will be updated)_
Many tests are added ensuring that: - restoring streams from backups doesn't set this metadata _(if it doesn't exist, for example restoring a backup from a previous version)_ - restarting a server (or upgrading) doesn't set this metadata _(if it doesn't exist, for example due to upgrading)_ - updating consumer `PauseUntil` ups or lowers required API level - metadata is consistently reported through add, update and info requests - only stream/consumer/meta leader determine the metadata, so that there can be no skew in metadata if followers were allowed to update as well - users can't manually supply these `_nats.>` metadata fields, they will be overwritten to the appropriate values - if the metadata is missing during an update, the metadata is preserved _(for example an update by a client that doesn't know about metadata yet)_ These tests check for both non-clustered R1 setups and clustered R3 setups.
This PR doesn't fully implement [ADR-44](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-44.md). There will be follow-up PRs later on to add support for: - dynamic metadata, like `_nats.server.version` and `_nats.server.api_level` that report the current server version and API level the asset lives on: https://github.com/nats-io/nats-server/pull/5857 - reporting server JS API level through `jsz`, `varz` and `$JS.API.INFO` (it is already reported in the logs during startup): https://github.com/nats-io/nats-server/pull/5855 - etc. There are some slight inconsistencies between the ADR and this PR. For example using the terms feature level, `api_version` and `api_level` that all mean the same and are all made consistent to be `api_level`. I'll correct the ADR with any changed details after this PR has been merged. Signed-off-by: Maurice van Veen --------- Signed-off-by: Maurice van Veen --- server/jetstream.go | 1 + server/jetstream_api.go | 24 +- server/jetstream_cluster.go | 14 + server/jetstream_versioning.go | 138 ++++++++ server/jetstream_versioning_test.go | 470 ++++++++++++++++++++++++++++ 5 files changed, 645 insertions(+), 2 deletions(-) create mode 100644 server/jetstream_versioning.go create mode 100644 server/jetstream_versioning_test.go diff --git a/server/jetstream.go b/server/jetstream.go index 3a986cccd4a..b0e22ef6eb7 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -467,6 +467,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { s.Noticef(" TPM File: %q, Pcr: %d", opts.JetStreamTpm.KeysFile, opts.JetStreamTpm.Pcr) } + s.Noticef(" API Level: %d", JSApiLevel) s.Noticef("-------------------------------------------") // Setup our internal subscriptions. diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 9558ae49fe7..d05503b9222 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1422,6 +1422,9 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } + // Initialize asset version metadata. + setStaticStreamMetadata(&cfg.StreamConfig, nil) + streamName := streamNameFromSubject(subject) if streamName != cfg.Name { resp.Error = NewJSStreamMismatchError() @@ -1557,6 +1560,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, return } + // Update asset version metadata. + setStaticStreamMetadata(&cfg, &mset.cfg) + if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -4028,12 +4034,17 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun return } - // If the consumer already exists then don't allow updating the PauseUntil, just set - // it back to whatever the current configured value is. + var oldCfg *ConsumerConfig if o := stream.lookupConsumer(consumerName); o != nil { + oldCfg = &o.cfg + // If the consumer already exists then don't allow updating the PauseUntil, just set + // it back to whatever the current configured value is. req.Config.PauseUntil = o.cfg.PauseUntil } + // Initialize/update asset version metadata. + setStaticConsumerMetadata(&req.Config, oldCfg) + o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic) if err != nil { @@ -4589,6 +4600,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account } else { nca.Config.PauseUntil = nil } + + // Update asset version metadata due to updating pause/resume. + // Only PauseUntil is updated above, so reuse config for both. + setStaticConsumerMetadata(nca.Config, nca.Config) + eca := encodeAddConsumerAssignment(&nca) cc.meta.Propose(eca) @@ -4622,6 +4638,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account ncfg.PauseUntil = nil } + // Update asset version metadata due to updating pause/resume. + // Only PauseUntil is updated above, so reuse config for both. + setStaticConsumerMetadata(&ncfg, &ncfg) + if err := obs.updateConfig(&ncfg); err != nil { // The only type of error that should be returned here is from o.store, // so use a store failed error type. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d1b2cc49b05..3a32b521eba 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6245,6 +6245,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } + + // Update asset version metadata. + setStaticStreamMetadata(cfg, osa.Config) + var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { js.mu.Unlock() @@ -7283,6 +7287,9 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // if name was set by the user. if oname != _EMPTY_ { if ca = sa.consumers[oname]; ca != nil && !ca.deleted { + // Provided config might miss metadata, copy from existing config. + copyConsumerMetadata(cfg, ca.Config) + if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) { resp.Error = NewJSConsumerAlreadyExistsError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) @@ -7297,6 +7304,13 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } } + // Initialize/update asset version metadata. + var oldCfg *ConsumerConfig + if ca != nil { + oldCfg = ca.Config + } + setStaticConsumerMetadata(cfg, oldCfg) + // If this is new consumer. if ca == nil { if action == ActionUpdate { diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go new file mode 100644 index 00000000000..1c49468650e --- /dev/null +++ b/server/jetstream_versioning.go @@ -0,0 +1,138 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import "strconv" + +const ( + // JSApiLevel is the maximum supported JetStream API level for this server. + JSApiLevel int = 1 + + JSCreatedVersionMetadataKey = "_nats.created.server.version" + JSCreatedLevelMetadataKey = "_nats.created.server.api_level" + JSRequiredLevelMetadataKey = "_nats.server.require.api_level" +) + +// setStaticStreamMetadata sets JetStream stream metadata, like the server version and API level. +// Given: +// - cfg!=nil, prevCfg==nil add stream: adds created and required metadata +// - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated +func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + + var prevMetadata map[string]string + if prevCfg != nil { + prevMetadata = prevCfg.Metadata + if prevMetadata == nil { + // Initialize to empty to indicate we had a previous config but metadata was missing. + prevMetadata = make(map[string]string) + } + } + preserveCreatedMetadata(cfg.Metadata, prevMetadata) + + var requiredApiLevel int + cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel) +} + +// setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level. +// Given: +// - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata +// - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated +func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + + var prevMetadata map[string]string + if prevCfg != nil { + prevMetadata = prevCfg.Metadata + if prevMetadata == nil { + // Initialize to empty to indicate we had a previous config but metadata was missing. + prevMetadata = make(map[string]string) + } + } + preserveCreatedMetadata(cfg.Metadata, prevMetadata) + + var requiredApiLevel int + + // Added in 2.11, absent | zero is the feature is not used. + // one could be stricter and say even if its set but the time + // has already passed it is also not needed to restore the consumer + if cfg.PauseUntil != nil && !cfg.PauseUntil.IsZero() { + requiredApiLevel = 1 + } + + cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel) +} + +// copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg. +// Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg. +// +// Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences. +// MUST be followed up with a call to setStaticConsumerMetadata to fix potentially lost metadata. +func copyConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { + // Remove fields when no previous metadata. + if prevCfg == nil || prevCfg.Metadata == nil { + if cfg.Metadata != nil { + delete(cfg.Metadata, JSCreatedVersionMetadataKey) + delete(cfg.Metadata, JSCreatedLevelMetadataKey) + delete(cfg.Metadata, JSRequiredLevelMetadataKey) + if len(cfg.Metadata) == 0 { + cfg.Metadata = nil + } + } + return + } + + // Set if exists, delete otherwise. + setOrDeleteInMetadata(cfg, prevCfg, JSCreatedVersionMetadataKey) + setOrDeleteInMetadata(cfg, prevCfg, JSCreatedLevelMetadataKey) + setOrDeleteInMetadata(cfg, prevCfg, JSRequiredLevelMetadataKey) +} + +// setOrDeleteInMetadata sets field with key/value in metadata of cfg if set, deletes otherwise. +func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key string) { + if value, ok := prevCfg.Metadata[key]; ok { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + cfg.Metadata[key] = value + } else { + delete(cfg.Metadata, key) + } +} + +// preserveCreatedMetadata sets metadata to contain which version and API level the asset was created on. +// Preserves previous metadata, if not set it initializes versions for the metadata. +func preserveCreatedMetadata(metadata, prevMetadata map[string]string) { + if prevMetadata == nil { + metadata[JSCreatedVersionMetadataKey] = VERSION + metadata[JSCreatedLevelMetadataKey] = strconv.Itoa(JSApiLevel) + return + } + + // Preserve previous metadata if it was set, but delete if not since it could be user-provided. + if v := prevMetadata[JSCreatedVersionMetadataKey]; v != _EMPTY_ { + metadata[JSCreatedVersionMetadataKey] = v + } else { + delete(metadata, JSCreatedVersionMetadataKey) + } + if v := prevMetadata[JSCreatedLevelMetadataKey]; v != _EMPTY_ { + metadata[JSCreatedLevelMetadataKey] = v + } else { + delete(metadata, JSCreatedLevelMetadataKey) + } +} diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go new file mode 100644 index 00000000000..6a87348041b --- /dev/null +++ b/server/jetstream_versioning_test.go @@ -0,0 +1,470 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !skip_js_tests +// +build !skip_js_tests + +package server + +import ( + "archive/tar" + "bytes" + "encoding/json" + "fmt" + "strconv" + "testing" + "time" + + "github.com/klauspost/compress/s2" + + "github.com/nats-io/nats.go" +) + +func metadataAllSet(featureLevel string) map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: VERSION, + JSCreatedLevelMetadataKey: strconv.Itoa(JSApiLevel), + JSRequiredLevelMetadataKey: featureLevel, + } +} + +func metadataPrevious() map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: "previous-version", + JSCreatedLevelMetadataKey: "previous-level", + JSRequiredLevelMetadataKey: "previous-level", + } +} + +func metadataUpdatedPrevious(featureLevel string) map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: "previous-version", + JSCreatedLevelMetadataKey: "previous-level", + JSRequiredLevelMetadataKey: featureLevel, + } +} + +func metadataOnlyRequired() map[string]string { + return map[string]string{ + JSRequiredLevelMetadataKey: "0", + } +} + +func TestJetStreamSetStaticStreamMetadata(t *testing.T) { + for _, test := range []struct { + desc string + cfg *StreamConfig + prev *StreamConfig + expectedMetadata map[string]string + }{ + { + desc: "create", + cfg: &StreamConfig{}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/overwrite-user-provided", + cfg: &StreamConfig{Metadata: metadataPrevious()}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "update", + cfg: &StreamConfig{}, + prev: &StreamConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "update/empty-prev-metadata", + cfg: &StreamConfig{}, + prev: &StreamConfig{}, + expectedMetadata: metadataOnlyRequired(), + }, + { + desc: "update/empty-prev-metadata/delete-user-provided", + cfg: &StreamConfig{Metadata: metadataPrevious()}, + prev: &StreamConfig{}, + expectedMetadata: metadataOnlyRequired(), + }, + } { + t.Run(test.desc, func(t *testing.T) { + setStaticStreamMetadata(test.cfg, test.prev) + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) + }) + } +} + +func TestJetStreamSetStaticConsumerMetadata(t *testing.T) { + pauseUntil := time.Unix(0, 0) + pauseUntilZero := time.Time{} + for _, test := range []struct { + desc string + cfg *ConsumerConfig + prev *ConsumerConfig + expectedMetadata map[string]string + }{ + { + desc: "create", + cfg: &ConsumerConfig{}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/PauseUntil/zero", + cfg: &ConsumerConfig{PauseUntil: &pauseUntilZero}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/PauseUntil", + cfg: &ConsumerConfig{PauseUntil: &pauseUntil}, + prev: nil, + expectedMetadata: metadataAllSet("1"), + }, + { + desc: "create/overwrite-user-provided", + cfg: &ConsumerConfig{Metadata: metadataPrevious()}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "update", + cfg: &ConsumerConfig{}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "create/PauseUntil/zero", + cfg: &ConsumerConfig{PauseUntil: &pauseUntilZero}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "update/PauseUntil", + cfg: &ConsumerConfig{PauseUntil: &pauseUntil}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("1"), + }, + { + desc: "update/empty-prev-metadata", + cfg: &ConsumerConfig{}, + prev: &ConsumerConfig{}, + expectedMetadata: metadataOnlyRequired(), + }, + { + desc: "update/empty-prev-metadata/delete-user-provided", + cfg: &ConsumerConfig{Metadata: metadataPrevious()}, + prev: &ConsumerConfig{}, + expectedMetadata: metadataOnlyRequired(), + }, + } { + t.Run(test.desc, func(t *testing.T) { + setStaticConsumerMetadata(test.cfg, test.prev) + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) + }) + } +} + +func TestJetStreamCopyConsumerMetadata(t *testing.T) { + for _, test := range []struct { + desc string + cfg *ConsumerConfig + prev *ConsumerConfig + }{ + { + desc: "no-previous-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: nil, + }, + { + desc: "nil-previous-metadata-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: nil}, + }, + { + desc: "nil-current-metadata-ignore", + cfg: &ConsumerConfig{Metadata: nil}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + }, + { + desc: "copy-previous", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + }, + { + desc: "delete-missing-fields", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: make(map[string]string)}, + }, + } { + t.Run(test.desc, func(t *testing.T) { + copyConsumerMetadata(test.cfg, test.prev) + + var expectedMetadata map[string]string + if test.prev != nil { + expectedMetadata = test.prev.Metadata + } + + value, ok := expectedMetadata[JSCreatedVersionMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSCreatedVersionMetadataKey] + require_False(t, ok) + } + + value, ok = expectedMetadata[JSCreatedLevelMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSCreatedLevelMetadataKey] + require_False(t, ok) + } + + value, ok = expectedMetadata[JSRequiredLevelMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSRequiredLevelMetadataKey] + require_False(t, ok) + } + }) + } +} + +type server struct { + replicas int + js nats.JetStreamContext + nc *nats.Conn +} + +const ( + streamName = "STREAM" + consumerName = "CONSUMER" +) + +func TestJetStreamMetadataMutations(t *testing.T) { + single := RunBasicJetStreamServer(t) + defer single.Shutdown() + nc, js := jsClientConnect(t, single) + defer nc.Close() + + cluster := createJetStreamClusterExplicit(t, "R3S", 3) + defer cluster.shutdown() + cnc, cjs := jsClientConnect(t, cluster.randomServer()) + defer cnc.Close() + + // Test for both single server and clustered mode. + for _, s := range []server{ + {1, js, nc}, + {3, cjs, cnc}, + } { + t.Run(fmt.Sprintf("R%d", s.replicas), func(t *testing.T) { + streamMetadataChecks(t, s) + consumerMetadataChecks(t, s) + }) + } +} + +func validateMetadata(metadata map[string]string, expectedFeatureLevel string) bool { + return metadata[JSCreatedVersionMetadataKey] == VERSION || + metadata[JSCreatedLevelMetadataKey] == strconv.Itoa(JSApiLevel) || + metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel +} + +func streamMetadataChecks(t *testing.T, s server) { + // Add stream. + sc := nats.StreamConfig{Name: streamName, Replicas: s.replicas} + si, err := s.js.AddStream(&sc) + require_NoError(t, err) + require_True(t, validateMetadata(si.Config.Metadata, "0")) + + // Stream info. + si, err = s.js.StreamInfo(streamName) + require_NoError(t, err) + require_True(t, validateMetadata(si.Config.Metadata, "0")) + + // Update stream. + // Metadata set on creation should be preserved, even if not included in update. + si, err = s.js.UpdateStream(&sc) + require_NoError(t, err) + require_True(t, validateMetadata(si.Config.Metadata, "0")) +} + +func consumerMetadataChecks(t *testing.T, s server) { + // Add consumer. + cc := nats.ConsumerConfig{Name: consumerName, Replicas: s.replicas} + ci, err := s.js.AddConsumer(streamName, &cc) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "0")) + + // Consumer info. + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "0")) + + // Update consumer. + // Metadata set on creation should be preserved, even if not included in update. + ci, err = s.js.UpdateConsumer(streamName, &cc) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "0")) + + // Use pause advisories to know when pause/resume is applied. + pauseCh := make(chan *nats.Msg, 10) + _, err = s.nc.ChanSubscribe(JSAdvisoryConsumerPausePre+".STREAM.CONSUMER", pauseCh) + require_NoError(t, err) + + // Pause consumer, should up required API level. + jsTestPause_PauseConsumer(t, s.nc, streamName, consumerName, time.Now().Add(time.Second*3)) + require_ChanRead(t, pauseCh, time.Second*2) + require_Len(t, len(pauseCh), 0) + + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "1")) + + // Unpause consumer, should lower required API level. + subj := fmt.Sprintf("$JS.API.CONSUMER.PAUSE.%s.%s", streamName, consumerName) + _, err = s.nc.Request(subj, nil, time.Second) + require_NoError(t, err) + require_ChanRead(t, pauseCh, time.Second*2) + require_Len(t, len(pauseCh), 0) + + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "0")) +} + +func TestJetStreamMetadataStreamRestoreAndRestart(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + restoreEmptyStream(t, nc, 1) + + // Stream restore should result in empty metadata to be preserved. + si, err := js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) + + // Restart server. + port := s.opts.Port + sd := s.StoreDir() + nc.Close() + s.Shutdown() + s = RunJetStreamServerOnPort(port, sd) + defer s.Shutdown() + nc, js = jsClientConnect(t, s) + defer nc.Close() + + // After restart (or upgrade) metadata data should remain empty. + si, err = js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) +} + +func TestJetStreamMetadataStreamRestoreAndRestartCluster(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + restoreEmptyStream(t, nc, 3) + + // Stream restore should result in empty metadata to be preserved. + si, err := js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) + + // Restart cluster. + c.stopAll() + c.restartAllSamePorts() + defer c.shutdown() + c.waitOnAllCurrent() + c.waitOnStreamLeader("$G", streamName) + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // After restart (or upgrade) metadata data should remain empty. + si, err = js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) +} + +func restoreEmptyStream(t *testing.T, nc *nats.Conn, replicas int) { + rreq := JSApiStreamRestoreRequest{ + Config: StreamConfig{ + Name: "STREAM", + Retention: LimitsPolicy, + Storage: FileStorage, + Replicas: replicas, + }, + } + buf, err := json.Marshal(rreq) + require_NoError(t, err) + + var rresp JSApiStreamRestoreResponse + msg, err := nc.Request(fmt.Sprintf(JSApiStreamRestoreT, rreq.Config.Name), buf, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + + // Construct empty stream.tar.s2 (only containing meta.inf). + fsi := FileStreamInfo{StreamConfig: rreq.Config} + fsij, err := json.Marshal(fsi) + require_NoError(t, err) + + hdr := &tar.Header{ + Name: JetStreamMetaFile, + Mode: 0600, + Uname: "nats", + Gname: "nats", + Size: int64(len(fsij)), + Format: tar.FormatPAX, + } + var buffer bytes.Buffer + enc := s2.NewWriter(&buffer) + tw := tar.NewWriter(enc) + err = tw.WriteHeader(hdr) + require_NoError(t, err) + _, err = tw.Write(fsij) + require_NoError(t, err) + err = tw.Close() + require_NoError(t, err) + err = enc.Close() + require_NoError(t, err) + + data := buffer.Bytes() + msg, err = nc.Request(rresp.DeliverSubject, data, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + + msg, err = nc.Request(rresp.DeliverSubject, nil, 5*time.Second) + require_NoError(t, err) + err = json.Unmarshal(msg.Data, &rresp) + require_NoError(t, err) +}