修改代码防止重复发送

This commit is contained in:
戴育兵 2026-01-16 23:29:21 +08:00
parent 050a82d732
commit de4020fbbe

View File

@ -28,7 +28,7 @@ func InitTasks() error {
err := cm.AddTask("refreshWorkApprovalStatus", "0 */1 * * * *", RefreshWorkApprovalStatusTask) err := cm.AddTask("refreshWorkApprovalStatus", "0 */1 * * * *", RefreshWorkApprovalStatusTask)
err = cm.AddTask("artistAutoConfirm", "0 */1 * * * *", ArtistAutoConfirmTask) err = cm.AddTask("artistAutoConfirm", "0 */1 * * * *", ArtistAutoConfirmTask)
err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", PublishTask) err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", PublishTask)
err = cm.AddTask("scheduledPublish", "0 */1 * * * *", ScheduledPublishTask) err = cm.AddTask("scheduledPublish", "0 */10 * * * *", ScheduledPublishTask)
err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask) err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask)
err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask) err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask)
@ -280,7 +280,7 @@ func PublishTask() {
func ScheduledPublishTask() { func ScheduledPublishTask() {
// 加上锁 // 加上锁
lockKey := "scheduled_publish:lock" lockKey := "scheduled_publish:lock"
reply := cache.RedisClient.SetNX(lockKey, time.Now().Format("2006-01-02 15:04:05"), 1*time.Hour) reply := cache.RedisClient.SetNX(lockKey, time.Now().Format("2006-01-02 15:04:05"), 2*time.Hour)
if !reply.Val() { if !reply.Val() {
zap.L().Warn("定时发布任务正在被其他实例处理") zap.L().Warn("定时发布任务正在被其他实例处理")
return return
@ -310,9 +310,12 @@ func ScheduledPublishTask() {
const batchSize = 8 // 每批发布8个与PublishWork的workerCount保持一致 const batchSize = 8 // 每批发布8个与PublishWork的workerCount保持一致
zap.L().Info("发现定时发布任务", zap.Int("total_count", len(workList))) zap.L().Info("发现定时发布任务", zap.Int("total_count", len(workList)))
zap.L().Info("发现定时发布任务 列表", zap.Any("workList", workList))
now := float64(time.Now().Unix()) now := float64(time.Now().Unix())
publishBatch := make([]string, 0, batchSize) publishBatch := make([]string, 0, batchSize)
// 使用 map 进行去重,防止同一个 work_uuid 被多次添加
publishedUuids := make(map[string]bool)
// 遍历所有数据根据score判断处理 // 遍历所有数据根据score判断处理
for _, item := range workList { for _, item := range workList {
@ -335,11 +338,32 @@ func ScheduledPublishTask() {
continue continue
} }
// 先从 Redis 中删除,采用"先删后处理"的策略,防止重复处理
removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, workUuid).Result()
if delErr != nil {
zap.L().Error("删除定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", workUuid))
continue // 删除失败则跳过,避免重复处理
}
if removed == 0 {
zap.L().Warn("定时发布任务已被删除,跳过",
zap.String("work_uuid", workUuid))
continue // 已被其他实例删除,跳过
}
// 检查是否已经在当前批次中(去重)
if publishedUuids[workUuid] {
zap.L().Warn("发现重复的定时发布任务,跳过",
zap.String("work_uuid", workUuid))
continue
}
// score大于等于当前时间添加到待发布批次 // score大于等于当前时间添加到待发布批次
zap.L().Info("添加到发布批次", zap.L().Info("添加到发布批次",
zap.String("work_uuid", workUuid), zap.String("work_uuid", workUuid),
zap.Float64("score", score)) zap.Float64("score", score))
publishBatch = append(publishBatch, workUuid) publishBatch = append(publishBatch, workUuid)
publishedUuids[workUuid] = true
publishCount++
// 当批次达到指定大小时,批量发布 // 当批次达到指定大小时,批量发布
if len(publishBatch) >= batchSize { if len(publishBatch) >= batchSize {
@ -347,15 +371,6 @@ func ScheduledPublishTask() {
_ = serverCast.PublishWork(context.Background(), &cast.PublishReq{ _ = serverCast.PublishWork(context.Background(), &cast.PublishReq{
WorkUuids: publishBatch, WorkUuids: publishBatch,
}) })
// 批量删除已发布的任务
for _, uuid := range publishBatch {
removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, uuid).Result()
if delErr != nil {
zap.L().Error("删除定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", uuid))
} else if removed > 0 {
publishCount++
}
}
zap.L().Info("批次发布完成", zap.Int("published", len(publishBatch))) zap.L().Info("批次发布完成", zap.Int("published", len(publishBatch)))
// 清空批次,准备下一批 // 清空批次,准备下一批
publishBatch = make([]string, 0, batchSize) publishBatch = make([]string, 0, batchSize)
@ -368,15 +383,6 @@ func ScheduledPublishTask() {
_ = serverCast.PublishWork(context.Background(), &cast.PublishReq{ _ = serverCast.PublishWork(context.Background(), &cast.PublishReq{
WorkUuids: publishBatch, WorkUuids: publishBatch,
}) })
// 批量删除已发布的任务
for _, uuid := range publishBatch {
removed, delErr := cache.RedisClient.ZRem(modelCast.ScheduledPublishQueueKey, uuid).Result()
if delErr != nil {
zap.L().Error("删除定时发布任务失败", zap.Error(delErr), zap.String("work_uuid", uuid))
} else if removed > 0 {
publishCount++
}
}
zap.L().Info("剩余批次发布完成", zap.Int("published", len(publishBatch))) zap.L().Info("剩余批次发布完成", zap.Int("published", len(publishBatch)))
} }