diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index 82c52cac656..19bf4d07f96 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -172,7 +172,9 @@ type MarkAndSweeper interface { // chunks is accessed and copied. SaveHashes(context.Context, []hash.Hash) error - Close(context.Context) (GCFinalizer, error) + Finalize(context.Context) (GCFinalizer, error) + + Close(context.Context) error } // A GCFinalizer is returned from a MarkAndSweeper after it is closed. diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index 93643c17c17..a7fd1ae5725 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -398,10 +398,14 @@ func (i *msvMarkAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) return nil } -func (i *msvMarkAndSweeper) Close(context.Context) (GCFinalizer, error) { +func (i *msvMarkAndSweeper) Finalize(context.Context) (GCFinalizer, error) { return msvGcFinalizer{i.ms, i.keepers}, nil } +func (i *msvMarkAndSweeper) Close(context.Context) error { + return nil +} + func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) { if dest != ms { panic("unsupported") diff --git a/go/store/nbs/cmp_chunk_table_writer.go b/go/store/nbs/cmp_chunk_table_writer.go index 905154463ea..28cdde95630 100644 --- a/go/store/nbs/cmp_chunk_table_writer.go +++ b/go/store/nbs/cmp_chunk_table_writer.go @@ -173,6 +173,21 @@ func (tw *CmpChunkTableWriter) Remove() error { return os.Remove(tw.path) } +// Cancel the inprogress write and attempt to cleanup any +// resources associated with it. It is an error to call +// Flush{,ToFile} or Reader after canceling the writer. +func (tw *CmpChunkTableWriter) Cancel() error { + closer, err := tw.sink.Reader() + if err != nil { + return err + } + err = closer.Close() + if err != nil { + return err + } + return tw.Remove() +} + func containsDuplicates(prefixes prefixIndexSlice) bool { if len(prefixes) == 0 { return false diff --git a/go/store/nbs/gc_copier.go b/go/store/nbs/gc_copier.go index 5092a5deeae..44c02df8ef7 100644 --- a/go/store/nbs/gc_copier.go +++ b/go/store/nbs/gc_copier.go @@ -45,21 +45,28 @@ func (ea gcErrAccum) Error() string { type gcCopier struct { writer *CmpChunkTableWriter + tfp tableFilePersister } -func newGarbageCollectionCopier() (*gcCopier, error) { +func newGarbageCollectionCopier(tfp tableFilePersister) (*gcCopier, error) { writer, err := NewCmpChunkTableWriter("") if err != nil { return nil, err } - return &gcCopier{writer}, nil + return &gcCopier{writer, tfp}, nil } func (gcc *gcCopier) addChunk(ctx context.Context, c CompressedChunk) error { return gcc.writer.AddCmpChunk(c) } -func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister) (ts []tableSpec, err error) { +// If the writer should be closed and deleted, instead of being used with +// copyTablesToDir, call this method. +func (gcc *gcCopier) cancel(_ context.Context) error { + return gcc.writer.Cancel() +} + +func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err error) { var filename string filename, err = gcc.writer.Finish() if err != nil { @@ -79,7 +86,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister return nil, fmt.Errorf("invalid filename: %s", filename) } - exists, err := tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil) + exists, err := gcc.tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil) if err != nil { return nil, err } @@ -94,7 +101,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister } // Attempt to rename the file to the destination if we are working with a fsTablePersister... - if mover, ok := tfp.(movingTableFilePersister); ok { + if mover, ok := gcc.tfp.(movingTableFilePersister); ok { err = mover.TryMoveCmpChunkTableWriter(ctx, filename, gcc.writer) if err == nil { return []tableSpec{ @@ -114,7 +121,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister defer r.Close() sz := gcc.writer.ContentLength() - err = tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount())) + err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount())) if err != nil { return nil, err } diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 7ddc44a001a..7f08eb62c99 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1652,7 +1652,7 @@ func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompres return nil, fmt.Errorf("NBS does not support copying garbage collection") } - gcc, err := newGarbageCollectionCopier() + gcc, err := newGarbageCollectionCopier(tfp) if err != nil { return nil, err } @@ -1758,11 +1758,12 @@ func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) err return nil } -func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) { - specs, err := i.gcc.copyTablesToDir(ctx, i.tfp) +func (i *markAndSweeper) Finalize(ctx context.Context) (chunks.GCFinalizer, error) { + specs, err := i.gcc.copyTablesToDir(ctx) if err != nil { return nil, err } + i.gcc = nil return gcFinalizer{ nbs: i.dest, @@ -1771,6 +1772,13 @@ func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) }, nil } +func (i *markAndSweeper) Close(ctx context.Context) error { + if i.gcc != nil { + return i.gcc.cancel(ctx) + } + return nil +} + type gcFinalizer struct { nbs *NomsBlockStore specs []tableSpec diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index 02c92bb7a97..90f204bc996 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -345,8 +345,9 @@ func TestNBSCopyGC(t *testing.T) { keepersSlice = append(keepersSlice, h) } require.NoError(t, sweeper.SaveHashes(ctx, keepersSlice)) - finalizer, err := sweeper.Close(ctx) + finalizer, err := sweeper.Finalize(ctx) require.NoError(t, err) + require.NoError(t, sweeper.Close(ctx)) require.NoError(t, finalizer.SwapChunksInStore(ctx)) st.EndGC() diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 7d0984831d7..02a2b897827 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -727,7 +727,7 @@ func (lvs *ValueStore) gc(ctx context.Context, err = sweeper.SaveHashes(ctx, toVisit.ToSlice()) if err != nil { - _, cErr := sweeper.Close(ctx) + cErr := sweeper.Close(ctx) return nil, errors.Join(err, cErr) } toVisit = nil @@ -738,7 +738,7 @@ func (lvs *ValueStore) gc(ctx context.Context, next := lvs.readAndResetNewGenToVisit() err = sweeper.SaveHashes(ctx, next.ToSlice()) if err != nil { - _, cErr := sweeper.Close(ctx) + cErr := sweeper.Close(ctx) return nil, errors.Join(err, cErr) } next = nil @@ -746,18 +746,22 @@ func (lvs *ValueStore) gc(ctx context.Context, final := finalize() err = sweeper.SaveHashes(ctx, final.ToSlice()) if err != nil { - _, cErr := sweeper.Close(ctx) + cErr := sweeper.Close(ctx) return nil, errors.Join(err, cErr) } if safepointF != nil { err = safepointF() if err != nil { - _, cErr := sweeper.Close(ctx) + cErr := sweeper.Close(ctx) return nil, errors.Join(err, cErr) } } - return sweeper.Close(ctx) + finalizer, err := sweeper.Finalize(ctx) + if err != nil { + return nil, err + } + return finalizer, sweeper.Close(ctx) } // Close closes the underlying ChunkStore