Files
common/rabbitmq/publisher.go
2026-03-12 08:50:40 +08:00

148 lines
3.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package rabbitmq
import (
"context"
"encoding/json"
"fmt"
"github.com/gogf/gf/v2/frame/g"
amqp "github.com/rabbitmq/amqp091-go"
)
// Publisher publishes messages to a fixed exchange using a fixed routing key.
// Both values are set once by NewPublisher and reused by every publish method.
type Publisher struct {
	// exchange is the target exchange name; routingKey is the routing key
	// sent with every message.
	exchange, routingKey string
}
// NewPublisher returns a Publisher bound to the given exchange and routing key.
// It only stores the two values; no validation is performed here.
func NewPublisher(exchange, routingKey string) *Publisher {
	pub := new(Publisher)
	pub.exchange = exchange
	pub.routingKey = routingKey
	return pub
}
// Publish JSON-encodes message and publishes it to the Publisher's exchange
// with its routing key.
//
// The message is sent as persistent (amqp.Persistent) with content type
// "application/json", with the mandatory and immediate flags both false.
//
// It returns:
//   - the error from GetChannel, unchanged, if no channel can be obtained;
//   - a wrapped json.Marshal error if message cannot be serialized (the cause
//     is reachable through errors.Is / errors.As);
//   - the error from PublishWithContext, unchanged, after logging it.
func (p *Publisher) Publish(ctx context.Context, message interface{}) error {
	ch, err := GetChannel()
	if err != nil {
		return err
	}

	// Serialize the payload.
	body, err := json.Marshal(message)
	if err != nil {
		// %w keeps the underlying cause in the error chain.
		return fmt.Errorf("消息序列化失败: %w", err)
	}

	// Publish the message.
	err = ch.PublishWithContext(
		ctx,
		p.exchange,   // exchange
		p.routingKey, // routing key
		false,        // mandatory
		false,        // immediate
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // persistent delivery mode
			ContentType:  "application/json",
			Body:         body,
		},
	)
	if err != nil {
		g.Log().Errorf(ctx, "发布消息失败: exchange=%s, routingKey=%s, err=%v",
			p.exchange, p.routingKey, err)
		return err
	}

	g.Log().Debugf(ctx, "消息发布成功: exchange=%s, routingKey=%s",
		p.exchange, p.routingKey)
	return nil
}
// PublishDelayed JSON-encodes message and publishes it with an "x-delay"
// header so that delivery is postponed.
//
// delaySeconds is the delay in seconds; it is converted to milliseconds for
// the header. Per the original author's note, this relies on the
// rabbitmq_delayed_message_exchange plugin, and the Publisher's exchange must
// be of type x-delayed-message.
//
// The message is sent as persistent with content type "application/json",
// with the mandatory and immediate flags both false.
//
// It returns:
//   - the error from GetChannel, unchanged, if no channel can be obtained;
//   - a wrapped json.Marshal error if message cannot be serialized (the cause
//     is reachable through errors.Is / errors.As);
//   - the error from PublishWithContext, unchanged, after logging it.
func (p *Publisher) PublishDelayed(ctx context.Context, message interface{}, delaySeconds int) error {
	ch, err := GetChannel()
	if err != nil {
		return err
	}

	// Serialize the payload.
	body, err := json.Marshal(message)
	if err != nil {
		// %w keeps the underlying cause in the error chain.
		return fmt.Errorf("消息序列化失败: %w", err)
	}

	// Compute the delay in 64 bits and send it as an explicit int64:
	// multiplying in int can overflow on 32-bit platforms, and amqp.Table
	// expects callers to pick the integer precision themselves.
	// NOTE(review): a plain int appears to be written as a 32-bit field by the
	// client library, which would truncate long delays - confirm against the
	// pinned amqp091-go version.
	delayMillis := int64(delaySeconds) * 1000

	// Publish the delayed message (requires the
	// rabbitmq_delayed_message_exchange plugin).
	err = ch.PublishWithContext(
		ctx,
		p.exchange,   // exchange (must be of type x-delayed-message)
		p.routingKey, // routing key
		false,        // mandatory
		false,        // immediate
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         body,
			Headers: amqp.Table{
				"x-delay": delayMillis, // delay in milliseconds
			},
		},
	)
	if err != nil {
		g.Log().Errorf(ctx, "发布延时消息失败: exchange=%s, routingKey=%s, delay=%ds, err=%v",
			p.exchange, p.routingKey, delaySeconds, err)
		return err
	}

	g.Log().Debugf(ctx, "延时消息发布成功: exchange=%s, routingKey=%s, delay=%ds",
		p.exchange, p.routingKey, delaySeconds)
	return nil
}
// PublishBatch JSON-encodes and publishes each element of messages to the
// Publisher's exchange with its routing key, using a single channel obtained
// from GetChannel.
//
// Delivery is best-effort per message: a message that fails to serialize or
// publish is logged and skipped, and the remaining messages are still
// attempted. Each message is sent as persistent with content type
// "application/json".
//
// It returns:
//   - nil if messages is empty, or if every message was published;
//   - the error from GetChannel, unchanged, if no channel can be obtained
//     (nothing is published in that case);
//   - otherwise an error reporting how many messages failed, wrapping the
//     first failure so callers can inspect it with errors.Is / errors.As.
func (p *Publisher) PublishBatch(ctx context.Context, messages []interface{}) error {
	if len(messages) == 0 {
		return nil
	}

	ch, err := GetChannel()
	if err != nil {
		return err
	}

	// Track failures so they are reported to the caller and not only logged.
	var (
		failed   int
		firstErr error
	)

	for i, message := range messages {
		body, err := json.Marshal(message)
		if err != nil {
			g.Log().Errorf(ctx, "消息 %d 序列化失败: %v", i, err)
			failed++
			if firstErr == nil {
				firstErr = fmt.Errorf("消息 %d 序列化失败: %w", i, err)
			}
			continue
		}

		err = ch.PublishWithContext(
			ctx,
			p.exchange,
			p.routingKey,
			false, // mandatory
			false, // immediate
			amqp.Publishing{
				DeliveryMode: amqp.Persistent,
				ContentType:  "application/json",
				Body:         body,
			},
		)
		if err != nil {
			g.Log().Errorf(ctx, "消息 %d 发布失败: %v", i, err)
			failed++
			if firstErr == nil {
				firstErr = fmt.Errorf("消息 %d 发布失败: %w", i, err)
			}
		}
	}

	g.Log().Infof(ctx, "批量发布完成: 共 %d 条消息", len(messages))

	if failed > 0 {
		return fmt.Errorf("批量发布部分失败: %d/%d 条消息未发布: %w",
			failed, len(messages), firstErr)
	}
	return nil
}