完成批量导入记录

This commit is contained in:
戴育兵 2026-01-15 15:42:04 +08:00
parent 81325137a9
commit 3c57d024d0
2 changed files with 66 additions and 30 deletions

View File

@ -27,8 +27,8 @@ func InitTasks() error {
cm := GetCronManager() cm := GetCronManager()
err := cm.AddTask("refreshWorkApprovalStatus", "0 */1 * * * *", RefreshWorkApprovalStatusTask) err := cm.AddTask("refreshWorkApprovalStatus", "0 */1 * * * *", RefreshWorkApprovalStatusTask)
err = cm.AddTask("artistAutoConfirm", "0 */1 * * * *", ArtistAutoConfirmTask) err = cm.AddTask("artistAutoConfirm", "0 */1 * * * *", ArtistAutoConfirmTask)
err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", RefreshPublishStatusTask) err = cm.AddTask("refreshPublishStatus", "0 */1 * * * *", PublishTask)
err = cm.AddTask("scheduledPublish", "0 */1 * * * *", PublishTask) err = cm.AddTask("scheduledPublish", "0 */1 * * * *", ScheduledPublishTask)
err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask) err = cm.AddTask("artistAutoConfirmAnalysis", "0 */1 * * * *", ArtistAutoConfirmAnalysisTask)
err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask) err = cm.AddTask("refreshWorkAnalysisApprovalStatus", "0 */1 * * * *", RefreshWorkAnalysisApprovalStatusTask)
@ -274,8 +274,8 @@ func RefreshPublishStatusTask() {
// PublishTask 定时发布任务从Redis Sorted Set中获取所有workUuid并根据score判断处理 // PublishTask 定时发布任务从Redis Sorted Set中获取所有workUuid并根据score判断处理
func PublishTask() { func PublishTask() {
go ScheduledPublishTask() // 刷新发布状态 go RefreshPublishStatusTask() // 刷新发布状态
go TaskStatus() // 异步任务状态 go TaskStatus() // 异步任务状态
} }
func ScheduledPublishTask() { func ScheduledPublishTask() {
// 加上锁 // 加上锁
@ -307,7 +307,7 @@ func ScheduledPublishTask() {
publishCount := 0 publishCount := 0
expiredCount := 0 expiredCount := 0
const batchSize = 10 // 每批发布10个 const batchSize = 8 // 每批发布8个与PublishWork的workerCount保持一致
zap.L().Info("发现定时发布任务", zap.Int("total_count", len(workList))) zap.L().Info("发现定时发布任务", zap.Int("total_count", len(workList)))
@ -562,5 +562,5 @@ func AyrshareMetricsCollectorTask() {
} }
func RefreshArtistOrderTask() { func RefreshArtistOrderTask() {
service.CastProvider.Tools(context.Background(), &cast.ToolsReq{Action: "refreshArtistOrder"}) _, _ = service.CastProvider.Tools(context.Background(), &cast.ToolsReq{Action: "artistOrderInfo"})
} }

View File

@ -445,7 +445,6 @@ func Publish(ctx *gin.Context) {
// PublishWork 统一发布 // PublishWork 统一发布
func PublishWork(ctx context.Context, req *cast.PublishReq) error { func PublishWork(ctx context.Context, req *cast.PublishReq) error {
var ( var (
wg sync.WaitGroup
errsMu sync.Mutex errsMu sync.Mutex
errs []error errs []error
) )
@ -453,32 +452,69 @@ func PublishWork(ctx context.Context, req *cast.PublishReq) error {
if len(req.WorkUuids) == 0 { if len(req.WorkUuids) == 0 {
return errors.New("请选择作品") return errors.New("请选择作品")
} }
jobs := make(chan string, len(req.WorkUuids))
// 添加日志 // 添加日志
zap.L().Info("开始发布作品", zap.Int("总数", len(req.WorkUuids))) zap.L().Info("开始发布作品", zap.Int("总数", len(req.WorkUuids)))
startTime := time.Now().Unix() startTime := time.Now().Unix()
workerCount := 10
if len(req.WorkUuids) < workerCount { // 每批次协程数量设为8
workerCount = len(req.WorkUuids) // 避免创建多余的协程 workerCount := 8
} batchSize := workerCount // 每批处理8个任务
for i := 0; i < workerCount; i++ {
wg.Add(1) // 分批处理
go func() { for batchStart := 0; batchStart < len(req.WorkUuids); batchStart += batchSize {
defer wg.Done() batchEnd := batchStart + batchSize
for workUuid := range jobs { if batchEnd > len(req.WorkUuids) {
if err := PostAS(workUuid); err != nil { batchEnd = len(req.WorkUuids)
errsMu.Lock() }
errs = append(errs, err)
errsMu.Unlock() currentBatch := req.WorkUuids[batchStart:batchEnd]
batchNum := batchStart/batchSize + 1
totalBatches := (len(req.WorkUuids) + batchSize - 1) / batchSize
zap.L().Info("开始处理批次",
zap.Int("批次", batchNum),
zap.Int("总批次", totalBatches),
zap.Int("本批数量", len(currentBatch)))
// 处理当前批次
var wg sync.WaitGroup
jobs := make(chan string, len(currentBatch))
currentWorkerCount := workerCount
if len(currentBatch) < workerCount {
currentWorkerCount = len(currentBatch) // 避免创建多余的协程
}
for i := 0; i < currentWorkerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for workUuid := range jobs {
if err := PostAS(ctx, workUuid); err != nil {
errsMu.Lock()
errs = append(errs, err)
errsMu.Unlock()
}
} }
} }()
}() }
for _, workUuid := range currentBatch {
jobs <- workUuid
}
close(jobs)
wg.Wait()
zap.L().Info("批次处理完成", zap.Int("批次", batchNum))
// 如果不是最后一批等待10-15秒再进行下一批
if batchEnd < len(req.WorkUuids) {
waitTime := 10 + time.Duration(time.Now().UnixNano()%6) // 随机10-15秒
zap.L().Info("等待后再处理下一批", zap.Int64("等待秒数", int64(waitTime)))
time.Sleep(waitTime * time.Second)
}
} }
for _, workUuid := range req.WorkUuids {
jobs <- workUuid
}
close(jobs)
wg.Wait()
// 统计结果 // 统计结果
successCount := len(req.WorkUuids) - len(errs) successCount := len(req.WorkUuids) - len(errs)
zap.L().Info("发布完成", zap.L().Info("发布完成",
@ -493,9 +529,9 @@ func PublishWork(ctx context.Context, req *cast.PublishReq) error {
return nil return nil
} }
func PostAS(workUuid string) error { func PostAS(ctx context.Context, workUuid string) error {
var err error var err error
_, err = service.CastProvider.Publish(context.Background(), &cast.PublishReq{WorkUuids: []string{workUuid}}) _, err = service.CastProvider.Publish(ctx, &cast.PublishReq{WorkUuids: []string{workUuid}})
if err != nil { if err != nil {
zap.L().Error("Publish err", zap.String("workUuid", workUuid), zap.Error(err)) zap.L().Error("Publish err", zap.String("workUuid", workUuid), zap.Error(err))
return err return err