Skip to content

Commit

Permalink
(2.11) ADR-44: JetStream Asset Versioning in Metadata (#5850)
Browse files Browse the repository at this point in the history
Implements
[ADR-44](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-44.md)

Initial addition of JetStream Asset Versioning, containing:
- generating stream and consumer metadata
  - created server version: `_nats.created.server.version`
  - created server API level: `_nats.created.server.api_level`
  - required server API level: `_nats.server.require.api_level`
- logging supported API level upon startup: `[INF]   API Level:       1`

<br>

Note that stream and consumer metadata is only set/updated upon:
- creating a new stream/consumer
- updating a stream/consumer
_(created metadata will not be set for pre-existing assets, only
required level will be updated)_

<br>

Many tests are added ensuring that:
- restoring streams from backups doesn't set this metadata
_(if it doesn't exist, for example restoring a backup from a previous
version)_
- restarting a server (or upgrading) doesn't set this metadata
  _(if it doesn't exist, for example due to upgrading)_
- updating consumer `PauseUntil` ups or lowers required API level
- metadata is consistently reported through add, update and info
requests
- only stream/consumer/meta leader determine the metadata, so that there
can be no skew in metadata if followers were allowed to update as well
- users can't manually supply these `_nats.>` metadata fields, they will
be overwritten to the appropriate values
- if the metadata is missing during an update, the metadata is preserved
_(for example an update by a client that doesn't know about metadata
yet)_

These tests check for both non-clustered R1 setups and clustered R3
setups.

<br>

This PR doesn't fully implement
[ADR-44](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-44.md).
There will be follow-up PRs later on to add support for:
- dynamic metadata, like `_nats.server.version` and
`_nats.server.api_level` that report the current server version and API
level the asset lives on:
  #5857
- reporting server JS API level through `jsz`, `varz` and `$JS.API.INFO`
(it is already reported in the logs during startup):
  #5855
- etc.

There are some slight inconsistencies between the ADR and this PR. For
example using the terms feature level, `api_version` and `api_level`
that all mean the same and are all made consistent to be `api_level`.
I'll correct the ADR with any changed details after this PR has been
merged.

Signed-off-by: Maurice van Veen <[email protected]>

---------

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen authored Sep 7, 2024
1 parent b715238 commit a10dc6a
Show file tree
Hide file tree
Showing 5 changed files with 645 additions and 2 deletions.
1 change: 1 addition & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
s.Noticef(" TPM File: %q, Pcr: %d", opts.JetStreamTpm.KeysFile,
opts.JetStreamTpm.Pcr)
}
s.Noticef(" API Level: %d", JSApiLevel)
s.Noticef("-------------------------------------------")

// Setup our internal subscriptions.
Expand Down
24 changes: 22 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,9 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}

// Initialize asset version metadata.
setStaticStreamMetadata(&cfg.StreamConfig, nil)

streamName := streamNameFromSubject(subject)
if streamName != cfg.Name {
resp.Error = NewJSStreamMismatchError()
Expand Down Expand Up @@ -1557,6 +1560,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
return
}

// Update asset version metadata.
setStaticStreamMetadata(&cfg, &mset.cfg)

if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
resp.Error = NewJSStreamUpdateError(err, Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -4028,12 +4034,17 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
return
}

// If the consumer already exists then don't allow updating the PauseUntil, just set
// it back to whatever the current configured value is.
var oldCfg *ConsumerConfig
if o := stream.lookupConsumer(consumerName); o != nil {
oldCfg = &o.cfg
// If the consumer already exists then don't allow updating the PauseUntil, just set
// it back to whatever the current configured value is.
req.Config.PauseUntil = o.cfg.PauseUntil
}

// Initialize/update asset version metadata.
setStaticConsumerMetadata(&req.Config, oldCfg)

o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)

if err != nil {
Expand Down Expand Up @@ -4589,6 +4600,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
} else {
nca.Config.PauseUntil = nil
}

// Update asset version metadata due to updating pause/resume.
// Only PauseUntil is updated above, so reuse config for both.
setStaticConsumerMetadata(nca.Config, nca.Config)

eca := encodeAddConsumerAssignment(&nca)
cc.meta.Propose(eca)

Expand Down Expand Up @@ -4622,6 +4638,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
ncfg.PauseUntil = nil
}

// Update asset version metadata due to updating pause/resume.
// Only PauseUntil is updated above, so reuse config for both.
setStaticConsumerMetadata(&ncfg, &ncfg)

if err := obs.updateConfig(&ncfg); err != nil {
// The only type of error that should be returned here is from o.store,
// so use a store failed error type.
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6245,6 +6245,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}

// Update asset version metadata.
setStaticStreamMetadata(cfg, osa.Config)

var newCfg *StreamConfig
if jsa := js.accounts[acc.Name]; jsa != nil {
js.mu.Unlock()
Expand Down Expand Up @@ -7283,6 +7287,9 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// if name was set by the user.
if oname != _EMPTY_ {
if ca = sa.consumers[oname]; ca != nil && !ca.deleted {
// Provided config might miss metadata, copy from existing config.
copyConsumerMetadata(cfg, ca.Config)

if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) {
resp.Error = NewJSConsumerAlreadyExistsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
Expand All @@ -7297,6 +7304,13 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}
}

// Initialize/update asset version metadata.
var oldCfg *ConsumerConfig
if ca != nil {
oldCfg = ca.Config
}
setStaticConsumerMetadata(cfg, oldCfg)

// If this is new consumer.
if ca == nil {
if action == ActionUpdate {
Expand Down
138 changes: 138 additions & 0 deletions server/jetstream_versioning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import "strconv"

const (
// JSApiLevel is the maximum supported JetStream API level for this server.
JSApiLevel int = 1

JSCreatedVersionMetadataKey = "_nats.created.server.version"
JSCreatedLevelMetadataKey = "_nats.created.server.api_level"
JSRequiredLevelMetadataKey = "_nats.server.require.api_level"
)

// setStaticStreamMetadata sets JetStream stream metadata, like the server version and API level.
// Given:
// - cfg!=nil, prevCfg==nil add stream: adds created and required metadata
// - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated
func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
}

var prevMetadata map[string]string
if prevCfg != nil {
prevMetadata = prevCfg.Metadata
if prevMetadata == nil {
// Initialize to empty to indicate we had a previous config but metadata was missing.
prevMetadata = make(map[string]string)
}
}
preserveCreatedMetadata(cfg.Metadata, prevMetadata)

var requiredApiLevel int
cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
}

// setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level.
// Given:
// - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata
// - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated
func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
}

var prevMetadata map[string]string
if prevCfg != nil {
prevMetadata = prevCfg.Metadata
if prevMetadata == nil {
// Initialize to empty to indicate we had a previous config but metadata was missing.
prevMetadata = make(map[string]string)
}
}
preserveCreatedMetadata(cfg.Metadata, prevMetadata)

var requiredApiLevel int

// Added in 2.11, absent | zero is the feature is not used.
// one could be stricter and say even if its set but the time
// has already passed it is also not needed to restore the consumer
if cfg.PauseUntil != nil && !cfg.PauseUntil.IsZero() {
requiredApiLevel = 1
}

cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
}

// copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg.
// Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg.
//
// Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences.
// MUST be followed up with a call to setStaticConsumerMetadata to fix potentially lost metadata.
func copyConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) {
// Remove fields when no previous metadata.
if prevCfg == nil || prevCfg.Metadata == nil {
if cfg.Metadata != nil {
delete(cfg.Metadata, JSCreatedVersionMetadataKey)
delete(cfg.Metadata, JSCreatedLevelMetadataKey)
delete(cfg.Metadata, JSRequiredLevelMetadataKey)
if len(cfg.Metadata) == 0 {
cfg.Metadata = nil
}
}
return
}

// Set if exists, delete otherwise.
setOrDeleteInMetadata(cfg, prevCfg, JSCreatedVersionMetadataKey)
setOrDeleteInMetadata(cfg, prevCfg, JSCreatedLevelMetadataKey)
setOrDeleteInMetadata(cfg, prevCfg, JSRequiredLevelMetadataKey)
}

// setOrDeleteInMetadata sets field with key/value in metadata of cfg if set, deletes otherwise.
func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key string) {
if value, ok := prevCfg.Metadata[key]; ok {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
}
cfg.Metadata[key] = value
} else {
delete(cfg.Metadata, key)
}
}

// preserveCreatedMetadata sets metadata to contain which version and API level the asset was created on.
// Preserves previous metadata, if not set it initializes versions for the metadata.
func preserveCreatedMetadata(metadata, prevMetadata map[string]string) {
if prevMetadata == nil {
metadata[JSCreatedVersionMetadataKey] = VERSION
metadata[JSCreatedLevelMetadataKey] = strconv.Itoa(JSApiLevel)
return
}

// Preserve previous metadata if it was set, but delete if not since it could be user-provided.
if v := prevMetadata[JSCreatedVersionMetadataKey]; v != _EMPTY_ {
metadata[JSCreatedVersionMetadataKey] = v
} else {
delete(metadata, JSCreatedVersionMetadataKey)
}
if v := prevMetadata[JSCreatedLevelMetadataKey]; v != _EMPTY_ {
metadata[JSCreatedLevelMetadataKey] = v
} else {
delete(metadata, JSCreatedLevelMetadataKey)
}
}
Loading

0 comments on commit a10dc6a

Please sign in to comment.