746 lines
21 KiB
Go
746 lines
21 KiB
Go
package imports
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"fonchain-fiee/api/accountFiee"
|
||
apiCast "fonchain-fiee/api/cast"
|
||
"fonchain-fiee/pkg/config"
|
||
"fonchain-fiee/pkg/service"
|
||
"io"
|
||
"log"
|
||
"math/rand"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"strconv"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/fonchain_enterprise/utils/objstorage"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
type TaskStatus string
|
||
|
||
const (
|
||
TaskPending TaskStatus = "PENDING" //任务排队中
|
||
TaskRunning TaskStatus = "RUNNING" //任务处理中
|
||
TaskSuccessful TaskStatus = "SUCCESSFUL" //任务执行成功
|
||
TaskFailed TaskStatus = "FAILED" //任务执行失败
|
||
TaskCancelled TaskStatus = "CANCELLED" //任务已经取消
|
||
TaskCanceled TaskStatus = "UNKNOWN" //任务不存在
|
||
)
|
||
|
||
var (
|
||
batchProcessor *BatchProcessor
|
||
batchProcessorMutex sync.Mutex
|
||
)
|
||
|
||
const (
|
||
StatusIdle = 0 // 空闲中(可执行新任务)
|
||
StatusProcessing = 1 // 处理中(只能读取进度)
|
||
StatusCompleted = 2 // 已完成(可读取结果)
|
||
)
|
||
|
||
var StatusMap = map[int]string{StatusIdle: "空闲中", StatusProcessing: "处理中", StatusCompleted: "已完成"}
|
||
|
||
type BatchProcessor struct {
|
||
mu sync.RWMutex
|
||
tasks map[int]*ImageTask //任务唯一标识find任务详情
|
||
inProgress map[int]bool //任务唯一标识find任务进行状态
|
||
idTaskStatus map[int]map[string]TaskStatus
|
||
pollInterval time.Duration //间隔时间
|
||
status int //全局实例状态
|
||
}
|
||
|
||
type ImageTask struct {
|
||
Data *excelData //导入的初始任务数据
|
||
TaskID int //任务唯一标识
|
||
Status TaskStatus //第三方返回的错误
|
||
Error error //任务是否报错
|
||
Title string //生成的标题
|
||
Content string //生成的内容
|
||
StartTime time.Time //任务开始时间
|
||
EndTime time.Time //任务结束时间
|
||
RetryCount int //重试次数
|
||
SubTaskStatus map[string]TaskStatus // 子任务状态: taskID -> "pending"/"success"/"failed"
|
||
RequiredCount int // 需要成功的子任务数量
|
||
SuccessCount int // 已成功的子任务数量
|
||
FailedCount int // 已失败的子任务数量
|
||
Urls []string
|
||
}
|
||
|
||
// GetBatchProcessorReadOnly 获取只读实例
|
||
func GetBatchProcessorReadOnly() *BatchProcessor {
|
||
return batchProcessor
|
||
}
|
||
|
||
// GetOrCreateBatchProcessor 获取实例
|
||
func GetOrCreateBatchProcessor() *BatchProcessor {
|
||
batchProcessorMutex.Lock()
|
||
defer batchProcessorMutex.Unlock()
|
||
|
||
if batchProcessor == nil || batchProcessor.status == StatusCompleted {
|
||
batchProcessor = &BatchProcessor{
|
||
tasks: make(map[int]*ImageTask),
|
||
inProgress: make(map[int]bool),
|
||
idTaskStatus: make(map[int]map[string]TaskStatus),
|
||
pollInterval: 100 * time.Millisecond,
|
||
status: StatusIdle,
|
||
}
|
||
}
|
||
return batchProcessor
|
||
}
|
||
|
||
// 设置任务状态
|
||
func (p *BatchProcessor) setStatus(status int) {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
p.status = status
|
||
|
||
}
|
||
|
||
// GetStatus 获取当前状态
|
||
func (p *BatchProcessor) getStatus() int {
|
||
p.mu.RLock()
|
||
defer p.mu.RUnlock()
|
||
return p.status
|
||
}
|
||
|
||
// GetTaskStatistics 获取实例相关信息
|
||
func (p *BatchProcessor) GetTaskStatistics() (completed, pending, total int, completedTasks, failedTasks []*ImageTask) {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
total = len(p.tasks)
|
||
for _, task := range p.tasks {
|
||
if p.inProgress[task.TaskID] { //是否转换成功
|
||
completed++
|
||
if task.Status == TaskSuccessful && task.Error == nil { //转换成功 并且 发布成功
|
||
completedTasks = append(completedTasks, task)
|
||
} else if task.Status == TaskFailed || task.Error != nil { //转换失败 或者 发布失败
|
||
failedTasks = append(failedTasks, task)
|
||
}
|
||
} else {
|
||
pending++
|
||
}
|
||
}
|
||
|
||
return completed, pending, total, completedTasks, failedTasks
|
||
}
|
||
|
||
// 提交一个任务
|
||
func (p *BatchProcessor) submitTask(req *excelData) error {
|
||
// 获取用户信息
|
||
list, err := service.AccountFieeProvider.UserList(context.Background(), &accountFiee.UserListRequest{
|
||
Name: req.ArtistName,
|
||
SubNum: req.SubNum,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("获取用户信息失败: %s", err.Error())
|
||
}
|
||
if len(list.UserList) == 0 {
|
||
return fmt.Errorf("未找到用户信息: %s", req.ArtistName)
|
||
}
|
||
// 获取用户详细信息
|
||
_, err = service.AccountFieeProvider.Info(context.Background(), &accountFiee.InfoRequest{
|
||
ID: list.UserList[0].Id,
|
||
Domain: "app",
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("获取用户详细信息失败: %s", err.Error())
|
||
}
|
||
|
||
// 获取 TikTok 自媒体账号
|
||
accountListTikTok, err := service.CastProvider.MediaUserList(context.Background(), &apiCast.MediaUserListReq{
|
||
ArtistVal: req.ArtistName,
|
||
PlatformID: 1,
|
||
Page: 1,
|
||
PageSize: 10,
|
||
ArtistUuid: strconv.FormatUint(list.UserList[0].Id, 10),
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("获取 TikTok 账号失败: %s", err.Error())
|
||
}
|
||
if accountListTikTok == nil || len(accountListTikTok.Data) == 0 {
|
||
return fmt.Errorf("tiktok自媒体账号数量为0")
|
||
}
|
||
// 查找 TikTok 账号
|
||
tiktokFound := false
|
||
for _, user := range accountListTikTok.Data {
|
||
if user.PlatformUserName == req.TikTok {
|
||
req.MediaAccountNames = append(req.MediaAccountNames, user.PlatformUserName)
|
||
req.MediaAccountUuids = append(req.MediaAccountUuids, user.MediaAccountUuid)
|
||
tiktokFound = true
|
||
break
|
||
}
|
||
}
|
||
if !tiktokFound {
|
||
return fmt.Errorf("未找到匹配的TikTok账号: %s", req.TikTok)
|
||
}
|
||
// 获取 Instagram 自媒体账号
|
||
accountListIns, err := service.CastProvider.MediaUserList(context.Background(), &apiCast.MediaUserListReq{
|
||
ArtistVal: req.ArtistName,
|
||
PlatformID: 3,
|
||
Page: 1,
|
||
PageSize: 10,
|
||
ArtistUuid: strconv.FormatUint(list.UserList[0].Id, 10),
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("获取 Instagram 账号失败: %s", err.Error())
|
||
}
|
||
if accountListIns == nil || len(accountListIns.Data) == 0 {
|
||
return fmt.Errorf("ins自媒体账号数量为0")
|
||
}
|
||
|
||
// 查找 Instagram 账号
|
||
insFound := false
|
||
for _, user := range accountListIns.Data {
|
||
if user.PlatformUserName == req.Instagram {
|
||
req.MediaAccountNames = append(req.MediaAccountNames, user.PlatformUserName)
|
||
req.MediaAccountUuids = append(req.MediaAccountUuids, user.MediaAccountUuid)
|
||
insFound = true
|
||
break
|
||
}
|
||
}
|
||
if !insFound {
|
||
return fmt.Errorf("未找到匹配的Instagram账号: %s", req.Instagram)
|
||
}
|
||
|
||
switch {
|
||
case req.PhotoUrl == "": // 如果没有提供照片 URL,生成标题和内容
|
||
// 生成标题和内容
|
||
title, content, err := p.generateTitleAndContent(req)
|
||
if err != nil {
|
||
zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
|
||
return fmt.Errorf("生成标题失败")
|
||
}
|
||
req.Title = title
|
||
req.Content = content
|
||
|
||
// 请求生成图片
|
||
|
||
taskIds := make([]string, 0, req.PhotoNum)
|
||
for i := 0; i < req.PhotoNum; i++ {
|
||
taskId, err := p.generateImage(req)
|
||
if err != nil {
|
||
zap.L().Error("生成图片失败: %v", zap.Error(err))
|
||
p.recordSubTaskStatus(req.LineNum, taskId, TaskFailed)
|
||
return fmt.Errorf("生成图片失败")
|
||
}
|
||
taskIds = append(taskIds, taskId)
|
||
p.recordSubTaskStatus(req.LineNum, taskId, TaskPending)
|
||
}
|
||
|
||
// 创建并保存任务
|
||
task := &ImageTask{
|
||
Data: req,
|
||
TaskID: req.LineNum,
|
||
Status: TaskPending,
|
||
Title: title,
|
||
Content: content,
|
||
StartTime: time.Now(),
|
||
SubTaskStatus: make(map[string]TaskStatus),
|
||
RequiredCount: req.PhotoNum, // 需要成功的图片数量
|
||
SuccessCount: 0,
|
||
FailedCount: 0,
|
||
}
|
||
p.tasks[req.LineNum] = task
|
||
p.inProgress[req.LineNum] = false
|
||
|
||
case req.PhotoUrl != "": //如果有图片
|
||
task := &ImageTask{
|
||
Data: req,
|
||
TaskID: req.LineNum,
|
||
Status: TaskPending,
|
||
StartTime: time.Now(),
|
||
RequiredCount: 1, // 单张图片只需要成功1次
|
||
SuccessCount: 0,
|
||
FailedCount: 0,
|
||
SubTaskStatus: make(map[string]TaskStatus),
|
||
}
|
||
p.recordSubTaskStatus(req.LineNum, strconv.Itoa(req.LineNum), TaskPending)
|
||
p.tasks[req.LineNum] = task
|
||
p.inProgress[req.LineNum] = false
|
||
}
|
||
return nil
|
||
}
|
||
func (p *BatchProcessor) recordSubTaskStatus(taskID int, subTaskID string, status TaskStatus) {
|
||
|
||
if _, exists := p.idTaskStatus[taskID]; !exists {
|
||
p.idTaskStatus[taskID] = make(map[string]TaskStatus)
|
||
}
|
||
|
||
p.idTaskStatus[taskID][subTaskID] = status
|
||
|
||
// 更新主任务状态
|
||
if task, exists := p.tasks[taskID]; exists {
|
||
switch status {
|
||
case TaskSuccessful:
|
||
task.SuccessCount++
|
||
task.SubTaskStatus[subTaskID] = TaskSuccessful
|
||
case TaskFailed:
|
||
task.FailedCount++
|
||
task.SubTaskStatus[subTaskID] = TaskFailed
|
||
case TaskPending:
|
||
task.SubTaskStatus[subTaskID] = TaskPending
|
||
}
|
||
|
||
// 检查任务完成状态
|
||
p.checkTaskCompletion(taskID)
|
||
}
|
||
}
|
||
|
||
// 检查任务是否完成
|
||
func (p *BatchProcessor) checkTaskCompletion(taskID int) {
|
||
task, exists := p.tasks[taskID]
|
||
if !exists {
|
||
return
|
||
}
|
||
|
||
// 检查是否所有子任务都完成
|
||
if task.SuccessCount+task.FailedCount >= len(p.idTaskStatus[taskID]) {
|
||
// 子任务都完成,判断成功数量
|
||
if task.SuccessCount >= task.RequiredCount {
|
||
// 成功数量满足要求
|
||
task.Status = TaskSuccessful
|
||
zap.L().Info("任务完成", zap.Int("taskID", taskID), zap.String("status", "success"))
|
||
} else {
|
||
// 成功数量不满足要求,但所有子任务完成
|
||
task.Status = TaskFailed
|
||
zap.L().Warn("任务部分成功",
|
||
zap.Int("taskID", taskID),
|
||
zap.Int("success", task.SuccessCount),
|
||
zap.Int("required", task.RequiredCount))
|
||
}
|
||
task.EndTime = time.Now()
|
||
p.inProgress[taskID] = true
|
||
} else {
|
||
// 子任务未完成
|
||
zap.L().Info("任务正在进行中", zap.Int("taskID", taskID), zap.String("status", "in progress"))
|
||
}
|
||
}
|
||
|
||
// 开始轮询
|
||
func (p *BatchProcessor) startPolling() {
|
||
go func() {
|
||
ticker := time.NewTicker(p.pollInterval)
|
||
defer ticker.Stop()
|
||
|
||
for range ticker.C {
|
||
if p.IsAllCompleted() {
|
||
p.setStatus(StatusCompleted)
|
||
zap.L().Info("所有任务已完成,停止轮询")
|
||
ticker.Stop()
|
||
break
|
||
}
|
||
for i, v := range p.inProgress {
|
||
if !v {
|
||
for i1, v1 := range p.idTaskStatus[i] {
|
||
if v1 == TaskPending {
|
||
if err := p.updateTask(i, i1); err != nil {
|
||
zap.L().Error("批量更新任务状态失败: %v", zap.Error(err))
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
|
||
}
|
||
continue
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// 更新任务
|
||
func (p *BatchProcessor) updateTask(id int, taskId string) (err error) {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
// 任务不存在直接返回
|
||
task, exists := p.tasks[id]
|
||
if !exists {
|
||
return fmt.Errorf("任务ID %d 不存在", id)
|
||
}
|
||
|
||
switch {
|
||
case task.Data.PhotoUrl != "": // 如果有图片 URL,生成标题和内容并发布
|
||
// 生成标题和内容
|
||
title, content, err := p.generateTitleAndContent(task.Data)
|
||
if err != nil { //生成标题失败
|
||
task.Status = TaskFailed
|
||
p.inProgress[id] = true
|
||
task.EndTime = time.Now()
|
||
task.Error = fmt.Errorf("生成标题和内容失败: %s", err.Error())
|
||
p.recordSubTaskStatus(id, taskId, TaskFailed)
|
||
zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
task.Title = title
|
||
task.Content = content
|
||
// 发布内容
|
||
if err = publishImage(publishImageReq{
|
||
ArtistName: task.Data.ArtistName,
|
||
SubNum: task.Data.SubNum,
|
||
Title: task.Title,
|
||
Content: task.Content,
|
||
TikTok: task.Data.TikTok,
|
||
Instagram: task.Data.Instagram,
|
||
GeneratePhotoUrl: []string{task.Data.PhotoUrl},
|
||
MediaAccountUuids: task.Data.MediaAccountUuids,
|
||
MediaAccountNames: task.Data.MediaAccountNames,
|
||
}); err != nil { //发布失败
|
||
task.Status = TaskFailed
|
||
p.inProgress[id] = true
|
||
task.EndTime = time.Now()
|
||
task.Error = fmt.Errorf("发布内容失败: %s", err.Error())
|
||
zap.L().Error("发布内容失败: %v", zap.Error(err))
|
||
p.recordSubTaskStatus(id, taskId, TaskFailed)
|
||
return err
|
||
}
|
||
//操作成功
|
||
p.recordSubTaskStatus(id, taskId, TaskSuccessful)
|
||
|
||
case task.Data.PhotoUrl == "": // 如果没有图片 URL,处理图片生成结果
|
||
getTaskDetailRes, err := NewAiGenerator().GetTaskDetail(taskId)
|
||
if err != nil { //获取图片结果失败
|
||
zap.L().Error("查看图片生成结果失败: %v", zap.Error(err))
|
||
task.Error = fmt.Errorf("查看图片生成结果失败")
|
||
//task.Status = TaskFailed
|
||
//p.inProgress[id] = true
|
||
//task.EndTime = time.Now()
|
||
p.recordSubTaskStatus(id, taskId, TaskFailed)
|
||
return err
|
||
}
|
||
|
||
switch getTaskDetailRes.Output.TaskStatus {
|
||
case "SUCCEEDED":
|
||
if task.Status != TaskSuccessful {
|
||
//上传图片
|
||
urls := make([]string, len(getTaskDetailRes.Output.Results))
|
||
for i, v := range getTaskDetailRes.Output.Results {
|
||
urls[i] = v.URL
|
||
}
|
||
uploadedURLs, err := downloadAndUploadImages(urls)
|
||
if err != nil { //图片上传失败
|
||
zap.L().Error("图片上传失败: %v", zap.Error(err))
|
||
task.Error = fmt.Errorf("图片上传失败")
|
||
//task.Status = TaskFailed
|
||
//p.inProgress[id] = true
|
||
//task.EndTime = time.Now()
|
||
p.recordSubTaskStatus(id, taskId, TaskFailed)
|
||
return err
|
||
}
|
||
task.Urls = append(task.Urls, uploadedURLs...)
|
||
p.recordSubTaskStatus(id, taskId, TaskSuccessful)
|
||
}
|
||
|
||
case "FAILED": //第三方返回失败
|
||
if task.Status != TaskFailed {
|
||
zap.L().Error("第三方生成失败: %v", zap.Error(err))
|
||
task.Error = fmt.Errorf("生成失败")
|
||
//task.Status = TaskFailed
|
||
//p.inProgress[id] = true
|
||
//task.EndTime = time.Now()
|
||
p.recordSubTaskStatus(id, taskId, TaskFailed)
|
||
return err
|
||
}
|
||
}
|
||
p.checkTaskCompletion(id)
|
||
if p.inProgress[id] == true {
|
||
// 发布图文
|
||
if err = publishImage(publishImageReq{
|
||
ArtistName: task.Data.ArtistName,
|
||
SubNum: task.Data.SubNum,
|
||
Title: task.Title,
|
||
Content: task.Content,
|
||
TikTok: task.Data.TikTok,
|
||
Instagram: task.Data.Instagram,
|
||
MediaAccountUuids: task.Data.MediaAccountUuids,
|
||
MediaAccountNames: task.Data.MediaAccountNames,
|
||
GeneratePhotoUrl: task.Urls,
|
||
}); err != nil { //发布失败
|
||
zap.L().Error("发布内容失败: %v", zap.Error(err))
|
||
task.Error = fmt.Errorf("发布内容失败")
|
||
task.Status = TaskFailed
|
||
p.inProgress[id] = true
|
||
task.EndTime = time.Now()
|
||
return err
|
||
}
|
||
//处理成功
|
||
task.Status = TaskSuccessful
|
||
p.inProgress[id] = true
|
||
task.EndTime = time.Now()
|
||
}
|
||
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// IsAllCompleted 获取任务是否全部完成
|
||
func (p *BatchProcessor) IsAllCompleted() bool {
|
||
p.mu.RLock()
|
||
defer p.mu.RUnlock()
|
||
|
||
if len(p.inProgress) == 0 {
|
||
return true
|
||
}
|
||
|
||
// 检查是否所有任务都标记为完成
|
||
for _, completed := range p.inProgress {
|
||
if !completed {
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
// 获取未完成的任务列表
|
||
func (p *BatchProcessor) getIncompleteTasks() []int {
|
||
p.mu.RLock()
|
||
defer p.mu.RUnlock()
|
||
|
||
var incomplete []int
|
||
for taskID, completed := range p.inProgress {
|
||
if !completed {
|
||
incomplete = append(incomplete, taskID)
|
||
}
|
||
}
|
||
return incomplete
|
||
}
|
||
|
||
// 批量上传图片
|
||
func downloadAndUploadImages(urls []string) ([]string, error) {
|
||
var uploadedURLs []string
|
||
|
||
for _, result := range urls {
|
||
if result == "" {
|
||
continue
|
||
}
|
||
|
||
// 下载并直接上传到桶
|
||
bucketURL, err := downloadAndUploadToBucket(result)
|
||
if err != nil {
|
||
log.Printf("图片上传失败 [%s]: %v", result, err)
|
||
continue
|
||
}
|
||
|
||
uploadedURLs = append(uploadedURLs, bucketURL)
|
||
log.Printf("图片上传成功: %s -> %s", result, bucketURL)
|
||
}
|
||
|
||
if len(uploadedURLs) == 0 {
|
||
return nil, errors.New("所有图片上传失败")
|
||
}
|
||
|
||
return uploadedURLs, nil
|
||
}
|
||
|
||
// 上传图片到桶里面
|
||
func downloadAndUploadToBucket(imageURL string) (string, error) {
|
||
// 创建临时目录
|
||
tempDir := "tmp"
|
||
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||
return "", fmt.Errorf("创建临时目录失败: %v", err)
|
||
}
|
||
defer os.RemoveAll(tempDir) // 程序结束时清理整个目录
|
||
rand.Seed(time.Now().UnixNano())
|
||
fileName := fmt.Sprintf("%d%04d.jpg", time.Now().Unix(), rand.Intn(10000))
|
||
// 构建文件路径
|
||
imgPath := filepath.Join(tempDir, fileName)
|
||
|
||
// 创建文件
|
||
file, err := os.Create(imgPath)
|
||
if err != nil {
|
||
return "", fmt.Errorf("创建文件失败: %v", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
log.Printf("文件创建在: %s", imgPath)
|
||
|
||
// 下载图片到文件
|
||
resp, err := http.Get(imageURL)
|
||
if err != nil {
|
||
return "", fmt.Errorf("下载图片失败: %v", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
return "", fmt.Errorf("下载失败,状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
// 复制到文件
|
||
_, err = io.Copy(file, resp.Body)
|
||
if err != nil {
|
||
return "", fmt.Errorf("保存文件失败: %v", err)
|
||
}
|
||
|
||
file.Sync()
|
||
|
||
fileBytes, err := os.ReadFile(imgPath)
|
||
if err != nil {
|
||
return "", fmt.Errorf("读取本地文件失败: %v", err)
|
||
}
|
||
|
||
BOSClient, err := objstorage.NewOSS(
|
||
os.Getenv(config.ConfigData.Oss.AccessKeyId),
|
||
os.Getenv(config.ConfigData.Oss.AccessKeySecret),
|
||
os.Getenv(config.ConfigData.Oss.Endpoint),
|
||
)
|
||
if BOSClient == nil {
|
||
return "", fmt.Errorf("上传文件失败: %v", err)
|
||
}
|
||
_, err = BOSClient.PutObjectFromBytes(os.Getenv(config.ConfigData.Oss.BucketName), fileName, fileBytes)
|
||
if err != nil {
|
||
return "", fmt.Errorf("上传文件失败: %v", err)
|
||
}
|
||
url := fmt.Sprintf("%s/%s", os.Getenv(config.ConfigData.Oss.CdnHost), fileName)
|
||
//上传到桶
|
||
//BOSClient, err := objstorage.NewOSS(
|
||
// config.ConfigData.Oss.AccessKeyId,
|
||
// config.ConfigData.Oss.AccessKeySecret,
|
||
// config.ConfigData.Oss.Endpoint,
|
||
//)
|
||
//if BOSClient == nil {
|
||
// return "", fmt.Errorf("上传文件失败: %v", err)
|
||
//}
|
||
//_, err = BOSClient.PutObjectFromBytes(config.ConfigData.Oss.BucketName, fileName, fileBytes)
|
||
//if err != nil {
|
||
// return "", fmt.Errorf("上传文件失败: %v", err)
|
||
//}
|
||
//url := fmt.Sprintf("%s/%s", config.ConfigData.Oss.CdnHost, fileName)
|
||
|
||
return url, nil
|
||
}
|
||
|
||
func (p *BatchProcessor) generateTitleAndContent(req *excelData) (string, string, error) {
|
||
if req.PhotoUrl != "" {
|
||
title, content, err := NewAiGenerator().GenerateTitleAndContentFromImage(
|
||
req.PhotoUrl,
|
||
req.TitleRequire,
|
||
req.ContentRequire,
|
||
)
|
||
if err != nil {
|
||
return "", "", fmt.Errorf("图生文失败: %v", err)
|
||
}
|
||
return title, content, nil
|
||
} else {
|
||
title, content, err := NewAiGenerator().GenerateTitleAndContentFromText(
|
||
req.TitleRequire,
|
||
req.ContentRequire,
|
||
)
|
||
if err != nil {
|
||
return "", "", fmt.Errorf("生成内容失败: %v", err)
|
||
}
|
||
return title, content, nil
|
||
|
||
}
|
||
}
|
||
|
||
func (p *BatchProcessor) generateImage(req *excelData) (string, error) {
|
||
prompt := fmt.Sprintf("请根据以下要求生成内容:%s\n", req.PhotoRequire)
|
||
if req.Title != "" {
|
||
prompt += fmt.Sprintf("1:标题:%s\n", req.Title) // 关联标题
|
||
}
|
||
if req.Content != "" {
|
||
prompt += fmt.Sprintf("2:内容:%s\n", req.Content) // 关联内容
|
||
}
|
||
if req.Desc != "" {
|
||
prompt += fmt.Sprintf("3:艺人简介:%s,艺人简介的优先级要低,只依据艺人简介的风格", req.Desc)
|
||
}
|
||
prompt += "\n请基于标题和内容生成单张图片,高质量,高分辨率。"
|
||
|
||
result, err := NewAiGenerator().TextToImage(
|
||
prompt,
|
||
"1024*1024",
|
||
1,
|
||
)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return result.Output.TaskID, nil
|
||
}
|
||
|
||
//func (p *BatchProcessor) StartPolling() {
|
||
// go func() {
|
||
// ticker := time.NewTicker(p.pollInterval) // 1秒间隔
|
||
// defer ticker.Stop()
|
||
//
|
||
// // 令牌桶,控制每秒最多10个请求
|
||
// tokenBucket := make(chan struct{}, 10)
|
||
//
|
||
// // 每秒补充令牌
|
||
// go func() {
|
||
// refillTicker := time.NewTicker(time.Second)
|
||
// defer refillTicker.Stop()
|
||
//
|
||
// for {
|
||
// select {
|
||
// case <-refillTicker.C:
|
||
// // 每秒补充到10个令牌
|
||
// for i := 0; i < 10-len(tokenBucket); i++ {
|
||
// select {
|
||
// case tokenBucket <- struct{}{}:
|
||
// default:
|
||
// // 桶已满,跳过
|
||
// }
|
||
// }
|
||
// }
|
||
// }
|
||
// }()
|
||
//
|
||
// for range ticker.C {
|
||
// if p.IsAllCompleted() {
|
||
// p.SetStatus(StatusCompleted)
|
||
// zap.L().Info("所有任务已完成,停止轮询")
|
||
// ticker.Stop()
|
||
// break
|
||
// }
|
||
//
|
||
// // 获取未完成的任务
|
||
// incompleteTasks := p.getIncompleteTasks()
|
||
// if len(incompleteTasks) == 0 {
|
||
// continue
|
||
// }
|
||
//
|
||
// // 处理当前可用的任务(最多10个)
|
||
// processedCount := 0
|
||
// for _, taskID := range incompleteTasks {
|
||
// if processedCount >= 10 {
|
||
// break // 本秒已达到10个请求限制
|
||
// }
|
||
//
|
||
// select {
|
||
// case <-tokenBucket:
|
||
// // 获取到令牌,可以发送请求
|
||
// processedCount++
|
||
// go p.updateTaskWithToken(taskID, tokenBucket)
|
||
// default:
|
||
// // 没有令牌了,跳过
|
||
// break
|
||
// }
|
||
// }
|
||
//
|
||
// zap.L().Debug("本轮处理任务数量",
|
||
// zap.Int("processed", processedCount),
|
||
// zap.Int("remaining", len(incompleteTasks)-processedCount))
|
||
// }
|
||
// }()
|
||
//}
|
||
//
|
||
//// 使用令牌更新任务状态
|
||
//func (p *BatchProcessor) updateTaskWithToken(taskID string, tokenBucket chan struct{}) {
|
||
// defer func() {
|
||
// // 任务完成后不返还令牌,由定时器统一补充
|
||
// }()
|
||
//
|
||
// if err := p.UpdateTaskStatuses(taskID); err != nil {
|
||
// zap.L().Error("更新任务状态失败",
|
||
// zap.String("task_id", taskID),
|
||
// zap.Error(err))
|
||
// }
|
||
//}
|