Skip to content

Commit

Permalink
Merge pull request #8782 from dolthub/aaron/gc-finalizer-error-cleanup
Browse files Browse the repository at this point in the history
[no-release-notes]: go/store/nbs: Improve cleanup if we encounter an error during GC.
  • Loading branch information
reltuk authored Jan 24, 2025
2 parents 009ab93 + 591e29c commit 25657eb
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 17 deletions.
4 changes: 3 additions & 1 deletion go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ type MarkAndSweeper interface {
// chunks is accessed and copied.
SaveHashes(context.Context, []hash.Hash) error

Close(context.Context) (GCFinalizer, error)
Finalize(context.Context) (GCFinalizer, error)

Close(context.Context) error
}

// A GCFinalizer is returned from a MarkAndSweeper after it is closed.
Expand Down
6 changes: 5 additions & 1 deletion go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,14 @@ func (i *msvMarkAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash)
return nil
}

func (i *msvMarkAndSweeper) Close(context.Context) (GCFinalizer, error) {
func (i *msvMarkAndSweeper) Finalize(context.Context) (GCFinalizer, error) {
return msvGcFinalizer{i.ms, i.keepers}, nil
}

func (i *msvMarkAndSweeper) Close(context.Context) error {
return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
if dest != ms {
panic("unsupported")
Expand Down
15 changes: 15 additions & 0 deletions go/store/nbs/cmp_chunk_table_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@ func (tw *CmpChunkTableWriter) Remove() error {
return os.Remove(tw.path)
}

// Cancel the inprogress write and attempt to cleanup any
// resources associated with it. It is an error to call
// Flush{,ToFile} or Reader after canceling the writer.
func (tw *CmpChunkTableWriter) Cancel() error {
closer, err := tw.sink.Reader()
if err != nil {
return err
}
err = closer.Close()
if err != nil {
return err
}
return tw.Remove()
}

func containsDuplicates(prefixes prefixIndexSlice) bool {
if len(prefixes) == 0 {
return false
Expand Down
19 changes: 13 additions & 6 deletions go/store/nbs/gc_copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,28 @@ func (ea gcErrAccum) Error() string {

type gcCopier struct {
writer *CmpChunkTableWriter
tfp tableFilePersister
}

func newGarbageCollectionCopier() (*gcCopier, error) {
func newGarbageCollectionCopier(tfp tableFilePersister) (*gcCopier, error) {
writer, err := NewCmpChunkTableWriter("")
if err != nil {
return nil, err
}
return &gcCopier{writer}, nil
return &gcCopier{writer, tfp}, nil
}

func (gcc *gcCopier) addChunk(ctx context.Context, c CompressedChunk) error {
return gcc.writer.AddCmpChunk(c)
}

func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister) (ts []tableSpec, err error) {
// If the writer should be closed and deleted, instead of being used with
// copyTablesToDir, call this method.
func (gcc *gcCopier) cancel(_ context.Context) error {
return gcc.writer.Cancel()
}

func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err error) {
var filename string
filename, err = gcc.writer.Finish()
if err != nil {
Expand All @@ -79,7 +86,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
return nil, fmt.Errorf("invalid filename: %s", filename)
}

exists, err := tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil)
exists, err := gcc.tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil)
if err != nil {
return nil, err
}
Expand All @@ -94,7 +101,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
}

// Attempt to rename the file to the destination if we are working with a fsTablePersister...
if mover, ok := tfp.(movingTableFilePersister); ok {
if mover, ok := gcc.tfp.(movingTableFilePersister); ok {
err = mover.TryMoveCmpChunkTableWriter(ctx, filename, gcc.writer)
if err == nil {
return []tableSpec{
Expand All @@ -114,7 +121,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
defer r.Close()
sz := gcc.writer.ContentLength()

err = tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount()))
err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount()))
if err != nil {
return nil, err
}
Expand Down
14 changes: 11 additions & 3 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,7 @@ func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompres
return nil, fmt.Errorf("NBS does not support copying garbage collection")
}

gcc, err := newGarbageCollectionCopier()
gcc, err := newGarbageCollectionCopier(tfp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1758,11 +1758,12 @@ func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) err
return nil
}

func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) {
specs, err := i.gcc.copyTablesToDir(ctx, i.tfp)
func (i *markAndSweeper) Finalize(ctx context.Context) (chunks.GCFinalizer, error) {
specs, err := i.gcc.copyTablesToDir(ctx)
if err != nil {
return nil, err
}
i.gcc = nil

return gcFinalizer{
nbs: i.dest,
Expand All @@ -1771,6 +1772,13 @@ func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error)
}, nil
}

func (i *markAndSweeper) Close(ctx context.Context) error {
if i.gcc != nil {
return i.gcc.cancel(ctx)
}
return nil
}

type gcFinalizer struct {
nbs *NomsBlockStore
specs []tableSpec
Expand Down
3 changes: 2 additions & 1 deletion go/store/nbs/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,9 @@ func TestNBSCopyGC(t *testing.T) {
keepersSlice = append(keepersSlice, h)
}
require.NoError(t, sweeper.SaveHashes(ctx, keepersSlice))
finalizer, err := sweeper.Close(ctx)
finalizer, err := sweeper.Finalize(ctx)
require.NoError(t, err)
require.NoError(t, sweeper.Close(ctx))
require.NoError(t, finalizer.SwapChunksInStore(ctx))
st.EndGC()

Expand Down
14 changes: 9 additions & 5 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func (lvs *ValueStore) gc(ctx context.Context,

err = sweeper.SaveHashes(ctx, toVisit.ToSlice())
if err != nil {
_, cErr := sweeper.Close(ctx)
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
}
toVisit = nil
Expand All @@ -738,26 +738,30 @@ func (lvs *ValueStore) gc(ctx context.Context,
next := lvs.readAndResetNewGenToVisit()
err = sweeper.SaveHashes(ctx, next.ToSlice())
if err != nil {
_, cErr := sweeper.Close(ctx)
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
}
next = nil

final := finalize()
err = sweeper.SaveHashes(ctx, final.ToSlice())
if err != nil {
_, cErr := sweeper.Close(ctx)
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
}

if safepointF != nil {
err = safepointF()
if err != nil {
_, cErr := sweeper.Close(ctx)
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
}
}
return sweeper.Close(ctx)
finalizer, err := sweeper.Finalize(ctx)
if err != nil {
return nil, err
}
return finalizer, sweeper.Close(ctx)
}

// Close closes the underlying ChunkStore
Expand Down

0 comments on commit 25657eb

Please sign in to comment.