Files
common/nats/nats_task.go

213 lines
7.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package nats
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go/jetstream"
"time"
)
// TaskStreamConfig 任务流配置
type TaskStreamConfig struct {
StreamName string // 流名称
Subjects []string // 主题数组(支持任务优先级,如 ["tasks.high","tasks.normal", "tasks.low"]
PublishSubject string // 发布使用的主题(仅用于记录,不影响流配置)
Storage StorageType // 存储类型
Retention RetentionType // 保留策略
MaxAge time.Duration // 最大保留时间
Duplicates time.Duration // 消息去重窗口时间
Replicas int // 副本数
MaxMsgSize int32 // 单条消息最大大小(字节)
MaxBytes int64 // 流最大存储大小(字节)
MaxMsgs int64 // 流中最大消息数
MaxMsgsPerSubject int64 // 每个主题最大消息数
MaxConsumers int // 最大消费者数量
DenyPurge bool // 是否禁止清理流
AllowRollup bool // 是否允许汇总消息
DenyDelete bool // 是否禁止删除
DiscardPerSubject bool // 是否按主题限制(工作队列模式)
Republish *RePublishConfig // 死信队列重新发布配置
}
// RePublishConfig 重新发布配置(用于死信队列)
type RePublishConfig struct {
Source string // 源主题
Destination string // 目标主题
HeadersOnly bool // 仅复制消息头
}
// StorageType 存储类型
type StorageType string
const (
StorageFile StorageType = "file" // 文件存储(持久化)
StorageMemory StorageType = "memory" // 内存存储
)
// RetentionType 保留策略
type RetentionType string
const (
RetentionLimit RetentionType = "limit" // 消息数量限制
RetentionPolicy RetentionType = "interest" // 基于兴趣
RetentionWorkQueue RetentionType = "workqueue" // 工作队列
)
// parseStorageType 解析存储类型
func parseStorageType(st StorageType) jetstream.StorageType {
switch st {
case StorageMemory:
return jetstream.MemoryStorage
default:
return jetstream.FileStorage
}
}
// parseRetentionType 解析保留策略
func parseRetentionType(rt RetentionType) jetstream.RetentionPolicy {
switch rt {
case RetentionLimit:
return jetstream.LimitsPolicy
case RetentionPolicy:
return jetstream.InterestPolicy
default:
return jetstream.WorkQueuePolicy
}
}
// createTaskStreamSimple 简化版创建任务流(适用于大多数场景)
// 只需提供流名称和主题数组,其他使用默认配置
func createTaskStreamSimple(ctx context.Context, streamName string, subjects []string) error {
return createTaskStream(ctx, TaskStreamConfig{
StreamName: streamName,
Subjects: subjects,
})
}
// createTaskStreamWithPriority 创建支持优先级的任务流
func createTaskStreamWithPriority(ctx context.Context, streamPrefix string) error {
subjects := []string{
fmt.Sprintf("%s.high.>", streamPrefix),
fmt.Sprintf("%s.normal.>", streamPrefix),
fmt.Sprintf("%s.low.>", streamPrefix),
}
return createTaskStream(ctx, TaskStreamConfig{
StreamName: streamPrefix,
Subjects: subjects,
})
}
// CreateTaskStream 配置: 文件存储 + 工作队列策略
// CreateTaskStream 创建任务消息队列流JetStream 2.10+
// 核心设计思路:
// 1. 严格持久化:使用文件存储,任务消息不会因为服务器重启而丢失
// 2. 支持任务优先级:通过主题分级实现,如 ["tasks.high", "tasks.low"]
// 3. 死信队列支持:通过 RePublish 配置将失败任务路由到专门的 DLQ 流
// 4. 灵活保留策略根据任务重要性设置不同的保留时长MaxAge
// 5. 工作队列模式确保每个任务只被一个消费者处理DiscardPerSubject
func createTaskStream(ctx context.Context, config TaskStreamConfig) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
if g.IsNil(config.StreamName) {
return fmt.Errorf("流名称不能为空")
}
if len(config.Subjects) == 0 {
return fmt.Errorf("主题数组不能为空")
}
// 设置默认值
if config.Storage == "" {
config.Storage = StorageFile // 默认文件存储
}
if config.Retention == "" {
config.Retention = RetentionWorkQueue // 默认工作队列策略
}
if config.MaxAge == 0 {
config.MaxAge = 24 * time.Hour // 默认保留24小时
}
if config.Replicas == 0 {
config.Replicas = 1 // 默认单副本
}
if config.MaxBytes == 0 {
config.MaxBytes = 10 * 1024 * 1024 * 1024 // 默认10GB
}
if config.MaxMsgs == 0 {
config.MaxMsgs = 100000 // 默认10万条消息
}
if config.MaxMsgSize == 0 {
config.MaxMsgSize = 1024 * 1024 // 默认1MB
}
if config.DiscardPerSubject {
config.DenyDelete = true // 工作队列模式下禁止删除
}
// 构建流配置
jsConfig := jetstream.StreamConfig{
Name: config.StreamName,
Subjects: config.Subjects,
Storage: parseStorageType(config.Storage),
Retention: parseRetentionType(config.Retention),
MaxAge: config.MaxAge,
Duplicates: config.Duplicates,
Replicas: config.Replicas,
MaxMsgSize: config.MaxMsgSize,
MaxBytes: config.MaxBytes,
MaxMsgs: config.MaxMsgs,
MaxMsgsPerSubject: config.MaxMsgsPerSubject,
MaxConsumers: config.MaxConsumers,
AllowRollup: config.AllowRollup,
DenyDelete: config.DenyDelete,
DenyPurge: config.DenyPurge,
Discard: jetstream.DiscardOld, // 默认删除旧消息
DiscardNewPerSubject: config.DiscardPerSubject,
}
// 配置死信队列重新发布(如果设置了)
if config.Republish != nil {
jsConfig.RePublish = &jetstream.RePublish{
Source: config.Republish.Source,
Destination: config.Republish.Destination,
HeadersOnly: config.Republish.HeadersOnly,
}
} else {
// 使用固定的死信队列命名规范:{StreamName}.DLQ
dlqSubject := fmt.Sprintf("%s.DLQ", config.StreamName)
// 死信队列配置
jsConfig.RePublish = &jetstream.RePublish{
Source: ">",
Destination: dlqSubject,
HeadersOnly: true,
}
}
// 检查流是否已存在
stream, err := js.Stream(ctx, config.StreamName)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jsConfig)
if err != nil {
return fmt.Errorf("更新任务流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jsConfig)
if err != nil {
return fmt.Errorf("创建任务流失败: %w", err)
}
// 记录配置信息
configInfo := fmt.Sprintf("存储=%s, 策略=%s, 副本=%d, 保留=%v", config.Storage, config.Retention, config.Replicas, config.MaxAge)
if config.Republish != nil {
configInfo += fmt.Sprintf(", 死信队列=%s->%s", config.Republish.Source, config.Republish.Destination)
}
g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (%s)", stream.CachedInfo().Config.Name, configInfo)
return nil
}