Compare commits

..

No commits in common. "17b91fc57925ffab7a1424f4046a885a451edc4c" and "06f7246d3562c01d5d935b99d14ba1f59fdd311c" have entirely different histories.

4 changed files with 44 additions and 29 deletions

View File

@ -240,6 +240,7 @@ func NewRouter() *gin.Engine {
} }
{ {
importRoute := v1.Group("/import") importRoute := v1.Group("/import")
importRoute.GET("generate/photo/test2", imports.Test2)
importRoute.Use(middleware.CheckWebLogin(service.AccountProvider)) importRoute.Use(middleware.CheckWebLogin(service.AccountProvider))
importRoute.POST("data/bind", imports.ImportBind) importRoute.POST("data/bind", imports.ImportBind)
importRoute.POST("data/publish", imports.ImportPublish) importRoute.POST("data/publish", imports.ImportPublish)
@ -247,13 +248,13 @@ func NewRouter() *gin.Engine {
importRoute.POST("data/publish3", imports.ImportPublishV3) importRoute.POST("data/publish3", imports.ImportPublishV3)
importRoute.POST("data/publish4", imports.ImportPublishV4) importRoute.POST("data/publish4", imports.ImportPublishV4)
importRoute.POST("data/confirm", imports.WorkConfirm) importRoute.POST("data/confirm", imports.WorkConfirm)
importRoute.GET("image-content/status", imports.ImageContentGetStatus) // 获取处理结果
importRoute.POST("image-content/import", imports.ImageContentImport) // AI生成内容并导入系统 importRoute.POST("image-content/import", imports.ImageContentImport) // AI生成内容并导入系统
importRoute.GET("image-content/result", imports.ImageContentGetResult) // 获取导入结果 importRoute.GET("image-content/result", imports.ImageContentGetResult) // 获取导入结果
importRoute.GET("image-content/result/excel", imports.ImageContentGetResultExcel) // 导出错误的excel importRoute.GET("image-content/result/excel", imports.ImageContentGetResultExcel) // 导出错误的excel
importRoute.GET("generate/photo/test", imports.Test) importRoute.GET("generate/photo/test", imports.Test)
importRoute.GET("generate/photo/test1", imports.Test1) importRoute.GET("generate/photo/test1", imports.Test1)
importRoute.GET("generate/photo/test2", imports.Test2)
} }
//静态文件 //静态文件

View File

@ -11,8 +11,14 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func ImageContentGetResult(c *gin.Context) { func ImageContentGetStatus(c *gin.Context) {
processor := GetBatchProcessorRead() processor := GetBatchProcessorRead()
currentStatus := processor.GetStatus()
service.Success(c, gin.H{"status": currentStatus, "status_description": StatusMap[currentStatus]})
return
}
func ImageContentGetResult(c *gin.Context) {
processor := GetBatchProcessorEx()
if processor == nil { if processor == nil {
service.Success(c, gin.H{ service.Success(c, gin.H{
"status": 0, "status": 0,
@ -66,10 +72,6 @@ func ImageContentGetResult(c *gin.Context) {
} }
func ImageContentGetResultExcel(c *gin.Context) { func ImageContentGetResultExcel(c *gin.Context) {
processor := GetBatchProcessorRead() processor := GetBatchProcessorRead()
if processor == nil {
service.Error(c, errors.New("任务未开始或者任务在处理中"))
return
}
currentStatus := processor.GetStatus() currentStatus := processor.GetStatus()
switch currentStatus { switch currentStatus {
case StatusCompleted: case StatusCompleted:

View File

@ -109,6 +109,8 @@ func ImageContentImport(c *gin.Context) {
return return
} }
processor.SetStatus(StatusProcessing) processor.SetStatus(StatusProcessing)
processor = GetBatchProcessorRead()
qps := 10 qps := 10
interval := time.Second / time.Duration(qps) interval := time.Second / time.Duration(qps)

View File

@ -34,8 +34,9 @@ const (
) )
var ( var (
batchProcessor *BatchProcessor batchProcessor *BatchProcessor
RwWMutex sync.RWMutex getBatchProcessorReadMutex sync.Mutex
getBatchProcessorMutex sync.Mutex
) )
const ( const (
@ -66,15 +67,28 @@ type ImageTask struct {
Content string //内容 Content string //内容
} }
func GetBatchProcessorEx() *BatchProcessor {
return batchProcessor
}
func GetBatchProcessorRead() *BatchProcessor { func GetBatchProcessorRead() *BatchProcessor {
RwWMutex.RLock() // getBatchProcessorReadMutex.Lock()
defer RwWMutex.RUnlock() defer getBatchProcessorReadMutex.Unlock()
if batchProcessor == nil {
batchProcessor = &BatchProcessor{
tasks: make(map[string]*ImageTask),
inProgress: make(map[string]bool),
pollInterval: 100 * time.Millisecond,
status: StatusIdle,
}
}
return batchProcessor return batchProcessor
} }
func GetBatchProcessor() *BatchProcessor { func GetBatchProcessor() *BatchProcessor {
RwWMutex.Lock() getBatchProcessorReadMutex.Lock()
defer RwWMutex.Unlock() defer getBatchProcessorReadMutex.Unlock()
if batchProcessor == nil || batchProcessor.status == StatusCompleted { if batchProcessor == nil || batchProcessor.status == StatusCompleted {
batchProcessor = &BatchProcessor{ batchProcessor = &BatchProcessor{
@ -312,6 +326,7 @@ func (p *BatchProcessor) StartPolling() {
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
fmt.Println(1)
if p.IsAllCompleted() { if p.IsAllCompleted() {
p.SetStatus(StatusCompleted) p.SetStatus(StatusCompleted)
zap.L().Info("所有任务已完成,停止轮询") zap.L().Info("所有任务已完成,停止轮询")
@ -320,6 +335,7 @@ func (p *BatchProcessor) StartPolling() {
} }
for i, v := range p.inProgress { for i, v := range p.inProgress {
if !v { if !v {
fmt.Println(2)
if err := p.UpdateTaskStatuses(i); err != nil { if err := p.UpdateTaskStatuses(i); err != nil {
zap.L().Error("批量更新任务状态失败: %v", zap.Error(err)) zap.L().Error("批量更新任务状态失败: %v", zap.Error(err))
continue continue
@ -444,14 +460,14 @@ func (p *BatchProcessor) IsAllCompleted() bool {
func (p *BatchProcessor) UpdateTaskStatuses(taskId string) (err error) { func (p *BatchProcessor) UpdateTaskStatuses(taskId string) (err error) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.tasks[taskId].Data.PhotoUrl != "" { //如果有图片 if p.tasks[taskId].Data.PhotoUrl != "" {
//生成标题
title, content, err := p.generateTitleAndContent(p.tasks[taskId].Data) title, content, err := p.generateTitleAndContent(p.tasks[taskId].Data)
if err != nil { if err != nil {
p.tasks[taskId].Status = TaskFailed p.tasks[taskId].Status = TaskFailed
p.inProgress[taskId] = true p.inProgress[taskId] = true
p.tasks[taskId].EndTime = time.Now() p.tasks[taskId].EndTime = time.Now()
zap.L().Error("生成标题和内容失败: %v", zap.Error(err)) p.tasks[taskId].Error = fmt.Errorf("生成标题和内容失败: %v", err)
p.tasks[taskId].Error = fmt.Errorf("生成标题和内容失败")
return err return err
} }
p.tasks[taskId].Title = title p.tasks[taskId].Title = title
@ -471,8 +487,7 @@ func (p *BatchProcessor) UpdateTaskStatuses(taskId string) (err error) {
p.tasks[taskId].Status = TaskFailed p.tasks[taskId].Status = TaskFailed
p.inProgress[taskId] = true p.inProgress[taskId] = true
p.tasks[taskId].EndTime = time.Now() p.tasks[taskId].EndTime = time.Now()
zap.L().Error("发布内容失败: %v", zap.Error(err)) p.tasks[taskId].Error = fmt.Errorf("发布内容失败: %v", err)
p.tasks[taskId].Error = fmt.Errorf("发布内容失败")
} }
p.tasks[taskId].Status = TaskSuccessful p.tasks[taskId].Status = TaskSuccessful
p.inProgress[taskId] = true p.inProgress[taskId] = true
@ -482,8 +497,7 @@ func (p *BatchProcessor) UpdateTaskStatuses(taskId string) (err error) {
getTaskDetailRes, err := NewAiGenerator().GetTaskDetail(taskId) getTaskDetailRes, err := NewAiGenerator().GetTaskDetail(taskId)
if err != nil { if err != nil {
zap.L().Error("查看图片生成结果失败: %v", zap.Error(err)) return fmt.Errorf("查看图片生成结果失败: %v", err)
return fmt.Errorf("查看图片生成结果失败")
} }
// 更新本地任务状态 // 更新本地任务状态
if localTask, exists := p.tasks[getTaskDetailRes.Output.TaskID]; exists { if localTask, exists := p.tasks[getTaskDetailRes.Output.TaskID]; exists {
@ -493,17 +507,17 @@ func (p *BatchProcessor) UpdateTaskStatuses(taskId string) (err error) {
//生成标题 //生成标题
title, content, err := p.generateTitleAndContent(p.tasks[taskId].Data) title, content, err := p.generateTitleAndContent(p.tasks[taskId].Data)
if err != nil { if err != nil {
zap.L().Debug("生成标题失败")
localTask.Status = TaskFailed localTask.Status = TaskFailed
p.tasks[getTaskDetailRes.Output.TaskID].Error = err p.tasks[getTaskDetailRes.Output.TaskID].Error = err
p.inProgress[getTaskDetailRes.Output.TaskID] = true p.inProgress[getTaskDetailRes.Output.TaskID] = true
p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now() p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now()
zap.L().Error("生成标题和内容失败: %v", zap.Error(err)) return fmt.Errorf("生成标题和内容失败: %v", err)
return fmt.Errorf("生成标题和内容失败")
} }
p.tasks[taskId].Title = title p.tasks[taskId].Title = title
p.tasks[taskId].Content = content p.tasks[taskId].Content = content
//上传图片 // 直接下载并上传到桶
localTask.EndTime = time.Now() localTask.EndTime = time.Now()
urls := make([]string, 0, len(getTaskDetailRes.Output.Results)) urls := make([]string, 0, len(getTaskDetailRes.Output.Results))
for _, v1 := range getTaskDetailRes.Output.Results { for _, v1 := range getTaskDetailRes.Output.Results {
@ -511,20 +525,18 @@ func (p *BatchProcessor) UpdateTaskStatuses(taskId string) (err error) {
} }
uploadedURLs, err := downloadAndUploadImages(urls) uploadedURLs, err := downloadAndUploadImages(urls)
if err != nil { if err != nil {
zap.L().Debug("图片上传失败")
localTask.Status = TaskFailed localTask.Status = TaskFailed
p.tasks[getTaskDetailRes.Output.TaskID].Error = err p.tasks[getTaskDetailRes.Output.TaskID].Error = err
p.inProgress[getTaskDetailRes.Output.TaskID] = true p.inProgress[getTaskDetailRes.Output.TaskID] = true
p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now() p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now()
zap.L().Error("图片上传失败: %v", zap.Error(err)) localTask.Error = fmt.Errorf("图片上传失败: %v", err)
localTask.Error = fmt.Errorf("图片上传失败")
return err return err
} }
var messages string var messages string
for _, v1 := range getTaskDetailRes.Output.Results { for _, v1 := range getTaskDetailRes.Output.Results {
messages += v1.Message messages += v1.Message
} }
//发布
if err = publishImage(publishImageReq{ if err = publishImage(publishImageReq{
ArtistName: p.tasks[getTaskDetailRes.Output.TaskID].Data.ArtistName, ArtistName: p.tasks[getTaskDetailRes.Output.TaskID].Data.ArtistName,
SubNum: p.tasks[getTaskDetailRes.Output.TaskID].Data.SubNum, SubNum: p.tasks[getTaskDetailRes.Output.TaskID].Data.SubNum,
@ -541,8 +553,6 @@ func (p *BatchProcessor) UpdateTaskStatuses(taskId string) (err error) {
p.inProgress[getTaskDetailRes.Output.TaskID] = true p.inProgress[getTaskDetailRes.Output.TaskID] = true
p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now() p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now()
} }
//成功
localTask.Status = TaskSuccessful localTask.Status = TaskSuccessful
p.inProgress[getTaskDetailRes.Output.TaskID] = true p.inProgress[getTaskDetailRes.Output.TaskID] = true
p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now() p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now()
@ -551,7 +561,7 @@ func (p *BatchProcessor) UpdateTaskStatuses(taskId string) (err error) {
case "FAILED": case "FAILED":
if localTask.Status != TaskFailed { if localTask.Status != TaskFailed {
localTask.Status = TaskFailed localTask.Status = TaskFailed
p.tasks[getTaskDetailRes.Output.TaskID].Error = errors.New("生成失败") p.tasks[getTaskDetailRes.Output.TaskID].Error = errors.New("转换失败")
p.inProgress[getTaskDetailRes.Output.TaskID] = true p.inProgress[getTaskDetailRes.Output.TaskID] = true
p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now() p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now()
} }