添加定时任务

This commit is contained in:
戴育兵 2025-10-23 16:51:27 +08:00
parent cab4266e45
commit 53ed09e6be
11 changed files with 210 additions and 9 deletions

View File

@ -7,15 +7,16 @@
package cast
import (
reflect "reflect"
sync "sync"
unsafe "unsafe"
_ "github.com/envoyproxy/protoc-gen-validate/validate"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
_ "google.golang.org/protobuf/types/descriptorpb"
emptypb "google.golang.org/protobuf/types/known/emptypb"
_ "google.golang.org/protobuf/types/known/wrapperspb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (

View File

@ -22,6 +22,7 @@ import (
"fonchain-fiee/cmd/config"
"fonchain-fiee/pkg/cache"
"fonchain-fiee/pkg/common"
cronpkg "fonchain-fiee/pkg/cron"
"fonchain-fiee/pkg/logger"
"fonchain-fiee/pkg/router"
)
@ -32,6 +33,7 @@ func main() {
if err := bootstrap(); err != nil {
panic(err)
}
r := router.NewRouter()
_ = r.Run(config.AppConfig.System.HttpPort)
select {}
@ -57,7 +59,12 @@ func bootstrap() (err error) {
cache.LoadRedis(redisConfig)
common.Init()
//
if configEnv.System.CronOpen {
fmt.Println("启动定时任务管理器...")
if err = cronpkg.InitTasks(); err != nil {
fmt.Printf("定时任务启动失败: %v\n", err)
}
}
//gpt.InitSet(configEnv.Ai.Host, configEnv.Ai.TelNum, configEnv.Ai.Password)
return nil
}

View File

@ -119,6 +119,7 @@ type System struct {
ErpHost string
FieeHost string
AuthRedirectUrl string
CronOpen bool
}
type Oss struct {
AccessKeyId string

View File

@ -7,6 +7,7 @@ RedirectUri = "/api/redirect/url"
ErpHost = "http://114.218.158.24:9020"
FieeHost = "http://114.218.158.24:9020"
AuthRedirectUrl = "http://121.229.45.214:9028/media_account"
CronOpen = false
[bos]
Ak = "ALTAKxrqOQHnAN525Tb2GX4Bhe"
Sk = "d2ecaa9d75114d3b9f42b99014198306"

View File

@ -7,6 +7,7 @@ RedirectUri = "/api/redirect/url"
ErpHost = "https://erpapi.fontree.cn"
FieeHost = "https://erpapi.fiee.com"
AuthRedirectUrl = "https://erp.fiee.com/media_account"
CronOpen = true
[bos]
Ak = "ALTAKxrqOQHnAN525Tb2GX4Bhe"
Sk = "d2ecaa9d75114d3b9f42b99014198306"

View File

@ -8,6 +8,7 @@ ErpHost = "http://114.218.158.24:9020"
FieeHost = "http://114.218.158.24:9020"
FieeApiHost = "https://saas-test.szjixun.cn"
AuthRedirectUrl = "http://121.229.45.214:9028/media_account"
CronOpen = true
[bos]
Ak = "ALTAKxrqOQHnAN525Tb2GX4Bhe"
Sk = "d2ecaa9d75114d3b9f42b99014198306"

134
pkg/cron/cron_manager.go Normal file
View File

@ -0,0 +1,134 @@
package cron
import (
"fmt"
"log"
"sync"
"github.com/robfig/cron/v3"
)
var (
cronManager *CronManager
once sync.Once
)
// CronManager 定时任务管理器
type CronManager struct {
cron *cron.Cron
tasks map[string]cron.EntryID
mu sync.RWMutex
isRunning bool
}
// GetCronManager 获取定时任务管理器单例
func GetCronManager() *CronManager {
once.Do(func() {
cronManager = &CronManager{
cron: cron.New(cron.WithSeconds()), // 支持秒级定时
tasks: make(map[string]cron.EntryID),
}
})
return cronManager
}
// AddTask 添加定时任务
// name: 任务名称,用于标识任务
// spec: cron表达式例如
func (cm *CronManager) AddTask(name string, spec string, job func()) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if _, exists := cm.tasks[name]; exists {
return fmt.Errorf("任务 %s 已存在", name)
}
// 添加任务
entryID, err := cm.cron.AddFunc(spec, func() {
defer func() {
if r := recover(); r != nil {
log.Printf("定时任务 [%s] 执行panic: %v", name, r)
}
}()
log.Printf("定时任务 [%s] 开始执行", name)
job()
log.Printf("定时任务 [%s] 执行完成", name)
})
if err != nil {
return fmt.Errorf("添加任务失败: %v", err)
}
cm.tasks[name] = entryID
log.Printf("定时任务 [%s] 添加成功, cron表达式: %s", name, spec)
return nil
}
// RemoveTask 移除定时任务
func (cm *CronManager) RemoveTask(name string) error {
cm.mu.Lock()
defer cm.mu.Unlock()
entryID, exists := cm.tasks[name]
if !exists {
return fmt.Errorf("任务 %s 不存在", name)
}
cm.cron.Remove(entryID)
delete(cm.tasks, name)
log.Printf("定时任务 [%s] 已移除", name)
return nil
}
// Start 启动定时任务管理器
func (cm *CronManager) Start() {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.isRunning {
log.Println("定时任务管理器已经在运行中")
return
}
cm.cron.Start()
cm.isRunning = true
log.Printf("定时任务管理器已启动,当前任务数: %d", len(cm.tasks))
}
// Stop 停止定时任务管理器
func (cm *CronManager) Stop() {
cm.mu.Lock()
defer cm.mu.Unlock()
if !cm.isRunning {
return
}
ctx := cm.cron.Stop()
<-ctx.Done()
cm.isRunning = false
log.Println("定时任务管理器已停止")
}
// IsRunning 检查是否在运行
func (cm *CronManager) IsRunning() bool {
cm.mu.RLock()
defer cm.mu.RUnlock()
return cm.isRunning
}
// GetTaskCount 获取任务数量
func (cm *CronManager) GetTaskCount() int {
cm.mu.RLock()
defer cm.mu.RUnlock()
return len(cm.tasks)
}
// ListTasks 列出所有任务名称
func (cm *CronManager) ListTasks() []string {
cm.mu.RLock()
defer cm.mu.RUnlock()
taskNames := make([]string, 0, len(cm.tasks))
for name := range cm.tasks {
taskNames = append(taskNames, name)
}
return taskNames
}

36
pkg/cron/task.go Normal file
View File

@ -0,0 +1,36 @@
package cron
import (
"context"
"fonchain-fiee/api/cast"
"fonchain-fiee/pkg/service"
serverCast "fonchain-fiee/pkg/service/cast"
"log"
)
// InitTasks 初始化定时任务
func InitTasks() error {
cm := GetCronManager()
err := cm.AddTask("refreshWorkApprovalStatus", "0 */5 * * * *", RefreshWorkApprovalStatusTask)
if err != nil {
log.Printf("添加测试任务失败: %v", err)
}
cm.Start()
return nil
}
func RefreshWorkApprovalStatusTask() {
resp, err := service.CastProvider.WorkList(context.Background(), &cast.WorkListReq{
Page: 1,
WorkStatus: uint32(cast.WorkActionENUM_APPROVAL),
PageSize: 9999,
})
if err != nil {
log.Printf("获取工作列表失败: %v", err)
return
}
if resp.Data == nil || len(resp.Data) == 0 {
return
}
serverCast.RefreshWorkApproval(nil, resp.Data)
}

View File

@ -8,6 +8,8 @@ import (
"fonchain-fiee/pkg/e"
modelCast "fonchain-fiee/pkg/model/cast"
"fonchain-fiee/pkg/utils"
"go.uber.org/zap"
)
type CastService struct {
@ -25,6 +27,7 @@ func (c *CastService) ApprovalDetail(approvalIds []int) (data map[int]modelCast.
url := fmt.Sprintf(config.AppConfig.System.FieeHost + "/approval/list/ex")
respBody, err = utils.Post(url, string(idsBytes))
if err != nil {
zap.L().Error("ApprovalDetail error", zap.Error(err))
return
}
var respDetail modelCast.ApprovalDetailResponse

View File

@ -13,7 +13,16 @@ import (
func NewCtxWithUserInfo(ctx *gin.Context) (newCtx context.Context) {
var userInfo = login.Info{}
if config.AppConfig.System.AppMode == "prod" {
userInfo = login.GetUserInfoFromC(ctx)
//_, ok := ctx.Get("jwtInfo")
if ctx == nil {
userInfo = login.Info{
ID: 0,
Name: "系统",
TelNum: "",
}
} else {
userInfo = login.GetUserInfoFromC(ctx)
}
} else {
userInfo = login.Info{
ID: 61,

View File

@ -146,9 +146,15 @@ func WorkList(ctx *gin.Context) {
service.Error(ctx, err)
return
}
if len(resp.Data) > 0 {
RefreshWorkApproval(ctx, resp.Data)
service.Success(ctx, resp)
return
}
func RefreshWorkApproval(ctx *gin.Context, data []*cast.WorkListResp_Info) {
if len(data) > 0 {
var workUuidApprovalIDMap = make(map[int]string)
for _, v := range resp.Data {
for _, v := range data {
if v.WorkStatus == 2 && v.ApprovalID != "" {
approvalID, _ := strconv.ParseUint(v.ApprovalID, 10, 64)
workUuidApprovalIDMap[int(approvalID)] = v.WorkUuid
@ -166,8 +172,6 @@ func WorkList(ctx *gin.Context) {
_ = RefreshWorkApprovalStatus(ctx, workUuidApprovalIDMap)
}
}
service.Success(ctx, resp)
return
}
func WorkDetail(ctx *gin.Context) {
@ -305,6 +309,9 @@ func RefreshWorkApprovalStatus(ctx *gin.Context, approvalIDWorkUuidMap map[int]s
return
}
data, err = castS.ApprovalDetail(approvalIDs)
if err != nil {
return
}
// status: 1待审批 2审批通过 3审批不通过 6撤销发其中 7撤销完成
var newData = make(map[int]modelCast.Item, len(approvalIDs))
for _, v := range approvalIDs {