Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/store/types: Move to a safepoint controller which will allow a caller better control over when to take actions while the GC is running. #8780

Merged
merged 3 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1728,7 +1728,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
// until no possibly-stale ChunkStore state is retained in memory, or failing
// certain in-progress operations which cannot be finalized in a timely manner,
// etc.
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error {
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointController types.GCSafepointController) error {
collector, ok := ddb.db.Database.(datas.GarbageCollector)
if !ok {
return fmt.Errorf("this database does not support garbage collection")
Expand Down Expand Up @@ -1772,7 +1772,7 @@ func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func()
return err
}

return collector.GC(ctx, mode, oldGen, newGen, safepointF)
return collector.GC(ctx, mode, oldGen, newGen, safepointController)
}

func (ddb *DoltDB) ShallowGC(ctx context.Context) error {
Expand Down
131 changes: 81 additions & 50 deletions go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dprocedures

import (
"context"
"errors"
"fmt"
"os"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)

Expand Down Expand Up @@ -57,6 +59,29 @@ func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {

var ErrServerPerformedGC = errors.New("this connection was established when this server performed an online garbage collection. this connection can no longer be used. please reconnect.")

type safepointController struct {
begin func(context.Context, func(hash.Hash) bool) error
preFinalize func(context.Context) error
postFinalize func(context.Context) error
cancel func()
}

func (sc safepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
return sc.begin(ctx, keeper)
}

func (sc safepointController) EstablishPreFinalizeSafepoint(ctx context.Context) error {
return sc.preFinalize(ctx)
}

func (sc safepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error {
return sc.postFinalize(ctx)
}

func (sc safepointController) CancelSafepoint() {
sc.cancel()
}

func doDoltGC(ctx *sql.Context, args []string) (int, error) {
dbName := ctx.GetCurrentDatabase()

Expand Down Expand Up @@ -116,66 +141,72 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
mode = types.GCModeFull
}

// TODO: If we got a callback at the beginning and an
// (allowed-to-block) callback at the end, we could more
// gracefully tear things down.
err = ddb.GC(ctx, mode, func() error {
if origepoch != -1 {
// Here we need to sanity check role and epoch.
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
if role.(string) != "primary" {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string))
}
_, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
if !ok {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.")
// TODO: Implement safepointController so that begin can capture inflight sessions
// and preFinalize can ensure they're all in a good place before returning.
sc := safepointController{
begin: func(context.Context, func(hash.Hash) bool) error { return nil },
preFinalize: func(context.Context) error { return nil },
postFinalize: func(context.Context) error {
if origepoch != -1 {
// Here we need to sanity check role and epoch.
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
if role.(string) != "primary" {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string))
}
_, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
if !ok {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.")
}
if origepoch != epoch.(int) {
return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int))
}
} else {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.")
}
if origepoch != epoch.(int) {
return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int))
}
} else {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.")
}
}

killed := make(map[uint32]struct{})
processes := ctx.ProcessList.Processes()
for _, p := range processes {
if p.Connection != ctx.Session.ID() {
// Kill any inflight query.
ctx.ProcessList.Kill(p.Connection)
// Tear down the connection itself.
ctx.KillConnection(p.Connection)
killed[p.Connection] = struct{}{}
}
}

// Look in processes until the connections are actually gone.
params := backoff.NewExponentialBackOff()
params.InitialInterval = 1 * time.Millisecond
params.MaxInterval = 25 * time.Millisecond
params.MaxElapsedTime = 3 * time.Second
err := backoff.Retry(func() error {
killed := make(map[uint32]struct{})
processes := ctx.ProcessList.Processes()
allgood := true
for _, p := range processes {
if _, ok := killed[p.Connection]; ok {
allgood = false
if p.Connection != ctx.Session.ID() {
// Kill any inflight query.
ctx.ProcessList.Kill(p.Connection)
// Tear down the connection itself.
ctx.KillConnection(p.Connection)
killed[p.Connection] = struct{}{}
}
}
if !allgood {
return errors.New("unable to establish safepoint.")

// Look in processes until the connections are actually gone.
params := backoff.NewExponentialBackOff()
params.InitialInterval = 1 * time.Millisecond
params.MaxInterval = 25 * time.Millisecond
params.MaxElapsedTime = 3 * time.Second
err := backoff.Retry(func() error {
processes := ctx.ProcessList.Processes()
allgood := true
for _, p := range processes {
if _, ok := killed[p.Connection]; ok {
allgood = false
ctx.ProcessList.Kill(p.Connection)
}
}
if !allgood {
return errors.New("unable to establish safepoint.")
}
return nil
}, params)
if err != nil {
return err
}
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
return nil
}, params)
if err != nil {
return err
}
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
return nil
})
},
cancel: func() {},
}

err = ddb.GC(ctx, mode, sc)
if err != nil {
return cmdFailure, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/sqle/enginetest/dolt_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -7671,7 +7671,7 @@ var DoltTempTableScripts = []queries.ScriptTest{
},
},
{
Name: "drop temporary table behavior",
Name: "drop temporary table behavior",
Dialect: "mysql",
SetUpScript: []string{
"create table t (i int);",
Expand Down Expand Up @@ -7723,7 +7723,7 @@ var DoltTempTableScripts = []queries.ScriptTest{
},
},
{
Query: "drop temporary table t;",
Query: "drop temporary table t;",
ExpectedErr: sql.ErrUnknownTable,
},
},
Expand Down
2 changes: 1 addition & 1 deletion go/store/datas/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ type GarbageCollector interface {

// GC traverses the database starting at the Root and removes
// all unreferenced data from persistent storage.
GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error
}

// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all
Expand Down
4 changes: 2 additions & 2 deletions go/store/datas/database_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,8 +1168,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse
}

// GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF)
func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error {
return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointController)
}

func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error {
Expand Down
57 changes: 51 additions & 6 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,15 @@ const (
GCModeFull
)

type GCSafepointController interface {
BeginGC(ctx context.Context, keeper func(h hash.Hash) bool) error
EstablishPreFinalizeSafepoint(context.Context) error
EstablishPostFinalizeSafepoint(context.Context) error
CancelSafepoint()
}

// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepoint GCSafepointController) error {
lvs.versOnce.Do(lvs.expectVersion)

lvs.transitionToOldGenGC()
Expand Down Expand Up @@ -600,6 +607,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}
defer collector.EndGC()

var callCancelSafepoint bool
if safepoint != nil {
err = safepoint.BeginGC(ctx, lvs.gcAddChunk)
if err != nil {
return err
}
callCancelSafepoint = true
defer func() {
if callCancelSafepoint {
safepoint.CancelSafepoint()
}
}()
}

root, err := lvs.Root(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -634,10 +655,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
oldGenHasMany = newFileHasMany
}

newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepoint, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
callCancelSafepoint = false

err = newGenFinalizer.SwapChunksInStore(ctx)
if err != nil {
Expand Down Expand Up @@ -669,6 +691,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}
defer collector.EndGC()

var callCancelSafepoint bool
if safepoint != nil {
err = safepoint.BeginGC(ctx, lvs.gcAddChunk)
if err != nil {
return err
}
callCancelSafepoint = true
defer func() {
if callCancelSafepoint {
safepoint.CancelSafepoint()
}
}()
}

root, err := lvs.Root(ctx)
if err != nil {
return err
Expand All @@ -682,10 +718,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)

var finalizer chunks.GCFinalizer
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepointF, lvs.transitionToFinalizingGC)
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepoint, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
callCancelSafepoint = false

err = finalizer.SwapChunksInStore(ctx)
if err != nil {
Expand Down Expand Up @@ -718,7 +755,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
hashFilter chunks.HasManyFunc,
chksMode chunks.GCMode,
src, dest chunks.ChunkStoreGarbageCollector,
safepointF func() error,
safepointController GCSafepointController,
finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
sweeper, err := src.MarkAndSweepChunks(ctx, lvs.getAddrs, hashFilter, dest, chksMode)
if err != nil {
Expand All @@ -732,6 +769,14 @@ func (lvs *ValueStore) gc(ctx context.Context,
}
toVisit = nil

if safepointController != nil {
err = safepointController.EstablishPreFinalizeSafepoint(ctx)
if err != nil {
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
}
}

// Before we call finalize(), we can process the current set of
// NewGenToVisit. NewGen -> Finalize is going to block writes until
// we are done, so its best to keep it as small as possible.
Expand All @@ -750,8 +795,8 @@ func (lvs *ValueStore) gc(ctx context.Context,
return nil, errors.Join(err, cErr)
}

if safepointF != nil {
err = safepointF()
if safepointController != nil {
err = safepointController.EstablishPostFinalizeSafepoint(ctx)
if err != nil {
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
Expand Down
Loading