Files
common/message/nats_msg.go

306 lines
8.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package message
import (
"context"
"encoding/json"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"time"
)
type NatsPublishMsgConfig struct {
QueueName string
Durable bool
DelayTime int
Data any
}
type NatsSubscribeMsgConfig struct {
QueueName string
Durable bool
DelayTime int
ConsumerName string
AutoAck bool
PrefetchCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (*NatsPublishMsgConfig) GetPublishMsgType() {
}
func (*NatsSubscribeMsgConfig) GetSubscribeMsgType() {
}
func init() {
// 注册 Nats 插件,必须使用 RegisterPlugin 确保连接检测
registerPlugin(MessageNATS, func() messageUtil {
return &natsMsg{}
})
}
type natsMsg struct{}
// Ping 检测 NATS 连接状态
func (c *natsMsg) ping(_ context.Context) bool {
return natsPing()
}
// Reconnect 重连 NATS
func (c *natsMsg) reconnect(ctx context.Context) error {
return natsReconnect(ctx)
}
// Close 关闭 NATS 连接
func (c *natsMsg) close(ctx context.Context) error {
return natsClose(ctx)
}
// Publish 发布消息
func (c *natsMsg) Publish(ctx context.Context, msgConfig messagePublishConfig) error {
cfg, ok := msgConfig.(*NatsPublishMsgConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("必须提供队列名称")
}
if g.IsEmpty(cfg.Data) {
return fmt.Errorf("必须提供数据")
}
return c.createPublish(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
}
// Publish 发布消息
func (c *natsMsg) createPublish(ctx context.Context, subject string, durable bool, delayTime int, data any) error {
delayMsg := delayTime > 0
if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil {
return err
}
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
}
msg := &nats.Msg{
Subject: subject,
Data: payload,
}
if delayMsg {
// 计算目标投递时间
targetTime := time.Now().Add(time.Duration(delayTime) * time.Second)
delayNs := time.Until(targetTime).Nanoseconds()
if delayNs < 0 {
delayNs = 0
}
g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%d秒, TargetTime=%v, DelayNs=%d纳秒(%.2f秒)",
delayTime, targetTime.Format("2006-01-02 15:04:05"), delayNs, float64(delayNs)/float64(time.Second.Nanoseconds()))
// NATS JetStream 延迟消息使用 Nats-Msg-Delay Header纳秒数
msg.Header = nats.Header{
"Nats-Msg-Delay": []string{fmt.Sprintf("%d", delayNs)},
}
g.Log().Infof(ctx, "📅 NATS 延迟消息 Header: %v", msg.Header)
// 获取 Stream 配置验证
streamName, _ := getStreamInfo(durable, delayMsg)
stream, err := js.Stream(ctx, streamName)
if err == nil {
info, _ := stream.Info(ctx)
g.Log().Infof(ctx, "📅 Stream 配置: AllowMsgSchedules=%v, Storage=%v",
info.Config.AllowMsgSchedules, info.Config.Storage)
if !info.Config.AllowMsgSchedules {
g.Log().Errorf(ctx, "❌ Stream 不支持延迟消息AllowMsgSchedules=false")
}
}
}
// 发布消息到 JetStream
ack, err := js.PublishMsg(ctx, msg)
if err != nil {
g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v", err)
return err
}
g.Log().Infof(ctx, "✅ NATS 发布消息成功: StreamSeq=%d, Domain=%s", ack.Sequence, ack.Domain)
return nil
}
// createStreamGroup 内部创建消费组
func (c *natsMsg) createStreamGroupInternal(ctx context.Context, subject string, durable, delayMsg bool) error {
streamName, storage := getStreamInfo(durable, delayMsg)
// 先检查 Stream 是否存在
stream, err := js.Stream(ctx, streamName)
if err == nil {
// Stream 已存在,检查配置是否匹配
info, _ := stream.Info(ctx)
if info.Config.AllowMsgSchedules != delayMsg || info.Config.Storage != storage {
g.Log().Infof(ctx, "🔄 Stream 配置不匹配,正在重新创建: stream=%s, 当前AllowMsgSchedules=%v, 需要%v",
streamName, info.Config.AllowMsgSchedules, delayMsg)
// 删除旧 Stream
if err := js.DeleteStream(ctx, streamName); err != nil {
g.Log().Warningf(ctx, "删除旧 Stream 失败: %v", err)
}
} else {
g.Log().Infof(ctx, "✅ Stream 已存在且配置正确: stream=%s", streamName)
return nil
}
}
// 构建流配置
jsConfig := jetstream.StreamConfig{
Name: streamName,
Subjects: []string{subject},
AllowMsgSchedules: delayMsg, // 延迟消息核心开关
Storage: storage,
Discard: jetstream.DiscardOld, // 达到上限删除旧消息
}
stream, err = js.CreateStream(ctx, jsConfig)
if err != nil {
return fmt.Errorf("创建任务流失败: %w", err)
}
// 获取 Stream 信息验证配置
info, err := stream.Info(ctx)
if err == nil {
g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, AllowMsgSchedules=%v, Storage=%v",
streamName, info.Config.AllowMsgSchedules, info.Config.Storage)
}
g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s", streamName)
return nil
}
// Subscribe 订阅消息
func (c *natsMsg) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
cfg, ok := msgConfig.(*NatsSubscribeMsgConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("必须提供队列名称")
}
if g.IsEmpty(cfg.ConsumerName) {
return fmt.Errorf("必须提供消费者名称")
}
if g.IsEmpty(cfg.HandleFunc) {
return fmt.Errorf("必须提供处理函数")
}
if g.IsEmpty(cfg.PrefetchCount) {
cfg.PrefetchCount = 1
}
return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.Durable, cfg.DelayTime, cfg.HandleFunc)
}
// createSubscribe 内部订阅消息
func (c *natsMsg) createSubscribeInternal(ctx context.Context, subject, consumerName string, prefetchCount int, autoAck, durable bool, delayTime int, handler func(ctx context.Context, message map[string]interface{}) error) error {
g.Log().Infof(ctx, "🔔 NATS 开始订阅: QueueName=%s, ConsumerName=%s", subject, consumerName)
delayMsg := delayTime > 0
streamName, _ := getStreamInfo(durable, delayMsg)
// 确保 Stream 存在,如果不存在则创建
if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil {
g.Log().Errorf(ctx, "创建 Stream 失败: %v", err)
return fmt.Errorf("创建 Stream 失败: %w", err)
}
// Stream 不存在,创建新的
ackPolicy := jetstream.AckExplicitPolicy
if autoAck {
ackPolicy = jetstream.AckNonePolicy
}
jsConfig := jetstream.ConsumerConfig{
Name: consumerName,
Durable: consumerName,
FilterSubject: subject,
AckPolicy: ackPolicy,
MaxDeliver: 3,
MaxAckPending: prefetchCount,
}
// 创建新消费者
consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig)
if err != nil {
g.Log().Errorf(ctx, "创建消费者失败: %v", err)
return err
}
// 获取消费者信息验证
if cInfo, err := consumer.Info(ctx); err == nil {
g.Log().Infof(ctx, "🔔 消费者创建成功: %s, AckPolicy=%v, MaxAckPending=%d",
cInfo.Name, cInfo.Config.AckPolicy, cInfo.Config.MaxAckPending)
}
// 创建消息处理函数
msgHandler := func(msg jetstream.Msg) {
// 记录消息接收时间
now := time.Now()
meta, err := msg.Metadata()
if err == nil {
g.Log().Infof(ctx, "📨 收到消息: StreamSeq=%d, Published=%v, Received=%v, 距离发布=%.2f秒",
meta.Sequence.Stream,
meta.Timestamp.Format("2006-01-02 15:04:05"),
now.Format("2006-01-02 15:04:05"),
now.Sub(meta.Timestamp).Seconds())
}
// 解析消息
var data map[string]any
if err := json.Unmarshal(msg.Data(), &data); err != nil {
g.Log().Errorf(ctx, "解析消息失败: %v", err)
if err := msg.Nak(); err != nil {
g.Log().Errorf(ctx, "Nak 失败: %v", err)
}
return
}
// 处理业务逻辑
if err := handler(ctx, data); err != nil {
g.Log().Errorf(ctx, "处理消息失败: %v", err)
if err := msg.Nak(); err != nil {
g.Log().Errorf(ctx, "Nak 失败: %v", err)
}
return
}
g.Log().Infof(ctx, "处理消息成功")
if !autoAck {
if err := msg.Ack(); err != nil {
g.Log().Errorf(ctx, "Ack 失败: %v", err)
}
}
}
// 开始消费
_, err = consumer.Consume(msgHandler)
if err != nil {
return fmt.Errorf("开始消费失败: %w", err)
}
g.Log().Infof(ctx, "✅ NATS 订阅成功")
return nil
}
func getStreamInfo(durable, delayMsg bool) (string, jetstream.StorageType) {
// Stream 不存在,创建新的
streamName := "ordinary_msg_memory"
storage := jetstream.MemoryStorage
// 延迟消息必须使用 FileStorageNATS 官方要求)
if delayMsg {
streamName = "delay_msg_file"
storage = jetstream.FileStorage
} else {
if durable {
streamName = "ordinary_msg_file"
storage = jetstream.FileStorage
}
}
return streamName, storage
}