Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/worker/server: return an error on depsolve timeout (HMS-2989) #4398

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
run: sudo apt-get update
# gpgme-devel is needed for container upload dependencies
- name: Install test dependencies
run: sudo apt-get install -y libgpgme-dev
run: sudo apt-get install -y libgpgme-dev libbtrfs-dev
croissanne marked this conversation as resolved.
Show resolved Hide resolved
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ push-check: lint build unit-tests srpm man
./tools/check-runners
./tools/check-snapshots --errors-only .
rpmlint --config rpmlint.config $(CURDIR)/rpmbuild/SRPMS/*
./tools/prepare-source.sh
@if [ 0 -ne $$(git status --porcelain --untracked-files|wc -l) ]; then \
echo "There should be no changed or untracked files"; \
git status --porcelain --untracked-files; \
Expand Down Expand Up @@ -350,6 +349,7 @@ SHELLCHECK_FILES=$(shell find . -name "*.sh" -not -regex "./vendor/.*")

.PHONY: lint
lint: $(GOLANGCI_LINT_CACHE_DIR) container_composer_golangci_built.info
./tools/prepare-source.sh
podman run -t --rm -v $(SRCDIR):/app:z -v $(GOLANGCI_LINT_CACHE_DIR):/root/.cache:z -w /app $(GOLANGCI_COMPOSER_IMAGE) golangci-lint run -v
echo "$(SHELLCHECK_FILES)" | xargs shellcheck --shell bash -e SC1091 -e SC2002 -e SC2317

Expand Down
93 changes: 65 additions & 28 deletions internal/cloudapi/v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v2
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -445,22 +446,81 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas
}

func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, seed int64) {
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
// prepared to become a config variable
const depsolveTimeout = 5
ctx, cancel := context.WithTimeout(ctx, time.Minute*depsolveTimeout)
defer cancel()

// wait until job is in a pending state
var token uuid.UUID
jobResult := &worker.ManifestJobByIDResult{
Manifest: nil,
ManifestInfo: worker.ManifestInfo{
OSBuildComposerVersion: common.BuildVersion(),
},
}

var dynArgs []json.RawMessage
var err error
token := uuid.Nil
logWithId := logrus.WithField("jobId", manifestJobID)

defer func() {
// token == uuid.Nil indicates that no worker even started processing
if token == uuid.Nil {
if jobResult.JobError != nil {
// set all jobs to "failed"
croissanne marked this conversation as resolved.
Show resolved Hide resolved
// osbuild job will fail as dependency
jobs := []struct {
Name string
ID uuid.UUID
}{
{"depsolve", depsolveJobID},
{"containerResolve", containerResolveJobID},
{"ostreeResolve", ostreeResolveJobID},
{"manifest", manifestJobID},
}

for _, job := range jobs {
if job.ID != uuid.Nil {
err := workers.SetFailed(job.ID, jobResult.JobError)
if err != nil {
logWithId.Errorf("Error failing %s job: %v", job.Name, err)
}
}
}

} else {
logWithId.Errorf("Internal error, no worker started depsolve but we didn't get a reason.")
}
} else {
result, err := json.Marshal(jobResult)
if err != nil {
logWithId.Errorf("Error marshalling manifest job results: %v", err)
}
err = workers.FinishJob(token, result)
if err != nil {
logWithId.Errorf("Error finishing manifest job: %v", err)
}
if jobResult.JobError != nil {
logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
}
}
}()

// wait until job is in a pending state
for {
_, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID)
if err == jobqueue.ErrNotPending {
if errors.Is(err, jobqueue.ErrNotPending) {
logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish")
time.Sleep(time.Millisecond * 50)
select {
case <-ctx.Done():
logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
logWithId.Warning(fmt.Sprintf("Manifest job dependencies took longer than %d minutes to finish,"+
" or the server is shutting down, returning to avoid dangling routines", depsolveTimeout))

jobResult.JobError = clienterrors.New(clienterrors.ErrorDepsolveTimeout,
"Timeout while waiting for package dependency resolution",
"There may be a temporary issue with compute resources.",
)
break
default:
continue
Expand All @@ -473,13 +533,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w
break
}

jobResult := &worker.ManifestJobByIDResult{
Manifest: nil,
ManifestInfo: worker.ManifestInfo{
OSBuildComposerVersion: common.BuildVersion(),
},
}

// add osbuild/images dependency info to job result
osbuildImagesDep, err := common.GetDepModuleInfoByPath(common.OSBuildImagesModulePath)
if err != nil {
Expand All @@ -491,22 +544,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w
jobResult.ManifestInfo.OSBuildComposerDeps = append(jobResult.ManifestInfo.OSBuildComposerDeps, osbuildImagesDepModule)
}

defer func() {
if jobResult.JobError != nil {
logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
}

result, err := json.Marshal(jobResult)
if err != nil {
logWithId.Errorf("Error marshalling manifest job results: %v", err)
}

err = workers.FinishJob(token, result)
if err != nil {
logWithId.Errorf("Error finishing manifest job: %v", err)
}
}()

if len(dynArgs) == 0 {
reason := "No dynamic arguments"
jobResult.JobError = clienterrors.New(clienterrors.ErrorNoDynamicArgs, reason, nil)
Expand Down
34 changes: 34 additions & 0 deletions internal/jobqueue/fsjobqueue/fsjobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,40 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
return nil
}

func (q *fsJobQueue) FailJob(id uuid.UUID, result interface{}) error {
q.mu.Lock()
defer q.mu.Unlock()

j, err := q.readJob(id)
if err != nil {
return err
}

if !j.FinishedAt.IsZero() {
return jobqueue.ErrFinished
}

if !j.StartedAt.IsZero() {
return jobqueue.ErrRunning
}

j.Result, err = json.Marshal(result)
if err != nil {
return err
}

j.StartedAt = time.Now()
j.FinishedAt = time.Now()
j.Token = uuid.New()

err = q.db.Write(id.String(), j)
if err != nil {
return fmt.Errorf("error writing job %s: %v", id, err)
}

return nil
}

func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
j, err := q.readJob(id)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions internal/jobqueue/jobqueuetest/jobqueuetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
"os"
"sync"
"testing"
Expand Down Expand Up @@ -55,6 +57,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
t.Run("multiple-channels", wrap(testMultipleChannels))
t.Run("100-dequeuers", wrap(test100dequeuers))
t.Run("workers", wrap(testWorkers))
t.Run("fail", wrap(testFail))
}

func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID, channel string) uuid.UUID {
Expand Down Expand Up @@ -762,3 +765,54 @@ func testWorkers(t *testing.T, q jobqueue.JobQueue) {
err = q.DeleteWorker(w2)
require.NoError(t, err)
}

func testFail(t *testing.T, q jobqueue.JobQueue) {
startTime := time.Now()

FailedJobErrorResult := worker.JobResult{
JobError: clienterrors.New(clienterrors.ErrorDepsolveTimeout,
"Test timeout reason",
"Test timeout details",
),
}

// set a non-existing job to failed
err := q.FailJob(uuid.New(), FailedJobErrorResult)
require.Error(t, err)

// Cancel a pending job
id := pushTestJob(t, q, "coralreef", nil, nil, "testchannel")
require.NotEmpty(t, id)

err = q.FailJob(id, FailedJobErrorResult)
require.NoError(t, err)

//nolint:golint,ineffassign
jobType, channel, result, queued, started, finished, canceled, _, _, err := q.JobStatus(id)
require.NoError(t, err)

endTime := time.Now()
type JobResult struct {
JobError *clienterrors.Error `json:"job_error"`
}

var r1 JobResult
err = json.Unmarshal(result, &r1)
require.NoError(t, err, fmt.Sprintf("Error %v when trying to unmarshal %v", err, string(result)))

require.NotNil(t, r1)
require.Equal(t, "Test timeout reason", r1.JobError.Reason)
require.Equal(t, "Test timeout details", r1.JobError.Details)
require.Equal(t, clienterrors.ErrorDepsolveTimeout, r1.JobError.ID)
require.Equal(t, "testchannel", channel)
require.Equal(t, "coralreef", jobType)
require.Equal(t, false, canceled)

allTimings := []time.Time{queued, started, finished}

for _, tmr := range allTimings {
require.Less(t, startTime, tmr)
require.Greater(t, endTime, tmr)
}

}
1 change: 1 addition & 0 deletions internal/worker/clienterrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
ErrorJobPanicked ClientErrorCode = 37
ErrorGeneratingSignedURL ClientErrorCode = 38
ErrorInvalidRepositoryURL ClientErrorCode = 39
ErrorDepsolveTimeout ClientErrorCode = 40
)

type ClientErrorCode int
Expand Down
14 changes: 14 additions & 0 deletions internal/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,20 @@ func (s *Server) Cancel(id uuid.UUID) error {
return s.jobs.CancelJob(id)
}

// SetFailed sets the given job id to "failed" with the given error
func (s *Server) SetFailed(id uuid.UUID, error *clienterrors.Error) error {
FailedJobErrorResult := JobResult{
JobError: error,
}

res, err := json.Marshal(FailedJobErrorResult)
if err != nil {
logrus.Errorf("error marshalling the error: %v", err)
return nil
}
return s.jobs.FailJob(id, res)
}

// Provides access to artifacts of a job. Returns an io.Reader for the artifact
// and the artifact's size.
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
Expand Down
32 changes: 32 additions & 0 deletions pkg/jobqueue/dbjobqueue/dbjobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ const (
WHERE id = $1 AND finished_at IS NULL
RETURNING type, started_at`

sqlFailJob = `
croissanne marked this conversation as resolved.
Show resolved Hide resolved
UPDATE jobs
SET token = $2, started_at = now(), finished_at = now(), result = $3
WHERE id = $1 AND finished_at IS NULL AND started_at IS NULL AND token IS NULL
RETURNING id, type`

sqlInsertHeartbeat = `
INSERT INTO heartbeats(token, id, heartbeat)
VALUES ($1, $2, now())`
Expand Down Expand Up @@ -592,6 +598,32 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
return nil
}

func (q *DBJobQueue) FailJob(id uuid.UUID, result interface{}) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return fmt.Errorf("error connecting to database: %w", err)
}
defer conn.Release()

var jobType string
var resultId uuid.UUID
dummyToken := uuid.New()
err = conn.QueryRow(context.Background(), sqlFailJob, id, dummyToken, result).Scan(&resultId, &jobType)
if errors.Is(err, pgx.ErrNoRows) {
return jobqueue.ErrNotRunning
}
if err != nil {
return fmt.Errorf("error failing job %s: %w", id, err)
}
if id != resultId {
return fmt.Errorf("that should never happen, I wanted to set %s to failed but got %s back from DB", id, resultId)
}

q.logger.Info("Job set to failed", "job_type", jobType, "job_id", id.String())

return nil
}

func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/jobqueue/jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type JobQueue interface {
// Cancel a job. Does nothing if the job has already finished.
CancelJob(id uuid.UUID) error

// Fail a job that didn't even start (e.g. no worker available)
FailJob(id uuid.UUID, result interface{}) error

// If the job has finished, returns the result as raw JSON.
//
// Returns the current status of the job, in the form of three times:
Expand Down Expand Up @@ -114,6 +117,8 @@ var (
ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled")
ErrActiveJobs = errors.New("worker has active jobs associated with it")
ErrWorkerNotExist = errors.New("worker does not exist")
ErrRunning = errors.New("job is running, but wasn't expected to be")
ErrFinished = errors.New("job is finished, but wasn't expected to be")
)

type Worker struct {
Expand Down
Loading