Files
customer-server/service/session_service.go
2026-03-14 10:02:49 +08:00

86 lines
2.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package service contains the session service:
// user session management and session state maintenance.
package service
import (
"context"
"customer-server/dao"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/redis"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/os/glog"
)
// sessionService is the session service type (per the original note, it
// operates on the session table). It carries no fields.
type sessionService struct{}

// SessionService is the package-level singleton instance of sessionService.
var SessionService = &sessionService{}
// ============== RabbitMQ 消费者(归档延时消息)==============
// SessionArchiveConsumer is the consumer for session-archive messages.
// It wraps a *rabbitmq.Consumer and exposes Start and Stop around it.
type SessionArchiveConsumer struct {
// consumer is the wrapped RabbitMQ consumer; NewSessionArchiveConsumer sets it.
consumer *rabbitmq.Consumer
}
// NewSessionArchiveConsumer builds a SessionArchiveConsumer.
//
// The queue name is read from the "archive.queue" configuration key via
// GetConfigString, and handleSessionArchive is passed to rabbitmq.NewConsumer
// as the second argument (presumably the per-message handler — confirm
// against the rabbitmq package).
func NewSessionArchiveConsumer(ctx context.Context) *SessionArchiveConsumer {
	archiveConsumer := new(SessionArchiveConsumer)
	archiveConsumer.consumer = rabbitmq.NewConsumer(
		GetConfigString(ctx, "archive.queue"),
		handleSessionArchive,
	)
	return archiveConsumer
}
// Start logs a startup message and then starts the wrapped consumer.
// It returns whatever error the wrapped consumer's Start reports.
func (c *SessionArchiveConsumer) Start(ctx context.Context) (err error) {
	glog.Info(ctx, "会话归档消费者启动...")

	err = c.consumer.Start(ctx)
	return err
}
// Stop stops the wrapped consumer by delegating to its Stop method.
// Any result of that call is not inspected here.
func (c *SessionArchiveConsumer) Stop(ctx context.Context) {
	wrapped := c.consumer
	wrapped.Stop(ctx)
}
// handleSessionArchive handles one session-archive message.
//
// Steps, in order:
//  1. Open a tracing span named "consumer.session.archive" for the call.
//  2. Decode body into a redis.ArchiveMessage.
//  3. Ask redis.IsUserActive whether the user was active within the window
//     returned by redis.GetArchiveDelay; if so, skip archiving.
//  4. Archive the session through dao.Session.Archive.
//  5. Delete the cached session through redis.DelSessionCache.
//
// It returns the decode, activity-check or archive error (each is also
// recorded on the span), and nil when the session was archived or the
// archive was skipped because the user was active.
func handleSessionArchive(ctx context.Context, body []byte) (err error) {
	ctx, span := jaeger.NewSpan(ctx, "consumer.session.archive")
	defer span.End()

	var msg redis.ArchiveMessage
	err = gjson.DecodeTo(body, &msg)
	if err != nil {
		jaeger.RecordError(ctx, err, "解析归档消息失败")
		return err
	}
	glog.Infof(ctx, "收到归档消息 - 用户: %s, Session: %s", msg.UserId, msg.SessionId)

	// Activity window comes from redis.GetArchiveDelay.
	// NOTE(review): the original comment mentioned 60 minutes; the unit and
	// value are not visible here — confirm against the redis package.
	activityWindow := int64(redis.GetArchiveDelay())
	isActive, err := redis.IsUserActive(ctx, msg.UserId, activityWindow)
	switch {
	case err != nil:
		jaeger.RecordError(ctx, err, "检查用户活跃状态失败")
		return err
	case isActive:
		// The user was active during the archive window: leave the session alone.
		glog.Infof(ctx, "用户 %s 在归档期间有活跃,跳过归档", msg.UserId)
		return nil
	}

	// Perform the archive.
	err = dao.Session.Archive(ctx, msg.UserId, msg.SessionId)
	if err != nil {
		jaeger.RecordError(ctx, err, "归档会话失败")
		return err
	}

	// Clearing the session cache needs the tenant ID.
	// NOTE(review): the original TODO said ArchiveMessage still needs a
	// TenantId field, yet msg.TenantId is read here — confirm the field exists
	// and is populated by the producer. The result of DelSessionCache, if any,
	// is not checked.
	redis.DelSessionCache(ctx, msg.TenantId, msg.UserId)
	glog.Infof(ctx, "会话已归档 - 用户: %s, Session: %s", msg.UserId, msg.SessionId)
	return nil
}