Files
common/message/connection_rabbitmq.go
qhd 55a6ec0374 重构消息队列连接管理,支持多数据源配置
主要变更:
1. 重构NATS、RabbitMQ和Redis连接管理模块,支持多数据源配置
2. 统一连接管理接口,增加数据源名称参数
3. 优化连接状态检查和错误处理
4. 增加连接池管理和资源清理机制
5. 改进日志输出格式和内容
2026-03-12 08:51:45 +08:00

165 lines
4.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package message
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
)
var (
muRabbitMQ sync.RWMutex
rabbitmqConns map[string]*amqp.Connection
rabbitmqChannels map[string]*amqp.Channel
)
func init() {
rabbitmqConns = make(map[string]*amqp.Connection)
rabbitmqChannels = make(map[string]*amqp.Channel)
}
// rabbitmqConnect 建立 RabbitMQ 连接
func rabbitmqConnect(ctx context.Context, name string) error {
if g.Cfg().MustGet(ctx, "rabbitmq").IsEmpty() {
g.Log().Errorf(ctx, "❌ RabbitMQ 配置不存在")
return fmt.Errorf("RabbitMQ Configuration does not exist")
}
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始创建连接", dsName)
muRabbitMQ.Lock()
defer muRabbitMQ.Unlock()
// 安全地关闭旧连接(仅针对该数据源)
if oldConn, exists := rabbitmqConns[dsName]; exists && oldConn != nil && !oldConn.IsClosed() {
oldConn.Close()
}
if oldChannel, exists := rabbitmqChannels[dsName]; exists && oldChannel != nil && !oldChannel.IsClosed() {
oldChannel.Close()
}
delete(rabbitmqConns, dsName)
delete(rabbitmqChannels, dsName)
// 从配置文件读取 RabbitMQ 配置
host := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.host", dsName)).String()
port := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.port", dsName)).Int()
username := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.username", dsName)).String()
password := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.password", dsName)).String()
vHost := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.vhost", dsName), "/").String()
if g.IsEmpty(host) {
return fmt.Errorf("❌ RabbitMQ 配置错误: host 不能为空 (数据源: %s)", dsName)
}
if g.IsEmpty(port) {
return fmt.Errorf("❌ RabbitMQ 配置错误: port 不能为空 (数据源: %s)", dsName)
}
if g.IsEmpty(username) {
return fmt.Errorf("❌ RabbitMQ 配置错误: username 不能为空 (数据源: %s)", dsName)
}
if g.IsEmpty(password) {
return fmt.Errorf("❌ RabbitMQ 配置错误: password 不能为空 (数据源: %s)", dsName)
}
// 构建连接 URL
url := "amqp://" + username + ":" + password + "@" + host + ":" + gconv.String(port) + "/" + vHost
// 创建连接
newConn, err := amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 连接失败: %v", dsName, err)
return err
}
// 创建 Channel
newChannel, err := newConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 创建 Channel 失败: %v", dsName, err)
newConn.Close()
return err
}
// 保存连接和 Channel
rabbitmqConns[dsName] = newConn
rabbitmqChannels[dsName] = newChannel
g.Log().Infof(ctx, "✅ RabbitMQ [%s] 连接成功", dsName)
return nil
}
// rabbitmqPing 检测 RabbitMQ 连接状态
func rabbitmqPing(ctx context.Context, name string) bool {
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
muRabbitMQ.RLock()
defer muRabbitMQ.RUnlock()
conn, exists := rabbitmqConns[dsName]
channel, channelExists := rabbitmqChannels[dsName]
if !exists || conn == nil || conn.IsClosed() || !channelExists || channel == nil || channel.IsClosed() {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 连接已关闭或不可用", dsName)
return false
}
g.Log().Infof(ctx, "📊 RabbitMQ [%s] 连接正常", dsName)
return true
}
// rabbitmqClose 关闭 RabbitMQ 连接
func rabbitmqClose(ctx context.Context, name string) error {
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
muRabbitMQ.Lock()
defer muRabbitMQ.Unlock()
var lastErr error
if channel, exists := rabbitmqChannels[dsName]; exists && channel != nil && !channel.IsClosed() {
if err := channel.Close(); err != nil {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 关闭 Channel 失败: %v", dsName, err)
lastErr = err
}
}
delete(rabbitmqChannels, dsName)
if conn, exists := rabbitmqConns[dsName]; exists && conn != nil && !conn.IsClosed() {
if err := conn.Close(); err != nil {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 关闭连接失败: %v", dsName, err)
lastErr = err
}
}
delete(rabbitmqConns, dsName)
g.Log().Infof(ctx, "✅ RabbitMQ [%s] 连接已关闭", dsName)
return lastErr
}
// getRabbitMQConn 获取 RabbitMQ 连接(内部使用)
func getRabbitMQConn(name string) *amqp.Connection {
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
return rabbitmqConns[dsName]
}
// getRabbitMQChannel 获取 RabbitMQ Channel内部使用
func getRabbitMQChannel(name string) *amqp.Channel {
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
return rabbitmqChannels[dsName]
}