添加定时消费队列
This commit is contained in:
commit
399aa08dd0
@ -26,6 +26,7 @@ func InitTasks() error {
|
|||||||
|
|
||||||
err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask)
|
err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask)
|
||||||
err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask)
|
err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask)
|
||||||
|
err = cm.AddTask("scheduledPublish", "0 */1 * * * *", ScheduledPublishTask)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("添加定时任务失败: %v", err)
|
log.Printf("添加定时任务失败: %v", err)
|
||||||
}
|
}
|
||||||
@ -183,3 +184,47 @@ func processWorkPublishQueueData(data string) error {
|
|||||||
zap.L().Info("发布工作成功", zap.String("work_uuid", workUuid))
|
zap.L().Info("发布工作成功", zap.String("work_uuid", workUuid))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ScheduledPublishTask 定时发布任务,从Redis Sorted Set中获取到期的workUuid并发布
|
||||||
|
func ScheduledPublishTask() {
|
||||||
|
// 加上锁
|
||||||
|
lockKey := "scheduled_publish:lock"
|
||||||
|
reply := cache.RedisClient.SetNX(lockKey, time.Now().Format("2006-01-02 15:04:05"), 1*time.Hour)
|
||||||
|
if !reply.Val() {
|
||||||
|
zap.L().Warn("定时发布任务正在被其他实例处理")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
cache.RedisClient.Del(lockKey)
|
||||||
|
}()
|
||||||
|
|
||||||
|
now := float64(time.Now().Unix())
|
||||||
|
opt := redis.ZRangeBy{
|
||||||
|
Min: fmt.Sprintf("%d", 0),
|
||||||
|
Max: fmt.Sprintf("%f", now),
|
||||||
|
}
|
||||||
|
// 从Redis Sorted Set中获取所有score小于等于当前时间的workUuid
|
||||||
|
workUuids, err := cache.RedisClient.ZRangeByScore(modelCast.ScheduledPublishQueueKey, opt).Result()
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("获取定时发布任务失败", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(workUuids) == 0 {
|
||||||
|
zap.L().Debug("没有到期的定时发布任务")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
zap.L().Info("发现到期的定时发布任务", zap.Int("count", len(workUuids)))
|
||||||
|
// 发布成功后,从Redis中删除这些workUuid
|
||||||
|
for _, workUuid := range workUuids {
|
||||||
|
_ = serverCast.PublishWork(context.Background(), &cast.PublishReq{
|
||||||
|
WorkUuids: []string{workUuid},
|
||||||
|
})
|
||||||
|
removed, err := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, workUuid).Result()
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("删除定时发布任务失败", zap.Error(err), zap.String("work_uuid", workUuid))
|
||||||
|
} else if removed > 0 {
|
||||||
|
zap.L().Info("已删除定时发布任务", zap.String("work_uuid", workUuid))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zap.L().Info("定时发布任务处理完成", zap.Int("count", len(workUuids)))
|
||||||
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@ const (
|
|||||||
AutoConfirmQueueKey = "auto_confirm:queue"
|
AutoConfirmQueueKey = "auto_confirm:queue"
|
||||||
AutoConfirmLockKey = "auto_confirm:lock:%s"
|
AutoConfirmLockKey = "auto_confirm:lock:%s"
|
||||||
WorkPublishQueueKey = "work:publish:queue"
|
WorkPublishQueueKey = "work:publish:queue"
|
||||||
|
ScheduledPublishQueueKey = "scheduled:publish:queue" // 定时发布队列
|
||||||
)
|
)
|
||||||
|
|
||||||
var WorkCategoryMM = map[int]string{
|
var WorkCategoryMM = map[int]string{
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user