diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 455eee6389..c799ba23f9 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -67,7 +67,7 @@ jobs: run: sudo apt-get update # gpgme-devel is needed for container upload dependencies - name: Install test dependencies - run: sudo apt-get install -y libgpgme-dev + run: sudo apt-get install -y libgpgme-dev libbtrfs-dev - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: diff --git a/Makefile b/Makefile index 50ad8b22e6..9103c6c16b 100644 --- a/Makefile +++ b/Makefile @@ -182,7 +182,6 @@ push-check: lint build unit-tests srpm man ./tools/check-runners ./tools/check-snapshots --errors-only . rpmlint --config rpmlint.config $(CURDIR)/rpmbuild/SRPMS/* - ./tools/prepare-source.sh @if [ 0 -ne $$(git status --porcelain --untracked-files|wc -l) ]; then \ echo "There should be no changed or untracked files"; \ git status --porcelain --untracked-files; \ @@ -350,6 +349,7 @@ SHELLCHECK_FILES=$(shell find . -name "*.sh" -not -regex "./vendor/.*") .PHONY: lint lint: $(GOLANGCI_LINT_CACHE_DIR) container_composer_golangci_built.info + ./tools/prepare-source.sh podman run -t --rm -v $(SRCDIR):/app:z -v $(GOLANGCI_LINT_CACHE_DIR):/root/.cache:z -w /app $(GOLANGCI_COMPOSER_IMAGE) golangci-lint run -v echo "$(SHELLCHECK_FILES)" | xargs shellcheck --shell bash -e SC1091 -e SC2002 -e SC2317 diff --git a/internal/cloudapi/v2/server.go b/internal/cloudapi/v2/server.go index 2d702de8a3..53245fe6d1 100644 --- a/internal/cloudapi/v2/server.go +++ b/internal/cloudapi/v2/server.go @@ -3,6 +3,7 @@ package v2 import ( "context" "encoding/json" + "errors" "fmt" "net/http" "strings" @@ -445,22 +446,81 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas } func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, seed int64) { - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + // prepared to become a config variable + const depsolveTimeout = 5 + ctx, cancel := context.WithTimeout(ctx, time.Minute*depsolveTimeout) defer cancel() - // wait until job is in a pending state - var token uuid.UUID + jobResult := &worker.ManifestJobByIDResult{ + Manifest: nil, + ManifestInfo: worker.ManifestInfo{ + OSBuildComposerVersion: common.BuildVersion(), + }, + } + var dynArgs []json.RawMessage var err error + token := uuid.Nil logWithId := logrus.WithField("jobId", manifestJobID) + + defer func() { + // token == uuid.Nil indicates that no worker even started processing + if token == uuid.Nil { + if jobResult.JobError != nil { + // set all jobs to "failed" + // osbuild job will fail as dependency + jobs := []struct { + Name string + ID uuid.UUID + }{ + {"depsolve", depsolveJobID}, + {"containerResolve", containerResolveJobID}, + {"ostreeResolve", ostreeResolveJobID}, + {"manifest", manifestJobID}, + } + + for _, job := range jobs { + if job.ID != uuid.Nil { + err := workers.SetFailed(job.ID, jobResult.JobError) + if err != nil { + logWithId.Errorf("Error failing %s job: %v", job.Name, err) + } + } + } + + } else { + logWithId.Errorf("Internal error, no worker started depsolve but we didn't get a reason.") + } + } else { + result, err := json.Marshal(jobResult) + if err != nil { + logWithId.Errorf("Error marshalling manifest job results: %v", err) + } + err = workers.FinishJob(token, result) + if err != nil { + logWithId.Errorf("Error finishing manifest job: %v", err) + } + if jobResult.JobError != nil { + logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err) + } + } + }() + + // wait until job is in a pending state for { _, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID) - if err == jobqueue.ErrNotPending { + if errors.Is(err, jobqueue.ErrNotPending) { logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish") time.Sleep(time.Millisecond * 50) select { case <-ctx.Done(): - logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines") + logWithId.Warning(fmt.Sprintf("Manifest job dependencies took longer than %d minutes to finish,"+ + " or the server is shutting down, returning to avoid dangling routines", depsolveTimeout)) + + jobResult.JobError = clienterrors.New(clienterrors.ErrorDepsolveTimeout, + "Timeout while waiting for package dependency resolution", + "There may be a temporary issue with compute resources.", + ) break default: continue @@ -473,13 +533,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w break } - jobResult := &worker.ManifestJobByIDResult{ - Manifest: nil, - ManifestInfo: worker.ManifestInfo{ - OSBuildComposerVersion: common.BuildVersion(), - }, - } - // add osbuild/images dependency info to job result osbuildImagesDep, err := common.GetDepModuleInfoByPath(common.OSBuildImagesModulePath) if err != nil { @@ -491,22 +544,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w jobResult.ManifestInfo.OSBuildComposerDeps = append(jobResult.ManifestInfo.OSBuildComposerDeps, osbuildImagesDepModule) } - defer func() { - if jobResult.JobError != nil { - logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err) - } - - result, err := json.Marshal(jobResult) - if err != nil { - logWithId.Errorf("Error marshalling manifest job results: %v", err) - } - - err = workers.FinishJob(token, result) - if err != nil { - logWithId.Errorf("Error finishing manifest job: %v", err) - } - }() - if len(dynArgs) == 0 { reason := "No dynamic arguments" jobResult.JobError = clienterrors.New(clienterrors.ErrorNoDynamicArgs, reason, nil) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index c8f9e4b38b..cc73abe516 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -403,6 +403,40 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error { return nil } +func (q *fsJobQueue) FailJob(id uuid.UUID, result interface{}) error { + q.mu.Lock() + defer q.mu.Unlock() + + j, err := q.readJob(id) + if err != nil { + return err + } + + if !j.FinishedAt.IsZero() { + return jobqueue.ErrFinished + } + + if !j.StartedAt.IsZero() { + return jobqueue.ErrRunning + } + + j.Result, err = json.Marshal(result) + if err != nil { + return err + } + + j.StartedAt = time.Now() + j.FinishedAt = time.Now() + j.Token = uuid.New() + + err = q.db.Write(id.String(), j) + if err != nil { + return fmt.Errorf("error writing job %s: %v", id, err) + } + + return nil +} + func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) { j, err := q.readJob(id) if err != nil { diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index bfd431cd52..930a076f55 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -7,6 +7,8 @@ import ( "context" "encoding/json" "fmt" + "github.com/osbuild/osbuild-composer/internal/worker" + "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" "os" "sync" "testing" @@ -55,6 +57,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("multiple-channels", wrap(testMultipleChannels)) t.Run("100-dequeuers", wrap(test100dequeuers)) t.Run("workers", wrap(testWorkers)) + t.Run("fail", wrap(testFail)) } func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID, channel string) uuid.UUID { @@ -762,3 +765,54 @@ func testWorkers(t *testing.T, q jobqueue.JobQueue) { err = q.DeleteWorker(w2) require.NoError(t, err) } + +func testFail(t *testing.T, q jobqueue.JobQueue) { + startTime := time.Now() + + FailedJobErrorResult := worker.JobResult{ + JobError: clienterrors.New(clienterrors.ErrorDepsolveTimeout, + "Test timeout reason", + "Test timeout details", + ), + } + + // set a non-existing job to failed + err := q.FailJob(uuid.New(), FailedJobErrorResult) + require.Error(t, err) + + // Cancel a pending job + id := pushTestJob(t, q, "coralreef", nil, nil, "testchannel") + require.NotEmpty(t, id) + + err = q.FailJob(id, FailedJobErrorResult) + require.NoError(t, err) + + //nolint:golint,ineffassign + jobType, channel, result, queued, started, finished, canceled, _, _, err := q.JobStatus(id) + require.NoError(t, err) + + endTime := time.Now() + type JobResult struct { + JobError *clienterrors.Error `json:"job_error"` + } + + var r1 JobResult + err = json.Unmarshal(result, &r1) + require.NoError(t, err, fmt.Sprintf("Error %v when trying to unmarshal %v", err, string(result))) + + require.NotNil(t, r1) + require.Equal(t, "Test timeout reason", r1.JobError.Reason) + require.Equal(t, "Test timeout details", r1.JobError.Details) + require.Equal(t, clienterrors.ErrorDepsolveTimeout, r1.JobError.ID) + require.Equal(t, "testchannel", channel) + require.Equal(t, "coralreef", jobType) + require.Equal(t, false, canceled) + + allTimings := []time.Time{queued, started, finished} + + for _, tmr := range allTimings { + require.Less(t, startTime, tmr) + require.Greater(t, endTime, tmr) + } + +} diff --git a/internal/worker/clienterrors/errors.go b/internal/worker/clienterrors/errors.go index 7d25610b06..417c0ee954 100644 --- a/internal/worker/clienterrors/errors.go +++ b/internal/worker/clienterrors/errors.go @@ -44,6 +44,7 @@ const ( ErrorJobPanicked ClientErrorCode = 37 ErrorGeneratingSignedURL ClientErrorCode = 38 ErrorInvalidRepositoryURL ClientErrorCode = 39 + ErrorDepsolveTimeout ClientErrorCode = 40 ) type ClientErrorCode int diff --git a/internal/worker/server.go b/internal/worker/server.go index 137ea1d7a5..a5e2c8d5e7 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -566,6 +566,20 @@ func (s *Server) Cancel(id uuid.UUID) error { return s.jobs.CancelJob(id) } +// SetFailed sets the given job id to "failed" with the given error +func (s *Server) SetFailed(id uuid.UUID, error *clienterrors.Error) error { + FailedJobErrorResult := JobResult{ + JobError: error, + } + + res, err := json.Marshal(FailedJobErrorResult) + if err != nil { + logrus.Errorf("error marshalling the error: %v", err) + return nil + } + return s.jobs.FailJob(id, res) +} + // Provides access to artifacts of a job. Returns an io.Reader for the artifact // and the artifact's size. func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) { diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 35e5531e9d..58c34cc237 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -94,6 +94,12 @@ const ( WHERE id = $1 AND finished_at IS NULL RETURNING type, started_at` + sqlFailJob = ` + UPDATE jobs + SET token = $2, started_at = now(), finished_at = now(), result = $3 + WHERE id = $1 AND finished_at IS NULL AND started_at IS NULL AND token IS NULL + RETURNING id, type` + sqlInsertHeartbeat = ` INSERT INTO heartbeats(token, id, heartbeat) VALUES ($1, $2, now())` @@ -592,6 +598,32 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { return nil } +func (q *DBJobQueue) FailJob(id uuid.UUID, result interface{}) error { + conn, err := q.pool.Acquire(context.Background()) + if err != nil { + return fmt.Errorf("error connecting to database: %w", err) + } + defer conn.Release() + + var jobType string + var resultId uuid.UUID + dummyToken := uuid.New() + err = conn.QueryRow(context.Background(), sqlFailJob, id, dummyToken, result).Scan(&resultId, &jobType) + if errors.Is(err, pgx.ErrNoRows) { + return jobqueue.ErrNotRunning + } + if err != nil { + return fmt.Errorf("error failing job %s: %w", id, err) + } + if id != resultId { + return fmt.Errorf("that should never happen, I wanted to set %s to failed but got %s back from DB", id, resultId) + } + + q.logger.Info("Job set to failed", "job_type", jobType, "job_id", id.String()) + + return nil +} + func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index f92a00730c..a1c679fcf2 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -59,6 +59,9 @@ type JobQueue interface { // Cancel a job. Does nothing if the job has already finished. CancelJob(id uuid.UUID) error + // Fail a job that didn't even start (e.g. no worker available) + FailJob(id uuid.UUID, result interface{}) error + // If the job has finished, returns the result as raw JSON. // // Returns the current status of the job, in the form of three times: @@ -114,6 +117,8 @@ var ( ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled") ErrActiveJobs = errors.New("worker has active jobs associated with it") ErrWorkerNotExist = errors.New("worker does not exist") + ErrRunning = errors.New("job is running, but wasn't expected to be") + ErrFinished = errors.New("job is finished, but wasn't expected to be") ) type Worker struct {