Compare commits

...

2 Commits

Author SHA1 Message Date
bx1834938347-prog
c1c94d6f13 Merge branch 'wwq' into dev 2025-12-08 11:18:54 +08:00
bx1834938347-prog
cfe6f2b756 style :ai生成图文 2025-12-08 11:18:44 +08:00
3 changed files with 368 additions and 371 deletions

View File

@ -12,7 +12,7 @@ import (
)
func ImageContentGetResult(c *gin.Context) {
processor := GetBatchProcessorRead()
processor := GetBatchProcessorReadOnly()
if processor == nil {
service.Success(c, gin.H{
"status": 0,
@ -21,7 +21,7 @@ func ImageContentGetResult(c *gin.Context) {
})
return
}
currentStatus := processor.GetStatus()
currentStatus := processor.getStatus()
switch currentStatus {
case StatusIdle:
service.Success(c, gin.H{
@ -65,12 +65,12 @@ func ImageContentGetResult(c *gin.Context) {
}
}
func ImageContentGetResultExcel(c *gin.Context) {
processor := GetBatchProcessorRead()
processor := GetBatchProcessorReadOnly()
if processor == nil {
service.Error(c, errors.New("任务未开始或者任务在处理中"))
return
}
currentStatus := processor.GetStatus()
currentStatus := processor.getStatus()
switch currentStatus {
case StatusCompleted:
_, _, _, _, failedTasks := processor.GetTaskStatistics()

View File

@ -63,8 +63,8 @@ func getErrorMessage(err error) string {
}
func ImageContentImport(c *gin.Context) {
processor := GetBatchProcessor()
currentStatus := processor.GetStatus()
processor := GetOrCreateBatchProcessor()
currentStatus := processor.getStatus()
switch currentStatus {
case StatusProcessing: //进行中
service.Error(c, errors.New("当前有任务正在执行中,请先查看执行进度"))
@ -110,17 +110,17 @@ func ImageContentImport(c *gin.Context) {
service.Error(c, errors.New("请检查excel文件"))
return
}
processor.SetStatus(StatusProcessing)
//设置全局状态为进行中
processor.setStatus(StatusProcessing)
//设置请求间隔
qps := 10
interval := time.Second / time.Duration(qps)
for i, v := range readExcelResult {
// 在每次请求前等待
if i > 0 {
time.Sleep(interval)
}
if err := processor.submitSingleTask(&v); err != nil {
if err := processor.submitTask(&v); err != nil {
task := &ImageTask{
Data: &v,
TaskID: i,
@ -131,7 +131,8 @@ func ImageContentImport(c *gin.Context) {
processor.inProgress[v.LineNum] = true
}
}
processor.StartPolling()
//开始轮询
processor.startPolling()
service.Success(c, gin.H{
"message": "导入成功",
"total": len(readExcelResult),

View File

@ -33,8 +33,8 @@ const (
)
var (
batchProcessor *BatchProcessor
Mutex sync.Mutex
batchProcessor *BatchProcessor
batchProcessorMutex sync.Mutex
)
const (
@ -47,32 +47,34 @@ var StatusMap = map[int]string{StatusIdle: "空闲中", StatusProcessing: "处
type BatchProcessor struct {
mu sync.RWMutex
tasks map[int]*ImageTask //任务 编号id
idFindTaskId map[int]string
inProgress map[int]bool //是否成功
pollInterval time.Duration
status int // 当前状态
tasks map[int]*ImageTask //任务唯一标识find任务详情
inProgress map[int]bool //任务唯一标识find任务进行状态
idFindTaskId map[int]string //任务唯一标识find第三方编号id
pollInterval time.Duration //间隔时间
status int //全局实例状态
}
type ImageTask struct {
Data *excelData
TaskID int
Status TaskStatus
StartTime time.Time
EndTime time.Time
RetryCount int
Error error
Title string //标题
Content string //内容
Data *excelData //导入的初始任务数据
TaskID int //任务唯一标识
Status TaskStatus //第三方返回的错误
Error error //任务是否报错
Title string //生成的标题
Content string //生成的内容
StartTime time.Time //任务开始时间
EndTime time.Time //任务结束时间
RetryCount int //重试次数
}
func GetBatchProcessorRead() *BatchProcessor {
// GetBatchProcessorReadOnly 获取只读实例
func GetBatchProcessorReadOnly() *BatchProcessor {
return batchProcessor
}
func GetBatchProcessor() *BatchProcessor {
Mutex.Lock()
defer Mutex.Unlock()
// GetOrCreateBatchProcessor 获取实例
func GetOrCreateBatchProcessor() *BatchProcessor {
batchProcessorMutex.Lock()
defer batchProcessorMutex.Unlock()
if batchProcessor == nil || batchProcessor.status == StatusCompleted {
batchProcessor = &BatchProcessor{
@ -86,7 +88,8 @@ func GetBatchProcessor() *BatchProcessor {
return batchProcessor
}
func (p *BatchProcessor) SetStatus(status int) {
// 设置任务状态
func (p *BatchProcessor) setStatus(status int) {
p.mu.Lock()
defer p.mu.Unlock()
p.status = status
@ -94,15 +97,36 @@ func (p *BatchProcessor) SetStatus(status int) {
}
// GetStatus 获取当前状态
func (p *BatchProcessor) GetStatus() int {
func (p *BatchProcessor) getStatus() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.status
}
// 提交任务
func (p *BatchProcessor) submitSingleTask(req *excelData) error {
//var infoResp *accountFiee.UserInfoResponse
// GetTaskStatistics 获取实例相关信息
func (p *BatchProcessor) GetTaskStatistics() (completed, pending, total int, completedTasks, failedTasks []*ImageTask) {
p.mu.Lock()
defer p.mu.Unlock()
total = len(p.tasks)
for _, task := range p.tasks {
if p.inProgress[task.TaskID] { //是否转换成功
completed++
if task.Status == TaskSuccessful && task.Error == nil { //转换成功 并且 发布成功
completedTasks = append(completedTasks, task)
} else if task.Status == TaskFailed || task.Error != nil { //转换失败 或者 发布失败
failedTasks = append(failedTasks, task)
}
} else {
pending++
}
}
return completed, pending, total, completedTasks, failedTasks
}
// 提交一个任务
func (p *BatchProcessor) submitTask(req *excelData) error {
// 获取用户信息
list, err := service.AccountFieeProvider.UserList(context.Background(), &accountFiee.UserListRequest{
Name: req.ArtistName,
SubNum: req.SubNum,
@ -113,16 +137,16 @@ func (p *BatchProcessor) submitSingleTask(req *excelData) error {
if len(list.UserList) == 0 {
return fmt.Errorf("未找到用户信息: %s", req.ArtistName)
}
if len(list.UserList) > 0 {
_, err = service.AccountFieeProvider.Info(context.Background(), &accountFiee.InfoRequest{
ID: list.UserList[0].Id,
Domain: "app",
})
if err != nil {
return fmt.Errorf("获取用户信息失败: %s", err.Error())
}
// 获取用户详细信息
_, err = service.AccountFieeProvider.Info(context.Background(), &accountFiee.InfoRequest{
ID: list.UserList[0].Id,
Domain: "app",
})
if err != nil {
return fmt.Errorf("获取用户详细信息失败: %s", err.Error())
}
// 获取 TikTok 自媒体账号
accountListTikTok, err := service.CastProvider.MediaUserList(context.Background(), &apiCast.MediaUserListReq{
ArtistVal: req.ArtistName,
PlatformID: 1,
@ -130,12 +154,13 @@ func (p *BatchProcessor) submitSingleTask(req *excelData) error {
PageSize: 10,
ArtistUuid: strconv.FormatUint(list.UserList[0].Id, 10),
})
if err != nil {
return fmt.Errorf("获取 TikTok 账号失败: %s", err.Error())
}
if accountListTikTok == nil || len(accountListTikTok.Data) == 0 {
return fmt.Errorf("tiktok自媒体账号数量为0")
}
if err != nil {
return fmt.Errorf("tiktok自媒体账号数量获取失败: %s,账号数量:%d", err.Error(), len(accountListTikTok.Data))
}
// 查找 TikTok 账号
tiktokFound := false
for _, user := range accountListTikTok.Data {
if user.PlatformUserName == req.TikTok {
@ -148,6 +173,7 @@ func (p *BatchProcessor) submitSingleTask(req *excelData) error {
if !tiktokFound {
return fmt.Errorf("未找到匹配的TikTok账号: %s", req.TikTok)
}
// 获取 Instagram 自媒体账号
accountListIns, err := service.CastProvider.MediaUserList(context.Background(), &apiCast.MediaUserListReq{
ArtistVal: req.ArtistName,
PlatformID: 3,
@ -155,12 +181,14 @@ func (p *BatchProcessor) submitSingleTask(req *excelData) error {
PageSize: 10,
ArtistUuid: strconv.FormatUint(list.UserList[0].Id, 10),
})
if err != nil {
return fmt.Errorf("获取 Instagram 账号失败: %s", err.Error())
}
if accountListIns == nil || len(accountListIns.Data) == 0 {
return fmt.Errorf("ins自媒体账号数量为0")
}
if err != nil {
return fmt.Errorf("ins自媒体账号数量获取失败: %s,账号数量:%d", err.Error(), len(accountListIns.Data))
}
// 查找 Instagram 账号
insFound := false
for _, user := range accountListIns.Data {
if user.PlatformUserName == req.Instagram {
@ -175,42 +203,26 @@ func (p *BatchProcessor) submitSingleTask(req *excelData) error {
}
p.mu.Lock()
defer p.mu.Unlock()
//title, content, err := p.generateTitleAndContent(req)
//if err != nil {
// return fmt.Errorf("生成标题和内容失败: %v", err)
//}
if req.PhotoUrl == "" {
//生成标题
switch {
case req.PhotoUrl == "": // 如果没有提供照片 URL生成标题和内容
// 生成标题和内容
title, content, err := p.generateTitleAndContent(req)
if err != nil {
//task := &ImageTask{
// Data: req,
// Error: err,
// Status: TaskFailed,
// StartTime: time.Now(),
//}
//p.tasks[req.LineNum] = task
//p.inProgress[req.LineNum] = true
//zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
return fmt.Errorf("生成标题失败")
}
req.Title = title
req.Content = content
//请求图片
generateImageRes, err := p.generateImage(req)
// 请求生成图片
taskId, err := p.generateImage(req)
if err != nil {
//task := &ImageTask{
// Data: req,
// Error: err,
// Status: TaskFailed,
// StartTime: time.Now(),
//}
//p.tasks[req.LineNum] = task
//p.inProgress[req.LineNum] = true
//zap.L().Error("生成图片失败", zap.Error(err))
zap.L().Error("生成图片失败: %v", zap.Error(err))
return fmt.Errorf("生成图片失败")
}
// 创建并保存任务
task := &ImageTask{
Title: title,
Content: content,
@ -219,24 +231,294 @@ func (p *BatchProcessor) submitSingleTask(req *excelData) error {
Status: TaskPending,
StartTime: time.Now(),
}
p.idFindTaskId[req.LineNum] = generateImageRes
p.tasks[req.LineNum] = task
p.inProgress[req.LineNum] = false
return nil
p.idFindTaskId[req.LineNum] = taskId
case req.PhotoUrl != "": //如果有图片
task := &ImageTask{
Data: req,
TaskID: req.LineNum,
Status: TaskPending,
StartTime: time.Now(),
}
p.tasks[req.LineNum] = task
p.inProgress[req.LineNum] = false
}
task := &ImageTask{
Data: req,
TaskID: req.LineNum,
Status: TaskPending,
StartTime: time.Now(),
return nil
}
// 开始轮询
func (p *BatchProcessor) startPolling() {
go func() {
ticker := time.NewTicker(p.pollInterval)
defer ticker.Stop()
for range ticker.C {
if p.IsAllCompleted() {
p.setStatus(StatusCompleted)
zap.L().Info("所有任务已完成,停止轮询")
ticker.Stop()
break
}
for i, v := range p.inProgress {
if !v {
if err := p.updateTask(i); err != nil {
zap.L().Error("批量更新任务状态失败: %v", zap.Error(err))
continue
}
}
continue
}
}
}()
}
// 更新任务
func (p *BatchProcessor) updateTask(id int) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
// 任务不存在直接返回
task, exists := p.tasks[id]
if !exists {
return fmt.Errorf("任务ID %d 不存在", id)
}
switch {
case task.Data.PhotoUrl != "": // 如果有图片 URL生成标题和内容并发布
// 生成标题和内容
title, content, err := p.generateTitleAndContent(task.Data)
if err != nil { //生成标题失败
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
task.Error = fmt.Errorf("生成标题和内容失败: %s", err.Error())
zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
return err
}
task.Title = title
task.Content = content
// 发布内容
if err = publishImage(publishImageReq{
ArtistName: task.Data.ArtistName,
SubNum: task.Data.SubNum,
Title: task.Title,
Content: task.Content,
TikTok: task.Data.TikTok,
Instagram: task.Data.Instagram,
GeneratePhotoUrl: []string{task.Data.PhotoUrl},
MediaAccountUuids: task.Data.MediaAccountUuids,
MediaAccountNames: task.Data.MediaAccountNames,
}); err != nil { //发布失败
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
task.Error = fmt.Errorf("发布内容失败: %s", err.Error())
zap.L().Error("发布内容失败: %v", zap.Error(err))
return err
}
//操作成功
task.Status = TaskSuccessful
p.inProgress[id] = true
task.EndTime = time.Now()
case task.Data.PhotoUrl == "": // 如果没有图片 URL处理图片生成结果
//获取图片结果
getTaskDetailRes, err := NewAiGenerator().GetTaskDetail(p.idFindTaskId[id])
if err != nil { //获取图片结果失败
zap.L().Error("查看图片生成结果失败: %v", zap.Error(err))
task.Error = fmt.Errorf("查看图片生成结果失败")
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
return err
}
switch getTaskDetailRes.Output.TaskStatus {
case "SUCCEEDED":
if task.Status != TaskSuccessful {
//上传图片
urls := make([]string, len(getTaskDetailRes.Output.Results))
for i, v := range getTaskDetailRes.Output.Results {
urls[i] = v.URL
}
uploadedURLs, err := downloadAndUploadImages(urls)
if err != nil { //图片上传失败
zap.L().Error("图片上传失败: %v", zap.Error(err))
task.Error = fmt.Errorf("图片上传失败")
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
return err
}
// 发布图文
if err = publishImage(publishImageReq{
ArtistName: task.Data.ArtistName,
SubNum: task.Data.SubNum,
Title: task.Title,
Content: task.Content,
TikTok: task.Data.TikTok,
Instagram: task.Data.Instagram,
MediaAccountUuids: task.Data.MediaAccountUuids,
MediaAccountNames: task.Data.MediaAccountNames,
GeneratePhotoUrl: uploadedURLs,
}); err != nil { //发布失败
zap.L().Error("发布内容失败: %v", zap.Error(err))
task.Error = fmt.Errorf("发布内容失败")
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
return err
}
//处理成功
task.Status = TaskSuccessful
p.inProgress[id] = true
task.EndTime = time.Now()
}
case "FAILED": //第三方返回失败
if task.Status != TaskFailed {
zap.L().Error("第三方生成失败: %v", zap.Error(err))
task.Error = fmt.Errorf("生成失败")
task.Status = TaskFailed
p.inProgress[id] = true
task.EndTime = time.Now()
return err
}
}
}
p.tasks[req.LineNum] = task
p.inProgress[req.LineNum] = false
return nil
}
// IsAllCompleted 获取任务是否全部完成
func (p *BatchProcessor) IsAllCompleted() bool {
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.inProgress) == 0 {
return true
}
// 检查是否所有任务都标记为完成
for _, completed := range p.inProgress {
if !completed {
return false
}
}
return true
}
// 获取未完成的任务列表
func (p *BatchProcessor) getIncompleteTasks() []int {
p.mu.RLock()
defer p.mu.RUnlock()
var incomplete []int
for taskID, completed := range p.inProgress {
if !completed {
incomplete = append(incomplete, taskID)
}
}
return incomplete
}
// 批量上传图片
func downloadAndUploadImages(urls []string) ([]string, error) {
var uploadedURLs []string
for _, result := range urls {
if result == "" {
continue
}
// 下载并直接上传到桶
bucketURL, err := downloadAndUploadToBucket(result)
if err != nil {
log.Printf("图片上传失败 [%s]: %v", result, err)
continue
}
uploadedURLs = append(uploadedURLs, bucketURL)
log.Printf("图片上传成功: %s -> %s", result, bucketURL)
}
if len(uploadedURLs) == 0 {
return nil, errors.New("所有图片上传失败")
}
return uploadedURLs, nil
}
// 上传图片到桶里面
func downloadAndUploadToBucket(imageURL string) (string, error) {
// 创建临时目录
tempDir := "tmp"
if err := os.MkdirAll(tempDir, 0755); err != nil {
return "", fmt.Errorf("创建临时目录失败: %v", err)
}
defer os.RemoveAll(tempDir) // 程序结束时清理整个目录
// 生成唯一文件名
fileName := fmt.Sprintf("%d.jpg",
time.Now().Unix())
// 构建文件路径
imgPath := filepath.Join(tempDir, fileName)
// 创建文件
file, err := os.Create(imgPath)
if err != nil {
return "", fmt.Errorf("创建文件失败: %v", err)
}
defer file.Close()
log.Printf("文件创建在: %s", imgPath)
// 下载图片到文件
resp, err := http.Get(imageURL)
if err != nil {
return "", fmt.Errorf("下载图片失败: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("下载失败,状态码: %d", resp.StatusCode)
}
// 复制到文件
_, err = io.Copy(file, resp.Body)
if err != nil {
return "", fmt.Errorf("保存文件失败: %v", err)
}
file.Sync()
fileBytes, err := os.ReadFile(imgPath)
if err != nil {
return "", fmt.Errorf("读取本地文件失败: %v", err)
}
// 上传到桶
BOSClient, err := objstorage.NewOSS(
os.Getenv(config.ConfigData.Oss.AccessKeyId),
os.Getenv(config.ConfigData.Oss.AccessKeySecret),
os.Getenv(config.ConfigData.Oss.Endpoint),
)
if BOSClient == nil {
return "", fmt.Errorf("上传文件失败: %v", err)
}
_, err = BOSClient.PutObjectFromBytes(os.Getenv(config.ConfigData.Oss.BucketName), fileName, fileBytes)
if err != nil {
return "", fmt.Errorf("上传文件失败: %v", err)
}
url := fmt.Sprintf("%s/%s", os.Getenv(config.ConfigData.Oss.CdnHost), fileName)
return url, nil
}
func (p *BatchProcessor) generateTitleAndContent(req *excelData) (string, string, error) {
if req.PhotoUrl != "" {
title, content, err := NewAiGenerator().GenerateTitleAndContentFromImage(
@ -363,51 +645,6 @@ func (p *BatchProcessor) generateImage(req *excelData) (string, error) {
return result.Output.TaskID, nil
}
func (p *BatchProcessor) GetTaskStatistics() (completed, pending, total int, completedTasks, failedTasks []*ImageTask) {
p.mu.Lock()
defer p.mu.Unlock()
total = len(p.tasks)
for _, task := range p.tasks {
if p.inProgress[task.TaskID] { //是否转换成功
completed++
if task.Status == TaskSuccessful && task.Error == nil { //转换成功 并且 发布成功
completedTasks = append(completedTasks, task)
} else if task.Status == TaskFailed || task.Error != nil { //转换失败 或者 发布失败
failedTasks = append(failedTasks, task)
}
} else {
pending++
}
}
return completed, pending, total, completedTasks, failedTasks
}
func (p *BatchProcessor) StartPolling() {
go func() {
ticker := time.NewTicker(p.pollInterval)
defer ticker.Stop()
for range ticker.C {
if p.IsAllCompleted() {
p.SetStatus(StatusCompleted)
zap.L().Info("所有任务已完成,停止轮询")
ticker.Stop()
break
}
for i, v := range p.inProgress {
if !v {
if err := p.UpdateTaskStatuses(i); err != nil {
zap.L().Error("批量更新任务状态失败: %v", zap.Error(err))
continue
}
}
continue
}
}
}()
}
//func (p *BatchProcessor) StartPolling() {
// go func() {
// ticker := time.NewTicker(p.pollInterval) // 1秒间隔
@ -487,244 +724,3 @@ func (p *BatchProcessor) StartPolling() {
// zap.Error(err))
// }
//}
// 获取未完成的任务列表
func (p *BatchProcessor) getIncompleteTasks() []int {
p.mu.RLock()
defer p.mu.RUnlock()
var incomplete []int
for taskID, completed := range p.inProgress {
if !completed {
incomplete = append(incomplete, taskID)
}
}
return incomplete
}
func (p *BatchProcessor) IsAllCompleted() bool {
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.inProgress) == 0 {
return true
}
// 检查是否所有任务都标记为完成
for _, completed := range p.inProgress {
if !completed {
return false
}
}
return true
}
func (p *BatchProcessor) UpdateTaskStatuses(id int) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.tasks[id].Data.PhotoUrl != "" { //如果有图片
title, content, err := p.generateTitleAndContent(p.tasks[id].Data)
if err != nil {
p.tasks[id].Status = TaskFailed
p.inProgress[id] = true
p.tasks[id].EndTime = time.Now()
zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
p.tasks[id].Error = fmt.Errorf("生成标题和内容失败")
return err
}
p.tasks[id].Title = title
p.tasks[id].Content = content
if err = publishImage(publishImageReq{
ArtistName: p.tasks[id].Data.ArtistName,
SubNum: p.tasks[id].Data.SubNum,
Title: p.tasks[id].Title,
Content: p.tasks[id].Content,
TikTok: p.tasks[id].Data.TikTok,
Instagram: p.tasks[id].Data.Instagram,
GeneratePhotoUrl: []string{p.tasks[id].Data.PhotoUrl},
MediaAccountUuids: p.tasks[id].Data.MediaAccountUuids,
MediaAccountNames: p.tasks[id].Data.MediaAccountNames,
}); err != nil {
p.tasks[id].Status = TaskFailed
p.inProgress[id] = true
p.tasks[id].EndTime = time.Now()
zap.L().Error("发布内容失败: %v", zap.Error(err))
p.tasks[id].Error = fmt.Errorf("发布内容失败")
}
p.tasks[id].Status = TaskSuccessful
p.inProgress[id] = true
p.tasks[id].EndTime = time.Now()
return err
}
getTaskDetailRes, err := NewAiGenerator().GetTaskDetail(p.idFindTaskId[id])
if err != nil {
zap.L().Error("查看图片生成结果失败: %v", zap.Error(err))
return fmt.Errorf("查看图片生成结果失败")
}
// 更新本地任务状态
if localTask, exists := p.tasks[id]; exists {
switch getTaskDetailRes.Output.TaskStatus {
case "SUCCEEDED":
if localTask.Status != TaskSuccessful {
////生成标题
//title, content, err := p.generateTitleAndContent(p.tasks[taskId].Data)
//if err != nil {
// localTask.Status = TaskFailed
// p.tasks[getTaskDetailRes.Output.TaskID].Error = err
// p.inProgress[getTaskDetailRes.Output.TaskID] = true
// p.tasks[getTaskDetailRes.Output.TaskID].EndTime = time.Now()
// zap.L().Error("生成标题和内容失败: %v", zap.Error(err))
// return fmt.Errorf("生成标题和内容失败")
//}
//p.tasks[taskId].Title = title
//p.tasks[taskId].Content = content
//上传图片
localTask.EndTime = time.Now()
urls := make([]string, 0, len(getTaskDetailRes.Output.Results))
for _, v1 := range getTaskDetailRes.Output.Results {
urls = append(urls, v1.URL)
}
uploadedURLs, err := downloadAndUploadImages(urls)
if err != nil {
localTask.Status = TaskFailed
p.tasks[id].Error = err
p.inProgress[id] = true
p.tasks[id].EndTime = time.Now()
zap.L().Error("图片上传失败: %v", zap.Error(err))
localTask.Error = fmt.Errorf("图片上传失败")
return err
}
var messages string
for _, v1 := range getTaskDetailRes.Output.Results {
messages += v1.Message
}
//发布
if err = publishImage(publishImageReq{
ArtistName: p.tasks[id].Data.ArtistName,
SubNum: p.tasks[id].Data.SubNum,
Title: p.tasks[id].Title,
Content: p.tasks[id].Content,
TikTok: p.tasks[id].Data.TikTok,
Instagram: p.tasks[id].Data.Instagram,
MediaAccountUuids: p.tasks[id].Data.MediaAccountUuids,
MediaAccountNames: p.tasks[id].Data.MediaAccountNames,
GeneratePhotoUrl: uploadedURLs,
}); err != nil {
localTask.Status = TaskFailed
p.tasks[id].Error = err
p.inProgress[id] = true
p.tasks[id].EndTime = time.Now()
}
//成功
localTask.Status = TaskSuccessful
p.inProgress[id] = true
p.tasks[id].EndTime = time.Now()
}
case "FAILED":
if localTask.Status != TaskFailed {
localTask.Status = TaskFailed
p.tasks[id].Error = errors.New("生成失败")
p.inProgress[id] = true
p.tasks[id].EndTime = time.Now()
}
}
}
return nil
}
func downloadAndUploadImages(urls []string) ([]string, error) {
var uploadedURLs []string
for _, result := range urls {
if result == "" {
continue
}
// 下载并直接上传到桶
bucketURL, err := downloadAndUploadToBucket(result)
if err != nil {
log.Printf("图片上传失败 [%s]: %v", result, err)
continue
}
uploadedURLs = append(uploadedURLs, bucketURL)
log.Printf("图片上传成功: %s -> %s", result, bucketURL)
}
if len(uploadedURLs) == 0 {
return nil, errors.New("所有图片上传失败")
}
return uploadedURLs, nil
}
func downloadAndUploadToBucket(imageURL string) (string, error) {
// 创建临时目录
tempDir := "tmp"
if err := os.MkdirAll(tempDir, 0755); err != nil {
return "", fmt.Errorf("创建临时目录失败: %v", err)
}
defer os.RemoveAll(tempDir) // 程序结束时清理整个目录
// 生成唯一文件名
fileName := fmt.Sprintf("%d.jpg",
time.Now().Unix())
// 构建文件路径
imgPath := filepath.Join(tempDir, fileName)
// 创建文件
file, err := os.Create(imgPath)
if err != nil {
return "", fmt.Errorf("创建文件失败: %v", err)
}
defer file.Close()
log.Printf("文件创建在: %s", imgPath)
// 下载图片到文件
resp, err := http.Get(imageURL)
if err != nil {
return "", fmt.Errorf("下载图片失败: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("下载失败,状态码: %d", resp.StatusCode)
}
// 复制到文件
_, err = io.Copy(file, resp.Body)
if err != nil {
return "", fmt.Errorf("保存文件失败: %v", err)
}
file.Sync()
fileBytes, err := os.ReadFile(imgPath)
if err != nil {
return "", fmt.Errorf("读取本地文件失败: %v", err)
}
// 上传到桶
BOSClient, err := objstorage.NewOSS(
os.Getenv(config.ConfigData.Oss.AccessKeyId),
os.Getenv(config.ConfigData.Oss.AccessKeySecret),
os.Getenv(config.ConfigData.Oss.Endpoint),
)
if BOSClient == nil {
return "", fmt.Errorf("上传文件失败: %v", err)
}
_, err = BOSClient.PutObjectFromBytes(os.Getenv(config.ConfigData.Oss.BucketName), fileName, fileBytes)
if err != nil {
return "", fmt.Errorf("上传文件失败: %v", err)
}
url := fmt.Sprintf("%s/%s", os.Getenv(config.ConfigData.Oss.CdnHost), fileName)
return url, nil
}