package rabbitmq import ( "context" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" amqp "github.com/rabbitmq/amqp091-go" ) // Publisher 消息发布器 type Publisher struct { exchange string routingKey string } // NewPublisher 创建发布器 func NewPublisher(exchange, routingKey string) *Publisher { return &Publisher{ exchange: exchange, routingKey: routingKey, } } // Publish 发布消息(使用默认 routing key) func (p *Publisher) Publish(ctx context.Context, message interface{}) (err error) { return p.PublishWithRoutingKey(ctx, p.routingKey, message) } // PublishWithRoutingKey 发布消息(指定 routing key) func (p *Publisher) PublishWithRoutingKey(ctx context.Context, routingKey string, message interface{}) (err error) { ch, err := GetChannel() if err != nil { return err } // 序列化消息 body, err := gjson.Encode(message) if err != nil { return gerror.Newf("消息序列化失败: %v", err) } // 发布消息 err = ch.PublishWithContext( ctx, p.exchange, // exchange routingKey, // routing key false, // mandatory false, // immediate amqp.Publishing{ DeliveryMode: amqp.Persistent, // 持久化 ContentType: "application/json", Body: body, }, ) if err != nil { g.Log().Errorf(ctx, "发布消息失败: exchange=%s, routingKey=%s, err=%v", p.exchange, routingKey, err) return err } g.Log().Debugf(ctx, "消息发布成功: exchange=%s, routingKey=%s", p.exchange, routingKey) return } // PublishDelayed 发布延时消息 // delaySeconds: 延时秒数 func (p *Publisher) PublishDelayed(ctx context.Context, message interface{}, delaySeconds int) (err error) { ch, err := GetChannel() if err != nil { return err } // 序列化消息 body, err := gjson.Encode(message) if err != nil { return gerror.Newf("消息序列化失败: %v", err) } // 发布延时消息(需要 rabbitmq_delayed_message_exchange 插件) err = ch.PublishWithContext( ctx, p.exchange, // exchange(必须是 x-delayed-message 类型) p.routingKey, // routing key false, // mandatory false, // immediate amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "application/json", Body: body, Headers: amqp.Table{ "x-delay": delaySeconds * 1000, // 延时(毫秒) }, }, ) if err != nil { g.Log().Errorf(ctx, "发布延时消息失败: exchange=%s, routingKey=%s, delay=%ds, err=%v", p.exchange, p.routingKey, delaySeconds, err) return err } g.Log().Debugf(ctx, "延时消息发布成功: exchange=%s, routingKey=%s, delay=%ds", p.exchange, p.routingKey, delaySeconds) return } // PublishBatch 批量发布消息 func (p *Publisher) PublishBatch(ctx context.Context, messages []interface{}) (err error) { if len(messages) == 0 { return } ch, err := GetChannel() if err != nil { return err } for i, message := range messages { body, err := gjson.Encode(message) if err != nil { g.Log().Errorf(ctx, "消息 %d 序列化失败: %v", i, err) continue } err = ch.PublishWithContext( ctx, p.exchange, p.routingKey, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "application/json", Body: body, }, ) if err != nil { g.Log().Errorf(ctx, "消息 %d 发布失败: %v", i, err) continue } } g.Log().Infof(ctx, "批量发布完成: 共 %d 条消息", len(messages)) return }