Skip to content

Commit

Permalink
Staged Update
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Mar 20, 2024
1 parent ecd6db9 commit 870ce18
Show file tree
Hide file tree
Showing 13 changed files with 658 additions and 318 deletions.
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ test-fuzz: reset-db
go run cmd/fuzz-test/main.go --host $(GRPC_HOST) --iterations 1000 --projects 5

reset-db: migrate
psql $(DB_URI) -c "truncate dl.objects; truncate dl.contents; truncate dl.projects; truncate dl.cache_versions;"
psql $(DB_URI) -c "truncate dl.objects; truncate dl.contents; truncate dl.projects; truncate dl.cache_versions; truncate dl.staged_objects"

setup-local: reset-db
psql $(DB_URI) -c "insert into dl.projects (id, latest_version, pack_patterns) values (1, 0, '{\"node_modules/.*/\"}');"
Expand Down Expand Up @@ -145,6 +145,14 @@ client-large-update:
development/scripts/complex_input.sh 3
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/complex

client-staged-update: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-staged-update: export DL_SKIP_SSL_VERIFICATION=1
client-staged-update:
development/scripts/simple_input.sh 1
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/simple
development/scripts/simple_input.sh 2
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/simple --staged

client-get: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-get: export DL_SKIP_SSL_VERIFICATION=1
client-get:
Expand Down
2 changes: 1 addition & 1 deletion cmd/fuzz-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func runIteration(ctx context.Context, client *dlc.Client, project int64, operat
return -1, fmt.Errorf("failed to apply operation %s: %w", operation.String(), err)
}

version, _, err := client.Update(ctx, project, dirs.Base(project))
version, _, err := client.Update(ctx, project, dirs.Base(project), false)
if err != nil {
return -1, fmt.Errorf("failed to update project %d: %w", project, err)
}
Expand Down
99 changes: 67 additions & 32 deletions internal/db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,19 @@ func UpdateLatestVersion(ctx context.Context, tx pgx.Tx, project int64, version
return nil
}

func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64, path string) error {
func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64, path string, isStaged bool) error {
if isStaged {
_, err := tx.Exec(ctx, `
INSERT INTO dl.staged_objects (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, NULL, $2, $3, NULL, NULL, NULL, NULL)
`, project, version, path)
if err != nil {
return fmt.Errorf("delete staged object, project %v, version %v, path %v: %w", project, version, path, err)
}

return nil
}

_, err := tx.Exec(ctx, `
UPDATE dl.objects
SET stop_version = $1
Expand All @@ -39,7 +51,7 @@ func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64,
}

// UpdateObject returns true if content changed, false otherwise
func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *ContentEncoder, project int64, version int64, object *pb.Object) (bool, error) {
func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *ContentEncoder, project int64, version int64, object *pb.Object, isStaged bool) (bool, error) {
content := object.Content
if content == nil {
content = []byte("")
Expand All @@ -61,13 +73,18 @@ func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *Con
return false, fmt.Errorf("insert objects content, hash %x-%x: %w", hash.H1, hash.H2, err)
}

rows, err := tx.Query(ctx, `
INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
objectTable := "dl.objects"
if isStaged {
objectTable = "dl.staged_objects"
}

rows, err := tx.Query(ctx, fmt.Sprintf(`
INSERT INTO %s (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, $2, NULL, $3, ($4, $5), $6, $7, $8)
ON CONFLICT
DO NOTHING
RETURNING project
`, project, version, object.Path, hash.H1, hash.H2, object.Mode, object.Size, false)
`, objectTable), project, version, object.Path, hash.H1, hash.H2, object.Mode, object.Size, false)
if err != nil {
return false, fmt.Errorf("insert new object, project %v, version %v, path %v: %w", project, version, object.Path, err)
}
Expand All @@ -79,30 +96,31 @@ func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *Con
return false, nil
}

previousPaths := []string{object.Path}
pathChunks := strings.Split(object.Path, "/")
if !isStaged {
previousPaths := []string{object.Path}
pathChunks := strings.Split(object.Path, "/")

for i := 1; i < len(pathChunks); i++ {
previousPaths = append(previousPaths, fmt.Sprintf("%s/", strings.Join(pathChunks[:i], "/")))
}

_, err = tx.Exec(ctx, `
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = ANY($3)
AND stop_version IS NULL
AND start_version != $4
`, version, project, previousPaths, version)
for i := 1; i < len(pathChunks); i++ {
previousPaths = append(previousPaths, fmt.Sprintf("%s/", strings.Join(pathChunks[:i], "/")))
}

if err != nil {
return false, fmt.Errorf("update previous object, project %v, version %v, path %v: %w", project, version, object.Path, err)
_, err = tx.Exec(ctx, `
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = ANY($3)
AND stop_version IS NULL
AND start_version != $4
`, version, project, previousPaths, version)
if err != nil {
return false, fmt.Errorf("update previous object, project %v, version %v, path %v: %w", project, version, object.Path, err)
}
}

return true, nil
}

// UpdatePackedObjects returns true if content changed, false otherwise
func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, project int64, version int64, parent string, updates []*pb.Object) (bool, error) {
func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, project int64, version int64, parent string, updates []*pb.Object, isStaged bool) (bool, error) {
var hash Hash
var content []byte

Expand Down Expand Up @@ -146,30 +164,43 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje

batch := &pgx.Batch{}

batch.Queue(`
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = $3
AND packed IS true
AND stop_version IS NULL
`, version, project, parent)
if isStaged {
batch.Queue(`
INSERT INTO dl.staged_objects (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, NULL, $2, $3, NULL, NULL, NULL, NULL)
`, project, version, parent)
} else {
batch.Queue(`
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = $3
AND packed IS true
AND stop_version IS NULL
`, version, project, parent)
}

if shouldInsert {
// insert the content outside the transaction to avoid deadlocks and to keep smaller transactions
_, err = conn.Exec(ctx, `
INSERT INTO dl.contents (hash, bytes)
VALUES (($1, $2), $3)
ON CONFLICT DO NOTHING
ON CONFLICT
DO NOTHING
`, newHash.H1, newHash.H2, updated)

if err != nil {
return false, fmt.Errorf("insert packed content, hash %x-%x: %w", newHash.H1, newHash.H2, err)
}

batch.Queue(`
INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
objectTable := "dl.objects"
if isStaged {
objectTable = "dl.staged_objects"
}

batch.Queue(fmt.Sprintf(`
INSERT INTO %s (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, $2, NULL, $3, ($4, $5), $6, $7, $8)
`, project, version, parent, newHash.H1, newHash.H2, 0, len(updated), true)
`, objectTable), project, version, parent, newHash.H1, newHash.H2, 0, len(updated), true)
}

results := tx.SendBatch(ctx, batch)
Expand All @@ -190,3 +221,7 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje
// content did change
return true, nil
}

func CommitStagedObjects(ctx context.Context, tx pgx.Tx, project int64, currentVersion, nextVersion int64) error {
return nil
}
1 change: 1 addition & 0 deletions internal/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
Directory = StringKey("dl.directory")
Environment = StringKey("dl.environment")
FromVersion = Int64pKey("dl.from_version")
IsStaged = BoolKey("dl.is_staged")
KeepVersions = Int64Key("dl.keep_versions")
LatestVersion = Int64Key("dl.latest_version")
LiveObjectsCount = Int64Key("dl.live_objects_count")
Expand Down
Loading

0 comments on commit 870ce18

Please sign in to comment.