diff --git a/actuator/main.go b/actuator/main.go
new file mode 100644
index 0000000..9862687
--- /dev/null
+++ b/actuator/main.go
@@ -0,0 +1,8 @@
+package main
+
+//任务执行器
+//
+
+func main() {
+
+}
diff --git a/agent/common/protocol.go b/agent/common/protocol.go
new file mode 100644
index 0000000..0068958
--- /dev/null
+++ b/agent/common/protocol.go
@@ -0,0 +1,25 @@
+/************************************************************
+** @Description: common
+** @Author: george hao
+** @Date: 2018-11-29 11:14
+** @Last Modified by: george hao
+** @Last Modified time: 2018-11-29 11:14
+*************************************************************/
+package common
+
+//配置开始 注释查看配置文件
+type Conf struct {
+ Version string
+ AppMode string
+ LogLevel string
+ ServerName string
+ ServerId int
+ TcpPort int
+ TcpIp string
+ GroupId string
+ RegisterUrl string
+ UpdateStatusUrl string
+ IpType int
+}
+
+var ExitChan = make(chan int, 1)
diff --git a/agent/config/conf.ini b/agent/config/conf.ini
new file mode 100644
index 0000000..7d48f0a
--- /dev/null
+++ b/agent/config/conf.ini
@@ -0,0 +1,24 @@
+# GOLBAL
+# dev prod
+AppMode = dev
+Version = 1.0.0
+# ALL,DEBUG,INFO,NOTICE,WARN,ERROR,FATAL
+LogLevel = ALL
+# 执行器配置
+# auto-自动起名,或者自己起名
+ServerName = agent-10.32.40.165-1564
+# 启动后回写
+ServerId = 7
+# 端口,必须配置!!
+TcpPort = 1564
+# auto-自动获取
+TcpIp = 10.32.40.165
+# Ip地址是外网还是内网,1-外网,0-内网,若填写TcpIp则本项配置无意义
+IpType = 0
+# 添加的执行器属于分组Id,默认为1
+GroupId = 1
+
+# 以下配置必填,地址格式:http://yourdomain/server/apisave
+RegisterUrl = http://localhost:8081/server/apisave
+UpdateStatusUrl = http://localhost:8081/server/apistatus
+
diff --git a/agent/main.go b/agent/main.go
new file mode 100644
index 0000000..330b0bc
--- /dev/null
+++ b/agent/main.go
@@ -0,0 +1,59 @@
+package main
+
+import (
+ "flag"
+ "github.com/george518/PPGo_Job/agent/server"
+ "log"
+ "runtime"
+)
+
+//文件配置路径
+var configFilePath string
+
+func initArgs() {
+ //server -c ./configpath
+ //defaultPath := "/Users/haodaquan/golang/src/github.com/george518/PPGo_Job/agent/config/conf.ini"
+ defaultPath := "./config/conf.ini"
+ flag.StringVar(&configFilePath, "c", defaultPath, "config file path request")
+ flag.Parse()
+}
+
+func initEnv() {
+ runtime.GOMAXPROCS(runtime.NumCPU())
+}
+
+func main() {
+ var err error
+
+ //初始化线程
+ initEnv()
+
+ //配置文件路径
+ initArgs()
+
+ //加载配置
+ if err = server.InitConfig(configFilePath); err != nil {
+ goto ERR
+ }
+
+ server.NLog("INFO", "配置文件读取完毕...")
+
+ //应用关闭监控
+ server.ListenSignal()
+
+ //自动注册
+ if err = server.Register(); err != nil {
+ goto ERR
+ }
+
+ server.NLog("INFO", "自动注册完成...")
+
+ server.NLog("INFO", "agent is running...")
+ //监听
+ if err = server.RpcRun(); err != nil {
+ goto ERR
+ }
+
+ERR:
+ log.Fatal(err.Error())
+}
diff --git a/agent/run.sh b/agent/run.sh
new file mode 100644
index 0000000..ec87806
--- /dev/null
+++ b/agent/run.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# @Author: haodaquan
+# @Date: 2017-06-29 17:44:45
+# @Last Modified by: haodaquan
+# @Last Modified time: 2019-07-03 17:44:45
+
+
+case $1 in
+ start)
+ nohup ./ppgo_agent 2>&1 >> info_agent.log 2>&1 /dev/null &
+ echo "服务已启动..."
+ sleep 1
+ ;;
+ stop)
+ killall ppgo_agent
+ echo "服务已停止..."
+ sleep 1
+ ;;
+ restart)
+ killall ppgo_agent
+ sleep 1
+ nohup ./ppgo_agent 2>&1 >> info_agent.log 2>&1 /dev/null &
+ echo "服务已重启..."
+ sleep 1
+ ;;
+ *)
+ echo "$0 {start|stop|restart}"
+ exit 4
+ ;;
+esac
\ No newline at end of file
diff --git a/agent/server/config.go b/agent/server/config.go
new file mode 100644
index 0000000..8d6b7c8
--- /dev/null
+++ b/agent/server/config.go
@@ -0,0 +1,39 @@
+/************************************************************
+** @Description: server
+** @Author: george hao
+** @Date: 2018-11-29 11:13
+** @Last Modified by: george hao
+** @Last Modified time: 2018-11-29 11:13
+*************************************************************/
+package server
+
+import (
+ "github.com/george518/PPGo_Job/agent/common"
+ "github.com/go-ini/ini"
+)
+
+var C = new(common.Conf)
+var ConfPath string
+
+func InitConfig(path string) error {
+
+ Cfg, err := ini.Load(path)
+ if err != nil {
+ return err
+ }
+
+ ConfPath = path
+ err = Cfg.MapTo(C)
+ return err
+}
+
+func SaveConfig(key string, value string) error {
+ Cfg, err := ini.Load(ConfPath)
+ if err != nil {
+ return err
+ }
+ Cfg.Section("").Key(key).SetValue(value)
+ Cfg.SaveTo(ConfPath)
+ InitConfig(ConfPath)
+ return nil
+}
diff --git a/agent/server/job.go b/agent/server/job.go
new file mode 100644
index 0000000..4bd9914
--- /dev/null
+++ b/agent/server/job.go
@@ -0,0 +1,139 @@
+/************************************************************
+** @Description: job
+** @Author: george hao
+** @Date: 2019-06-24 15:14
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-24 15:14
+*************************************************************/
+package server
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/astaxie/beego/logs"
+ . "github.com/george518/PPGo_Job/jobs"
+ "github.com/george518/PPGo_Job/libs"
+ "github.com/george518/PPGo_Job/models"
+ "os/exec"
+ "runtime"
+ "runtime/debug"
+ "sync"
+ "time"
+)
+
+//执行句柄map
+var CmdMap sync.Map
+
+func SetCmdMap(key string, cmd *exec.Cmd) {
+ if _, ok := CmdMap.Load(key); ok {
+ Counter.Store(key, cmd)
+ }
+}
+
+func GetCmdMap(key string) *exec.Cmd {
+ if v, ok := CmdMap.Load(key); ok {
+ return v.(*exec.Cmd)
+ }
+
+ return nil
+}
+
+func RestJobFromTask(task *models.Task, serverId int) (*Job, error) {
+
+ if task.Id < 1 {
+ return nil, fmt.Errorf("ToJob: 缺少id")
+ }
+
+ if task.ServerIds == "" {
+ return nil, fmt.Errorf("任务执行失败,找不到执行的服务器")
+ }
+
+ job := ResetCommandJob(task.Id, serverId, task.TaskName, task.Command)
+ job.Task = task
+ job.Concurrent = task.Concurrent == 1
+ job.ServerId = serverId
+ job.ServerName = "执行器"
+
+ return job, nil
+}
+
+func ResetCommandJob(id int, serverId int, name string, command string) *Job {
+ job := &Job{
+ Id: id,
+ Name: name,
+ }
+
+ job.JobKey = libs.JobKey(id, serverId)
+ job.RunFunc = func(timeout time.Duration) (jobResult *JobResult) {
+ bufOut := new(bytes.Buffer)
+ bufErr := new(bytes.Buffer)
+ //cmd := exec.Command("/bin/bash", "-c", command)
+ var cmd *exec.Cmd
+ if runtime.GOOS == "windows" {
+ cmd = exec.Command("CMD", "/C", command)
+ } else {
+ cmd = exec.Command("sh", "-c", command)
+ }
+ cmd.Stdout = bufOut
+ cmd.Stderr = bufErr
+ cmd.Start()
+ err, isTimeout := runCmdWithTimeout(cmd, timeout)
+
+ jobResult = new(JobResult)
+ jobResult.ErrMsg = libs.GbkAsUtf8(bufErr.String())
+ jobResult.OutMsg = libs.GbkAsUtf8(bufOut.String())
+ jobResult.IsOk = true
+ if err != nil {
+ jobResult.IsOk = false
+ }
+
+ jobResult.IsTimeout = isTimeout
+
+ return
+ }
+ return job
+}
+func runCmdWithTimeout(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
+ done := make(chan error)
+ go func() {
+ done <- cmd.Wait()
+ }()
+
+ var err error
+ select {
+ case <-time.After(timeout):
+ logs.Warn(fmt.Sprintf("任务执行时间超过%d秒,进程将被强制杀掉: %d", int(timeout/time.Second), cmd.Process.Pid))
+ go func() {
+ <-done // 读出上面的goroutine数据,避免阻塞导致无法退出
+ }()
+ if err = cmd.Process.Kill(); err != nil {
+ logs.Error(fmt.Sprintf("进程无法杀掉: %d, 错误信息: %s", cmd.Process.Pid, err))
+ }
+ return err, true
+ case err = <-done:
+ return err, false
+ }
+}
+
+func Run(j *Job) *JobResult {
+
+ defer func() {
+ if err := recover(); err != nil {
+ logs.Error(err, "\n", string(debug.Stack()))
+ }
+ }()
+
+ logs.Debug(fmt.Sprintf("开始执行任务: %d", j.JobKey))
+
+ j.Status++
+ defer func() {
+ j.Status--
+ }()
+
+ timeout := time.Duration(time.Hour * 24)
+ if j.Task.Timeout > 0 {
+ timeout = time.Second * time.Duration(j.Task.Timeout)
+ }
+
+ return j.RunFunc(timeout)
+}
diff --git a/agent/server/logs.go b/agent/server/logs.go
new file mode 100644
index 0000000..5ed3142
--- /dev/null
+++ b/agent/server/logs.go
@@ -0,0 +1,41 @@
+/************************************************************
+** @Description: log
+** @Author: haodaquan
+** @Date: 2018-08-22 23:00
+** @Last Modified by: haodaquan
+** @Last Modified time: 2018-08-22 23:00
+*************************************************************/
+package server
+
+import (
+ "fmt"
+ "log"
+ "net/http"
+ "strings"
+ "time"
+)
+
+var Env string
+
+func init() {
+ log.SetFlags(log.LstdFlags | log.Lshortfile)
+}
+
+//http相关
+func WriteLog(r *http.Request, t time.Time, match string, pattern string) {
+
+ if C.AppMode != "prod" {
+ d := time.Now().Sub(t)
+ l := fmt.Sprintf("[ACCESS] | % -10s | % -40s | % -16s | % -10s | % -40s |",
+ r.Method, r.URL.Path, d.String(), match, pattern)
+ log.Println(l)
+ }
+}
+
+//系统运行相关
+func NLog(level string, value ...interface{}) {
+ if strings.Contains(C.LogLevel, level) || C.LogLevel == "ALL" {
+ log.Println("["+level+"]", value)
+ return
+ }
+}
diff --git a/agent/server/notify.go b/agent/server/notify.go
new file mode 100644
index 0000000..dfa0c70
--- /dev/null
+++ b/agent/server/notify.go
@@ -0,0 +1,104 @@
+/************************************************************
+** @Description: notify
+** @Author: george hao
+** @Date: 2019-06-26 15:17
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-26 15:17
+*************************************************************/
+package server
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/george518/PPGo_Job/libs"
+ "strconv"
+)
+
+//启动时注册
+func Register() error {
+ //获取本机ip以及端口 todo ip合法性判断
+ if C.TcpIp == "auto" {
+ tcpIp := libs.GetHostIp(C.IpType)
+ if tcpIp == "" {
+ return fmt.Errorf("无法获取本机IP,请手工在配置文件里设置")
+ }
+ SaveConfig("TcpIp", tcpIp)
+ }
+ param := make(map[string]string, 0)
+ if C.ServerName == "auto" {
+ serverName := "agent-" + C.TcpIp + "-" + strconv.Itoa(C.TcpPort)
+ SaveConfig("ServerName", serverName)
+ }
+
+ param["server_ip"] = C.TcpIp
+ param["port"] = strconv.Itoa(C.TcpPort)
+ param["server_name"] = C.ServerName
+ param["detail"] = "自动注册执行器"
+ param["connection_type"] = "2"
+ param["group_id"] = C.GroupId
+
+ if C.RegisterUrl == "" {
+ return fmt.Errorf("自动注册地址配置错误")
+ }
+ body, err := libs.HttpGet(C.RegisterUrl, param)
+ if err != nil {
+ return err
+ }
+
+ m := make(map[string]interface{})
+ err = json.Unmarshal([]byte(body), &m)
+ if err != nil {
+ return err
+ }
+
+ if _, ok := m["status"]; ok {
+ if m["status"] == float64(0) {
+ //回写serverId
+ serverId := int(m["message"].(float64))
+ SaveConfig("ServerId", strconv.Itoa(serverId))
+ return nil
+ } else {
+ return fmt.Errorf("自动注册失败:%v", m["message"])
+ }
+ }
+
+ return fmt.Errorf("自动注册失败")
+}
+
+//程序异常退出的通知
+func Close() error {
+
+ param := make(map[string]string, 0)
+ param["server_ip"] = C.TcpIp
+ param["port"] = strconv.Itoa(C.TcpPort)
+ param["status"] = "1"
+
+ if C.UpdateStatusUrl == "" {
+ return fmt.Errorf("执行器退出通知异常,请到系统中修改状态")
+ }
+ body, err := libs.HttpGet(C.UpdateStatusUrl, param)
+ if err != nil {
+ return err
+ }
+
+ m := make(map[string]interface{})
+ err = json.Unmarshal([]byte(body), &m)
+ if err != nil {
+ return err
+ }
+
+ if _, ok := m["status"]; ok {
+ if m["status"] == float64(0) {
+ return nil
+ } else {
+ return fmt.Errorf("执行器退出通知异常:%v", m["message"])
+ }
+ }
+
+ return fmt.Errorf("执行器退出通知异常:未知原因")
+}
+
+//心跳机制
+func Heartbeat() error {
+ return nil
+}
diff --git a/agent/server/service.go b/agent/server/service.go
new file mode 100644
index 0000000..62299ba
--- /dev/null
+++ b/agent/server/service.go
@@ -0,0 +1,38 @@
+/************************************************************
+** @Description: service
+** @Author: george hao
+** @Date: 2019-06-26 15:27
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-26 15:27
+*************************************************************/
+package server
+
+import (
+ "net"
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "strconv"
+)
+
+//初始化路由
+func init() {
+ rpc.RegisterName("RpcTask", new(RpcTask))
+ rpc.RegisterName("HeartBeat", new(RpcTask))
+}
+
+func RpcRun() error {
+
+ listener, err := net.Listen("tcp", ":"+strconv.Itoa(C.TcpPort))
+ if err != nil {
+ return err
+ }
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ return err
+ }
+ //注意ServerCodec是个方法,不是接口
+ go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
+ //go rpc.ServeConn(conn)
+ }
+}
diff --git a/agent/server/signal.go b/agent/server/signal.go
new file mode 100644
index 0000000..9b2574a
--- /dev/null
+++ b/agent/server/signal.go
@@ -0,0 +1,41 @@
+/************************************************************
+** @Description: server
+** @Author: george hao
+** @Date: 2018-11-29 11:24
+** @Last Modified by: george hao
+** @Last Modified time: 2018-11-29 11:24
+*************************************************************/
+package server
+
+import (
+ "os"
+ "os/signal"
+ "syscall"
+)
+
+//监听关闭状态
+func ListenSignal() {
+ //创建监听退出chan
+ c := make(chan os.Signal)
+ //监听指定信号 ctrl+c kill
+ signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
+
+ go func() {
+ for s := range c {
+ switch s {
+ case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2:
+ NLog("NOTICE", " Ready to quit close type ", s)
+ //TODO 异常警报,汇报状态
+ if err := Close(); err != nil {
+ NLog("ERROR", err.Error())
+ } else {
+ NLog("NOTICE", " 执行器安全关闭...")
+ }
+ os.Exit(0)
+ default:
+ NLog("NOTICE", " close type ", s)
+ os.Exit(1)
+ }
+ }
+ }()
+}
diff --git a/agent/server/task.go b/agent/server/task.go
new file mode 100644
index 0000000..a1030aa
--- /dev/null
+++ b/agent/server/task.go
@@ -0,0 +1,47 @@
+/************************************************************
+** @Description: task
+** @Author: george hao
+** @Date: 2019-06-24 13:22
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-24 13:22
+*************************************************************/
+package server
+
+import (
+ "github.com/astaxie/beego/logs"
+ "github.com/george518/PPGo_Job/jobs"
+ "github.com/george518/PPGo_Job/models"
+)
+
+type RpcTask struct {
+}
+
+type RpcResult struct {
+ Status int
+ Message string
+}
+
+//Execute once
+func (r *RpcTask) RunTask(task *models.Task, Result *jobs.JobResult) error {
+ server_id := C.ServerId
+ job, err := RestJobFromTask(task, server_id)
+ if err != nil {
+ return nil
+ }
+ *Result = *(Run(job))
+ return nil
+}
+
+//Kill execution
+func (r *RpcTask) KillCommand(task models.Task, reply *RpcResult) error {
+ reply.Status = 200
+ reply.Message = "Ok kill " + task.TaskName
+ return nil
+}
+
+func (r *RpcTask) HeartBeat(ping string, reply *RpcResult) error {
+ reply.Status = 200
+ reply.Message = ping + " pong"
+ logs.Info(ping)
+ return nil
+}
diff --git a/agent/test/conf/conf.go b/agent/test/conf/conf.go
new file mode 100644
index 0000000..cb689a1
--- /dev/null
+++ b/agent/test/conf/conf.go
@@ -0,0 +1,28 @@
+/************************************************************
+** @Description: conf
+** @Author: george hao
+** @Date: 2019-06-27 09:49
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-27 09:49
+*************************************************************/
+package main
+
+import (
+ "github.com/astaxie/beego/logs"
+ "github.com/george518/PPGo_Job/agent/server"
+)
+
+func main() {
+ //获取配置,修改配置
+ loadconfig()
+}
+
+func loadconfig() {
+
+ path := "/Users/haodaquan/golang/src/github.com/george518/PPGo_Job/actuator/config/conf.ini"
+ server.InitConfig(path)
+ logs.Info(server.C.TcpIp, server.ConfPath)
+ server.SaveConfig("TcpIp", "10.32.33.22")
+ logs.Info(server.C.TcpIp, server.ConfPath)
+
+}
diff --git a/agent/test/ip/ip.go b/agent/test/ip/ip.go
new file mode 100644
index 0000000..b846eb1
--- /dev/null
+++ b/agent/test/ip/ip.go
@@ -0,0 +1,18 @@
+/************************************************************
+** @Description: ip
+** @Author: george hao
+** @Date: 2019-06-27 09:22
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-27 09:22
+*************************************************************/
+package main
+
+import (
+ "github.com/astaxie/beego/logs"
+ "github.com/george518/PPGo_Job/libs"
+)
+
+func main() {
+ logs.Info(libs.PublicIp())
+
+}
diff --git a/common/protocol.go b/common/protocol.go
new file mode 100644
index 0000000..5a31f8d
--- /dev/null
+++ b/common/protocol.go
@@ -0,0 +1,13 @@
+/************************************************************
+** @Description: protol
+** @Author: george hao
+** @Date: 2019-06-27 15:33
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-27 15:33
+*************************************************************/
+package common
+
+type RpcResult struct {
+ Status int
+ Message string
+}
diff --git a/conf/app.conf b/conf/app.conf
index 3848531..fe6b38c 100644
--- a/conf/app.conf
+++ b/conf/app.conf
@@ -1,9 +1,9 @@
AppName = PPGo_Job2
-HTTPPort = 8080
+HTTPPort = 8081
RunMode = dev
SessionOn = true
-version= V2.6
+version= V2.7
# 允许同时运行的任务数
jobs.pool = 1000
@@ -19,7 +19,7 @@ db.host = 127.0.0.1
db.user = root
db.password = "123456"
db.port = 3306
-db.name = ppgo_job2
+db.name = cron
db.prefix = pp_
db.timezone = Asia/Shanghai
diff --git a/controllers/common.go b/controllers/common.go
index 389c239..b068646 100644
--- a/controllers/common.go
+++ b/controllers/common.go
@@ -9,7 +9,6 @@ package controllers
import (
"github.com/astaxie/beego"
- "github.com/axgle/mahonia"
"github.com/george518/PPGo_Job/libs"
"github.com/george518/PPGo_Job/models"
"strconv"
@@ -100,7 +99,10 @@ func (self *BaseController) Auth() {
self.actionName != "loginin" &&
self.actionName != "apistart" &&
self.actionName != "apitask" &&
- self.actionName != "apipause") {
+ self.actionName != "apipause" &&
+ self.actionName != "apisave" &&
+ self.actionName != "apistatus" &&
+ self.actionName != "apiget") {
self.redirect(beego.URLFor("LoginController.Login"))
}
}
@@ -336,7 +338,7 @@ type serverList struct {
func serverLists(authStr string, adminId int) (sls []serverList) {
serverGroup := serverGroupLists(authStr, adminId)
Filters := make([]interface{}, 0)
- Filters = append(Filters, "status", 0)
+ Filters = append(Filters, "status__in", []int{0, 1})
Result, _ := models.TaskServerGetList(1, 1000, Filters...)
for k, v := range serverGroup {
@@ -354,16 +356,3 @@ func serverLists(authStr string, adminId int) (sls []serverList) {
}
return sls
}
-
-func gbkAsUtf8(str string) string {
- srcDecoder := mahonia.NewDecoder("gbk")
- desDecoder := mahonia.NewDecoder("utf-8")
- resStr := srcDecoder.ConvertString(str)
- _, resBytes, _ := desDecoder.Translate([]byte(resStr), true)
- return string(resBytes)
-}
-
-//任务识别码
-func jobKey(taskId, serverId int) int {
- return taskId*10000 + serverId
-}
diff --git a/controllers/login.go b/controllers/login.go
index 2e22cb8..c438b42 100644
--- a/controllers/login.go
+++ b/controllers/login.go
@@ -8,7 +8,6 @@
package controllers
import (
- "fmt"
"strconv"
"time"
@@ -43,7 +42,6 @@ func (self *LoginController) LoginIn() {
password := strings.TrimSpace(self.GetString("password"))
if username != "" && password != "" {
user, err := models.AdminGetByName(username)
- fmt.Println(user)
if err != nil || user.Password != libs.Md5([]byte(password+user.Salt)) {
self.ajaxMsg("帐号或密码错误", MSG_ERR)
} else if user.Status == -1 {
diff --git a/controllers/server.go b/controllers/server.go
index 71ef930..c4bb4ce 100644
--- a/controllers/server.go
+++ b/controllers/server.go
@@ -8,16 +8,12 @@
package controllers
import (
- "fmt"
+ "github.com/astaxie/beego/logs"
+ "github.com/george518/PPGo_Job/libs"
"github.com/george518/PPGo_Job/models"
- "golang.org/x/crypto/ssh"
- "io/ioutil"
- "net"
"strconv"
"strings"
"time"
- "github.com/linxiaozhi/go-telnet"
- "github.com/pkg/errors"
)
type ServerController struct {
@@ -25,13 +21,13 @@ type ServerController struct {
}
func (self *ServerController) List() {
- self.Data["pageTitle"] = "资源管理"
+ self.Data["pageTitle"] = "执行资源管理"
self.Data["serverGroup"] = serverGroupLists(self.serverGroups, self.userId)
self.display()
}
func (self *ServerController) Add() {
- self.Data["pageTitle"] = "新增服务器资源"
+ self.Data["pageTitle"] = "新增执行资源"
self.Data["serverGroup"] = serverGroupLists(self.serverGroups, self.userId)
self.display()
}
@@ -92,7 +88,7 @@ func (self *ServerController) GetServerByGroupId() {
}
func (self *ServerController) Edit() {
- self.Data["pageTitle"] = "编辑服务器资源"
+ self.Data["pageTitle"] = "编辑执行资源"
id, _ := self.GetInt("id", 0)
server, _ := models.TaskServerGetById(id)
@@ -136,12 +132,12 @@ func (self *ServerController) AjaxTestServer() {
if server.ConnectionType == 0 {
if server.Type == 0 {
//密码登录
- err = RemoteCommandByPassword(server)
+ err = libs.RemoteCommandByPassword(server)
}
if server.Type == 1 {
//密钥登录
- err = RemoteCommandByKey(server)
+ err = libs.RemoteCommandByKey(server)
}
if err != nil {
@@ -151,7 +147,7 @@ func (self *ServerController) AjaxTestServer() {
} else if server.ConnectionType == 1 {
if server.Type == 0 {
//密码登录
- err = RemoteCommandByTelnetPassword(server)
+ err = libs.RemoteCommandByTelnetPassword(server)
} else {
self.ajaxMsg("Telnet方式暂不支持密钥登陆!", MSG_ERR)
}
@@ -160,114 +156,19 @@ func (self *ServerController) AjaxTestServer() {
self.ajaxMsg(err.Error(), MSG_ERR)
}
self.ajaxMsg("Success", MSG_OK)
+ } else if server.ConnectionType == 2 {
+
+ if err := libs.RemoteAgent(server); err != nil {
+ self.ajaxMsg(err.Error(), MSG_ERR)
+ } else {
+ self.ajaxMsg("Success", MSG_OK)
+
+ }
}
self.ajaxMsg("未知连接方式", MSG_ERR)
}
-func RemoteCommandByTelnetPassword(servers *models.TaskServer) error {
-
- addr := fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
- conn, err := gote.DialTimeout("tcp", addr, time.Second*10)
- if err != nil {
- return err
- }
-
- defer conn.Close()
-
- buf := make([]byte, 4096)
- _, err = conn.Read(buf)
- if err != nil {
- return err
- }
-
- _, err = conn.Write([]byte(servers.ServerAccount + "\r\n"))
- if err != nil {
- return err
- }
-
- _, err = conn.Read(buf)
- if err != nil {
- return err
- }
-
- _, err = conn.Write([]byte(servers.Password + "\r\n"))
- if err != nil {
- return err
- }
-
- _, err = conn.Read(buf)
- if err != nil {
- return err
- }
-
- str := gbkAsUtf8(string(buf[:]))
-
- if strings.Contains(str, ">") {
- return nil
- }
-
- return errors.Errorf("连接失败!")
-}
-
-func RemoteCommandByPassword(servers *models.TaskServer) error {
- var (
- auth []ssh.AuthMethod
- addr string
- clientConfig *ssh.ClientConfig
- )
-
- auth = make([]ssh.AuthMethod, 0)
- auth = append(auth, ssh.Password(servers.Password))
-
- clientConfig = &ssh.ClientConfig{
- User: servers.ServerAccount,
- Auth: auth,
- HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
- return nil
- },
- Timeout: 5 * time.Second,
- }
-
- addr = fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
- client, err := ssh.Dial("tcp", addr, clientConfig)
- if err == nil {
- defer client.Close()
- }
- return err
-}
-
-func RemoteCommandByKey(servers *models.TaskServer) error {
- key, err := ioutil.ReadFile(servers.PrivateKeySrc)
- if err != nil {
- return err
- }
-
- signer, err := ssh.ParsePrivateKey(key)
- if err != nil {
- return err
- }
- addr := fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
- config := &ssh.ClientConfig{
- User: servers.ServerAccount,
- Auth: []ssh.AuthMethod{
- // Use the PublicKeys method for remote authentication.
- ssh.PublicKeys(signer),
- },
- //HostKeyCallback: ssh.FixedHostKey(hostKey),
- HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
- return nil
- },
- Timeout: 5 * time.Second,
- }
-
- client, err := ssh.Dial("tcp", addr, config)
- if err == nil {
- client.Close()
- }
- return err
-}
-
func (self *ServerController) Copy() {
self.Data["pageTitle"] = "复制服务器资源"
@@ -347,9 +248,14 @@ func (self *ServerController) AjaxSave() {
func (self *ServerController) AjaxDel() {
id, _ := self.GetInt("id")
+
+ if id == 1 {
+ self.ajaxMsg("默认分组id=1,禁止删除", MSG_ERR)
+ }
+
server, _ := models.TaskServerGetById(id)
server.UpdateTime = time.Now().Unix()
- server.Status = 1
+ server.Status = 2
server.Id = id
//TODO 查询服务器是否用于定时任务
@@ -374,18 +280,19 @@ func (self *ServerController) Table() {
serverName := strings.TrimSpace(self.GetString("serverName"))
StatusText := []string{
- "正常",
- "禁用",
+ "",
+ "",
}
+ //
+ //loginType := [2]string{
+ // "密码",
+ // "密钥",
+ //}
- loginType := [2]string{
- "密码",
- "密钥",
- }
-
- connectionType := [2]string{
+ connectionType := [3]string{
"SSH",
"Telnet",
+ "Agent",
}
serverGroup := serverGroupLists(self.serverGroups, self.userId)
@@ -393,7 +300,8 @@ func (self *ServerController) Table() {
self.pageSize = limit
//查询条件
filters := make([]interface{}, 0)
- filters = append(filters, "status", 0)
+ ids := []int{0, 1}
+ filters = append(filters, "status__in", ids)
groupsIds := make([]int, 0)
if self.userId != 1 {
@@ -425,17 +333,128 @@ func (self *ServerController) Table() {
row := make(map[string]interface{})
row["id"] = v.Id
row["connection_type"] = connectionType[v.ConnectionType]
- row["server_name"] = v.ServerName
+ row["server_name"] = StatusText[v.Status] + " " + v.ServerName
row["detail"] = v.Detail
if serverGroup[v.GroupId] == "" {
v.GroupId = 0
}
+ row["ip_port"] = v.ServerIp + ":" + strconv.Itoa(v.Port)
row["group_name"] = serverGroup[v.GroupId]
- row["type"] = loginType[v.Type]
+ //row["type"] = loginType[v.Type]
row["status"] = v.Status
- row["status_text"] = StatusText[v.Status]
list[k] = row
}
self.ajaxList("成功", MSG_OK, count, list)
}
+
+//以下函数为执行器接口
+//注册
+func (self *ServerController) ApiSave() {
+ //唯一确定值 ip+port
+ serverIp := strings.TrimSpace(self.GetString("server_ip"))
+ port, _ := self.GetInt("port")
+
+ if serverIp == "" || port == 0 {
+ self.ajaxMsg("执行器和端口号必填", MSG_ERR)
+ }
+
+ defaultActName := "agent-" + serverIp + "-" + strconv.Itoa(port)
+
+ id := models.TaskServerForActuator(serverIp, port)
+ if id == 0 {
+ //新增
+ server := new(models.TaskServer)
+ server.ConnectionType, _ = self.GetInt("connection_type", 3)
+ server.ServerName = strings.TrimSpace(self.GetString("server_name", defaultActName))
+ server.ServerAccount = strings.TrimSpace(self.GetString("server_account", "agent"))
+ server.ServerOuterIp = strings.TrimSpace(self.GetString("server_outer_ip", ""))
+ server.ServerIp = strings.TrimSpace(self.GetString("server_ip"))
+ server.PrivateKeySrc = strings.TrimSpace(self.GetString("private_key_src", ""))
+ server.PublicKeySrc = strings.TrimSpace(self.GetString("public_key_src", ""))
+ server.Password = strings.TrimSpace(self.GetString("password", "agent"))
+
+ server.Detail = strings.TrimSpace(self.GetString("detail", ""))
+ server.Type, _ = self.GetInt("type", 0)
+ server.Port, _ = self.GetInt("port")
+ server.GroupId, _ = self.GetInt("group_id", 0)
+ server.Status = 0
+
+ server.CreateTime = time.Now().Unix()
+ server.UpdateTime = time.Now().Unix()
+ server.Status = 0
+ serverId, err := models.TaskServerAdd(server)
+ if err != nil {
+ self.ajaxMsg(err.Error(), MSG_ERR)
+ }
+ self.ajaxMsg(serverId, MSG_OK)
+ } else {
+ //修改状态
+ server, _ := models.TaskServerGetById(id)
+ server.UpdateTime = time.Now().Unix()
+ server.Status, _ = self.GetInt("status", 0)
+ if err := server.Update(); err != nil {
+ self.ajaxMsg(err.Error(), MSG_ERR)
+ }
+ self.ajaxMsg(id, MSG_OK)
+ }
+
+}
+
+//检测0-正常,1-异常,2-删除
+func (self *ServerController) ApiStatus() {
+ //唯一确定值 ip+port
+ serverId := strings.TrimSpace(self.GetString("server_ip"))
+ port, _ := self.GetInt("port")
+ status, _ := self.GetInt("status", 0)
+
+ if serverId == "" || port == 0 {
+ self.ajaxMsg("执行器和端口号必填", MSG_ERR)
+ }
+
+ id := models.TaskServerForActuator(serverId, port)
+ if id == 0 {
+ self.ajaxMsg("执行器不存在", MSG_ERR)
+ }
+
+ if status != 0 && status != 1 {
+ status = 0
+ }
+
+ server, _ := models.TaskServerGetById(id)
+ server.UpdateTime = time.Now().Unix()
+ server.Status = status
+ server.Id = id
+
+ logs.Info(server)
+
+ //TODO 查询执行器是否正在使用中
+ if err := server.Update(); err != nil {
+ self.ajaxMsg(err.Error(), MSG_ERR)
+ }
+ self.ajaxMsg(id, MSG_OK)
+}
+
+//获取 不检测执行器状态
+func (self *ServerController) ApiGet() {
+ //唯一确定值 ip+port
+ serverId := strings.TrimSpace(self.GetString("server_ip"))
+ port, _ := self.GetInt("port")
+
+ if serverId == "" || port == 0 {
+ self.ajaxMsg("执行器和端口号必填", MSG_ERR)
+ }
+
+ id := models.TaskServerForActuator(serverId, port)
+ if id == 0 {
+ self.ajaxMsg("执行器不存在", MSG_ERR)
+ }
+
+ server, err := models.TaskServerGetById(id)
+
+ if err != nil {
+ self.ajaxMsg(err.Error(), MSG_ERR)
+ } else {
+ self.ajaxMsg(server, MSG_OK)
+ }
+}
diff --git a/controllers/server_group.go b/controllers/server_group.go
index ad42408..20ab26c 100644
--- a/controllers/server_group.go
+++ b/controllers/server_group.go
@@ -11,8 +11,6 @@ import (
"strings"
"time"
- "fmt"
-
"strconv"
"github.com/astaxie/beego"
@@ -55,7 +53,6 @@ func (self *ServerGroupController) AjaxSave() {
servergroup_id, _ := self.GetInt("id")
- fmt.Println(servergroup_id)
if servergroup_id == 0 {
//新增
servergroup.CreateTime = time.Now().Unix()
diff --git a/controllers/task.go b/controllers/task.go
index c0ea96e..3b488b2 100644
--- a/controllers/task.go
+++ b/controllers/task.go
@@ -9,6 +9,7 @@ package controllers
import (
"fmt"
+ "github.com/george518/PPGo_Job/libs"
"strconv"
"strings"
"time"
@@ -115,11 +116,18 @@ func (self *TaskController) Copy() {
if err != nil {
self.ajaxMsg(err.Error(), MSG_ERR)
}
+
+ if task.Status == 1 {
+ self.ajaxMsg("运行状态无法编辑任务,请先暂停任务", MSG_ERR)
+ }
self.Data["task"] = task
+ self.Data["adminInfo"] = AllAdminInfo("")
+
// 分组列表
self.Data["taskGroup"] = taskGroupLists(self.taskGroups, self.userId)
self.Data["serverGroup"] = serverLists(self.serverGroups, self.userId)
+ self.Data["isAdmin"] = self.userId
var notifyUserIds []int
if task.NotifyUserIds != "0" {
notifyUserIdsStr := strings.Split(task.NotifyUserIds, ",")
@@ -128,7 +136,33 @@ func (self *TaskController) Copy() {
notifyUserIds = append(notifyUserIds, i)
}
}
+
self.Data["notify_user_ids"] = notifyUserIds
+
+ server_ids := strings.Split(task.ServerIds, ",")
+ var server_ids_arr []int
+ for _, sv := range server_ids {
+ i, _ := strconv.Atoi(sv)
+ server_ids_arr = append(server_ids_arr, i)
+ }
+
+ self.Data["service_ids"] = server_ids_arr
+
+ notifyTplList, _, err := models.NotifyTplGetByTplTypeList(task.NotifyType)
+ tplList := make([]map[string]interface{}, len(notifyTplList))
+
+ if err == nil {
+ for k, v := range notifyTplList {
+ row := make(map[string]interface{})
+ row["id"] = v.Id
+ row["tpl_name"] = v.TplName
+ row["tpl_type"] = v.TplType
+ tplList[k] = row
+ }
+ }
+
+ self.Data["notifyTpl"] = tplList
+
self.display()
}
@@ -157,12 +191,12 @@ func (self *TaskController) Detail() {
serverName := ""
if task.ServerIds == "0" {
- serverName = "本地服务器"
+ serverName = "本地服务器
"
} else {
serverIdSli := strings.Split(task.ServerIds, ",")
for _, v := range serverIdSli {
if v == "0" {
- serverName = "本地服务器 "
+ serverName = "本地服务器
"
}
}
servers, n := models.TaskServerGetByIds(task.ServerIds)
@@ -170,9 +204,9 @@ func (self *TaskController) Detail() {
for _, server := range servers {
fmt.Println(server.Status)
if server.Status != 0 {
- serverName += server.ServerName + "【无效】 "
+ serverName += server.ServerName + "
"
} else {
- serverName += server.ServerName + " "
+ serverName += server.ServerName + "
"
}
}
} else {
@@ -180,6 +214,12 @@ func (self *TaskController) Detail() {
}
}
+ //执行策略
+ self.Data["ServerType"] = "同时执行"
+ if task.ServerType == 1 {
+ self.Data["ServerType"] = "轮询执行"
+ }
+
//任务分组
groupName := "默认分组"
if task.GroupId > 0 {
@@ -242,6 +282,8 @@ func (self *TaskController) AjaxSave() {
task.Command = strings.TrimSpace(self.GetString("command"))
task.Timeout, _ = self.GetInt("timeout")
task.IsNotify, _ = self.GetInt("is_notify")
+ task.ServerType, _ = self.GetInt("server_type")
+
task.NotifyType, _ = self.GetInt("notify_type")
task.NotifyTplId, _ = self.GetInt("notify_tpl_id")
task.NotifyUserIds = strings.TrimSpace(self.GetString("notify_user_ids"))
@@ -287,6 +329,7 @@ func (self *TaskController) AjaxSave() {
task.CronSpec = strings.TrimSpace(self.GetString("cron_spec"))
task.Command = strings.TrimSpace(self.GetString("command"))
task.Timeout, _ = self.GetInt("timeout")
+ task.ServerType, _ = self.GetInt("server_type")
task.IsNotify, _ = self.GetInt("is_notify")
task.NotifyType, _ = self.GetInt("notify_type")
task.NotifyTplId, _ = self.GetInt("notify_tpl_id")
@@ -402,7 +445,7 @@ func (self *TaskController) AjaxPause() {
for _, server_id := range TaskServerIdsArr {
server_id_int, _ := strconv.Atoi(server_id)
- jobKey := jobKey(task.Id, server_id_int)
+ jobKey := libs.JobKey(task.Id, server_id_int)
jobs.RemoveJob(jobKey)
}
@@ -477,7 +520,7 @@ func (self *TaskController) AjaxBatchPause() {
for _, server_id := range TaskServerIdsArr {
server_id_int, _ := strconv.Atoi(server_id)
- jobKey := jobKey(task.Id, server_id_int)
+ jobKey := libs.JobKey(task.Id, server_id_int)
jobs.RemoveJob(jobKey)
}
if err == nil {
@@ -507,7 +550,7 @@ func (self *TaskController) AjaxBatchDel() {
for _, server_id := range TaskServerIdsArr {
server_id_int, _ := strconv.Atoi(server_id)
- jobKey := jobKey(task.Id, server_id_int)
+ jobKey := libs.JobKey(task.Id, server_id_int)
jobs.RemoveJob(jobKey)
}
models.TaskDel(id)
@@ -688,8 +731,16 @@ func (self *TaskController) Table() {
row["status"] = v.Status
row["pre_time"] = beego.Date(time.Unix(v.PrevTime, 0), "Y-m-d H:i:s")
row["execute_times"] = v.ExecuteTimes
+ row["cron_spec"] = v.CronSpec
+
+ TaskServerIdsArr := strings.Split(v.ServerIds, ",")
+ serverId := 0
+ if len(TaskServerIdsArr) > 1 {
+ serverId, _ = strconv.Atoi(TaskServerIdsArr[0])
+ }
+ jobskey := libs.JobKey(v.Id, serverId)
+ e := jobs.GetEntryById(jobskey)
- e := jobs.GetEntryById(v.Id)
if e != nil {
row["next_time"] = beego.Date(e.Next, "Y-m-d H:i:s")
row["prev_time"] = "-"
@@ -850,7 +901,7 @@ func (self *TaskController) ApiPause() {
for _, server_id := range TaskServerIdsArr {
server_id_int, _ := strconv.Atoi(server_id)
- jobKey := jobKey(task.Id, server_id_int)
+ jobKey := libs.JobKey(task.Id, server_id_int)
jobs.RemoveJob(jobKey)
}
diff --git a/controllers/task_log.go b/controllers/task_log.go
index f77cd1a..0de7962 100644
--- a/controllers/task_log.go
+++ b/controllers/task_log.go
@@ -75,7 +75,7 @@ func (self *TaskLogController) Table() {
for k, v := range result {
row := make(map[string]interface{})
row["id"] = v.Id
- row["task_id"] = jobKey(v.TaskId, v.ServerId)
+ row["task_id"] = libs.JobKey(v.TaskId, v.ServerId)
row["start_time"] = beego.Date(time.Unix(v.CreateTime, 0), "Y-m-d H:i:s")
row["process_time"] = float64(v.ProcessTime) / 1000
diff --git a/jobs/cron.go b/jobs/cron.go
index 79a96f7..65fdd2a 100644
--- a/jobs/cron.go
+++ b/jobs/cron.go
@@ -32,7 +32,7 @@ func AddJob(spec string, job *Job) bool {
lock.Lock()
defer lock.Unlock()
- if GetEntryById(job.jobKey) != nil {
+ if GetEntryById(job.JobKey) != nil {
return false
}
err := mainCron.AddJob(spec, job)
@@ -47,7 +47,7 @@ func AddJob(spec string, job *Job) bool {
func RemoveJob(jobKey int) {
mainCron.RemoveJob(func(e *cron.Entry) bool {
if v, ok := e.Job.(*Job); ok {
- if v.jobKey == jobKey {
+ if v.JobKey == jobKey {
return true
}
}
@@ -59,7 +59,7 @@ func GetEntryById(jobKey int) *cron.Entry {
entries := mainCron.Entries()
for _, e := range entries {
if v, ok := e.Job.(*Job); ok {
- if v.jobKey == jobKey {
+ if v.JobKey == jobKey {
return e
}
}
diff --git a/jobs/job.go b/jobs/job.go
index 5b8a5ac..a5536f7 100644
--- a/jobs/job.go
+++ b/jobs/job.go
@@ -9,11 +9,18 @@ package jobs
import (
"bytes"
+ "errors"
"fmt"
+ "github.com/astaxie/beego/logs"
+ "github.com/george518/PPGo_Job/libs"
+ "github.com/george518/PPGo_Job/models"
"io/ioutil"
"net"
+ "net/rpc"
+ "net/rpc/jsonrpc"
"os/exec"
"runtime/debug"
+ "sync"
"time"
"runtime"
@@ -22,25 +29,52 @@ import (
"encoding/json"
"github.com/astaxie/beego"
- "github.com/axgle/mahonia"
- "github.com/george518/PPGo_Job/models"
"github.com/george518/PPGo_Job/notify"
"github.com/linxiaozhi/go-telnet"
- "github.com/pkg/errors"
"golang.org/x/crypto/ssh"
)
type Job struct {
- jobKey int // jobId = id*10000+serverId
- id int // taskID
- logId int64 // 日志记录ID
- serverId int //服务器信息
- serverName string //服务器名称
- name string // 任务名称
- task *models.Task // 任务对象
- runFunc func(time.Duration) (string, string, error, bool) // 执行函数
- status int // 任务状态,大于0表示正在执行中
- Concurrent bool // 同一个任务是否允许并行执行
+ JobKey int // jobId = id*10000+serverId
+ Id int // taskID
+ LogId int64 // 日志记录ID
+ ServerId int // 执行器信息
+ ServerName string // 执行器名称
+ ServerType int // 执行器类型,2-agent 1-telnet 0-ssh
+ Name string // 任务名称
+ Task *models.Task // 任务对象
+ RunFunc func(time.Duration) *JobResult // 执行函数
+ Status int // 任务状态,大于0表示正在执行中
+ Concurrent bool // 同一个任务是否允许并行执行
+}
+
+type JobResult struct {
+ OutMsg string
+ ErrMsg string
+ IsOk bool
+ IsTimeout bool
+}
+
+//调度计数器
+var Counter sync.Map
+
+func GetCounter(key string) int {
+ if v, ok := Counter.LoadOrStore(key, 0); ok {
+ n := v.(int)
+ return n
+ }
+ return 0
+}
+
+func SetCounter(key string) {
+ if v, ok := Counter.Load(key); ok {
+ n := v.(int)
+ m := n + 1
+ if n > 1000 {
+ m = 0
+ }
+ Counter.Store(key, m)
+ }
}
func NewJobFromTask(task *models.Task) ([]*Job, error) {
@@ -53,24 +87,26 @@ func NewJobFromTask(task *models.Task) ([]*Job, error) {
}
TaskServerIdsArr := strings.Split(task.ServerIds, ",")
-
jobArr := make([]*Job, 0)
-
for _, server_id := range TaskServerIdsArr {
if server_id == "0" {
//本地执行
job := NewCommandJob(task.Id, 0, task.TaskName, task.Command)
- job.task = task
- job.Concurrent = task.Concurrent == 1
- job.serverId = 0
- job.serverName = "本地服务器"
+ job.Task = task
+ job.Concurrent = false
+ if task.Concurrent == 1 {
+ job.Concurrent = true
+ }
+ //job.Concurrent = task.Concurrent == 1
+ job.ServerId = 0
+ job.ServerName = "本地服务器"
jobArr = append(jobArr, job)
} else {
server_id_int, _ := strconv.Atoi(server_id)
//远程执行
server, _ := models.TaskServerGetById(server_id_int)
- if server.Status == 1 {
+ if server.Status == 2 {
fmt.Println("服务器已禁用")
continue
}
@@ -79,29 +115,54 @@ func NewJobFromTask(task *models.Task) ([]*Job, error) {
if server.Type == 0 {
//密码验证登录服务器
job := RemoteCommandJobByPassword(task.Id, server_id_int, task.TaskName, task.Command, server)
- job.task = task
- job.Concurrent = task.Concurrent == 1
- job.serverId = server_id_int
- job.serverName = server.ServerName
+ job.Task = task
+ job.Concurrent = false
+ if task.Concurrent == 1 {
+ job.Concurrent = true
+ }
+ //job.Concurrent = task.Concurrent == 1
+ job.ServerId = server_id_int
+ job.ServerName = server.ServerName
jobArr = append(jobArr, job)
} else {
job := RemoteCommandJob(task.Id, server_id_int, task.TaskName, task.Command, server)
- job.task = task
- job.Concurrent = task.Concurrent == 1
- job.serverId = server_id_int
- job.serverName = server.ServerName
+ job.Task = task
+ job.Concurrent = false
+ if task.Concurrent == 1 {
+ job.Concurrent = true
+ }
+ //job.Concurrent = task.Concurrent == 1
+ job.ServerId = server_id_int
+ job.ServerName = server.ServerName
jobArr = append(jobArr, job)
}
} else if server.ConnectionType == 1 {
if server.Type == 0 {
//密码验证登录服务器
job := RemoteCommandJobByTelnetPassword(task.Id, server_id_int, task.TaskName, task.Command, server)
- job.task = task
- job.Concurrent = task.Concurrent == 1
- job.serverId = server_id_int
- job.serverName = server.ServerName
+ job.Task = task
+ job.Concurrent = false
+ if task.Concurrent == 1 {
+ job.Concurrent = true
+ }
+ //job.Concurrent = task.Concurrent == 1
+ job.ServerId = server_id_int
+ job.ServerName = server.ServerName
jobArr = append(jobArr, job)
}
+ } else if server.ConnectionType == 2 {
+ //密码验证登录服务器
+ job := RemoteCommandJobByAgentPassword(task.Id, server_id_int, task.TaskName, task.Command, server)
+ job.Task = task
+ job.Concurrent = false
+ if task.Concurrent == 1 {
+ job.Concurrent = true
+ }
+ //job.Concurrent = task.Concurrent == 1
+ job.ServerId = server_id_int
+ job.ServerName = server.ServerName
+ jobArr = append(jobArr, job)
+
}
}
}
@@ -111,12 +172,12 @@ func NewJobFromTask(task *models.Task) ([]*Job, error) {
func NewCommandJob(id int, serverId int, name string, command string) *Job {
job := &Job{
- id: id,
- name: name,
+ Id: id,
+ Name: name,
}
- job.jobKey = jobKey(id, serverId)
- job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
+ job.JobKey = libs.JobKey(id, serverId)
+ job.RunFunc = func(timeout time.Duration) (jobresult *JobResult) {
bufOut := new(bytes.Buffer)
bufErr := new(bytes.Buffer)
//cmd := exec.Command("/bin/bash", "-c", command)
@@ -130,8 +191,18 @@ func NewCommandJob(id int, serverId int, name string, command string) *Job {
cmd.Stderr = bufErr
cmd.Start()
err, isTimeout := runCmdWithTimeout(cmd, timeout)
+ jobresult = new(JobResult)
+ jobresult.OutMsg = libs.GbkAsUtf8(bufOut.String())
+ jobresult.ErrMsg = libs.GbkAsUtf8(bufErr.String())
- return gbkAsUtf8(bufOut.String()), gbkAsUtf8(bufErr.String()), err, isTimeout
+ jobresult.IsOk = true
+ if err != nil {
+ jobresult.IsOk = false
+ }
+
+ jobresult.IsTimeout = isTimeout
+
+ return jobresult
}
return job
}
@@ -139,23 +210,29 @@ func NewCommandJob(id int, serverId int, name string, command string) *Job {
//远程执行任务 密钥验证
func RemoteCommandJob(id int, serverId int, name string, command string, servers *models.TaskServer) *Job {
job := &Job{
- id: id,
- name: name,
- serverId: serverId,
+ Id: id,
+ Name: name,
+ ServerId: serverId,
}
- job.jobKey = jobKey(id, serverId)
+ job.JobKey = libs.JobKey(id, serverId)
- job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
+ job.RunFunc = func(timeout time.Duration) (jobresult *JobResult) {
+ jobresult = new(JobResult)
+ jobresult.OutMsg = ""
+ jobresult.ErrMsg = ""
+ jobresult.IsTimeout = false
key, err := ioutil.ReadFile(servers.PrivateKeySrc)
if err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
// Create the Signer for this private key.
signer, err := ssh.ParsePrivateKey(key)
if err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
addr := fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
config := &ssh.ClientConfig{
@@ -172,14 +249,16 @@ func RemoteCommandJob(id int, serverId int, name string, command string, servers
// Connect to the remote server and perform the SSH handshake.47.93.220.5
client, err := ssh.Dial("tcp", addr, config)
if err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
defer client.Close()
session, err := client.NewSession()
if err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
defer session.Close()
@@ -193,10 +272,14 @@ func RemoteCommandJob(id int, serverId int, name string, command string, servers
//session.Output(command)
if err := session.Run(command); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
- isTimeout := false
- return b.String(), c.String(), err, isTimeout
+ jobresult.OutMsg = b.String()
+ jobresult.ErrMsg = c.String()
+ jobresult.IsOk = true
+ jobresult.IsTimeout = false
+ return
}
return job
}
@@ -212,12 +295,18 @@ func RemoteCommandJobByPassword(id int, serverId int, name string, command strin
)
job := &Job{
- id: id,
- name: name,
- serverId: serverId,
+ Id: id,
+ Name: name,
+ ServerId: serverId,
+ ServerType: servers.ConnectionType,
}
- job.jobKey = jobKey(id, serverId)
- job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
+ job.JobKey = libs.JobKey(id, serverId)
+ job.RunFunc = func(timeout time.Duration) (jobresult *JobResult) {
+ jobresult = new(JobResult)
+ jobresult.OutMsg = ""
+ jobresult.ErrMsg = ""
+ jobresult.IsTimeout = false
+
// get auth method
auth = make([]ssh.AuthMethod, 0)
auth = append(auth, ssh.Password(servers.Password))
@@ -235,14 +324,16 @@ func RemoteCommandJobByPassword(id int, serverId int, name string, command strin
addr = fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
if client, err = ssh.Dial("tcp", addr, clientConfig); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
defer client.Close()
// create session
if session, err = client.NewSession(); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
var b bytes.Buffer
@@ -251,10 +342,14 @@ func RemoteCommandJobByPassword(id int, serverId int, name string, command strin
session.Stderr = &c
//session.Output(command)
if err := session.Run(command); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
- isTimeout := false
- return b.String(), c.String(), err, isTimeout
+ jobresult.OutMsg = b.String()
+ jobresult.ErrMsg = c.String()
+ jobresult.IsOk = true
+ jobresult.IsTimeout = false
+ return
}
return job
@@ -263,18 +358,23 @@ func RemoteCommandJobByPassword(id int, serverId int, name string, command strin
func RemoteCommandJobByTelnetPassword(id int, serverId int, name string, command string, servers *models.TaskServer) *Job {
job := &Job{
- id: id,
- name: name,
- serverId: serverId,
+ Id: id,
+ Name: name,
+ ServerId: serverId,
}
- job.jobKey = jobKey(id, serverId)
- job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
+ job.JobKey = libs.JobKey(id, serverId)
+ job.RunFunc = func(timeout time.Duration) (jobresult *JobResult) {
+ jobresult = new(JobResult)
+ jobresult.OutMsg = ""
+ jobresult.ErrMsg = ""
+ jobresult.IsTimeout = false
addr := fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
conn, err := gote.DialTimeout("tcp", addr, timeout)
if err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
defer conn.Close()
@@ -282,28 +382,35 @@ func RemoteCommandJobByTelnetPassword(id int, serverId int, name string, command
buf := make([]byte, 4096)
if _, err = conn.Read(buf); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
if _, err = conn.Write([]byte(servers.ServerAccount + "\r\n")); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
if _, err = conn.Read(buf); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
if _, err = conn.Write([]byte(servers.Password + "\r\n")); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
if _, err = conn.Read(buf); err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
- loginStr := gbkAsUtf8(string(buf[:]))
+ loginStr := libs.GbkAsUtf8(string(buf[:]))
if !strings.Contains(loginStr, ">") {
- return "", "", errors.Errorf("Login failed!"), false
+ jobresult.ErrMsg = jobresult.ErrMsg + "Login failed!"
+ jobresult.IsOk = false
+ return
}
commandArr := strings.Split(command, "\n")
@@ -312,44 +419,178 @@ func RemoteCommandJobByTelnetPassword(id int, serverId int, name string, command
for _, c := range commandArr {
_, err = conn.Write([]byte(c + "\r\n"))
if err != nil {
- return "", "", err, false
+ jobresult.IsOk = false
+ return
}
n, err = conn.Read(buf)
- out = out + gbkAsUtf8(string(buf[0:n]))
+ out = out + libs.GbkAsUtf8(string(buf[0:n]))
if err != nil ||
strings.Contains(out, "'"+c+"' is not recognized as an internal or external command") ||
strings.Contains(out, "'"+c+"' 不是内部或外部命令,也不是可运行的程序") {
- return out, "", fmt.Errorf(gbkAsUtf8(string(buf[0:n]))), false
+ jobresult.ErrMsg = jobresult.ErrMsg + " " + libs.GbkAsUtf8(string(buf[0:n]))
+ jobresult.IsOk = false
+ jobresult.OutMsg = out
+ return
}
}
-
- return out, "", nil, false
+ jobresult.IsOk = true
+ jobresult.OutMsg = out
+ return
}
return job
}
-func (j *Job) Status() int {
- return j.status
+func RemoteCommandJobByAgentPassword(id int, serverId int, name string, command string, servers *models.TaskServer) *Job {
+
+ job := &Job{
+ Id: id,
+ Name: name,
+ ServerType: servers.ConnectionType,
+ }
+
+ job.JobKey = libs.JobKey(id, serverId)
+ job.RunFunc = func(timeout time.Duration) *JobResult {
+ return new(JobResult)
+ }
+ return job
+
+}
+
+func (j *Job) GetStatus() int {
+ return j.Status
}
func (j *Job) GetName() string {
- return j.name
+ return j.Name
}
func (j *Job) GetId() int {
- return j.id
+ return j.Id
}
func (j *Job) GetLogId() int64 {
- return j.logId
+ return j.LogId
+}
+
+type RpcResult struct {
+ Status int
+ Message string
+}
+
+func (j *Job) agentRun() (reply *JobResult) {
+
+ server, _ := models.TaskServerGetById(j.ServerId)
+ conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", server.ServerIp, server.Port))
+ reply = new(JobResult)
+ if err != nil {
+ logs.Error("Net error:", err)
+ reply.IsOk = false
+ reply.ErrMsg = "Net error:" + err.Error()
+ reply.IsTimeout = false
+ reply.OutMsg = ""
+ return reply
+ }
+
+ defer conn.Close()
+ client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
+
+ defer client.Close()
+ reply = new(JobResult)
+
+ task := j.Task
+ err = client.Call("RpcTask.RunTask", task, &reply)
+ if err != nil {
+ reply.IsOk = false
+ reply.ErrMsg = "Net error:" + err.Error()
+ reply.IsTimeout = false
+ reply.OutMsg = ""
+ return reply
+ }
+ return
+}
+
+func TestServer(server *models.TaskServer) error {
+ if server.ConnectionType == 0 {
+ switch server.Type {
+ case 0:
+ //密码登录
+ return libs.RemoteCommandByPassword(server)
+ case 1:
+ //密钥登录
+ return libs.RemoteCommandByKey(server)
+ default:
+ return errors.New("未知的登录方式")
+
+ }
+ } else if server.ConnectionType == 1 {
+ if server.Type == 0 {
+ //密码登录]
+ return libs.RemoteCommandByTelnetPassword(server)
+ } else {
+ return errors.New("Telnet方式暂不支持密钥登陆!")
+ }
+
+ } else if server.ConnectionType == 2 {
+ return libs.RemoteAgent(server)
+ }
+
+ return errors.New("未知错误")
+}
+
+func PollServer(j *Job) bool {
+ //判断是否是当前执行器执行
+ TaskServerIdsArr := strings.Split(j.Task.ServerIds, ",")
+ num := len(TaskServerIdsArr)
+
+ if num == 0 {
+ return false
+ }
+
+ count := GetCounter(strconv.Itoa(j.Task.Id))
+ index := count % num
+ pollServerId, _ := strconv.Atoi(TaskServerIdsArr[index])
+
+ if j.ServerId != pollServerId {
+ return false
+ }
+
+ //本地服务器
+ if pollServerId == 0 {
+ return true
+ }
+
+ //判断执行器或者服务器是否存活
+ server, _ := models.TaskServerGetById(pollServerId)
+
+ if server.Status != 0 {
+ return false
+ }
+
+ if err := TestServer(server); err != nil {
+ server.Status = 1
+ server.Update()
+ return false
+ }
+
+ return true
+
}
func (j *Job) Run() {
- if !j.Concurrent && j.status > 0 {
- beego.Warn(fmt.Sprintf("任务[%d]上一次执行尚未结束,本次被忽略。", j.jobKey))
+ //执行策略 轮询
+ if j.Task.ServerType == 1 {
+ if !PollServer(j) {
+ return
+ } else {
+ SetCounter(strconv.Itoa(j.Task.Id))
+ }
+ }
+
+ if !j.Concurrent && j.Status > 0 {
+ beego.Warn(fmt.Sprintf("任务[%d]上一次执行尚未结束,本次被忽略。", j.JobKey))
return
}
@@ -366,42 +607,50 @@ func (j *Job) Run() {
}()
}
- beego.Debug(fmt.Sprintf("开始执行任务: %d", j.jobKey))
+ beego.Debug(fmt.Sprintf("开始执行任务: %d", j.JobKey))
- j.status++
+ j.Status++
defer func() {
- j.status--
+ j.Status--
}()
t := time.Now()
timeout := time.Duration(time.Hour * 24)
- if j.task.Timeout > 0 {
- timeout = time.Second * time.Duration(j.task.Timeout)
+ if j.Task.Timeout > 0 {
+ timeout = time.Second * time.Duration(j.Task.Timeout)
}
- cmdOut, cmdErr, err, isTimeout := j.runFunc(timeout)
+
+ var jobResult = new(JobResult)
+ //anget
+ if j.ServerType == 2 {
+ jobResult = j.agentRun()
+ } else {
+ jobResult = j.RunFunc(timeout)
+ }
+
ut := time.Now().Sub(t) / time.Millisecond
// 插入日志
log := new(models.TaskLog)
- log.TaskId = j.id
- log.ServerId = j.serverId
- log.ServerName = j.serverName
- log.Output = cmdOut
- log.Error = cmdErr
+ log.TaskId = j.Id
+ log.ServerId = j.ServerId
+ log.ServerName = j.ServerName
+ log.Output = jobResult.OutMsg
+ log.Error = jobResult.ErrMsg
log.ProcessTime = int(ut)
log.CreateTime = t.Unix()
- if isTimeout {
+ if jobResult.IsTimeout {
log.Status = models.TASK_TIMEOUT
- log.Error = fmt.Sprintf("任务执行超过 %d 秒\n----------------------\n%s\n", int(timeout/time.Second), cmdErr)
- } else if err != nil {
+ log.Error = fmt.Sprintf("任务执行超过 %d 秒\n----------------------\n%s\n", int(timeout/time.Second), jobResult.ErrMsg)
+ } else if !jobResult.IsOk {
log.Status = models.TASK_ERROR
- log.Error = err.Error() + ":" + cmdErr
+ log.Error = "ERROR:" + jobResult.ErrMsg
}
- if log.Status < 0 && j.task.IsNotify == 1 {
- if j.task.NotifyUserIds != "0" && j.task.NotifyUserIds != "" {
- adminInfo := AllAdminInfo(j.task.NotifyUserIds)
+ if log.Status < 0 && j.Task.IsNotify == 1 {
+ if j.Task.NotifyUserIds != "0" && j.Task.NotifyUserIds != "" {
+ adminInfo := AllAdminInfo(j.Task.NotifyUserIds)
phone := make(map[string]string, 0)
dingtalk := make(map[string]string, 0)
wechat := make(map[string]string, 0)
@@ -432,9 +681,9 @@ func (j *Job) Run() {
title, content, taskOutput, errOutput := "", "", "", ""
- notifyTpl, err := models.NotifyTplGetById(j.task.NotifyTplId)
+ notifyTpl, err := models.NotifyTplGetById(j.Task.NotifyTplId)
if err != nil {
- notifyTpl, err := models.NotifyTplGetByTplType(j.task.NotifyType, models.NotifyTplTypeSystem)
+ notifyTpl, err := models.NotifyTplGetByTplType(j.Task.NotifyType, models.NotifyTplTypeSystem)
if err == nil {
title = notifyTpl.Title
content = notifyTpl.Content
@@ -450,10 +699,10 @@ func (j *Job) Run() {
errOutput = strings.Replace(errOutput, "\"", "\\\"", -1)
if title != "" {
- title = strings.Replace(title, "{{TaskId}}", strconv.Itoa(j.task.Id), -1)
- title = strings.Replace(title, "{{ServerId}}", strconv.Itoa(j.serverId), -1)
- title = strings.Replace(title, "{{TaskName}}", j.task.TaskName, -1)
- title = strings.Replace(title, "{{ExecuteCommand}}", j.task.Command, -1)
+ title = strings.Replace(title, "{{TaskId}}", strconv.Itoa(j.Task.Id), -1)
+ title = strings.Replace(title, "{{ServerId}}", strconv.Itoa(j.ServerId), -1)
+ title = strings.Replace(title, "{{TaskName}}", j.Task.TaskName, -1)
+ title = strings.Replace(title, "{{ExecuteCommand}}", j.Task.Command, -1)
title = strings.Replace(title, "{{ExecuteTime}}", beego.Date(time.Unix(log.CreateTime, 0), "Y-m-d H:i:s"), -1)
title = strings.Replace(title, "{{ProcessTime}}", strconv.FormatFloat(float64(log.ProcessTime)/1000, 'f', 6, 64), -1)
title = strings.Replace(title, "{{ExecuteStatus}}", TextStatus[status], -1)
@@ -462,10 +711,10 @@ func (j *Job) Run() {
}
if content != "" {
- content = strings.Replace(content, "{{TaskId}}", strconv.Itoa(j.task.Id), -1)
- content = strings.Replace(content, "{{ServerId}}", strconv.Itoa(j.serverId), -1)
- content = strings.Replace(content, "{{TaskName}}", j.task.TaskName, -1)
- content = strings.Replace(content, "{{ExecuteCommand}}", j.task.Command, -1)
+ content = strings.Replace(content, "{{TaskId}}", strconv.Itoa(j.Task.Id), -1)
+ content = strings.Replace(content, "{{ServerId}}", strconv.Itoa(j.ServerId), -1)
+ content = strings.Replace(content, "{{TaskName}}", j.Task.TaskName, -1)
+ content = strings.Replace(content, "{{ExecuteCommand}}", j.Task.Command, -1)
content = strings.Replace(content, "{{ExecuteTime}}", beego.Date(time.Unix(log.CreateTime, 0), "Y-m-d H:i:s"), -1)
content = strings.Replace(content, "{{ProcessTime}}", strconv.FormatFloat(float64(log.ProcessTime)/1000, 'f', 6, 64), -1)
content = strings.Replace(content, "{{ExecuteStatus}}", TextStatus[status], -1)
@@ -473,14 +722,14 @@ func (j *Job) Run() {
content = strings.Replace(content, "{{ErrorOutput}}", errOutput, -1)
}
- if j.task.NotifyType == 0 && toEmail != "" {
+ if j.Task.NotifyType == 0 && toEmail != "" {
//邮件
mailtype := "html"
ok := notify.SendToChan(toEmail, title, content, mailtype)
if !ok {
fmt.Println("发送邮件错误", toEmail)
}
- } else if j.task.NotifyType == 1 && len(phone) > 0 {
+ } else if j.Task.NotifyType == 1 && len(phone) > 0 {
//信息
param := make(map[string]string)
err := json.Unmarshal([]byte(content), ¶m)
@@ -493,7 +742,7 @@ func (j *Job) Run() {
if !ok {
fmt.Println("发送信息错误", phone)
}
- } else if j.task.NotifyType == 2 && len(dingtalk) > 0 {
+ } else if j.Task.NotifyType == 2 && len(dingtalk) > 0 {
//钉钉
param := make(map[string]interface{})
@@ -507,7 +756,7 @@ func (j *Job) Run() {
if !ok {
fmt.Println("发送钉钉错误", dingtalk)
}
- } else if j.task.NotifyType == 3 && len(wechat) > 0 {
+ } else if j.Task.NotifyType == 3 && len(wechat) > 0 {
//微信
param := make(map[string]string)
err := json.Unmarshal([]byte(content), ¶m)
@@ -524,12 +773,12 @@ func (j *Job) Run() {
}
}
- j.logId, _ = models.TaskLogAdd(log)
+ j.LogId, _ = models.TaskLogAdd(log)
// 更新上次执行时间
- j.task.PrevTime = t.Unix()
- j.task.ExecuteTimes++
- j.task.Update("PrevTime", "ExecuteTimes")
+ j.Task.PrevTime = t.Unix()
+ j.Task.ExecuteTimes++
+ j.Task.Update("PrevTime", "ExecuteTimes")
}
//冗余代码
@@ -572,16 +821,3 @@ func AllAdminInfo(adminIds string) []*adminInfo {
return adminInfos
}
-
-func gbkAsUtf8(str string) string {
- srcDecoder := mahonia.NewDecoder("gbk")
- desDecoder := mahonia.NewDecoder("utf-8")
- resStr := srcDecoder.ConvertString(str)
- _, resBytes, _ := desDecoder.Translate([]byte(resStr), true)
- return string(resBytes)
-}
-
-//任务识别码
-func jobKey(taskId, serverId int) int {
- return taskId*10000 + serverId
-}
diff --git a/libs/convert.go b/libs/convert.go
new file mode 100644
index 0000000..d615ddc
--- /dev/null
+++ b/libs/convert.go
@@ -0,0 +1,15 @@
+/************************************************************
+** @Description: convert
+** @Author: george hao
+** @Date: 2019-06-28 09:34
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-28 09:34
+*************************************************************/
+package libs
+
+import "fmt"
+
+//查看数据类型
+func DataType(i interface{}) string {
+ return fmt.Sprintf("%T", i)
+}
diff --git a/libs/http.go b/libs/http.go
index c0504f2..95ee2fb 100644
--- a/libs/http.go
+++ b/libs/http.go
@@ -9,10 +9,10 @@ package libs
import (
"github.com/pkg/errors"
- "io/ioutil"
- "strings"
- "net/http"
"io"
+ "io/ioutil"
+ "net/http"
+ "strings"
)
func HttpGet(url string, param map[string]string) (string, error) {
diff --git a/libs/ip.go b/libs/ip.go
new file mode 100644
index 0000000..bcee9aa
--- /dev/null
+++ b/libs/ip.go
@@ -0,0 +1,52 @@
+/************************************************************
+** @Description: ip
+** @Author: george hao
+** @Date: 2019-06-27 09:20
+** @Last Modified by: george hao
+** @Last Modified time: 2019-06-27 09:20
+*************************************************************/
+package libs
+
+import (
+ "io/ioutil"
+ "net"
+ "net/http"
+)
+
+func GetHostIp(IpType int) string {
+ if IpType == 0 {
+ return LocalIp()
+ } else {
+ return PublicIp()
+ }
+}
+
+func LocalIp() string {
+
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ return ""
+ }
+ for _, address := range addrs {
+ // 检查ip地址判断是否回环地址
+ if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+ if ipnet.IP.To4() != nil {
+ return ipnet.IP.String()
+ }
+ }
+ }
+ return ""
+}
+
+func PublicIp() string {
+ resp, err := http.Get("http://myexternalip.com/raw")
+ if err != nil {
+ return ""
+ }
+ defer resp.Body.Close()
+ content, _ := ioutil.ReadAll(resp.Body)
+ //buf := new(bytes.Buffer)
+ //buf.ReadFrom(resp.Body)
+ //s := buf.String()
+ return string(content)
+}
diff --git a/libs/server.go b/libs/server.go
new file mode 100644
index 0000000..d74d0e7
--- /dev/null
+++ b/libs/server.go
@@ -0,0 +1,150 @@
+/************************************************************
+** @Description: server
+** @Author: george hao
+** @Date: 2019-07-02 11:16
+** @Last Modified by: george hao
+** @Last Modified time: 2019-07-02 11:16
+*************************************************************/
+package libs
+
+import (
+ "fmt"
+ "github.com/george518/PPGo_Job/common"
+ "github.com/george518/PPGo_Job/models"
+ "github.com/linxiaozhi/go-telnet"
+ "github.com/pkg/errors"
+ "golang.org/x/crypto/ssh"
+ "io/ioutil"
+ "net"
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "strconv"
+ "strings"
+ "time"
+)
+
+func RemoteCommandByTelnetPassword(servers *models.TaskServer) error {
+
+ addr := fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
+ conn, err := gote.DialTimeout("tcp", addr, time.Second*10)
+ if err != nil {
+ return err
+ }
+
+ defer conn.Close()
+
+ buf := make([]byte, 4096)
+ _, err = conn.Read(buf)
+ if err != nil {
+ return err
+ }
+
+ _, err = conn.Write([]byte(servers.ServerAccount + "\r\n"))
+ if err != nil {
+ return err
+ }
+
+ _, err = conn.Read(buf)
+ if err != nil {
+ return err
+ }
+
+ _, err = conn.Write([]byte(servers.Password + "\r\n"))
+ if err != nil {
+ return err
+ }
+
+ _, err = conn.Read(buf)
+ if err != nil {
+ return err
+ }
+
+ str := GbkAsUtf8(string(buf[:]))
+
+ if strings.Contains(str, ">") {
+ return nil
+ }
+
+ return errors.Errorf("连接失败!")
+}
+
+func RemoteCommandByPassword(servers *models.TaskServer) error {
+ var (
+ auth []ssh.AuthMethod
+ addr string
+ clientConfig *ssh.ClientConfig
+ )
+
+ auth = make([]ssh.AuthMethod, 0)
+ auth = append(auth, ssh.Password(servers.Password))
+
+ clientConfig = &ssh.ClientConfig{
+ User: servers.ServerAccount,
+ Auth: auth,
+ HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+ return nil
+ },
+ Timeout: 5 * time.Second,
+ }
+
+ addr = fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
+ client, err := ssh.Dial("tcp", addr, clientConfig)
+ if err == nil {
+ defer client.Close()
+ }
+ return err
+}
+
+func RemoteCommandByKey(servers *models.TaskServer) error {
+ key, err := ioutil.ReadFile(servers.PrivateKeySrc)
+ if err != nil {
+ return err
+ }
+
+ signer, err := ssh.ParsePrivateKey(key)
+ if err != nil {
+ return err
+ }
+ addr := fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
+ config := &ssh.ClientConfig{
+ User: servers.ServerAccount,
+ Auth: []ssh.AuthMethod{
+ // Use the PublicKeys method for remote authentication.
+ ssh.PublicKeys(signer),
+ },
+ //HostKeyCallback: ssh.FixedHostKey(hostKey),
+ HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+ return nil
+ },
+ Timeout: 5 * time.Second,
+ }
+
+ client, err := ssh.Dial("tcp", addr, config)
+ if err == nil {
+ client.Close()
+ }
+ return err
+}
+
+func RemoteAgent(servers *models.TaskServer) error {
+
+ conn, err := net.Dial("tcp", servers.ServerIp+":"+strconv.Itoa(servers.Port))
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+ client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
+ var reply *common.RpcResult
+ defer client.Close()
+
+ ping := "ping"
+ err = client.Call("RpcTask.HeartBeat", ping, &reply)
+ if err != nil {
+ return err
+ }
+ if reply.Status == 200 {
+ return nil
+ } else {
+ return fmt.Errorf("链接错误:%v", reply.Message)
+ }
+}
diff --git a/libs/string.go b/libs/string.go
index 6df5a7d..05eb756 100644
--- a/libs/string.go
+++ b/libs/string.go
@@ -11,6 +11,7 @@ package libs
import (
"crypto/md5"
"fmt"
+ "github.com/axgle/mahonia"
"math/rand"
"regexp"
"time"
@@ -67,3 +68,16 @@ func GetRandomString(lens int) string {
}
return string(result)
}
+
+func GbkAsUtf8(str string) string {
+ srcDecoder := mahonia.NewDecoder("gbk")
+ desDecoder := mahonia.NewDecoder("utf-8")
+ resStr := srcDecoder.ConvertString(str)
+ _, resBytes, _ := desDecoder.Translate([]byte(resStr), true)
+ return string(resBytes)
+}
+
+//任务识别码
+func JobKey(taskId, serverId int) int {
+ return taskId*10000000 + serverId
+}
diff --git a/main.go b/main.go
index 6dda545..fe0b743 100644
--- a/main.go
+++ b/main.go
@@ -16,10 +16,12 @@ import (
)
func init() {
+
//初始化数据模型
var StartTime = time.Now().Unix()
models.Init(StartTime)
jobs.InitJobs()
+
}
func main() {
diff --git a/models/init.go b/models/init.go
index c4db957..814540d 100644
--- a/models/init.go
+++ b/models/init.go
@@ -10,8 +10,6 @@ package models
import (
"net/url"
- "fmt"
-
"github.com/astaxie/beego"
"github.com/astaxie/beego/orm"
_ "github.com/go-sql-driver/mysql"
@@ -31,7 +29,6 @@ func Init(startTime int64) {
dbport = "3306"
}
dsn := dbuser + ":" + dbpassword + "@tcp(" + dbhost + ":" + dbport + ")/" + dbname + "?charset=utf8"
- fmt.Println(dsn)
if timezone != "" {
dsn = dsn + "&loc=" + url.QueryEscape(timezone)
}
diff --git a/models/server.go b/models/server.go
index f22760b..1bac635 100644
--- a/models/server.go
+++ b/models/server.go
@@ -16,22 +16,22 @@ import (
)
type TaskServer struct {
- Id int
- GroupId int
+ Id int
+ GroupId int
ConnectionType int
- ServerName string
- ServerAccount string
- ServerOuterIp string
- ServerIp string
- Port int
- Password string
- PrivateKeySrc string
- PublicKeySrc string
- Type int
- Detail string
- CreateTime int64
- UpdateTime int64
- Status int
+ ServerName string
+ ServerAccount string
+ ServerOuterIp string
+ ServerIp string
+ Port int
+ Password string
+ PrivateKeySrc string
+ PublicKeySrc string
+ Type int
+ Detail string
+ CreateTime int64
+ UpdateTime int64
+ Status int
}
func (t *TaskServer) TableName() string {
@@ -97,6 +97,20 @@ func TaskServerGetById(id int) (*TaskServer, error) {
return obj, nil
}
+func TaskServerForActuator(serverIp string, port int) int {
+ serverFilters := make([]interface{}, 0)
+ serverFilters = append(serverFilters, "status__in", []int{0, 1})
+ serverFilters = append(serverFilters, "server_ip", serverIp)
+ serverFilters = append(serverFilters, "port", port)
+
+ server, _ := TaskServerGetList(1, 1, serverFilters...)
+
+ if len(server) == 1 {
+ return server[0].Id
+ }
+ return 0
+}
+
//
func TaskServerGetByIds(ids string) ([]*TaskServer, int64) {
diff --git a/models/task.go b/models/task.go
index 34c2895..4927f81 100644
--- a/models/task.go
+++ b/models/task.go
@@ -24,6 +24,7 @@ type Task struct {
Id int
GroupId int
ServerIds string
+ ServerType int
TaskName string
Description string
CronSpec string
diff --git a/ppgo_job2.sql b/ppgo_job2.sql
index 03c4d20..6d8f6ca 100644
--- a/ppgo_job2.sql
+++ b/ppgo_job2.sql
@@ -332,11 +332,14 @@ COMMIT;
BEGIN;
ALTER TABLE `ppgo_job2`.`pp_task` CHANGE COLUMN `server_id` `server_ids` varchar(200) NOT NULL DEFAULT '0' COMMENT '服务器id字符串,英文都好隔开';
-BEGIN;
+COMMIT;
BEGIN;
ALTER TABLE `ppgo_job2`.`pp_task_log` ADD COLUMN `server_id` int(11) NOT NULL DEFAULT '-1' COMMENT '服务器ID,-1,异常' AFTER `task_id`, CHANGE COLUMN `output` `output` mediumtext NOT NULL COMMENT '任务输出' AFTER `server_id`, CHANGE COLUMN `error` `error` text NOT NULL COMMENT '错误信息' AFTER `output`, CHANGE COLUMN `status` `status` tinyint(4) NOT NULL COMMENT '状态' AFTER `error`, CHANGE COLUMN `process_time` `process_time` int(11) NOT NULL DEFAULT '0' COMMENT '消耗时间/毫秒' AFTER `status`, CHANGE COLUMN `create_time` `create_time` int(11) UNSIGNED NOT NULL DEFAULT '0' COMMENT '创建时间' AFTER `process_time`;
ALTER TABLE `ppgo_job2`.`pp_task_log` ADD COLUMN `server_name` varchar(60) NOT NULL DEFAULT '\"\"' COMMENT '服务器名称' AFTER `server_id`, CHANGE COLUMN `output` `output` mediumtext NOT NULL COMMENT '任务输出' AFTER `server_name`, CHANGE COLUMN `error` `error` text NOT NULL COMMENT '错误信息' AFTER `output`, CHANGE COLUMN `status` `status` tinyint(4) NOT NULL COMMENT '状态' AFTER `error`, CHANGE COLUMN `process_time` `process_time` int(11) NOT NULL DEFAULT '0' COMMENT '消耗时间/毫秒' AFTER `status`, CHANGE COLUMN `create_time` `create_time` int(11) UNSIGNED NOT NULL DEFAULT '0' COMMENT '创建时间' AFTER `process_time`;
-BEGIN;
+COMMIT;
+BEGIN;
+ALTER TABLE `ppgo_job2`.`pp_task` CHANGE COLUMN `is_notify` ` is_notify` tinyint(1) UNSIGNED NOT NULL DEFAULT '0' COMMENT '0-不通知,1-通知', ADD COLUMN `server_type` tinyint(1) UNSIGNED NOT NULL DEFAULT '1' COMMENT '执行策略:0-同时执行,1-轮询执行' AFTER `update_id`;
+COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/static/admin/js/formSelects-v3.js b/static/admin/js/formSelects-v3.js
new file mode 100644
index 0000000..edd1f20
--- /dev/null
+++ b/static/admin/js/formSelects-v3.js
@@ -0,0 +1,946 @@
+'use strict';
+
+var _typeof = typeof Symbol === "function" && typeof Symbol.iterator === "symbol" ? function (obj) { return typeof obj; } : function (obj) { return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; };
+
+/**
+ * name: formSelects
+ * 基于Layui Select多选
+ * version: 3.0.9.0607
+ * https://faysunshine.com/layui/template/formSelects-v3/formSelects-v3.js
+ */
+(function (layui, window, factory) {
+ if ((typeof exports === 'undefined' ? 'undefined' : _typeof(exports)) === 'object') {
+ // 支持 CommonJS
+ module.exports = factory();
+ } else if (typeof define === 'function' && define.amd) {
+ // 支持 AMD
+ define(factory);
+ } else if (window.layui && layui.define) {
+ //layui加载
+ layui.define(['jquery', 'form'], function (exports) {
+ exports('formSelects', factory());
+ });
+ } else {
+ window.formSelects = factory();
+ }
+})(layui, window, function () {
+ //针对IE的一些处理
+ if (window.Map == undefined) {
+ var _Map = function _Map() {
+ this.value = {};
+ };
+
+ _Map.prototype.set = function (key, val) {
+ this.value[key] = val;
+ };
+
+ _Map.prototype.get = function (key) {
+ return this.value[key];
+ };
+
+ _Map.prototype.has = function (key) {
+ return this.value.hasOwnProperty(key);
+ };
+
+ _Map.prototype.delete = function (key) {
+ delete this.value[key];
+ };
+
+ window.Map = _Map;
+ }
+
+ var $ = layui.jquery || $,
+ form = layui.form,
+ select3 = {
+ value: function value(name) {
+ var type = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : 'all';
+ var vals = arguments[2];
+
+ if (Array.isArray(type)) {
+ vals = type;
+ type = 'all';
+ }
+ if (name && vals && Array.isArray(vals)) {
+ var options = commons.data.confs.get(name);
+ if (options) {
+ var dl = commons.methods.getDiv(options).find('dl');
+ if (!options.repeat) {
+ vals = new Set(vals);
+ }
+ var on = options.on;
+ options.on = null;
+ commons.methods.removeAll(options);
+ options.delete = false;
+ vals.forEach(function (val) {
+ dl.find('dd:not(.layui-disabled)[lay-value=\'' + val + '\']').click();
+ });
+ options.on = on;
+ }
+ return;
+ }
+ var arr = commons.data.values.get(name);
+ if (!arr) {
+ return vals;
+ }
+ if (type == 'val') {
+ return arr.map(function (val) {
+ return val.val;
+ });
+ }
+ if (type == 'valStr') {
+ return arr.map(function (val) {
+ return val.val;
+ }).join(',');
+ }
+ if (type == 'name') {
+ return arr.map(function (val) {
+ return val.name;
+ });
+ }
+ if (type == 'nameStr') {
+ return arr.map(function (val) {
+ return val.name;
+ }).join(',');
+ }
+ return arr;
+ },
+ render: function render(options) {
+ if (options) {
+ if (commons.data.confs.get(options.name)) {
+ options = commons.methods.cloneOptions(options, commons.data.confs.get(options.name));
+ commons.methods.init(options);
+ } else {
+ var dom = commons.methods.getDom(options, true);
+ if (dom.length) {
+ var hisOptions = commons.methods.cloneOptions(commons.methods.getOptions(dom), commons.data.DEFAULT_OPTIONS);
+ options = commons.methods.cloneOptions(options, hisOptions);
+ commons.methods.init(options);
+ }
+ }
+ } else {
+ commons.methods.autoInit();
+ }
+ },
+ delete: function _delete(name, abs) {
+ if (name && commons.data.confs.get(name)) {
+ var dom = commons.methods.getDom({
+ name: name
+ });
+ if (dom.parent().hasClass(commons.data.pclass)) {
+ if (abs) {
+ dom.removeAttr(commons.data.name);
+ }
+ dom.removeAttr('style');
+ dom.parent()[0].outerHTML = dom[0].outerHTML;
+ }
+ commons.data.confs.delete(name);
+ for (var item in commons.data.temps) {
+ commons.data.temps[item].delete(name);
+ }
+ commons.data.values.delete(name);
+ }
+ },
+ style: function style(name, colors) {
+ if (name) {
+ if (!colors) {
+ commons.methods.loadCss(name, null);
+ } else if (Array.isArray(colors)) {
+ commons.methods.loadCss(name, colors);
+ } else {
+ var arr = [colors.labelBgColor, colors.labelColor, colors.labelIconBgColor, colors.labelIconColor, colors.labelLabelBorderColor, colors.thisBgColor, colors.thisColor];
+ commons.methods.loadCss(name, arr);
+ }
+ }
+ }
+ },
+ commons = {
+ data: {
+ name: 'xm-select',
+ pclass: 'xm-select-parent',
+ vclass: 'xm-select-validate',
+ DEFAULT_OPTIONS: {
+ name: null, //xm-select="xxx"
+ type: 1, //显示模式, 1:layui-this, 2:checkbox, 3:icon
+ icon: {
+ class: 'layui-icon-ok',
+ text: ''
+ },
+ max: null,
+ maxTips: null,
+ init: null, //初始化的选择值,
+ on: null, //select值发生变化
+ data: null
+ },
+ DEFAULT_RENDER: {
+ arr: null,
+ name: 'name',
+ val: 'val',
+ selected: 'selected',
+ disabled: 'disabled'
+ },
+ confs: new Map(),
+ temps: {
+ dom: new Map(),
+ div: new Map()
+ },
+ resize: new Map(),
+ values: new Map(),
+ times: new Map()
+ },
+ methods: {
+ init: function init(options, clone) {
+ if (!clone) {
+ options = commons.methods.cloneOptions(options);
+ }
+ //原始dom添加一个filter
+ var _ref = ['xm-' + options.name, commons.methods.getDom(options, true)],
+ filter = _ref[0],
+ dom = _ref[1];
+
+ if (dom.next().hasClass('layui-form-select')) {
+ dom.next().remove();
+ }
+ if (options.data && options.data.arr) {
+ var os = $.extend({}, commons.data.DEFAULT_RENDER, options.data);
+ var html = '';
+ for (var i in os.arr) {
+ var db = os.arr[i];
+ if (db.arr && Array.isArray(db.arr)) {
+ html += '';
+ } else {
+ html += '';
+ }
+ }
+ dom.html(html);
+ }
+ //判断dom中是否包含了空的option, 如果不包含, 添加
+ if (!dom.find('option[value=""]').length) {
+ $('').insertBefore(dom.find('option:first'));
+ }
+ if (dom.parent().hasClass(commons.data.pclass)) {
+ dom.parent().attr('lay-filter', filter).addClass('layui-form');
+ } else {
+ dom.wrap('