From 0f235ad52d933914d063b94a9081cf33a73ce741 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Tue, 30 Dec 2025 13:46:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0mq=E7=8B=AC=E7=AB=8Bchannel?= =?UTF-8?q?=E9=81=BF=E5=85=8D=E5=B9=B6=E5=8F=91=E4=BA=89=E5=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rabbitmq/consumer.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/rabbitmq/consumer.go b/rabbitmq/consumer.go index 305b3f7..160d4bd 100644 --- a/rabbitmq/consumer.go +++ b/rabbitmq/consumer.go @@ -21,6 +21,7 @@ type Consumer struct { handler MessageHandler workerCount int // worker 数量 cancel context.CancelFunc // 用于停止 worker + channel *amqp.Channel // 独立Channel(避免并发冲突) } // ConsumerOption 消费者配置选项 @@ -78,10 +79,17 @@ func (c *Consumer) Start(ctx context.Context) (err error) { // 创建可取消的 context workerCtx, cancel := context.WithCancel(ctx) c.cancel = cancel - ch, err := GetChannel() + + // 为每个消费者创建独立Channel(避免并发冲突) + conn, err := GetConnection() if err != nil { - return err + return gerror.Wrap(err, "获取RabbitMQ连接失败") } + c.channel, err = conn.Channel() + if err != nil { + return gerror.Wrap(err, "创建独立Channel失败") + } + ch := c.channel // 声明队列(如果不存在则创建) // 注意:Queue到Exchange的绑定应由message服务在发送响应时动态创建,或通过运维工具提前配置 @@ -196,8 +204,13 @@ func StartTypedConsumer[T any]( // Stop 停止消费者 func (c *Consumer) Stop(ctx context.Context) { if c.cancel != nil { - g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue) c.cancel() - c.cancel = nil } + // 关闭独立Channel + if c.channel != nil && !c.channel.IsClosed() { + c.channel.Close() + g.Log().Debugf(ctx, "消费者Channel已关闭: queue=%s", c.queue) + } + g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue) + c.cancel = nil }