Skip to content

Commit

Permalink
fix race condition with repo add files
Browse files Browse the repository at this point in the history
Do all relevant database reading/modifying inside `maybeRunTaskInBackground`.

Notably, `LoadComplete` will load the reflist of a repo. if this is done outside of a background operation,
the data might be outdated when the background tasks runs.
  • Loading branch information
neolynx committed Oct 21, 2024
1 parent a771707 commit d813fef
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 104 deletions.
114 changes: 61 additions & 53 deletions api/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}

resources = append(resources, string(snapshot.ResourceKey()))
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err))
return
}

sources = append(sources, snapshot)
}
Expand All @@ -259,11 +254,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}

resources = append(resources, string(localRepo.Key()))
err = localCollection.LoadComplete(localRepo)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err))
}

sources = append(sources, localRepo)
}
} else {
Expand All @@ -275,17 +265,9 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
if b.MultiDist != nil {
multiDist = *b.MultiDist
}
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err))
return
}

collection := collectionFactory.PublishedRepoCollection()

resources = append(resources, string(published.Key()))
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
b.SourceKind, published.StoragePrefix(), published.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
b.SourceKind, prefix, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskDetail := task.PublishDetail{
Detail: detail,
Expand All @@ -295,6 +277,29 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
PublishDetail: taskDetail,
}

for _, source := range sources {
switch s := source.(type) {
case *deb.Snapshot:
snapshotCollection := collectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo:
localCollection := collectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s)
default:
err = fmt.Errorf("unexpected type for source: %T", source)
}
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
}

published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}

resources = append(resources, string(published.Key()))

if b.Origin != "" {
published.Origin = b.Origin
}
Expand All @@ -320,13 +325,14 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
published.AcquireByHash = *b.AcquireByHash
}

collection := collectionFactory.PublishedRepoCollection()
duplicate := collection.CheckDuplicate(published)
if duplicate != nil {
collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
}

err := published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
Expand Down Expand Up @@ -402,11 +408,6 @@ func apiPublishUpdateSwitch(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}

if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 {
Expand All @@ -431,20 +432,24 @@ func apiPublishUpdateSwitch(c *gin.Context) {
published.MultiDist = *b.MultiDist
}

revision := published.ObtainRevision()
sources := revision.Sources

if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
component := snapshotInfo.Component
name := snapshotInfo.Name
sources[component] = name
}
}

resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
revision := published.ObtainRevision()
sources := revision.Sources

if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
component := snapshotInfo.Component
name := snapshotInfo.Name
sources[component] = name
}
}

err = collection.LoadComplete(published, collectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("Unable to update: %s", err)
}
result, err := published.Update(collectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("Unable to update: %s", err)
Expand Down Expand Up @@ -551,33 +556,31 @@ func apiPublishAddSource(c *gin.Context) {
return
}

err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
return
}

if c.Bind(&b) != nil {
return
}

revision := published.ObtainRevision()
sources := revision.Sources

component := b.Component
name := b.Name

_, exists := sources[component]
if exists {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component))
return
}

sources[component] = name

resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err)
}

revision := published.ObtainRevision()
sources := revision.Sources

_, exists := sources[component]
if exists {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component)
}

sources[component] = name

err = collection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
Expand Down Expand Up @@ -663,6 +666,7 @@ func apiPublishSetSources(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
Expand Down Expand Up @@ -722,6 +726,7 @@ func apiPublishDropChanges(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
Expand Down Expand Up @@ -772,6 +777,7 @@ func apiPublishUpdateSource(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
// FIXME

err = collection.LoadComplete(published, collectionFactory)
if err != nil {
Expand Down Expand Up @@ -842,6 +848,7 @@ func apiPublishRemoveSource(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
Expand Down Expand Up @@ -929,6 +936,7 @@ func apiPublishUpdate(c *gin.Context) {
return
}

// FIXME
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
Expand Down
54 changes: 24 additions & 30 deletions api/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,14 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
return
}

err = collection.LoadComplete(repo)
if err != nil {
AbortWithJSONError(c, 500, err)
return
}

resources := []string{string(repo.Key())}

maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
Expand Down Expand Up @@ -360,12 +360,6 @@ func apiReposPackageFromDir(c *gin.Context) {
return
}

err = collection.LoadComplete(repo)
if err != nil {
AbortWithJSONError(c, 500, err)
return
}

var taskName string
var sources []string
if fileParam == "" {
Expand All @@ -379,6 +373,11 @@ func apiReposPackageFromDir(c *gin.Context) {
resources := []string{string(repo.Key())}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

verifier := context.GetVerifier()

var (
Expand Down Expand Up @@ -480,17 +479,7 @@ func apiReposCopyPackage(c *gin.Context) {
return
}

err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("dest repo error: %s", err))
return
}

var (
srcRefList *deb.PackageRefList
srcRepo *deb.LocalRepo
)

var srcRepo *deb.LocalRepo
srcRepo, err = collectionFactory.LocalRepoCollection().ByName(srcRepoName)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("src repo error: %s", err))
Expand All @@ -502,17 +491,22 @@ func apiReposCopyPackage(c *gin.Context) {
return
}

err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("src repo error: %s", err))
return
}

srcRefList = srcRepo.RefList()
taskName := fmt.Sprintf("Copy packages from repo %s to repo %s", srcRepoName, dstRepoName)
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}

maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
}

err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
}

srcRefList := srcRepo.RefList()

reporter := &aptly.RecordingResultReporter{
Warnings: []string{},
AddedLines: []string{},
Expand Down
Loading

0 comments on commit d813fef

Please sign in to comment.