Files
customer-server/service/account_websocket_service.go

235 lines
6.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"context"
"customer-server/consts/account"
"customer-server/model/dto"
"fmt"
netHttp "net/http"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/jaeger"
"github.com/gogf/gf/v2/container/gmap"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/util/gconv"
"github.com/gorilla/websocket"
)
// AccountWebSocket 全局单例
var AccountWebSocket = &accountWebsocketService{
connections: gmap.NewStrAnyMap(true),
upgrader: websocket.Upgrader{
CheckOrigin: func(r *netHttp.Request) bool {
return true // 允许跨域
},
},
workerPool: grpool.New(50), // 限制最大并发数为50
}
type accountWebsocketService struct {
connections *gmap.StrAnyMap
upgrader websocket.Upgrader
workerPool *grpool.Pool // 工作池限制goroutine数量
}
// key: userId_platform
// accountWsConnection WebSocket 连接信息
type accountWsConnection struct {
AccountInfo *dto.AccountVO
UserId string
Conn *websocket.Conn
Headers map[string]string // 保存原始请求头
}
// Connect 建立 WebSocket 连接
func (s *accountWebsocketService) Connect(ctx context.Context, r *ghttp.Request, req *dto.AccountWebSocketConnectReq) error {
// 使用原生upgrader升级WebSocket连接
ws, err := s.upgrader.Upgrade(r.Response.Writer, r.Request, nil)
if err != nil {
jaeger.RecordError(ctx, err, "WebSocket 升级失败")
return err
}
// 获取客服账号信息
accountInfo, err := SessionToolService.GetAccountInfo(ctx, req.AccountCode)
if err != nil {
return err
}
if g.IsEmpty(accountInfo) {
return fmt.Errorf("客服账号不存在")
}
// 创建完整的用户信息
userInfo := &beans.User{
UserName: accountInfo.Creator,
TenantId: accountInfo.TenantId,
}
ctx = context.WithValue(ctx, "user", *userInfo)
// 提取并保存请求头(在连接升级前)
headers := make(map[string]string)
// 提取其他headers
for k, v := range r.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
// 将完整用户信息序列化为JSON放到X-User-Info请求头
userInfoJson, err := gjson.Encode(userInfo)
if err != nil {
glog.Errorf(ctx, "用户信息序列化失败: %v", err)
} else {
headers["X-User-Info"] = string(userInfoJson)
glog.Debugf(ctx, "已添加用户信息到请求头: %s", string(userInfoJson))
}
var key = fmt.Sprintf("account:%s:%s:%s", req.AccountCode, account.GetDescByCode(req.Platform), req.UserId)
content, err := SessionToolService.PushOpeningRemark(ctx, req.UserId, accountInfo, headers)
if err != nil {
return err
}
if !g.IsEmpty(content) {
s.writeJSON(ws, &dto.WebSocketPushMsg{
Type: "message",
Message: content,
})
}
// 关闭旧连接
if old := s.connections.Get(key); old != nil {
old.(*accountWsConnection).Conn.Close()
}
// 注册新连接(携带完整用户信息)
s.connections.Set(key, &accountWsConnection{
AccountInfo: accountInfo,
UserId: req.UserId,
Conn: ws,
Headers: headers, // 保存请求头
})
// 处理消息(阻塞)
s.handleConnection(ctx, key, ws)
return nil
}
// handleConnection 处理 WebSocket 连接
func (s *accountWebsocketService) handleConnection(ctx context.Context, key string, conn *websocket.Conn) {
defer func() {
s.connections.Remove(key)
conn.Close()
glog.Infof(ctx, "WebSocket 连接断开 - %s", key)
}()
for {
msgType, message, err := conn.ReadMessage()
if err != nil {
// 排除正常关闭情况:正常关闭、离开页面、无状态码关闭
if websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived,
) {
jaeger.RecordError(ctx, err, "WebSocket 读取错误")
}
break
}
if msgType != websocket.TextMessage {
continue
}
questionContent := gconv.String(message)
glog.Infof(ctx, "收到 WebSocket 消息 - %s: %s", key, questionContent)
// 解析连接信息
connInfo := s.connections.Get(key)
if connInfo == nil {
glog.Warningf(ctx, "WebSocket连接信息不存在 - %s", key)
break
}
wsConn := connInfo.(*accountWsConnection)
// 发送ack告知用户正在处理
s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "ack", Message: "消息已接收,正在处理..."})
// 异步处理消息避免阻塞WebSocket连接使用工作池限制并发
s.workerPool.Add(ctx, func(poolCtx context.Context) {
defer func() {
if r := recover(); r != nil {
glog.Errorf(ctx, "WebSocket处理消息失败: %v", r)
}
}()
var content string
content, err = SessionToolService.PushDialog(ctx, wsConn.UserId, questionContent, wsConn.AccountInfo, wsConn.Headers)
if err != nil {
s.writeJSON(conn, &dto.WebSocketPushMsg{
Type: "error",
Content: err.Error(),
})
return
}
// 发送答案给前端
s.writeJSON(conn, &dto.WebSocketPushMsg{
Type: "answer",
Content: content,
})
})
}
}
// writeJSON 发送 JSON 消息(带错误处理)
func (s *accountWebsocketService) writeJSON(conn *websocket.Conn, data interface{}) {
jsonBytes, err := gjson.Encode(data)
if err != nil {
glog.Errorf(context.Background(), "JSON编码失败: %v", err)
return
}
if err := conn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
glog.Errorf(context.Background(), "WebSocket写入失败: %v", err)
}
}
func (s *accountWebsocketService) AccountMsg(ctx context.Context, msg any) (err error) {
msgStr := gconv.Map(msg)
if g.IsEmpty(msgStr) {
g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty")
return
}
// 直接通过 key 获取连接
connAny := s.connections.Get(gconv.String(msgStr["key"]))
if connAny != nil {
wsConn := connAny.(*accountWsConnection)
s.writeJSON(wsConn.Conn, &dto.WebSocketPushMsg{
Type: "delay_msg",
Content: gconv.String(msgStr["data"]),
})
}
g.Log().Info(ctx, "DocsChunkMsg:", msgStr)
return
}
// Close 释放所有资源
func (s *accountWebsocketService) Close() {
if s.workerPool != nil {
s.workerPool.Close()
glog.Info(context.Background(), "WebSocket工作池已关闭")
}
// 关闭所有WebSocket连接
s.connections.LockFunc(func(m map[string]interface{}) {
for key, conn := range m {
if wsConn, ok := conn.(*accountWsConnection); ok && wsConn.Conn != nil {
wsConn.Conn.Close()
glog.Infof(context.Background(), "强制关闭WebSocket连接 - %s", key)
}
}
})
s.connections.Clear()
glog.Info(context.Background(), "WebSocket连接池已清空")
}