Files
common/rabbitmq/delay.go
2026-03-12 08:51:17 +08:00

96 lines
2.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package rabbitmq - RabbitMQ延时消息发布
package rabbitmq
import (
"context"
"time"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
amqp "github.com/rabbitmq/amqp091-go"
)
// PublishWithDelay 发布延时消息到RabbitMQ
// delaySeconds: 延时秒数
func PublishWithDelay(ctx context.Context, routingKey string, message interface{}, delaySeconds int) error {
ch, err := GetChannel()
if err != nil {
return gerror.Wrap(err, "获取RabbitMQ通道失败")
}
if ch == nil {
return gerror.New("RabbitMQ通道未初始化")
}
// 序列化消息
body, err := gjson.Encode(message)
if err != nil {
return gerror.Wrapf(err, "序列化消息失败")
}
// 声明延时交换机x-delayed-message类型
// 注意需要RabbitMQ安装延时插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exchangeName := "delayed.exchange"
err = ch.ExchangeDeclare(
exchangeName,
"x-delayed-message", // 延时交换机类型
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
amqp.Table{
"x-delayed-type": "direct", // 底层交换机类型
},
)
if err != nil {
return gerror.Wrapf(err, "声明延时交换机失败")
}
// 声明队列
queue, err := ch.QueueDeclare(
routingKey, // 队列名使用routingKey
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil,
)
if err != nil {
return gerror.Wrapf(err, "声明队列失败")
}
// 绑定队列到交换机
err = ch.QueueBind(
queue.Name, // queue name
routingKey, // routing key
exchangeName, // exchange
false,
nil,
)
if err != nil {
return gerror.Wrapf(err, "绑定队列失败")
}
// 发布延时消息
err = ch.PublishWithContext(
ctx,
exchangeName, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent, // 持久化消息
Headers: amqp.Table{
"x-delay": delaySeconds * 1000, // 延时时间(毫秒)
},
Timestamp: time.Now(),
},
)
if err != nil {
return gerror.Wrapf(err, "发布延时消息失败")
}
return nil
}