package cmd import ( "encoding/json" "fmt" "os/exec" "sync" "syscall" "time" "tools/entity" "tools/logic/cutil" "tools/logic/orm" "github.com/spf13/cast" ) const RUN_TYPE_CORE = "core" const RUN_TYPE_API = "api" const noRun = 0 const running = 1 const stop = 2 const done = 3 const taskKey = "gm:task:pool" const taskDataKey = "gm:task:data:pool:" const taskDataTempKey = "gm:task:data:pool:temp:" const taskErrDataKey = "gm:task:err_data:pool:" var runWs sync.WaitGroup var sender = make(map[string]func(task *Task, data string) int64) type Task struct { sync.Mutex TaskId int64 `json:"task_id"` Type string `json:"type"` Params string `json:"params"` Status int `json:"status"` Process int64 `json:"process"` //暂时不用 Total int64 `json:"total"` Thread int `json:"thread"` Weight int32 `json:"weight"` Script string `json:"script"` Start int64 `json:"start"` End int64 `json:"end"` Err int64 `json:"err"` Run bool `json:"run"` AllTime int64 `json:"all_time"` //总耗时 } // func init() { // orm.RegisterSupervisor[*Task]() // api := func(task *Task, data string) int64 { // fmt.Println("[request data params]" + data) // res, err := cutil.Post(task.Script, data) // if err != nil { // cutil.Logs(task.TaskId, err.Error()) // return -1 // } // resp := string(res) // fmt.Println("[response data info]" + resp) // if resp == "null" { // return 1 // } // result := make(map[string]interface{}) // err = json.Unmarshal(res, &result) // if err != nil { // cutil.Logs(task.TaskId, resp) // return -1 // } // v := result["ok"] // if cast.ToInt(v) != 1 { // cutil.Logs(task.TaskId, resp) // return -1 // } // return 1 // } // sender["api"] = api // sender["url"] = api // // //自定义执行命令 // file := func(task *Task, data string) int64 { // strbytes := []byte(data) // base64Code := base64.StdEncoding.EncodeToString(strbytes) // cmd := task.Script + " " + cast.ToString(task.TaskId) + " " + base64Code // return int64(execCmd(task.TaskId, cmd)) // } // sender["file"] = file // //固定脚本文件 // callback := func(task *Task, data string) int64 { // strbytes := []byte(data) // encoded := base64.StdEncoding.EncodeToString(strbytes) // script := base64.StdEncoding.EncodeToString([]byte(task.Script)) // cmd := entity.GetConf().Script + " callback " + cast.ToString(task.TaskId) + " " + script + " " + encoded // return int64(execCmd(task.TaskId, cmd)) // } // sender["callback"] = callback // } func getTaskDataKey(taskId string) string { return taskDataKey + taskId } func getTaskDataTempKey(taskId string) string { return taskDataTempKey + taskId } func getTaskErrDataKey(taskId string) string { return taskErrDataKey + taskId } func (task *Task) Id() int64 { return task.TaskId } func (task *Task) Commit() { redis := entity.GetRedis() redis.HSet(taskKey, cast.ToString(task.TaskId), task.ToString()) } func (task *Task) ToString() string { js, err := json.Marshal(task) if err != nil { return "" } return string(js) } func (task *Task) Stop() { if !task.Run { return } task.Status = stop task.End = time.Now().Unix() task.Run = false } func (task *Task) done() { if !task.Run { return } fmt.Printf(" task %d done......\n", task.TaskId) task.Run = false task.Status = done task.End = time.Now().Unix() } func (task *Task) crash() { if !task.Run { return } fmt.Printf(" task %d crash......\n", task.TaskId) task.Run = false task.Status = stop task.End = time.Now().Unix() } func (task *Task) merge() { taskId := cast.ToString(task.TaskId) key := getTaskDataKey(taskId) errKey := getTaskErrDataKey(taskId) redis := entity.GetRedis() lrange := redis.LRange(errKey, 0, -1) vs := lrange.Val() for _, v := range vs { redis.LPush(key, v) } redis.Del(errKey) } func (task *Task) run() { if task.Run { fmt.Println("task is running") // return } task.Run = true if task.Start == 0 { task.Start = time.Now().Unix() } task.Status = running task.merge() //运行之前一定要把错误放到执行队列,否则无法启动任务 for i := 0; i < task.Thread; i++ { go run(task) } } func (task *Task) exec() int32 { runType := task.Type weight := cast.ToInt(task.Weight) redis := entity.GetRedis() taskId := cast.ToString(task.TaskId) key := getTaskDataKey(taskId) errKey := getTaskErrDataKey(taskId) var doNum int32 var items [][]byte for true { if task.Status == stop { break } if redis.LLen(errKey).Val() > 0 { task.Stop() break } data := redis.LPop(key) if data == nil { break } item := data.Val() if item == "" { break } if runType == "callback" { Js, _ := json.Marshal(item) items = append(items, Js) if len(items) >= weight { break } continue } if runType == "file" && weight >= 1 { Js, _ := json.Marshal(item) items = append(items, Js) if len(items) >= weight { break } continue } //单个发 code := sender[task.Type](task, item) if code == 0 { return doNum } if code < 0 { redis.LPush(errKey, item) return -1 } doNum += int32(code) if doNum >= int32(weight) { return doNum } } //fixme 已经tick 出来的需要放回去 if task.Status == stop { for _, item := range items { redis.LPush(key, item) } items = nil //释放掉 return -1 } //批量发 if len(items) <= 0 { return 0 } js, err := json.Marshal(items) if err != nil { //解析失败 for _, item := range items { var ed string _ = json.Unmarshal(item, &ed) redis.LPush(errKey, ed) } return -1 } num := sender[task.Type](task, string(js)) if num < 0 { //发送失败 for _, item := range items { var ed string _ = json.Unmarshal(item, &ed) redis.LPush(errKey, ed) } return -1 } doNum = int32(len(items)) return doNum } func (task *Task) runApi() int { cmd := entity.GetConf().Script + " api " + cast.ToString(task.TaskId) top := execCmd(task.TaskId, cmd) return top } func (task *Task) delete() { if task.Run { task.Stop() } redis := entity.GetRedis() redis.Del(getTaskErrDataKey(cast.ToString(task.TaskId))) redis.Del(getTaskDataKey(cast.ToString(task.TaskId))) redis.Del(getTaskDataTempKey(cast.ToString(task.TaskId))) redis.HDel(taskKey, cast.ToString(task.TaskId)) } func Start() { redis := entity.GetRedis() list := redis.HGetAll(taskKey) values := list.Val() for _, val := range values { task := &Task{} err := json.Unmarshal([]byte(val), task) if err != nil { panic("marshal task error") } orm.Register[*Task](task) } } func clearAll() { err := orm.DeleteAll[*Task](func(task *Task) { cutil.DeleteLog(task.TaskId) task.delete() }) if err != nil { return } } func Close() { runWs.Wait() fmt.Println("Commit data done") } func run(task *Task) { if task == nil { return } if task.Status != running { return } runType := entity.GetConf().Type top := 0 runWs.Add(1) switch runType { case RUN_TYPE_API: top = task.runApi() break case RUN_TYPE_CORE: top = int(task.exec()) } runWs.Done() if top > 0 && task.Run && task.Status == running { run(task) } if top == 0 && task.Status == running { task.done() } if top < 0 && task.Status == running { task.crash() } } func execCmd(taskId int64, cmd string) int { fmt.Println(cmd) command := exec.Command("bash", "-c", cmd) _, err := command.StdoutPipe() if err != nil { //启动任务失败,一般是系统没有资源可以使用 info := "out| exec command error= " + err.Error() + "\r\n cmd=" + cmd cutil.Logs(taskId, info) fmt.Println(info) return -1 } err = command.Start() if err != nil { //执行command 失败,无法执行到这个命令、或者需要执行的脚本不存在,权限不足,命令找不到等报错 info := "start|exec command error= " + err.Error() + "\r\n cmd=" + cmd cutil.Logs(taskId, info) fmt.Println(info) return -1 } err = command.Wait() if err != nil { //执行脚本过程中发生了异常 //调用脚本无需exit(num) info := " wait| exec command error= " + err.Error() + "\r\n cmd=" + cmd info += "\r\n调用脚本无需exit(num)" //默认项:当exit(num>0) 认为中途发生异常后,本轮数据都未执行成功,未处理失败,将当前批次的数据放入到异常队列,然后停止当前task //若是当前批次已经处理掉一些数据,中途发生异常后又捕获处理掉异常,将失败的数据通过push_swoole_task_data toErr,然后exit(0)即可,然后停止当前task fmt.Println(info) cutil.Logs(taskId, info) return -1 } if command.ProcessState == nil { fmt.Println("state|command process state is null error=" + err.Error()) return 0 } if command.ProcessState.Sys() == nil { return 0 } top := command.ProcessState.Sys().(syscall.WaitStatus).ExitStatus() fmt.Println("ProcessState PID:", command.ProcessState.Pid()) fmt.Println("Exit Code:", top) return top }