Files
common/rabbitmq/setup.go

232 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package rabbitmq
import (
"context"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
amqp "github.com/rabbitmq/amqp091-go"
)
// QueueConfig holds the settings that DeclareQueue forwards to the broker
// when declaring a queue.
type QueueConfig struct {
	// Name is the queue name.
	Name string
	// Durable requests a durable (persistent) queue.
	Durable bool
	// AutoDelete requests an auto-delete queue.
	AutoDelete bool
	// Exclusive requests an exclusive queue.
	Exclusive bool
	// Args carries additional declaration arguments.
	Args amqp.Table
}
// ExchangeConfig holds the settings that DeclareExchange forwards to the
// broker when declaring an exchange.
type ExchangeConfig struct {
	// Name is the exchange name.
	Name string
	// Type is the exchange type: direct, topic, fanout or x-delayed-message.
	Type string
	// Durable is passed through as the durable flag of the declaration.
	Durable bool
	// AutoDelete is passed through as the auto-delete flag of the declaration.
	AutoDelete bool
	// Args carries additional declaration arguments.
	Args amqp.Table
}
// BindingConfig holds the settings that BindQueue uses to bind a queue to an
// exchange.
type BindingConfig struct {
	// Queue is the name of the queue being bound.
	Queue string
	// Exchange is the name of the exchange the queue is bound to.
	Exchange string
	// RoutingKey is the routing key used for the binding.
	RoutingKey string
	// Args carries additional binding arguments.
	Args amqp.Table
}
// DeclareQueue declares the queue described by cfg on the channel returned by
// GetChannel.
//
// The declaration is always sent with no-wait disabled. An error from
// GetChannel is returned unchanged and without logging; an error from the
// declaration itself is logged and then returned. On success an info line is
// logged and nil is returned.
func DeclareQueue(ctx context.Context, cfg *QueueConfig) (err error) {
	channel, err := GetChannel()
	if err != nil {
		return err
	}

	const noWait = false
	if _, err = channel.QueueDeclare(
		cfg.Name, cfg.Durable, cfg.AutoDelete, cfg.Exclusive, noWait, cfg.Args,
	); err != nil {
		g.Log().Errorf(ctx, "声明队列失败: %s, err=%v", cfg.Name, err)
		return err
	}

	g.Log().Infof(ctx, "队列声明成功: %s", cfg.Name)
	return nil
}
// DeclareExchange declares the exchange described by cfg on the channel
// returned by GetChannel.
//
// The exchange is always declared as non-internal and with no-wait disabled.
// An error from GetChannel is returned unchanged and without logging; an
// error from the declaration itself is logged and then returned. On success
// an info line is logged and nil is returned.
func DeclareExchange(ctx context.Context, cfg *ExchangeConfig) (err error) {
	channel, err := GetChannel()
	if err != nil {
		return err
	}

	const (
		internal = false
		noWait   = false
	)
	if err = channel.ExchangeDeclare(
		cfg.Name, cfg.Type, cfg.Durable, cfg.AutoDelete, internal, noWait, cfg.Args,
	); err != nil {
		g.Log().Errorf(ctx, "声明 Exchange 失败: %s, err=%v", cfg.Name, err)
		return err
	}

	g.Log().Infof(ctx, "Exchange 声明成功: %s (type=%s)", cfg.Name, cfg.Type)
	return nil
}
// BindQueue binds cfg.Queue to cfg.Exchange using cfg.RoutingKey on the
// channel returned by GetChannel.
//
// The bind is always sent with no-wait disabled. An error from GetChannel is
// returned unchanged and without logging; an error from the bind itself is
// logged and then returned. On success an info line is logged and nil is
// returned.
func BindQueue(ctx context.Context, cfg *BindingConfig) (err error) {
	channel, err := GetChannel()
	if err != nil {
		return err
	}

	const noWait = false
	// QueueBind takes the routing key before the exchange name.
	if err = channel.QueueBind(cfg.Queue, cfg.RoutingKey, cfg.Exchange, noWait, cfg.Args); err != nil {
		g.Log().Errorf(
			ctx,
			"绑定队列失败: queue=%s, exchange=%s, routingKey=%s, err=%v",
			cfg.Queue, cfg.Exchange, cfg.RoutingKey, err,
		)
		return err
	}

	g.Log().Infof(
		ctx,
		"队列绑定成功: queue=%s → exchange=%s (routingKey=%s)",
		cfg.Queue, cfg.Exchange, cfg.RoutingKey,
	)
	return nil
}
// SetupDelayExchange 设置延时 Exchange需要 rabbitmq_delayed_message_exchange 插件)
func SetupDelayExchange(ctx context.Context, exchangeName string) error {
return DeclareExchange(ctx, &ExchangeConfig{
Name: exchangeName,
Type: "x-delayed-message",
Durable: true,
Args: amqp.Table{
"x-delayed-type": "direct",
},
})
}
// SetupDeadLetterQueue 设置死信队列
func SetupDeadLetterQueue(ctx context.Context, queueName, exchangeName string) error {
// 1. 声明死信 Exchange
err := DeclareExchange(ctx, &ExchangeConfig{
Name: exchangeName,
Type: "direct",
Durable: true,
})
if err != nil {
return err
}
// 2. 声明死信队列
err = DeclareQueue(ctx, &QueueConfig{
Name: queueName,
Durable: true,
})
if err != nil {
return err
}
// 3. 绑定
return BindQueue(ctx, &BindingConfig{
Queue: queueName,
Exchange: exchangeName,
RoutingKey: queueName,
})
}
// SetupQueueWithDLX 创建带死信队列的普通队列
func SetupQueueWithDLX(ctx context.Context, queueName, dlxExchange, dlxRoutingKey string) error {
return DeclareQueue(ctx, &QueueConfig{
Name: queueName,
Durable: true,
Args: amqp.Table{
"x-dead-letter-exchange": dlxExchange,
"x-dead-letter-routing-key": dlxRoutingKey,
},
})
}
// SetupBasicTopology declares the fixed set of exchanges, queues and bindings
// used by this package (per the original author's note, intended for a
// Xiaohongshu customer-service scenario).
//
// Steps, in order:
//  1. a durable direct exchange "ragflow_exchange";
//  2. a delayed-message exchange "delay_exchange" via SetupDelayExchange;
//  3. the dead-letter exchange "dlx_exchange" and queue "dead_letter_queue";
//  4. the business queues, each dead-lettering to "dlx_exchange" with routing
//     key "dead_letter_queue";
//  5. the bindings of the business queues to their exchanges, using the queue
//     name as the routing key.
//
// The first failing step aborts the setup and its error is returned. A
// failure in step 2 is wrapped with a hint that the delayed-message plugin
// may be missing; the underlying error stays reachable through the wrap.
func SetupBasicTopology(ctx context.Context) (err error) {
	const (
		mainExchange  = "ragflow_exchange"
		delayExchange = "delay_exchange"
		dlxExchange   = "dlx_exchange"
		dlxQueue      = "dead_letter_queue"

		requestQueue  = "ragflow_request_queue"
		followUpQueue = "follow_up_queue"
		archiveQueue  = "archive_queue"
	)

	// 1. Regular exchange.
	err = DeclareExchange(ctx, &ExchangeConfig{
		Name:    mainExchange,
		Type:    "direct",
		Durable: true,
	})
	if err != nil {
		return err
	}

	// 2. Delayed exchange. Wrap instead of formatting with %v so the original
	// error is preserved for errors.Is / errors.As; the resulting message
	// text is unchanged.
	if err = SetupDelayExchange(ctx, delayExchange); err != nil {
		return gerror.Wrap(err, "延时 Exchange 声明失败(可能未安装插件)")
	}

	// 3. Dead-letter exchange and queue.
	if err = SetupDeadLetterQueue(ctx, dlxQueue, dlxExchange); err != nil {
		return err
	}

	// 4. Business queues.
	queues := []struct {
		name string
		dlx  bool // whether the queue dead-letters to the DLX
	}{
		{requestQueue, true},
		{followUpQueue, true},
		{archiveQueue, true},
	}
	for _, q := range queues {
		if q.dlx {
			err = SetupQueueWithDLX(ctx, q.name, dlxExchange, dlxQueue)
		} else {
			err = DeclareQueue(ctx, &QueueConfig{
				Name:    q.name,
				Durable: true,
			})
		}
		if err != nil {
			return err
		}
	}

	// 5. Bindings.
	bindings := []BindingConfig{
		{Queue: requestQueue, Exchange: mainExchange, RoutingKey: requestQueue},
		{Queue: followUpQueue, Exchange: delayExchange, RoutingKey: followUpQueue},
		{Queue: archiveQueue, Exchange: delayExchange, RoutingKey: archiveQueue},
	}
	// Index the slice so each call receives a pointer to the element itself
	// rather than to the loop variable.
	for i := range bindings {
		if err = BindQueue(ctx, &bindings[i]); err != nil {
			return err
		}
	}

	g.Log().Info(ctx, "RabbitMQ 拓扑结构设置完成")
	return nil
}