From e473e40765ef82ab83f05cd6047dd3bb5bc0765e Mon Sep 17 00:00:00 2001 From: Supraja Sampath Date: Mon, 5 Aug 2024 17:29:38 +0200 Subject: [PATCH] patch processing files in check running jobs (#1012) --- gateway/utils/queue.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/gateway/utils/queue.go b/gateway/utils/queue.go index 6a7289c3..d60dc6d8 100644 --- a/gateway/utils/queue.go +++ b/gateway/utils/queue.go @@ -164,19 +164,12 @@ func checkRunningJob(jobID uint, db *gorm.DB) error { return setJobStatus(&job, models.JobStateStopped, fmt.Sprintf("Ray job %v was stopped", job.RayJobID), db) } else if ray.JobSucceeded(job.RayJobID) { fmt.Printf("Job %v , %v completed, updating status and adding output files\n", job.ID, job.RayJobID) - responseFileName := fmt.Sprintf("%s/%s_response.json", job.RayJobID, job.RayJobID) - bytes := GetRayJobResponseFromS3(responseFileName, &job, db) - rayJobResponse, err := UnmarshalRayJobResponse(bytes) - if err != nil { - fmt.Println("Error unmarshalling result JSON:", err) - return err - } - completeRayJobAndAddFiles(&job, bytes, rayJobResponse, db) + processNewFiles(&job, db) + return setJobStatus(&job, models.JobStateSucceeded, "", db) } else { fmt.Printf("Job %v , %v had unexpected Ray state %v, marking as failed\n", job.ID, job.RayJobID, job.JobStatus) return setJobStatus(&job, models.JobStateFailed, fmt.Sprintf("unexpected Ray state %v", job.JobStatus), db) } - return nil } func GetRayJobResponseFromS3(key string, job *models.Job, db *gorm.DB) []byte {