From 763f6850ad0794cc35ddf7fb7c7a98d826990cfe Mon Sep 17 00:00:00 2001 From: iwannay <772648576@qq.com> Date: Mon, 5 Aug 2019 21:58:23 +0800 Subject: [PATCH] v2.0.1 --- CHANGELOG.md | 8 ++++++++ jiacrontabd/dependencies.go | 7 +++---- jiacrontabd/jiacrontabd.go | 17 +++++++++++++---- jiacrontabd/job.go | 31 +++++++++++++++---------------- pkg/version/ver.go | 2 +- 5 files changed, 40 insertions(+), 25 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..32c848e7 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,8 @@ +# 更新记录 + +## v2.0.1 + +1.修复进程数量显示异常 +2.手动执行任务的支持kill +3.修复死锁造成的运行异常 +4.修复依赖任务自定义代码不执行 diff --git a/jiacrontabd/dependencies.go b/jiacrontabd/dependencies.go index 690ff9b2..a0c06d46 100644 --- a/jiacrontabd/dependencies.go +++ b/jiacrontabd/dependencies.go @@ -77,7 +77,6 @@ func (d *dependencies) exec(task *depEntry) { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(task.timeout)*time.Second) defer cancel() - myCmdUnit := cmdUint{ args: [][]string{task.commands}, ctx: ctx, @@ -89,11 +88,11 @@ func (d *dependencies) exec(task *depEntry) { exportLog: true, } - err = myCmdUnit.launch() - log.Infof("exec %s %s cost %.4fs %v", task.name, task.commands, float64(myCmdUnit.costTime)/1000000000, err) + log.Infof("dep start exec %s->%v", task.name, task.commands) + task.err = myCmdUnit.launch() task.logContent = bytes.TrimRight(myCmdUnit.content, "\x00") task.done = true - task.err = err + log.Infof("exec %s %s cost %.4fs %v", task.name, task.commands, float64(myCmdUnit.costTime)/1000000000, err) task.dest, task.from = task.from, task.dest diff --git a/jiacrontabd/jiacrontabd.go b/jiacrontabd/jiacrontabd.go index 4ddbbf74..505947d2 100644 --- a/jiacrontabd/jiacrontabd.go +++ b/jiacrontabd/jiacrontabd.go @@ -105,13 +105,22 @@ func (j *Jiacrontabd) execTask(job *crontab.Job) { } func (j *Jiacrontabd) killTask(jobID uint) { + var jobs []*JobEntry j.mux.RLock() - if task, ok := j.jobs[jobID]; ok { - j.mux.RUnlock() - task.kill() - return + if job, ok := j.jobs[jobID]; ok { + jobs = append(jobs, job) + } + + for _, v := range j.tmpJobs { + if v.detail.ID == jobID { + jobs = append(jobs, v) + } } j.mux.RUnlock() + + for _, v := range jobs { + v.kill() + } } func (j *Jiacrontabd) run() { diff --git a/jiacrontabd/job.go b/jiacrontabd/job.go index 160751c3..227572d6 100644 --- a/jiacrontabd/job.go +++ b/jiacrontabd/job.go @@ -13,8 +13,6 @@ import ( "sync/atomic" "time" - "github.com/jinzhu/gorm" - "github.com/iwannay/log" ) @@ -50,13 +48,17 @@ func newProcess(id uint32, jobEntry *JobEntry) *process { p.ctx, p.cancel = context.WithCancel(context.Background()) for _, v := range p.jobEntry.detail.DependJobs { + cmd := v.Command + if v.Code != "" { + cmd = append(cmd, v.Code) + } p.deps = append(p.deps, &depEntry{ jobID: p.jobEntry.detail.ID, processID: int(id), jobUniqueID: p.jobEntry.uniqueID, id: v.ID, from: v.From, - commands: v.Command, + commands: cmd, dest: v.Dest, logPath: filepath.Join(p.jobEntry.jd.getOpts().LogPath, "depend_job", time.Now().Format("2006/01/02"), fmt.Sprintf("%d-%s.log", v.JobID, v.ID)), done: false, @@ -372,6 +374,7 @@ func (j *JobEntry) exec() { var err error if j.once { err = models.DB().Take(&j.detail, "id=?", j.job.ID).Error + atomic.StoreInt32(&j.processNum, int32(j.detail.ProcessNum)) } else { err = models.DB().Take(&j.detail, "id=? and status in(?)", j.job.ID, []models.JobStatus{models.StatusJobTiming, models.StatusJobRunning}).Error @@ -379,7 +382,7 @@ func (j *JobEntry) exec() { if err != nil { j.jd.deleteJob(j.detail.ID) - log.Warn("JobEntry.exec:", err) + log.Warnf("jobID:%d JobEntry.exec:%v", j.detail.ID, err) return } @@ -401,7 +404,8 @@ func (j *JobEntry) exec() { j.jd.addJob(j.job) } - if atomic.LoadInt32(&j.processNum) >= int32(j.detail.MaxConcurrent) { + if atomic.LoadInt32(&j.processNum) >= int32(j.detail.MaxConcurrent) && j.detail.MaxConcurrent != 0 { + j.logContent = []byte("不得超过job最大并发数量") return } @@ -473,13 +477,13 @@ func (j *JobEntry) updateJob(status models.JobStatus, startTime, endTime time.Ti data["last_cost_time"] = endTime.Sub(startTime).Seconds() } - if j.once && (status == models.StatusJobRunning) { - data["process_num"] = gorm.Expr("process_num + ?", 1) - } + // if j.once && (status == models.StatusJobRunning) { + // data["process_num"] = gorm.Expr("process_num + ?", 1) + // } - if j.once && (status == models.StatusJobTiming) { - data["process_num"] = gorm.Expr("process_num - ?", 1) - } + // if j.once && (status == models.StatusJobTiming) { + // data["process_num"] = gorm.Expr("process_num - ?", 1) + // } var errMsg string if err != nil { @@ -514,11 +518,6 @@ func (j *JobEntry) updateJob(status models.JobStatus, startTime, endTime time.Ti func (j *JobEntry) kill() { j.exit() j.waitDone() - if err := models.DB().Model(&j.detail).Updates(map[string]interface{}{ - "process_num": 0, - }).Error; err != nil { - log.Error("JobEntry.kill", err) - } } func (j *JobEntry) waitDone() []byte { diff --git a/pkg/version/ver.go b/pkg/version/ver.go index 5886ad70..6509a4da 100644 --- a/pkg/version/ver.go +++ b/pkg/version/ver.go @@ -5,7 +5,7 @@ import ( "runtime" ) -const Binary = "2.0.0" +const Binary = "2.0.1" func String(app string) string { return fmt.Sprintf("%s v%s (built w/%s)", app, Binary, runtime.Version())