Merge branch 'feat-cjy-tag' into dev
This commit is contained in:
commit
7833d399fc
1337
api/cast/cast.pb.go
1337
api/cast/cast.pb.go
File diff suppressed because it is too large
Load Diff
@ -20493,6 +20493,8 @@ func (m *ListCastTagsReq) validate(all bool) error {
|
||||
|
||||
// no validation rules for CreatedAtEnd
|
||||
|
||||
// no validation rules for IsWatchCountCalled
|
||||
|
||||
// no validation rules for OrderBy
|
||||
|
||||
// no validation rules for Sort
|
||||
@ -20625,6 +20627,8 @@ func (m *CastTagInfo) validate(all bool) error {
|
||||
|
||||
// no validation rules for Success
|
||||
|
||||
// no validation rules for IsWatchCountCalled
|
||||
|
||||
if len(errors) > 0 {
|
||||
return CastTagInfoMultiError(errors)
|
||||
}
|
||||
@ -21114,6 +21118,142 @@ var _ interface {
|
||||
ErrorName() string
|
||||
} = UpdateCastTagBatchRespValidationError{}
|
||||
|
||||
// Validate checks the field values on BatchUpdateCastTagsReq with the rules
|
||||
// defined in the proto definition for this message. If any rules are
|
||||
// violated, the first error encountered is returned, or nil if there are no violations.
|
||||
func (m *BatchUpdateCastTagsReq) Validate() error {
|
||||
return m.validate(false)
|
||||
}
|
||||
|
||||
// ValidateAll checks the field values on BatchUpdateCastTagsReq with the rules
|
||||
// defined in the proto definition for this message. If any rules are
|
||||
// violated, the result is a list of violation errors wrapped in
|
||||
// BatchUpdateCastTagsReqMultiError, or nil if none found.
|
||||
func (m *BatchUpdateCastTagsReq) ValidateAll() error {
|
||||
return m.validate(true)
|
||||
}
|
||||
|
||||
func (m *BatchUpdateCastTagsReq) validate(all bool) error {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var errors []error
|
||||
|
||||
for idx, item := range m.GetData() {
|
||||
_, _ = idx, item
|
||||
|
||||
if all {
|
||||
switch v := interface{}(item).(type) {
|
||||
case interface{ ValidateAll() error }:
|
||||
if err := v.ValidateAll(); err != nil {
|
||||
errors = append(errors, BatchUpdateCastTagsReqValidationError{
|
||||
field: fmt.Sprintf("Data[%v]", idx),
|
||||
reason: "embedded message failed validation",
|
||||
cause: err,
|
||||
})
|
||||
}
|
||||
case interface{ Validate() error }:
|
||||
if err := v.Validate(); err != nil {
|
||||
errors = append(errors, BatchUpdateCastTagsReqValidationError{
|
||||
field: fmt.Sprintf("Data[%v]", idx),
|
||||
reason: "embedded message failed validation",
|
||||
cause: err,
|
||||
})
|
||||
}
|
||||
}
|
||||
} else if v, ok := interface{}(item).(interface{ Validate() error }); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return BatchUpdateCastTagsReqValidationError{
|
||||
field: fmt.Sprintf("Data[%v]", idx),
|
||||
reason: "embedded message failed validation",
|
||||
cause: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if len(errors) > 0 {
|
||||
return BatchUpdateCastTagsReqMultiError(errors)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchUpdateCastTagsReqMultiError is an error wrapping multiple validation
|
||||
// errors returned by BatchUpdateCastTagsReq.ValidateAll() if the designated
|
||||
// constraints aren't met.
|
||||
type BatchUpdateCastTagsReqMultiError []error
|
||||
|
||||
// Error returns a concatenation of all the error messages it wraps.
|
||||
func (m BatchUpdateCastTagsReqMultiError) Error() string {
|
||||
var msgs []string
|
||||
for _, err := range m {
|
||||
msgs = append(msgs, err.Error())
|
||||
}
|
||||
return strings.Join(msgs, "; ")
|
||||
}
|
||||
|
||||
// AllErrors returns a list of validation violation errors.
|
||||
func (m BatchUpdateCastTagsReqMultiError) AllErrors() []error { return m }
|
||||
|
||||
// BatchUpdateCastTagsReqValidationError is the validation error returned by
|
||||
// BatchUpdateCastTagsReq.Validate if the designated constraints aren't met.
|
||||
type BatchUpdateCastTagsReqValidationError struct {
|
||||
field string
|
||||
reason string
|
||||
cause error
|
||||
key bool
|
||||
}
|
||||
|
||||
// Field function returns field value.
|
||||
func (e BatchUpdateCastTagsReqValidationError) Field() string { return e.field }
|
||||
|
||||
// Reason function returns reason value.
|
||||
func (e BatchUpdateCastTagsReqValidationError) Reason() string { return e.reason }
|
||||
|
||||
// Cause function returns cause value.
|
||||
func (e BatchUpdateCastTagsReqValidationError) Cause() error { return e.cause }
|
||||
|
||||
// Key function returns key value.
|
||||
func (e BatchUpdateCastTagsReqValidationError) Key() bool { return e.key }
|
||||
|
||||
// ErrorName returns error name.
|
||||
func (e BatchUpdateCastTagsReqValidationError) ErrorName() string {
|
||||
return "BatchUpdateCastTagsReqValidationError"
|
||||
}
|
||||
|
||||
// Error satisfies the builtin error interface
|
||||
func (e BatchUpdateCastTagsReqValidationError) Error() string {
|
||||
cause := ""
|
||||
if e.cause != nil {
|
||||
cause = fmt.Sprintf(" | caused by: %v", e.cause)
|
||||
}
|
||||
|
||||
key := ""
|
||||
if e.key {
|
||||
key = "key for "
|
||||
}
|
||||
|
||||
return fmt.Sprintf(
|
||||
"invalid %sBatchUpdateCastTagsReq.%s: %s%s",
|
||||
key,
|
||||
e.field,
|
||||
e.reason,
|
||||
cause)
|
||||
}
|
||||
|
||||
var _ error = BatchUpdateCastTagsReqValidationError{}
|
||||
|
||||
var _ interface {
|
||||
Field() string
|
||||
Reason() string
|
||||
Key() bool
|
||||
Cause() error
|
||||
ErrorName() string
|
||||
} = BatchUpdateCastTagsReqValidationError{}
|
||||
|
||||
// Validate checks the field values on UpdateCastTagStatusReq with the rules
|
||||
// defined in the proto definition for this message. If any rules are
|
||||
// violated, the first error encountered is returned, or nil if there are no violations.
|
||||
|
||||
@ -116,6 +116,7 @@ type CastClient interface {
|
||||
UpdateCastTag(ctx context.Context, in *UpdateCastTagReq, opts ...grpc_go.CallOption) (*UpdateCastTagResp, common.ErrorWithAttachment)
|
||||
ListCastTags(ctx context.Context, in *ListCastTagsReq, opts ...grpc_go.CallOption) (*ListCastTagsResp, common.ErrorWithAttachment)
|
||||
UpdateCastTagBatch(ctx context.Context, in *UpdateCastTagBatchReq, opts ...grpc_go.CallOption) (*UpdateCastTagBatchResp, common.ErrorWithAttachment)
|
||||
BatchUpdateCastTags(ctx context.Context, in *BatchUpdateCastTagsReq, opts ...grpc_go.CallOption) (*emptypb.Empty, common.ErrorWithAttachment)
|
||||
UpdateCastTagStatus(ctx context.Context, in *UpdateCastTagStatusReq, opts ...grpc_go.CallOption) (*emptypb.Empty, common.ErrorWithAttachment)
|
||||
RecalculateCastTagQuoteCount(ctx context.Context, in *emptypb.Empty, opts ...grpc_go.CallOption) (*RecalculateCastTagQuoteCountResp, common.ErrorWithAttachment)
|
||||
}
|
||||
@ -198,6 +199,7 @@ type CastClientImpl struct {
|
||||
UpdateCastTag func(ctx context.Context, in *UpdateCastTagReq) (*UpdateCastTagResp, error)
|
||||
ListCastTags func(ctx context.Context, in *ListCastTagsReq) (*ListCastTagsResp, error)
|
||||
UpdateCastTagBatch func(ctx context.Context, in *UpdateCastTagBatchReq) (*UpdateCastTagBatchResp, error)
|
||||
BatchUpdateCastTags func(ctx context.Context, in *BatchUpdateCastTagsReq) (*emptypb.Empty, error)
|
||||
UpdateCastTagStatus func(ctx context.Context, in *UpdateCastTagStatusReq) (*emptypb.Empty, error)
|
||||
RecalculateCastTagQuoteCount func(ctx context.Context, in *emptypb.Empty) (*RecalculateCastTagQuoteCountResp, error)
|
||||
}
|
||||
@ -652,6 +654,12 @@ func (c *castClient) UpdateCastTagBatch(ctx context.Context, in *UpdateCastTagBa
|
||||
return out, c.cc.Invoke(ctx, "/"+interfaceKey+"/UpdateCastTagBatch", in, out)
|
||||
}
|
||||
|
||||
func (c *castClient) BatchUpdateCastTags(ctx context.Context, in *BatchUpdateCastTagsReq, opts ...grpc_go.CallOption) (*emptypb.Empty, common.ErrorWithAttachment) {
|
||||
out := new(emptypb.Empty)
|
||||
interfaceKey := ctx.Value(constant.InterfaceKey).(string)
|
||||
return out, c.cc.Invoke(ctx, "/"+interfaceKey+"/BatchUpdateCastTags", in, out)
|
||||
}
|
||||
|
||||
func (c *castClient) UpdateCastTagStatus(ctx context.Context, in *UpdateCastTagStatusReq, opts ...grpc_go.CallOption) (*emptypb.Empty, common.ErrorWithAttachment) {
|
||||
out := new(emptypb.Empty)
|
||||
interfaceKey := ctx.Value(constant.InterfaceKey).(string)
|
||||
@ -755,6 +763,7 @@ type CastServer interface {
|
||||
UpdateCastTag(context.Context, *UpdateCastTagReq) (*UpdateCastTagResp, error)
|
||||
ListCastTags(context.Context, *ListCastTagsReq) (*ListCastTagsResp, error)
|
||||
UpdateCastTagBatch(context.Context, *UpdateCastTagBatchReq) (*UpdateCastTagBatchResp, error)
|
||||
BatchUpdateCastTags(context.Context, *BatchUpdateCastTagsReq) (*emptypb.Empty, error)
|
||||
UpdateCastTagStatus(context.Context, *UpdateCastTagStatusReq) (*emptypb.Empty, error)
|
||||
RecalculateCastTagQuoteCount(context.Context, *emptypb.Empty) (*RecalculateCastTagQuoteCountResp, error)
|
||||
mustEmbedUnimplementedCastServer()
|
||||
@ -984,6 +993,9 @@ func (UnimplementedCastServer) ListCastTags(context.Context, *ListCastTagsReq) (
|
||||
func (UnimplementedCastServer) UpdateCastTagBatch(context.Context, *UpdateCastTagBatchReq) (*UpdateCastTagBatchResp, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method UpdateCastTagBatch not implemented")
|
||||
}
|
||||
func (UnimplementedCastServer) BatchUpdateCastTags(context.Context, *BatchUpdateCastTagsReq) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method BatchUpdateCastTags not implemented")
|
||||
}
|
||||
func (UnimplementedCastServer) UpdateCastTagStatus(context.Context, *UpdateCastTagStatusReq) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method UpdateCastTagStatus not implemented")
|
||||
}
|
||||
@ -3135,6 +3147,35 @@ func _Cast_UpdateCastTagBatch_Handler(srv interface{}, ctx context.Context, dec
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _Cast_BatchUpdateCastTags_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc_go.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(BatchUpdateCastTagsReq)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
base := srv.(dubbo3.Dubbo3GrpcService)
|
||||
args := []interface{}{}
|
||||
args = append(args, in)
|
||||
md, _ := metadata.FromIncomingContext(ctx)
|
||||
invAttachment := make(map[string]interface{}, len(md))
|
||||
for k, v := range md {
|
||||
invAttachment[k] = v
|
||||
}
|
||||
invo := invocation.NewRPCInvocation("BatchUpdateCastTags", args, invAttachment)
|
||||
if interceptor == nil {
|
||||
result := base.XXX_GetProxyImpl().Invoke(ctx, invo)
|
||||
return result, result.Error()
|
||||
}
|
||||
info := &grpc_go.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ctx.Value("XXX_TRIPLE_GO_INTERFACE_NAME").(string),
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
result := base.XXX_GetProxyImpl().Invoke(ctx, invo)
|
||||
return result, result.Error()
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _Cast_UpdateCastTagStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc_go.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(UpdateCastTagStatusReq)
|
||||
if err := dec(in); err != nil {
|
||||
@ -3492,6 +3533,10 @@ var Cast_ServiceDesc = grpc_go.ServiceDesc{
|
||||
MethodName: "UpdateCastTagBatch",
|
||||
Handler: _Cast_UpdateCastTagBatch_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "BatchUpdateCastTags",
|
||||
Handler: _Cast_BatchUpdateCastTags_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "UpdateCastTagStatus",
|
||||
Handler: _Cast_UpdateCastTagStatus_Handler,
|
||||
|
||||
133
pkg/cron/task.go
133
pkg/cron/task.go
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"fonchain-fiee/api/aryshare"
|
||||
"fonchain-fiee/api/bundle"
|
||||
"fonchain-fiee/api/cast"
|
||||
"fonchain-fiee/pkg/cache"
|
||||
@ -16,6 +17,7 @@ import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
@ -40,6 +42,9 @@ func InitTasks() error {
|
||||
if err != nil {
|
||||
log.Printf("添加定时任务失败: %v", err)
|
||||
}
|
||||
|
||||
// 每2分钟执行一次标签观看次数更新任务
|
||||
// err = cm.AddTask("updateCastTagWatchCount", "0 */2 * * * *", UpdateCastTagWatchCountTask)
|
||||
cm.Start()
|
||||
|
||||
// 启动队列消费者
|
||||
@ -508,3 +513,131 @@ func AyrshareMetricsCollectorTask() {
|
||||
func RefreshArtistOrderTask() {
|
||||
service.CastProvider.Tools(context.Background(), &cast.ToolsReq{Action: "refreshArtistOrder"})
|
||||
}
|
||||
|
||||
// UpdateCastTagWatchCountTask 更新标签观看次数的定时任务(每5分钟执行一次)
|
||||
func UpdateCastTagWatchCountTask() {
|
||||
ctx := context.Background()
|
||||
|
||||
// 计算两天前的00:00:00
|
||||
now := time.Now()
|
||||
twoDaysAgo := now.AddDate(0, 0, -2)
|
||||
createdAtStart := time.Date(twoDaysAgo.Year(), twoDaysAgo.Month(), twoDaysAgo.Day(), 0, 0, 0, 0, twoDaysAgo.Location())
|
||||
createdAtEnd := now
|
||||
|
||||
// 格式化时间字符串:2026-01-01 00:00:00
|
||||
createdAtStartStr := createdAtStart.Format("2006-01-02 15:04:05")
|
||||
createdAtEndStr := createdAtEnd.Format("2006-01-02 15:04:05")
|
||||
|
||||
// 调用 ListCastTags 接口,筛选 IsWatchCountCalled = 2 的数据
|
||||
listReq := &cast.ListCastTagsReq{
|
||||
CreatedAtStart: createdAtStartStr,
|
||||
CreatedAtEnd: createdAtEndStr,
|
||||
IsWatchCountCalled: 2, // 2表示未调用
|
||||
Page: 1,
|
||||
PageSize: 20,
|
||||
}
|
||||
|
||||
listResp, err := service.CastProvider.ListCastTags(ctx, listReq)
|
||||
if err != nil {
|
||||
zap.L().Error("获取标签列表失败", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if listResp.Data == nil || len(listResp.Data) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
zap.L().Info("获取到需要更新的标签", zap.Int("count", len(listResp.Data)))
|
||||
|
||||
// 获取有效的 profileKey
|
||||
profileKey, err := serverCast.GetValidProfileKey(ctx, []uint32{1})
|
||||
if err != nil {
|
||||
zap.L().Error("获取有效profileKey失败", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// 准备批量更新的数据
|
||||
updateData := make([]*cast.CastTagInfo, 0, len(listResp.Data))
|
||||
|
||||
// 遍历每个标签,调用 RecommendHashtags 接口
|
||||
for _, tag := range listResp.Data {
|
||||
if tag.HashTag == "" {
|
||||
zap.L().Warn("标签HashTag为空,跳过", zap.String("uuid", tag.Uuid))
|
||||
// 即使HashTag为空,也要更新IsWatchCountCalled为1
|
||||
updateData = append(updateData, &cast.CastTagInfo{
|
||||
Uuid: tag.Uuid,
|
||||
WatchCount: 1,
|
||||
IsWatchCountCalled: 1,
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// 调用 RecommendHashtags 接口
|
||||
recommendReq := &aryshare.RecommendHashtagsRequest{
|
||||
Keyword: tag.HashTag,
|
||||
ProfileKey: profileKey,
|
||||
}
|
||||
|
||||
recommendResp, err := service.AyrshareProvider.RecommendHashtags(ctx, recommendReq)
|
||||
if err != nil {
|
||||
zap.L().Error("调用RecommendHashtags接口失败",
|
||||
zap.String("hashTag", tag.HashTag),
|
||||
zap.String("uuid", tag.Uuid),
|
||||
zap.Error(err))
|
||||
// 调用失败时,将WatchCount更新为1,IsWatchCountCalled更新为1
|
||||
updateData = append(updateData, &cast.CastTagInfo{
|
||||
Uuid: tag.Uuid,
|
||||
WatchCount: 1,
|
||||
IsWatchCountCalled: 1,
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// 对比返回结果,查找完全一致的标签
|
||||
var matchedViewCount int64 = 0
|
||||
if recommendResp.Recommendations != nil {
|
||||
for _, recommendation := range recommendResp.Recommendations {
|
||||
// 完全一致匹配(不区分大小写)
|
||||
if strings.EqualFold(recommendation.Name, tag.HashTag) {
|
||||
matchedViewCount = recommendation.ViewCount
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 根据匹配结果更新WatchCount
|
||||
var watchCount int32 = 1
|
||||
if matchedViewCount > 0 {
|
||||
watchCount = int32(matchedViewCount)
|
||||
}
|
||||
|
||||
// 添加到更新列表
|
||||
updateData = append(updateData, &cast.CastTagInfo{
|
||||
Uuid: tag.Uuid,
|
||||
WatchCount: watchCount,
|
||||
IsWatchCountCalled: 1,
|
||||
})
|
||||
|
||||
zap.L().Debug("处理标签完成",
|
||||
zap.String("hashTag", tag.HashTag),
|
||||
zap.String("uuid", tag.Uuid),
|
||||
zap.Int64("matchedViewCount", matchedViewCount),
|
||||
zap.Int32("watchCount", watchCount))
|
||||
}
|
||||
|
||||
// 如果没有需要更新的数据,直接返回
|
||||
if len(updateData) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 批量更新标签
|
||||
batchUpdateReq := &cast.BatchUpdateCastTagsReq{
|
||||
Data: updateData,
|
||||
}
|
||||
|
||||
_, err = service.CastProvider.BatchUpdateCastTags(ctx, batchUpdateReq)
|
||||
if err != nil {
|
||||
zap.L().Error("批量更新标签失败", zap.Error(err), zap.Int("count", len(updateData)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user