// Package service - WebSocket服务 // 功能:WebSocket连接管理、消息推送、心跳维护 package service import ( "context" "customer-server/dao" "customer-server/model/dto" "customer-server/model/entity" "customer-server/util" "errors" "net/http" commonMongo "gitea.com/red-future/common/db/mongo" "gitea.com/red-future/common/jaeger" "gitea.com/red-future/common/rabbitmq" "gitea.com/red-future/common/redis" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/container/gmap" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "github.com/gorilla/websocket" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" ) // WebSocket 全局单例 var WebSocket = &websocketService{ connections: gmap.NewStrAnyMap(true), upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true // 允许跨域 }, }, } // GoFrame 并发安全 Map type websocketService struct { connections *gmap.StrAnyMap upgrader websocket.Upgrader } // key: userId_platform // wsConnection WebSocket 连接信息 type wsConnection struct { UserId string Platform string TenantId string AccountName string // 客服账号ID Conn *websocket.Conn CreatedAt int64 } // 租户ID缓存 Key 前缀和过期时间 const ( tenantCacheKeyPrefix = "tenant:custsvc:" tenantCacheExpire = 300 // 5分钟 ) // resolveTenantId 获取租户ID(兼容仅有accountName的场景) // 参数: ctx - 上下文,r - HTTP请求对象 // 返回: tenantId - 租户ID,err - 错误信息 // 功能: 优先从token获取,其次从客服账号查询,支持缓存 func (s *websocketService) resolveTenantId(ctx context.Context, r *ghttp.Request) (tenantId string, err error) { // 1. 优先从 token 获取 if user, userErr := util.GetTenantInfo(ctx); userErr == nil { if id := gconv.String(user.TenantId); id != "" { return id, nil } } if r == nil { return "", gerror.New("无法获取租户信息:缺少请求上下文") } // 2. 从请求参数获取 accountName custId := r.Get("accountName").String() if custId == "" { custId = r.Get("account_name").String() } if custId == "" { return "", gerror.New("缺少 accountName 参数") } // 2. 从 Redis 缓存查询 cacheKey := tenantCacheKeyPrefix + custId if cached, _ := redis.RedisClient().Get(ctx, cacheKey); !cached.IsEmpty() { return cached.String(), nil } // 3. Redis 未命中,查询 MongoDB coll := commonMongo.GetDB().Collection(entity.CustomerServiceAccountCollection) var doc struct { TenantId interface{} `bson:"tenantId"` } filters := []bson.M{ {"accountName": custId, "isDeleted": false}, {"accountName": custId}, // 兼容旧数据未设置 isDeleted } if objectId, objErr := bson.ObjectIDFromHex(custId); objErr == nil { filters = append(filters, bson.M{"_id": objectId, "isDeleted": false}, bson.M{"_id": objectId}, ) } for _, filter := range filters { if err = coll.FindOne(ctx, filter).Decode(&doc); err == nil { tenantId = gconv.String(doc.TenantId) if tenantId == "" { return "", gerror.Newf("客服账号 %s 未配置 tenantId", custId) } // 4. 写入 Redis 缓存 redis.RedisClient().SetEX(ctx, cacheKey, tenantId, tenantCacheExpire) return } if !errors.Is(err, mongo.ErrNoDocuments) { return } } return "", gerror.Newf("客服账号 %s 不存在", custId) } // Connect 建立 WebSocket 连接 func (s *websocketService) Connect(ctx context.Context, r *ghttp.Request, userId, platform string) error { // 使用原生upgrader升级WebSocket连接 ws, err := s.upgrader.Upgrade(r.Response.Writer, r.Request, nil) if err != nil { jaeger.RecordError(ctx, err, "WebSocket 升级失败") return err } defer ws.Close() tenantId, err := s.resolveTenantId(ctx, r) if err != nil { jaeger.RecordError(ctx, err, "获取租户ID失败") return err } // 读取accountName参数(客服账号名称) accountName := r.Get("accountName").String() if accountName == "" { accountName = r.Get("account_name").String() } glog.Infof(ctx, "WebSocket 连接建立 - 用户: %s, 平台: %s, 租户: %s, 客服账号: %s", userId, platform, tenantId, accountName) // key格式: tenantId:userId_platform (确保租户隔离) key := tenantId + ":" + userId + "_" + platform // 关闭旧连接 if old := s.connections.Get(key); old != nil { old.(*wsConnection).Conn.Close() } // 注册新连接(携带 TenantId 和 AccountName) s.connections.Set(key, &wsConnection{ UserId: userId, Platform: platform, TenantId: tenantId, AccountName: accountName, Conn: ws, CreatedAt: gtime.Now().Timestamp(), }) // 发送开场白(连接建立后立即推送) if accountName != "" { greeting := s.getGreeting(ctx, accountName, tenantId) if greeting != "" { s.writeJSON(ws, &dto.WebSocketPushMsg{ Type: "message", Message: greeting, }) glog.Infof(ctx, "已发送开场白 - 用户: %s, 客服账号: %s, 长度: %d", userId, accountName, len(greeting)) } else { glog.Warningf(ctx, "客服账号未配置开场白 - accountName: %s, tenantId: %s", accountName, tenantId) } } // 处理消息(阻塞) s.handleConnection(ctx, key, ws) return nil } // handleConnection 处理 WebSocket 连接 func (s *websocketService) handleConnection(ctx context.Context, key string, conn *websocket.Conn) { defer func() { s.connections.Remove(key) conn.Close() glog.Infof(ctx, "WebSocket 连接断开 - %s", key) }() for { msgType, message, err := conn.ReadMessage() if err != nil { // 排除正常关闭情况:正常关闭、离开页面、无状态码关闭 if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, ) { jaeger.RecordError(ctx, err, "WebSocket 读取错误") } break } if msgType != websocket.TextMessage { continue } content := gconv.String(message) glog.Infof(ctx, "收到 WebSocket 消息 - %s: %s", key, content) // 解析 userId connInfo := s.connections.Get(key) if connInfo == nil { break } wsConn := connInfo.(*wsConnection) // 先检查对话轮数,>5 则只发卡片,跳过话术 // checkCardBeforeProcess 已推送卡片消息,无需ack if handled, err := checkCardBeforeProcess(ctx, wsConn.TenantId, wsConn.UserId, wsConn.Platform); err != nil { jaeger.RecordError(ctx, err, "卡片检查失败") } else if handled { continue } // 话术匹配并发布响应 // status 暂时为空,表示任意行为匹配 // isPushed=true表示已直接推送响应(话术匹配),无需ack // isPushed=false表示转发到RAGFlow,需要ack告知用户正在处理 // 创建带有accountName的context,供GetTenantInfo使用 newCtx := ctx if wsConn.AccountName != "" { newCtx = context.WithValue(ctx, "accountName", wsConn.AccountName) } isPushed, err := Speechcraft.ProcessAndPublish(newCtx, wsConn.UserId, wsConn.Platform, wsConn.TenantId, content, "", wsConn.AccountName) if err != nil { jaeger.RecordError(ctx, err, "话术处理失败") s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "error", Message: "消息处理失败"}) continue } // 只在转发到RAGFlow时发送ack(Go直接返回的不需要ack) if !isPushed { s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "ack", Message: "消息已接收,正在处理..."}) } } } // writeJSON 发送 JSON 消息 func (s *websocketService) writeJSON(conn *websocket.Conn, data interface{}) { jsonBytes, _ := gjson.Encode(data) conn.WriteMessage(websocket.TextMessage, jsonBytes) } // getGreeting 获取客服账号的开场白 func (s *websocketService) getGreeting(ctx context.Context, accountName, tenantId string) string { glog.Infof(ctx, "查询开场白 - accountName: %s, tenantId: %s", accountName, tenantId) // 复用dao层方法,保持查询逻辑一致 account, err := dao.CustomerServiceAccount.FindByAccountName(ctx, accountName) if err != nil { jaeger.RecordError(ctx, err, "查询客服账号开场白失败") glog.Errorf(ctx, "查询开场白失败: %v", err) return "" } if account == nil { glog.Warningf(ctx, "客服账号不存在: accountName=%s", accountName) return "" } // 详细输出查询结果 glog.Infof(ctx, "查询到客服账号: Id=%s, AccountName=%s, TenantId=%v, Greeting长度=%d, Platform=%s", account.Id.Hex(), account.AccountName, account.TenantId, len(account.Greeting), account.Platform) return account.Greeting } // Send 发送消息到 Redis Stream(HTTP 接口) func (s *websocketService) Send(ctx context.Context, req *dto.WebSocketSendReq) (*dto.WebSocketSendRes, error) { // 从 token 获取租户ID var tenantId string if user, err := utils.GetUserInfo(ctx); err == nil { tenantId = gconv.String(user.TenantId) } messageId, err := s.sendToStream(ctx, req.UserId, tenantId, req.Content) if err != nil { return nil, err } return &dto.WebSocketSendRes{MessageId: messageId}, nil } // sendToStream 发送消息到 Redis Stream // 如果用户无 session 缓存(已归档),则从 MongoDB 读取历史对话一起发送 func (s *websocketService) sendToStream(ctx context.Context, userId, tenantId, content string) (string, error) { now := gtime.Now() platform := "xiaohongshu" // 默认平台 // 获取当前实例的动态响应队列名(自动生成,支持多实例部署) baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue") replyQueue := rabbitmq.GetInstanceQueueName(baseQueue) // 获取accountName(优先使用用户选择的方向映射) var accountName string var chatId string // 1. 尝试从用户状态获取(用户选择方向后的映射) if userState, stateErr := redis.GetUserState(ctx, userId, platform); stateErr == nil && userState.AccountName != "" { accountName = userState.AccountName glog.Infof(ctx, "使用用户选择方向的客服账号: %s", accountName) } else { // 2. 从连接信息获取(默认) key := tenantId + ":" + userId + "_" + platform connInfo := s.connections.Get(key) if connInfo != nil { wsConn := connInfo.(*wsConnection) accountName = wsConn.AccountName glog.Infof(ctx, "使用连接默认客服账号: %s", accountName) } } // 根据accountName查询ragflow_config获取chat_id if accountName != "" { config, err := dao.RAGFlowConfig.FindByAccountName(ctx, accountName) if err == nil && config != nil { chatId = config.ChatId glog.Infof(ctx, "查询到chatId: accountName=%s, chatId=%s", accountName, chatId) } } // 如果未找到chatId,报错(应该从ragflowconfig表重建session) if chatId == "" { return "", gerror.New("chatId未找到,需要重建RAGFlow session") } msg := &redis.SendStreamMessage{ UserId: userId, TenantId: tenantId, AccountName: accountName, ChatId: chatId, Content: content, Timestamp: now.Timestamp(), MessageId: userId + "_" + gconv.String(now.TimestampNano()), ReplyQueue: replyQueue, } // 检查是否有 session 缓存,无缓存说明已归档,需要读取历史 if sessionId, _ := redis.GetSessionCache(ctx, tenantId, userId); sessionId == "" { // 从 MongoDB 读取历史对话 if history, err := dao.Conversation.GetRecentHistory(ctx, userId, redis.GetHistoryContextLimit()); err == nil && len(history) > 0 { msg.History = history glog.Infof(ctx, "用户已归档,读取 %d 轮历史对话 - 用户: %s", len(history), userId) } } messageId, err := redis.AddToStream(ctx, redis.RAGFlowRequestStreamKey, msg) if err != nil { return "", err } glog.Infof(ctx, "消息已发送到 Stream - MessageID: %s, 用户: %s", messageId, userId) return messageId, nil } // SendToUser 发送消息给指定用户 func (s *websocketService) SendToUser(ctx context.Context, tenantId, userId, platform string, data interface{}) error { // key格式: tenantId:userId_platform key := tenantId + ":" + userId + "_" + platform connInfo := s.connections.Get(key) if connInfo == nil { glog.Warningf(ctx, "用户不在线 - %s", key) return nil } s.writeJSON(connInfo.(*wsConnection).Conn, data) return nil } // Broadcast 广播消息给所有连接 func (s *websocketService) Broadcast(ctx context.Context, content string) { msg := &dto.WebSocketPushMsg{Type: "broadcast", Content: content} msgBytes := gjson.MustEncode(msg) s.connections.Iterator(func(key string, value interface{}) bool { conn := value.(*wsConnection) if err := conn.Conn.WriteMessage(websocket.TextMessage, msgBytes); err != nil { jaeger.RecordError(ctx, err, "广播消息失败 - "+key) } return true }) } // GetOnlineUsers 获取在线用户列表 func (s *websocketService) GetOnlineUsers() *dto.WebSocketOnlineRes { users := make([]dto.WebSocketOnlineUserRes, 0, s.connections.Size()) s.connections.Iterator(func(_ string, value interface{}) bool { conn := value.(*wsConnection) users = append(users, dto.WebSocketOnlineUserRes{ UserId: conn.UserId, Platform: conn.Platform, CreatedAt: conn.CreatedAt, }) return true }) return &dto.WebSocketOnlineRes{ Count: len(users), Users: users, } } // PushRAGFlowResponse 推送 RAGFlow 响应给用户 func (s *websocketService) PushRAGFlowResponse(ctx context.Context, tenantId, userId, platform, content string) error { return s.SendToUser(ctx, tenantId, userId, platform, &dto.WebSocketPushMsg{Type: "answer", Content: content}) } // ============== RabbitMQ 消费者(追问延时消息)============== // FollowUpConsumer 追问消费者 type FollowUpConsumer struct { consumer *rabbitmq.Consumer } // NewFollowUpConsumer 创建追问消费者 func NewFollowUpConsumer(ctx context.Context) *FollowUpConsumer { queueName := GetConfigString(ctx, "followUp.queue") return &FollowUpConsumer{ consumer: rabbitmq.NewConsumer(queueName, handleFollowUp), } } // Start 启动消费者 func (c *FollowUpConsumer) Start(ctx context.Context) (err error) { glog.Info(ctx, "追问消费者启动...") return c.consumer.Start(ctx) } // Stop 停止消费者 func (c *FollowUpConsumer) Stop(ctx context.Context) { c.consumer.Stop(ctx) } // handleFollowUp 处理追问消息 func handleFollowUp(ctx context.Context, body []byte) (err error) { ctx, span := jaeger.NewSpan(ctx, "consumer.followup") defer span.End() var msg redis.FollowUpMessage if err = gjson.DecodeTo(body, &msg); err != nil { jaeger.RecordError(ctx, err, "解析追问消息失败") return } glog.Infof(ctx, "收到追问消息 - 用户: %s, 类型: %d", msg.UserId, msg.FollowUpType) // 检查用户状态,如果在状态5(方向选择)或状态3(发卡片),跳过追问 userState, err := redis.GetUserState(ctx, msg.UserId, msg.Platform) if err != nil { jaeger.RecordError(ctx, err, "获取用户状态失败") return } if userState.Stage == 5 { glog.Infof(ctx, "用户 %s 在方向选择状态,跳过追问", msg.UserId) return } if userState.Stage == 3 { glog.Infof(ctx, "用户 %s 在发卡片状态,跳过追问", msg.UserId) return } // 检查用户是否在追问发送后有活跃 isActive, err := redis.IsUserActive(ctx, msg.UserId, int64(redis.GetFollowUpDelay(msg.FollowUpType))) if err != nil { jaeger.RecordError(ctx, err, "检查用户活跃状态失败") return } if isActive { glog.Infof(ctx, "用户 %s 在追问期间有活跃,跳过追问", msg.UserId) return } // 发送追问消息给用户 if err = WebSocket.PushRAGFlowResponse(ctx, msg.TenantId, msg.UserId, msg.Platform, msg.Content); err != nil { jaeger.RecordError(ctx, err, "推送追问消息失败") } glog.Infof(ctx, "追问消息已发送 - 租户: %s, 用户: %s, 内容: %s", msg.TenantId, msg.UserId, msg.Content) return }