Skip to content

Commit

Permalink
queue fix to not set job to running during submission
Browse files (browse the repository at this point in the history)
  • Loading branch information
supraja-968 committed Aug 6, 2024
1 parent 98262cd commit 0e08c0b
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions gateway/utils/queue.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -87,7 +87,7 @@ func (rq *RayQueue) worker(workerID int) {
fmt.Printf("Error marking dangling jobs to stopped: %v\n", err)
}
var job models.Job
err = fetchAndMarkOldestQueuedJobAsProcessing(&job, models.QueueTypeRay, rq.db)
err = fetchAndMarkOldestQueuedJobAsProcessing(&job, rq.db)
if err != nil {
state.Busy = false
state.CurrentJob = nil
Expand Down Expand Up @@ -140,9 +140,6 @@ func fetchAndMarkOldestRunningJobAsStopped(db *gorm.DB) error {
func checkRunningJob(jobID uint, db *gorm.DB) error {
var job models.Job
err := fetchJobWithModelAndExperimentData(&job, jobID, db)
if err != nil {
return err
}
if err != nil && strings.Contains(err.Error(), "Job not found") {
fmt.Printf("Job %v , %v has missing Ray Job, failing Job\n", job.ID, job.RayJobID)
return setJobStatus(&job, models.JobStateFailed, fmt.Sprintf("Ray job %v not found", job.RayJobID), db)
Expand All @@ -154,6 +151,14 @@ func checkRunningJob(jobID uint, db *gorm.DB) error {
fmt.Printf("Job %v , %v is still in pending state nothing to do\n", job.ID, job.RayJobID)
return nil
} else if ray.JobIsRunning(job.RayJobID) {
if job.JobStatus != models.JobStateRunning {
fmt.Printf("Job %v , %v is running, updating status\n", job.ID, job.RayJobID)
err := setJobStatus(&job, models.JobStateRunning, "", db)
if err != nil {
return err
}
createInferenceEvent(job.ID, models.JobStateRunning, job.RayJobID, job.RetryCount, db)
}
fmt.Printf("Job %v , %v is still running nothing to do\n", job.ID, job.RayJobID)
return nil
} else if ray.JobFailed(job.RayJobID) {
Expand Down Expand Up @@ -248,7 +253,7 @@ func fetchRunningJobsWithModelData(jobs *[]models.Job, db *gorm.DB) error {
return db.Preload("Model").Where("job_status = ?", models.JobStateRunning).Where("job_type", models.JobTypeJob).Find(jobs).Error
}

func fetchAndMarkOldestQueuedJobAsProcessing(job *models.Job, queueType models.QueueType, db *gorm.DB) error {
func fetchAndMarkOldestQueuedJobAsProcessing(job *models.Job, db *gorm.DB) error {
return db.Transaction(func(tx *gorm.DB) error {
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("job_status = ?", models.JobStateQueued).Order("created_at ASC").First(job).Error; err != nil {
return err
Expand Down Expand Up @@ -417,9 +422,8 @@ func submitRayJobAndUpdateID(job *models.Job, db *gorm.DB) error {
modelPath := job.Model.S3URI

log.Printf("Submitting to Ray with inputs: %+v\n", inputs)
createInferenceEvent(job.ID, models.JobStateRunning, job.RayJobID, 0, db)
setJobStatusAndID(job, models.JobStateRunning, job.RayJobID, "", db)
log.Printf("setting job %v to running\n", job.ID)
setRayJobID(job, job.RayJobID, db)
// log.Printf("setting job %v to running\n", job.ID)
resp, err := ray.SubmitRayJob(*job, modelPath, job.RayJobID, inputs, db)
if err != nil {
return err
Expand Down Expand Up @@ -508,15 +512,13 @@ func submitRayJobAndUpdateID(job *models.Job, db *gorm.DB) error {
return nil
}

func setJobStatusAndID(job *models.Job, state models.JobState, rayJobID string, errorMessage string, db *gorm.DB) error {
job.JobStatus = state
job.StartedAt = time.Now().UTC()
job.Error = errorMessage
// setRayJobID stores rayJobID on the job, persists the job with db.Save, and
// then records an inference event carrying models.JobStatePending.
// It returns the error from the save; when the save fails no event is created.
// It does not modify the job's status field itself.
func setRayJobID(job *models.Job, rayJobID string, db *gorm.DB) error {
job.RayJobID = rayJobID
// NOTE(review): job is already a *models.Job, so &job hands Save a pointer
// to a pointer — confirm this is intended rather than db.Save(job).
err := db.Save(&job).Error
if err != nil {
return err
}
// Whatever createInferenceEvent returns (if anything) is not checked here.
createInferenceEvent(job.ID, models.JobStatePending, job.RayJobID, 0, db)
return nil
}

Expand Down Expand Up @@ -587,7 +589,7 @@ func completeRayJobAndAddFiles(job *models.Job, body []byte, resultJSON models.R
}
}

if err := addFiles(job, body, resultJSON, db); err != nil {
if err := addFiles(job, resultJSON, db); err != nil {
return fmt.Errorf("failed to add files: %v", err)
}

Expand Down Expand Up @@ -681,7 +683,7 @@ func processFile(fileName string, job *models.Job, db *gorm.DB) error {
}

// Add files and update related job data in the database without marking the job as completed
if err := addFiles(job, data, rayJobResponse, db); err != nil {
if err := addFiles(job, rayJobResponse, db); err != nil {
log.Printf("Failed to add files and update job for job %d from file %s: %v", job.ID, fileName, err)
return err
}
Expand Down Expand Up @@ -723,7 +725,7 @@ func fileProcessed(fileName string, jobID uint, db *gorm.DB) bool {
return count > 0
}

func addFiles(job *models.Job, data []byte, response models.RayJobResponse, db *gorm.DB) error {
func addFiles(job *models.Job, response models.RayJobResponse, db *gorm.DB) error {
fmt.Printf("Adding output files for job %d\n", job.ID)

fmt.Printf("Looping through files in RayJobResponse\n %v\n", response.Files)
Expand Down

0 comments on commit 0e08c0b

Please sign in to comment.