fonchain-fiee/pkg/cron/task.go
2025-12-17 16:24:53 +08:00

391 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cron
import (
"context"
"encoding/json"
"errors"
"fmt"
"fonchain-fiee/api/bundle"
"fonchain-fiee/api/cast"
"fonchain-fiee/pkg/cache"
bundleModel "fonchain-fiee/pkg/model/bundle"
modelCast "fonchain-fiee/pkg/model/cast"
"fonchain-fiee/pkg/service"
serverCast "fonchain-fiee/pkg/service/cast"
"log"
"math/rand"
"strconv"
"time"
"github.com/go-redis/redis"
"go.uber.org/zap"
)
// InitTasks registers every periodic job with the cron manager, starts the
// scheduler, and launches the two long-running background goroutines: the
// publish-queue consumer and the randomly spaced auto-confirm loop.
//
// Registration is best-effort: a task that fails to register is logged together
// with its name and the remaining tasks are still added. The function always
// returns nil, exactly as before, so existing callers are unaffected.
func InitTasks() error {
	cm := GetCronManager()

	// Every job currently shares the same schedule expression.
	const spec = "0 */1 * * * *"

	tasks := []struct {
		name string
		run  func()
	}{
		{"refreshWorkApprovalStatus", RefreshWorkApprovalStatusTask},
		{"artistAutoConfirm", ArtistAutoConfirmTask},
		{"refreshPublishStatus", RefreshPublishStatusTask},
		{"artistAutoConfirmAnalysis", ArtistAutoConfirmAnalysisTask},
		{"refreshWorkAnalysisApprovalStatus", RefreshWorkAnalysisApprovalStatusTask},
		{"scheduledPublish", ScheduledPublishTask},
	}

	// Check the result of each registration individually; previously each
	// error was overwritten by the next call and only the last one was seen.
	for _, t := range tasks {
		if err := cm.AddTask(t.name, spec, t.run); err != nil {
			log.Printf("添加定时任务失败 [%s]: %v", t.name, err)
		}
	}

	cm.Start()

	// Start the queue consumer.
	go WorkPublishQueueConsumer()
	// Start the auto-confirm loop that runs at random intervals.
	go AutoManuallyConfirmWorkTaskWithRandomInterval()

	return nil
}
func RefreshWorkApprovalStatusTask() {
resp, err := service.CastProvider.WorkList(context.Background(), &cast.WorkListReq{
Page: 1,
WorkStatus: uint32(cast.WorkActionENUM_APPROVAL),
PageSize: 9999,
})
if err != nil {
log.Printf("获取工作列表失败: %v", err)
return
}
if resp.Data == nil || len(resp.Data) == 0 {
return
}
serverCast.RefreshWorkApproval(nil, resp.Data)
}
func RefreshWorkAnalysisApprovalStatusTask() {
resp, err := service.CastProvider.ListWorkAnalysis(context.Background(), &cast.ListWorkAnalysisReq{
Page: 1,
StatusList: []uint32{2}, // 状态为2表示待审批
PageSize: 999999,
})
if err != nil {
log.Printf("获取数据分析列表失败: %v", err)
return
}
if resp.Data == nil || len(resp.Data) == 0 {
return
}
serverCast.RefreshWorkAnalysisApproval(nil, resp.Data)
}
// AutoManuallyConfirmWorkTaskWithRandomInterval runs AutoManuallyConfirmWorkTask
// in an endless loop, sleeping a random whole number of minutes between 3 and 7
// (inclusive) after each run. It never returns.
func AutoManuallyConfirmWorkTaskWithRandomInterval() {
	const (
		minPauseMinutes = 3
		pauseSpread     = 5 // rand.Intn(5) yields 0..4, so the pause is 3..7 minutes
	)

	for {
		AutoManuallyConfirmWorkTask()

		pause := time.Duration(minPauseMinutes+rand.Intn(pauseSpread)) * time.Minute
		time.Sleep(pause)
	}
}
// AutoManuallyConfirmWorkTask confirms every work returned by
// BundleProvider.GetWaitConfirmWorkList.
//
// For each work it:
//  1. parses the artist UUID into a 32-bit user ID;
//  2. loads the artist's bundle balance and the work detail;
//  3. skips the work unless its WorkStatus is 4;
//  4. for category 1 (image) or 2 (video), marks the work with ConfirmStatus 3
//     when the matching extend quota is already used up; otherwise
//  5. consumes one unit of balance through AddBundleBalance and marks the work
//     with ConfirmStatus 1. Works of any other category are skipped.
//
// Problems that concern a single work (bad artist ID, failed lookup, failed
// deduction) are logged and the loop moves on to the next work. A failure to
// store the final status after the balance was deducted stops the whole run.
func AutoManuallyConfirmWorkTask() {
	ctx := context.Background()

	var listReq bundle.GetWaitConfirmWorkListReq
	res, err := service.BundleProvider.GetWaitConfirmWorkList(ctx, &listReq)
	if err != nil {
		zap.L().Error("获取待确认作品列表失败", zap.Error(err))
		return
	}
	if len(res.Data) == 0 {
		return
	}

	for _, work := range res.Data {
		// The zero-value request carries the (empty) ConfirmRemark that is
		// forwarded to UpdateStatus. The former "ConfirmStatus == 2" reject
		// branch was unreachable here and has been removed.
		var confirmReq bundleModel.UserWorkConfirmReq
		confirmReq.WorkUuid = work.WorkUuid

		// Parse with bitSize 32 so an out-of-range ID is reported instead of
		// being silently truncated by the int32 conversion below.
		artistID, err := strconv.ParseInt(work.ArtistUuid, 10, 32)
		if err != nil {
			zap.L().Error("解析艺术家ID失败", zap.Error(err), zap.String("work_uuid", confirmReq.WorkUuid))
			// Nothing has been changed for this work yet, so one bad record
			// must not block the rest of the batch.
			continue
		}

		balanceInfoRes, err := service.BundleProvider.GetBundleBalanceByUserId(ctx, &bundle.GetBundleBalanceByUserIdReq{
			UserId: int32(artistID),
		})
		if err != nil {
			log.Printf("获取余额信息失败,作品uuid:%s, err: %v", confirmReq.WorkUuid, err)
			continue
		}

		wordInfoRes, err := service.CastProvider.WorkDetail(ctx, &cast.WorkDetailReq{
			WorkUuid: confirmReq.WorkUuid,
		})
		if err != nil {
			log.Printf("获取作品详情失败,作品uuid:%s, err: %v", confirmReq.WorkUuid, err)
			continue
		}
		// NOTE(review): 4 is presumably the "waiting for confirmation" status;
		// confirm against the cast status enum.
		if wordInfoRes.WorkStatus != 4 {
			continue
		}

		var addBalanceReq bundle.AddBundleBalanceReq
		addBalanceReq.UserId = int32(artistID)
		log.Printf("开始确认作品uuid:%s", confirmReq.WorkUuid)

		// shortage is set to the log label when the matching quota is used up.
		var shortage string
		switch wordInfoRes.WorkCategory {
		case 1:
			if balanceInfoRes.ImageExtendConsumptionNumber >= balanceInfoRes.ImageExtendNumber {
				shortage = "图文余量不足"
			} else {
				addBalanceReq.ImageConsumptionNumber = 1
			}
		case 2:
			if balanceInfoRes.VideoExtendConsumptionNumber >= balanceInfoRes.VideoExtendNumber {
				shortage = "视频余量不足"
			} else {
				addBalanceReq.VideoConsumptionNumber = 1
			}
		default:
			continue
		}

		if shortage != "" {
			_, err = service.CastProvider.UpdateStatus(ctx, &cast.UpdateStatusReq{
				WorkAction:    cast.WorkActionENUM_CONFIRM,
				WorkUuid:      confirmReq.WorkUuid,
				ConfirmRemark: confirmReq.ConfirmRemark,
				ConfirmStatus: 3,
			})
			if err != nil {
				log.Printf("%s,作品uuid:%s, 更新作品状态失败: %v", shortage, confirmReq.WorkUuid, err)
			} else {
				log.Printf("%s,作品uuid:%s", shortage, confirmReq.WorkUuid)
			}
			continue
		}

		resp, err := service.BundleProvider.AddBundleBalance(ctx, &addBalanceReq)
		if err != nil {
			log.Printf("扣除余额失败,作品uuid:%s, err: %v", confirmReq.WorkUuid, err)
			continue
		}
		log.Printf("扣除余额成功,作品uuid:%s", confirmReq.WorkUuid)

		_, err = service.CastProvider.UpdateStatus(ctx, &cast.UpdateStatusReq{
			WorkAction:    cast.WorkActionENUM_CONFIRM,
			WorkUuid:      confirmReq.WorkUuid,
			ConfirmRemark: confirmReq.ConfirmRemark,
			CostType:      resp.UsedType,
			ConfirmStatus: 1,
		})
		if err != nil {
			log.Printf("更新作品状态失败,作品uuid:%s, err: %v", confirmReq.WorkUuid, err)
			// The balance was already deducted but the status could not be
			// stored; stop here rather than keep deducting for further works.
			return
		}
	}
}
// ArtistAutoConfirmTask reads from the modelCast.AutoConfirmQueueKey sorted set
// every member whose score lies between 0 and the current Unix time, and passes
// each one to serverCast.ProcessTask. A Redis error is logged and ends the run.
func ArtistAutoConfirmTask() {
	scoreRange := redis.ZRangeBy{
		Min: "0",
		Max: fmt.Sprintf("%f", float64(time.Now().Unix())),
	}

	dueUuids, fetchErr := cache.RedisClient.ZRangeByScore(modelCast.AutoConfirmQueueKey, scoreRange).Result()
	switch {
	case fetchErr != nil:
		zap.L().Error("获取到期任务失败", zap.Error(fetchErr))
		return
	case len(dueUuids) == 0:
		zap.L().Debug("没有到期的任务")
		return
	}

	zap.L().Info("发现到期任务", zap.Int("count", len(dueUuids)))

	ctx := context.Background()
	for i := range dueUuids {
		serverCast.ProcessTask(ctx, dueUuids[i])
	}
}
// RefreshPublishStatusTask runs serverCast.RefreshPublish while holding the
// Redis lock "refresh_publish_status:lock" (TTL 5 minutes), so a run is skipped
// when the previous batch is still in progress.
//
// A Redis failure while taking the lock is logged as an error, separately from
// the "lock already held" case, and a failure to release the lock is logged
// instead of being ignored.
//
// NOTE(review): the lock is released with a plain DEL and no ownership check.
// If a run outlasts the 5-minute TTL, this DEL may remove a lock that another
// run has taken in the meantime.
func RefreshPublishStatusTask() {
	const lockKey = "refresh_publish_status:lock"

	acquired, lockErr := cache.RedisClient.SetNX(lockKey, "1", 5*time.Minute).Result()
	if lockErr != nil {
		zap.L().Error("获取刷新发布状态锁失败", zap.Error(lockErr))
		return
	}
	if !acquired {
		zap.L().Warn("任务正在被其他实例处理")
		return
	}
	defer func() {
		if delErr := cache.RedisClient.Del(lockKey).Err(); delErr != nil {
			zap.L().Error("释放刷新发布状态锁失败", zap.Error(delErr))
		}
	}()

	if refreshErr := serverCast.RefreshPublish(); refreshErr != nil {
		zap.L().Error("刷新发布状态失败", zap.Error(refreshErr))
		return
	}
	zap.L().Info("刷新发布状态成功")
}
// ArtistAutoConfirmAnalysisTask reads from the
// modelCast.AutoConfirmAnalysisQueueKey sorted set every member whose score lies
// between 0 and the current Unix time, and passes each one to
// serverCast.ProcessAnalysisTask. A Redis error is logged and ends the run.
func ArtistAutoConfirmAnalysisTask() {
	scoreRange := redis.ZRangeBy{
		Min: "0",
		Max: fmt.Sprintf("%f", float64(time.Now().Unix())),
	}

	dueUuids, fetchErr := cache.RedisClient.ZRangeByScore(modelCast.AutoConfirmAnalysisQueueKey, scoreRange).Result()
	switch {
	case fetchErr != nil:
		zap.L().Error("获取到期数据分析任务失败", zap.Error(fetchErr))
		return
	case len(dueUuids) == 0:
		zap.L().Debug("没有到期的数据分析任务")
		return
	}

	zap.L().Info("发现到期数据分析任务", zap.Int("count", len(dueUuids)))

	ctx := context.Background()
	for i := range dueUuids {
		serverCast.ProcessAnalysisTask(ctx, dueUuids[i])
	}
}
// WorkPublishQueueConsumer consumes the work:publish:queue list forever. Each
// iteration blocks on BRPOP (timeout 0) against modelCast.WorkPublishQueueKey
// and hands the popped value to processWorkPublishQueueData. A Redis error is
// logged and followed by a 5-second pause before retrying; malformed replies
// and processing failures are logged and the loop continues.
func WorkPublishQueueConsumer() {
	zap.L().Info("开始监听work:publish:queue队列")

	for {
		popped, popErr := cache.RedisClient.BRPop(0, modelCast.WorkPublishQueueKey).Result()
		switch {
		case popErr != nil:
			zap.L().Error("监听work:publish:queue队列失败", zap.Error(popErr))
			// Wait 5 seconds after an error before retrying.
			time.Sleep(5 * time.Second)
			continue
		case len(popped) < 2:
			zap.L().Warn("队列返回数据格式异常", zap.Any("result", popped))
			continue
		}

		// BRPOP replies with [key, value]; the value is the second element.
		payload := popped[1]
		zap.L().Info("从work:publish:queue队列收到数据", zap.String("data", payload))

		if handleErr := processWorkPublishQueueData(payload); handleErr != nil {
			zap.L().Error("处理work:publish:queue队列数据失败", zap.Error(handleErr), zap.String("data", payload))
			continue
		}
		zap.L().Info("成功处理work:publish:queue队列数据", zap.String("data", payload))
	}
}
// processWorkPublishQueueData handles one message popped from the
// work:publish:queue list. data is expected to be a JSON object with a
// "workUuid" string field; that work is published via serverCast.PublishWork.
//
// It returns an error when the payload cannot be parsed, when "workUuid" is
// missing or empty, or when publishing fails.
func processWorkPublishQueueData(data string) error {
	// Consume with a one-second delay.
	time.Sleep(time.Second * 1)

	var workData map[string]string
	parseErr := json.Unmarshal([]byte(data), &workData)
	// Reading from a nil map is safe and yields "".
	workUuid := workData["workUuid"]
	if workUuid == "" {
		if parseErr != nil {
			// Report a malformed payload as such, not as "empty data".
			zap.L().Error("队列数据解析失败", zap.Error(parseErr), zap.String("raw_data", data))
			return fmt.Errorf("队列数据解析失败: %w", parseErr)
		}
		zap.L().Error("队列数据为空", zap.String("raw_data", data))
		return errors.New("队列数据为空")
	}
	if parseErr != nil {
		// json.Unmarshal keeps decoding after a type mismatch, so workUuid can
		// be valid even though another field was not a string. Keep accepting
		// such payloads, but leave a trace of the mismatch.
		zap.L().Warn("队列数据部分字段解析失败", zap.Error(parseErr), zap.String("work_uuid", workUuid))
	}

	zap.L().Info("处理发布工作队列数据", zap.String("work_uuid", workUuid))

	// Invoke the publish logic.
	err := serverCast.PublishWork(context.Background(), &cast.PublishReq{
		WorkUuids: []string{workUuid},
	})
	if err != nil {
		zap.L().Error("发布工作失败",
			zap.String("work_uuid", workUuid),
			zap.Error(err))
		return err
	}

	zap.L().Info("发布工作成功", zap.String("work_uuid", workUuid))
	return nil
}
// ScheduledPublishTask processes the scheduled-publish sorted set
// (modelCast.ScheduledPublishQueueKey) while holding the Redis lock
// "scheduled_publish:lock" (TTL 1 hour).
//
// Every member is read together with its score, and the score is compared with
// the current Unix time:
//   - score < now: the entry is treated as expired and removed without being
//     published;
//   - score >= now: the work is published via serverCast.PublishWork and the
//     entry is then removed, whether or not publishing succeeded.
//
// Only works that were actually published are counted as published; a publish
// failure is logged instead of being discarded.
//
// NOTE(review): entries whose score is already in the past are dropped, while
// entries whose score is now or later are published right away. Confirm that
// this matches the intended meaning of the score.
//
// NOTE(review): the lock is released with a plain DEL and no ownership check.
// If a run outlasts the 1-hour TTL, this DEL may remove a lock that another run
// has taken in the meantime.
func ScheduledPublishTask() {
	const lockKey = "scheduled_publish:lock"

	acquired, lockErr := cache.RedisClient.SetNX(lockKey, time.Now().Format("2006-01-02 15:04:05"), 1*time.Hour).Result()
	if lockErr != nil {
		zap.L().Error("获取定时发布任务锁失败", zap.Error(lockErr))
		return
	}
	if !acquired {
		zap.L().Warn("定时发布任务正在被其他实例处理")
		return
	}
	defer func() {
		if delErr := cache.RedisClient.Del(lockKey).Err(); delErr != nil {
			zap.L().Error("释放定时发布任务锁失败", zap.Error(delErr))
		}
	}()

	// Fetch every entry; the score range is unbounded.
	opt := redis.ZRangeBy{
		Min: "-inf",
		Max: "+inf",
	}
	workList, err := cache.RedisClient.ZRangeByScoreWithScores(modelCast.ScheduledPublishQueueKey, opt).Result()
	if err != nil {
		zap.L().Error("获取定时发布任务失败", zap.Error(err))
		return
	}
	if len(workList) == 0 {
		zap.L().Debug("没有定时发布任务")
		return
	}

	publishCount := 0
	expiredCount := 0
	zap.L().Info("发现定时发布任务", zap.Int("total_count", len(workList)))

	ctx := context.Background()
	for _, item := range workList {
		// Member is an interface value; guard the assertion so an unexpected
		// type cannot panic the scheduler goroutine.
		workUuid, ok := item.Member.(string)
		if !ok {
			zap.L().Error("定时发布任务成员类型异常", zap.Any("member", item.Member))
			continue
		}
		score := item.Score
		// Evaluated per item because publishing earlier items takes time.
		now := float64(time.Now().Unix())

		// Score earlier than now: remove the entry without publishing.
		if score < now {
			zap.L().Info("发现过期的定时发布任务,直接删除不发布",
				zap.String("work_uuid", workUuid),
				zap.Float64("score", score),
				zap.Float64("now", now))
			removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, workUuid).Result()
			if delErr != nil {
				zap.L().Error("删除过期定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", workUuid))
			} else if removed > 0 {
				expiredCount++
				zap.L().Info("已删除过期定时发布任务", zap.String("work_uuid", workUuid))
			}
			continue
		}

		// Score at or after now: publish.
		zap.L().Info("处理定时发布任务",
			zap.String("work_uuid", workUuid),
			zap.Float64("score", score))
		publishErr := serverCast.PublishWork(ctx, &cast.PublishReq{
			WorkUuids: []string{workUuid},
		})
		if publishErr != nil {
			zap.L().Error("定时发布任务发布失败", zap.Error(publishErr), zap.String("work_uuid", workUuid))
		}

		// The entry is removed regardless of the publish result, as before, so
		// a failing work is not retried on every run.
		removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, workUuid).Result()
		switch {
		case delErr != nil:
			zap.L().Error("删除定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", workUuid))
		case removed > 0 && publishErr == nil:
			publishCount++
			zap.L().Info("已发布并删除定时发布任务", zap.String("work_uuid", workUuid))
		case removed > 0:
			zap.L().Warn("发布失败,已删除定时发布任务", zap.String("work_uuid", workUuid))
		}
	}

	zap.L().Info("定时发布任务处理完成",
		zap.Int("published_count", publishCount),
		zap.Int("expired_count", expiredCount),
		zap.Int("total_count", len(workList)))
}