Files
customer-server/service/websocket_service.go
2026-03-14 10:02:49 +08:00

507 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package service - WebSocket服务
// 功能WebSocket连接管理、消息推送、心跳维护
package service
import (
"context"
"customer-server/dao"
"customer-server/model/dto"
"customer-server/model/entity"
"customer-server/util"
"errors"
"net/http"
commonMongo "gitea.com/red-future/common/db/mongo"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/redis"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/container/gmap"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
)
// WebSocket 全局单例
var WebSocket = &websocketService{
connections: gmap.NewStrAnyMap(true),
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 允许跨域
},
},
}
// GoFrame 并发安全 Map
type websocketService struct {
connections *gmap.StrAnyMap
upgrader websocket.Upgrader
}
// key: userId_platform
// wsConnection WebSocket 连接信息
type wsConnection struct {
UserId string
Platform string
TenantId string
AccountName string // 客服账号ID
Conn *websocket.Conn
CreatedAt int64
}
// 租户ID缓存 Key 前缀和过期时间
const (
tenantCacheKeyPrefix = "tenant:custsvc:"
tenantCacheExpire = 300 // 5分钟
)
// resolveTenantId 获取租户ID兼容仅有accountName的场景
// 参数: ctx - 上下文r - HTTP请求对象
// 返回: tenantId - 租户IDerr - 错误信息
// 功能: 优先从token获取其次从客服账号查询支持缓存
func (s *websocketService) resolveTenantId(ctx context.Context, r *ghttp.Request) (tenantId string, err error) {
// 1. 优先从 token 获取
if user, userErr := util.GetTenantInfo(ctx); userErr == nil {
if id := gconv.String(user.TenantId); id != "" {
return id, nil
}
}
if r == nil {
return "", gerror.New("无法获取租户信息:缺少请求上下文")
}
// 2. 从请求参数获取 accountName
custId := r.Get("accountName").String()
if custId == "" {
custId = r.Get("account_name").String()
}
if custId == "" {
return "", gerror.New("缺少 accountName 参数")
}
// 2. 从 Redis 缓存查询
cacheKey := tenantCacheKeyPrefix + custId
if cached, _ := redis.RedisClient().Get(ctx, cacheKey); !cached.IsEmpty() {
return cached.String(), nil
}
// 3. Redis 未命中,查询 MongoDB
coll := commonMongo.GetDB().Collection(entity.CustomerServiceAccountCollection)
var doc struct {
TenantId interface{} `bson:"tenantId"`
}
filters := []bson.M{
{"accountName": custId, "isDeleted": false},
{"accountName": custId}, // 兼容旧数据未设置 isDeleted
}
if objectId, objErr := bson.ObjectIDFromHex(custId); objErr == nil {
filters = append(filters,
bson.M{"_id": objectId, "isDeleted": false},
bson.M{"_id": objectId},
)
}
for _, filter := range filters {
if err = coll.FindOne(ctx, filter).Decode(&doc); err == nil {
tenantId = gconv.String(doc.TenantId)
if tenantId == "" {
return "", gerror.Newf("客服账号 %s 未配置 tenantId", custId)
}
// 4. 写入 Redis 缓存
redis.RedisClient().SetEX(ctx, cacheKey, tenantId, tenantCacheExpire)
return
}
if !errors.Is(err, mongo.ErrNoDocuments) {
return
}
}
return "", gerror.Newf("客服账号 %s 不存在", custId)
}
// Connect 建立 WebSocket 连接
func (s *websocketService) Connect(ctx context.Context, r *ghttp.Request, userId, platform string) error {
// 使用原生upgrader升级WebSocket连接
ws, err := s.upgrader.Upgrade(r.Response.Writer, r.Request, nil)
if err != nil {
jaeger.RecordError(ctx, err, "WebSocket 升级失败")
return err
}
defer ws.Close()
tenantId, err := s.resolveTenantId(ctx, r)
if err != nil {
jaeger.RecordError(ctx, err, "获取租户ID失败")
return err
}
// 读取accountName参数客服账号名称
accountName := r.Get("accountName").String()
if accountName == "" {
accountName = r.Get("account_name").String()
}
glog.Infof(ctx, "WebSocket 连接建立 - 用户: %s, 平台: %s, 租户: %s, 客服账号: %s", userId, platform, tenantId, accountName)
// key格式: tenantId:userId_platform (确保租户隔离)
key := tenantId + ":" + userId + "_" + platform
// 关闭旧连接
if old := s.connections.Get(key); old != nil {
old.(*wsConnection).Conn.Close()
}
// 注册新连接(携带 TenantId 和 AccountName
s.connections.Set(key, &wsConnection{
UserId: userId,
Platform: platform,
TenantId: tenantId,
AccountName: accountName,
Conn: ws,
CreatedAt: gtime.Now().Timestamp(),
})
// 发送开场白(连接建立后立即推送)
if accountName != "" {
greeting := s.getGreeting(ctx, accountName, tenantId)
if greeting != "" {
s.writeJSON(ws, &dto.WebSocketPushMsg{
Type: "message",
Message: greeting,
})
glog.Infof(ctx, "已发送开场白 - 用户: %s, 客服账号: %s, 长度: %d", userId, accountName, len(greeting))
} else {
glog.Warningf(ctx, "客服账号未配置开场白 - accountName: %s, tenantId: %s", accountName, tenantId)
}
}
// 处理消息(阻塞)
s.handleConnection(ctx, key, ws)
return nil
}
// handleConnection 处理 WebSocket 连接
func (s *websocketService) handleConnection(ctx context.Context, key string, conn *websocket.Conn) {
defer func() {
s.connections.Remove(key)
conn.Close()
glog.Infof(ctx, "WebSocket 连接断开 - %s", key)
}()
for {
msgType, message, err := conn.ReadMessage()
if err != nil {
// 排除正常关闭情况:正常关闭、离开页面、无状态码关闭
if websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived,
) {
jaeger.RecordError(ctx, err, "WebSocket 读取错误")
}
break
}
if msgType != websocket.TextMessage {
continue
}
content := gconv.String(message)
glog.Infof(ctx, "收到 WebSocket 消息 - %s: %s", key, content)
// 解析 userId
connInfo := s.connections.Get(key)
if connInfo == nil {
break
}
wsConn := connInfo.(*wsConnection)
// 先检查对话轮数,>5 则只发卡片,跳过话术
// checkCardBeforeProcess 已推送卡片消息无需ack
if handled, err := checkCardBeforeProcess(ctx, wsConn.TenantId, wsConn.UserId, wsConn.Platform); err != nil {
jaeger.RecordError(ctx, err, "卡片检查失败")
} else if handled {
continue
}
// 话术匹配并发布响应
// status 暂时为空,表示任意行为匹配
// isPushed=true表示已直接推送响应话术匹配无需ack
// isPushed=false表示转发到RAGFlow需要ack告知用户正在处理
// 创建带有accountName的context供GetTenantInfo使用
newCtx := ctx
if wsConn.AccountName != "" {
newCtx = context.WithValue(ctx, "accountName", wsConn.AccountName)
}
isPushed, err := Speechcraft.ProcessAndPublish(newCtx, wsConn.UserId, wsConn.Platform, wsConn.TenantId, content, "", wsConn.AccountName)
if err != nil {
jaeger.RecordError(ctx, err, "话术处理失败")
s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "error", Message: "消息处理失败"})
continue
}
// 只在转发到RAGFlow时发送ackGo直接返回的不需要ack
if !isPushed {
s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "ack", Message: "消息已接收,正在处理..."})
}
}
}
// writeJSON 发送 JSON 消息
func (s *websocketService) writeJSON(conn *websocket.Conn, data interface{}) {
jsonBytes, _ := gjson.Encode(data)
conn.WriteMessage(websocket.TextMessage, jsonBytes)
}
// getGreeting 获取客服账号的开场白
func (s *websocketService) getGreeting(ctx context.Context, accountName, tenantId string) string {
glog.Infof(ctx, "查询开场白 - accountName: %s, tenantId: %s", accountName, tenantId)
// 复用dao层方法保持查询逻辑一致
account, err := dao.CustomerServiceAccount.FindByAccountName(ctx, accountName)
if err != nil {
jaeger.RecordError(ctx, err, "查询客服账号开场白失败")
glog.Errorf(ctx, "查询开场白失败: %v", err)
return ""
}
if account == nil {
glog.Warningf(ctx, "客服账号不存在: accountName=%s", accountName)
return ""
}
// 详细输出查询结果
glog.Infof(ctx, "查询到客服账号: Id=%s, AccountName=%s, TenantId=%v, Greeting长度=%d, Platform=%s",
account.Id.Hex(), account.AccountName, account.TenantId, len(account.Greeting), account.Platform)
return account.Greeting
}
// Send 发送消息到 Redis StreamHTTP 接口)
func (s *websocketService) Send(ctx context.Context, req *dto.WebSocketSendReq) (*dto.WebSocketSendRes, error) {
// 从 token 获取租户ID
var tenantId string
if user, err := utils.GetUserInfo(ctx); err == nil {
tenantId = gconv.String(user.TenantId)
}
messageId, err := s.sendToStream(ctx, req.UserId, tenantId, req.Content)
if err != nil {
return nil, err
}
return &dto.WebSocketSendRes{MessageId: messageId}, nil
}
// sendToStream 发送消息到 Redis Stream
// 如果用户无 session 缓存(已归档),则从 MongoDB 读取历史对话一起发送
func (s *websocketService) sendToStream(ctx context.Context, userId, tenantId, content string) (string, error) {
now := gtime.Now()
platform := "xiaohongshu" // 默认平台
// 获取当前实例的动态响应队列名(自动生成,支持多实例部署)
baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue")
replyQueue := rabbitmq.GetInstanceQueueName(baseQueue)
// 获取accountName优先使用用户选择的方向映射
var accountName string
var chatId string
// 1. 尝试从用户状态获取(用户选择方向后的映射)
if userState, stateErr := redis.GetUserState(ctx, userId, platform); stateErr == nil && userState.AccountName != "" {
accountName = userState.AccountName
glog.Infof(ctx, "使用用户选择方向的客服账号: %s", accountName)
} else {
// 2. 从连接信息获取(默认)
key := tenantId + ":" + userId + "_" + platform
connInfo := s.connections.Get(key)
if connInfo != nil {
wsConn := connInfo.(*wsConnection)
accountName = wsConn.AccountName
glog.Infof(ctx, "使用连接默认客服账号: %s", accountName)
}
}
// 根据accountName查询ragflow_config获取chat_id
if accountName != "" {
config, err := dao.RAGFlowConfig.FindByAccountName(ctx, accountName)
if err == nil && config != nil {
chatId = config.ChatId
glog.Infof(ctx, "查询到chatId: accountName=%s, chatId=%s", accountName, chatId)
}
}
// 如果未找到chatId报错应该从ragflowconfig表重建session
if chatId == "" {
return "", gerror.New("chatId未找到需要重建RAGFlow session")
}
msg := &redis.SendStreamMessage{
UserId: userId,
TenantId: tenantId,
AccountName: accountName,
ChatId: chatId,
Content: content,
Timestamp: now.Timestamp(),
MessageId: userId + "_" + gconv.String(now.TimestampNano()),
ReplyQueue: replyQueue,
}
// 检查是否有 session 缓存,无缓存说明已归档,需要读取历史
if sessionId, _ := redis.GetSessionCache(ctx, tenantId, userId); sessionId == "" {
// 从 MongoDB 读取历史对话
if history, err := dao.Conversation.GetRecentHistory(ctx, userId, redis.GetHistoryContextLimit()); err == nil && len(history) > 0 {
msg.History = history
glog.Infof(ctx, "用户已归档,读取 %d 轮历史对话 - 用户: %s", len(history), userId)
}
}
messageId, err := redis.AddToStream(ctx, redis.RAGFlowRequestStreamKey, msg)
if err != nil {
return "", err
}
glog.Infof(ctx, "消息已发送到 Stream - MessageID: %s, 用户: %s", messageId, userId)
return messageId, nil
}
// SendToUser 发送消息给指定用户
func (s *websocketService) SendToUser(ctx context.Context, tenantId, userId, platform string, data interface{}) error {
// key格式: tenantId:userId_platform
key := tenantId + ":" + userId + "_" + platform
connInfo := s.connections.Get(key)
if connInfo == nil {
glog.Warningf(ctx, "用户不在线 - %s", key)
return nil
}
s.writeJSON(connInfo.(*wsConnection).Conn, data)
return nil
}
// Broadcast 广播消息给所有连接
func (s *websocketService) Broadcast(ctx context.Context, content string) {
msg := &dto.WebSocketPushMsg{Type: "broadcast", Content: content}
msgBytes := gjson.MustEncode(msg)
s.connections.Iterator(func(key string, value interface{}) bool {
conn := value.(*wsConnection)
if err := conn.Conn.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
jaeger.RecordError(ctx, err, "广播消息失败 - "+key)
}
return true
})
}
// GetOnlineUsers 获取在线用户列表
func (s *websocketService) GetOnlineUsers() *dto.WebSocketOnlineRes {
users := make([]dto.WebSocketOnlineUserRes, 0, s.connections.Size())
s.connections.Iterator(func(_ string, value interface{}) bool {
conn := value.(*wsConnection)
users = append(users, dto.WebSocketOnlineUserRes{
UserId: conn.UserId,
Platform: conn.Platform,
CreatedAt: conn.CreatedAt,
})
return true
})
return &dto.WebSocketOnlineRes{
Count: len(users),
Users: users,
}
}
// PushRAGFlowResponse 推送 RAGFlow 响应给用户
func (s *websocketService) PushRAGFlowResponse(ctx context.Context, tenantId, userId, platform, content string) error {
return s.SendToUser(ctx, tenantId, userId, platform, &dto.WebSocketPushMsg{Type: "answer", Content: content})
}
// ============== RabbitMQ 消费者(追问延时消息)==============
// FollowUpConsumer 追问消费者
type FollowUpConsumer struct {
consumer *rabbitmq.Consumer
}
// NewFollowUpConsumer 创建追问消费者
func NewFollowUpConsumer(ctx context.Context) *FollowUpConsumer {
queueName := GetConfigString(ctx, "followUp.queue")
return &FollowUpConsumer{
consumer: rabbitmq.NewConsumer(queueName, handleFollowUp),
}
}
// Start 启动消费者
func (c *FollowUpConsumer) Start(ctx context.Context) (err error) {
glog.Info(ctx, "追问消费者启动...")
return c.consumer.Start(ctx)
}
// Stop 停止消费者
func (c *FollowUpConsumer) Stop(ctx context.Context) {
c.consumer.Stop(ctx)
}
// handleFollowUp 处理追问消息
func handleFollowUp(ctx context.Context, body []byte) (err error) {
ctx, span := jaeger.NewSpan(ctx, "consumer.followup")
defer span.End()
var msg redis.FollowUpMessage
if err = gjson.DecodeTo(body, &msg); err != nil {
jaeger.RecordError(ctx, err, "解析追问消息失败")
return
}
glog.Infof(ctx, "收到追问消息 - 用户: %s, 类型: %d", msg.UserId, msg.FollowUpType)
// 检查用户状态如果在状态5方向选择或状态3发卡片跳过追问
userState, err := redis.GetUserState(ctx, msg.UserId, msg.Platform)
if err != nil {
jaeger.RecordError(ctx, err, "获取用户状态失败")
return
}
if userState.Stage == 5 {
glog.Infof(ctx, "用户 %s 在方向选择状态,跳过追问", msg.UserId)
return
}
if userState.Stage == 3 {
glog.Infof(ctx, "用户 %s 在发卡片状态,跳过追问", msg.UserId)
return
}
// 检查用户是否在追问发送后有活跃
isActive, err := redis.IsUserActive(ctx, msg.UserId, int64(redis.GetFollowUpDelay(msg.FollowUpType)))
if err != nil {
jaeger.RecordError(ctx, err, "检查用户活跃状态失败")
return
}
if isActive {
glog.Infof(ctx, "用户 %s 在追问期间有活跃,跳过追问", msg.UserId)
return
}
// 发送追问消息给用户
if err = WebSocket.PushRAGFlowResponse(ctx, msg.TenantId, msg.UserId, msg.Platform, msg.Content); err != nil {
jaeger.RecordError(ctx, err, "推送追问消息失败")
}
glog.Infof(ctx, "追问消息已发送 - 租户: %s, 用户: %s, 内容: %s", msg.TenantId, msg.UserId, msg.Content)
return
}