From b152fbe01b2276b2b5353c0afdb0b2d45b5b6dc4 Mon Sep 17 00:00:00 2001 From: bx1834938347-prog Date: Fri, 5 Dec 2025 14:16:48 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9Aai=E7=94=9F=E6=88=90=E5=9B=BE?= =?UTF-8?q?=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/service/import/imageContentGetResult.go | 3 + pkg/service/import/imageContentProcessor.go | 192 ++++++++++---------- 2 files changed, 101 insertions(+), 94 deletions(-) diff --git a/pkg/service/import/imageContentGetResult.go b/pkg/service/import/imageContentGetResult.go index dc35165..f4580fd 100644 --- a/pkg/service/import/imageContentGetResult.go +++ b/pkg/service/import/imageContentGetResult.go @@ -5,6 +5,7 @@ import ( "fmt" "fonchain-fiee/pkg/service" "fonchain-fiee/pkg/utils" + "sort" "time" "github.com/gin-gonic/gin" @@ -98,6 +99,8 @@ func returnExcel(failedTasks []*ImageTask, c *gin.Context) error { } dataList = append(dataList, &data) } + sort.Slice(failedTasks, func(i, j int) bool { return failedTasks[i].Data.LineNum < failedTasks[j].Data.LineNum }) + content, err := utils.ToExcelByType(titleList, dataList, "slice", "") if err != nil { return fmt.Errorf("生成Excel失败: %v", err) diff --git a/pkg/service/import/imageContentProcessor.go b/pkg/service/import/imageContentProcessor.go index fcbf733..a3a413c 100644 --- a/pkg/service/import/imageContentProcessor.go +++ b/pkg/service/import/imageContentProcessor.go @@ -34,8 +34,9 @@ const ( ) var ( - batchProcessor *BatchProcessor - instanceMutex sync.Mutex + batchProcessor *BatchProcessor + getBatchProcessorReadMutex sync.Mutex + getBatchProcessorMutex sync.Mutex ) const ( @@ -67,14 +68,14 @@ type ImageTask struct { } func GetBatchProcessorRead() *BatchProcessor { - instanceMutex.Lock() - defer instanceMutex.Unlock() + getBatchProcessorReadMutex.Lock() + defer getBatchProcessorReadMutex.Unlock() if batchProcessor == nil { batchProcessor = &BatchProcessor{ tasks: make(map[string]*ImageTask), inProgress: make(map[string]bool), - pollInterval: 1 * time.Second, + pollInterval: 100 * time.Millisecond, status: StatusIdle, } } @@ -82,14 +83,14 @@ func GetBatchProcessorRead() *BatchProcessor { } func GetBatchProcessor() *BatchProcessor { - instanceMutex.Lock() - defer instanceMutex.Unlock() + getBatchProcessorMutex.Lock() + defer getBatchProcessorMutex.Unlock() if batchProcessor == nil || batchProcessor.status == StatusCompleted { batchProcessor = &BatchProcessor{ tasks: make(map[string]*ImageTask), inProgress: make(map[string]bool), - pollInterval: 1 * time.Second, + pollInterval: 100 * time.Millisecond, status: StatusIdle, } } @@ -315,109 +316,112 @@ func (p *BatchProcessor) GetTaskStatistics() (completed, pending, total int, com return completed, pending, total, completedTasks, failedTasks } -// func (p *BatchProcessor) StartPolling() { -// go func() { -// ticker := time.NewTicker(p.pollInterval) -// defer ticker.Stop() -// -// for range ticker.C { -// if p.IsAllCompleted() { -// p.SetStatus(StatusCompleted) -// zap.L().Info("所有任务已完成,停止轮询") -// ticker.Stop() -// break -// } -// for i, v := range p.inProgress { -// if !v { -// if err := p.UpdateTaskStatuses(i); err != nil { -// zap.L().Error("批量更新任务状态失败: %v", zap.Error(err)) -// continue -// } -// } -// continue -// } -// } -// }() -// } func (p *BatchProcessor) StartPolling() { go func() { - ticker := time.NewTicker(p.pollInterval) // 1秒间隔 + ticker := time.NewTicker(p.pollInterval) defer ticker.Stop() - // 令牌桶,控制每秒最多10个请求 - tokenBucket := make(chan struct{}, 10) - - // 每秒补充令牌 - go func() { - refillTicker := time.NewTicker(time.Second) - defer refillTicker.Stop() - - for { - select { - case <-refillTicker.C: - // 每秒补充到10个令牌 - for i := 0; i < 10-len(tokenBucket); i++ { - select { - case tokenBucket <- struct{}{}: - default: - // 桶已满,跳过 - } - } - } - } - }() - for range ticker.C { + fmt.Println(1) if p.IsAllCompleted() { p.SetStatus(StatusCompleted) zap.L().Info("所有任务已完成,停止轮询") ticker.Stop() break } - - // 获取未完成的任务 - incompleteTasks := p.getIncompleteTasks() - if len(incompleteTasks) == 0 { + for i, v := range p.inProgress { + if !v { + fmt.Println(2) + if err := p.UpdateTaskStatuses(i); err != nil { + zap.L().Error("批量更新任务状态失败: %v", zap.Error(err)) + continue + } + } continue } - - // 处理当前可用的任务(最多10个) - processedCount := 0 - for _, taskID := range incompleteTasks { - if processedCount >= 10 { - break // 本秒已达到10个请求限制 - } - - select { - case <-tokenBucket: - // 获取到令牌,可以发送请求 - processedCount++ - go p.updateTaskWithToken(taskID, tokenBucket) - default: - // 没有令牌了,跳过 - break - } - } - - zap.L().Debug("本轮处理任务数量", - zap.Int("processed", processedCount), - zap.Int("remaining", len(incompleteTasks)-processedCount)) } }() } -// 使用令牌更新任务状态 -func (p *BatchProcessor) updateTaskWithToken(taskID string, tokenBucket chan struct{}) { - defer func() { - // 任务完成后不返还令牌,由定时器统一补充 - }() - - if err := p.UpdateTaskStatuses(taskID); err != nil { - zap.L().Error("更新任务状态失败", - zap.String("task_id", taskID), - zap.Error(err)) - } -} +//func (p *BatchProcessor) StartPolling() { +// go func() { +// ticker := time.NewTicker(p.pollInterval) // 1秒间隔 +// defer ticker.Stop() +// +// // 令牌桶,控制每秒最多10个请求 +// tokenBucket := make(chan struct{}, 10) +// +// // 每秒补充令牌 +// go func() { +// refillTicker := time.NewTicker(time.Second) +// defer refillTicker.Stop() +// +// for { +// select { +// case <-refillTicker.C: +// // 每秒补充到10个令牌 +// for i := 0; i < 10-len(tokenBucket); i++ { +// select { +// case tokenBucket <- struct{}{}: +// default: +// // 桶已满,跳过 +// } +// } +// } +// } +// }() +// +// for range ticker.C { +// if p.IsAllCompleted() { +// p.SetStatus(StatusCompleted) +// zap.L().Info("所有任务已完成,停止轮询") +// ticker.Stop() +// break +// } +// +// // 获取未完成的任务 +// incompleteTasks := p.getIncompleteTasks() +// if len(incompleteTasks) == 0 { +// continue +// } +// +// // 处理当前可用的任务(最多10个) +// processedCount := 0 +// for _, taskID := range incompleteTasks { +// if processedCount >= 10 { +// break // 本秒已达到10个请求限制 +// } +// +// select { +// case <-tokenBucket: +// // 获取到令牌,可以发送请求 +// processedCount++ +// go p.updateTaskWithToken(taskID, tokenBucket) +// default: +// // 没有令牌了,跳过 +// break +// } +// } +// +// zap.L().Debug("本轮处理任务数量", +// zap.Int("processed", processedCount), +// zap.Int("remaining", len(incompleteTasks)-processedCount)) +// } +// }() +//} +// +//// 使用令牌更新任务状态 +//func (p *BatchProcessor) updateTaskWithToken(taskID string, tokenBucket chan struct{}) { +// defer func() { +// // 任务完成后不返还令牌,由定时器统一补充 +// }() +// +// if err := p.UpdateTaskStatuses(taskID); err != nil { +// zap.L().Error("更新任务状态失败", +// zap.String("task_id", taskID), +// zap.Error(err)) +// } +//} // 获取未完成的任务列表 func (p *BatchProcessor) getIncompleteTasks() []string {