Files
common/message/rabbitmq_msg.go
qhd 55a6ec0374 重构消息队列连接管理,支持多数据源配置
主要变更:
1. 重构NATS、RabbitMQ和Redis连接管理模块,支持多数据源配置
2. 统一连接管理接口,增加数据源名称参数
3. 优化连接状态检查和错误处理
4. 增加连接池管理和资源清理机制
5. 改进日志输出格式和内容
2026-03-12 08:51:45 +08:00

312 lines
8.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package message
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/gogf/gf/v2/frame/g"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitMQPublishMsgConfig struct {
QueueName string
Durable bool
Data any
}
type RabbitMQPublishDelayMsgConfig struct {
QueueName string
Durable bool
DelayTime int
Data any
}
type RabbitMQSubscribeMsgConfig struct {
QueueName string
ConsumerName string
AutoAck bool
PrefetchCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (*RabbitMQPublishMsgConfig) GetPublishMsgType() {
}
func (*RabbitMQPublishDelayMsgConfig) GetPublishDelayMsgType() {}
func (*RabbitMQSubscribeMsgConfig) GetSubscribeMsgType() {
}
type rabbitMQ struct {
name string // 数据源名称
}
func init() {
// 注册 RabbitMQ 插件(默认数据源)
RegisterPlugin(context.Background(), "default", MessageRabbitMQ, func() messageUtil {
return &rabbitMQ{name: "default"}
})
}
// Connect 连接 RabbitMQ
func (c *rabbitMQ) Connect(ctx context.Context) error {
return rabbitmqConnect(ctx, c.name)
}
// Ping 检测 RabbitMQ 连接状态
func (c *rabbitMQ) Ping(ctx context.Context) bool {
return rabbitmqPing(ctx, c.name)
}
// Close 关闭 RabbitMQ 连接
func (c *rabbitMQ) Close(ctx context.Context) error {
return rabbitmqClose(ctx, c.name)
}
// Publish 发布消息
func (c *rabbitMQ) Publish(ctx context.Context, msgConfig messagePublishConfig) error {
cfg, ok := msgConfig.(*RabbitMQPublishMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if cfg.Data == nil {
return fmt.Errorf("数据不能为空")
}
return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data)
}
// PublishDelay 发布延迟消息
func (c *rabbitMQ) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error {
cfg, ok := msgConfig.(*RabbitMQPublishDelayMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if cfg.Data == nil {
return fmt.Errorf("数据不能为空")
}
return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
}
// publishMessage 发布消息内部实现
func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, durable bool, delayTime int, data interface{}) error {
if !c.Ping(ctx) {
if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err)
return err
}
}
channel := getRabbitMQChannel(c.name)
if channel == nil || channel.IsClosed() {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name)
return fmt.Errorf("RabbitMQ Channel 不存在或已关闭")
}
delayMsg := delayTime > 0
// 1. 决定 Exchange 类型
exchangeType := "fanout"
exchangeName := queueName
routingKey := queueName
args := amqp.Table{}
if delayMsg {
exchangeType = "x-delayed-message"
exchangeName = queueName + ".delayed"
args["x-delayed-type"] = "fanout"
}
// 2. 声明 Exchange使用 exchangeName 而不是 queueName
if err := channel.ExchangeDeclare(
exchangeName, // 修复:使用正确的交换机名称
exchangeType,
durable,
false, // autoDelete
false, // internal
false, // noWait
args,
); err != nil {
g.Log().Errorf(ctx, "❌ 声明 Exchange 失败: %v", err)
return err
}
// 3. 声明队列
if _, err := channel.QueueDeclare(
queueName,
durable,
false, // autoDelete
false, // exclusive
false, // noWait
nil, // args
); err != nil {
g.Log().Errorf(ctx, "❌ 声明队列失败: %v", err)
return err
}
// 4. 绑定队列
if err := channel.QueueBind(
queueName,
routingKey, // routingKey 路由键
exchangeName, // exchange 交换机名称
false, // noWait
nil, // args
); err != nil {
g.Log().Errorf(ctx, "❌ 绑定队列失败: %v", err)
return err
}
// 5. 序列化数据
body, err := json.Marshal(data)
if err != nil {
g.Log().Errorf(ctx, "❌ 序列化数据失败: %v", err)
return err
}
// 6. 发布消息
deliveryMode := amqp.Transient
if durable {
deliveryMode = amqp.Persistent
}
publishing := amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: deliveryMode,
Timestamp: time.Now(),
}
if delayMsg {
duration := delayTime * 1000 // 延迟时间(毫秒)= 秒 * 1000
publishing.Headers = amqp.Table{
"x-delay": duration,
}
}
err = channel.PublishWithContext(
ctx,
exchangeName,
routingKey,
false, false,
publishing,
)
if err != nil {
g.Log().Errorf(ctx, "❌ 发布消息失败: %v", err)
return err
}
g.Log().Infof(ctx, "📨 发布消息成功: queueName=%s, data=%v", queueName, data)
return err
}
// Subscribe 订阅消息
func (c *rabbitMQ) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
cfg, ok := msgConfig.(*RabbitMQSubscribeMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if g.IsEmpty(cfg.ConsumerName) {
return fmt.Errorf("消费者名称不能为空")
}
if g.IsEmpty(cfg.PrefetchCount) {
cfg.PrefetchCount = 1
}
if g.IsEmpty(cfg.HandleFunc) {
return fmt.Errorf("必须提供处理函数")
}
return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.HandleFunc)
}
// createSubscribe 内部订阅消息
func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error {
g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始订阅: queueName=%s, consumerName=%s", c.name, queueName, consumerName)
if !c.Ping(ctx) {
if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err)
return err
}
}
channel := getRabbitMQChannel(c.name)
if channel == nil || channel.IsClosed() {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name)
return fmt.Errorf("RabbitMQ Channel 不存在或已关闭")
}
if err := channel.Qos(prefetchCount, 0, false); err != nil {
g.Log().Errorf(ctx, "❌ 设置 Qos 失败: %v", err)
return err
}
g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount)
msg, err := channel.Consume(
queueName, // queue
consumerName, // consumer
autoAck, // auto-ack (根据配置决定)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
g.Log().Errorf(ctx, "❌ 消费消息失败: %v", err)
return err
}
g.Log().Infof(ctx, "👀 开始监听消息")
for {
select {
case <-ctx.Done():
// Context 取消,退出
g.Log().Infof(ctx, "context cancel 监听消息退出")
return nil
case m, ok := <-msg:
if !ok {
// Channel 关闭,退出
g.Log().Infof(ctx, "channel close 监听消息退出")
return nil
}
g.Log().Infof(ctx, "📨 收到消息: %s", string(m.Body))
var data map[string]interface{}
if err := json.Unmarshal(m.Body, &data); err != nil {
// 如果不是 JSON直接使用原始内容
data = map[string]interface{}{
"data": string(m.Body),
}
}
err := handler(ctx, data)
if err != nil {
g.Log().Errorf(ctx, "❌ 消息处理失败: %v", err)
// 仅在手动 ACK 模式下拒绝消息
if !autoAck {
// 拒绝消息不再重新入队(避免死循环)
m.Nack(false, false)
continue
}
}
g.Log().Infof(ctx, "✅ 消息处理成功: %v", err)
// 仅在手动 ACK 模式下确认消息
if err := m.Ack(false); err != nil {
g.Log().Errorf(ctx, "❌ AUTO ACK 消息失败: %v", err)
} else {
g.Log().Infof(ctx, "✅ AUTO ACK 消息成功")
}
}
}
}