Skip to content

Commit

Permalink
Merge pull request #572 from ripienaar/filter_meta
Browse files Browse the repository at this point in the history
Adds a utility to remove server metadata
  • Loading branch information
ripienaar authored Sep 10, 2024
2 parents 543abd8 + 82d1712 commit a94b384
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
2 changes: 2 additions & 0 deletions api/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
JSApiAccountInfo = "$JS.API.INFO"

// also update FilterServerMetadata when this changes

JSMetaCreatedServerLevel = "_nats.created.server.api_level"
JSMetaCreatedServerVersion = "_nats.created.server.version"
JSMetaCurrentServerLevel = "_nats.server.api_level"
Expand Down
26 changes: 26 additions & 0 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"regexp"
"slices"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -310,3 +311,28 @@ func ParseDuration(d string) (time.Duration, error) {

return time.Duration(neg) * r, nil
}

// FilterServerMetadata removes a copy of metadata with the server generated metadata removed
func FilterServerMetadata(metadata map[string]string) map[string]string {
if metadata == nil {
return nil
}

nm := map[string]string{}
reserved := []string{
api.JSMetaCurrentServerVersion,
api.JSMetaCurrentServerLevel,
api.JSMetaCreatedServerVersion,
api.JSMetaCreatedServerVersion,
api.JsMetaRequiredServerLevel,
api.JSMetaCreatedServerLevel,
}

for k, v := range metadata {
if !slices.Contains(reserved, k) {
nm[k] = v
}
}

return nm
}
36 changes: 36 additions & 0 deletions test/jsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package test
import (
"encoding/json"
"fmt"
"reflect"
"testing"

"github.com/nats-io/jsm.go/api"
Expand Down Expand Up @@ -145,3 +146,38 @@ func TestIsNatsError(t *testing.T) {
t.Fatalf("Non api error is 10077")
}
}

func TestFilterServerMetadata(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
defer nc.Flush()

s, err := mgr.NewStream("q1", jsm.Subjects("in.q1"), jsm.StreamMetadata(map[string]string{
"io.nats.monitor.enabled": "1",
"io.nats.monitor.lag-critical": "100",
"io.nats.monitor.msgs-critical": "500",
"io.nats.monitor.msgs-warn": "999",
"io.nats.monitor.peer-expect": "3",
"io.nats.monitor.seen-critical": "5m",
}))
checkErr(t, err, "create failed")

if _, ok := s.Metadata()[api.JsMetaRequiredServerLevel]; !ok {
t.Fatalf("No server metadata added")
}

newMeta := jsm.FilterServerMetadata(s.Metadata())

expected := map[string]string{
"io.nats.monitor.enabled": "1",
"io.nats.monitor.lag-critical": "100",
"io.nats.monitor.msgs-critical": "500",
"io.nats.monitor.msgs-warn": "999",
"io.nats.monitor.peer-expect": "3",
"io.nats.monitor.seen-critical": "5m",
}

if !reflect.DeepEqual(newMeta, expected) {
t.Fatalf("all server data was not removed: %v", newMeta)
}
}

0 comments on commit a94b384

Please sign in to comment.