Files
ppgo_job/agent/server/job.go
2019-07-06 17:05:19 +08:00

140 lines
3.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/************************************************************
** @Description: job
** @Author: george hao
** @Date: 2019-06-24 15:14
** @Last Modified by: george hao
** @Last Modified time: 2019-06-24 15:14
*************************************************************/
package server
import (
"bytes"
"fmt"
"github.com/astaxie/beego/logs"
. "github.com/george518/PPGo_Job/jobs"
"github.com/george518/PPGo_Job/libs"
"github.com/george518/PPGo_Job/models"
"os/exec"
"runtime"
"runtime/debug"
"sync"
"time"
)
//执行句柄map
var CmdMap sync.Map
func SetCmdMap(key string, cmd *exec.Cmd) {
if _, ok := CmdMap.Load(key); ok {
Counter.Store(key, cmd)
}
}
func GetCmdMap(key string) *exec.Cmd {
if v, ok := CmdMap.Load(key); ok {
return v.(*exec.Cmd)
}
return nil
}
func RestJobFromTask(task *models.Task, serverId int) (*Job, error) {
if task.Id < 1 {
return nil, fmt.Errorf("ToJob: 缺少id")
}
if task.ServerIds == "" {
return nil, fmt.Errorf("任务执行失败,找不到执行的服务器")
}
job := ResetCommandJob(task.Id, serverId, task.TaskName, task.Command)
job.Task = task
job.Concurrent = task.Concurrent == 1
job.ServerId = serverId
job.ServerName = "执行器"
return job, nil
}
func ResetCommandJob(id int, serverId int, name string, command string) *Job {
job := &Job{
Id: id,
Name: name,
}
job.JobKey = libs.JobKey(id, serverId)
job.RunFunc = func(timeout time.Duration) (jobResult *JobResult) {
bufOut := new(bytes.Buffer)
bufErr := new(bytes.Buffer)
//cmd := exec.Command("/bin/bash", "-c", command)
var cmd *exec.Cmd
if runtime.GOOS == "windows" {
cmd = exec.Command("CMD", "/C", command)
} else {
cmd = exec.Command("sh", "-c", command)
}
cmd.Stdout = bufOut
cmd.Stderr = bufErr
cmd.Start()
err, isTimeout := runCmdWithTimeout(cmd, timeout)
jobResult = new(JobResult)
jobResult.ErrMsg = bufErr.String()
jobResult.OutMsg = bufOut.String()
jobResult.IsOk = true
if err != nil {
jobResult.IsOk = false
}
jobResult.IsTimeout = isTimeout
return
}
return job
}
func runCmdWithTimeout(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
done := make(chan error)
go func() {
done <- cmd.Wait()
}()
var err error
select {
case <-time.After(timeout):
logs.Warn(fmt.Sprintf("任务执行时间超过%d秒进程将被强制杀掉: %d", int(timeout/time.Second), cmd.Process.Pid))
go func() {
<-done // 读出上面的goroutine数据避免阻塞导致无法退出
}()
if err = cmd.Process.Kill(); err != nil {
logs.Error(fmt.Sprintf("进程无法杀掉: %d, 错误信息: %s", cmd.Process.Pid, err))
}
return err, true
case err = <-done:
return err, false
}
}
func Run(j *Job) *JobResult {
defer func() {
if err := recover(); err != nil {
logs.Error(err, "\n", string(debug.Stack()))
}
}()
logs.Debug(fmt.Sprintf("开始执行任务: %d", j.JobKey))
j.Status++
defer func() {
j.Status--
}()
timeout := time.Duration(time.Hour * 24)
if j.Task.Timeout > 0 {
timeout = time.Second * time.Duration(j.Task.Timeout)
}
return j.RunFunc(timeout)
}