287 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			287 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package asChat -----------------------------
 | ||
| // @file      : cache.go
 | ||
| // @author    : JJXu
 | ||
| // @contact   : wavingbear@163.com
 | ||
| // @time      : 2024/9/11 下午5:18
 | ||
| // -------------------------------------------
 | ||
| package chatCache
 | ||
| 
 | ||
| import (
 | ||
| 	"context"
 | ||
| 	"errors"
 | ||
| 	"fmt"
 | ||
| 	"fonchain-fiee/api/accountFiee"
 | ||
| 	"fonchain-fiee/pkg/cache"
 | ||
| 	"fonchain-fiee/pkg/service/asChat/dto"
 | ||
| 	"github.com/go-redis/redis"
 | ||
| 	"github.com/goccy/go-json"
 | ||
| 	"go.uber.org/zap"
 | ||
| 	"log"
 | ||
| 	"strconv"
 | ||
| 	"strings"
 | ||
| 	"sync"
 | ||
| 	"time"
 | ||
| )
 | ||
| 
 | ||
// Redis key prefixes used by the chat cache. The Get...CacheKey helpers in
// this file append ":<id>" to a prefix to build the final key.
const (
	CacheChatRecordKey      = "fiee:chatRecord"
	CacheSessionKey         = "fiee:chatSession"
	CacheNewMsgStatKey      = "fiee:newMsgStat"
	CacheAutoReplySwitchKey = "fiee:AutoReplySwitch"
)
 | ||
| 
 | ||
// chatCacheLocker is a package-level read/write lock that several ChatCache
// methods in this file take around their Redis reads and writes. Being an
// in-process sync.RWMutex, it only coordinates goroutines of this process.
var chatCacheLocker sync.RWMutex
 | ||
| 
 | ||
// ChatCache groups the Redis-backed cache operations for chat data: user
// sessions, per-session chat records, new-message statistics and the
// auto-reply switch.
type ChatCache struct {
	// NewMessageStatExpireAfter is the expiry of the new-message statistics data.
	// NOTE(review): in this file it is only referenced from a commented-out
	// line in coverOwnerNewMessageStat; the active code writes without expiry.
	NewMessageStatExpireAfter time.Duration
}
 | ||
| 
 | ||
| // ------------------------------存储用户的会话ID--------------------------------
 | ||
| func (cr ChatCache) GetUserSessionCacheKey(userId int64) string {
 | ||
| 	return fmt.Sprintf("%s:%d", CacheSessionKey, userId)
 | ||
| }
 | ||
| func (cr ChatCache) SaveUserSession(userId int64, sessionId string) {
 | ||
| 	chatCacheLocker.Lock()
 | ||
| 	defer chatCacheLocker.Unlock()
 | ||
| 	////var c = context.Background()
 | ||
| 	err := cache.RedisClient.Set(cr.GetUserSessionCacheKey(userId), sessionId, 0).Err()
 | ||
| 	if err != nil {
 | ||
| 		log.Print("保存用户会话失败", zap.Error(err))
 | ||
| 	}
 | ||
| }
 | ||
| func (cr ChatCache) GetUserSession(userId int64) (sessionId string) {
 | ||
| 	fmt.Println("GetUserSession-1")
 | ||
| 	chatCacheLocker.RLock()
 | ||
| 	defer chatCacheLocker.RUnlock()
 | ||
| 	//var c = context.Background()
 | ||
| 	sessionId, err := cache.RedisClient.Get(cr.GetUserSessionCacheKey(userId)).Result()
 | ||
| 	fmt.Println("GetUserSession-2")
 | ||
| 	if err != nil {
 | ||
| 		if err.Error() == "redis: nil" {
 | ||
| 			err = nil
 | ||
| 		} else {
 | ||
| 			log.Print("获取用户会话失败", zap.Error(err))
 | ||
| 		}
 | ||
| 	}
 | ||
| 	fmt.Println("GetUserSession-3, sessionId:", sessionId)
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| // ------------------------------存储会话的聊天记录--------------------------------
 | ||
| func (cr ChatCache) GetChatRecordCacheKey(sessionId string) string {
 | ||
| 	return fmt.Sprintf("%s:%s", CacheChatRecordKey, sessionId)
 | ||
| }
 | ||
| func (cr ChatCache) AddChatRecord(sessionId string, data ...*accountFiee.ChatRecordData) (err error) {
 | ||
| 	////var c = context.Background()
 | ||
| 	messages := cr.GetChatRecord(sessionId)
 | ||
| 	fmt.Printf("AddChatRecord add data:%+v\n", data)
 | ||
| 	messages = append(messages, data...)
 | ||
| 	cacheBytes, _ := json.Marshal(messages)
 | ||
| 	fmt.Println("Marshal result", string(cacheBytes))
 | ||
| 	err = cache.RedisClient.Set(cr.GetChatRecordCacheKey(sessionId), cacheBytes, 2*time.Hour).Err()
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| func (cr ChatCache) CoverChatRecord(sessionId string, data []*accountFiee.ChatRecordData) (err error) {
 | ||
| 	chatCacheLocker.Lock()
 | ||
| 	defer chatCacheLocker.Unlock()
 | ||
| 	//var c = context.Background()
 | ||
| 	cacheBytes, _ := json.Marshal(data)
 | ||
| 	err = cache.RedisClient.Set(cr.GetChatRecordCacheKey(sessionId), cacheBytes, 2*time.Hour).Err()
 | ||
| 	return
 | ||
| }
 | ||
| func (cr ChatCache) GetChatRecord(sessionId string) (data []*accountFiee.ChatRecordData) {
 | ||
| 	chatCacheLocker.RLock()
 | ||
| 	defer chatCacheLocker.RUnlock()
 | ||
| 	data = make([]*accountFiee.ChatRecordData, 0)
 | ||
| 	//var c = context.Background()
 | ||
| 	messages, err := cache.RedisClient.Get(cr.GetChatRecordCacheKey(sessionId)).Bytes()
 | ||
| 	if err != nil {
 | ||
| 		if err.Error() == "redis: nil" {
 | ||
| 			err = nil
 | ||
| 		}
 | ||
| 		//log.Print("获取聊天记录失败", zap.Error(err))
 | ||
| 		return
 | ||
| 	}
 | ||
| 	//fmt.Printf("cache data: %+v", string(messages))
 | ||
| 	if len(messages) > 0 {
 | ||
| 		_ = json.Unmarshal(messages, &data)
 | ||
| 	}
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| // ------------------------------存储新消息统计--------------------------------
 | ||
| func (cr ChatCache) GetNewMsgStatCacheKey(ownerId int64) string {
 | ||
| 	return fmt.Sprintf("%s:%d", CacheNewMsgStatKey, ownerId)
 | ||
| }
 | ||
| 
 | ||
| // 消息数量自增
 | ||
| func (cr ChatCache) IncreaseNewMessageTotal(ownerId int64, sessionId string) (err error) {
 | ||
| 	chatCacheLocker.Lock()
 | ||
| 	defer chatCacheLocker.Unlock()
 | ||
| 	ctx := context.Background()
 | ||
| 	data := cr.GetNewMessageStat(ctx, ownerId)
 | ||
| 	if len(data) > 0 {
 | ||
| 		foundIndex := -1
 | ||
| 		for i, v := range data {
 | ||
| 			if v.SessionId == sessionId {
 | ||
| 				foundIndex = i
 | ||
| 				break
 | ||
| 			}
 | ||
| 		}
 | ||
| 		if foundIndex > -1 {
 | ||
| 			data[foundIndex].Total += 1
 | ||
| 		}
 | ||
| 		//将foundIndex之后的所有元素右移动一位
 | ||
| 		if foundIndex > 0 {
 | ||
| 			elementToMove := data[foundIndex]
 | ||
| 			copy(data[1:], data[0:foundIndex])
 | ||
| 			data[0] = elementToMove
 | ||
| 		} else if foundIndex == -1 {
 | ||
| 			data = append([]dto.UserMsgStatic{{SessionId: sessionId, Total: 1}}, data...)
 | ||
| 		}
 | ||
| 	} else {
 | ||
| 		data = []dto.UserMsgStatic{{SessionId: sessionId, Total: 1}}
 | ||
| 	}
 | ||
| 	return cr.coverOwnerNewMessageStat(ctx, ownerId, data)
 | ||
| }
 | ||
| 
 | ||
| // 重置新消息数量
 | ||
| func (cr ChatCache) ResetNewMessageTotal(ownerId int64, sessionId string, total ...int64) error {
 | ||
| 	fmt.Printf("ResetNewMessageTotal: %d ,sessionId:%s ,total:%v\n", ownerId, sessionId, total)
 | ||
| 	chatCacheLocker.Lock()
 | ||
| 	defer chatCacheLocker.Unlock()
 | ||
| 	var tl int64
 | ||
| 	if len(total) > 0 {
 | ||
| 		tl = total[0]
 | ||
| 	}
 | ||
| 	fmt.Println("ResetNewMessageTotal tl:", tl)
 | ||
| 	ctx := context.Background()
 | ||
| 	data := cr.GetNewMessageStat(ctx, ownerId)
 | ||
| 	fmt.Printf("ResetNewMessageTotal data:%+v\n", data)
 | ||
| 	found := false
 | ||
| 	for i, v := range data {
 | ||
| 		if v.SessionId == sessionId {
 | ||
| 			found = true
 | ||
| 			data[i].Total = tl
 | ||
| 			fmt.Println("ResetNewMessageTotal found!")
 | ||
| 			break
 | ||
| 		}
 | ||
| 	}
 | ||
| 	if !found {
 | ||
| 		fmt.Println("ResetNewMessageTotal not found!")
 | ||
| 		data = append(data, dto.UserMsgStatic{
 | ||
| 			SessionId: sessionId,
 | ||
| 			Total:     tl,
 | ||
| 		})
 | ||
| 	}
 | ||
| 	err := cr.coverOwnerNewMessageStat(ctx, ownerId, data)
 | ||
| 	fmt.Println("ResetNewMessageTotal result:", err)
 | ||
| 	return err
 | ||
| }
 | ||
| 
 | ||
| func (cr ChatCache) RecountNewMessageTotal(ownerId int64) {
 | ||
| 	//var c = context.Background()
 | ||
| 	var keys []string
 | ||
| 	var err error
 | ||
| 	keys, err = cache.RedisClient.Keys(CacheChatRecordKey + "*").Result()
 | ||
| 	if err != nil {
 | ||
| 		log.Print("获取聊天记录所有缓存KEY失败", zap.Error(err))
 | ||
| 		return
 | ||
| 	}
 | ||
| 	var countMap = make(map[string]int)
 | ||
| 	for _, key := range keys {
 | ||
| 		var messages []byte
 | ||
| 		var data []*accountFiee.ChatRecordData
 | ||
| 		messages, err = cache.RedisClient.Get(key).Bytes()
 | ||
| 		if err != nil {
 | ||
| 			if err.Error() == "redis: nil" {
 | ||
| 				err = nil
 | ||
| 			}
 | ||
| 			log.Print("获取聊天记录失败", zap.Error(err))
 | ||
| 			data = make([]*accountFiee.ChatRecordData, 0)
 | ||
| 			continue
 | ||
| 		}
 | ||
| 		if len(messages) > 0 {
 | ||
| 			_ = json.Unmarshal(messages, &data)
 | ||
| 		}
 | ||
| 		lastIndex := strings.Count(key, ":")
 | ||
| 		var sessionId = strings.Split(key, ":")[lastIndex]
 | ||
| 		countMap[sessionId] = 0
 | ||
| 		for _, v := range data {
 | ||
| 			if v.WaiterRead == 2 { //统计未读消息数量
 | ||
| 				countMap[sessionId]++
 | ||
| 			}
 | ||
| 		}
 | ||
| 	}
 | ||
| 	for sessionId, count := range countMap {
 | ||
| 		err = cr.ResetNewMessageTotal(ownerId, sessionId, int64(count))
 | ||
| 		if err != nil {
 | ||
| 			log.Print("重置新消息数量统计",
 | ||
| 				zap.String("function", "RecountNewMessageTotal"),
 | ||
| 				zap.Int64("ownerId", ownerId),
 | ||
| 				zap.String("sessionId", sessionId),
 | ||
| 				zap.Int("count", count),
 | ||
| 				zap.Error(err),
 | ||
| 			)
 | ||
| 		}
 | ||
| 	}
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| // erp获取最新的消息统计
 | ||
| func (cr ChatCache) GetNewMessageStat(ctx context.Context, ownerId int64) (result []dto.UserMsgStatic) {
 | ||
| 	//chatCacheLocker.RLock()
 | ||
| 	//defer chatCacheLocker.RUnlock()
 | ||
| 	result = make([]dto.UserMsgStatic, 0)
 | ||
| 	vals, err := cache.RedisClient.Get(cr.GetNewMsgStatCacheKey(ownerId)).Bytes()
 | ||
| 	if err != nil && errors.Is(err, redis.Nil) {
 | ||
| 		log.Print("从缓存获取新消息统计失败", zap.Error(err), zap.Int64("ownerId", ownerId))
 | ||
| 		return
 | ||
| 	}
 | ||
| 	if vals != nil {
 | ||
| 		_ = json.Unmarshal(vals, &result)
 | ||
| 	}
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| // 覆盖指定erp用户的新消息统计
 | ||
| func (cr ChatCache) coverOwnerNewMessageStat(ctx context.Context, ownerId int64, data []dto.UserMsgStatic) (err error) {
 | ||
| 	value, _ := json.Marshal(data)
 | ||
| 	//err = cache.RedisClient.Set(ctx, cr.GetNewMsgStatCacheKey(ownerId), value, cr.NewMessageStatExpireAfter).Err()
 | ||
| 	err = cache.RedisClient.Set(cr.GetNewMsgStatCacheKey(ownerId), value, 0).Err()
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| // -----------------------------------用户自动回复开关
 | ||
| // erp获取最新的消息统计
 | ||
| func (cr ChatCache) GetCacheAutoReplySwitchKey(userId int64) string {
 | ||
| 	return fmt.Sprintf("%s:%d", CacheAutoReplySwitchKey, userId)
 | ||
| }
 | ||
| 
 | ||
| func (cr ChatCache) SetAutoReplySwitch(ctx context.Context, ownerId int64, enableAutoReply bool) {
 | ||
| 	//chatCacheLocker.RLock()
 | ||
| 	//defer chatCacheLocker.RUnlock()
 | ||
| 	err := cache.RedisClient.Set(cr.GetCacheAutoReplySwitchKey(ownerId), enableAutoReply, 0).Err()
 | ||
| 	if err != nil {
 | ||
| 		log.Print("保存用户会话失败", zap.Error(err))
 | ||
| 	}
 | ||
| 	return
 | ||
| }
 | ||
| 
 | ||
| func (cr ChatCache) GetAutoReplySwitch(ctx context.Context, ownerId int64) (enableAutoReply bool) {
 | ||
| 	//chatCacheLocker.RLock()
 | ||
| 	//defer chatCacheLocker.RUnlock()
 | ||
| 	val, err := cache.RedisClient.Get(cr.GetCacheAutoReplySwitchKey(ownerId)).Bytes()
 | ||
| 	if err != nil {
 | ||
| 		log.Print("获取自动回复开关查询失败", zap.Error(err), zap.Int64("ownerId", ownerId))
 | ||
| 		return true
 | ||
| 	}
 | ||
| 	// 解析缓存值为布尔值
 | ||
| 	boolVal, err := strconv.ParseBool(string(val))
 | ||
| 	if err != nil {
 | ||
| 		log.Print("解析自动回复开关值失败", zap.Error(err), zap.String("value", string(val)), zap.Int64("ownerId", ownerId))
 | ||
| 		return true // 解析失败时也返回默认值true
 | ||
| 	}
 | ||
| 
 | ||
| 	return boolVal
 | ||
| }
 |