Files
common/message/rabbit.go
2026-03-12 08:51:25 +08:00

352 lines
8.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package message
import (
"context"
"sync"
"time"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
rabbitConn *amqp.Connection
rabbitChannel *amqp.Channel
rabbitOnce sync.Once
rabbitMu sync.RWMutex
rabbitCloseWatcher chan struct{}
rabbitWatcherStarted bool
)
// Config RabbitMQ 配置
type RabbitMQConfig struct {
Host string
Port int
Username string
Password string
VHost string
}
// rabbitMQConfig 默认配置
func getRabbitMQConfig() *RabbitMQConfig {
return &RabbitMQConfig{
Host: g.Cfg().MustGet(context.Background(), "rabbitmq.host").String(),
Port: g.Cfg().MustGet(context.Background(), "rabbitmq.port").Int(),
Username: g.Cfg().MustGet(context.Background(), "rabbitmq.username").String(),
Password: g.Cfg().MustGet(context.Background(), "rabbitmq.password").String(),
VHost: g.Cfg().MustGet(context.Background(), "rabbitmq.vhost", "/").String(),
}
}
// initRabbitMQ 初始化 RabbitMQ 连接
func initRabbitMQ(ctx context.Context) error {
var err error
rabbitOnce.Do(func() {
cfg := getRabbitMQConfig()
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
rabbitConn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err)
return
}
rabbitChannel, err = rabbitConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err)
return
}
rabbitCloseWatcher = make(chan struct{})
if !rabbitWatcherStarted {
go handleRabbitMQConnectionClose(ctx)
rabbitWatcherStarted = true
}
g.Log().Info(ctx, "RabbitMQ 连接成功")
})
return err
}
// getRabbitMQChannel 获取 RabbitMQ Channel
func getRabbitMQChannel() (*amqp.Channel, error) {
rabbitMu.RLock()
defer rabbitMu.RUnlock()
if rabbitChannel == nil || rabbitChannel.IsClosed() {
return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭")
}
return rabbitChannel, nil
}
// getRabbitMQConnection 获取 RabbitMQ 连接
func getRabbitMQConnection() (*amqp.Connection, error) {
rabbitMu.RLock()
defer rabbitMu.RUnlock()
if rabbitConn == nil || rabbitConn.IsClosed() {
return nil, gerror.New("RabbitMQ 连接未初始化或已关闭")
}
return rabbitConn, nil
}
// handleRabbitMQConnectionClose 监听连接关闭并重连
func handleRabbitMQConnectionClose(ctx context.Context) {
for {
select {
case <-rabbitCloseWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
default:
}
rabbitMu.RLock()
currentConn := rabbitConn
rabbitMu.RUnlock()
if currentConn == nil {
return
}
closeErr := make(chan *amqp.Error, 1)
currentConn.NotifyClose(closeErr)
select {
case err := <-closeErr:
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v尝试重连...", err)
reconnectRabbitMQ(ctx)
}
case <-rabbitCloseWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
}
}
}
// reconnectRabbitMQ 重新连接
func reconnectRabbitMQ(ctx context.Context) {
rabbitMu.Lock()
defer rabbitMu.Unlock()
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(i+1) * time.Second)
cfg := getRabbitMQConfig()
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
var err error
rabbitConn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err)
continue
}
rabbitChannel, err = rabbitConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err)
continue
}
g.Log().Info(ctx, "RabbitMQ 重连成功")
return
}
g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数")
}
// startRabbitMQConsumer 启动 RabbitMQ 消费者
func startRabbitMQConsumer(ctx context.Context, msg QueueMessage) error {
// 初始化连接
if err := initRabbitMQ(ctx); err != nil {
return gerror.Wrap(err, "初始化 RabbitMQ 连接失败")
}
// 创建独立 Channel避免并发冲突
conn, err := getRabbitMQConnection()
if err != nil {
return gerror.Wrap(err, "获取RabbitMQ连接失败")
}
ch, err := conn.Channel()
if err != nil {
return gerror.Wrap(err, "创建独立Channel失败")
}
// 声明队列
_, err = ch.QueueDeclare(
msg.Queue, // name
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return gerror.Newf("声明队列失败: %v", err)
}
// 设置 QoS并发控制
prefetchCount := msg.PrefetchCount
if prefetchCount == 0 {
prefetchCount = 1
}
err = ch.Qos(
prefetchCount, // prefetchCount
0, // prefetchSize
false, // global
)
if err != nil {
return gerror.Newf("设置 QoS 失败: %v", err)
}
// 开始消费
msgs, err := ch.Consume(
msg.Queue, // queue
msg.ConsumerTag, // consumer tag
msg.AutoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return gerror.Newf("开始消费失败: %v", err)
}
workerCount := msg.WorkerCount
if workerCount == 0 {
workerCount = 1
}
g.Log().Infof(ctx, "RabbitMQ 消费者已启动: queue=%s, prefetch=%d, workers=%d",
msg.Queue, prefetchCount, workerCount)
// 启动多个 worker
for i := 0; i < workerCount; i++ {
go rabbitMQWorker(ctx, i, msgs, msg)
}
return nil
}
// rabbitMQWorker RabbitMQ 工作协程
func rabbitMQWorker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery, msg QueueMessage) {
g.Log().Debugf(ctx, "RabbitMQ Worker %d 已启动", workerID)
for {
select {
case <-ctx.Done():
g.Log().Infof(ctx, "RabbitMQ Worker %d 收到停止信号,正在退出", workerID)
return
case delivery, ok := <-msgs:
if !ok {
g.Log().Infof(ctx, "RabbitMQ Worker %d 消息通道已关闭,退出", workerID)
return
}
// 反序列化消息
var message map[string]interface{}
if err := gjson.DecodeTo(delivery.Body, &message); err != nil {
g.Log().Errorf(ctx, "RabbitMQ Worker %d 反序列化消息失败: %v", workerID, err)
if !msg.AutoAck {
delivery.Nack(false, false)
}
continue
}
// 处理消息
err := msg.HandleFunc(ctx, message)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ Worker %d 处理消息失败: %v", workerID, err)
if !msg.AutoAck {
delivery.Nack(false, false)
}
} else {
if !msg.AutoAck {
delivery.Ack(false)
}
g.Log().Debugf(ctx, "RabbitMQ Worker %d 处理消息成功", workerID)
}
}
}
}
// publishToRabbitMQ 发布消息到 RabbitMQ
func publishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) (messageID string, err error) {
ch, err := getRabbitMQChannel()
if err != nil {
return
}
body, err := gjson.Encode(message)
if err != nil {
return "", gerror.Newf("消息序列化失败: %v", err)
}
err = ch.PublishWithContext(
ctx,
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
},
)
if err != nil {
g.Log().Errorf(ctx, "发布消息失败: exchange=%s, routingKey=%s, err=%v", exchange, routingKey, err)
return
}
g.Log().Debugf(ctx, "消息发布成功: exchange=%s, routingKey=%s", exchange, routingKey)
return messageID, nil
}
// publishDelayedToRabbitMQ 发布延时消息到 RabbitMQ
func publishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) (messageID string, err error) {
ch, err := getRabbitMQChannel()
if err != nil {
return
}
body, err := gjson.Encode(message)
if err != nil {
return "", gerror.Newf("消息序列化失败: %v", err)
}
err = ch.PublishWithContext(
ctx,
exchange, // exchange必须是 x-delayed-message 类型)
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
Headers: amqp.Table{
"x-delay": delaySeconds * 1000, // 延时(毫秒)
},
},
)
if err != nil {
g.Log().Errorf(ctx, "发布延时消息失败: exchange=%s, routingKey=%s, delay=%ds, err=%v", exchange, routingKey, delaySeconds, err)
return
}
g.Log().Debugf(ctx, "延时消息发布成功: exchange=%s, routingKey=%s, delay=%ds", exchange, routingKey, delaySeconds)
return messageID, nil
}