Files
common/rabbitmq/consumer.go
2026-03-12 08:50:45 +08:00

190 lines
4.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package rabbitmq
import (
"context"
"encoding/json"
"fmt"
"github.com/gogf/gf/v2/frame/g"
amqp "github.com/rabbitmq/amqp091-go"
)
// MessageHandler 消息处理函数
type MessageHandler func(ctx context.Context, body []byte) error
// Consumer 消费者
type Consumer struct {
queue string
consumerTag string
prefetchCount int // QoS: 预取数量(并发控制)
autoAck bool // 是否自动确认
handler MessageHandler
workerCount int // worker 数量
cancel context.CancelFunc // 用于停止 worker
}
// ConsumerOption 消费者配置选项
type ConsumerOption func(*Consumer)
// WithPrefetchCount 设置预取数量(并发控制)
func WithPrefetchCount(count int) ConsumerOption {
return func(c *Consumer) {
c.prefetchCount = count
}
}
// WithAutoAck 设置自动确认
func WithAutoAck(autoAck bool) ConsumerOption {
return func(c *Consumer) {
c.autoAck = autoAck
}
}
// WithWorkerCount 设置 worker 数量
func WithWorkerCount(count int) ConsumerOption {
return func(c *Consumer) {
c.workerCount = count
}
}
// WithConsumerTag 设置消费者标签
func WithConsumerTag(tag string) ConsumerOption {
return func(c *Consumer) {
c.consumerTag = tag
}
}
// NewConsumer 创建消费者
func NewConsumer(queue string, handler MessageHandler, opts ...ConsumerOption) *Consumer {
c := &Consumer{
queue: queue,
consumerTag: "",
prefetchCount: 1, // 默认 1 个
autoAck: false, // 默认手动确认
handler: handler,
workerCount: 1, // 默认 1 个 worker
}
// 应用选项
for _, opt := range opts {
opt(c)
}
return c
}
// Start 启动消费者
func (c *Consumer) Start(ctx context.Context) error {
// 创建可取消的 context
workerCtx, cancel := context.WithCancel(ctx)
c.cancel = cancel
ch, err := GetChannel()
if err != nil {
return err
}
// 设置 QoS并发控制
err = ch.Qos(
c.prefetchCount, // prefetchCount: 每个 consumer 最多同时处理的消息数
0, // prefetchSize: 0 表示不限制
false, // global: false 表示仅应用于当前 channel
)
if err != nil {
return fmt.Errorf("设置 QoS 失败: %v", err)
}
// 开始消费
msgs, err := ch.Consume(
c.queue, // queue
c.consumerTag, // consumer tag
c.autoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("开始消费失败: %v", err)
}
g.Log().Infof(ctx, "消费者已启动: queue=%s, prefetch=%d, workers=%d",
c.queue, c.prefetchCount, c.workerCount)
// 启动多个 worker
for i := 0; i < c.workerCount; i++ {
go c.worker(workerCtx, i, msgs)
}
return nil
}
// worker 工作协程
func (c *Consumer) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) {
g.Log().Debugf(ctx, "Worker %d 已启动", workerID)
for {
select {
case <-ctx.Done():
// Context 取消,退出
g.Log().Infof(ctx, "Worker %d 收到停止信号,正在退出", workerID)
return
case msg, ok := <-msgs:
if !ok {
// Channel 关闭,退出
g.Log().Infof(ctx, "Worker %d 消息通道已关闭,退出", workerID)
return
}
// 处理消息
err := c.handler(ctx, msg.Body)
if err != nil {
g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err)
// 如果不是自动确认,需要手动 Nack
if !c.autoAck {
// requeue=false: 不重新入队,进入死信队列
msg.Nack(false, false)
}
} else {
// 处理成功,手动确认
if !c.autoAck {
msg.Ack(false)
}
g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID)
}
}
}
}
// StartTypedConsumer 启动类型化消费者(自动反序列化)
func StartTypedConsumer[T any](
ctx context.Context,
queue string,
handler func(ctx context.Context, msg *T) error,
opts ...ConsumerOption,
) error {
// 包装处理函数
wrappedHandler := func(ctx context.Context, body []byte) error {
var msg T
if err := json.Unmarshal(body, &msg); err != nil {
return fmt.Errorf("反序列化消息失败: %v", err)
}
return handler(ctx, &msg)
}
consumer := NewConsumer(queue, wrappedHandler, opts...)
return consumer.Start(ctx)
}
// Stop 停止消费者
func (c *Consumer) Stop(ctx context.Context) {
if c.cancel != nil {
g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue)
c.cancel()
c.cancel = nil
}
}