391 lines
12 KiB
Go
391 lines
12 KiB
Go
package cron
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"fonchain-fiee/api/bundle"
|
||
"fonchain-fiee/api/cast"
|
||
"fonchain-fiee/pkg/cache"
|
||
bundleModel "fonchain-fiee/pkg/model/bundle"
|
||
modelCast "fonchain-fiee/pkg/model/cast"
|
||
"fonchain-fiee/pkg/service"
|
||
serverCast "fonchain-fiee/pkg/service/cast"
|
||
"log"
|
||
"math/rand"
|
||
"strconv"
|
||
"time"
|
||
|
||
"github.com/go-redis/redis"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// InitTasks 初始化定时任务
|
||
func InitTasks() error {
|
||
cm := GetCronManager()
|
||
err := cm.AddTask("refreshWorkApprovalStatus", "0 */1 * * * *", RefreshWorkApprovalStatusTask)
|
||
err = cm.AddTask("artistAutoConfirm", "0 */1 * * * *", ArtistAutoConfirmTask)
|
||
err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", RefreshPublishStatusTask)
|
||
|
||
err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask)
|
||
err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask)
|
||
err = cm.AddTask("scheduledPublish", "0 */1 * * * *", ScheduledPublishTask)
|
||
if err != nil {
|
||
log.Printf("添加定时任务失败: %v", err)
|
||
}
|
||
cm.Start()
|
||
|
||
// 启动队列消费者
|
||
go WorkPublishQueueConsumer()
|
||
|
||
// 启动随机间隔的自动确认任务
|
||
go AutoManuallyConfirmWorkTaskWithRandomInterval()
|
||
|
||
return nil
|
||
}
|
||
|
||
func RefreshWorkApprovalStatusTask() {
|
||
resp, err := service.CastProvider.WorkList(context.Background(), &cast.WorkListReq{
|
||
Page: 1,
|
||
WorkStatus: uint32(cast.WorkActionENUM_APPROVAL),
|
||
PageSize: 9999,
|
||
})
|
||
if err != nil {
|
||
log.Printf("获取工作列表失败: %v", err)
|
||
return
|
||
}
|
||
if resp.Data == nil || len(resp.Data) == 0 {
|
||
return
|
||
}
|
||
serverCast.RefreshWorkApproval(nil, resp.Data)
|
||
}
|
||
|
||
func RefreshWorkAnalysisApprovalStatusTask() {
|
||
resp, err := service.CastProvider.ListWorkAnalysis(context.Background(), &cast.ListWorkAnalysisReq{
|
||
Page: 1,
|
||
StatusList: []uint32{2}, // 状态为2表示待审批
|
||
PageSize: 999999,
|
||
})
|
||
if err != nil {
|
||
log.Printf("获取数据分析列表失败: %v", err)
|
||
return
|
||
}
|
||
if resp.Data == nil || len(resp.Data) == 0 {
|
||
return
|
||
}
|
||
serverCast.RefreshWorkAnalysisApproval(nil, resp.Data)
|
||
}
|
||
|
||
// AutoManuallyConfirmWorkTaskWithRandomInterval 以随机间隔(3-7分钟)执行自动确认任务
|
||
func AutoManuallyConfirmWorkTaskWithRandomInterval() {
|
||
|
||
for {
|
||
// 执行任务
|
||
AutoManuallyConfirmWorkTask()
|
||
|
||
// 生成3-7分钟之间的随机间隔(单位:分钟)
|
||
randomMinutes := rand.Intn(5) + 3 // 3-7分钟
|
||
randomDuration := time.Duration(randomMinutes) * time.Minute
|
||
|
||
// 等待随机时间
|
||
time.Sleep(randomDuration)
|
||
}
|
||
}
|
||
func AutoManuallyConfirmWorkTask() {
|
||
var req bundle.GetWaitConfirmWorkListReq
|
||
res, err := service.BundleProvider.GetWaitConfirmWorkList(context.Background(), &req)
|
||
if err != nil {
|
||
zap.L().Error("获取待确认作品列表失败", zap.Error(err))
|
||
return
|
||
}
|
||
if res.Data == nil || len(res.Data) == 0 {
|
||
return
|
||
}
|
||
for _, work := range res.Data {
|
||
var req bundleModel.UserWorkConfirmReq
|
||
req.WorkUuid = work.WorkUuid
|
||
req.ConfirmStatus = 1
|
||
artistId, err := strconv.ParseInt(work.ArtistUuid, 10, 64)
|
||
if err != nil {
|
||
zap.L().Error("解析艺术家ID失败", zap.Error(err))
|
||
return
|
||
}
|
||
if req.ConfirmStatus == 2 { // 驳回完直接结束
|
||
_, err := service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{
|
||
WorkAction: cast.WorkActionENUM_CONFIRM,
|
||
WorkUuid: req.WorkUuid,
|
||
ConfirmRemark: req.ConfirmRemark,
|
||
ConfirmStatus: 2,
|
||
})
|
||
if err != nil {
|
||
zap.L().Error("确认作品失败", zap.Error(err))
|
||
return
|
||
}
|
||
continue
|
||
}
|
||
|
||
balanceInfoRes, err := service.BundleProvider.GetBundleBalanceByUserId(context.Background(), &bundle.GetBundleBalanceByUserIdReq{
|
||
UserId: int32(artistId),
|
||
})
|
||
if err != nil {
|
||
continue
|
||
}
|
||
|
||
wordInfoRes, err := service.CastProvider.WorkDetail(context.Background(), &cast.WorkDetailReq{
|
||
WorkUuid: req.WorkUuid,
|
||
})
|
||
if err != nil {
|
||
continue
|
||
}
|
||
if wordInfoRes.WorkStatus != 4 {
|
||
continue
|
||
}
|
||
var workCategory = wordInfoRes.WorkCategory
|
||
|
||
var addBalanceReq bundle.AddBundleBalanceReq
|
||
addBalanceReq.UserId = int32(artistId)
|
||
log.Printf("开始确认作品uuid:" + req.WorkUuid)
|
||
switch workCategory {
|
||
case 1:
|
||
{
|
||
if balanceInfoRes.ImageExtendConsumptionNumber >= balanceInfoRes.ImageExtendNumber { // 图文余量不足
|
||
_, err = service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{
|
||
WorkAction: cast.WorkActionENUM_CONFIRM,
|
||
WorkUuid: req.WorkUuid,
|
||
ConfirmRemark: req.ConfirmRemark,
|
||
ConfirmStatus: 3,
|
||
})
|
||
log.Printf("图文余量不足,作品uuid:"+req.WorkUuid, zap.Error(err))
|
||
continue
|
||
}
|
||
addBalanceReq.ImageConsumptionNumber = 1
|
||
}
|
||
case 2:
|
||
{
|
||
if balanceInfoRes.VideoExtendConsumptionNumber >= balanceInfoRes.VideoExtendNumber { // 视频余量不足
|
||
_, err = service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{
|
||
WorkAction: cast.WorkActionENUM_CONFIRM,
|
||
WorkUuid: req.WorkUuid,
|
||
ConfirmRemark: req.ConfirmRemark,
|
||
ConfirmStatus: 3,
|
||
})
|
||
log.Printf("视频余量不足,作品uuid:"+req.WorkUuid, zap.Error(err))
|
||
continue
|
||
}
|
||
addBalanceReq.VideoConsumptionNumber = 1
|
||
}
|
||
default:
|
||
continue
|
||
}
|
||
resp, err := service.BundleProvider.AddBundleBalance(context.Background(), &addBalanceReq)
|
||
if err != nil {
|
||
log.Printf("扣除余额失败,作品uuid:"+req.WorkUuid, zap.Error(err))
|
||
continue
|
||
}
|
||
log.Printf("扣除余额成功,作品uuid:" + req.WorkUuid)
|
||
_, err = service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{
|
||
WorkAction: cast.WorkActionENUM_CONFIRM,
|
||
WorkUuid: req.WorkUuid,
|
||
ConfirmRemark: req.ConfirmRemark,
|
||
CostType: resp.UsedType,
|
||
ConfirmStatus: 1,
|
||
})
|
||
if err != nil {
|
||
log.Printf("更新作品状态失败,作品uuid:"+req.WorkUuid, zap.Error(err))
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
func ArtistAutoConfirmTask() {
|
||
now := float64(time.Now().Unix())
|
||
opt := redis.ZRangeBy{
|
||
Min: fmt.Sprintf("%d", 0),
|
||
Max: fmt.Sprintf("%f", now),
|
||
}
|
||
workUuids, err := cache.RedisClient.ZRangeByScore(modelCast.AutoConfirmQueueKey, opt).Result()
|
||
if err != nil {
|
||
zap.L().Error("获取到期任务失败", zap.Error(err))
|
||
return
|
||
}
|
||
if len(workUuids) == 0 {
|
||
zap.L().Debug("没有到期的任务")
|
||
return
|
||
}
|
||
zap.L().Info("发现到期任务", zap.Int("count", len(workUuids)))
|
||
for _, workUuid := range workUuids {
|
||
serverCast.ProcessTask(context.Background(), workUuid)
|
||
}
|
||
}
|
||
|
||
func RefreshPublishStatusTask() {
|
||
// 加上锁 万一上一批没有同步完
|
||
lockKey := "refresh_publish_status:lock"
|
||
reply := cache.RedisClient.SetNX(lockKey, "1", 5*time.Minute)
|
||
if !reply.Val() {
|
||
zap.L().Warn("任务正在被其他实例处理")
|
||
return
|
||
}
|
||
defer func() {
|
||
cache.RedisClient.Del(lockKey)
|
||
}()
|
||
err := serverCast.RefreshPublish()
|
||
if err != nil {
|
||
zap.L().Error("刷新发布状态失败", zap.Error(err))
|
||
return
|
||
}
|
||
zap.L().Info("刷新发布状态成功")
|
||
}
|
||
|
||
func ArtistAutoConfirmAnalysisTask() {
|
||
now := float64(time.Now().Unix())
|
||
opt := redis.ZRangeBy{
|
||
Min: fmt.Sprintf("%d", 0),
|
||
Max: fmt.Sprintf("%f", now),
|
||
}
|
||
analysisUuids, err := cache.RedisClient.ZRangeByScore(modelCast.AutoConfirmAnalysisQueueKey, opt).Result()
|
||
if err != nil {
|
||
zap.L().Error("获取到期数据分析任务失败", zap.Error(err))
|
||
return
|
||
}
|
||
if len(analysisUuids) == 0 {
|
||
zap.L().Debug("没有到期的数据分析任务")
|
||
return
|
||
}
|
||
zap.L().Info("发现到期数据分析任务", zap.Int("count", len(analysisUuids)))
|
||
for _, analysisUuid := range analysisUuids {
|
||
serverCast.ProcessAnalysisTask(context.Background(), analysisUuid)
|
||
}
|
||
}
|
||
|
||
// WorkPublishQueueConsumer 监听work:publish:queue队列的消费者
|
||
func WorkPublishQueueConsumer() {
|
||
zap.L().Info("开始监听work:publish:queue队列")
|
||
for {
|
||
result, err := cache.RedisClient.BRPop(0*time.Second, modelCast.WorkPublishQueueKey).Result()
|
||
if err != nil {
|
||
zap.L().Error("监听work:publish:queue队列失败", zap.Error(err))
|
||
time.Sleep(5 * time.Second) // 出错后等待5秒再重试
|
||
continue
|
||
}
|
||
|
||
if len(result) < 2 {
|
||
zap.L().Warn("队列返回数据格式异常", zap.Any("result", result))
|
||
continue
|
||
}
|
||
|
||
workData := result[1] // BRPOP返回[key, value],value在第二个元素
|
||
zap.L().Info("从work:publish:queue队列收到数据", zap.String("data", workData))
|
||
|
||
// 处理队列数据
|
||
if err = processWorkPublishQueueData(workData); err != nil {
|
||
zap.L().Error("处理work:publish:queue队列数据失败", zap.Error(err), zap.String("data", workData))
|
||
continue
|
||
}
|
||
zap.L().Info("成功处理work:publish:queue队列数据", zap.String("data", workData))
|
||
}
|
||
}
|
||
|
||
// processWorkPublishQueueData 处理从work:publish:queue队列中取出的数据
|
||
func processWorkPublishQueueData(data string) error {
|
||
// 延时1秒消费
|
||
time.Sleep(time.Second * 1)
|
||
var workData map[string]string
|
||
_ = json.Unmarshal([]byte(data), &workData)
|
||
workUuid := workData["workUuid"]
|
||
if workUuid == "" {
|
||
zap.L().Error("队列数据为空", zap.String("raw_data", data))
|
||
return errors.New("队列数据为空")
|
||
}
|
||
zap.L().Info("处理发布工作队列数据", zap.String("work_uuid", workUuid))
|
||
// 调用发布工作逻辑
|
||
err := serverCast.PublishWork(context.Background(), &cast.PublishReq{
|
||
WorkUuids: []string{workUuid},
|
||
})
|
||
if err != nil {
|
||
zap.L().Error("发布工作失败",
|
||
zap.String("work_uuid", workUuid),
|
||
zap.Error(err))
|
||
return err
|
||
}
|
||
zap.L().Info("发布工作成功", zap.String("work_uuid", workUuid))
|
||
return nil
|
||
}
|
||
|
||
// ScheduledPublishTask 定时发布任务,从Redis Sorted Set中获取所有workUuid并根据score判断处理
|
||
func ScheduledPublishTask() {
|
||
// 加上锁
|
||
lockKey := "scheduled_publish:lock"
|
||
reply := cache.RedisClient.SetNX(lockKey, time.Now().Format("2006-01-02 15:04:05"), 1*time.Hour)
|
||
if !reply.Val() {
|
||
zap.L().Warn("定时发布任务正在被其他实例处理")
|
||
return
|
||
}
|
||
defer func() {
|
||
cache.RedisClient.Del(lockKey)
|
||
}()
|
||
|
||
// 获取所有数据(不限制score范围)
|
||
opt := redis.ZRangeBy{
|
||
Min: "-inf",
|
||
Max: "+inf",
|
||
}
|
||
// 从Redis Sorted Set中获取所有workUuid及其score
|
||
workList, err := cache.RedisClient.ZRangeByScoreWithScores(modelCast.ScheduledPublishQueueKey, opt).Result()
|
||
if err != nil {
|
||
zap.L().Error("获取定时发布任务失败", zap.Error(err))
|
||
return
|
||
}
|
||
if len(workList) == 0 {
|
||
zap.L().Debug("没有定时发布任务")
|
||
return
|
||
}
|
||
|
||
publishCount := 0
|
||
expiredCount := 0
|
||
|
||
zap.L().Info("发现定时发布任务", zap.Int("total_count", len(workList)))
|
||
|
||
// 遍历所有数据,根据score判断处理
|
||
for _, item := range workList {
|
||
workUuid := item.Member.(string)
|
||
score := item.Score
|
||
now := float64(time.Now().Unix())
|
||
// 如果score小于当前时间,删除但不消费(不发布)
|
||
if score < now {
|
||
zap.L().Info("发现过期的定时发布任务,直接删除不发布",
|
||
zap.String("work_uuid", workUuid),
|
||
zap.Float64("score", score),
|
||
zap.Float64("now", now))
|
||
removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, workUuid).Result()
|
||
if delErr != nil {
|
||
zap.L().Error("删除过期定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", workUuid))
|
||
} else if removed > 0 {
|
||
expiredCount++
|
||
zap.L().Info("已删除过期定时发布任务", zap.String("work_uuid", workUuid))
|
||
}
|
||
continue
|
||
}
|
||
|
||
// score大于等于当前时间,正常发布
|
||
zap.L().Info("处理定时发布任务",
|
||
zap.String("work_uuid", workUuid),
|
||
zap.Float64("score", score))
|
||
_ = serverCast.PublishWork(context.Background(), &cast.PublishReq{
|
||
WorkUuids: []string{workUuid},
|
||
})
|
||
removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, workUuid).Result()
|
||
if delErr != nil {
|
||
zap.L().Error("删除定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", workUuid))
|
||
} else if removed > 0 {
|
||
publishCount++
|
||
zap.L().Info("已发布并删除定时发布任务", zap.String("work_uuid", workUuid))
|
||
}
|
||
}
|
||
zap.L().Info("定时发布任务处理完成",
|
||
zap.Int("published_count", publishCount),
|
||
zap.Int("expired_count", expiredCount),
|
||
zap.Int("total_count", len(workList)))
|
||
}
|