From c028c4169349340a6fac2e753f3080657a25c8ea Mon Sep 17 00:00:00 2001 From: Alex Angelini Date: Thu, 21 Mar 2024 12:10:43 +0100 Subject: [PATCH] GC fixes --- internal/db/content.go | 4 ++-- internal/db/db.go | 3 ++- internal/db/gc.go | 13 +++++------ internal/db/project.go | 4 ++-- internal/testutil/db.go | 8 +++++++ pkg/api/fs.go | 50 ++++++----------------------------------- pkg/cli/client.go | 3 --- pkg/server/server.go | 4 ++++ 8 files changed, 31 insertions(+), 58 deletions(-) diff --git a/internal/db/content.go b/internal/db/content.go index 520bc2d..8fc6924 100644 --- a/internal/db/content.go +++ b/internal/db/content.go @@ -240,8 +240,8 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m return contents, nil } -func RandomContents(ctx context.Context, tx pgx.Tx, sample float32) ([]Hash, error) { - rows, err := tx.Query(ctx, fmt.Sprintf(` +func RandomContents(ctx context.Context, conn DbConnector, sample float32) ([]Hash, error) { + rows, err := conn.Query(ctx, fmt.Sprintf(` SELECT (hash).h1, (hash).h2 FROM dl.contents TABLESAMPLE SYSTEM(%f) diff --git a/internal/db/db.go b/internal/db/db.go index 27945e6..33185ad 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -10,6 +10,7 @@ import ( type CloseFunc func(context.Context) type DbConnector interface { - Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) Connect(context.Context) (pgx.Tx, CloseFunc, error) + Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) + Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) } diff --git a/internal/db/gc.go b/internal/db/gc.go index 7f7ff78..dfb6007 100644 --- a/internal/db/gc.go +++ b/internal/db/gc.go @@ -6,11 +6,10 @@ import ( "github.com/gadget-inc/dateilager/internal/key" "github.com/gadget-inc/dateilager/internal/telemetry" - "github.com/jackc/pgx/v5" "go.opentelemetry.io/otel/trace" ) -func GcProjectObjects(ctx context.Context, tx pgx.Tx, project int64, keep int64, fromVersion int64) ([]Hash, error) { +func GcProjectObjects(ctx context.Context, conn DbConnector, project int64, keep int64, fromVersion int64) ([]Hash, error) { ctx, span := telemetry.Start(ctx, "gc.project-objects", trace.WithAttributes( key.Project.Attribute(project), key.KeepVersions.Attribute(keep), @@ -20,7 +19,7 @@ func GcProjectObjects(ctx context.Context, tx pgx.Tx, project int64, keep int64, hashes := []Hash{} - rows, err := tx.Query(ctx, ` + rows, err := conn.Query(ctx, ` WITH latest AS ( SELECT latest_version AS version FROM dl.projects @@ -50,11 +49,11 @@ func GcProjectObjects(ctx context.Context, tx pgx.Tx, project int64, keep int64, return hashes, nil } -func GcProjectsObjects(ctx context.Context, tx pgx.Tx, projects []int64, keep int64, fromVersion int64) ([]Hash, error) { +func GcProjectsObjects(ctx context.Context, conn DbConnector, projects []int64, keep int64, fromVersion int64) ([]Hash, error) { hashes := []Hash{} for _, project := range projects { - h, err := GcProjectObjects(ctx, tx, project, keep, fromVersion) + h, err := GcProjectObjects(ctx, conn, project, keep, fromVersion) if err != nil { return nil, err } @@ -64,7 +63,7 @@ func GcProjectsObjects(ctx context.Context, tx pgx.Tx, projects []int64, keep in return hashes, nil } -func GcContentHashes(ctx context.Context, tx pgx.Tx, hashes []Hash) (int64, error) { +func GcContentHashes(ctx context.Context, conn DbConnector, hashes []Hash) (int64, error) { if len(hashes) == 0 { return 0, nil } @@ -72,7 +71,7 @@ func GcContentHashes(ctx context.Context, tx pgx.Tx, hashes []Hash) (int64, erro rowsAffected := int64(0) for _, hashChunk := range chunk(hashes, 500) { - tag, err := tx.Exec(ctx, ` + tag, err := conn.Exec(ctx, ` WITH missing AS ( SELECT c.hash FROM dl.contents c diff --git a/internal/db/project.go b/internal/db/project.go index f1884fe..45a3786 100644 --- a/internal/db/project.go +++ b/internal/db/project.go @@ -71,13 +71,13 @@ func ListProjects(ctx context.Context, tx pgx.Tx) ([]*pb.Project, error) { return projects, nil } -func RandomProjects(ctx context.Context, tx pgx.Tx, sample float32) ([]int64, error) { +func RandomProjects(ctx context.Context, conn DbConnector, sample float32) ([]int64, error) { var projects []int64 for i := 0; i < 5; i++ { // The SYSTEM sampling method would be quicker but it often produces no or all data // on tables with just a few rows. - rows, err := tx.Query(ctx, fmt.Sprintf(` + rows, err := conn.Query(ctx, fmt.Sprintf(` SELECT id FROM dl.projects TABLESAMPLE BERNOULLI(%f) diff --git a/internal/testutil/db.go b/internal/testutil/db.go index 6863c35..d5f81c5 100644 --- a/internal/testutil/db.go +++ b/internal/testutil/db.go @@ -46,6 +46,14 @@ func (d *DbTestConnector) Connect(ctx context.Context) (pgx.Tx, db.CloseFunc, er return innerTx, func(context.Context) {}, nil } +func (d *DbTestConnector) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) { + if d.innerTx != nil { + return d.innerTx.Query(ctx, sql, args...) + } else { + return d.tx.Query(ctx, sql, args...) + } +} + func (d *DbTestConnector) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) { if d.innerTx != nil { return d.innerTx.Exec(ctx, sql, args...) diff --git a/pkg/api/fs.go b/pkg/api/fs.go index c15f29a..7983712 100644 --- a/pkg/api/fs.go +++ b/pkg/api/fs.go @@ -847,12 +847,6 @@ func (f *Fs) GcProject(ctx context.Context, req *pb.GcProjectRequest) (*pb.GcPro return nil, status.Error(codes.InvalidArgument, "Invalid GC KeepVersions: cannot keep 0 versions") } - tx, close, err := f.DbConn.Connect(ctx) - if err != nil { - return nil, status.Errorf(codes.Unavailable, "FS db connection unavailable: %v", err) - } - defer close(ctx) - logger.Debug(ctx, "FS.GcProject[Init]", key.Project.Field(req.Project)) fromVersion := int64(0) @@ -860,22 +854,16 @@ func (f *Fs) GcProject(ctx context.Context, req *pb.GcProjectRequest) (*pb.GcPro fromVersion = *req.FromVersion } - hashes, err := db.GcProjectObjects(ctx, tx, req.Project, req.KeepVersions, fromVersion) + hashes, err := db.GcProjectObjects(ctx, f.DbConn, req.Project, req.KeepVersions, fromVersion) if err != nil { return nil, status.Errorf(codes.Internal, "FS gc project objects %v: %v", req.Project, err) } - count, err := db.GcContentHashes(ctx, tx, hashes) + count, err := db.GcContentHashes(ctx, f.DbConn, hashes) if err != nil { return nil, status.Errorf(codes.Internal, "FS gc content hashes %v: %v", req.Project, err) } - err = tx.Commit(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "FS gc project commit tx: %v", err) - } - logger.Debug(ctx, "FS.GcProject[Commit]") - return &pb.GcProjectResponse{ Count: count, Project: req.Project, @@ -901,12 +889,6 @@ func (f *Fs) GcRandomProjects(ctx context.Context, req *pb.GcRandomProjectsReque return nil, status.Error(codes.InvalidArgument, "Invalid GC KeepVersions: cannot keep 0 versions") } - tx, close, err := f.DbConn.Connect(ctx) - if err != nil { - return nil, status.Errorf(codes.Unavailable, "FS db connection unavailable: %v", err) - } - defer close(ctx) - logger.Debug(ctx, "FS.GcRandomProjects[Init]", key.SampleRate.Field(req.Sample)) fromVersion := int64(0) @@ -914,27 +896,21 @@ func (f *Fs) GcRandomProjects(ctx context.Context, req *pb.GcRandomProjectsReque fromVersion = *req.FromVersion } - projects, err := db.RandomProjects(ctx, tx, req.Sample) + projects, err := db.RandomProjects(ctx, f.DbConn, req.Sample) if err != nil { return nil, status.Errorf(codes.Internal, "FS gc random projects %f: %v", req.Sample, err) } - hashes, err := db.GcProjectsObjects(ctx, tx, projects, req.KeepVersions, fromVersion) + hashes, err := db.GcProjectsObjects(ctx, f.DbConn, projects, req.KeepVersions, fromVersion) if err != nil { return nil, status.Errorf(codes.Internal, "FS gc random project objects: %v", err) } - count, err := db.GcContentHashes(ctx, tx, hashes) + count, err := db.GcContentHashes(ctx, f.DbConn, hashes) if err != nil { return nil, status.Errorf(codes.Internal, "FS gc random content hashes: %v", err) } - err = tx.Commit(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "FS gc random projects commit tx: %v", err) - } - logger.Debug(ctx, "FS.GcRandomProjects[Commit]") - return &pb.GcRandomProjectsResponse{ Count: count, Projects: projects, @@ -951,30 +927,18 @@ func (f *Fs) GcContents(ctx context.Context, req *pb.GcContentsRequest) (*pb.GcC return nil, err } - tx, close, err := f.DbConn.Connect(ctx) - if err != nil { - return nil, status.Errorf(codes.Unavailable, "FS db connection unavailable: %v", err) - } - defer close(ctx) - logger.Debug(ctx, "FS.GcContents[Init]") - hashes, err := db.RandomContents(ctx, tx, req.Sample) + hashes, err := db.RandomContents(ctx, f.DbConn, req.Sample) if err != nil { return nil, status.Errorf(codes.Internal, "FS gc random contents %f: %v", req.Sample, err) } - count, err := db.GcContentHashes(ctx, tx, hashes) + count, err := db.GcContentHashes(ctx, f.DbConn, hashes) if err != nil { return nil, status.Errorf(codes.Internal, "FS gc random content hashes: %v", err) } - err = tx.Commit(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "FS gc contents commit tx: %v", err) - } - logger.Debug(ctx, "FS.GcContents[Commit]") - return &pb.GcContentsResponse{ Count: count, }, nil diff --git a/pkg/cli/client.go b/pkg/cli/client.go index 8cf41ff..9b74b30 100644 --- a/pkg/cli/client.go +++ b/pkg/cli/client.go @@ -5,7 +5,6 @@ import ( "encoding/json" "flag" "fmt" - "os" "strings" "time" @@ -61,10 +60,8 @@ func NewClientCommand() *cobra.Command { ctx := cmd.Context() - fmt.Fprintf(os.Stderr, "timeout: %v\n", timeout) if timeout != 0 { ctx, cancel = context.WithTimeout(cmd.Context(), time.Duration(timeout)*time.Millisecond) - fmt.Fprintf(os.Stderr, "duration: %v\n", time.Duration(timeout)*time.Second) } if tracing { diff --git a/pkg/server/server.go b/pkg/server/server.go index 2537b56..4483e0a 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -115,6 +115,10 @@ func (d *DbPoolConnector) Connect(ctx context.Context) (pgx.Tx, db.CloseFunc, er return tx, func(ctx context.Context) { _ = tx.Rollback(ctx); conn.Release() }, nil } +func (d *DbPoolConnector) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) { + return d.pool.Query(ctx, sql, args...) +} + func (d *DbPoolConnector) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) { return d.pool.Exec(ctx, sql, args...) }