支持多台服务器同时执行任务+优化页面

一个定时任务可以允许多台服务器同时执行任务,优化页面关闭并刷新列表页
This commit is contained in:
georgehao
2019-04-16 23:40:07 +08:00
parent 07aff7ee72
commit 830d49ee1c
24 changed files with 391 additions and 137 deletions

View File

@@ -32,7 +32,7 @@ func AddJob(spec string, job *Job) bool {
lock.Lock()
defer lock.Unlock()
if GetEntryById(job.id) != nil {
if GetEntryById(job.jobKey) != nil {
return false
}
err := mainCron.AddJob(spec, job)
@@ -40,13 +40,14 @@ func AddJob(spec string, job *Job) bool {
beego.Error("AddJob: ", err.Error())
return false
}
//fmt.Println(job)
return true
}
func RemoveJob(id int) {
func RemoveJob(jobKey int) {
mainCron.RemoveJob(func(e *cron.Entry) bool {
if v, ok := e.Job.(*Job); ok {
if v.id == id {
if v.jobKey == jobKey {
return true
}
}
@@ -54,11 +55,11 @@ func RemoveJob(id int) {
})
}
func GetEntryById(id int) *cron.Entry {
func GetEntryById(jobKey int) *cron.Entry {
entries := mainCron.Entries()
for _, e := range entries {
if v, ok := e.Job.(*Job); ok {
if v.id == id {
if v.jobKey == jobKey {
return e
}
}

View File

@@ -18,12 +18,16 @@ import (
func InitJobs() {
list, _ := models.TaskGetList(1, 1000000, "status", 1)
for _, task := range list {
job, err := NewJobFromTask(task)
jobs, err := NewJobFromTask(task)
if err != nil {
beego.Error("InitJobs:", err.Error())
continue
}
AddJob(task.CronSpec, job)
for _, job := range jobs {
AddJob(task.CronSpec, job)
}
}
}

View File

@@ -20,19 +20,22 @@ import (
"strconv"
"strings"
"encoding/json"
"github.com/astaxie/beego"
"github.com/axgle/mahonia"
"github.com/george518/PPGo_Job/models"
"github.com/george518/PPGo_Job/notify"
"golang.org/x/crypto/ssh"
"encoding/json"
"github.com/axgle/mahonia"
"github.com/pkg/errors"
"github.com/linxiaozhi/go-telnet"
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
)
type Job struct {
id int // 任务ID
jobKey int // jobId = id*10000+serverId
id int // taskID
logId int64 // 日志记录ID
serverId int //服务器信息
serverName string //服务器名称
name string // 任务名称
task *models.Task // 任务对象
runFunc func(time.Duration) (string, string, error, bool) // 执行函数
@@ -40,51 +43,79 @@ type Job struct {
Concurrent bool // 同一个任务是否允许并行执行
}
func NewJobFromTask(task *models.Task) (*Job, error) {
func NewJobFromTask(task *models.Task) ([]*Job, error) {
if task.Id < 1 {
return nil, fmt.Errorf("ToJob: 缺少id")
}
//本地程序执行
if task.ServerId == 0 {
job := NewCommandJob(task.Id, task.TaskName, task.Command)
job.task = task
job.Concurrent = task.Concurrent == 1
return job, nil
if task.ServerIds == "" {
return nil, fmt.Errorf("任务执行失败,找不到执行的服务器")
}
server, _ := models.TaskServerGetById(task.ServerId)
if server.ConnectionType == 0 {
if server.Type == 0 {
//密码验证登录服务器
job := RemoteCommandJobByPassword(task.Id, task.TaskName, task.Command, server)
job.task = task
job.Concurrent = task.Concurrent == 1
return job, nil
}
TaskServerIdsArr := strings.Split(task.ServerIds, ",")
job := RemoteCommandJob(task.Id, task.TaskName, task.Command, server)
job.task = task
job.Concurrent = task.Concurrent == 1
return job, nil
} else if server.ConnectionType == 1 {
if server.Type == 0 {
//密码验证登录服务器
job := RemoteCommandJobByTelnetPassword(task.Id, task.TaskName, task.Command, server)
jobArr := make([]*Job, 0)
for _, server_id := range TaskServerIdsArr {
if server_id == "0" {
//本地执行
job := NewCommandJob(task.Id, 0, task.TaskName, task.Command)
job.task = task
job.Concurrent = task.Concurrent == 1
return job, nil
job.serverId = 0
job.serverName = "本地服务器"
jobArr = append(jobArr, job)
} else {
server_id_int, _ := strconv.Atoi(server_id)
//远程执行
server, _ := models.TaskServerGetById(server_id_int)
if server.Status == 1 {
fmt.Println("服务器已禁用")
continue
}
if server.ConnectionType == 0 {
if server.Type == 0 {
//密码验证登录服务器
job := RemoteCommandJobByPassword(task.Id, server_id_int, task.TaskName, task.Command, server)
job.task = task
job.Concurrent = task.Concurrent == 1
job.serverId = server_id_int
job.serverName = server.ServerName
jobArr = append(jobArr, job)
} else {
job := RemoteCommandJob(task.Id, server_id_int, task.TaskName, task.Command, server)
job.task = task
job.Concurrent = task.Concurrent == 1
job.serverId = server_id_int
job.serverName = server.ServerName
jobArr = append(jobArr, job)
}
} else if server.ConnectionType == 1 {
if server.Type == 0 {
//密码验证登录服务器
job := RemoteCommandJobByTelnetPassword(task.Id, server_id_int, task.TaskName, task.Command, server)
job.task = task
job.Concurrent = task.Concurrent == 1
job.serverId = server_id_int
job.serverName = server.ServerName
jobArr = append(jobArr, job)
}
}
}
}
return nil, fmt.Errorf("未知ConnectionType")
return jobArr, nil
}
func NewCommandJob(id int, name string, command string) *Job {
func NewCommandJob(id int, serverId int, name string, command string) *Job {
job := &Job{
id: id,
name: name,
}
job.jobKey = jobKey(id, serverId)
job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
bufOut := new(bytes.Buffer)
bufErr := new(bytes.Buffer)
@@ -106,11 +137,15 @@ func NewCommandJob(id int, name string, command string) *Job {
}
//远程执行任务 密钥验证
func RemoteCommandJob(id int, name string, command string, servers *models.TaskServer) *Job {
func RemoteCommandJob(id int, serverId int, name string, command string, servers *models.TaskServer) *Job {
job := &Job{
id: id,
name: name,
id: id,
name: name,
serverId: serverId,
}
job.jobKey = jobKey(id, serverId)
job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
key, err := ioutil.ReadFile(servers.PrivateKeySrc)
@@ -166,7 +201,7 @@ func RemoteCommandJob(id int, name string, command string, servers *models.TaskS
return job
}
func RemoteCommandJobByPassword(id int, name string, command string, servers *models.TaskServer) *Job {
func RemoteCommandJobByPassword(id int, serverId int, name string, command string, servers *models.TaskServer) *Job {
var (
auth []ssh.AuthMethod
addr string
@@ -177,9 +212,11 @@ func RemoteCommandJobByPassword(id int, name string, command string, servers *mo
)
job := &Job{
id: id,
name: name,
id: id,
name: name,
serverId: serverId,
}
job.jobKey = jobKey(id, serverId)
job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
// get auth method
auth = make([]ssh.AuthMethod, 0)
@@ -223,12 +260,15 @@ func RemoteCommandJobByPassword(id int, name string, command string, servers *mo
return job
}
func RemoteCommandJobByTelnetPassword(id int, name string, command string, servers *models.TaskServer) *Job {
func RemoteCommandJobByTelnetPassword(id int, serverId int, name string, command string, servers *models.TaskServer) *Job {
job := &Job{
id: id,
name: name,
id: id,
name: name,
serverId: serverId,
}
job.jobKey = jobKey(id, serverId)
job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
addr := fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
@@ -309,7 +349,7 @@ func (j *Job) GetLogId() int64 {
func (j *Job) Run() {
if !j.Concurrent && j.status > 0 {
beego.Warn(fmt.Sprintf("任务[%d]上一次执行尚未结束,本次被忽略。", j.id))
beego.Warn(fmt.Sprintf("任务[%d]上一次执行尚未结束,本次被忽略。", j.jobKey))
return
}
@@ -326,7 +366,7 @@ func (j *Job) Run() {
}()
}
beego.Debug(fmt.Sprintf("开始执行任务: %d", j.id))
beego.Debug(fmt.Sprintf("开始执行任务: %d", j.jobKey))
j.status++
defer func() {
@@ -344,6 +384,8 @@ func (j *Job) Run() {
// 插入日志
log := new(models.TaskLog)
log.TaskId = j.id
log.ServerId = j.serverId
log.ServerName = j.serverName
log.Output = cmdOut
log.Error = cmdErr
log.ProcessTime = int(ut)
@@ -536,3 +578,8 @@ func gbkAsUtf8(str string) string {
_, resBytes, _ := desDecoder.Translate([]byte(resStr), true)
return string(resBytes)
}
//任务识别码
func jobKey(taskId, serverId int) int {
return taskId*10000 + serverId
}