// Package service - Webhook服务 // 功能:接收并处理平台(小红书、抖音)的webhook消息 package service import ( "context" "customer-server/dao" "customer-server/model/dto" "gitea.com/red-future/common/jaeger" "gitea.com/red-future/common/rabbitmq" "gitea.com/red-future/common/redis" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" ) var Webhook = new(webhookService) type webhookService struct{} // Receive 接收平台消息并写入队列 func (s *webhookService) Receive(ctx context.Context, req *dto.WebhookReceiveReq) (res *dto.WebhookReceiveRes, err error) { glog.Infof(ctx, "收到 Webhook 消息 - 平台: %s, 用户: %s, 内容: %s", req.Platform, req.UserId, req.Content) // 生成消息ID now := gtime.Now() messageId := req.Platform + "_" + req.UserId + "_" + gconv.String(now.TimestampNano()) if req.MsgId != "" { messageId = req.MsgId // 使用平台消息ID(便于去重) } // 从 token 获取租户ID var tenantId string if user, userErr := utils.GetUserInfo(ctx); userErr == nil { tenantId = gconv.String(user.TenantId) } // 构造消息 userId := req.Platform + "_" + req.UserId // 添加平台前缀 // 获取当前实例的动态响应队列名(自动生成,支持多实例部署) baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue") replyQueue := rabbitmq.GetInstanceQueueName(baseQueue) msg := &redis.SendStreamMessage{ UserId: userId, TenantId: tenantId, Content: req.Content, Timestamp: now.Timestamp(), MessageId: messageId, Platform: req.Platform, AccountId: req.AccountId, ReplyQueue: replyQueue, } // 检查是否有 session 缓存,无缓存说明已归档,需要读取历史 if sessionId, _ := redis.GetSessionCache(ctx, tenantId, userId); sessionId == "" { if history, histErr := dao.Conversation.GetRecentHistory(ctx, userId, redis.GetHistoryContextLimit()); histErr == nil && len(history) > 0 { msg.History = history glog.Infof(ctx, "用户已归档,读取 %d 轮历史对话 - 用户: %s", len(history), userId) } } // 写入 Redis Stream streamMsgId, err := redis.AddToStream(ctx, redis.RAGFlowRequestStreamKey, msg) if err != nil { jaeger.RecordError(ctx, err, "写入 Stream 失败") return } glog.Infof(ctx, "消息已写入 Stream - MessageID: %s", streamMsgId) res = &dto.WebhookReceiveRes{ Success: true, MsgId: streamMsgId, } return } // GetHistory 查询用户对话记录 func (s *webhookService) GetHistory(ctx context.Context, req *dto.ConversationListReq) (res *dto.ConversationListRes, err error) { list, err := dao.Conversation.FindByUserId(ctx, req.UserId, req.Limit) if err != nil { jaeger.RecordError(ctx, err, "查询对话记录失败") return } res = &dto.ConversationListRes{ List: make([]*dto.ConversationItem, 0, len(list)), } for _, item := range list { res.List = append(res.List, &dto.ConversationItem{ Question: item.Question, Answer: item.Answer, MsgTime: gtime.New(item.MsgTime).Format("Y-m-d H:i:s"), SessionId: item.SessionId, }) } return }