package service import ( "context" "crypto/aes" "crypto/cipher" "crypto/rand" "crypto/sha256" "customer-server/dao" "customer-server/model/dto" "customer-server/model/entity" "encoding/base64" "encoding/hex" "encoding/json" "errors" "fmt" "io" "strings" commonMongo "gitea.com/red-future/common/db/mongo" "gitea.com/red-future/common/jaeger" "gitea.com/red-future/common/rabbitmq" "gitea.com/red-future/common/redis" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/v2/bson" ) var Xiaohongshu = new(xiaohongshu) type xiaohongshu struct{} const ( XhsApiBaseUrl = "https://adapi.xiaohongshu.com" XhsPlatformName = "xiaohongshu" XhsEncryptSplit = "~split~" ) // ==================== 加解密工具 ==================== // Encrypt AES加密 // 参数: ctx - 上下文,content - 待加密内容,secretKey - 密钥(Base64编码) // 返回: res - 加密后的字符串(Base64编码),err - 错误信息 // 功能: 使用AES-CBC模式加密内容,用于小红书API签名 func (s *xiaohongshu) Encrypt(ctx context.Context, content, secretKey string) (res string, err error) { keyBytes, err := base64.StdEncoding.DecodeString(secretKey) if err != nil { jaeger.RecordError(ctx, err) return } block, err := aes.NewCipher(keyBytes) if err != nil { jaeger.RecordError(ctx, err) return } iv := make([]byte, aes.BlockSize) if _, err = io.ReadFull(rand.Reader, iv); err != nil { jaeger.RecordError(ctx, err) return } stream := cipher.NewCBCEncrypter(block, iv) contentBytes := []byte(content) paddedContent := pkcs5Padding(contentBytes, aes.BlockSize) cipherText := make([]byte, len(paddedContent)) stream.CryptBlocks(cipherText, paddedContent) ivBase64 := base64.StdEncoding.EncodeToString(iv) cipherBase64 := base64.StdEncoding.EncodeToString(cipherText) res = fmt.Sprintf("%s%s%s", ivBase64, XhsEncryptSplit, cipherBase64) return } func (s *xiaohongshu) Decrypt(ctx context.Context, cipherText, secretKey string) (res string, err error) { keyBytes, err := base64.StdEncoding.DecodeString(secretKey) if err != nil { jaeger.RecordError(ctx, err) return } parts := strings.Split(cipherText, XhsEncryptSplit) if len(parts) != 2 { err = errors.New("invalid cipher text format") jaeger.RecordError(ctx, err) return } iv, err := base64.StdEncoding.DecodeString(parts[0]) if err != nil { jaeger.RecordError(ctx, err) return } encrypted, err := base64.StdEncoding.DecodeString(parts[1]) if err != nil { jaeger.RecordError(ctx, err) return } block, err := aes.NewCipher(keyBytes) if err != nil { jaeger.RecordError(ctx, err) return } if len(encrypted)%aes.BlockSize != 0 { err = errors.New("cipher text is not a multiple of block size") jaeger.RecordError(ctx, err) return } stream := cipher.NewCBCDecrypter(block, iv) decrypted := make([]byte, len(encrypted)) stream.CryptBlocks(decrypted, encrypted) decrypted, err = pkcs5Unpadding(decrypted) if err != nil { jaeger.RecordError(ctx, err) return } res = string(decrypted) return } func pkcs5Padding(data []byte, blockSize int) []byte { padding := blockSize - len(data)%blockSize padText := make([]byte, padding) for i := range padText { padText[i] = byte(padding) } return append(data, padText...) } func pkcs5Unpadding(data []byte) (res []byte, err error) { length := len(data) if length == 0 { err = errors.New("invalid padding size") return } padding := int(data[length-1]) if padding > length { err = errors.New("invalid padding size") return } res = data[:length-padding] return } // ==================== 账号绑定管理 ==================== func (s *xiaohongshu) HandleBindAccount(ctx context.Context, req *dto.XhsBindAccountReq) (err error) { var account entity.CustomerServiceAccount filter := bson.M{"platform": XhsPlatformName, "isDeleted": false} if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil { jaeger.RecordError(ctx, err) return } decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey) if err != nil { jaeger.RecordError(ctx, err) return } var bindData dto.XhsBindAccountDecrypted if err = json.Unmarshal([]byte(decrypted), &bindData); err != nil { jaeger.RecordError(ctx, err) return } update := bson.M{ "$set": bson.M{ "accessToken": bindData.Token, "appId": bindData.AppId, "xhsUserId": bindData.UserId, "updatedAt": gtime.Now().Time, }, } filter = bson.M{"_id": account.Id} _, err = commonMongo.GetDB().Collection(entity.CustomerServiceAccountCollection).UpdateOne(ctx, filter, update) if err != nil { jaeger.RecordError(ctx, err) return } g.Log().Infof(ctx, "[小红书] 绑定账户成功: userId=%s, nickName=%s", bindData.UserId, bindData.NickName) return } func (s *xiaohongshu) HandleUnbindAccount(ctx context.Context, req *dto.XhsUnbindAccountReq) (err error) { var account entity.CustomerServiceAccount filter := bson.M{"platform": XhsPlatformName, "isDeleted": false} if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil { jaeger.RecordError(ctx, err) return } decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey) if err != nil { jaeger.RecordError(ctx, err) return } var unbindData dto.XhsUnbindAccountDecrypted if err = json.Unmarshal([]byte(decrypted), &unbindData); err != nil { jaeger.RecordError(ctx, err) return } update := bson.M{ "$set": bson.M{ "accessToken": "", "xhsUserId": "", "updatedAt": gtime.Now().Time, }, } filter = bson.M{"_id": account.Id} _, err = commonMongo.GetDB().Collection(entity.CustomerServiceAccountCollection).UpdateOne(ctx, filter, update) if err != nil { jaeger.RecordError(ctx, err) return } g.Log().Infof(ctx, "[小红书] 解绑账户成功: userId=%s", unbindData.UserId) return } // ==================== 消息收发 ==================== func (s *xiaohongshu) HandleReceiveMessage(ctx context.Context, req *dto.XhsReceiveMessageReq) (err error) { accountId, err := s.getAccountIdByPlatform(ctx) if err != nil { return } var account entity.CustomerServiceAccount filter := bson.M{"_id": accountId, "isDeleted": false} if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil { jaeger.RecordError(ctx, err) return } decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey) if err != nil { jaeger.RecordError(ctx, err) return } var conversation entity.Conversation id := bson.NewObjectID() conversation.Id = &id // 取地址赋值给指针类型 conversation.SessionId = fmt.Sprintf("%s_%s", req.FromUserId, XhsPlatformName) conversation.UserId = req.FromUserId conversation.CustomerServiceId = accountId.Hex() conversation.Role = "user" conversation.Platform = XhsPlatformName conversation.MessageId = req.MessageId conversation.MessageType = req.MessageType now := gtime.Now().Time conversation.CreatedAt = &now // 取地址赋值给指针类型 switch req.MessageType { case "TEXT": var textContent dto.XhsTextContent if err = json.Unmarshal([]byte(decrypted), &textContent); err != nil { jaeger.RecordError(ctx, err) return } conversation.Content = textContent.Text case "IMAGE": var imgContent dto.XhsImageContent if err = json.Unmarshal([]byte(decrypted), &imgContent); err != nil { jaeger.RecordError(ctx, err) return } conversation.Content = fmt.Sprintf("[图片]%s", imgContent.Link) case "VIDEO": var videoContent dto.XhsVideoContent if err = json.Unmarshal([]byte(decrypted), &videoContent); err != nil { jaeger.RecordError(ctx, err) return } conversation.Content = fmt.Sprintf("[视频]%s", videoContent.Link) case "CARD": var cardContent dto.XhsCardContent if err = json.Unmarshal([]byte(decrypted), &cardContent); err != nil { jaeger.RecordError(ctx, err) return } conversation.Content = fmt.Sprintf("[卡片-%s]%s", cardContent.ContentType, cardContent.Id) case "REVOKE": var revokeContent dto.XhsRevokeContent if err = json.Unmarshal([]byte(decrypted), &revokeContent); err != nil { jaeger.RecordError(ctx, err) return } conversation.Content = fmt.Sprintf("[撤回消息]%s", revokeContent.MessageId) case "HINT": conversation.Content = "[系统提示消息]" case "SMILES": conversation.Content = "[表情消息]" default: conversation.Content = fmt.Sprintf("[%s类型消息]", req.MessageType) } _, err = commonMongo.GetDB().Collection(entity.ConversationCollection).InsertOne(ctx, conversation) if err != nil { jaeger.RecordError(ctx, err) return } g.Log().Infof(ctx, "[小红书] 接收消息成功: sessionId=%s, messageType=%s", conversation.SessionId, req.MessageType) if req.MessageType == "TEXT" { asyncCtx := context.WithoutCancel(ctx) go s.processUserMessage(asyncCtx, &account, &conversation) } return } func (s *xiaohongshu) SendMessage(ctx context.Context, account *entity.CustomerServiceAccount, toUserId, content string) (err error) { textContent := dto.XhsTextContent{Text: content} contentJson, err := json.Marshal(textContent) if err != nil { jaeger.RecordError(ctx, err) return } encrypted, err := s.Encrypt(ctx, string(contentJson), account.SecretKey) if err != nil { jaeger.RecordError(ctx, err) return } now := gtime.Now() sendReq := dto.XhsSendMessageReq{ UserId: account.XhsUserId, RequestId: fmt.Sprintf("%d", now.UnixNano()), MessageType: "TEXT", FromUserId: account.XhsUserId, ToUserId: toUserId, ThirdAccountId: account.Id.Hex(), Timestamp: now.UnixMilli(), Content: encrypted, } url := fmt.Sprintf("%s/api/open/im/third/send", XhsApiBaseUrl) client := g.Client() client.SetHeader("Access-Token", account.AccessToken) client.SetHeader("Content-Type", "application/json") resp, err := client.Post(ctx, url, sendReq) if err != nil { jaeger.RecordError(ctx, err) return } defer resp.Close() var sendRes dto.XhsSendMessageRes if err = json.Unmarshal(resp.ReadAll(), &sendRes); err != nil { jaeger.RecordError(ctx, err) return } if sendRes.Code != 0 { err = fmt.Errorf("发送消息失败: code=%d, msg=%s", sendRes.Code, sendRes.Msg) jaeger.RecordError(ctx, err) return } var conversation entity.Conversation id2 := bson.NewObjectID() conversation.Id = &id2 // 取地址赋值给指针类型 conversation.SessionId = fmt.Sprintf("%s_%s", toUserId, XhsPlatformName) conversation.UserId = toUserId conversation.CustomerServiceId = account.Id.Hex() conversation.Role = "assistant" conversation.Platform = XhsPlatformName conversation.MessageId = sendRes.Data.MessageId conversation.MessageType = "TEXT" conversation.Content = content now2 := gtime.Now().Time conversation.CreatedAt = &now2 // 取地址赋值给指针类型 _, err = commonMongo.GetDB().Collection(entity.ConversationCollection).InsertOne(ctx, conversation) if err != nil { jaeger.RecordError(ctx, err) return } g.Log().Infof(ctx, "[小红书] 发送消息成功: toUserId=%s, messageId=%s", toUserId, sendRes.Data.MessageId) return } func (s *xiaohongshu) GenerateSignature(ctx context.Context, secretKey, requestBody string) (res string) { h := sha256.New() h.Write([]byte(secretKey + requestBody)) res = hex.EncodeToString(h.Sum(nil)) return } // ==================== 私有方法 ==================== func (s *xiaohongshu) getAccountIdByPlatform(ctx context.Context) (res bson.ObjectID, err error) { var account entity.CustomerServiceAccount filter := bson.M{"platform": XhsPlatformName, "isDeleted": false} if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil { jaeger.RecordError(ctx, err) return } res = *account.Id // 解引用指针类型 return } func (s *xiaohongshu) processUserMessage(ctx context.Context, account *entity.CustomerServiceAccount, conversation *entity.Conversation) { if err := s.sendToRAGFlowStream(ctx, account, conversation); err != nil { jaeger.RecordError(ctx, err) g.Log().Errorf(ctx, "[小红书] 发送到RAGFlow Stream失败: %v", err) return } g.Log().Infof(ctx, "[小红书] 消息已发送到RAGFlow Stream: userId=%s", conversation.UserId) } func (s *xiaohongshu) sendToRAGFlowStream(ctx context.Context, account *entity.CustomerServiceAccount, conversation *entity.Conversation) (err error) { baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue") replyQueue := rabbitmq.GetInstanceQueueName(baseQueue) msg := &redis.SendStreamMessage{ UserId: fmt.Sprintf("%s_%s", XhsPlatformName, conversation.UserId), TenantId: gconv.String(account.TenantId), Content: conversation.Content, Timestamp: gtime.New(conversation.CreatedAt).Timestamp(), MessageId: conversation.MessageId, Platform: XhsPlatformName, AccountId: account.Id.Hex(), AccountName: account.AccountName, ReplyQueue: replyQueue, } if sessionId, _ := redis.GetSessionCache(ctx, gconv.String(account.TenantId), msg.UserId); sessionId == "" { if history, histErr := dao.Conversation.GetRecentHistory(ctx, msg.UserId, redis.GetHistoryContextLimit()); histErr == nil && len(history) > 0 { msg.History = history g.Log().Infof(ctx, "[小红书] 用户已归档,读取 %d 轮历史对话", len(history)) } } streamMsgId, err := redis.AddToStream(ctx, redis.RAGFlowRequestStreamKey, msg) if err != nil { jaeger.RecordError(ctx, err) return } g.Log().Infof(ctx, "[小红书] 消息已写入Stream: streamMsgId=%s, sessionId=%s", streamMsgId, conversation.SessionId) return }