Files
common/nats/task.go

412 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package nats
//import (
// "context"
// "fmt"
// "time"
//
// "github.com/gogf/gf/v2/frame/g"
// "github.com/nats-io/nats.go/jetstream"
//)
//// TaskPriority 任务优先级
//type TaskPriority string
//
//const (
// TaskPriorityHigh TaskPriority = "high" // 高优先级任务
// TaskPriorityNormal TaskPriority = "normal" // 普通优先级任务
// TaskPriorityLow TaskPriority = "low" // 低优先级任务
//)
//
//// TaskStreamConfig 任务流配置
//type TaskStreamConfig struct {
// StreamName string // 流名称
// Subjects []string // 主题列表(支持优先级分级,如 tasks.high.>, tasks.normal.>, tasks.low.>
// Subject string // 默认发布主题
// Priority TaskPriority // 任务优先级
// MaxAge time.Duration // 消息保留时长(根据任务重要性设置)
// MaxMsgsPerSub int64 // 每个订阅者最大消息数(防止内存溢出)
// Replicas int // 副本数默认1建议生产环境使用3
// Duplicates time.Duration // 消息去重窗口0表示不启用
//}
//
//// TaskConsumerConfig 任务消费者配置
//type TaskConsumerConfig struct {
// ConsumerName string // 消费者名称
// AckPolicy *jetstream.AckPolicy
// MaxDeliveries int32 // 最大投递次数(用于重试控制)
// AckWait time.Duration // 等待ACK超时时间
// Backoff []time.Duration // 重试退避策略
// FilterSubject string // 过滤主题(可指定特定优先级任务)
// MaxAckPending int // 最大待确认消息数
// MaxWaiting int // 最大等待消息数
// ReplayPolicy *jetstream.ReplayPolicy // 重放策略
//}
// CreateTaskStream 创建任务流(基于 JetStream 2.10+ API
//
// 核心设计思路:
// 1. 严格的持久化使用文件存储FileStorage避免任务丢失
// 2. 任务优先级通过主题分级实现tasks.high/tasks.normal/tasks.low
// 3. 死信队列:配置死信队列处理失败任务
// 4. 保留策略:按任务重要性设置不同的保留时长
// 5. 工作队列策略:确保每条消息只被一个消费者处理
//
// 参数:
// - ctx: 上下文
// - config: 任务流配置
//
// 返回:
// - error: 错误信息
//func CreateTaskStream(ctx context.Context, config TaskStreamConfig) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// // 设置默认值
// if config.MaxAge == 0 {
// config.MaxAge = 7 * 24 * time.Hour // 默认保留7天
// }
// if config.MaxMsgsPerSub == 0 {
// config.MaxMsgsPerSub = 100000 // 默认每订阅者最多10万条消息
// }
// if config.Replicas == 0 {
// config.Replicas = 1 // 默认单副本
// }
// if config.Duplicates == 0 {
// config.Duplicates = 2 * time.Minute // 默认2分钟去重窗口
// }
//
// // 验证主题配置
// if len(config.Subjects) == 0 {
// return fmt.Errorf("任务流必须指定至少一个主题")
// }
//
// // 设置死信队列
// // 使用固定的死信队列命名规范:{StreamName}.DLQ
// dlqSubject := fmt.Sprintf("%s.DLQ", config.StreamName)
//
// // 尝试获取现有流
// stream, err := js.Stream(ctx, config.StreamName)
// if err == nil {
// // 流已存在,更新配置以适配任务流的特殊需求
// _, err = js.UpdateStream(ctx, jetstream.StreamConfig{
// Name: config.StreamName,
// Subjects: config.Subjects,
// Storage: jetstream.FileStorage, // 文件存储确保持久化
// Retention: jetstream.WorkQueuePolicy, // 工作队列策略
// MaxAge: config.MaxAge,
// MaxMsgs: config.MaxMsgsPerSub,
// Replicas: config.Replicas,
// Duplicates: config.Duplicates,
// // 死信队列配置
// RePublish: &jetstream.RePublish{
// Source: ">", // 匹配所有主题
// Destination: dlqSubject,
// },
// // 限制流大小(防止磁盘占用过多)
// MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB
// })
// if err != nil {
// return fmt.Errorf("更新任务流失败: %w", err)
// }
// g.Log().Infof(ctx, "✅ 任务流已更新: %s (优先级: %s, 保留: %v)",
// stream.CachedInfo().Config.Name, config.Priority, config.MaxAge)
// return nil
// }
//
// // 创建新任务流
// streamConfig := jetstream.StreamConfig{
// Name: config.StreamName,
// Subjects: config.Subjects,
// Storage: jetstream.FileStorage, // 文件存储确保持久化
// Retention: jetstream.WorkQueuePolicy, // 工作队列策略
// MaxAge: config.MaxAge,
// MaxMsgs: config.MaxMsgsPerSub,
// Replicas: config.Replicas,
// Duplicates: config.Duplicates,
// // 死信队列配置
// RePublish: &jetstream.RePublish{
// Source: ">", // 匹配所有主题
// Destination: dlqSubject,
// },
// // 限制流大小(防止磁盘占用过多)
// MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB
// // 启用流清理
// Discard: jetstream.DiscardOld, // 新消息替换旧消息
// }
//
// stream, err = js.CreateStream(ctx, streamConfig)
// if err != nil {
// return fmt.Errorf("创建任务流失败: %w", err)
// }
//
// // 验证流是否创建成功
// if stream == nil {
// return fmt.Errorf("创建任务流失败:流对象为空")
// }
//
// g.Log().Infof(ctx, "✅ 任务流创建成功: %s (文件存储+工作队列策略+死信队列, 优先级: %s, 保留: %v, 副本: %d)",
// stream.CachedInfo().Config.Name, config.Priority, config.MaxAge, config.Replicas)
//
// // 记录配置信息
// g.Log().Infof(ctx, " - 主题列表: %v", config.Subjects)
// g.Log().Infof(ctx, " - 死信队列: %s", dlqSubject)
// g.Log().Infof(ctx, " - 最大消息数: %d", config.MaxMsgsPerSub)
// g.Log().Infof(ctx, " - 去重窗口: %v", config.Duplicates)
//
// return nil
//}
//
//// CreateOrUpdateTaskConsumer 创建或更新任务消费者(基于 JetStream 2.10+ API
////
//// 核心设计思路:
//// 1. 支持手动确认AckExplicit确保任务处理完成
//// 2. 通过 Nack() 方法实现消息重试,超限后进入死信队列
//// 3. 支持主题过滤,可订阅特定优先级任务
//// 4. 限制待确认消息数,防止消费者过载
//// 5. AckWait 设置消息处理超时时间
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
//// - consumerConfig: 消费者配置
////
//// 返回:
//// - jetstream.Consumer: 消费者对象
//// - error: 错误信息
//func CreateOrUpdateTaskConsumer(ctx context.Context, streamName string, consumerConfig TaskConsumerConfig) (jetstream.Consumer, error) {
// if !IsConnected() {
// return nil, fmt.Errorf("NATS 未连接")
// }
//
// // 设置默认值
// ackPolicy := jetstream.AckExplicitPolicy
// if consumerConfig.AckPolicy != nil {
// ackPolicy = *consumerConfig.AckPolicy
// }
//
// if consumerConfig.MaxDeliveries == 0 {
// consumerConfig.MaxDeliveries = 10 // 默认最多投递10次
// }
//
// if consumerConfig.AckWait == 0 {
// consumerConfig.AckWait = 30 * time.Second // 默认30秒等待确认
// }
//
// if consumerConfig.MaxAckPending == 0 {
// consumerConfig.MaxAckPending = 1000 // 默认最多1000条待确认消息
// }
//
// if consumerConfig.MaxWaiting == 0 {
// consumerConfig.MaxWaiting = 512 // 默认最多512条等待消息
// }
//
// replayPolicy := jetstream.ReplayInstantPolicy
// if consumerConfig.ReplayPolicy != nil {
// replayPolicy = *consumerConfig.ReplayPolicy
// }
//
// // 构建消费者配置
// config := jetstream.ConsumerConfig{
// Name: consumerConfig.ConsumerName,
// Durable: consumerConfig.ConsumerName, // 持久化消费者
// AckPolicy: ackPolicy,
// AckWait: consumerConfig.AckWait,
// MaxAckPending: consumerConfig.MaxAckPending,
// MaxWaiting: consumerConfig.MaxWaiting,
// ReplayPolicy: replayPolicy,
// FilterSubject: consumerConfig.FilterSubject,
// }
//
// // 使用 CreateOrUpdateConsumer 创建或更新消费者
// consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, config)
// if err != nil {
// return nil, fmt.Errorf("创建任务消费者失败: %w", err)
// }
//
// g.Log().Infof(ctx, "✅ 任务消费者已创建/更新: %s/%s (等待确认: %v)",
// streamName, consumerConfig.ConsumerName, consumerConfig.AckWait)
//
// // 获取消费者信息并记录
// info, err := consumer.Info(ctx)
// if err == nil {
// g.Log().Infof(ctx, " - 过滤主题: %s", info.Config.FilterSubject)
// g.Log().Infof(ctx, " - 最大待确认: %d", info.Config.MaxAckPending)
// g.Log().Infof(ctx, " - ACK策略: %s", info.Config.AckPolicy)
// }
//
// return consumer, nil
//}
//
//// CreateTaskStreamWithPriority 创建带优先级的任务流
////
//// 便捷方法,自动创建支持多优先级的任务流配置
////
//// 参数:
//// - ctx: 上下文
//// - streamPrefix: 流名称前缀(如 "tasks"
//// - priority: 默认优先级
////
//// 返回:
//// - error: 错误信息
//func CreateTaskStreamWithPriority(ctx context.Context, streamPrefix string, priority TaskPriority) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// // 构建支持多优先级的主题列表
// subjects := []string{
// fmt.Sprintf("%s.high.>", streamPrefix), // 高优先级任务
// fmt.Sprintf("%s.normal.>", streamPrefix), // 普通优先级任务
// fmt.Sprintf("%s.low.>", streamPrefix), // 低优先级任务
// }
//
// // 根据优先级设置不同的保留时长
// var maxAge time.Duration
// switch priority {
// case TaskPriorityHigh:
// maxAge = 30 * 24 * time.Hour // 高优先级保留30天
// case TaskPriorityNormal:
// maxAge = 7 * 24 * time.Hour // 普通优先级保留7天
// case TaskPriorityLow:
// maxAge = 24 * time.Hour // 低优先级保留1天
// default:
// maxAge = 7 * 24 * time.Hour
// }
//
// config := TaskStreamConfig{
// StreamName: streamPrefix,
// Subjects: subjects,
// Subject: fmt.Sprintf("%s.%s.>", streamPrefix, priority),
// Priority: priority,
// MaxAge: maxAge,
// MaxMsgsPerSub: 100000,
// Replicas: 1,
// Duplicates: 2 * time.Minute,
// }
//
// return CreateTaskStream(ctx, config)
//}
//
//// PublishTask 发布任务到指定流
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
//// - task: 任务数据会被JSON序列化
////
//// 返回:
//// - error: 错误信息
//func PublishTask(ctx context.Context, streamName string, task interface{}) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// // 使用 JsPublish 发布消息
// if err := JsPublish(ctx, streamName, task); err != nil {
// return fmt.Errorf("发布任务失败: %w", err)
// }
//
// return nil
//}
//
//// PublishTaskWithPriority 发布带优先级的任务
////
//// 参数:
//// - ctx: 上下文
//// - streamPrefix: 流名称前缀
//// - priority: 任务优先级
//// - taskType: 任务类型
//// - task: 任务数据会被JSON序列化
////
//// 返回:
//// - error: 错误信息
//func PublishTaskWithPriority(ctx context.Context, streamPrefix string, priority TaskPriority, taskType string, task interface{}) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// // 构建主题:{streamPrefix}.{priority}.{taskType}
// subject := fmt.Sprintf("%s.%s.%s", streamPrefix, priority, taskType)
//
// // 使用 JsPublish 发布消息
// if err := JsPublish(ctx, subject, task); err != nil {
// return fmt.Errorf("发布任务失败: %w", err)
// }
//
// g.Log().Debugf(ctx, "任务已发布: %s (优先级: %s, 类型: %s)", subject, priority, taskType)
//
// return nil
//}
//
//// GetTaskStreamInfo 获取任务流信息
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
////
//// 返回:
//// - *jetstream.StreamInfo: 流信息
//// - error: 错误信息
//func GetTaskStreamInfo(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) {
// if !IsConnected() {
// return nil, fmt.Errorf("NATS 未连接")
// }
//
// return GetStream(ctx, streamName)
//}
//
//// GetTaskConsumerInfo 获取任务消费者信息
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
//// - consumerName: 消费者名称
////
//// 返回:
//// - *jetstream.ConsumerInfo: 消费者信息
//// - error: 错误信息
//func GetTaskConsumerInfo(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) {
// if !IsConnected() {
// return nil, fmt.Errorf("NATS 未连接")
// }
//
// return GetConsumer(ctx, streamName, consumerName)
//}
//
//// DeleteTaskStream 删除任务流
////
//// 注意:此操作会删除流及其所有消息,请谨慎使用
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
////
//// 返回:
//// - error: 错误信息
//func DeleteTaskStream(ctx context.Context, streamName string) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// return DeleteStream(ctx, streamName)
//}
//
//// DeleteTaskConsumer 删除任务消费者
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
//// - consumerName: 消费者名称
////
//// 返回:
//// - error: 错误信息
//func DeleteTaskConsumer(ctx context.Context, streamName, consumerName string) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// return DeleteConsumer(ctx, streamName, consumerName)
//}