295 lines
9.5 KiB
Go
295 lines
9.5 KiB
Go
package nats
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/nats-io/nats.go/jetstream"
|
||
)
|
||
|
||
// AckPolicy 确认策略
|
||
type AckPolicy string
|
||
|
||
const (
|
||
AckPolicyExplicit AckPolicy = "explicit" // 显式确认(默认)
|
||
AckPolicyAll AckPolicy = "all" // 确认所有消息
|
||
AckPolicyNone AckPolicy = "none" // 不需要确认
|
||
)
|
||
|
||
// DeliverPolicy 投递策略
|
||
type DeliverPolicy string
|
||
|
||
const (
|
||
DeliverPolicyAll DeliverPolicy = "all" // 投递所有消息(包括已投递的)
|
||
DeliverPolicyLast DeliverPolicy = "last" // 从最后一条消息开始
|
||
DeliverPolicyNew DeliverPolicy = "new" // 仅投递新消息(默认)
|
||
DeliverPolicyLastPerSubj DeliverPolicy = "lastpersubj" // 每个主题的最后一条
|
||
DeliverPolicyByStartSeq DeliverPolicy = "by_start_sequence" // 按起始序列号
|
||
)
|
||
|
||
// ReplayPolicy 重放策略
|
||
type ReplayPolicy string
|
||
|
||
const (
|
||
ReplayPolicyInstant ReplayPolicy = "instant" // 立即重放
|
||
ReplayPolicyOriginal ReplayPolicy = "original" // 按原始顺序重放
|
||
)
|
||
|
||
// ConsumerConfig 消费者配置
|
||
type ConsumerConfig struct {
|
||
DurableName string // 持久化名称(空表示临时消费者)
|
||
Description string // 描述信息
|
||
AckPolicy AckPolicy // 确认策略
|
||
AckWait int // 确认等待时间(秒)
|
||
MaxDeliver int // 最大投递次数
|
||
FilterSubject string // 过滤主题(流内多主题时使用)
|
||
DeliverPolicy DeliverPolicy // 投递策略
|
||
ReplayPolicy ReplayPolicy // 重放策略
|
||
MaxWaiting int // 最大等待消息数
|
||
MaxAckPending int // 最大待确认消息数
|
||
OptStartTime int64 // 起始时间戳
|
||
OptStartSeq uint64 // 起始序列号
|
||
HeadersOnly bool // 仅消费消息头
|
||
Backoff []int // 退避策略(秒数数组)
|
||
RateLimit uint64 // 消息速率限制(消息/秒)
|
||
Replica int // 副本数
|
||
FlowControl bool // 启用流控
|
||
Metadata map[string]string // 元数据
|
||
}
|
||
|
||
// parseAckPolicy 解析确认策略
|
||
func parseAckPolicy(policy AckPolicy) jetstream.AckPolicy {
|
||
switch policy {
|
||
case AckPolicyAll:
|
||
return jetstream.AckAllPolicy
|
||
case AckPolicyNone:
|
||
return jetstream.AckNonePolicy
|
||
default:
|
||
return jetstream.AckExplicitPolicy
|
||
}
|
||
}
|
||
|
||
// parseDeliverPolicy 解析投递策略
|
||
func parseDeliverPolicy(policy DeliverPolicy) jetstream.DeliverPolicy {
|
||
switch policy {
|
||
case DeliverPolicyAll:
|
||
return jetstream.DeliverAllPolicy
|
||
case DeliverPolicyLast:
|
||
return jetstream.DeliverLastPolicy
|
||
case DeliverPolicyLastPerSubj:
|
||
return jetstream.DeliverLastPerSubjectPolicy
|
||
case DeliverPolicyByStartSeq:
|
||
return jetstream.DeliverByStartSequencePolicy
|
||
default:
|
||
return jetstream.DeliverNewPolicy
|
||
}
|
||
}
|
||
|
||
// parseReplayPolicy 解析重放策略
|
||
func parseReplayPolicy(policy ReplayPolicy) jetstream.ReplayPolicy {
|
||
switch policy {
|
||
case ReplayPolicyOriginal:
|
||
return jetstream.ReplayOriginalPolicy
|
||
default:
|
||
return jetstream.ReplayInstantPolicy
|
||
}
|
||
}
|
||
|
||
// CreateTaskConsumer 创建任务消费者
|
||
// 核心设计思路:
|
||
// 1. 显式确认:确保消息被正确处理后才确认
|
||
// 2. 重试机制:通过 MaxDeliver 控制最大重试次数
|
||
// 3. 持久化:DurableName 确保消费者状态持久化
|
||
// 4. 流控:防止消费者过载
|
||
func CreateTaskConsumer(ctx context.Context, streamName string, config ConsumerConfig) (jetstream.Consumer, error) {
|
||
if !IsConnected() {
|
||
return nil, fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
if streamName == "" {
|
||
return nil, fmt.Errorf("流名称不能为空")
|
||
}
|
||
|
||
// 设置默认值
|
||
if config.AckPolicy == "" {
|
||
config.AckPolicy = AckPolicyExplicit // 默认显式确认
|
||
}
|
||
if config.AckWait == 0 {
|
||
config.AckWait = 30 // 默认30秒确认超时
|
||
}
|
||
if config.MaxDeliver == 0 {
|
||
config.MaxDeliver = 3 // 默认最多投递3次
|
||
}
|
||
if config.DeliverPolicy == "" {
|
||
config.DeliverPolicy = DeliverPolicyNew // 默认仅消费新消息
|
||
}
|
||
if config.ReplayPolicy == "" {
|
||
config.ReplayPolicy = ReplayPolicyInstant // 默认立即重放
|
||
}
|
||
if config.MaxAckPending == 0 {
|
||
config.MaxAckPending = 1000 // 默认最多1000条待确认消息
|
||
}
|
||
|
||
// 构建消费者配置
|
||
jsConfig := jetstream.ConsumerConfig{
|
||
Name: config.DurableName,
|
||
Description: config.Description,
|
||
AckPolicy: parseAckPolicy(config.AckPolicy),
|
||
AckWait: 0,
|
||
MaxDeliver: config.MaxDeliver,
|
||
FilterSubjects: []string{config.FilterSubject},
|
||
DeliverPolicy: parseDeliverPolicy(config.DeliverPolicy),
|
||
ReplayPolicy: parseReplayPolicy(config.ReplayPolicy),
|
||
MaxWaiting: config.MaxWaiting,
|
||
MaxAckPending: config.MaxAckPending,
|
||
HeadersOnly: config.HeadersOnly,
|
||
RateLimit: config.RateLimit,
|
||
Replicas: config.Replica,
|
||
Metadata: config.Metadata,
|
||
}
|
||
|
||
// 配置流控和心跳
|
||
if config.FlowControl {
|
||
jsConfig.FlowControl = true
|
||
}
|
||
// 配置起始位置
|
||
if config.OptStartSeq > 0 {
|
||
jsConfig.OptStartSeq = config.OptStartSeq
|
||
}
|
||
|
||
// 创建新消费者
|
||
consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("创建消费者失败: %w", err)
|
||
}
|
||
|
||
// 记录配置信息
|
||
configInfo := fmt.Sprintf("确认策略=%s, 最大投递=%d, 投递策略=%s", config.AckPolicy, config.MaxDeliver, config.DeliverPolicy)
|
||
if config.FilterSubject != "" {
|
||
configInfo += fmt.Sprintf(", 过滤主题=%s", config.FilterSubject)
|
||
}
|
||
g.Log().Infof(ctx, "✅ 任务消费者创建成功: %s/%s (%s)", streamName, config.DurableName, configInfo)
|
||
|
||
return consumer, nil
|
||
}
|
||
|
||
// CreateConsumerSimple 简化版创建消费者(适用于大多数场景)
|
||
// 只需提供流名称和消费者名称,其他使用默认配置
|
||
func CreateConsumerSimple(ctx context.Context, streamName, durableName string) (err error) {
|
||
_, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{
|
||
DurableName: durableName,
|
||
})
|
||
return
|
||
}
|
||
|
||
// CreateConsumerWithFilter 创建带主题过滤的消费者
|
||
//func CreateConsumerWithFilter(ctx context.Context, streamName, durableName, filterSubject string) (jetstream.Consumer, error) {
|
||
// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{
|
||
// DurableName: durableName,
|
||
// FilterSubject: filterSubject,
|
||
// })
|
||
//}
|
||
|
||
// CreateConsumerEphemeral 创建临时消费者
|
||
// 临时消费者没有持久化名称,连接断开后自动删除
|
||
//func CreateConsumerEphemeral(ctx context.Context, streamName string) (jetstream.Consumer, error) {
|
||
// if !IsConnected() {
|
||
// return nil, fmt.Errorf("NATS 未连接")
|
||
// }
|
||
//
|
||
// jsConfig := jetstream.ConsumerConfig{
|
||
// AckPolicy: jetstream.AckNonePolicy,
|
||
// AckWait: 0,
|
||
// MaxDeliver: 3,
|
||
// DeliverPolicy: jetstream.DeliverNewPolicy,
|
||
// ReplayPolicy: jetstream.ReplayInstantPolicy,
|
||
// MaxAckPending: 1000,
|
||
// }
|
||
//
|
||
// consumer, err := js.CreateConsumer(ctx, streamName, jsConfig)
|
||
// if err != nil {
|
||
// return nil, fmt.Errorf("创建临时消费者失败: %w", err)
|
||
// }
|
||
//
|
||
// g.Log().Infof(ctx, "✅ 临时消费者创建成功: %s", streamName)
|
||
// return consumer, nil
|
||
//}
|
||
|
||
// CreateConsumerPushMode 创建推送模式消费者
|
||
// 推送模式下,NATS 服务器主动将消息推送给消费者
|
||
func CreateConsumerPushMode(ctx context.Context, streamName, durableName, subject string, msgCount int) (err error) {
|
||
_, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{
|
||
DurableName: durableName,
|
||
FilterSubject: subject,
|
||
MaxAckPending: msgCount,
|
||
})
|
||
return
|
||
}
|
||
|
||
// CreateConsumerPullMode 创建拉取模式消费者
|
||
// 拉取模式下,消费者主动从服务器拉取消息
|
||
//func CreateConsumerPullMode(ctx context.Context, streamName, durableName string) (jetstream.Consumer, error) {
|
||
// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{
|
||
// DurableName: durableName,
|
||
// DeliverPolicy: DeliverPolicyAll,
|
||
// MaxAckPending: 500, // 拉取模式下待确认消息数可以设置小一些
|
||
// })
|
||
//}
|
||
|
||
// ConsumeMessages 消费消息(推送模式)
|
||
func ConsumeMessages(ctx context.Context, streamName, consumerName string, handler jetstream.MessageHandler) error {
|
||
if !IsConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
// 获取消费者
|
||
consumer, err := js.Consumer(ctx, streamName, consumerName)
|
||
if err != nil {
|
||
return fmt.Errorf("获取消费者失败: %w", err)
|
||
}
|
||
|
||
// 业务处理
|
||
//if err := handler(ctx, streamMsg.Values); err != nil {
|
||
// glog.Infof(ctx, "业务处理失败-> err:%v\n", err)
|
||
// continue
|
||
//}
|
||
//// 确认消息
|
||
//if msg.AutoAck {
|
||
// err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID)
|
||
// if err != nil {
|
||
// glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err)
|
||
// }
|
||
//}
|
||
//// 创建消息处理函数
|
||
//handler = func(msg jetstream.Msg) {
|
||
// // 解析消息
|
||
// var task TaskMessage
|
||
// if err := json.Unmarshal(msg.Data(), &task); err != nil {
|
||
// g.Log().Errorf(ctx, "解析消息失败: %v", err)
|
||
// msg.Nak() // 拒绝消息,触发重试
|
||
// return
|
||
// }
|
||
//
|
||
// // 处理业务逻辑
|
||
// g.Log().Infof(ctx, "处理任务: %s", task.TaskID)
|
||
//
|
||
// // 处理成功,确认消息
|
||
// msg.Ack()
|
||
//}
|
||
|
||
// 开始消费
|
||
_, err = consumer.Consume(handler)
|
||
if err != nil {
|
||
return fmt.Errorf("开始消费失败: %w", err)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", streamName, consumerName)
|
||
return nil
|
||
}
|
||
|
||
// 定义消息结构
|
||
type TaskMessage struct {
|
||
TaskID string `json:"task_id"`
|
||
TaskType string `json:"task_type"`
|
||
Data string `json:"data"`
|
||
}
|