Files
customer-server/service/xiaohongshu_service.go
2026-03-14 10:02:49 +08:00

460 lines
13 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"context"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"crypto/sha256"
"customer-server/dao"
"customer-server/model/dto"
"customer-server/model/entity"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
commonMongo "gitea.com/red-future/common/db/mongo"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/redis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
var Xiaohongshu = new(xiaohongshu)
type xiaohongshu struct{}
const (
XhsApiBaseUrl = "https://adapi.xiaohongshu.com"
XhsPlatformName = "xiaohongshu"
XhsEncryptSplit = "~split~"
)
// ==================== 加解密工具 ====================
// Encrypt AES加密
// 参数: ctx - 上下文content - 待加密内容secretKey - 密钥Base64编码
// 返回: res - 加密后的字符串Base64编码err - 错误信息
// 功能: 使用AES-CBC模式加密内容用于小红书API签名
func (s *xiaohongshu) Encrypt(ctx context.Context, content, secretKey string) (res string, err error) {
keyBytes, err := base64.StdEncoding.DecodeString(secretKey)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
block, err := aes.NewCipher(keyBytes)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
iv := make([]byte, aes.BlockSize)
if _, err = io.ReadFull(rand.Reader, iv); err != nil {
jaeger.RecordError(ctx, err)
return
}
stream := cipher.NewCBCEncrypter(block, iv)
contentBytes := []byte(content)
paddedContent := pkcs5Padding(contentBytes, aes.BlockSize)
cipherText := make([]byte, len(paddedContent))
stream.CryptBlocks(cipherText, paddedContent)
ivBase64 := base64.StdEncoding.EncodeToString(iv)
cipherBase64 := base64.StdEncoding.EncodeToString(cipherText)
res = fmt.Sprintf("%s%s%s", ivBase64, XhsEncryptSplit, cipherBase64)
return
}
func (s *xiaohongshu) Decrypt(ctx context.Context, cipherText, secretKey string) (res string, err error) {
keyBytes, err := base64.StdEncoding.DecodeString(secretKey)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
parts := strings.Split(cipherText, XhsEncryptSplit)
if len(parts) != 2 {
err = errors.New("invalid cipher text format")
jaeger.RecordError(ctx, err)
return
}
iv, err := base64.StdEncoding.DecodeString(parts[0])
if err != nil {
jaeger.RecordError(ctx, err)
return
}
encrypted, err := base64.StdEncoding.DecodeString(parts[1])
if err != nil {
jaeger.RecordError(ctx, err)
return
}
block, err := aes.NewCipher(keyBytes)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
if len(encrypted)%aes.BlockSize != 0 {
err = errors.New("cipher text is not a multiple of block size")
jaeger.RecordError(ctx, err)
return
}
stream := cipher.NewCBCDecrypter(block, iv)
decrypted := make([]byte, len(encrypted))
stream.CryptBlocks(decrypted, encrypted)
decrypted, err = pkcs5Unpadding(decrypted)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
res = string(decrypted)
return
}
func pkcs5Padding(data []byte, blockSize int) []byte {
padding := blockSize - len(data)%blockSize
padText := make([]byte, padding)
for i := range padText {
padText[i] = byte(padding)
}
return append(data, padText...)
}
func pkcs5Unpadding(data []byte) (res []byte, err error) {
length := len(data)
if length == 0 {
err = errors.New("invalid padding size")
return
}
padding := int(data[length-1])
if padding > length {
err = errors.New("invalid padding size")
return
}
res = data[:length-padding]
return
}
// ==================== 账号绑定管理 ====================
func (s *xiaohongshu) HandleBindAccount(ctx context.Context, req *dto.XhsBindAccountReq) (err error) {
var account entity.CustomerServiceAccount
filter := bson.M{"platform": XhsPlatformName, "isDeleted": false}
if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil {
jaeger.RecordError(ctx, err)
return
}
decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
var bindData dto.XhsBindAccountDecrypted
if err = json.Unmarshal([]byte(decrypted), &bindData); err != nil {
jaeger.RecordError(ctx, err)
return
}
update := bson.M{
"$set": bson.M{
"accessToken": bindData.Token,
"appId": bindData.AppId,
"xhsUserId": bindData.UserId,
"updatedAt": gtime.Now().Time,
},
}
filter = bson.M{"_id": account.Id}
_, err = commonMongo.GetDB().Collection(entity.CustomerServiceAccountCollection).UpdateOne(ctx, filter, update)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
g.Log().Infof(ctx, "[小红书] 绑定账户成功: userId=%s, nickName=%s", bindData.UserId, bindData.NickName)
return
}
func (s *xiaohongshu) HandleUnbindAccount(ctx context.Context, req *dto.XhsUnbindAccountReq) (err error) {
var account entity.CustomerServiceAccount
filter := bson.M{"platform": XhsPlatformName, "isDeleted": false}
if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil {
jaeger.RecordError(ctx, err)
return
}
decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
var unbindData dto.XhsUnbindAccountDecrypted
if err = json.Unmarshal([]byte(decrypted), &unbindData); err != nil {
jaeger.RecordError(ctx, err)
return
}
update := bson.M{
"$set": bson.M{
"accessToken": "",
"xhsUserId": "",
"updatedAt": gtime.Now().Time,
},
}
filter = bson.M{"_id": account.Id}
_, err = commonMongo.GetDB().Collection(entity.CustomerServiceAccountCollection).UpdateOne(ctx, filter, update)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
g.Log().Infof(ctx, "[小红书] 解绑账户成功: userId=%s", unbindData.UserId)
return
}
// ==================== 消息收发 ====================
func (s *xiaohongshu) HandleReceiveMessage(ctx context.Context, req *dto.XhsReceiveMessageReq) (err error) {
accountId, err := s.getAccountIdByPlatform(ctx)
if err != nil {
return
}
var account entity.CustomerServiceAccount
filter := bson.M{"_id": accountId, "isDeleted": false}
if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil {
jaeger.RecordError(ctx, err)
return
}
decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
var conversation entity.Conversation
id := bson.NewObjectID()
conversation.Id = &id // 取地址赋值给指针类型
conversation.SessionId = fmt.Sprintf("%s_%s", req.FromUserId, XhsPlatformName)
conversation.UserId = req.FromUserId
conversation.CustomerServiceId = accountId.Hex()
conversation.Role = "user"
conversation.Platform = XhsPlatformName
conversation.MessageId = req.MessageId
conversation.MessageType = req.MessageType
now := gtime.Now().Time
conversation.CreatedAt = &now // 取地址赋值给指针类型
switch req.MessageType {
case "TEXT":
var textContent dto.XhsTextContent
if err = json.Unmarshal([]byte(decrypted), &textContent); err != nil {
jaeger.RecordError(ctx, err)
return
}
conversation.Content = textContent.Text
case "IMAGE":
var imgContent dto.XhsImageContent
if err = json.Unmarshal([]byte(decrypted), &imgContent); err != nil {
jaeger.RecordError(ctx, err)
return
}
conversation.Content = fmt.Sprintf("[图片]%s", imgContent.Link)
case "VIDEO":
var videoContent dto.XhsVideoContent
if err = json.Unmarshal([]byte(decrypted), &videoContent); err != nil {
jaeger.RecordError(ctx, err)
return
}
conversation.Content = fmt.Sprintf("[视频]%s", videoContent.Link)
case "CARD":
var cardContent dto.XhsCardContent
if err = json.Unmarshal([]byte(decrypted), &cardContent); err != nil {
jaeger.RecordError(ctx, err)
return
}
conversation.Content = fmt.Sprintf("[卡片-%s]%s", cardContent.ContentType, cardContent.Id)
case "REVOKE":
var revokeContent dto.XhsRevokeContent
if err = json.Unmarshal([]byte(decrypted), &revokeContent); err != nil {
jaeger.RecordError(ctx, err)
return
}
conversation.Content = fmt.Sprintf("[撤回消息]%s", revokeContent.MessageId)
case "HINT":
conversation.Content = "[系统提示消息]"
case "SMILES":
conversation.Content = "[表情消息]"
default:
conversation.Content = fmt.Sprintf("[%s类型消息]", req.MessageType)
}
_, err = commonMongo.GetDB().Collection(entity.ConversationCollection).InsertOne(ctx, conversation)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
g.Log().Infof(ctx, "[小红书] 接收消息成功: sessionId=%s, messageType=%s", conversation.SessionId, req.MessageType)
if req.MessageType == "TEXT" {
asyncCtx := context.WithoutCancel(ctx)
go s.processUserMessage(asyncCtx, &account, &conversation)
}
return
}
func (s *xiaohongshu) SendMessage(ctx context.Context, account *entity.CustomerServiceAccount, toUserId, content string) (err error) {
textContent := dto.XhsTextContent{Text: content}
contentJson, err := json.Marshal(textContent)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
encrypted, err := s.Encrypt(ctx, string(contentJson), account.SecretKey)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
now := gtime.Now()
sendReq := dto.XhsSendMessageReq{
UserId: account.XhsUserId,
RequestId: fmt.Sprintf("%d", now.UnixNano()),
MessageType: "TEXT",
FromUserId: account.XhsUserId,
ToUserId: toUserId,
ThirdAccountId: account.Id.Hex(),
Timestamp: now.UnixMilli(),
Content: encrypted,
}
url := fmt.Sprintf("%s/api/open/im/third/send", XhsApiBaseUrl)
client := g.Client()
client.SetHeader("Access-Token", account.AccessToken)
client.SetHeader("Content-Type", "application/json")
resp, err := client.Post(ctx, url, sendReq)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
defer resp.Close()
var sendRes dto.XhsSendMessageRes
if err = json.Unmarshal(resp.ReadAll(), &sendRes); err != nil {
jaeger.RecordError(ctx, err)
return
}
if sendRes.Code != 0 {
err = fmt.Errorf("发送消息失败: code=%d, msg=%s", sendRes.Code, sendRes.Msg)
jaeger.RecordError(ctx, err)
return
}
var conversation entity.Conversation
id2 := bson.NewObjectID()
conversation.Id = &id2 // 取地址赋值给指针类型
conversation.SessionId = fmt.Sprintf("%s_%s", toUserId, XhsPlatformName)
conversation.UserId = toUserId
conversation.CustomerServiceId = account.Id.Hex()
conversation.Role = "assistant"
conversation.Platform = XhsPlatformName
conversation.MessageId = sendRes.Data.MessageId
conversation.MessageType = "TEXT"
conversation.Content = content
now2 := gtime.Now().Time
conversation.CreatedAt = &now2 // 取地址赋值给指针类型
_, err = commonMongo.GetDB().Collection(entity.ConversationCollection).InsertOne(ctx, conversation)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
g.Log().Infof(ctx, "[小红书] 发送消息成功: toUserId=%s, messageId=%s", toUserId, sendRes.Data.MessageId)
return
}
func (s *xiaohongshu) GenerateSignature(ctx context.Context, secretKey, requestBody string) (res string) {
h := sha256.New()
h.Write([]byte(secretKey + requestBody))
res = hex.EncodeToString(h.Sum(nil))
return
}
// ==================== 私有方法 ====================
func (s *xiaohongshu) getAccountIdByPlatform(ctx context.Context) (res bson.ObjectID, err error) {
var account entity.CustomerServiceAccount
filter := bson.M{"platform": XhsPlatformName, "isDeleted": false}
if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil {
jaeger.RecordError(ctx, err)
return
}
res = *account.Id // 解引用指针类型
return
}
func (s *xiaohongshu) processUserMessage(ctx context.Context, account *entity.CustomerServiceAccount, conversation *entity.Conversation) {
if err := s.sendToRAGFlowStream(ctx, account, conversation); err != nil {
jaeger.RecordError(ctx, err)
g.Log().Errorf(ctx, "[小红书] 发送到RAGFlow Stream失败: %v", err)
return
}
g.Log().Infof(ctx, "[小红书] 消息已发送到RAGFlow Stream: userId=%s", conversation.UserId)
}
func (s *xiaohongshu) sendToRAGFlowStream(ctx context.Context, account *entity.CustomerServiceAccount, conversation *entity.Conversation) (err error) {
baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue")
replyQueue := rabbitmq.GetInstanceQueueName(baseQueue)
msg := &redis.SendStreamMessage{
UserId: fmt.Sprintf("%s_%s", XhsPlatformName, conversation.UserId),
TenantId: gconv.String(account.TenantId),
Content: conversation.Content,
Timestamp: gtime.New(conversation.CreatedAt).Timestamp(),
MessageId: conversation.MessageId,
Platform: XhsPlatformName,
AccountId: account.Id.Hex(),
AccountName: account.AccountName,
ReplyQueue: replyQueue,
}
if sessionId, _ := redis.GetSessionCache(ctx, gconv.String(account.TenantId), msg.UserId); sessionId == "" {
if history, histErr := dao.Conversation.GetRecentHistory(ctx, msg.UserId, redis.GetHistoryContextLimit()); histErr == nil && len(history) > 0 {
msg.History = history
g.Log().Infof(ctx, "[小红书] 用户已归档,读取 %d 轮历史对话", len(history))
}
}
streamMsgId, err := redis.AddToStream(ctx, redis.RAGFlowRequestStreamKey, msg)
if err != nil {
jaeger.RecordError(ctx, err)
return
}
g.Log().Infof(ctx, "[小红书] 消息已写入Stream: streamMsgId=%s, sessionId=%s", streamMsgId, conversation.SessionId)
return
}