Skip to content

Commit

Permalink
GC fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Mar 21, 2024
1 parent 24d6a05 commit c028c41
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 58 deletions.
4 changes: 2 additions & 2 deletions internal/db/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m
return contents, nil
}

func RandomContents(ctx context.Context, tx pgx.Tx, sample float32) ([]Hash, error) {
rows, err := tx.Query(ctx, fmt.Sprintf(`
func RandomContents(ctx context.Context, conn DbConnector, sample float32) ([]Hash, error) {
rows, err := conn.Query(ctx, fmt.Sprintf(`
SELECT (hash).h1, (hash).h2
FROM dl.contents
TABLESAMPLE SYSTEM(%f)
Expand Down
3 changes: 2 additions & 1 deletion internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type CloseFunc func(context.Context)

type DbConnector interface {
Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
Connect(context.Context) (pgx.Tx, CloseFunc, error)
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
}
13 changes: 6 additions & 7 deletions internal/db/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (

"github.com/gadget-inc/dateilager/internal/key"
"github.com/gadget-inc/dateilager/internal/telemetry"
"github.com/jackc/pgx/v5"
"go.opentelemetry.io/otel/trace"
)

func GcProjectObjects(ctx context.Context, tx pgx.Tx, project int64, keep int64, fromVersion int64) ([]Hash, error) {
func GcProjectObjects(ctx context.Context, conn DbConnector, project int64, keep int64, fromVersion int64) ([]Hash, error) {
ctx, span := telemetry.Start(ctx, "gc.project-objects", trace.WithAttributes(
key.Project.Attribute(project),
key.KeepVersions.Attribute(keep),
Expand All @@ -20,7 +19,7 @@ func GcProjectObjects(ctx context.Context, tx pgx.Tx, project int64, keep int64,

hashes := []Hash{}

rows, err := tx.Query(ctx, `
rows, err := conn.Query(ctx, `
WITH latest AS (
SELECT latest_version AS version
FROM dl.projects
Expand Down Expand Up @@ -50,11 +49,11 @@ func GcProjectObjects(ctx context.Context, tx pgx.Tx, project int64, keep int64,
return hashes, nil
}

func GcProjectsObjects(ctx context.Context, tx pgx.Tx, projects []int64, keep int64, fromVersion int64) ([]Hash, error) {
func GcProjectsObjects(ctx context.Context, conn DbConnector, projects []int64, keep int64, fromVersion int64) ([]Hash, error) {
hashes := []Hash{}

for _, project := range projects {
h, err := GcProjectObjects(ctx, tx, project, keep, fromVersion)
h, err := GcProjectObjects(ctx, conn, project, keep, fromVersion)
if err != nil {
return nil, err
}
Expand All @@ -64,15 +63,15 @@ func GcProjectsObjects(ctx context.Context, tx pgx.Tx, projects []int64, keep in
return hashes, nil
}

func GcContentHashes(ctx context.Context, tx pgx.Tx, hashes []Hash) (int64, error) {
func GcContentHashes(ctx context.Context, conn DbConnector, hashes []Hash) (int64, error) {
if len(hashes) == 0 {
return 0, nil
}

rowsAffected := int64(0)

for _, hashChunk := range chunk(hashes, 500) {
tag, err := tx.Exec(ctx, `
tag, err := conn.Exec(ctx, `
WITH missing AS (
SELECT c.hash
FROM dl.contents c
Expand Down
4 changes: 2 additions & 2 deletions internal/db/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func ListProjects(ctx context.Context, tx pgx.Tx) ([]*pb.Project, error) {
return projects, nil
}

func RandomProjects(ctx context.Context, tx pgx.Tx, sample float32) ([]int64, error) {
func RandomProjects(ctx context.Context, conn DbConnector, sample float32) ([]int64, error) {
var projects []int64

for i := 0; i < 5; i++ {
// The SYSTEM sampling method would be quicker but it often produces no or all data
// on tables with just a few rows.
rows, err := tx.Query(ctx, fmt.Sprintf(`
rows, err := conn.Query(ctx, fmt.Sprintf(`
SELECT id
FROM dl.projects
TABLESAMPLE BERNOULLI(%f)
Expand Down
8 changes: 8 additions & 0 deletions internal/testutil/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ func (d *DbTestConnector) Connect(ctx context.Context) (pgx.Tx, db.CloseFunc, er
return innerTx, func(context.Context) {}, nil
}

func (d *DbTestConnector) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
if d.innerTx != nil {
return d.innerTx.Query(ctx, sql, args...)
} else {
return d.tx.Query(ctx, sql, args...)
}
}

func (d *DbTestConnector) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) {
if d.innerTx != nil {
return d.innerTx.Exec(ctx, sql, args...)
Expand Down
50 changes: 7 additions & 43 deletions pkg/api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,35 +847,23 @@ func (f *Fs) GcProject(ctx context.Context, req *pb.GcProjectRequest) (*pb.GcPro
return nil, status.Error(codes.InvalidArgument, "Invalid GC KeepVersions: cannot keep 0 versions")
}

tx, close, err := f.DbConn.Connect(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "FS db connection unavailable: %v", err)
}
defer close(ctx)

logger.Debug(ctx, "FS.GcProject[Init]", key.Project.Field(req.Project))

fromVersion := int64(0)
if req.FromVersion != nil {
fromVersion = *req.FromVersion
}

hashes, err := db.GcProjectObjects(ctx, tx, req.Project, req.KeepVersions, fromVersion)
hashes, err := db.GcProjectObjects(ctx, f.DbConn, req.Project, req.KeepVersions, fromVersion)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc project objects %v: %v", req.Project, err)
}

count, err := db.GcContentHashes(ctx, tx, hashes)
count, err := db.GcContentHashes(ctx, f.DbConn, hashes)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc content hashes %v: %v", req.Project, err)
}

err = tx.Commit(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc project commit tx: %v", err)
}
logger.Debug(ctx, "FS.GcProject[Commit]")

return &pb.GcProjectResponse{
Count: count,
Project: req.Project,
Expand All @@ -901,40 +889,28 @@ func (f *Fs) GcRandomProjects(ctx context.Context, req *pb.GcRandomProjectsReque
return nil, status.Error(codes.InvalidArgument, "Invalid GC KeepVersions: cannot keep 0 versions")
}

tx, close, err := f.DbConn.Connect(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "FS db connection unavailable: %v", err)
}
defer close(ctx)

logger.Debug(ctx, "FS.GcRandomProjects[Init]", key.SampleRate.Field(req.Sample))

fromVersion := int64(0)
if req.FromVersion != nil {
fromVersion = *req.FromVersion
}

projects, err := db.RandomProjects(ctx, tx, req.Sample)
projects, err := db.RandomProjects(ctx, f.DbConn, req.Sample)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc random projects %f: %v", req.Sample, err)
}

hashes, err := db.GcProjectsObjects(ctx, tx, projects, req.KeepVersions, fromVersion)
hashes, err := db.GcProjectsObjects(ctx, f.DbConn, projects, req.KeepVersions, fromVersion)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc random project objects: %v", err)
}

count, err := db.GcContentHashes(ctx, tx, hashes)
count, err := db.GcContentHashes(ctx, f.DbConn, hashes)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc random content hashes: %v", err)
}

err = tx.Commit(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc random projects commit tx: %v", err)
}
logger.Debug(ctx, "FS.GcRandomProjects[Commit]")

return &pb.GcRandomProjectsResponse{
Count: count,
Projects: projects,
Expand All @@ -951,30 +927,18 @@ func (f *Fs) GcContents(ctx context.Context, req *pb.GcContentsRequest) (*pb.GcC
return nil, err
}

tx, close, err := f.DbConn.Connect(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "FS db connection unavailable: %v", err)
}
defer close(ctx)

logger.Debug(ctx, "FS.GcContents[Init]")

hashes, err := db.RandomContents(ctx, tx, req.Sample)
hashes, err := db.RandomContents(ctx, f.DbConn, req.Sample)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc random contents %f: %v", req.Sample, err)
}

count, err := db.GcContentHashes(ctx, tx, hashes)
count, err := db.GcContentHashes(ctx, f.DbConn, hashes)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc random content hashes: %v", err)
}

err = tx.Commit(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "FS gc contents commit tx: %v", err)
}
logger.Debug(ctx, "FS.GcContents[Commit]")

return &pb.GcContentsResponse{
Count: count,
}, nil
Expand Down
3 changes: 0 additions & 3 deletions pkg/cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"flag"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -61,10 +60,8 @@ func NewClientCommand() *cobra.Command {

ctx := cmd.Context()

fmt.Fprintf(os.Stderr, "timeout: %v\n", timeout)
if timeout != 0 {
ctx, cancel = context.WithTimeout(cmd.Context(), time.Duration(timeout)*time.Millisecond)
fmt.Fprintf(os.Stderr, "duration: %v\n", time.Duration(timeout)*time.Second)
}

if tracing {
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (d *DbPoolConnector) Connect(ctx context.Context) (pgx.Tx, db.CloseFunc, er
return tx, func(ctx context.Context) { _ = tx.Rollback(ctx); conn.Release() }, nil
}

func (d *DbPoolConnector) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
return d.pool.Query(ctx, sql, args...)
}

func (d *DbPoolConnector) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) {
return d.pool.Exec(ctx, sql, args...)
}
Expand Down

0 comments on commit c028c41

Please sign in to comment.