增加mq独立channel避免并发争夺
This commit is contained in:
@@ -21,6 +21,7 @@ type Consumer struct {
|
||||
handler MessageHandler
|
||||
workerCount int // worker 数量
|
||||
cancel context.CancelFunc // 用于停止 worker
|
||||
channel *amqp.Channel // 独立Channel(避免并发冲突)
|
||||
}
|
||||
|
||||
// ConsumerOption 消费者配置选项
|
||||
@@ -78,10 +79,17 @@ func (c *Consumer) Start(ctx context.Context) (err error) {
|
||||
// 创建可取消的 context
|
||||
workerCtx, cancel := context.WithCancel(ctx)
|
||||
c.cancel = cancel
|
||||
ch, err := GetChannel()
|
||||
|
||||
// 为每个消费者创建独立Channel(避免并发冲突)
|
||||
conn, err := GetConnection()
|
||||
if err != nil {
|
||||
return err
|
||||
return gerror.Wrap(err, "获取RabbitMQ连接失败")
|
||||
}
|
||||
c.channel, err = conn.Channel()
|
||||
if err != nil {
|
||||
return gerror.Wrap(err, "创建独立Channel失败")
|
||||
}
|
||||
ch := c.channel
|
||||
|
||||
// 声明队列(如果不存在则创建)
|
||||
// 注意:Queue到Exchange的绑定应由message服务在发送响应时动态创建,或通过运维工具提前配置
|
||||
@@ -196,8 +204,13 @@ func StartTypedConsumer[T any](
|
||||
// Stop 停止消费者
|
||||
func (c *Consumer) Stop(ctx context.Context) {
|
||||
if c.cancel != nil {
|
||||
g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue)
|
||||
c.cancel()
|
||||
c.cancel = nil
|
||||
}
|
||||
// 关闭独立Channel
|
||||
if c.channel != nil && !c.channel.IsClosed() {
|
||||
c.channel.Close()
|
||||
g.Log().Debugf(ctx, "消费者Channel已关闭: queue=%s", c.queue)
|
||||
}
|
||||
g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue)
|
||||
c.cancel = nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user