fonchain-fiee/pkg/service/import/imageContentProcessor.go
2025-12-08 14:12:38 +08:00

727 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package imports
import (
"context"
"errors"
"fmt"
"fonchain-fiee/api/accountFiee"
apiCast "fonchain-fiee/api/cast"
"fonchain-fiee/pkg/config"
"fonchain-fiee/pkg/service"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/fonchain_enterprise/utils/objstorage"
"go.uber.org/zap"
)
type TaskStatus string
const (
TaskPending TaskStatus = "PENDING" //任务排队中
TaskRunning TaskStatus = "RUNNING" //任务处理中
TaskSuccessful TaskStatus = "SUCCESSFUL" //任务执行成功
TaskFailed TaskStatus = "FAILED" //任务执行失败
TaskCancelled TaskStatus = "CANCELLED" //任务已经取消
TaskCanceled TaskStatus = "UNKNOWN" //任务不存在
)
var (
batchProcessor *BatchProcessor
batchProcessorMutex sync.Mutex
)
const (
StatusIdle = 0 // 空闲中(可执行新任务)
StatusProcessing = 1 // 处理中(只能读取进度)
StatusCompleted = 2 // 已完成(可读取结果)
)
var StatusMap = map[int]string{StatusIdle: "空闲中", StatusProcessing: "处理中", StatusCompleted: "已完成"}
type BatchProcessor struct {
mu sync.RWMutex
tasks map[int]*ImageTask //任务唯一标识find任务详情
inProgress map[int]bool //任务唯一标识find任务进行状态
idFindTaskId map[int]string //任务唯一标识find第三方编号id
pollInterval time.Duration //间隔时间
status int //全局实例状态
}
type ImageTask struct {
Data *excelData //导入的初始任务数据
TaskID int //任务唯一标识
Status TaskStatus //第三方返回的错误
Error error //任务是否报错
Title string //生成的标题
Content string //生成的内容
StartTime time.Time //任务开始时间
EndTime time.Time //任务结束时间
RetryCount int //重试次数
}
// GetBatchProcessorReadOnly 获取只读实例
func GetBatchProcessorReadOnly() *BatchProcessor {
return batchProcessor
}
// GetOrCreateBatchProcessor 获取实例
func GetOrCreateBatchProcessor() *BatchProcessor {
batchProcessorMutex.Lock()
defer batchProcessorMutex.Unlock()
if batchProcessor == nil || batchProcessor.status == StatusCompleted {
batchProcessor = &BatchProcessor{
tasks: make(map[int]*ImageTask),
inProgress: make(map[int]bool),
idFindTaskId: make(map[int]string),
pollInterval: 100 * time.Millisecond,
status: StatusIdle,
}
}
return batchProcessor
}
// 设置任务状态
func (p *BatchProcessor) setStatus(status int) {
p.mu.Lock()
defer p.mu.Unlock()
p.status = status
}
// GetStatus 获取当前状态
func (p *BatchProcessor) getStatus() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.status
}
// GetTaskStatistics 获取实例相关信息
func (p *BatchProcessor) GetTaskStatistics() (completed, pending, total int, completedTasks, failedTasks []*ImageTask) {
p.mu.Lock()
defer p.mu.Unlock()
total = len(p.tasks)
for _, task := range p.tasks {
if p.inProgress[task.TaskID] { //是否转换成功
completed++
if task.Status == TaskSuccessful && task.Error == nil { //转换成功 并且 发布成功
completedTasks = append(completedTasks, task)
} else if task.Status == TaskFailed || task.Error != nil { //转换失败 或者 发布失败
failedTasks = append(failedTasks, task)
}
} else {
pending++
}
}
return completed, pending, total, completedTasks, failedTasks
}
// 提交一个任务
func (p *BatchProcessor) submitTask(req *excelData) error {
// 获取用户信息
list, err := service.AccountFieeProvider.UserList(context.Background(), &accountFiee.UserListRequest{
Name: req.ArtistName,
SubNum: req.SubNum,
})
if err != nil {
return fmt.Errorf("获取用户信息失败: %s", err.Error())
}
if len(list.UserList) == 0 {
return fmt.Errorf("未找到用户信息: %s", req.ArtistName)
}
// 获取用户详细信息
_, err = service.AccountFieeProvider.Info(context.Background(), &accountFiee.InfoRequest{
ID: list.UserList[0].Id,
Domain: "app",
})
if err != nil {
return fmt.Errorf("获取用户详细信息失败: %s", err.Error())
}
// 获取 TikTok 自媒体账号
accountListTikTok, err := service.CastProvider.MediaUserList(context.Background(), &apiCast.MediaUserListReq{
ArtistVal: req.ArtistName,
PlatformID: 1,
Page: 1,
PageSize: 10,
ArtistUuid: strconv.FormatUint(list.UserList[0].Id, 10),
})
if err != nil {
return fmt.Errorf("获取 TikTok 账号失败: %s", err.Error())
}
if accountListTikTok == nil || len(accountListTikTok.Data) == 0 {
return fmt.Errorf("tiktok自媒体账号数量为0")
}
// 查找 TikTok 账号
tiktokFound := false
for _, user := range accountListTikTok.Data {
if user.PlatformUserName == req.TikTok {
req.MediaAccountNames = append(req.MediaAccountNames, user.PlatformUserName)
req.MediaAccountUuids = append(req.MediaAccountUuids, user.MediaAccountUuid)
tiktokFound = true
break
}
}
if !tiktokFound {
return fmt.Errorf("未找到匹配的TikTok账号: %s", req.TikTok)
}
// 获取 Instagram 自媒体账号
accountListIns, err := service.CastProvider.MediaUserList(context.Background(), &apiCast.MediaUserListReq{
ArtistVal: req.ArtistName,
PlatformID: 3,
Page: 1,
PageSize: 10,
ArtistUuid: strconv.FormatUint(list.UserList[0].Id, 10),
})
if err != nil {
return fmt.Errorf("获取 Instagram 账号失败: %s", err.Error())
}
if accountListIns == nil || len(accountListIns.Data) == 0 {
return fmt.Errorf("ins自媒体账号数量为0")
}
// 查找 Instagram 账号
insFound := false
for _, user := range accountListIns.Data {
if user.PlatformUserName == req.Instagram {
req.MediaAccountNames = append(req.MediaAccountNames, user.PlatformUserName)
req.MediaAccountUuids = append(req.MediaAccountUuids, user.MediaAccountUuid)
insFound = true
break
}
}
if !insFound {
return fmt.Errorf("未找到匹配的Instagram账号: %s", req.Instagram)
}
p.mu.Lock()
defer p.mu.Unlock()
switch {
case req.PhotoUrl == "": // 如果没有提供照片 URL生成标题和内容
// 生成标题和内容
title, content, err := p.generateTitleAndContent(req)
if err != nil {
zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
return fmt.Errorf("生成标题失败")
}
req.Title = title
req.Content = content
// 请求生成图片
taskId, err := p.generateImage(req)
if err != nil {
zap.L().Error("生成图片失败: %v", zap.Error(err))
return fmt.Errorf("生成图片失败")
}
// 创建并保存任务
task := &ImageTask{
Title: title,
Content: content,
Data: req,
TaskID: req.LineNum,
Status: TaskPending,
StartTime: time.Now(),
}
p.tasks[req.LineNum] = task
p.inProgress[req.LineNum] = false
p.idFindTaskId[req.LineNum] = taskId
case req.PhotoUrl != "": //如果有图片
task := &ImageTask{
Data: req,
TaskID: req.LineNum,
Status: TaskPending,
StartTime: time.Now(),
}
p.tasks[req.LineNum] = task
p.inProgress[req.LineNum] = false
}
return nil
}
// 开始轮询
func (p *BatchProcessor) startPolling() {
go func() {
ticker := time.NewTicker(p.pollInterval)
defer ticker.Stop()
for range ticker.C {
if p.IsAllCompleted() {
p.setStatus(StatusCompleted)
zap.L().Info("所有任务已完成,停止轮询")
ticker.Stop()
break
}
for i, v := range p.inProgress {
if !v {
if err := p.updateTask(i); err != nil {
zap.L().Error("批量更新任务状态失败: %v", zap.Error(err))
continue
}
}
continue
}
}
}()
}
// 更新任务
func (p *BatchProcessor) updateTask(id int) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
// 任务不存在直接返回
task, exists := p.tasks[id]
if !exists {
return fmt.Errorf("任务ID %d 不存在", id)
}
switch {
case task.Data.PhotoUrl != "": // 如果有图片 URL生成标题和内容并发布
// 生成标题和内容
title, content, err := p.generateTitleAndContent(task.Data)
if err != nil { //生成标题失败
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
task.Error = fmt.Errorf("生成标题和内容失败: %s", err.Error())
zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
return err
}
task.Title = title
task.Content = content
// 发布内容
if err = publishImage(publishImageReq{
ArtistName: task.Data.ArtistName,
SubNum: task.Data.SubNum,
Title: task.Title,
Content: task.Content,
TikTok: task.Data.TikTok,
Instagram: task.Data.Instagram,
GeneratePhotoUrl: []string{task.Data.PhotoUrl},
MediaAccountUuids: task.Data.MediaAccountUuids,
MediaAccountNames: task.Data.MediaAccountNames,
}); err != nil { //发布失败
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
task.Error = fmt.Errorf("发布内容失败: %s", err.Error())
zap.L().Error("发布内容失败: %v", zap.Error(err))
return err
}
//操作成功
task.Status = TaskSuccessful
p.inProgress[id] = true
task.EndTime = time.Now()
case task.Data.PhotoUrl == "": // 如果没有图片 URL处理图片生成结果
//获取图片结果
getTaskDetailRes, err := NewAiGenerator().GetTaskDetail(p.idFindTaskId[id])
if err != nil { //获取图片结果失败
zap.L().Error("查看图片生成结果失败: %v", zap.Error(err))
task.Error = fmt.Errorf("查看图片生成结果失败")
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
return err
}
switch getTaskDetailRes.Output.TaskStatus {
case "SUCCEEDED":
if task.Status != TaskSuccessful {
//上传图片
urls := make([]string, len(getTaskDetailRes.Output.Results))
for i, v := range getTaskDetailRes.Output.Results {
urls[i] = v.URL
}
uploadedURLs, err := downloadAndUploadImages(urls)
if err != nil { //图片上传失败
zap.L().Error("图片上传失败: %v", zap.Error(err))
task.Error = fmt.Errorf("图片上传失败")
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
return err
}
// 发布图文
if err = publishImage(publishImageReq{
ArtistName: task.Data.ArtistName,
SubNum: task.Data.SubNum,
Title: task.Title,
Content: task.Content,
TikTok: task.Data.TikTok,
Instagram: task.Data.Instagram,
MediaAccountUuids: task.Data.MediaAccountUuids,
MediaAccountNames: task.Data.MediaAccountNames,
GeneratePhotoUrl: uploadedURLs,
}); err != nil { //发布失败
zap.L().Error("发布内容失败: %v", zap.Error(err))
task.Error = fmt.Errorf("发布内容失败")
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
return err
}
//处理成功
task.Status = TaskSuccessful
p.inProgress[id] = true
task.EndTime = time.Now()
}
case "FAILED": //第三方返回失败
if task.Status != TaskFailed {
zap.L().Error("第三方生成失败: %v", zap.Error(err))
task.Error = fmt.Errorf("生成失败")
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
return err
}
}
}
return nil
}
// IsAllCompleted 获取任务是否全部完成
func (p *BatchProcessor) IsAllCompleted() bool {
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.inProgress) == 0 {
return true
}
// 检查是否所有任务都标记为完成
for _, completed := range p.inProgress {
if !completed {
return false
}
}
return true
}
// 获取未完成的任务列表
func (p *BatchProcessor) getIncompleteTasks() []int {
p.mu.RLock()
defer p.mu.RUnlock()
var incomplete []int
for taskID, completed := range p.inProgress {
if !completed {
incomplete = append(incomplete, taskID)
}
}
return incomplete
}
// 批量上传图片
func downloadAndUploadImages(urls []string) ([]string, error) {
var uploadedURLs []string
for _, result := range urls {
if result == "" {
continue
}
// 下载并直接上传到桶
bucketURL, err := downloadAndUploadToBucket(result)
if err != nil {
log.Printf("图片上传失败 [%s]: %v", result, err)
continue
}
uploadedURLs = append(uploadedURLs, bucketURL)
log.Printf("图片上传成功: %s -> %s", result, bucketURL)
}
if len(uploadedURLs) == 0 {
return nil, errors.New("所有图片上传失败")
}
return uploadedURLs, nil
}
// 上传图片到桶里面
func downloadAndUploadToBucket(imageURL string) (string, error) {
// 创建临时目录
tempDir := "tmp"
if err := os.MkdirAll(tempDir, 0755); err != nil {
return "", fmt.Errorf("创建临时目录失败: %v", err)
}
defer os.RemoveAll(tempDir) // 程序结束时清理整个目录
// 生成唯一文件名
fileName := fmt.Sprintf("%d.jpg",
time.Now().Unix())
// 构建文件路径
imgPath := filepath.Join(tempDir, fileName)
// 创建文件
file, err := os.Create(imgPath)
if err != nil {
return "", fmt.Errorf("创建文件失败: %v", err)
}
defer file.Close()
log.Printf("文件创建在: %s", imgPath)
// 下载图片到文件
resp, err := http.Get(imageURL)
if err != nil {
return "", fmt.Errorf("下载图片失败: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("下载失败,状态码: %d", resp.StatusCode)
}
// 复制到文件
_, err = io.Copy(file, resp.Body)
if err != nil {
return "", fmt.Errorf("保存文件失败: %v", err)
}
file.Sync()
fileBytes, err := os.ReadFile(imgPath)
if err != nil {
return "", fmt.Errorf("读取本地文件失败: %v", err)
}
// 上传到桶
BOSClient, err := objstorage.NewOSS(
os.Getenv(config.ConfigData.Oss.AccessKeyId),
os.Getenv(config.ConfigData.Oss.AccessKeySecret),
os.Getenv(config.ConfigData.Oss.Endpoint),
)
if BOSClient == nil {
return "", fmt.Errorf("上传文件失败: %v", err)
}
_, err = BOSClient.PutObjectFromBytes(os.Getenv(config.ConfigData.Oss.BucketName), fileName, fileBytes)
if err != nil {
return "", fmt.Errorf("上传文件失败: %v", err)
}
url := fmt.Sprintf("%s/%s", os.Getenv(config.ConfigData.Oss.CdnHost), fileName)
return url, nil
}
func (p *BatchProcessor) generateTitleAndContent(req *excelData) (string, string, error) {
if req.PhotoUrl != "" {
title, content, err := NewAiGenerator().GenerateTitleAndContentFromImage(
req.PhotoUrl,
req.TitleRequire,
req.ContentRequire,
)
if err != nil {
return "", "", fmt.Errorf("图生文失败: %v", err)
}
return title, content, nil
} else {
title, content, err := NewAiGenerator().GenerateTitleAndContentFromText(
req.TitleRequire,
req.ContentRequire,
)
if err != nil {
return "", "", fmt.Errorf("生成内容失败: %v", err)
}
return title, content, nil
//// 无图片:使用文生文
//title, err := p.generateTitle(req)
//if err != nil {
// return "", "", fmt.Errorf("生成标题失败: %v", err)
//}
//
//content, err := p.generateContent(req)
//if err != nil {
// return "", "", fmt.Errorf("生成内容失败: %v", err)
//}
}
}
//func (p *BatchProcessor) generateTitle(req *excelData) (string, error) {
// prompt := fmt.Sprintf("请根据以下要求生成一个标题:%s", req.TitleRequire)
// if req.Desc != "" {
// prompt += fmt.Sprintf("\n艺人简介%s", req.Desc)
// }
// prompt += "\n请直接输出标题不要包含任何其他文字。"
//
// result, err := NewAiGenerator().GenerateTextSync(prompt)
// if err != nil {
// return "", err
// }
//
// if len(result.Output.Choices) == 0 {
// return "", errors.New("AI未生成标题内容")
// }
//
// req.Title = strings.TrimSpace(result.Output.Choices[0].Message.Content)
// return req.Title, nil
//}
//
//func (p *BatchProcessor) generateContent(req *excelData) (string, error) {
// // 使用已生成的标题作为上下文
// prompt := fmt.Sprintf("请根据以下要求生成内容:%s", req.ContentRequire)
// if req.Title != "" {
// prompt += fmt.Sprintf("\n标题%s", req.Title) // 关联标题
// }
// if req.Desc != "" {
// prompt += fmt.Sprintf("\n艺人简介%s", req.Desc)
// }
// prompt += "\n请基于标题生成相关内容直接输出内容不要包含任何其他文字。"
//
// result, err := NewAiGenerator().GenerateTextSync(prompt)
// if err != nil {
// return "", err
// }
//
// if len(result.Output.Choices) == 0 {
// return "", errors.New("AI未生成内容")
// }
//
// req.Content = strings.TrimSpace(result.Output.Choices[0].Message.Content)
// return req.Content, nil
//}
//func (p *BatchProcessor) generateImage(req *excelData) (string, error) {
// prompt := fmt.Sprintf("请根据以下要求生成内容:%s", req.PhotoRequire)
// if req.Title != "" {
// prompt += fmt.Sprintf("\n标题%s", req.Title) // 关联标题
// }
// if req.Content != "" {
// prompt += fmt.Sprintf("\n内容%s", req.Content) // 关联内容
// }
// if req.Desc != "" {
// prompt += fmt.Sprintf("\n艺人简介%s", req.Desc)
// }
// prompt += "\n请基于标题和内容生成相关内容"
//
// result, err := NewAiGenerator().TextToImage(
// prompt,
// "1024*1024",
// req.PhotoNum,
// )
// if err != nil {
// return "", err
// }
// return result.Output.TaskID, nil
//}
func (p *BatchProcessor) generateImage(req *excelData) (string, error) {
prompt := fmt.Sprintf("请根据以下要求生成内容:%s\n", req.PhotoRequire)
if req.Title != "" {
prompt += fmt.Sprintf("1标题%s\n", req.Title) // 关联标题
}
if req.Content != "" {
prompt += fmt.Sprintf("2内容%s\n", req.Content) // 关联内容
}
//if req.Desc != "" {
// prompt += fmt.Sprintf("3艺人简介%s", req.Desc)
//}
prompt += "\n请基于标题和内容生成相关图片"
result, err := NewAiGenerator().TextToImage(
prompt,
"1024*1024",
req.PhotoNum,
)
if err != nil {
return "", err
}
return result.Output.TaskID, nil
}
//func (p *BatchProcessor) StartPolling() {
// go func() {
// ticker := time.NewTicker(p.pollInterval) // 1秒间隔
// defer ticker.Stop()
//
// // 令牌桶控制每秒最多10个请求
// tokenBucket := make(chan struct{}, 10)
//
// // 每秒补充令牌
// go func() {
// refillTicker := time.NewTicker(time.Second)
// defer refillTicker.Stop()
//
// for {
// select {
// case <-refillTicker.C:
// // 每秒补充到10个令牌
// for i := 0; i < 10-len(tokenBucket); i++ {
// select {
// case tokenBucket <- struct{}{}:
// default:
// // 桶已满,跳过
// }
// }
// }
// }
// }()
//
// for range ticker.C {
// if p.IsAllCompleted() {
// p.SetStatus(StatusCompleted)
// zap.L().Info("所有任务已完成,停止轮询")
// ticker.Stop()
// break
// }
//
// // 获取未完成的任务
// incompleteTasks := p.getIncompleteTasks()
// if len(incompleteTasks) == 0 {
// continue
// }
//
// // 处理当前可用的任务最多10个
// processedCount := 0
// for _, taskID := range incompleteTasks {
// if processedCount >= 10 {
// break // 本秒已达到10个请求限制
// }
//
// select {
// case <-tokenBucket:
// // 获取到令牌,可以发送请求
// processedCount++
// go p.updateTaskWithToken(taskID, tokenBucket)
// default:
// // 没有令牌了,跳过
// break
// }
// }
//
// zap.L().Debug("本轮处理任务数量",
// zap.Int("processed", processedCount),
// zap.Int("remaining", len(incompleteTasks)-processedCount))
// }
// }()
//}
//
//// 使用令牌更新任务状态
//func (p *BatchProcessor) updateTaskWithToken(taskID string, tokenBucket chan struct{}) {
// defer func() {
// // 任务完成后不返还令牌,由定时器统一补充
// }()
//
// if err := p.UpdateTaskStatuses(taskID); err != nil {
// zap.L().Error("更新任务状态失败",
// zap.String("task_id", taskID),
// zap.Error(err))
// }
//}