package cron import ( "context" "encoding/json" "errors" "fmt" "fonchain-fiee/api/aryshare" "fonchain-fiee/api/bundle" "fonchain-fiee/api/cast" "fonchain-fiee/pkg/cache" bundleModel "fonchain-fiee/pkg/model/bundle" modelCast "fonchain-fiee/pkg/model/cast" "fonchain-fiee/pkg/service" "fonchain-fiee/pkg/service/bundle/common" serverCast "fonchain-fiee/pkg/service/cast" "log" "math/rand" "strconv" "strings" "time" "github.com/go-redis/redis" "go.uber.org/zap" ) // InitTasks 初始化定时任务 func InitTasks() error { cm := GetCronManager() err := cm.AddTask("refreshWorkApprovalStatus", "0 */1 * * * *", RefreshWorkApprovalStatusTask) err = cm.AddTask("artistAutoConfirm", "0 */1 * * * *", ArtistAutoConfirmTask) err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", RefreshPublishStatusTask) err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask) err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask) err = cm.AddTask("scheduledPublish", "0 */1 * * * *", ScheduledPublishTask) err = cm.AddTask("refreshArtistOrder", "0 */30 * * * *", RefreshArtistOrderTask) // 每天 00:30 和 12:30 执行 Ayrshare 指标采集任务 // err = cm.AddTask("ayrshareMetricsCollector", "0 30 0,12 * * *", AyrshareMetricsCollectorTask) err = cm.AddTask("ayrshareMetricsCollector", "0 30 0 * * *", AyrshareMetricsCollectorTask) if err != nil { log.Printf("添加定时任务失败: %v", err) } // 每2分钟执行一次标签观看次数更新任务 err = cm.AddTask("updateCastTagWatchCount", "0 */2 * * * *", UpdateCastTagWatchCountTask) cm.Start() // 启动队列消费者 //go WorkPublishQueueConsumer() // 启动随机间隔的自动确认任务 go AutoManuallyConfirmWorkTaskWithRandomInterval() return nil } func RefreshWorkApprovalStatusTask() { resp, err := service.CastProvider.WorkList(context.Background(), &cast.WorkListReq{ Page: 1, WorkStatus: uint32(cast.WorkActionENUM_APPROVAL), PageSize: 9999, }) if err != nil { log.Printf("获取工作列表失败: %v", err) return } if resp.Data == nil || len(resp.Data) == 0 { return } serverCast.RefreshWorkApproval(nil, resp.Data) } // AutoManuallyConfirmWorkTaskWithRandomInterval 以随机间隔(3-7分钟)执行自动确认任务 func AutoManuallyConfirmWorkTaskWithRandomInterval() { for { // 添加 panic 恢复机制 func() { defer func() { if r := recover(); r != nil { zap.L().Error("自动确认任务执行panic", zap.Any("panic", r), zap.Stack("stack")) } }() // 执行任务 zap.L().Debug("开始执行自动确认任务") AutoManuallyConfirmWorkTask() zap.L().Debug("自动确认任务执行完成") }() // 生成3-7分钟之间的随机间隔(单位:分钟) randomMinutes := rand.Intn(5) + 3 // 3-7分钟 randomDuration := time.Duration(randomMinutes) * time.Minute log.Printf("自动确认任务等待下次执行,间隔:%d分钟,时长:%s", randomMinutes, randomDuration) zap.L().Info("自动确认任务等待下次执行", zap.Int("minutes", randomMinutes), zap.Duration("duration", randomDuration)) // 等待随机时间 time.Sleep(randomDuration) } } func AutoManuallyConfirmWorkTask() { var req bundle.GetWaitConfirmWorkListReq res, err := service.BundleProvider.GetWaitConfirmWorkList(context.Background(), &req) if err != nil { zap.L().Error("获取待确认作品列表失败", zap.Error(err)) return } if res.Data == nil || len(res.Data) == 0 { return } for _, work := range res.Data { var req bundleModel.UserWorkConfirmReq req.WorkUuid = work.WorkUuid req.ConfirmStatus = 1 artistId, err := strconv.ParseInt(work.ArtistUuid, 10, 64) if err != nil { zap.L().Error("解析艺术家ID失败", zap.Error(err)) continue } if req.ConfirmStatus == 2 { // 驳回完直接结束 _, err := service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{ WorkAction: cast.WorkActionENUM_CONFIRM, WorkUuid: req.WorkUuid, ConfirmRemark: req.ConfirmRemark, ConfirmStatus: 2, }) if err != nil { zap.L().Error("确认作品失败", zap.Error(err)) return } continue } balanceInfoRes, err := service.BundleProvider.GetBundleBalanceByUserId(context.Background(), &bundle.GetBundleBalanceByUserIdReq{ UserId: int32(artistId), }) if err != nil { continue } if balanceInfoRes.BundleStatus == common.BundleExpired { _, err = service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{ WorkAction: cast.WorkActionENUM_CONFIRM, WorkUuid: req.WorkUuid, ConfirmRemark: req.ConfirmRemark, ConfirmStatus: 3, }) log.Printf("套餐已过期,作品uuid:"+req.WorkUuid, zap.Error(err)) continue } wordInfoRes, err := service.CastProvider.WorkDetail(context.Background(), &cast.WorkDetailReq{ WorkUuid: req.WorkUuid, }) if err != nil { continue } if wordInfoRes.WorkStatus != 4 { continue } var workCategory = wordInfoRes.WorkCategory var addBalanceReq bundle.AddBundleBalanceReq addBalanceReq.UserId = int32(artistId) log.Printf("开始确认作品uuid:" + req.WorkUuid) switch workCategory { case 1: { if balanceInfoRes.ImageExtendConsumptionNumber >= balanceInfoRes.ImageExtendNumber { // 图文余量不足 _, err = service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{ WorkAction: cast.WorkActionENUM_CONFIRM, WorkUuid: req.WorkUuid, ConfirmRemark: req.ConfirmRemark, ConfirmStatus: 3, }) log.Printf("图文余量不足,作品uuid:"+req.WorkUuid, zap.Error(err)) continue } addBalanceReq.ImageConsumptionNumber = 1 } case 2: { if balanceInfoRes.VideoExtendConsumptionNumber >= balanceInfoRes.VideoExtendNumber { // 视频余量不足 _, err = service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{ WorkAction: cast.WorkActionENUM_CONFIRM, WorkUuid: req.WorkUuid, ConfirmRemark: req.ConfirmRemark, ConfirmStatus: 3, }) log.Printf("视频余量不足,作品uuid:"+req.WorkUuid, zap.Error(err)) continue } addBalanceReq.VideoConsumptionNumber = 1 } default: continue } resp, err := service.BundleProvider.AddBundleBalance(context.Background(), &addBalanceReq) if err != nil { log.Printf("扣除余额失败,作品uuid:"+req.WorkUuid, zap.Error(err)) continue } log.Printf("扣除余额成功,作品uuid:" + req.WorkUuid) _, err = service.CastProvider.UpdateStatus(context.Background(), &cast.UpdateStatusReq{ WorkAction: cast.WorkActionENUM_CONFIRM, WorkUuid: req.WorkUuid, ConfirmRemark: req.ConfirmRemark, CostType: resp.UsedType, ConfirmStatus: 1, }) if err != nil { log.Printf("更新作品状态失败,作品uuid:"+req.WorkUuid, zap.Error(err)) continue } } } func RefreshWorkAnalysisApprovalStatusTask() { resp, err := service.CastProvider.ListWorkAnalysis(context.Background(), &cast.ListWorkAnalysisReq{ Page: 1, StatusList: []uint32{2}, // 状态为2表示待审批 PageSize: 999999, }) if err != nil { log.Printf("获取数据分析列表失败: %v", err) return } if resp.Data == nil || len(resp.Data) == 0 { return } serverCast.RefreshWorkAnalysisApproval(nil, resp.Data) } func ArtistAutoConfirmTask() { now := float64(time.Now().Unix()) opt := redis.ZRangeBy{ Min: fmt.Sprintf("%d", 0), Max: fmt.Sprintf("%f", now), } workUuids, err := cache.RedisClient.ZRangeByScore(modelCast.AutoConfirmQueueKey, opt).Result() if err != nil { zap.L().Error("获取到期任务失败", zap.Error(err)) return } if len(workUuids) == 0 { zap.L().Info("没有到期的任务") return } zap.L().Info("发现到期任务", zap.Int("count", len(workUuids))) for _, workUuid := range workUuids { serverCast.ProcessTask(context.Background(), workUuid) } } func RefreshPublishStatusTask() { // 加上锁 万一上一批没有同步完 lockKey := "refresh_publish_status:lock" reply := cache.RedisClient.SetNX(lockKey, "1", 5*time.Minute) if !reply.Val() { zap.L().Warn("任务正在被其他实例处理") return } defer func() { cache.RedisClient.Del(lockKey) }() err := serverCast.RefreshPublish() if err != nil { zap.L().Error("刷新发布状态失败", zap.Error(err)) return } //zap.L().Info("刷新发布状态成功") } func ArtistAutoConfirmAnalysisTask() { now := float64(time.Now().Unix()) opt := redis.ZRangeBy{ Min: fmt.Sprintf("%d", 0), Max: fmt.Sprintf("%f", now), } analysisUuids, err := cache.RedisClient.ZRangeByScore(modelCast.AutoConfirmAnalysisQueueKey, opt).Result() if err != nil { zap.L().Error("获取到期数据分析任务失败", zap.Error(err)) return } if len(analysisUuids) == 0 { zap.L().Debug("没有到期的数据分析任务") return } zap.L().Info("发现到期数据分析任务", zap.Int("count", len(analysisUuids))) for _, analysisUuid := range analysisUuids { serverCast.ProcessAnalysisTask(context.Background(), analysisUuid) } } // ScheduledPublishTask 定时发布任务,从Redis Sorted Set中获取所有workUuid并根据score判断处理 func ScheduledPublishTask() { // 加上锁 lockKey := "scheduled_publish:lock" reply := cache.RedisClient.SetNX(lockKey, time.Now().Format("2006-01-02 15:04:05"), 1*time.Hour) if !reply.Val() { zap.L().Warn("定时发布任务正在被其他实例处理") return } defer func() { cache.RedisClient.Del(lockKey) }() // 获取所有数据(不限制score范围) opt := redis.ZRangeBy{ Min: "-inf", Max: "+inf", } // 从Redis Sorted Set中获取所有workUuid及其score workList, err := cache.RedisClient.ZRangeByScoreWithScores(modelCast.ScheduledPublishQueueKey, opt).Result() if err != nil { zap.L().Error("获取定时发布任务失败", zap.Error(err)) return } if len(workList) == 0 { zap.L().Info("没有定时发布任务") return } publishCount := 0 expiredCount := 0 const batchSize = 10 // 每批发布10个 zap.L().Info("发现定时发布任务", zap.Int("total_count", len(workList))) now := float64(time.Now().Unix()) publishBatch := make([]string, 0, batchSize) // 遍历所有数据,根据score判断处理 for _, item := range workList { workUuid := item.Member.(string) score := item.Score // 如果score小于当前时间,删除但不消费(不发布) if score < now { zap.L().Info("发现过期的定时发布任务,直接删除不发布", zap.String("work_uuid", workUuid), zap.Float64("score", score), zap.Float64("now", now)) removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, workUuid).Result() if delErr != nil { zap.L().Error("删除过期定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", workUuid)) } else if removed > 0 { expiredCount++ zap.L().Info("已删除过期定时发布任务", zap.String("work_uuid", workUuid)) } continue } // score大于等于当前时间,添加到待发布批次 zap.L().Info("添加到发布批次", zap.String("work_uuid", workUuid), zap.Float64("score", score)) publishBatch = append(publishBatch, workUuid) // 当批次达到指定大小时,批量发布 if len(publishBatch) >= batchSize { zap.L().Info("批量发布作品", zap.Int("batch_size", len(publishBatch)), zap.Strings("work_uuids", publishBatch)) _ = serverCast.PublishWork(context.Background(), &cast.PublishReq{ WorkUuids: publishBatch, }) // 批量删除已发布的任务 for _, uuid := range publishBatch { removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, uuid).Result() if delErr != nil { zap.L().Error("删除定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", uuid)) } else if removed > 0 { publishCount++ } } zap.L().Info("批次发布完成", zap.Int("published", len(publishBatch))) // 清空批次,准备下一批 publishBatch = make([]string, 0, batchSize) } } // 处理剩余的作品(不足一批的) if len(publishBatch) > 0 { zap.L().Info("批量发布剩余作品", zap.Int("batch_size", len(publishBatch)), zap.Strings("work_uuids", publishBatch)) _ = serverCast.PublishWork(context.Background(), &cast.PublishReq{ WorkUuids: publishBatch, }) // 批量删除已发布的任务 for _, uuid := range publishBatch { removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, uuid).Result() if delErr != nil { zap.L().Error("删除定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", uuid)) } else if removed > 0 { publishCount++ } } zap.L().Info("剩余批次发布完成", zap.Int("published", len(publishBatch))) } zap.L().Info("定时发布任务处理完成", zap.Int("published_count", publishCount), zap.Int("expired_count", expiredCount), zap.Int("total_count", len(workList))) } // WorkPublishQueueConsumer 监听work:publish:queue队列的消费者 func WorkPublishQueueConsumer() { zap.L().Info("开始监听work:publish:queue队列(批量处理模式)") const batchSize = 10 for { batch := make([]string, 0, batchSize) result, err := cache.RedisClient.BRPop(0*time.Second, modelCast.WorkPublishQueueKey).Result() if err != nil { zap.L().Error("监听work:publish:queue队列失败", zap.Error(err)) time.Sleep(5 * time.Second) // 出错后等待5秒再重试 continue } if len(result) < 2 { zap.L().Warn("队列返回数据格式异常", zap.Any("result", result)) continue } workData := result[1] // BRPOP返回[key, value],value在第二个元素 zap.L().Info("从work:publish:queue队列收到第一条数据", zap.String("data", workData)) batch = append(batch, workData) for len(batch) < batchSize { // 使用RPOP非阻塞获取,如果没有数据立即返回 data, err := cache.RedisClient.RPop(modelCast.WorkPublishQueueKey).Result() if err == redis.Nil { // 队列为空,跳出循环 zap.L().Info("队列暂时为空,开始处理当前批次", zap.Int("batch_size", len(batch))) break } else if err != nil { zap.L().Error("从队列获取数据失败", zap.Error(err)) break } batch = append(batch, data) } zap.L().Info("收集到批次数据,准备批量处理", zap.Int("batch_size", len(batch))) // 批量处理数据 if err = processBatchWorkPublishQueueData(batch); err != nil { zap.L().Error("批量处理work:publish:queue队列数据失败", zap.Error(err), zap.Int("batch_size", len(batch))) continue } zap.L().Info("成功批量处理work:publish:queue队列数据", zap.Int("batch_size", len(batch))) } } // processBatchWorkPublishQueueData 批量处理从work:publish:queue队列中取出的数据 func processBatchWorkPublishQueueData(batchData []string) error { // 延时1秒消费 time.Sleep(time.Second * 1) workUuids := make([]string, 0, len(batchData)) // 解析所有数据,提取workUuid for _, data := range batchData { var workData map[string]string if err := json.Unmarshal([]byte(data), &workData); err != nil { zap.L().Error("解析队列数据失败", zap.String("raw_data", data), zap.Error(err)) continue } workUuid := workData["workUuid"] if workUuid == "" { zap.L().Error("队列数据中workUuid为空", zap.String("raw_data", data)) continue } workUuids = append(workUuids, workUuid) } if len(workUuids) == 0 { zap.L().Warn("批次中没有有效的workUuid") return errors.New("批次中没有有效的workUuid") } zap.L().Info("批量处理发布工作队列数据", zap.Strings("work_uuids", workUuids), zap.Int("count", len(workUuids))) // 批量调用发布工作逻辑 err := serverCast.PublishWork(context.Background(), &cast.PublishReq{ WorkUuids: workUuids, }) if err != nil { zap.L().Error("批量发布工作失败", zap.Strings("work_uuids", workUuids), zap.Error(err)) return err } zap.L().Info("批量发布工作成功", zap.Strings("work_uuids", workUuids), zap.Int("count", len(workUuids))) return nil } // AyrshareMetricsCollectorTask Ayrshare 指标采集定时任务(每天 00:30 和 12:30 执行) func AyrshareMetricsCollectorTask() { serverCast.ExecuteAyrshareMetricsCollector() } func RefreshArtistOrderTask() { service.CastProvider.Tools(context.Background(), &cast.ToolsReq{Action: "refreshArtistOrder"}) } // UpdateCastTagWatchCountTask 更新标签观看次数的定时任务(每5分钟执行一次) func UpdateCastTagWatchCountTask() { ctx := context.Background() // 计算两天前的00:00:00 now := time.Now() twoDaysAgo := now.AddDate(0, 0, -2) createdAtStart := time.Date(twoDaysAgo.Year(), twoDaysAgo.Month(), twoDaysAgo.Day(), 0, 0, 0, 0, twoDaysAgo.Location()) createdAtEnd := now // 格式化时间字符串:2026-01-01 00:00:00 createdAtStartStr := createdAtStart.Format("2006-01-02 15:04:05") createdAtEndStr := createdAtEnd.Format("2006-01-02 15:04:05") // 调用 ListCastTags 接口,筛选 IsWatchCountCalled = 2 的数据 listReq := &cast.ListCastTagsReq{ CreatedAtStart: createdAtStartStr, CreatedAtEnd: createdAtEndStr, IsWatchCountCalled: 2, // 2表示未调用 Page: 1, PageSize: 20, } listResp, err := service.CastProvider.ListCastTags(ctx, listReq) if err != nil { zap.L().Error("获取标签列表失败", zap.Error(err)) return } if listResp.Data == nil || len(listResp.Data) == 0 { return } zap.L().Info("获取到需要更新的标签", zap.Int("count", len(listResp.Data))) // 获取有效的 profileKey profileKey, err := serverCast.GetValidProfileKey(ctx, []uint32{1}) if err != nil { zap.L().Error("获取有效profileKey失败", zap.Error(err)) return } // 准备批量更新的数据 updateData := make([]*cast.CastTagInfo, 0, len(listResp.Data)) // 遍历每个标签,调用 RecommendHashtags 接口 for _, tag := range listResp.Data { if tag.HashTag == "" { zap.L().Warn("标签HashTag为空,跳过", zap.String("uuid", tag.Uuid)) // 即使HashTag为空,也要更新IsWatchCountCalled为1 updateData = append(updateData, &cast.CastTagInfo{ Uuid: tag.Uuid, WatchCount: 1, IsWatchCountCalled: 1, }) continue } // 调用 RecommendHashtags 接口 recommendReq := &aryshare.RecommendHashtagsRequest{ Keyword: tag.HashTag, ProfileKey: profileKey, } recommendResp, err := service.AyrshareProvider.RecommendHashtags(ctx, recommendReq) if err != nil { zap.L().Error("调用RecommendHashtags接口失败", zap.String("hashTag", tag.HashTag), zap.String("uuid", tag.Uuid), zap.Error(err)) // 调用失败时,将WatchCount更新为1,IsWatchCountCalled更新为1 updateData = append(updateData, &cast.CastTagInfo{ Uuid: tag.Uuid, WatchCount: 1, IsWatchCountCalled: 1, }) continue } // 对比返回结果,查找完全一致的标签 var matchedViewCount int64 = 0 if recommendResp.Recommendations != nil { for _, recommendation := range recommendResp.Recommendations { // 完全一致匹配(不区分大小写) if strings.EqualFold(recommendation.Name, tag.HashTag) { matchedViewCount = recommendation.ViewCount break } } } // 根据匹配结果更新WatchCount var watchCount int32 = 1 if matchedViewCount > 0 { watchCount = int32(matchedViewCount) } // 添加到更新列表 updateData = append(updateData, &cast.CastTagInfo{ Uuid: tag.Uuid, WatchCount: watchCount, IsWatchCountCalled: 1, }) zap.L().Debug("处理标签完成", zap.String("hashTag", tag.HashTag), zap.String("uuid", tag.Uuid), zap.Int64("matchedViewCount", matchedViewCount), zap.Int32("watchCount", watchCount)) } // 如果没有需要更新的数据,直接返回 if len(updateData) == 0 { return } // 批量更新标签 batchUpdateReq := &cast.BatchUpdateCastTagsReq{ Data: updateData, } _, err = service.CastProvider.BatchUpdateCastTags(ctx, batchUpdateReq) if err != nil { zap.L().Error("批量更新标签失败", zap.Error(err), zap.Int("count", len(updateData))) return } }