From b0609ebffc80928aae87cb9926d221c710dc7c0a Mon Sep 17 00:00:00 2001 From: daiyb <570956418@qq.com> Date: Tue, 16 Dec 2025 18:57:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=A4=84=E7=90=86=E5=8F=91?= =?UTF-8?q?=E5=B8=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/cron/task.go | 45 ++++++++++++++++++++++++++++++++++++++++++ pkg/model/cast/work.go | 7 ++++--- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/pkg/cron/task.go b/pkg/cron/task.go index b84e4ba..70b75af 100644 --- a/pkg/cron/task.go +++ b/pkg/cron/task.go @@ -23,6 +23,7 @@ func InitTasks() error { err := cm.AddTask("refreshWorkApprovalStatus", "0 */1 * * * *", RefreshWorkApprovalStatusTask) err = cm.AddTask("artistAutoConfirm", "0 */1 * * * *", ArtistAutoConfirmTask) err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", RefreshPublishStatusTask) + err = cm.AddTask("scheduledPublish", "0 */1 * * * *", ScheduledPublishTask) if err != nil { log.Printf("添加定时任务失败: %v", err) } @@ -90,6 +91,50 @@ func RefreshPublishStatusTask() { zap.L().Info("刷新发布状态成功") } +// ScheduledPublishTask 定时发布任务,从Redis Sorted Set中获取到期的workUuid并发布 +func ScheduledPublishTask() { + // 加上锁 + lockKey := "scheduled_publish:lock" + reply := cache.RedisClient.SetNX(lockKey, time.Now().Format("2006-01-02 15:04:05"), 1*time.Hour) + if !reply.Val() { + zap.L().Warn("定时发布任务正在被其他实例处理") + return + } + defer func() { + cache.RedisClient.Del(lockKey) + }() + + now := float64(time.Now().Unix()) + opt := redis.ZRangeBy{ + Min: fmt.Sprintf("%d", 0), + Max: fmt.Sprintf("%f", now), + } + // 从Redis Sorted Set中获取所有score小于等于当前时间的workUuid + workUuids, err := cache.RedisClient.ZRangeByScore(modelCast.ScheduledPublishQueueKey, opt).Result() + if err != nil { + zap.L().Error("获取定时发布任务失败", zap.Error(err)) + return + } + if len(workUuids) == 0 { + zap.L().Debug("没有到期的定时发布任务") + return + } + zap.L().Info("发现到期的定时发布任务", zap.Int("count", len(workUuids))) + // 发布成功后,从Redis中删除这些workUuid + for _, workUuid := range workUuids { + _ = serverCast.PublishWork(context.Background(), &cast.PublishReq{ + WorkUuids: []string{workUuid}, + }) + removed, err := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, workUuid).Result() + if err != nil { + zap.L().Error("删除定时发布任务失败", zap.Error(err), zap.String("work_uuid", workUuid)) + } else if removed > 0 { + zap.L().Info("已删除定时发布任务", zap.String("work_uuid", workUuid)) + } + } + zap.L().Info("定时发布任务处理完成", zap.Int("count", len(workUuids))) +} + // WorkPublishQueueConsumer 监听work:publish:queue队列的消费者 func WorkPublishQueueConsumer() { zap.L().Info("开始监听work:publish:queue队列") diff --git a/pkg/model/cast/work.go b/pkg/model/cast/work.go index e0dd00c..2ada597 100644 --- a/pkg/model/cast/work.go +++ b/pkg/model/cast/work.go @@ -11,9 +11,10 @@ const ( ) const ( - AutoConfirmQueueKey = "auto_confirm:queue" - AutoConfirmLockKey = "auto_confirm:lock:%s" - WorkPublishQueueKey = "work:publish:queue" + AutoConfirmQueueKey = "auto_confirm:queue" + AutoConfirmLockKey = "auto_confirm:lock:%s" + WorkPublishQueueKey = "work:publish:queue" + ScheduledPublishQueueKey = "scheduled:publish:queue" // 定时发布队列 ) var WorkCategoryMM = map[int]string{