Skip to content

Commit

Permalink
Staged Update
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Mar 19, 2024
1 parent ecd6db9 commit c676e9c
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 192 deletions.
75 changes: 44 additions & 31 deletions internal/db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func UpdateLatestVersion(ctx context.Context, tx pgx.Tx, project int64, version
return nil
}

func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64, path string) error {
func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64, path string, isStaged bool) error {
_, err := tx.Exec(ctx, `
UPDATE dl.objects
SET stop_version = $1
Expand All @@ -39,7 +39,7 @@ func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64,
}

// UpdateObject returns true if content changed, false otherwise
func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *ContentEncoder, project int64, version int64, object *pb.Object) (bool, error) {
func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *ContentEncoder, project int64, version int64, object *pb.Object, isStaged bool) (bool, error) {
content := object.Content
if content == nil {
content = []byte("")
Expand All @@ -61,13 +61,18 @@ func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *Con
return false, fmt.Errorf("insert objects content, hash %x-%x: %w", hash.H1, hash.H2, err)
}

rows, err := tx.Query(ctx, `
INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
objectTable := "dl.objects"
if isStaged {
objectTable = "dl.staged_objects"
}

rows, err := tx.Query(ctx, fmt.Sprintf(`
INSERT INTO %s (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, $2, NULL, $3, ($4, $5), $6, $7, $8)
ON CONFLICT
DO NOTHING
RETURNING project
`, project, version, object.Path, hash.H1, hash.H2, object.Mode, object.Size, false)
`, objectTable), project, version, object.Path, hash.H1, hash.H2, object.Mode, object.Size, false)
if err != nil {
return false, fmt.Errorf("insert new object, project %v, version %v, path %v: %w", project, version, object.Path, err)
}
Expand All @@ -79,30 +84,31 @@ func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *Con
return false, nil
}

previousPaths := []string{object.Path}
pathChunks := strings.Split(object.Path, "/")

for i := 1; i < len(pathChunks); i++ {
previousPaths = append(previousPaths, fmt.Sprintf("%s/", strings.Join(pathChunks[:i], "/")))
}
if !isStaged {
previousPaths := []string{object.Path}
pathChunks := strings.Split(object.Path, "/")

_, err = tx.Exec(ctx, `
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = ANY($3)
AND stop_version IS NULL
AND start_version != $4
`, version, project, previousPaths, version)
for i := 1; i < len(pathChunks); i++ {
previousPaths = append(previousPaths, fmt.Sprintf("%s/", strings.Join(pathChunks[:i], "/")))
}

if err != nil {
return false, fmt.Errorf("update previous object, project %v, version %v, path %v: %w", project, version, object.Path, err)
_, err = tx.Exec(ctx, `
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = ANY($3)
AND stop_version IS NULL
AND start_version != $4
`, version, project, previousPaths, version)
if err != nil {
return false, fmt.Errorf("update previous object, project %v, version %v, path %v: %w", project, version, object.Path, err)
}
}

return true, nil
}

// UpdatePackedObjects returns true if content changed, false otherwise
func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, project int64, version int64, parent string, updates []*pb.Object) (bool, error) {
func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, project int64, version int64, parent string, updates []*pb.Object, isStaged bool) (bool, error) {
var hash Hash
var content []byte

Expand Down Expand Up @@ -146,13 +152,15 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje

batch := &pgx.Batch{}

batch.Queue(`
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = $3
AND packed IS true
AND stop_version IS NULL
`, version, project, parent)
if !isStaged {
batch.Queue(`
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = $3
AND packed IS true
AND stop_version IS NULL
`, version, project, parent)
}

if shouldInsert {
// insert the content outside the transaction to avoid deadlocks and to keep smaller transactions
Expand All @@ -166,10 +174,15 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje
return false, fmt.Errorf("insert packed content, hash %x-%x: %w", newHash.H1, newHash.H2, err)
}

batch.Queue(`
INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
objectTable := "dl.objects"
if isStaged {
objectTable = "dl.staged_objects"
}

batch.Queue(fmt.Sprintf(`
INSERT INTO %s (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, $2, NULL, $3, ($4, $5), $6, $7, $8)
`, project, version, parent, newHash.H1, newHash.H2, 0, len(updated), true)
`, objectTable), project, version, parent, newHash.H1, newHash.H2, 0, len(updated), true)
}

results := tx.SendBatch(ctx, batch)
Expand Down
1 change: 1 addition & 0 deletions internal/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
Directory = StringKey("dl.directory")
Environment = StringKey("dl.environment")
FromVersion = Int64pKey("dl.from_version")
IsStaged = BoolKey("dl.is_staged")
KeepVersions = Int64Key("dl.keep_versions")
LatestVersion = Int64Key("dl.latest_version")
LiveObjectsCount = Int64Key("dl.live_objects_count")
Expand Down
Loading

0 comments on commit c676e9c

Please sign in to comment.