From af81a652c3ced4b10460c6abcb1fc378ada6c206 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 16 Jun 2024 18:04:53 -0700 Subject: [PATCH] On a bad snapshot restore, e.g. R3 on single server, make sure to remove restored stream. We did not remove it which could cause bad healthz reports on single servers for instance. Signed-off-by: Derek Collison --- server/jetstream_cluster_3_test.go | 38 ++++++++++++++++++++++++++++-- server/stream.go | 2 ++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 89388b8c64b..d27a949a0fe 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4735,7 +4735,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) { rresp.Error = nil json.Unmarshal(rmsg.Data, &rresp) - require_True(t, resp.Error == nil) + require_True(t, rresp.Error == nil) checkHealth() @@ -4757,7 +4757,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) { require_NoError(t, err) rresp.Error = nil json.Unmarshal(rmsg.Data, &rresp) - require_True(t, resp.Error == nil) + require_True(t, rresp.Error == nil) si, err := js.StreamInfo("TEST") require_NoError(t, err) @@ -4770,6 +4770,40 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) { si, err = js.StreamInfo("TEST") require_NoError(t, err) require_True(t, si.State.Msgs == uint64(toSend)) + + // Now make sure if we try to restore to a single server that the artifact is cleaned up and the server returns ok for healthz. + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ = jsClientConnect(t, s) + defer nc.Close() + + rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second) + require_NoError(t, err) + + rresp.Error = nil + json.Unmarshal(rmsg.Data, &rresp) + require_True(t, rresp.Error == nil) + + for i, r := 0, bytes.NewReader(snapshot); ; { + n, err := r.Read(chunk[:]) + if err != nil { + break + } + _, err = nc.Request(rresp.DeliverSubject, chunk[:n], time.Second) + require_NoError(t, err) + i++ + } + rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second) + require_NoError(t, err) + rresp.Error = nil + json.Unmarshal(rmsg.Data, &rresp) + + require_True(t, rresp.Error != nil) + require_Equal(t, rresp.ApiResponse.Error.ErrCode, 10074) + + status := s.healthz(nil) + require_Equal(t, status.StatusCode, 200) } func TestJetStreamClusterBinaryStreamSnapshotCapability(t *testing.T) { diff --git a/server/stream.go b/server/stream.go index bb0dcf1d0c3..5fcb0dac34f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6106,6 +6106,8 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error } mset, err := a.addStream(&cfg) if err != nil { + // Make sure to clean up after ourselves here. + os.RemoveAll(ndir) return nil, err } if !fcfg.Created.IsZero() {