redis mq 消息封装

This commit is contained in:
2026-01-09 10:19:31 +08:00
committed by 张斌
parent 845fe0a324
commit d44ed8bf35
3 changed files with 762 additions and 0 deletions

179
message/message.go Normal file
View File

@@ -0,0 +1,179 @@
package message
import (
"context"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/errors/gerror"
)
// GetRedisClient 获取 Redis 客户端(供外部使用)
func GetRedisClient() *gredis.Redis {
return getRedisClient()
}
func GetRedisClientTest(name string) *gredis.Redis {
return getRedisClientTest(name)
}
// GetLock 获取分布式锁
func GetLock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
return lock(ctx, key, expireSeconds, fn)
}
// MessageConfig 消息配置接口
type MessageConfig interface {
start(ctx context.Context) error
publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error)
}
// RedisMessageConfig Redis Stream 消息配置
type RedisMessageConfig struct {
StreamKey string // Stream 键名
GroupName string // 消费者组名称
ConsumerName string // 消费者名称
BatchSize int64 // 最大并发数(信号量容量)
AutoAck bool // ACK确认,true自动确认,false手动确认
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (r *RedisMessageConfig) start(ctx context.Context) error {
return readFromStream(ctx, QueueMessage{
StreamKey: r.StreamKey,
GroupName: r.GroupName,
ConsumerName: r.ConsumerName,
BatchSize: r.BatchSize,
AutoAck: r.AutoAck,
HandleFunc: r.HandleFunc,
})
}
func (r *RedisMessageConfig) publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) {
return publishToRedis(ctx, r.StreamKey, data)
}
// RabbitMQMessageConfig RabbitMQ 消息配置
type RabbitMQMessageConfig struct {
Queue string // 队列名称
Exchange string // 交换器名称
RoutingKey string // 路由键
PrefetchCount int // QoS: 预取数量(并发控制)
WorkerCount int // worker 数量
ConsumerTag string // 消费者标签
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (r *RabbitMQMessageConfig) start(ctx context.Context) error {
return startRabbitMQConsumer(ctx, QueueMessage{
Queue: r.Queue,
Exchange: r.Exchange,
RoutingKey: r.RoutingKey,
PrefetchCount: r.PrefetchCount,
WorkerCount: r.WorkerCount,
ConsumerTag: r.ConsumerTag,
AutoAck: true,
HandleFunc: r.HandleFunc,
})
}
func (r *RabbitMQMessageConfig) publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) {
opts := make(map[string]interface{})
if len(options) > 0 {
opts = options[0]
}
exchange := r.Exchange
routingKey := r.RoutingKey
delay := 0
if v, ok := opts["exchange"].(string); ok {
exchange = v
}
if v, ok := opts["routingKey"].(string); ok {
routingKey = v
}
if v, ok := opts["delay"].(int); ok {
delay = v
}
if delay > 0 {
return publishDelayedToRabbitMQ(ctx, exchange, routingKey, data, delay)
}
return publishToRabbitMQ(ctx, exchange, routingKey, data)
}
// QueueMessage 统一消息队列配置结构体(内部使用)
type QueueMessage struct {
// Redis Stream 配置
StreamKey string
GroupName string
ConsumerName string
BatchSize int64
AutoAck bool
HandleFunc func(ctx context.Context, message map[string]interface{}) error
// RabbitMQ 配置
Queue string
Exchange string
RoutingKey string
PrefetchCount int
WorkerCount int
ConsumerTag string
}
// StartConsumers 启动消息消费者(统一入口)
// 支持同时启动多个消费者,包括 Redis Stream 和 RabbitMQ
func StartConsumers(ctx context.Context, configs ...MessageConfig) error {
for _, cfg := range configs {
if err := cfg.start(ctx); err != nil {
return gerror.Wrap(err, "启动消费者失败")
}
}
return nil
}
// PublishMessage 发布消息(统一入口)
// 根据配置类型选择发布到 Redis Stream 或 RabbitMQ
func PublishMessage(ctx context.Context, cfg MessageConfig, data interface{}, options ...map[string]interface{}) (messageID string, err error) {
return cfg.publish(ctx, data, options...)
}
// ========== Redis Stream 公共方法(方便迁移) ==========
// AddToStream 将消息添加到 Redis Stream
//func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) {
// return addToStream(ctx, streamKey, msg)
//}
// ReadFromStream 从 Redis Stream 读取消息(已废弃)
// 请使用 RedisMessageConfig.StartConsumers 启动消费者
// 此方法保留用于向后兼容,但实际不会返回消息(异步消费模式)
func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64) ([]StreamMessage, error) {
return nil, gerror.New("ReadFromStream 已废弃,请使用 RedisMessageConfig.StartConsumers 启动消费者")
}
// AckMessage 确认 Redis Stream 消息
func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
return ackMessage(ctx, streamKey, groupName, messageIDs...)
}
// InitStreamGroup 初始化 Redis Stream 消费者组
func InitStreamGroup(ctx context.Context, streamKey, groupName string) error {
return initStreamGroup(ctx, streamKey, groupName)
}
// ========== RabbitMQ 公共方法(方便迁移) ==========
// InitRabbitMQ 初始化 RabbitMQ 连接
func InitRabbitMQ(ctx context.Context) error {
return initRabbitMQ(ctx)
}
// PublishToRabbitMQ 发布消息到 RabbitMQ
//func PublishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) error {
// return publishToRabbitMQ(ctx, exchange, routingKey, message)
//}
// PublishDelayedToRabbitMQ 发布延时消息到 RabbitMQ
//func PublishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) error {
// return publishDelayedToRabbitMQ(ctx, exchange, routingKey, message, delaySeconds)
//}

351
message/rabbit.go Normal file
View File

@@ -0,0 +1,351 @@
package message
import (
"context"
"sync"
"time"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
rabbitConn *amqp.Connection
rabbitChannel *amqp.Channel
rabbitOnce sync.Once
rabbitMu sync.RWMutex
rabbitCloseWatcher chan struct{}
rabbitWatcherStarted bool
)
// Config RabbitMQ 配置
type RabbitMQConfig struct {
Host string
Port int
Username string
Password string
VHost string
}
// rabbitMQConfig 默认配置
func getRabbitMQConfig() *RabbitMQConfig {
return &RabbitMQConfig{
Host: g.Cfg().MustGet(context.Background(), "rabbitmq.host").String(),
Port: g.Cfg().MustGet(context.Background(), "rabbitmq.port").Int(),
Username: g.Cfg().MustGet(context.Background(), "rabbitmq.username").String(),
Password: g.Cfg().MustGet(context.Background(), "rabbitmq.password").String(),
VHost: g.Cfg().MustGet(context.Background(), "rabbitmq.vhost", "/").String(),
}
}
// initRabbitMQ 初始化 RabbitMQ 连接
func initRabbitMQ(ctx context.Context) error {
var err error
rabbitOnce.Do(func() {
cfg := getRabbitMQConfig()
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
rabbitConn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err)
return
}
rabbitChannel, err = rabbitConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err)
return
}
rabbitCloseWatcher = make(chan struct{})
if !rabbitWatcherStarted {
go handleRabbitMQConnectionClose(ctx)
rabbitWatcherStarted = true
}
g.Log().Info(ctx, "RabbitMQ 连接成功")
})
return err
}
// getRabbitMQChannel 获取 RabbitMQ Channel
func getRabbitMQChannel() (*amqp.Channel, error) {
rabbitMu.RLock()
defer rabbitMu.RUnlock()
if rabbitChannel == nil || rabbitChannel.IsClosed() {
return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭")
}
return rabbitChannel, nil
}
// getRabbitMQConnection 获取 RabbitMQ 连接
func getRabbitMQConnection() (*amqp.Connection, error) {
rabbitMu.RLock()
defer rabbitMu.RUnlock()
if rabbitConn == nil || rabbitConn.IsClosed() {
return nil, gerror.New("RabbitMQ 连接未初始化或已关闭")
}
return rabbitConn, nil
}
// handleRabbitMQConnectionClose 监听连接关闭并重连
func handleRabbitMQConnectionClose(ctx context.Context) {
for {
select {
case <-rabbitCloseWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
default:
}
rabbitMu.RLock()
currentConn := rabbitConn
rabbitMu.RUnlock()
if currentConn == nil {
return
}
closeErr := make(chan *amqp.Error, 1)
currentConn.NotifyClose(closeErr)
select {
case err := <-closeErr:
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v尝试重连...", err)
reconnectRabbitMQ(ctx)
}
case <-rabbitCloseWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
}
}
}
// reconnectRabbitMQ 重新连接
func reconnectRabbitMQ(ctx context.Context) {
rabbitMu.Lock()
defer rabbitMu.Unlock()
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(i+1) * time.Second)
cfg := getRabbitMQConfig()
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
var err error
rabbitConn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err)
continue
}
rabbitChannel, err = rabbitConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err)
continue
}
g.Log().Info(ctx, "RabbitMQ 重连成功")
return
}
g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数")
}
// startRabbitMQConsumer 启动 RabbitMQ 消费者
func startRabbitMQConsumer(ctx context.Context, msg QueueMessage) error {
// 初始化连接
if err := initRabbitMQ(ctx); err != nil {
return gerror.Wrap(err, "初始化 RabbitMQ 连接失败")
}
// 创建独立 Channel避免并发冲突
conn, err := getRabbitMQConnection()
if err != nil {
return gerror.Wrap(err, "获取RabbitMQ连接失败")
}
ch, err := conn.Channel()
if err != nil {
return gerror.Wrap(err, "创建独立Channel失败")
}
// 声明队列
_, err = ch.QueueDeclare(
msg.Queue, // name
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return gerror.Newf("声明队列失败: %v", err)
}
// 设置 QoS并发控制
prefetchCount := msg.PrefetchCount
if prefetchCount == 0 {
prefetchCount = 1
}
err = ch.Qos(
prefetchCount, // prefetchCount
0, // prefetchSize
false, // global
)
if err != nil {
return gerror.Newf("设置 QoS 失败: %v", err)
}
// 开始消费
msgs, err := ch.Consume(
msg.Queue, // queue
msg.ConsumerTag, // consumer tag
msg.AutoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return gerror.Newf("开始消费失败: %v", err)
}
workerCount := msg.WorkerCount
if workerCount == 0 {
workerCount = 1
}
g.Log().Infof(ctx, "RabbitMQ 消费者已启动: queue=%s, prefetch=%d, workers=%d",
msg.Queue, prefetchCount, workerCount)
// 启动多个 worker
for i := 0; i < workerCount; i++ {
go rabbitMQWorker(ctx, i, msgs, msg)
}
return nil
}
// rabbitMQWorker RabbitMQ 工作协程
func rabbitMQWorker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery, msg QueueMessage) {
g.Log().Debugf(ctx, "RabbitMQ Worker %d 已启动", workerID)
for {
select {
case <-ctx.Done():
g.Log().Infof(ctx, "RabbitMQ Worker %d 收到停止信号,正在退出", workerID)
return
case delivery, ok := <-msgs:
if !ok {
g.Log().Infof(ctx, "RabbitMQ Worker %d 消息通道已关闭,退出", workerID)
return
}
// 反序列化消息
var message map[string]interface{}
if err := gjson.DecodeTo(delivery.Body, &message); err != nil {
g.Log().Errorf(ctx, "RabbitMQ Worker %d 反序列化消息失败: %v", workerID, err)
if !msg.AutoAck {
delivery.Nack(false, false)
}
continue
}
// 处理消息
err := msg.HandleFunc(ctx, message)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ Worker %d 处理消息失败: %v", workerID, err)
if !msg.AutoAck {
delivery.Nack(false, false)
}
} else {
if !msg.AutoAck {
delivery.Ack(false)
}
g.Log().Debugf(ctx, "RabbitMQ Worker %d 处理消息成功", workerID)
}
}
}
}
// publishToRabbitMQ 发布消息到 RabbitMQ
func publishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) (messageID string, err error) {
ch, err := getRabbitMQChannel()
if err != nil {
return
}
body, err := gjson.Encode(message)
if err != nil {
return "", gerror.Newf("消息序列化失败: %v", err)
}
err = ch.PublishWithContext(
ctx,
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
},
)
if err != nil {
g.Log().Errorf(ctx, "发布消息失败: exchange=%s, routingKey=%s, err=%v", exchange, routingKey, err)
return
}
g.Log().Debugf(ctx, "消息发布成功: exchange=%s, routingKey=%s", exchange, routingKey)
return messageID, nil
}
// publishDelayedToRabbitMQ 发布延时消息到 RabbitMQ
func publishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) (messageID string, err error) {
ch, err := getRabbitMQChannel()
if err != nil {
return
}
body, err := gjson.Encode(message)
if err != nil {
return "", gerror.Newf("消息序列化失败: %v", err)
}
err = ch.PublishWithContext(
ctx,
exchange, // exchange必须是 x-delayed-message 类型)
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
Headers: amqp.Table{
"x-delay": delaySeconds * 1000, // 延时(毫秒)
},
},
)
if err != nil {
g.Log().Errorf(ctx, "发布延时消息失败: exchange=%s, routingKey=%s, delay=%ds, err=%v", exchange, routingKey, delaySeconds, err)
return
}
g.Log().Debugf(ctx, "延时消息发布成功: exchange=%s, routingKey=%s, delay=%ds", exchange, routingKey, delaySeconds)
return messageID, nil
}

232
message/redis.go Normal file
View File

@@ -0,0 +1,232 @@
package message
import (
"context"
"errors"
"strings"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
)
// StreamMessage Redis Stream 消息结构
type StreamMessage struct {
ID string // 消息ID自动生成
Values map[string]interface{} // 消息内容
}
// getClient 获取 Redis 客户端
func getRedisClient() *gredis.Redis {
return g.Redis()
}
// getClient 获取 Redis 客户端
func getRedisClientTest(name string) *gredis.Redis {
return g.Redis(name)
}
// getRedisClientByDB 根据DB获取Redis客户端如果db<=0则返回默认客户端
func getRedisClientByDB(db int) *gredis.Redis {
if db <= 0 {
return g.Redis()
}
// 创建连接到指定DB的Redis客户端
client, err := gredis.New(&gredis.Config{
Address: g.Cfg().MustGet(context.Background(), "redis.default.address").String(),
Db: db,
})
if err != nil {
glog.Errorf(context.Background(), "创建Redis客户端失败: %v", err)
return g.Redis()
}
return client
}
// lock 分布式锁
func lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
limit := 3
LOOP:
if limit < 0 {
return false, errors.New("锁重试次数耗尽")
}
limit--
if val, err := getRedisClient().Set(ctx, key, true, gredis.SetOption{
TTLOption: gredis.TTLOption{
EX: &expireSeconds,
},
NX: true,
}); err != nil {
return false, err
} else {
if val.Bool() {
defer func(RedisClient *gredis.Redis, ctx context.Context, key string) {
if _, err = RedisClient.Del(ctx, key); err != nil {
glog.Errorf(ctx, "RedisClient.Del error: %v", err)
}
}(getRedisClient(), ctx, key)
if err = fn(ctx); err != nil {
return false, err
}
return true, nil
} else {
time.Sleep(time.Second)
goto LOOP
}
}
}
// publishToRedis 将消息添加到 Redis Stream
func publishToRedis(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) {
values := gconv.Map(msg)
args := make([]interface{}, 0, len(values)*2+2)
args = append(args, streamKey, "*")
for key, val := range values {
args = append(args, key, val)
}
result, err := getRedisClient().Do(ctx, "XADD", args...)
if err != nil {
return
}
messageID = result.String()
return
}
// initStreamGroup 初始化 Stream 和消费者组
func initStreamGroup(ctx context.Context, streamKey, groupName string) error {
_, err := getRedisClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
if err != nil {
// 如果组已存在,忽略错误
errStr := err.Error()
// 检查错误是否是 "BUSYGROUP Consumer Group name already exists"
if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") {
// 这是一个预期的情况,说明消费者组已经存在,无需处理
return nil
}
// 这是一个真正的错误,需要记录或处理
return err
}
return nil
}
// readFromStream 从 Stream 读取消息
func readFromStream(ctx context.Context, msg QueueMessage) error {
// 初始化 Stream 和消费者组
if err := initStreamGroup(ctx, msg.StreamKey, msg.GroupName); err != nil {
return err
}
go func() {
RECONNECT:
for {
result, err := getRedisClient().Do(ctx, "XREADGROUP", "GROUP", msg.GroupName, msg.ConsumerName, "COUNT", msg.BatchSize, "BLOCK", 0, "STREAMS", msg.StreamKey, ">")
if err != nil {
select {
case <-ctx.Done():
return
}
time.Sleep(time.Second)
goto RECONNECT
}
// 检查返回结果是否为空
if result == nil || result.IsEmpty() {
continue
}
messages := make([]StreamMessage, 0, int(msg.BatchSize))
// 尝试 map 格式GoFrame gredis 返回)
if streamsMap, ok := result.Val().(map[interface{}]interface{}); ok {
for _, streamMsgs := range streamsMap {
msgsArray, ok := streamMsgs.([]interface{})
if !ok {
continue
}
for _, msgData := range msgsArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
// 尝试数组格式(标准 Redis 返回)
if streamsArray, ok := result.Val().([]interface{}); ok && len(streamsArray) > 0 {
for _, streamData := range streamsArray {
streamArray, ok := streamData.([]interface{})
if !ok || len(streamArray) < 2 {
continue
}
messagesArray, ok := streamArray[1].([]interface{})
if !ok {
continue
}
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
// 处理消息
for _, streamMsg := range messages {
// 业务处理
if err := msg.HandleFunc(ctx, streamMsg.Values); err != nil {
glog.Infof(ctx, "业务处理失败-> err:%v\n", err)
continue
}
// 确认消息
if msg.AutoAck {
err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID)
if err != nil {
glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err)
}
}
}
}
}()
return nil
}
// ackMessage 确认消息已处理
func ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
args := make([]interface{}, 0, len(messageIDs)+2)
args = append(args, streamKey, groupName)
for _, id := range messageIDs {
args = append(args, id)
}
_, err := getRedisClient().Do(ctx, "XACK", args...)
return err
}