You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

389 lines
8.8 KiB

package cmd
import (
"encoding/json"
"fmt"
"os/exec"
"sync"
"syscall"
"time"
"tools/entity"
"tools/logic/cutil"
"tools/logic/orm"
"github.com/spf13/cast"
)
// Execution modes; the package-level run function compares the configured
// Type against these values to decide how a round is executed.
const (
	RUN_TYPE_CORE = "core"
	RUN_TYPE_API  = "api"
)

// Values of Task.Status. The numeric values are written into the serialized
// task, so they are spelled out explicitly.
const (
	noRun   = 0
	running = 1
	stop    = 2
	done    = 3
)

// Redis keys. taskKey is the hash holding every serialized task; the other
// three are prefixes to which the task ID is appended.
const (
	taskKey         = "gm:task:pool"
	taskDataKey     = "gm:task:data:pool:"
	taskDataTempKey = "gm:task:data:pool:temp:"
	taskErrDataKey  = "gm:task:err_data:pool:"
)
var (
	// runWs tracks rounds that are currently executing; Close waits on it.
	runWs sync.WaitGroup

	// sender maps a task type to the function that delivers a payload for
	// that task. exec treats a negative result as a failure, zero as
	// "nothing processed" and a positive result as the number of items done.
	sender = map[string]func(task *Task, data string) int64{}
)
// Task is one queued job together with its runtime state. It is serialized
// to JSON using the field tags below. The embedded mutex is available to
// callers; none of the methods visible in this file lock it.
type Task struct {
	sync.Mutex

	TaskId  int64  `json:"task_id"`
	Type    string `json:"type"` // selects the sender and the batching mode in exec
	Params  string `json:"params"`
	Status  int    `json:"status"`  // one of noRun, running, stop, done
	Process int64  `json:"process"` // not used for now
	Total   int64  `json:"total"`
	Thread  int    `json:"thread"` // number of goroutines started by run
	Weight  int32  `json:"weight"` // per-round item limit used by exec
	Script  string `json:"script"`
	Start   int64  `json:"start"` // Unix seconds, set on the first run
	End     int64  `json:"end"`   // Unix seconds, set when the task stops, finishes or crashes
	Err     int64  `json:"err"`
	Run     bool   `json:"run"`      // true while the task is considered active
	AllTime int64  `json:"all_time"` // total elapsed time
}
// func init() {
// orm.RegisterSupervisor[*Task]()
// api := func(task *Task, data string) int64 {
// fmt.Println("[request data params]" + data)
// res, err := cutil.Post(task.Script, data)
// if err != nil {
// cutil.Logs(task.TaskId, err.Error())
// return -1
// }
// resp := string(res)
// fmt.Println("[response data info]" + resp)
// if resp == "null" {
// return 1
// }
// result := make(map[string]interface{})
// err = json.Unmarshal(res, &result)
// if err != nil {
// cutil.Logs(task.TaskId, resp)
// return -1
// }
// v := result["ok"]
// if cast.ToInt(v) != 1 {
// cutil.Logs(task.TaskId, resp)
// return -1
// }
// return 1
// }
// sender["api"] = api
// sender["url"] = api
//
// //自定义执行命令
// file := func(task *Task, data string) int64 {
// strbytes := []byte(data)
// base64Code := base64.StdEncoding.EncodeToString(strbytes)
// cmd := task.Script + " " + cast.ToString(task.TaskId) + " " + base64Code
// return int64(execCmd(task.TaskId, cmd))
// }
// sender["file"] = file
// //固定脚本文件
// callback := func(task *Task, data string) int64 {
// strbytes := []byte(data)
// encoded := base64.StdEncoding.EncodeToString(strbytes)
// script := base64.StdEncoding.EncodeToString([]byte(task.Script))
// cmd := entity.GetConf().Script + " callback " + cast.ToString(task.TaskId) + " " + script + " " + encoded
// return int64(execCmd(task.TaskId, cmd))
// }
// sender["callback"] = callback
// }
// getTaskDataKey returns the Redis key of the pending-data list for taskId:
// the taskDataKey prefix followed by the ID.
func getTaskDataKey(taskId string) string {
	key := taskDataKey + taskId
	return key
}
// getTaskDataTempKey returns the Redis key for taskId under the
// taskDataTempKey prefix.
func getTaskDataTempKey(taskId string) string {
	key := taskDataTempKey + taskId
	return key
}
// getTaskErrDataKey returns the Redis key of the failed-data list for
// taskId: the taskErrDataKey prefix followed by the ID.
func getTaskErrDataKey(taskId string) string {
	key := taskErrDataKey + taskId
	return key
}
// Id returns the task's TaskId.
func (task *Task) Id() int64 {
	id := task.TaskId
	return id
}
// Commit stores the task's JSON form in the taskKey hash, using the string
// form of TaskId as the hash field. The result of HSet is not inspected.
func (task *Task) Commit() {
	client := entity.GetRedis()
	field := cast.ToString(task.TaskId)
	client.HSet(taskKey, field, task.ToString())
}
// ToString returns the task encoded as JSON, or the empty string when
// encoding fails.
func (task *Task) ToString() string {
	if encoded, err := json.Marshal(task); err == nil {
		return string(encoded)
	}
	return ""
}
// Stop marks an active task as stopped and records the current Unix time as
// its end. It does nothing when the task is not marked as running.
func (task *Task) Stop() {
	if task.Run {
		task.Status = stop
		task.End = time.Now().Unix()
		task.Run = false
	}
}
// done prints a completion line, marks an active task as finished and
// records the current Unix time as its end. It does nothing when the task is
// not marked as running.
func (task *Task) done() {
	if task.Run {
		fmt.Printf(" task %d done......\n", task.TaskId)
		task.Run = false
		task.Status = done
		task.End = time.Now().Unix()
	}
}
// crash prints a crash line and puts an active task into the stop state,
// recording the current Unix time as its end. It does nothing when the task
// is not marked as running.
func (task *Task) crash() {
	if task.Run {
		fmt.Printf(" task %d crash......\n", task.TaskId)
		task.Run = false
		task.Status = stop
		task.End = time.Now().Unix()
	}
}
// merge moves every entry of the task's failed-data list onto the head of
// its pending-data list, one LPush per entry, and then deletes the failed
// list. The read, the pushes and the delete are separate Redis commands.
func (task *Task) merge() {
	id := cast.ToString(task.TaskId)
	dataKey := getTaskDataKey(id)
	failedKey := getTaskErrDataKey(id)

	client := entity.GetRedis()
	failed := client.LRange(failedKey, 0, -1).Val()
	for i := range failed {
		client.LPush(dataKey, failed[i])
	}
	client.Del(failedKey)
}
// run marks the task as running and starts task.Thread goroutines that each
// execute the package-level run function for this task.
//
// An already-running task only triggers a printed notice; the early return
// is disabled in the code, so the task is started again regardless.
func (task *Task) run() {
	if task.Run {
		fmt.Println("task is running")
		// return
	}

	task.Run = true
	if task.Start == 0 {
		// Only the first run sets the start time.
		task.Start = time.Now().Unix()
	}
	task.Status = running

	// Failed entries must be moved back to the execution queue before
	// running; exec stops the task as soon as it sees a non-empty failed list.
	task.merge()

	for worker := 0; worker < task.Thread; worker++ {
		go run(task)
	}
}
// exec runs one round of work for the task in "core" mode.
//
// Items are popped from the task's pending-data list. For type "callback",
// and for type "file" with Weight >= 1, up to Weight items are collected and
// sent in a single call; every other type sends items one at a time until
// Weight items have been processed.
//
// Return value:
//   - > 0: number of items handled in this round;
//   - 0: nothing to do (queue empty) or the sender reported 0;
//   - -1: a send failed (the affected items are moved to the failed-data
//     list), no sender is registered for the task type, or the task was
//     stopped (collected but unsent items are returned to the pending list).
func (task *Task) exec() int32 {
	runType := task.Type
	weight := int(task.Weight)
	redis := entity.GetRedis()
	taskId := cast.ToString(task.TaskId)
	key := getTaskDataKey(taskId)
	errKey := getTaskErrDataKey(taskId)
	batched := runType == "callback" || (runType == "file" && weight >= 1)

	// send delivers one payload. A missing sender is reported as a failure
	// instead of panicking on a nil function value.
	send := func(data string) int64 {
		fn := sender[task.Type]
		if fn == nil {
			cutil.Logs(task.TaskId, "no sender registered for task type "+task.Type)
			return -1
		}
		return fn(task, data)
	}

	var doNum int32
	var raw []string     // collected items exactly as popped
	var items [][]byte   // the same items, each JSON-encoded, for the batch payload
	for task.Status != stop {
		// Any entry in the failed list halts the task.
		if redis.LLen(errKey).Val() > 0 {
			task.Stop()
			break
		}
		data := redis.LPop(key)
		if data == nil {
			break
		}
		item := data.Val()
		if item == "" {
			break
		}

		if batched {
			// Marshalling a string cannot fail, so the error is ignored.
			encoded, _ := json.Marshal(item)
			raw = append(raw, item)
			items = append(items, encoded)
			if len(items) >= weight {
				break
			}
			continue
		}

		// Single send.
		code := send(item)
		if code == 0 {
			return doNum
		}
		if code < 0 {
			redis.LPush(errKey, item)
			return -1
		}
		doNum += int32(code)
		if doNum >= int32(weight) {
			return doNum
		}
	}

	// Stopped: return items that were popped but not sent to the head of the
	// pending list, unencoded and in their original order.
	if task.Status == stop {
		for i := len(raw) - 1; i >= 0; i-- {
			redis.LPush(key, raw[i])
		}
		return -1
	}

	// Batch send.
	if len(items) == 0 {
		return 0
	}
	fail := func() int32 {
		for _, item := range raw {
			redis.LPush(errKey, item)
		}
		return -1
	}
	// encoding/json renders each []byte element as a base64 string; the
	// payload format is kept as it was.
	js, err := json.Marshal(items)
	if err != nil { // encoding failed
		return fail()
	}
	if send(string(js)) < 0 { // send failed
		return fail()
	}
	return int32(len(items))
}
// runApi runs the configured script with the arguments "api <TaskId>"
// through execCmd and returns execCmd's result.
func (task *Task) runApi() int {
	script := entity.GetConf().Script
	return execCmd(task.TaskId, script+" api "+cast.ToString(task.TaskId))
}
// delete stops the task if it is active and removes its failed-data,
// pending-data and temp keys as well as its entry in the taskKey hash.
func (task *Task) delete() {
	// Stop is a no-op when the task is not marked as running.
	task.Stop()

	id := cast.ToString(task.TaskId)
	client := entity.GetRedis()
	client.Del(getTaskErrDataKey(id))
	client.Del(getTaskDataKey(id))
	client.Del(getTaskDataTempKey(id))
	client.HDel(taskKey, id)
}
// Start loads every serialized task from the taskKey hash and registers it
// with the orm package.
//
// It panics when a stored entry cannot be decoded; the panic message names
// the hash field and carries the decoding error.
func Start() {
	redis := entity.GetRedis()
	for field, val := range redis.HGetAll(taskKey).Val() {
		task := &Task{}
		if err := json.Unmarshal([]byte(val), task); err != nil {
			panic(fmt.Sprintf("unmarshal task error: field=%v key=%s: %v", field, taskKey, err))
		}
		orm.Register[*Task](task)
	}
}
// clearAll asks the orm package to delete every registered task. For each
// task its log is deleted and its Redis data removed. A failure reported by
// orm.DeleteAll is printed rather than silently dropped.
func clearAll() {
	err := orm.DeleteAll[*Task](func(task *Task) {
		cutil.DeleteLog(task.TaskId)
		task.delete()
	})
	if err != nil {
		fmt.Println("clear all tasks error:", err)
	}
}
// Close blocks until every round tracked by runWs has finished and then
// prints a confirmation line.
func Close() {
	runWs.Wait()
	fmt.Println("Commit data done")
}
// run executes rounds for task until a round reports that no work is left
// (the task is marked done), a round fails (the task is marked crashed), or
// the task stops being in the running state.
//
// Rounds are executed in a loop rather than by recursion, so the stack does
// not grow with the number of rounds.
//
// NOTE(review): the WaitGroup is incremented inside this function, which the
// Task.run method starts as a goroutine; Close may therefore return before a
// freshly started worker has registered itself.
func run(task *Task) {
	if task == nil {
		return
	}
	for task.Status == running {
		top := runOnce(task)
		if top > 0 {
			if !task.Run {
				return
			}
			continue
		}
		if task.Status != running {
			return
		}
		if top == 0 {
			task.done()
		} else {
			task.crash()
		}
		return
	}
}

// runOnce executes a single round in the configured mode while holding a
// runWs slot; the slot is released even if the round panics. An unrecognized
// mode yields 0.
func runOnce(task *Task) int {
	runType := entity.GetConf().Type
	runWs.Add(1)
	defer runWs.Done()

	switch runType {
	case RUN_TYPE_API:
		return task.runApi()
	case RUN_TYPE_CORE:
		return int(task.exec())
	}
	return 0
}
// execCmd runs cmd through "bash -c", waits for it to finish and returns its
// exit status.
//
// It returns -1 when the command cannot be started or when Wait reports an
// error (which includes a non-zero exit status); both cases are logged for
// taskId. Otherwise it returns the exit status, or 0 when no process state
// or wait status is available.
//
// The command's stdout and stderr are left unset, so os/exec discards them.
// No stdout pipe is created: an unread pipe would block a child that writes
// more than the pipe buffer holds, and Wait would then never return.
func execCmd(taskId int64, cmd string) int {
	fmt.Println(cmd)
	command := exec.Command("bash", "-c", cmd)

	if err := command.Start(); err != nil {
		// The command could not be launched: the script may be missing,
		// permissions may be insufficient, the command may not be found, or
		// the system may be out of resources.
		info := "start|exec command error= " + err.Error() + "\r\n cmd=" + cmd
		cutil.Logs(taskId, info)
		fmt.Println(info)
		return -1
	}

	if err := command.Wait(); err != nil {
		// The script failed while running. Called scripts do not need to
		// call exit(num).
		// Default: exit(num > 0) means the whole batch failed; the batch is
		// put on the failed queue and the task stops.
		// If part of the batch was handled and the script recovered from the
		// failure itself, it should push the failed data to the error queue
		// (push_swoole_task_data toErr) and exit(0); the task then stops.
		info := " wait| exec command error= " + err.Error() + "\r\n cmd=" + cmd
		info += "\r\n调用脚本无需exit(num)"
		fmt.Println(info)
		cutil.Logs(taskId, info)
		return -1
	}

	state := command.ProcessState
	if state == nil {
		fmt.Println("state|command process state is null")
		return 0
	}
	status, ok := state.Sys().(syscall.WaitStatus)
	if !ok {
		return 0
	}
	top := status.ExitStatus()
	fmt.Println("ProcessState PID:", state.Pid())
	fmt.Println("Exit Code:", top)
	return top
}