diff --git a/pkg/cron/task.go b/pkg/cron/task.go index 2d12db9..c957258 100644 --- a/pkg/cron/task.go +++ b/pkg/cron/task.go @@ -27,8 +27,8 @@ func InitTasks() error { cm := GetCronManager() err := cm.AddTask("refreshWorkApprovalStatus", "0 */1 * * * *", RefreshWorkApprovalStatusTask) err = cm.AddTask("artistAutoConfirm", "0 */1 * * * *", ArtistAutoConfirmTask) - err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", RefreshPublishStatusTask) - err = cm.AddTask("scheduledPublish", "0 */1 * * * *", PublishTask) + err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", PublishTask) + err = cm.AddTask("scheduledPublish", "0 */1 * * * *", ScheduledPublishTask) err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask) err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask) @@ -274,8 +274,8 @@ func RefreshPublishStatusTask() { // PublishTask 定时发布任务,从Redis Sorted Set中获取所有workUuid并根据score判断处理 func PublishTask() { - go ScheduledPublishTask() // 刷新发布状态 - go TaskStatus() // 异步任务状态 + go RefreshPublishStatusTask() // 刷新发布状态 + go TaskStatus() // 异步任务状态 } func ScheduledPublishTask() { // 加上锁 @@ -307,7 +307,7 @@ func ScheduledPublishTask() { publishCount := 0 expiredCount := 0 - const batchSize = 10 // 每批发布10个 + const batchSize = 8 // 每批发布8个,与PublishWork的workerCount保持一致 zap.L().Info("发现定时发布任务", zap.Int("total_count", len(workList))) @@ -562,5 +562,5 @@ func AyrshareMetricsCollectorTask() { } func RefreshArtistOrderTask() { - service.CastProvider.Tools(context.Background(), &cast.ToolsReq{Action: "refreshArtistOrder"}) + _, _ = service.CastProvider.Tools(context.Background(), &cast.ToolsReq{Action: "artistOrderInfo"}) } diff --git a/pkg/service/cast/work.go b/pkg/service/cast/work.go index 77f4f36..a8fc794 100644 --- a/pkg/service/cast/work.go +++ b/pkg/service/cast/work.go @@ -445,7 +445,6 @@ func Publish(ctx *gin.Context) { // PublishWork 统一发布 func PublishWork(ctx context.Context, req *cast.PublishReq) error { var ( - wg sync.WaitGroup errsMu sync.Mutex errs []error ) @@ -453,32 +452,69 @@ func PublishWork(ctx context.Context, req *cast.PublishReq) error { if len(req.WorkUuids) == 0 { return errors.New("请选择作品") } - jobs := make(chan string, len(req.WorkUuids)) // 添加日志 zap.L().Info("开始发布作品", zap.Int("总数", len(req.WorkUuids))) startTime := time.Now().Unix() - workerCount := 10 - if len(req.WorkUuids) < workerCount { - workerCount = len(req.WorkUuids) // 避免创建多余的协程 - } - for i := 0; i < workerCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for workUuid := range jobs { - if err := PostAS(workUuid); err != nil { - errsMu.Lock() - errs = append(errs, err) - errsMu.Unlock() + + // 每批次协程数量设为8 + workerCount := 8 + batchSize := workerCount // 每批处理8个任务 + + // 分批处理 + for batchStart := 0; batchStart < len(req.WorkUuids); batchStart += batchSize { + batchEnd := batchStart + batchSize + if batchEnd > len(req.WorkUuids) { + batchEnd = len(req.WorkUuids) + } + + currentBatch := req.WorkUuids[batchStart:batchEnd] + batchNum := batchStart/batchSize + 1 + totalBatches := (len(req.WorkUuids) + batchSize - 1) / batchSize + + zap.L().Info("开始处理批次", + zap.Int("批次", batchNum), + zap.Int("总批次", totalBatches), + zap.Int("本批数量", len(currentBatch))) + + // 处理当前批次 + var wg sync.WaitGroup + jobs := make(chan string, len(currentBatch)) + + currentWorkerCount := workerCount + if len(currentBatch) < workerCount { + currentWorkerCount = len(currentBatch) // 避免创建多余的协程 + } + + for i := 0; i < currentWorkerCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for workUuid := range jobs { + if err := PostAS(ctx, workUuid); err != nil { + errsMu.Lock() + errs = append(errs, err) + errsMu.Unlock() + } } - } - }() + }() + } + + for _, workUuid := range currentBatch { + jobs <- workUuid + } + close(jobs) + wg.Wait() + + zap.L().Info("批次处理完成", zap.Int("批次", batchNum)) + + // 如果不是最后一批,等待10-15秒再进行下一批 + if batchEnd < len(req.WorkUuids) { + waitTime := 10 + time.Duration(time.Now().UnixNano()%6) // 随机10-15秒 + zap.L().Info("等待后再处理下一批", zap.Int64("等待秒数", int64(waitTime))) + time.Sleep(waitTime * time.Second) + } } - for _, workUuid := range req.WorkUuids { - jobs <- workUuid - } - close(jobs) - wg.Wait() + // 统计结果 successCount := len(req.WorkUuids) - len(errs) zap.L().Info("发布完成", @@ -493,9 +529,9 @@ func PublishWork(ctx context.Context, req *cast.PublishReq) error { return nil } -func PostAS(workUuid string) error { +func PostAS(ctx context.Context, workUuid string) error { var err error - _, err = service.CastProvider.Publish(context.Background(), &cast.PublishReq{WorkUuids: []string{workUuid}}) + _, err = service.CastProvider.Publish(ctx, &cast.PublishReq{WorkUuids: []string{workUuid}}) if err != nil { zap.L().Error("Publish err", zap.String("workUuid", workUuid), zap.Error(err)) return err