Files
common/rabbitmq/consumer.go

218 lines
5.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package rabbitmq
import (
"context"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
amqp "github.com/rabbitmq/amqp091-go"
)
// MessageHandler 消息处理函数
type MessageHandler func(ctx context.Context, body []byte) error
// Consumer 消费者
type Consumer struct {
queue string
consumerTag string
prefetchCount int // QoS: 预取数量(并发控制)
autoAck bool // 是否自动确认
handler MessageHandler
workerCount int // worker 数量
cancel context.CancelFunc // 用于停止 worker
}
// ConsumerOption 消费者配置选项
type ConsumerOption func(*Consumer)
// WithPrefetchCount 设置预取数量(并发控制)
func WithPrefetchCount(count int) ConsumerOption {
return func(c *Consumer) {
c.prefetchCount = count
}
}
// WithAutoAck 设置自动确认
func WithAutoAck(autoAck bool) ConsumerOption {
return func(c *Consumer) {
c.autoAck = autoAck
}
}
// WithWorkerCount 设置 worker 数量
func WithWorkerCount(count int) ConsumerOption {
return func(c *Consumer) {
c.workerCount = count
}
}
// WithConsumerTag 设置消费者标签
func WithConsumerTag(tag string) ConsumerOption {
return func(c *Consumer) {
c.consumerTag = tag
}
}
// NewConsumer 创建消费者
func NewConsumer(queue string, handler MessageHandler, opts ...ConsumerOption) *Consumer {
c := &Consumer{
queue: queue,
consumerTag: "",
prefetchCount: 1, // 默认 1 个
autoAck: false, // 默认手动确认
handler: handler,
workerCount: 1, // 默认 1 个 worker
}
// 应用选项
for _, opt := range opts {
opt(c)
}
return c
}
// Start 启动消费者
func (c *Consumer) Start(ctx context.Context) (err error) {
// 创建可取消的 context
workerCtx, cancel := context.WithCancel(ctx)
c.cancel = cancel
ch, err := GetChannel()
if err != nil {
return err
}
// 声明队列(如果不存在则创建)
_, err = ch.QueueDeclare(
c.queue, // name
true, // durable持久化
false, // autoDelete不自动删除
false, // exclusive非独占
false, // noWait
nil, // arguments
)
if err != nil {
return gerror.Newf("声明队列失败: %v", err)
}
// TODO: 队列绑定逻辑暂时注释避免重复binding导致消息重复投递
// 绑定队列到Exchange使用队列名作为routingKey支持多租户
// Exchange类型应该是topic绑定模式为 #(接收所有消息)
// err = ch.QueueBind(
// c.queue, // queue name
// "#", // routing key通配符接收所有消息
// "ragflow.response", // exchange name
// false, // noWait
// nil, // arguments
// )
// if err != nil {
// g.Log().Warningf(ctx, "绑定队列到Exchange失败可能Exchange不存在或类型不匹配: %v", err)
// // 不返回错误继续启动消费者可能是direct exchange或队列已绑定
// }
// 设置 QoS并发控制
err = ch.Qos(
c.prefetchCount, // prefetchCount: 每个 consumer 最多同时处理的消息数
0, // prefetchSize: 0 表示不限制
false, // global: false 表示仅应用于当前 channel
)
if err != nil {
return gerror.Newf("设置 QoS 失败: %v", err)
}
// 开始消费
msgs, err := ch.Consume(
c.queue, // queue
c.consumerTag, // consumer tag
c.autoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return gerror.Newf("开始消费失败: %v", err)
}
g.Log().Infof(ctx, "消费者已启动: queue=%s, prefetch=%d, workers=%d",
c.queue, c.prefetchCount, c.workerCount)
// 启动多个 worker
for i := 0; i < c.workerCount; i++ {
go c.worker(workerCtx, i, msgs)
}
return
}
// worker 工作协程
func (c *Consumer) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) {
g.Log().Debugf(ctx, "Worker %d 已启动", workerID)
for {
select {
case <-ctx.Done():
// Context 取消,退出
g.Log().Infof(ctx, "Worker %d 收到停止信号,正在退出", workerID)
return
case msg, ok := <-msgs:
if !ok {
// Channel 关闭,退出
g.Log().Infof(ctx, "Worker %d 消息通道已关闭,退出", workerID)
return
}
// 处理消息
err := c.handler(ctx, msg.Body)
if err != nil {
g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err)
// 如果不是自动确认,需要手动 Nack
if !c.autoAck {
// requeue=false: 不重新入队,进入死信队列
msg.Nack(false, false)
}
} else {
// 处理成功,手动确认
if !c.autoAck {
msg.Ack(false)
}
g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID)
}
}
}
}
// StartTypedConsumer 启动类型化消费者(自动反序列化)
func StartTypedConsumer[T any](
ctx context.Context,
queue string,
handler func(ctx context.Context, msg *T) error,
opts ...ConsumerOption,
) error {
// 包装处理函数
wrappedHandler := func(ctx context.Context, body []byte) error {
var msg T
if err := gjson.DecodeTo(body, &msg); err != nil {
return gerror.Newf("反序列化消息失败: %v", err)
}
return handler(ctx, &msg)
}
consumer := NewConsumer(queue, wrappedHandler, opts...)
return consumer.Start(ctx)
}
// Stop 停止消费者
func (c *Consumer) Stop(ctx context.Context) {
if c.cancel != nil {
g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue)
c.cancel()
c.cancel = nil
}
}