package rabbitmq import ( "context" "encoding/json" "fmt" "github.com/gogf/gf/v2/frame/g" amqp "github.com/rabbitmq/amqp091-go" ) // MessageHandler 消息处理函数 type MessageHandler func(ctx context.Context, body []byte) error // Consumer 消费者 type Consumer struct { queue string consumerTag string prefetchCount int // QoS: 预取数量(并发控制) autoAck bool // 是否自动确认 handler MessageHandler workerCount int // worker 数量 } // ConsumerOption 消费者配置选项 type ConsumerOption func(*Consumer) // WithPrefetchCount 设置预取数量(并发控制) func WithPrefetchCount(count int) ConsumerOption { return func(c *Consumer) { c.prefetchCount = count } } // WithAutoAck 设置自动确认 func WithAutoAck(autoAck bool) ConsumerOption { return func(c *Consumer) { c.autoAck = autoAck } } // WithWorkerCount 设置 worker 数量 func WithWorkerCount(count int) ConsumerOption { return func(c *Consumer) { c.workerCount = count } } // WithConsumerTag 设置消费者标签 func WithConsumerTag(tag string) ConsumerOption { return func(c *Consumer) { c.consumerTag = tag } } // NewConsumer 创建消费者 func NewConsumer(queue string, handler MessageHandler, opts ...ConsumerOption) *Consumer { c := &Consumer{ queue: queue, consumerTag: "", prefetchCount: 1, // 默认 1 个 autoAck: false, // 默认手动确认 handler: handler, workerCount: 1, // 默认 1 个 worker } // 应用选项 for _, opt := range opts { opt(c) } return c } // Start 启动消费者 func (c *Consumer) Start(ctx context.Context) error { ch, err := GetChannel() if err != nil { return err } // 设置 QoS(并发控制) err = ch.Qos( c.prefetchCount, // prefetchCount: 每个 consumer 最多同时处理的消息数 0, // prefetchSize: 0 表示不限制 false, // global: false 表示仅应用于当前 channel ) if err != nil { return fmt.Errorf("设置 QoS 失败: %v", err) } // 开始消费 msgs, err := ch.Consume( c.queue, // queue c.consumerTag, // consumer tag c.autoAck, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return fmt.Errorf("开始消费失败: %v", err) } g.Log().Infof(ctx, "消费者已启动: queue=%s, prefetch=%d, workers=%d", c.queue, c.prefetchCount, c.workerCount) // 启动多个 worker for i := 0; i < c.workerCount; i++ { go c.worker(ctx, i, msgs) } return nil } // worker 工作协程 func (c *Consumer) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) { g.Log().Debugf(ctx, "Worker %d 已启动", workerID) for msg := range msgs { // 处理消息 err := c.handler(ctx, msg.Body) if err != nil { g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err) // 如果不是自动确认,需要手动 Nack if !c.autoAck { // requeue=false: 不重新入队,进入死信队列 msg.Nack(false, false) } } else { // 处理成功,手动确认 if !c.autoAck { msg.Ack(false) } g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID) } } g.Log().Debugf(ctx, "Worker %d 已停止", workerID) } // StartTypedConsumer 启动类型化消费者(自动反序列化) func StartTypedConsumer[T any]( ctx context.Context, queue string, handler func(ctx context.Context, msg *T) error, opts ...ConsumerOption, ) error { // 包装处理函数 wrappedHandler := func(ctx context.Context, body []byte) error { var msg T if err := json.Unmarshal(body, &msg); err != nil { return fmt.Errorf("反序列化消息失败: %v", err) } return handler(ctx, &msg) } consumer := NewConsumer(queue, wrappedHandler, opts...) return consumer.Start(ctx) }