Skip to content

Commit

Permalink
v2.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
iwannay committed Aug 5, 2019
1 parent 6bfab3d commit 763f685
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 25 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# 更新记录

## v2.0.1

1.修复进程数量显示异常
2.手动执行任务的支持kill
3.修复死锁造成的运行异常
4.修复依赖任务自定义代码不执行
7 changes: 3 additions & 4 deletions jiacrontabd/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (d *dependencies) exec(task *depEntry) {

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(task.timeout)*time.Second)
defer cancel()

myCmdUnit := cmdUint{
args: [][]string{task.commands},
ctx: ctx,
Expand All @@ -89,11 +88,11 @@ func (d *dependencies) exec(task *depEntry) {
exportLog: true,
}

err = myCmdUnit.launch()
log.Infof("exec %s %s cost %.4fs %v", task.name, task.commands, float64(myCmdUnit.costTime)/1000000000, err)
log.Infof("dep start exec %s->%v", task.name, task.commands)
task.err = myCmdUnit.launch()
task.logContent = bytes.TrimRight(myCmdUnit.content, "\x00")
task.done = true
task.err = err
log.Infof("exec %s %s cost %.4fs %v", task.name, task.commands, float64(myCmdUnit.costTime)/1000000000, err)

task.dest, task.from = task.from, task.dest

Expand Down
17 changes: 13 additions & 4 deletions jiacrontabd/jiacrontabd.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,22 @@ func (j *Jiacrontabd) execTask(job *crontab.Job) {
}

func (j *Jiacrontabd) killTask(jobID uint) {
var jobs []*JobEntry
j.mux.RLock()
if task, ok := j.jobs[jobID]; ok {
j.mux.RUnlock()
task.kill()
return
if job, ok := j.jobs[jobID]; ok {
jobs = append(jobs, job)
}

for _, v := range j.tmpJobs {
if v.detail.ID == jobID {
jobs = append(jobs, v)
}
}
j.mux.RUnlock()

for _, v := range jobs {
v.kill()
}
}

func (j *Jiacrontabd) run() {
Expand Down
31 changes: 15 additions & 16 deletions jiacrontabd/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"sync/atomic"
"time"

"github.com/jinzhu/gorm"

"github.com/iwannay/log"
)

Expand Down Expand Up @@ -50,13 +48,17 @@ func newProcess(id uint32, jobEntry *JobEntry) *process {
p.ctx, p.cancel = context.WithCancel(context.Background())

for _, v := range p.jobEntry.detail.DependJobs {
cmd := v.Command
if v.Code != "" {
cmd = append(cmd, v.Code)
}
p.deps = append(p.deps, &depEntry{
jobID: p.jobEntry.detail.ID,
processID: int(id),
jobUniqueID: p.jobEntry.uniqueID,
id: v.ID,
from: v.From,
commands: v.Command,
commands: cmd,
dest: v.Dest,
logPath: filepath.Join(p.jobEntry.jd.getOpts().LogPath, "depend_job", time.Now().Format("2006/01/02"), fmt.Sprintf("%d-%s.log", v.JobID, v.ID)),
done: false,
Expand Down Expand Up @@ -372,14 +374,15 @@ func (j *JobEntry) exec() {
var err error
if j.once {
err = models.DB().Take(&j.detail, "id=?", j.job.ID).Error
atomic.StoreInt32(&j.processNum, int32(j.detail.ProcessNum))
} else {
err = models.DB().Take(&j.detail, "id=? and status in(?)",
j.job.ID, []models.JobStatus{models.StatusJobTiming, models.StatusJobRunning}).Error
}

if err != nil {
j.jd.deleteJob(j.detail.ID)
log.Warn("JobEntry.exec:", err)
log.Warnf("jobID:%d JobEntry.exec:%v", j.detail.ID, err)
return
}

Expand All @@ -401,7 +404,8 @@ func (j *JobEntry) exec() {
j.jd.addJob(j.job)
}

if atomic.LoadInt32(&j.processNum) >= int32(j.detail.MaxConcurrent) {
if atomic.LoadInt32(&j.processNum) >= int32(j.detail.MaxConcurrent) && j.detail.MaxConcurrent != 0 {
j.logContent = []byte("不得超过job最大并发数量")
return
}

Expand Down Expand Up @@ -473,13 +477,13 @@ func (j *JobEntry) updateJob(status models.JobStatus, startTime, endTime time.Ti
data["last_cost_time"] = endTime.Sub(startTime).Seconds()
}

if j.once && (status == models.StatusJobRunning) {
data["process_num"] = gorm.Expr("process_num + ?", 1)
}
// if j.once && (status == models.StatusJobRunning) {
// data["process_num"] = gorm.Expr("process_num + ?", 1)
// }

if j.once && (status == models.StatusJobTiming) {
data["process_num"] = gorm.Expr("process_num - ?", 1)
}
// if j.once && (status == models.StatusJobTiming) {
// data["process_num"] = gorm.Expr("process_num - ?", 1)
// }

var errMsg string
if err != nil {
Expand Down Expand Up @@ -514,11 +518,6 @@ func (j *JobEntry) updateJob(status models.JobStatus, startTime, endTime time.Ti
func (j *JobEntry) kill() {
j.exit()
j.waitDone()
if err := models.DB().Model(&j.detail).Updates(map[string]interface{}{
"process_num": 0,
}).Error; err != nil {
log.Error("JobEntry.kill", err)
}
}

func (j *JobEntry) waitDone() []byte {
Expand Down
2 changes: 1 addition & 1 deletion pkg/version/ver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime"
)

const Binary = "2.0.0"
const Binary = "2.0.1"

func String(app string) string {
return fmt.Sprintf("%s v%s (built w/%s)", app, Binary, runtime.Version())
Expand Down

0 comments on commit 763f685

Please sign in to comment.