140 lines
3.1 KiB
Go
140 lines
3.1 KiB
Go
/************************************************************
|
||
** @Description: job
|
||
** @Author: george hao
|
||
** @Date: 2019-06-24 15:14
|
||
** @Last Modified by: george hao
|
||
** @Last Modified time: 2019-06-24 15:14
|
||
*************************************************************/
|
||
package server
|
||
|
||
import (
|
||
"bytes"
|
||
"fmt"
|
||
"github.com/astaxie/beego/logs"
|
||
. "github.com/george518/PPGo_Job/jobs"
|
||
"github.com/george518/PPGo_Job/libs"
|
||
"github.com/george518/PPGo_Job/models"
|
||
"os/exec"
|
||
"runtime"
|
||
"runtime/debug"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
//执行句柄map
|
||
// CmdMap is a concurrency-safe map from a string key to a command handle.
// GetCmdMap expects every value stored here to be an *exec.Cmd.
var CmdMap sync.Map
|
||
|
||
func SetCmdMap(key string, cmd *exec.Cmd) {
|
||
if _, ok := CmdMap.Load(key); ok {
|
||
Counter.Store(key, cmd)
|
||
}
|
||
}
|
||
|
||
func GetCmdMap(key string) *exec.Cmd {
|
||
if v, ok := CmdMap.Load(key); ok {
|
||
return v.(*exec.Cmd)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func RestJobFromTask(task *models.Task, serverId int) (*Job, error) {
|
||
|
||
if task.Id < 1 {
|
||
return nil, fmt.Errorf("ToJob: 缺少id")
|
||
}
|
||
|
||
if task.ServerIds == "" {
|
||
return nil, fmt.Errorf("任务执行失败,找不到执行的服务器")
|
||
}
|
||
|
||
job := ResetCommandJob(task.Id, serverId, task.TaskName, task.Command)
|
||
job.Task = task
|
||
job.Concurrent = task.Concurrent == 1
|
||
job.ServerId = serverId
|
||
job.ServerName = "执行器"
|
||
|
||
return job, nil
|
||
}
|
||
|
||
func ResetCommandJob(id int, serverId int, name string, command string) *Job {
|
||
job := &Job{
|
||
Id: id,
|
||
Name: name,
|
||
}
|
||
|
||
job.JobKey = libs.JobKey(id, serverId)
|
||
job.RunFunc = func(timeout time.Duration) (jobResult *JobResult) {
|
||
bufOut := new(bytes.Buffer)
|
||
bufErr := new(bytes.Buffer)
|
||
//cmd := exec.Command("/bin/bash", "-c", command)
|
||
var cmd *exec.Cmd
|
||
if runtime.GOOS == "windows" {
|
||
cmd = exec.Command("CMD", "/C", command)
|
||
} else {
|
||
cmd = exec.Command("sh", "-c", command)
|
||
}
|
||
cmd.Stdout = bufOut
|
||
cmd.Stderr = bufErr
|
||
cmd.Start()
|
||
err, isTimeout := runCmdWithTimeout(cmd, timeout)
|
||
|
||
jobResult = new(JobResult)
|
||
jobResult.ErrMsg = bufErr.String()
|
||
jobResult.OutMsg = bufOut.String()
|
||
jobResult.IsOk = true
|
||
if err != nil {
|
||
jobResult.IsOk = false
|
||
}
|
||
|
||
jobResult.IsTimeout = isTimeout
|
||
|
||
return
|
||
}
|
||
return job
|
||
}
|
||
func runCmdWithTimeout(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
|
||
done := make(chan error)
|
||
go func() {
|
||
done <- cmd.Wait()
|
||
}()
|
||
|
||
var err error
|
||
select {
|
||
case <-time.After(timeout):
|
||
logs.Warn(fmt.Sprintf("任务执行时间超过%d秒,进程将被强制杀掉: %d", int(timeout/time.Second), cmd.Process.Pid))
|
||
go func() {
|
||
<-done // 读出上面的goroutine数据,避免阻塞导致无法退出
|
||
}()
|
||
if err = cmd.Process.Kill(); err != nil {
|
||
logs.Error(fmt.Sprintf("进程无法杀掉: %d, 错误信息: %s", cmd.Process.Pid, err))
|
||
}
|
||
return err, true
|
||
case err = <-done:
|
||
return err, false
|
||
}
|
||
}
|
||
|
||
func Run(j *Job) *JobResult {
|
||
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
logs.Error(err, "\n", string(debug.Stack()))
|
||
}
|
||
}()
|
||
|
||
logs.Debug(fmt.Sprintf("开始执行任务: %d", j.JobKey))
|
||
|
||
j.Status++
|
||
defer func() {
|
||
j.Status--
|
||
}()
|
||
|
||
timeout := time.Duration(time.Hour * 24)
|
||
if j.Task.Timeout > 0 {
|
||
timeout = time.Second * time.Duration(j.Task.Timeout)
|
||
}
|
||
|
||
return j.RunFunc(timeout)
|
||
}
|