.gitignore

This commit is contained in:
2026-01-30 17:33:03 +08:00
parent 9445a42eab
commit 6acdbb6e88
10 changed files with 0 additions and 2561 deletions

View File

@@ -1,65 +0,0 @@
package nats
import (
"context"
"github.com/gogf/gf/v2/errors/gerror"
)
// NatsMessageConfig nats Stream 消息配置
type NatsMessageConfig struct {
CreateTaskStreamName string
CreateTaskSubjects []string
PublishSubject string
CreateTaskConsumerName string
MsgCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
// MessageConfig 消息配置接口
type MessageConfig interface {
createTaskStream(ctx context.Context) error
publish(ctx context.Context, data interface{}) error
createTaskConsumer(ctx context.Context) error
//startConsumer(ctx context.Context, handleFunc func(ctx context.Context, msg *nats.Msg) error) error
}
func (n *NatsMessageConfig) createTaskStream(ctx context.Context) error {
return createTaskStreamSimple(ctx, n.CreateTaskStreamName, n.CreateTaskSubjects)
}
// CreateTaskStreamBatch 批量创建任务消息队列流
func CreateTaskStreamBatch(ctx context.Context, configs ...MessageConfig) error {
for _, cfg := range configs {
if err := cfg.createTaskStream(ctx); err != nil {
return gerror.Wrap(err, "创建任务消息队列流失败")
}
}
return nil
}
func (n *NatsMessageConfig) publish(ctx context.Context, data interface{}) error {
return publish(ctx, n.PublishSubject, data)
}
// PublishMessage 发布消息(统一入口)
func PublishMessage(ctx context.Context, cfg MessageConfig, data interface{}) (err error) {
return cfg.publish(ctx, data)
}
func (n *NatsMessageConfig) createTaskConsumer(ctx context.Context) error {
return CreateConsumerPushMode(ctx, n.CreateTaskStreamName, n.CreateTaskConsumerName, n.PublishSubject, n.MsgCount)
}
// CreateTaskConsumerBatch 批量创建任务消息队列消费者
func CreateTaskConsumerBatch(ctx context.Context, configs ...MessageConfig) error {
for _, cfg := range configs {
if err := cfg.createTaskConsumer(ctx); err != nil {
return gerror.Wrap(err, "创建任务消息队列流失败")
}
}
return nil
}
//func (n *NatsMessageConfig) startConsumer(ctx context.Context, handleFunc func(ctx context.Context, msg *nats.Msg) error) error {
// return ConsumeMessages(ctx, n.CreateTaskStreamName, n.CreateTaskConsumerName, handleFunc)
//}

View File

@@ -1,268 +0,0 @@
package nats
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go/jetstream"
)
// createTaskStream 创建任务消息队列流(内部使用,兼容旧版本)
// 存储策略: 文件存储
// 工作队列模式: 工作队列策略
func CreateTaskStream(ctx context.Context, streamInfo TaskStreamConfig) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
stream, err := js.Stream(ctx, streamInfo.StreamName)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
Name: streamInfo.StreamName,
Subjects: streamInfo.Subjects,
Storage: jetstream.FileStorage,
Retention: jetstream.WorkQueuePolicy,
})
if err != nil {
return fmt.Errorf("更新任务流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: streamInfo.StreamName,
Subjects: streamInfo.Subjects,
Storage: jetstream.FileStorage,
Retention: jetstream.WorkQueuePolicy,
})
if err != nil {
return fmt.Errorf("创建任务流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (文件存储+工作队列策略)", stream.CachedInfo().Config.Name)
return nil
}
// CreateLogStream 创建日志流
// 存储策略: 内存存储
// 副本数: 单副本 (1)
// 消息留存: 短时留存 (1小时)
func CreateLogStream(ctx context.Context, streamName string, subjects []string) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
maxAge := 1 * time.Hour
stream, err := js.Stream(ctx, streamName)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.MemoryStorage,
Replicas: 1,
MaxAge: maxAge,
})
if err != nil {
return fmt.Errorf("更新日志流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 日志流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.MemoryStorage,
Replicas: 1,
MaxAge: maxAge,
})
if err != nil {
return fmt.Errorf("创建日志流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 日志流创建成功: %s (内存存储+单副本+短时留存1小时)", stream.CachedInfo().Config.Name)
return nil
}
// CreateTradeStream 创建交易业务流
// 存储策略: 文件存储
// 副本数: 3副本
// 同步刷盘: 启用
func CreateTradeStream(ctx context.Context, streamName string, subjects []string) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
stream, err := js.Stream(ctx, streamName)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.FileStorage,
Replicas: 3,
RePublish: nil,
Duplicates: 0,
})
if err != nil {
return fmt.Errorf("更新交易流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 交易业务流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.FileStorage,
Replicas: 3,
RePublish: nil,
Duplicates: 0,
})
if err != nil {
return fmt.Errorf("创建交易流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 交易业务流创建成功: %s (文件存储+3副本+同步刷盘)", stream.CachedInfo().Config.Name)
return nil
}
// JsPublish 发布消息到指定主题
func JsPublish(ctx context.Context, subject string, data any) (err error) {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
// 序列化数据
dataBytes, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
}
// 发布消息
metrics.PublishCount.Add(1)
_, err = js.Publish(ctx, subject, dataBytes)
if err != nil {
metrics.PublishError.Add(1)
return fmt.Errorf("发布消息失败: %w", err)
}
return
}
// GetStream 获取流信息
func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) {
if !IsConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
stream, err := js.Stream(ctx, streamName)
if err != nil {
return nil, fmt.Errorf("获取流失败: %w", err)
}
info, err := stream.Info(ctx)
if err != nil {
return nil, fmt.Errorf("获取流信息失败: %w", err)
}
return info, nil
}
// ListStreams 列出所有流(简化实现)
// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现
func ListStreams(ctx context.Context) ([]string, error) {
if !IsConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
// TODO: 根据实际 NATS 版本实现完整的流列表功能
return []string{}, nil
}
// DeleteStream 删除流
func DeleteStream(ctx context.Context, streamName string) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
if err := js.DeleteStream(ctx, streamName); err != nil {
return fmt.Errorf("删除流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 流已删除: %s", streamName)
return nil
}
// GetConsumer 获取消费者信息
func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) {
if !IsConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
consumer, err := js.Consumer(ctx, streamName, consumerName)
if err != nil {
return nil, fmt.Errorf("获取消费者失败: %w", err)
}
info, err := consumer.Info(ctx)
if err != nil {
return nil, fmt.Errorf("获取消费者信息失败: %w", err)
}
return info, nil
}
// ListConsumers 列出指定流的所有消费者(简化实现)
// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现
func ListConsumers(ctx context.Context, streamName string) ([]string, error) {
if !IsConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
// TODO: 根据实际 NATS 版本实现完整的消费者列表功能
return []string{}, nil
}
// DeleteConsumer 删除消费者
func DeleteConsumer(ctx context.Context, streamName, consumerName string) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
if err := js.DeleteConsumer(ctx, streamName, consumerName); err != nil {
return fmt.Errorf("删除消费者失败: %w", err)
}
g.Log().Infof(ctx, "✅ 消费者已删除: %s/%s", streamName, consumerName)
return nil
}
// CreateConsumer 创建消费者
func CreateConsumer(ctx context.Context, streamName, consumerName string, config jetstream.ConsumerConfig) (jetstream.Consumer, error) {
if !IsConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
// 尝试获取现有消费者
consumer, err := js.Consumer(ctx, streamName, consumerName)
if err == nil {
return consumer, nil
}
// 推荐:不存在则创建,存在则更新配置
consumer, err = js.CreateOrUpdateConsumer(ctx, streamName, config)
if err != nil {
return nil, fmt.Errorf("创建消费者失败: %w", err)
}
return consumer, nil
}

View File

@@ -1,313 +0,0 @@
package nats
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
var (
nc *nats.Conn
js jetstream.JetStream
inited bool
natsMu sync.RWMutex
natsURL string
healthCtx context.Context
healthCancel context.CancelFunc
connected bool
reconnectChan chan struct{}
// 连接状态变化监听器
connStateListeners []connStateListener
connListenersMu sync.RWMutex
// 监控指标
metrics metricsCounter
)
// Metrics 监控指标
type metricsCounter struct {
PublishCount atomic.Int64
PublishError atomic.Int64
SubscribeCount atomic.Int64
RequestCount atomic.Int64
RequestError atomic.Int64
ConsumeCount atomic.Int64
ConsumeError atomic.Int64
}
// ConnState 连接状态
type connState int
const (
connStateDisconnected connState = iota
connStateConnecting
connStateConnected
connStateReconnecting
connStateClosed
)
// ConnStateListener 连接状态监听器
type connStateListener func(state connState, err error)
// GetMetrics 获取监控指标
func getMetrics() metricsCounter {
return metrics
}
// registerConnStateListener 注册连接状态监听器
func registerConnStateListener(listener connStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
connStateListeners = append(connStateListeners, listener)
}
// unregisterConnStateListener 取消注册连接状态监听器
func unregisterConnStateListener(listener connStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
for i, l := range connStateListeners {
if l != nil && &l == &listener {
connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...)
break
}
}
}
// notifyConnState 通知所有监听器连接状态变化
func notifyConnState(state connState, err error) {
connListenersMu.RLock()
listeners := make([]connStateListener, len(connStateListeners))
copy(listeners, connStateListeners)
connListenersMu.RUnlock()
for _, listener := range listeners {
if listener != nil {
listener(state, err)
}
}
}
// init 初始化 NATS 连接
func init() {
// 从配置文件读取 NATS 地址
natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String()
if natsURL == "" {
// 默认使用本地地址
natsURL = nats.DefaultURL
}
// 创建健康检查上下文
healthCtx, healthCancel = context.WithCancel(context.Background())
// 创建重连通知通道(增大缓冲区避免丢失通知)
reconnectChan = make(chan struct{}, 10)
// 启动连接
go initConnection()
// 启动健康检查协程
go healthCheck()
}
// initConnection 初始化连接
func initConnection() {
ctx := context.Background()
notifyConnState(connStateConnecting, nil)
if err := connect(ctx); err != nil {
g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err)
notifyConnState(connStateDisconnected, err)
}
}
// connect 建立 NATS 连接
func connect(ctx context.Context) error {
natsMu.Lock()
defer natsMu.Unlock()
if nc != nil && !nc.IsClosed() {
nc.Close()
}
// 连接选项配置
opts := []nats.Option{
nats.Name("goframe-nats-client"),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1), // 无限重连
nats.PingInterval(10 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
connected = true
// 重新创建 JetStream 实例
if newJS, err := jetstream.New(nc); err == nil {
js = newJS
}
// 通知重连成功
notifyConnState(connStateConnected, nil)
// 使用非阻塞发送避免阻塞
select {
case reconnectChan <- struct{}{}:
default:
// 通道已满,丢弃通知
}
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
connected = false
notifyConnState(connStateReconnecting, err)
}),
nats.ClosedHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
connected = false
notifyConnState(connStateClosed, nil)
}),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
g.Log().Errorf(ctx, "NATS 错误: %v", err)
}),
}
var err error
nc, err = nats.Connect(natsURL, opts...)
if err != nil {
return fmt.Errorf("NATS 连接失败: %w", err)
}
// 等待连接就绪
if nc.Status() != nats.CONNECTED {
select {
case <-time.After(5 * time.Second):
notifyConnState(connStateDisconnected, fmt.Errorf("连接超时"))
return fmt.Errorf("NATS 连接超时")
case <-nc.StatusChanged(nats.CONNECTED):
}
}
// 创建 JetStream 实例
js, err = jetstream.New(nc)
if err != nil {
return fmt.Errorf("创建 JetStream 失败: %w", err)
}
connected = true
inited = true
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
notifyConnState(connStateConnected, nil)
return nil
}
// healthCheck 健康检查协程(仅作为备用检查)
func healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-healthCtx.Done():
return
case <-ticker.C:
natsMu.RLock()
currentConnected := connected
currentConn := nc
natsMu.RUnlock()
if !currentConnected || currentConn == nil || currentConn.IsClosed() {
// 仅记录日志不尝试重连NATS 已有自动重连机制)
g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...")
}
case <-reconnectChan:
// 重连成功的通知(仅记录日志)
g.Log().Info(context.Background(), "收到重连成功通知")
}
}
}
// checkConnected 检查连接状态
func checkConnected() bool {
natsMu.RLock()
defer natsMu.RUnlock()
return connected && nc != nil && !nc.IsClosed()
}
// getConnState 获取当前连接状态
func getConnState() connState {
natsMu.RLock()
defer natsMu.RUnlock()
if nc == nil {
return connStateDisconnected
}
if nc.IsClosed() {
return connStateClosed
}
if connected {
return connStateConnected
}
return connStateDisconnected
}
// shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接
func shutdown() error {
ctx := context.Background()
g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...")
// 注销所有单实例服务
rpcServicesMu.Lock()
singleServiceCount := len(rpcServices)
for serviceName := range rpcServices {
if sub, exists := rpcSubs[serviceName]; exists {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err)
}
}
delete(rpcSubs, serviceName)
delete(rpcServices, serviceName)
}
rpcServicesMu.Unlock()
// 注销所有队列服务
queueRPCMu.Lock()
queueServiceCount := 0
for queueName, servicesMap := range queueRPCServices {
queueServiceCount += len(servicesMap)
for serviceName, sub := range queueRPCSubs[queueName] {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err)
}
}
delete(queueRPCSubs, queueName)
delete(queueRPCServices, queueName)
}
queueRPCMu.Unlock()
g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount)
natsMu.Lock()
defer natsMu.Unlock()
// 停止健康检查协程
if healthCancel != nil {
healthCancel()
}
// 关闭连接
if nc != nil && !nc.IsClosed() {
nc.Close()
connected = false
inited = false
}
g.Log().Info(ctx, "NATS RPC 服务已优雅关闭")
return nil
}

View File

@@ -1,294 +0,0 @@
package nats
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go/jetstream"
)
// AckPolicy 确认策略
type AckPolicy string
const (
AckPolicyExplicit AckPolicy = "explicit" // 显式确认(默认)
AckPolicyAll AckPolicy = "all" // 确认所有消息
AckPolicyNone AckPolicy = "none" // 不需要确认
)
// DeliverPolicy 投递策略
type DeliverPolicy string
const (
DeliverPolicyAll DeliverPolicy = "all" // 投递所有消息(包括已投递的)
DeliverPolicyLast DeliverPolicy = "last" // 从最后一条消息开始
DeliverPolicyNew DeliverPolicy = "new" // 仅投递新消息(默认)
DeliverPolicyLastPerSubj DeliverPolicy = "lastpersubj" // 每个主题的最后一条
DeliverPolicyByStartSeq DeliverPolicy = "by_start_sequence" // 按起始序列号
)
// ReplayPolicy 重放策略
type ReplayPolicy string
const (
ReplayPolicyInstant ReplayPolicy = "instant" // 立即重放
ReplayPolicyOriginal ReplayPolicy = "original" // 按原始顺序重放
)
// ConsumerConfig 消费者配置
type ConsumerConfig struct {
DurableName string // 持久化名称(空表示临时消费者)
Description string // 描述信息
AckPolicy AckPolicy // 确认策略
AckWait int // 确认等待时间(秒)
MaxDeliver int // 最大投递次数
FilterSubject string // 过滤主题(流内多主题时使用)
DeliverPolicy DeliverPolicy // 投递策略
ReplayPolicy ReplayPolicy // 重放策略
MaxWaiting int // 最大等待消息数
MaxAckPending int // 最大待确认消息数
OptStartTime int64 // 起始时间戳
OptStartSeq uint64 // 起始序列号
HeadersOnly bool // 仅消费消息头
Backoff []int // 退避策略(秒数数组)
RateLimit uint64 // 消息速率限制(消息/秒)
Replica int // 副本数
FlowControl bool // 启用流控
Metadata map[string]string // 元数据
}
// parseAckPolicy 解析确认策略
func parseAckPolicy(policy AckPolicy) jetstream.AckPolicy {
switch policy {
case AckPolicyAll:
return jetstream.AckAllPolicy
case AckPolicyNone:
return jetstream.AckNonePolicy
default:
return jetstream.AckExplicitPolicy
}
}
// parseDeliverPolicy 解析投递策略
func parseDeliverPolicy(policy DeliverPolicy) jetstream.DeliverPolicy {
switch policy {
case DeliverPolicyAll:
return jetstream.DeliverAllPolicy
case DeliverPolicyLast:
return jetstream.DeliverLastPolicy
case DeliverPolicyLastPerSubj:
return jetstream.DeliverLastPerSubjectPolicy
case DeliverPolicyByStartSeq:
return jetstream.DeliverByStartSequencePolicy
default:
return jetstream.DeliverNewPolicy
}
}
// parseReplayPolicy 解析重放策略
func parseReplayPolicy(policy ReplayPolicy) jetstream.ReplayPolicy {
switch policy {
case ReplayPolicyOriginal:
return jetstream.ReplayOriginalPolicy
default:
return jetstream.ReplayInstantPolicy
}
}
// CreateTaskConsumer 创建任务消费者
// 核心设计思路:
// 1. 显式确认:确保消息被正确处理后才确认
// 2. 重试机制:通过 MaxDeliver 控制最大重试次数
// 3. 持久化DurableName 确保消费者状态持久化
// 4. 流控:防止消费者过载
func CreateTaskConsumer(ctx context.Context, streamName string, config ConsumerConfig) (jetstream.Consumer, error) {
if !IsConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
if streamName == "" {
return nil, fmt.Errorf("流名称不能为空")
}
// 设置默认值
if config.AckPolicy == "" {
config.AckPolicy = AckPolicyExplicit // 默认显式确认
}
if config.AckWait == 0 {
config.AckWait = 30 // 默认30秒确认超时
}
if config.MaxDeliver == 0 {
config.MaxDeliver = 3 // 默认最多投递3次
}
if config.DeliverPolicy == "" {
config.DeliverPolicy = DeliverPolicyNew // 默认仅消费新消息
}
if config.ReplayPolicy == "" {
config.ReplayPolicy = ReplayPolicyInstant // 默认立即重放
}
if config.MaxAckPending == 0 {
config.MaxAckPending = 1000 // 默认最多1000条待确认消息
}
// 构建消费者配置
jsConfig := jetstream.ConsumerConfig{
Name: config.DurableName,
Description: config.Description,
AckPolicy: parseAckPolicy(config.AckPolicy),
AckWait: 0,
MaxDeliver: config.MaxDeliver,
FilterSubjects: []string{config.FilterSubject},
DeliverPolicy: parseDeliverPolicy(config.DeliverPolicy),
ReplayPolicy: parseReplayPolicy(config.ReplayPolicy),
MaxWaiting: config.MaxWaiting,
MaxAckPending: config.MaxAckPending,
HeadersOnly: config.HeadersOnly,
RateLimit: config.RateLimit,
Replicas: config.Replica,
Metadata: config.Metadata,
}
// 配置流控和心跳
if config.FlowControl {
jsConfig.FlowControl = true
}
// 配置起始位置
if config.OptStartSeq > 0 {
jsConfig.OptStartSeq = config.OptStartSeq
}
// 创建新消费者
consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig)
if err != nil {
return nil, fmt.Errorf("创建消费者失败: %w", err)
}
// 记录配置信息
configInfo := fmt.Sprintf("确认策略=%s, 最大投递=%d, 投递策略=%s", config.AckPolicy, config.MaxDeliver, config.DeliverPolicy)
if config.FilterSubject != "" {
configInfo += fmt.Sprintf(", 过滤主题=%s", config.FilterSubject)
}
g.Log().Infof(ctx, "✅ 任务消费者创建成功: %s/%s (%s)", streamName, config.DurableName, configInfo)
return consumer, nil
}
// CreateConsumerSimple 简化版创建消费者(适用于大多数场景)
// 只需提供流名称和消费者名称,其他使用默认配置
func CreateConsumerSimple(ctx context.Context, streamName, durableName string) (err error) {
_, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{
DurableName: durableName,
})
return
}
// CreateConsumerWithFilter 创建带主题过滤的消费者
//func CreateConsumerWithFilter(ctx context.Context, streamName, durableName, filterSubject string) (jetstream.Consumer, error) {
// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{
// DurableName: durableName,
// FilterSubject: filterSubject,
// })
//}
// CreateConsumerEphemeral 创建临时消费者
// 临时消费者没有持久化名称,连接断开后自动删除
//func CreateConsumerEphemeral(ctx context.Context, streamName string) (jetstream.Consumer, error) {
// if !IsConnected() {
// return nil, fmt.Errorf("NATS 未连接")
// }
//
// jsConfig := jetstream.ConsumerConfig{
// AckPolicy: jetstream.AckNonePolicy,
// AckWait: 0,
// MaxDeliver: 3,
// DeliverPolicy: jetstream.DeliverNewPolicy,
// ReplayPolicy: jetstream.ReplayInstantPolicy,
// MaxAckPending: 1000,
// }
//
// consumer, err := js.CreateConsumer(ctx, streamName, jsConfig)
// if err != nil {
// return nil, fmt.Errorf("创建临时消费者失败: %w", err)
// }
//
// g.Log().Infof(ctx, "✅ 临时消费者创建成功: %s", streamName)
// return consumer, nil
//}
// CreateConsumerPushMode 创建推送模式消费者
// 推送模式下NATS 服务器主动将消息推送给消费者
func CreateConsumerPushMode(ctx context.Context, streamName, durableName, subject string, msgCount int) (err error) {
_, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{
DurableName: durableName,
FilterSubject: subject,
MaxAckPending: msgCount,
})
return
}
// CreateConsumerPullMode 创建拉取模式消费者
// 拉取模式下,消费者主动从服务器拉取消息
//func CreateConsumerPullMode(ctx context.Context, streamName, durableName string) (jetstream.Consumer, error) {
// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{
// DurableName: durableName,
// DeliverPolicy: DeliverPolicyAll,
// MaxAckPending: 500, // 拉取模式下待确认消息数可以设置小一些
// })
//}
// ConsumeMessages 消费消息(推送模式)
func ConsumeMessages(ctx context.Context, streamName, consumerName string, handler jetstream.MessageHandler) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
// 获取消费者
consumer, err := js.Consumer(ctx, streamName, consumerName)
if err != nil {
return fmt.Errorf("获取消费者失败: %w", err)
}
// 业务处理
//if err := handler(ctx, streamMsg.Values); err != nil {
// glog.Infof(ctx, "业务处理失败-> err:%v\n", err)
// continue
//}
//// 确认消息
//if msg.AutoAck {
// err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID)
// if err != nil {
// glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err)
// }
//}
//// 创建消息处理函数
//handler = func(msg jetstream.Msg) {
// // 解析消息
// var task TaskMessage
// if err := json.Unmarshal(msg.Data(), &task); err != nil {
// g.Log().Errorf(ctx, "解析消息失败: %v", err)
// msg.Nak() // 拒绝消息,触发重试
// return
// }
//
// // 处理业务逻辑
// g.Log().Infof(ctx, "处理任务: %s", task.TaskID)
//
// // 处理成功,确认消息
// msg.Ack()
//}
// 开始消费
_, err = consumer.Consume(handler)
if err != nil {
return fmt.Errorf("开始消费失败: %w", err)
}
g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", streamName, consumerName)
return nil
}
// 定义消息结构
type TaskMessage struct {
TaskID string `json:"task_id"`
TaskType string `json:"task_type"`
Data string `json:"data"`
}

View File

@@ -1,28 +0,0 @@
package nats
import (
"context"
"encoding/json"
"fmt"
)
// publish 发布消息到指定主题
func publish(ctx context.Context, subject string, data any) (err error) {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
// 序列化数据
dataBytes, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
}
// 发布消息
metrics.PublishCount.Add(1)
_, err = js.Publish(ctx, subject, dataBytes)
if err != nil {
metrics.PublishError.Add(1)
return fmt.Errorf("发布消息失败: %w", err)
}
return
}

View File

@@ -1,752 +0,0 @@
package nats
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel/trace"
"reflect"
"sync"
)
// ============ RPC 服务封装 ============
// 以下方法提供了完全抽象的 RPC 调用接口
// 调用方和响应方完全不需要知道底层使用的是 NATS 的发布订阅模式
// RPC 服务注册表
var (
rpcServices map[string]rpcHandler
rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅
rpcServicesMu sync.RWMutex
queueRPCServices map[string]map[string]rpcHandler // queueName -> subject -> handler
queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅
queueRPCMu sync.RWMutex
// ============ TraceID 主动取消支持 ============
// 全局映射表TraceID -> CancelFunc并发安全
traceCancelMap map[string]context.CancelFunc
traceCancelMu sync.RWMutex
// 取消主题前缀
cancelSubjectPrefix = "ctx.cancel.otel."
)
// rpcHandler RPC 处理函数类型
// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现
// 返回值可以是任意类型,会被自动序列化为 JSON
type rpcHandler func(ctx context.Context, req []byte) (any, error)
// RegisterRPCService 注册 RPC 服务(单实例)
// serviceName: 服务名称,调用方通过此名称调用服务
// handler: 服务处理函数,接收请求并返回响应
func registerRPCService(serviceName string, handler rpcHandler) (err error) {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
rpcServicesMu.Lock()
if rpcServices == nil {
rpcServices = make(map[string]rpcHandler)
}
if rpcSubs == nil {
rpcSubs = make(map[string]*nats.Subscription)
}
// 如果已存在该服务,先取消之前的订阅
if oldSub, exists := rpcSubs[serviceName]; exists {
oldSub.Unsubscribe()
}
rpcServices[serviceName] = handler
rpcServicesMu.Unlock()
// 订阅服务主题
subject := fmt.Sprintf("rpc.%s", serviceName)
sub, err := nc.Subscribe(subject, func(msg *nats.Msg) {
// 执行处理函数
executeHandler(handler, msg)
})
if err != nil {
return fmt.Errorf("注册 RPC 服务失败: %w", err)
}
rpcSubs[serviceName] = sub
metrics.SubscribeCount.Add(1)
g.Log().Infof(context.Background(), "✅ RPC 服务已注册: %s", serviceName)
return nil
}
// RegisterQueueRPCService 注册 RPC 服务(集群模式)
// 多个服务实例注册同一服务时,请求会自动负载均衡
// serviceName: 服务名称
// queueName: 队列组名,同一队列组的实例共享请求
// handler: 服务处理函数
func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) (err error) {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
queueRPCMu.Lock()
if queueRPCServices == nil {
queueRPCServices = make(map[string]map[string]rpcHandler)
}
if queueRPCSubs == nil {
queueRPCSubs = make(map[string]map[string]*nats.Subscription)
}
if queueRPCServices[queueName] == nil {
queueRPCServices[queueName] = make(map[string]rpcHandler)
}
if queueRPCSubs[queueName] == nil {
queueRPCSubs[queueName] = make(map[string]*nats.Subscription)
}
// 如果已存在该服务,先取消之前的订阅
if oldSub, exists := queueRPCSubs[queueName][serviceName]; exists {
oldSub.Unsubscribe()
}
queueRPCServices[queueName][serviceName] = handler
queueRPCMu.Unlock()
// 订阅服务主题(队列模式)
subject := fmt.Sprintf("rpc.%s", serviceName)
sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) {
// 执行处理函数
executeHandler(handler, msg)
})
if err != nil {
return fmt.Errorf("注册队列 RPC 服务失败: %w", err)
}
queueRPCMu.Lock()
queueRPCSubs[queueName][serviceName] = sub
queueRPCMu.Unlock()
metrics.SubscribeCount.Add(1)
g.Log().Infof(context.Background(), "✅ 队列 RPC 服务已注册: %s (队列组: %s)", serviceName, queueName)
return nil
}
// executeHandler 执行 RPC 处理函数
func executeHandler(handler rpcHandler, msg *nats.Msg) {
// 响应
var respData []byte
// 从消息头重建上下文
ctx := headersToContext(context.Background(), msg.Header)
// 提取 TraceID创建可取消的 context
ctx = createCancelContext(ctx, msg.Header.Get(TraceIDKey))
// 检查 context 是否已取消(在调用 handler 之前)
select {
case <-ctx.Done():
// context 已取消,返回取消错误
g.Log().Infof(ctx, "RPC 请求已取消traceID: %s", msg.Header.Get(TraceIDKey))
// 仍然需要发送响应以避免客户端超时
respData = []byte(`{"_err":"请求已取消"}`)
// 清理取消映射表
cleanupTraceCancel(msg.Header.Get(TraceIDKey))
return
default:
}
// 执行业务处理
response, err := handler(ctx, msg.Data)
if err != nil {
// 错误时返回 {"_err": "错误信息"}
if respData, err = json.Marshal(map[string]any{"_err": err.Error()}); err != nil {
g.Log().Errorf(ctx, "RPC 错误响应序列化失败: %v", err)
respData = []byte(`{"_err":"错误响应序列化失败"}`)
}
} else if response == nil {
// 空响应时返回空对象(或 {"_err": ""}
respData = []byte(`{}`)
} else {
// 成功时返回业务数据
if respData, err = json.Marshal(response); err != nil {
g.Log().Errorf(ctx, "RPC 响应序列化失败: %v", err)
respData = []byte(`{"_err":"响应序列化失败"}`)
}
}
// 发送响应(必须执行) 如果客户端用 nc.Request(...) 发送消息 → 双向模式,服务端必须 msg.Respond
if err = msg.Respond(respData); err != nil {
g.Log().Errorf(ctx, "RPC 响应失败: %v", err)
}
// 请求结束,清理取消映射表
cleanupTraceCancel(msg.Header.Get(TraceIDKey))
}
// createCancelContext 创建可取消的 context 并注册到取消映射表
// 返回可取消的 context如果 traceID 为空则返回原 context
func createCancelContext(ctx context.Context, traceID string) context.Context {
if g.IsEmpty(traceID) {
return ctx
}
// 创建带取消功能的 context
taskCtx, cancel := context.WithCancel(ctx)
// 注册到取消映射表
traceCancelMu.Lock()
if traceCancelMap == nil {
traceCancelMap = make(map[string]context.CancelFunc)
}
// 如果同一 TraceID 已有 CancelFunc先调用它
if oldCancel, exists := traceCancelMap[traceID]; exists {
oldCancel()
}
traceCancelMap[traceID] = cancel
traceCancelMu.Unlock()
return taskCtx
}
// ============ TraceID 主动取消功能 ============
// 以下函数实现了基于 OpenTelemetry TraceID 的跨进程任务取消机制
// SetupCancelListener 设置取消监听器
// 订阅取消主题,监听取消指令
// 使用示例:
//
// sub, err := nats.SetupCancelListener(ctx)
func setupCancelListener(ctx context.Context) (*nats.Subscription, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
if traceCancelMap == nil {
traceCancelMap = make(map[string]context.CancelFunc)
}
// 修复问题3订阅取消主题格式: ctx.cancel.otel.*
// 使用 * 通配符而不是 >,因为 TraceID 是最后一部分
cancelSubject := cancelSubjectPrefix + "*"
sub, err := nc.Subscribe(cancelSubject, func(msg *nats.Msg) {
// 从主题中解析 TraceID (去除前缀)
prefixLen := len(cancelSubjectPrefix)
if len(msg.Subject) <= prefixLen {
g.Log().Warningf(ctx, "取消消息主题格式错误: %s", msg.Subject)
return
}
traceID := msg.Subject[prefixLen:]
if traceID == "" {
g.Log().Warning(ctx, "取消消息主题缺少 TraceID")
return
}
// 从映射表获取 CancelFunc 并执行取消
traceCancelMu.RLock()
cancel, ok := traceCancelMap[traceID]
traceCancelMu.RUnlock()
if ok {
cancel()
g.Log().Infof(ctx, "📢 取消信号已发送traceID: %s", traceID)
} else {
g.Log().Infof(ctx, "⚠️ 未找到对应的可取消任务traceID: %s", traceID)
}
})
if err != nil {
return nil, fmt.Errorf("设置取消监听器失败: %w", err)
}
metrics.SubscribeCount.Add(1)
g.Log().Infof(ctx, "✅ 取消监听器已设置: %s", cancelSubject)
return sub, nil
}
// publishCancel 发布取消指令
// 向指定 TraceID 发送取消信号
// 使用示例:
//
// err := nats.publishCancel(ctx, traceID)
func publishCancel(ctx context.Context, traceID string) error {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
if traceID == "" {
return fmt.Errorf("TraceID 不能为空")
}
cancelSubject := cancelSubjectPrefix + traceID
err := nc.Publish(cancelSubject, nil)
if err != nil {
return fmt.Errorf("发布取消信号失败: %w", err)
}
g.Log().Infof(ctx, "📤 已发送取消信号traceID: %s主题: %s", traceID, cancelSubject)
return nil
}
// cleanupTraceCancel 清理取消映射表中的条目
// 任务取消/正常结束后必须调用此函数,避免内存泄漏
// 使用示例:
//
// defer nats.cleanupTraceCancel(traceID)
func cleanupTraceCancel(traceID string) {
if traceID == "" {
return
}
traceCancelMu.Lock()
defer traceCancelMu.Unlock()
if _, ok := traceCancelMap[traceID]; ok {
delete(traceCancelMap, traceID)
g.Log().Infof(context.Background(), "✅ 已清理取消映射表traceID: %s", traceID)
}
}
// CallRPC 调用 RPC 服务
// serviceName: 服务名称
// req: 请求数据
// 返回: 响应数据(任意类型)和错误
func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err error) {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
metrics.RequestCount.Add(1)
// 验证 resp 必须是指针类型
respValue := reflect.ValueOf(resp)
if respValue.Kind() != reflect.Ptr {
return fmt.Errorf("resp 参数必须是指针类型(当前类型: %T", resp)
}
// 构建请求体
var reqBody []byte
if !g.IsEmpty(req) {
reqValue := reflect.ValueOf(req)
if !(reqValue.Kind() == reflect.Ptr && reqValue.IsNil()) && !reqValue.IsZero() {
reqData, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("序列化请求参数失败: %w", err)
}
reqBody = reqData
}
}
// 检查本地是否有注册的单实例服务,如果有则直接调用(优化性能)
rpcServicesMu.RLock()
if localHandler, exists := rpcServices[serviceName]; exists {
rpcServicesMu.RUnlock()
// 修复问题1本地调用也需要处理取消机制
var traceID string
if traceID, err = getTraceID(ctx); err != nil {
return err
}
// 提取 TraceID创建可取消的 context
cancelCtx := createCancelContext(ctx, traceID)
// 执行本地调用
var response interface{}
if response, err = localHandler(cancelCtx, reqBody); err != nil {
metrics.RequestError.Add(1)
return fmt.Errorf("本地调用 RPC 服务失败 [%s]: %w", serviceName, err)
}
// 请求结束,清理取消映射表
cleanupTraceCancel(traceID)
// 检查是否为错误消息:尝试解析为 map看是否包含 "_err" 字段
var respMap map[string]any
if json.Unmarshal(response.([]byte), &respMap) == nil {
if errMsg, ok := respMap["_err"]; ok {
metrics.RequestError.Add(1)
return fmt.Errorf("%v", errMsg)
}
}
// 正常数据直接返回
// responseMsg.Data 已经是 []byte 类型(来自 msg.Data直接反序列化
if err = json.Unmarshal(response.([]byte), resp); err != nil {
return fmt.Errorf("解析响应失败: %w (响应内容: %s)", err, response)
}
return
}
rpcServicesMu.RUnlock()
subject := fmt.Sprintf("rpc.%s", serviceName)
// 创建消息并将上下文元数据写入消息头
msg := nats.NewMsg(subject)
msg.Data = reqBody
headers, err := contextToHeaders(ctx)
if err != nil {
return fmt.Errorf("上下文转换失败: %w", err)
}
msg.Header = headers
// 修复问题5优化 go 协程避免资源泄漏
// 使用 done channel 来确保 goroutine 能正确退出
done := make(chan struct{})
var closeDoneOnce sync.Once
closeDone := func() {
closeDoneOnce.Do(func() {
close(done)
})
}
if msg.Header.Get(TraceIDKey) != "" {
go func() {
defer closeDone()
select {
case <-ctx.Done():
// context 被取消时,发送取消信号给服务端
if errors.Is(ctx.Err(), context.Canceled) {
if err := publishCancel(context.Background(), msg.Header.Get(TraceIDKey)); err != nil {
g.Log().Errorf(ctx, "发送 RPC 取消信号失败: %v", err)
} else {
g.Log().Infof(ctx, "RPC 调用已取消traceID: %s", msg.Header.Get(TraceIDKey))
}
}
case <-done:
// 请求已完成,无需发送取消信号
return
}
}()
}
// 发送请求
responseMsg, err := nc.RequestMsgWithContext(ctx, msg)
// 关闭 done channel通知 goroutine 退出
closeDone()
if err != nil {
metrics.RequestError.Add(1)
return fmt.Errorf("调用 RPC 服务失败 [%s]: %w", serviceName, err)
}
if responseMsg == nil {
metrics.RequestError.Add(1)
return fmt.Errorf("RPC 响应为空 [%s]", serviceName)
}
// 解析响应
if len(responseMsg.Data) > 0 {
// 检查是否为错误消息:尝试解析为 map看是否包含 "_err" 字段
var respMap map[string]any
if json.Unmarshal(responseMsg.Data, &respMap) == nil {
if errMsg, ok := respMap["_err"]; ok {
metrics.RequestError.Add(1)
return fmt.Errorf("%v", errMsg)
}
}
// 正常数据直接返回
// responseMsg.Data 已经是 []byte 类型(来自 msg.Data直接反序列化
if err = json.Unmarshal(responseMsg.Data, resp); err != nil {
return fmt.Errorf("解析响应失败: %w (响应内容: %s)", err, responseMsg.Data)
}
}
return
}
// RegisterServiceOption 注册选项类型
type RegisterServiceOption func(*registerServiceConfig)
type registerServiceConfig struct {
queueName string // 队列组名(用于集群模式)
excludeMethods []string
}
// WithQueueGroup 设置队列组名(集群模式)
func WithQueueGroup(queueName string) RegisterServiceOption {
return func(cfg *registerServiceConfig) {
cfg.queueName = queueName
}
}
// WithExcludeMethods 排除不需要注册的方法
func WithExcludeMethods(methods ...string) RegisterServiceOption {
return func(cfg *registerServiceConfig) {
cfg.excludeMethods = append(cfg.excludeMethods, methods...)
}
}
// AutoRegisterServices 自动注册多个服务的所有公开方法
// serviceInstances: map[包名]service实例如 map[string]interface{}{"user": userService, "order": orderService}
// options: 注册选项(可选)
// 示例:
//
// AutoRegisterServices(map[string]interface{}{
// "user": userService,
// "order": orderService,
// })
// 或
// AutoRegisterServices(map[string]interface{}{
// "order": orderService,
// }, WithQueueGroup("order-group"))
func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...RegisterServiceOption) error {
// 先注册 RPC 服务(如果 NATS 不可用则记录警告但不阻塞启动)
if !checkConnected() {
return fmt.Errorf("NATS 未连接RPC 服务未注册")
}
if len(serviceInstances) == 0 {
return fmt.Errorf("service 实例列表不能为空")
}
totalRegistered := 0
// 遍历每个 service 实例
for pkgName, serviceInstance := range serviceInstances {
// 注册服务
err := registerService(serviceInstance, pkgName, options...)
if err != nil {
g.Log().Errorf(ctx, "注册 %s 服务失败: %v", pkgName, err)
continue
}
totalRegistered++
g.Log().Infof(ctx, "✅ %s 服务已自动注册", pkgName)
}
if totalRegistered == 0 {
return fmt.Errorf("未能注册任何服务")
}
// 设置取消监听器(监听基于 TraceID 的取消请求)
if _, err := setupCancelListener(ctx); err != nil {
g.Log().Errorf(ctx, "设置取消监听器失败: %v", err)
} else {
g.Log().Infof(ctx, "✅ 取消监听器已自动设置")
}
g.Log().Infof(ctx, "✅ 共自动注册了 %d 个服务", totalRegistered)
return nil
}
// registerService 注册单个服务的所有公开方法(内部函数)
func registerService(service interface{}, serviceNamePrefix string, options ...RegisterServiceOption) (err error) {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
// 应用选项
cfg := &registerServiceConfig{}
for _, opt := range options {
opt(cfg)
}
// 创建排除方法集合
excludeSet := make(map[string]struct{})
for _, method := range cfg.excludeMethods {
excludeSet[method] = struct{}{}
}
// 获取 service 的类型
serviceType := reflect.TypeOf(service)
// 遍历所有方法
registeredCount := 0
for i := 0; i < serviceType.NumMethod(); i++ {
method := serviceType.Method(i)
// 只注册导出方法(首字母大写)
if !method.IsExported() {
continue
}
// 排除指定的方法
if _, exists := excludeSet[method.Name]; exists {
continue
}
// 检查方法签名:必须是 func(ctx context.Context, request) (response, error)
// 注意method.Type.NumIn() 包含接收者,所以实际参数数量需要减去 1
// 要求:接收者 + context.Context + request总共3个参数
if method.Type.NumIn() != 3 {
g.Log().Warningf(context.Background(), "方法 %s 必须有2个参数context.Context 和请求参数),跳过注册", method.Name)
continue
}
// 第一个参数(接收者之后的第一个参数)必须是 context.Context
// method.Type.In(0) 是接收者method.Type.In(1) 才是第一个参数
if !method.Type.In(1).Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) {
g.Log().Warningf(context.Background(), "方法 %s 的第一个参数必须是 context.Context跳过注册", method.Name)
continue
}
// 第二个参数必须是结构体指针或数组
reqType := method.Type.In(2)
if reqType.Kind() != reflect.Ptr && reqType.Kind() != reflect.Slice && reqType.Kind() != reflect.Array {
g.Log().Warningf(context.Background(), "方法 %s 的第二个参数必须是结构体指针或数组,跳过注册", method.Name)
continue
}
// 返回值必须是 (result, error)即2个返回值
if method.Type.NumOut() != 2 {
g.Log().Warningf(context.Background(), "方法 %s 必须有2个返回值result 和 error跳过注册", method.Name)
continue
}
// 最后一个返回值必须是 error
if !method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
g.Log().Warningf(context.Background(), "方法 %s 的最后一个返回值必须是 error跳过注册", method.Name)
continue
}
// 生成服务名称:前缀.方法名(保持原始方法名)
serviceName := fmt.Sprintf("%s.%s", serviceNamePrefix, method.Name)
// 创建 RPC handler
handler := func(ctx context.Context, req []byte) (any, error) {
// 准备方法调用参数
// args[0] 是接收者, args[1] 是 ctx, args[2] 是请求参数
args := make([]reflect.Value, 3)
args[0] = reflect.ValueOf(service) // 接收者
args[1] = reflect.ValueOf(ctx) // context.Context
// 解析请求参数
if len(req) > 0 {
reqValuePtr := reflect.New(reqType)
// 解析 JSON
if err := json.Unmarshal(req, reqValuePtr.Interface()); err != nil {
// 根据参数类型提供更友好的错误提示
var typeHint string
if reqType.Kind() == reflect.Ptr {
typeHint = fmt.Sprintf("(期望类型: %s", reqType.Elem().Name())
} else { // reflect.Slice 或 reflect.Array
typeHint = fmt.Sprintf("(期望类型: %s请确保客户端传递的是JSON数组格式", reqType.String())
}
return nil, fmt.Errorf("解析请求参数失败%s: %w", typeHint, err)
}
args[2] = reqValuePtr.Elem()
} else {
// 请求为空,创建零值
args[2] = reflect.Zero(method.Type.In(2))
}
// 调用方法
results := method.Func.Call(args)
// 处理返回值
var result any
if len(results) == 1 {
// 只有 error
if !results[0].IsNil() {
err = results[0].Interface().(error)
}
} else if len(results) == 2 {
// (result, error)
result = results[0].Interface()
if !results[1].IsNil() {
err = results[1].Interface().(error)
}
}
if err != nil {
return nil, err
}
return result, nil
}
// 注册 RPC 服务
var err error
if cfg.queueName != "" {
err = registerQueueRPCService(serviceName, cfg.queueName, handler)
} else {
err = registerRPCService(serviceName, handler)
}
if err != nil {
g.Log().Errorf(context.Background(), "注册服务 %s 失败: %v", serviceName, err)
continue
}
registeredCount++
g.Log().Infof(context.Background(), "✅ 已自动注册 RPC 服务: %s -> %s", serviceName, method.Name)
}
if registeredCount == 0 {
g.Log().Warningf(context.Background(), "未注册任何方法,请检查 %v 的方法签名", serviceNamePrefix)
return fmt.Errorf("未找到可注册的方法")
}
g.Log().Infof(context.Background(), "✅ Service %v 共注册了 %d 个 RPC 方法", serviceNamePrefix, registeredCount)
return nil
}
// ============ 上下文元数据工具函数 ============
// 以下函数用于在 context 和 NATS 消息头之间互转元数据
// 定义常见的上下文元数据 key
const (
TraceIDKey = "trace_id"
TokenKey = "token"
)
func getTraceID(ctx context.Context) (traceID string, err error) {
// 提取 traceId首先尝试从 OpenTelemetry Span 中提取,从 context 中提取 TraceID
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
traceID = span.SpanContext().TraceID().String()
} else if tid := ctx.Value(TraceIDKey); tid != nil {
traceID = fmt.Sprintf("%v", tid)
}
if traceID == "" {
return traceID, fmt.Errorf("context 中没有 TraceID")
}
return
}
// contextToHeaders 将 context 中的元数据转换为 NATS 消息头
// 支持提取 user_id、tenant_id、trace_id、token 等常见字段
func contextToHeaders(ctx context.Context) (nats.Header, error) {
headers := make(nats.Header)
// 提取 traceId首先尝试从 OpenTelemetry Span 中提取
if traceID, err := getTraceID(ctx); err != nil {
return headers, err
} else {
headers.Set(TraceIDKey, traceID)
}
// 提取 token优先级context value > HTTP Authorization header
token := ""
if t := ctx.Value(TokenKey); t != nil {
token = fmt.Sprintf("%v", t)
} else if r := g.RequestFromCtx(ctx); r != nil {
// 从 HTTP 请求的 Authorization header 中提取 token
auth := r.GetHeader("Authorization")
if auth != "" {
// 移除 "Bearer " 前缀
if len(auth) > 7 && auth[:7] == "Bearer " {
token = auth[7:]
} else {
token = auth
}
}
}
if token != "" {
headers.Set(TokenKey, token)
}
return headers, nil
}
// headersToContext 从 NATS 消息头重建 context
// 支持还原 user_id、tenant_id、trace_id、token 等字段
func headersToContext(ctx context.Context, headers nats.Header) context.Context {
if headers == nil {
return ctx
}
// 恢复 trace_id
if traceID := headers.Get(TraceIDKey); traceID != "" {
ctx = context.WithValue(ctx, TraceIDKey, traceID)
}
// 恢复 token
if token := headers.Get(TokenKey); token != "" {
ctx = context.WithValue(ctx, TokenKey, token)
}
return ctx
}

View File

@@ -1,212 +0,0 @@
package nats
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go/jetstream"
"time"
)
// TaskStreamConfig 任务流配置
type TaskStreamConfig struct {
StreamName string // 流名称
Subjects []string // 主题数组(支持任务优先级,如 ["tasks.high","tasks.normal", "tasks.low"]
PublishSubject string // 发布使用的主题(仅用于记录,不影响流配置)
Storage StorageType // 存储类型
Retention RetentionType // 保留策略
MaxAge time.Duration // 最大保留时间
Duplicates time.Duration // 消息去重窗口时间
Replicas int // 副本数
MaxMsgSize int32 // 单条消息最大大小(字节)
MaxBytes int64 // 流最大存储大小(字节)
MaxMsgs int64 // 流中最大消息数
MaxMsgsPerSubject int64 // 每个主题最大消息数
MaxConsumers int // 最大消费者数量
DenyPurge bool // 是否禁止清理流
AllowRollup bool // 是否允许汇总消息
DenyDelete bool // 是否禁止删除
DiscardPerSubject bool // 是否按主题限制(工作队列模式)
Republish *RePublishConfig // 死信队列重新发布配置
}
// RePublishConfig 重新发布配置(用于死信队列)
type RePublishConfig struct {
Source string // 源主题
Destination string // 目标主题
HeadersOnly bool // 仅复制消息头
}
// StorageType 存储类型
type StorageType string
const (
StorageFile StorageType = "file" // 文件存储(持久化)
StorageMemory StorageType = "memory" // 内存存储
)
// RetentionType 保留策略
type RetentionType string
const (
RetentionLimit RetentionType = "limit" // 消息数量限制
RetentionPolicy RetentionType = "interest" // 基于兴趣
RetentionWorkQueue RetentionType = "workqueue" // 工作队列
)
// parseStorageType 解析存储类型
func parseStorageType(st StorageType) jetstream.StorageType {
switch st {
case StorageMemory:
return jetstream.MemoryStorage
default:
return jetstream.FileStorage
}
}
// parseRetentionType 解析保留策略
func parseRetentionType(rt RetentionType) jetstream.RetentionPolicy {
switch rt {
case RetentionLimit:
return jetstream.LimitsPolicy
case RetentionPolicy:
return jetstream.InterestPolicy
default:
return jetstream.WorkQueuePolicy
}
}
// createTaskStreamSimple 简化版创建任务流(适用于大多数场景)
// 只需提供流名称和主题数组,其他使用默认配置
func createTaskStreamSimple(ctx context.Context, streamName string, subjects []string) error {
return createTaskStream(ctx, TaskStreamConfig{
StreamName: streamName,
Subjects: subjects,
})
}
// createTaskStreamWithPriority 创建支持优先级的任务流
func createTaskStreamWithPriority(ctx context.Context, streamPrefix string) error {
subjects := []string{
fmt.Sprintf("%s.high.>", streamPrefix),
fmt.Sprintf("%s.normal.>", streamPrefix),
fmt.Sprintf("%s.low.>", streamPrefix),
}
return createTaskStream(ctx, TaskStreamConfig{
StreamName: streamPrefix,
Subjects: subjects,
})
}
// CreateTaskStream 配置: 文件存储 + 工作队列策略
// CreateTaskStream 创建任务消息队列流JetStream 2.10+
// 核心设计思路:
// 1. 严格持久化:使用文件存储,任务消息不会因为服务器重启而丢失
// 2. 支持任务优先级:通过主题分级实现,如 ["tasks.high", "tasks.low"]
// 3. 死信队列支持:通过 RePublish 配置将失败任务路由到专门的 DLQ 流
// 4. 灵活保留策略根据任务重要性设置不同的保留时长MaxAge
// 5. 工作队列模式确保每个任务只被一个消费者处理DiscardPerSubject
func createTaskStream(ctx context.Context, config TaskStreamConfig) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
if g.IsNil(config.StreamName) {
return fmt.Errorf("流名称不能为空")
}
if len(config.Subjects) == 0 {
return fmt.Errorf("主题数组不能为空")
}
// 设置默认值
if config.Storage == "" {
config.Storage = StorageFile // 默认文件存储
}
if config.Retention == "" {
config.Retention = RetentionWorkQueue // 默认工作队列策略
}
if config.MaxAge == 0 {
config.MaxAge = 24 * time.Hour // 默认保留24小时
}
if config.Replicas == 0 {
config.Replicas = 1 // 默认单副本
}
if config.MaxBytes == 0 {
config.MaxBytes = 10 * 1024 * 1024 * 1024 // 默认10GB
}
if config.MaxMsgs == 0 {
config.MaxMsgs = 100000 // 默认10万条消息
}
if config.MaxMsgSize == 0 {
config.MaxMsgSize = 1024 * 1024 // 默认1MB
}
if config.DiscardPerSubject {
config.DenyDelete = true // 工作队列模式下禁止删除
}
// 构建流配置
jsConfig := jetstream.StreamConfig{
Name: config.StreamName,
Subjects: config.Subjects,
Storage: parseStorageType(config.Storage),
Retention: parseRetentionType(config.Retention),
MaxAge: config.MaxAge,
Duplicates: config.Duplicates,
Replicas: config.Replicas,
MaxMsgSize: config.MaxMsgSize,
MaxBytes: config.MaxBytes,
MaxMsgs: config.MaxMsgs,
MaxMsgsPerSubject: config.MaxMsgsPerSubject,
MaxConsumers: config.MaxConsumers,
AllowRollup: config.AllowRollup,
DenyDelete: config.DenyDelete,
DenyPurge: config.DenyPurge,
Discard: jetstream.DiscardOld, // 默认删除旧消息
DiscardNewPerSubject: config.DiscardPerSubject,
}
// 配置死信队列重新发布(如果设置了)
if config.Republish != nil {
jsConfig.RePublish = &jetstream.RePublish{
Source: config.Republish.Source,
Destination: config.Republish.Destination,
HeadersOnly: config.Republish.HeadersOnly,
}
} else {
// 使用固定的死信队列命名规范:{StreamName}.DLQ
dlqSubject := fmt.Sprintf("%s.DLQ", config.StreamName)
// 死信队列配置
jsConfig.RePublish = &jetstream.RePublish{
Source: ">",
Destination: dlqSubject,
HeadersOnly: true,
}
}
// 检查流是否已存在
stream, err := js.Stream(ctx, config.StreamName)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jsConfig)
if err != nil {
return fmt.Errorf("更新任务流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jsConfig)
if err != nil {
return fmt.Errorf("创建任务流失败: %w", err)
}
// 记录配置信息
configInfo := fmt.Sprintf("存储=%s, 策略=%s, 副本=%d, 保留=%v", config.Storage, config.Retention, config.Replicas, config.MaxAge)
if config.Republish != nil {
configInfo += fmt.Sprintf(", 死信队列=%s->%s", config.Republish.Source, config.Republish.Destination)
}
g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (%s)", stream.CachedInfo().Config.Name, configInfo)
return nil
}

View File

@@ -1,131 +0,0 @@
package nats
import (
"context"
"fmt"
"testing"
"time"
"github.com/nats-io/nats.go/jetstream"
)
// TestNatsBasicOperations 测试基础操作
func TestNatsBasicOperations(t *testing.T) {
// 测试连接状态
if !IsConnected() {
t.Log("NATS 未连接")
}
// 测试连接状态获取
state := GetConnState()
t.Logf("当前连接状态: %d", state)
}
// TestNatsMetrics 测试监控指标
func TestNatsMetrics(t *testing.T) {
metrics := GetMetrics()
t.Logf("发布计数: %d", metrics.PublishCount.Load())
t.Logf("发布错误: %d", metrics.PublishError.Load())
t.Logf("请求计数: %d", metrics.RequestCount.Load())
}
// TestNatsConnStateListener 测试连接状态监听
func TestNatsConnStateListener(t *testing.T) {
listener := func(state ConnState, err error) {
fmt.Printf("连接状态变化: %d, 错误: %v\n", state, err)
}
RegisterConnStateListener(listener)
defer UnregisterConnStateListener(listener)
time.Sleep(1 * time.Second)
}
// TestNatsStreamOperations 测试流操作
func TestNatsStreamOperations(t *testing.T) {
ctx := context.Background()
// 创建任务流
config := TaskStreamConfig{
StreamName: "test_tasks",
Subjects: []string{"test.task.>"},
//Subject: "test.task.process",
}
err := CreateTaskStream(ctx, config)
if err != nil {
t.Logf("创建任务流失败: %v", err)
}
// 获取流信息
info, err := GetStream(ctx, "test_tasks")
if err != nil {
t.Logf("获取流信息失败: %v", err)
} else {
t.Logf("流信息: %s", info.Config.Name)
}
// 列出所有流
streams, err := ListStreams(ctx)
if err != nil {
t.Logf("列出流失败: %v", err)
} else {
t.Logf("流列表: %v", streams)
}
// 删除流
err = DeleteStream(ctx, "test_tasks")
if err != nil {
t.Logf("删除流失败: %v", err)
}
}
// TestNatsConsumerOperations 测试消费者操作
func TestNatsConsumerOperations(t *testing.T) {
ctx := context.Background()
// 创建测试流
config := TaskStreamConfig{
StreamName: "test_consumer",
Subjects: []string{"test.consumer.>"},
//Subject: "test.consumer.process",
}
err := CreateTaskStream(ctx, config)
if err != nil {
t.Logf("创建流失败: %v", err)
}
// 创建消费者
consumerConfig := jetstream.ConsumerConfig{
Name: "test_consumer",
Durable: "test_consumer",
}
_, err = CreateConsumer(ctx, "test_consumer", "test_consumer", consumerConfig)
if err != nil {
t.Logf("创建消费者失败: %v", err)
}
// 获取消费者信息
info, err := GetConsumer(ctx, "test_consumer", "test_consumer")
if err != nil {
t.Logf("获取消费者信息失败: %v", err)
} else {
t.Logf("消费者信息: %s", info.Name)
}
// 列出消费者
consumers, err := ListConsumers(ctx, "test_consumer")
if err != nil {
t.Logf("列出消费者失败: %v", err)
} else {
t.Logf("消费者列表: %v", consumers)
}
// 删除消费者
err = DeleteConsumer(ctx, "test_consumer", "test_consumer")
if err != nil {
t.Logf("删除消费者失败: %v", err)
}
// 清理流
_ = DeleteStream(ctx, "test_consumer")
}

View File

@@ -1,411 +0,0 @@
package nats
//import (
// "context"
// "fmt"
// "time"
//
// "github.com/gogf/gf/v2/frame/g"
// "github.com/nats-io/nats.go/jetstream"
//)
//// TaskPriority 任务优先级
//type TaskPriority string
//
//const (
// TaskPriorityHigh TaskPriority = "high" // 高优先级任务
// TaskPriorityNormal TaskPriority = "normal" // 普通优先级任务
// TaskPriorityLow TaskPriority = "low" // 低优先级任务
//)
//
//// TaskStreamConfig 任务流配置
//type TaskStreamConfig struct {
// StreamName string // 流名称
// Subjects []string // 主题列表(支持优先级分级,如 tasks.high.>, tasks.normal.>, tasks.low.>
// Subject string // 默认发布主题
// Priority TaskPriority // 任务优先级
// MaxAge time.Duration // 消息保留时长(根据任务重要性设置)
// MaxMsgsPerSub int64 // 每个订阅者最大消息数(防止内存溢出)
// Replicas int // 副本数默认1建议生产环境使用3
// Duplicates time.Duration // 消息去重窗口0表示不启用
//}
//
//// TaskConsumerConfig 任务消费者配置
//type TaskConsumerConfig struct {
// ConsumerName string // 消费者名称
// AckPolicy *jetstream.AckPolicy
// MaxDeliveries int32 // 最大投递次数(用于重试控制)
// AckWait time.Duration // 等待ACK超时时间
// Backoff []time.Duration // 重试退避策略
// FilterSubject string // 过滤主题(可指定特定优先级任务)
// MaxAckPending int // 最大待确认消息数
// MaxWaiting int // 最大等待消息数
// ReplayPolicy *jetstream.ReplayPolicy // 重放策略
//}
// CreateTaskStream 创建任务流(基于 JetStream 2.10+ API
//
// 核心设计思路:
// 1. 严格的持久化使用文件存储FileStorage避免任务丢失
// 2. 任务优先级通过主题分级实现tasks.high/tasks.normal/tasks.low
// 3. 死信队列:配置死信队列处理失败任务
// 4. 保留策略:按任务重要性设置不同的保留时长
// 5. 工作队列策略:确保每条消息只被一个消费者处理
//
// 参数:
// - ctx: 上下文
// - config: 任务流配置
//
// 返回:
// - error: 错误信息
//func CreateTaskStream(ctx context.Context, config TaskStreamConfig) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// // 设置默认值
// if config.MaxAge == 0 {
// config.MaxAge = 7 * 24 * time.Hour // 默认保留7天
// }
// if config.MaxMsgsPerSub == 0 {
// config.MaxMsgsPerSub = 100000 // 默认每订阅者最多10万条消息
// }
// if config.Replicas == 0 {
// config.Replicas = 1 // 默认单副本
// }
// if config.Duplicates == 0 {
// config.Duplicates = 2 * time.Minute // 默认2分钟去重窗口
// }
//
// // 验证主题配置
// if len(config.Subjects) == 0 {
// return fmt.Errorf("任务流必须指定至少一个主题")
// }
//
// // 设置死信队列
// // 使用固定的死信队列命名规范:{StreamName}.DLQ
// dlqSubject := fmt.Sprintf("%s.DLQ", config.StreamName)
//
// // 尝试获取现有流
// stream, err := js.Stream(ctx, config.StreamName)
// if err == nil {
// // 流已存在,更新配置以适配任务流的特殊需求
// _, err = js.UpdateStream(ctx, jetstream.StreamConfig{
// Name: config.StreamName,
// Subjects: config.Subjects,
// Storage: jetstream.FileStorage, // 文件存储确保持久化
// Retention: jetstream.WorkQueuePolicy, // 工作队列策略
// MaxAge: config.MaxAge,
// MaxMsgs: config.MaxMsgsPerSub,
// Replicas: config.Replicas,
// Duplicates: config.Duplicates,
// // 死信队列配置
// RePublish: &jetstream.RePublish{
// Source: ">", // 匹配所有主题
// Destination: dlqSubject,
// },
// // 限制流大小(防止磁盘占用过多)
// MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB
// })
// if err != nil {
// return fmt.Errorf("更新任务流失败: %w", err)
// }
// g.Log().Infof(ctx, "✅ 任务流已更新: %s (优先级: %s, 保留: %v)",
// stream.CachedInfo().Config.Name, config.Priority, config.MaxAge)
// return nil
// }
//
// // 创建新任务流
// streamConfig := jetstream.StreamConfig{
// Name: config.StreamName,
// Subjects: config.Subjects,
// Storage: jetstream.FileStorage, // 文件存储确保持久化
// Retention: jetstream.WorkQueuePolicy, // 工作队列策略
// MaxAge: config.MaxAge,
// MaxMsgs: config.MaxMsgsPerSub,
// Replicas: config.Replicas,
// Duplicates: config.Duplicates,
// // 死信队列配置
// RePublish: &jetstream.RePublish{
// Source: ">", // 匹配所有主题
// Destination: dlqSubject,
// },
// // 限制流大小(防止磁盘占用过多)
// MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB
// // 启用流清理
// Discard: jetstream.DiscardOld, // 新消息替换旧消息
// }
//
// stream, err = js.CreateStream(ctx, streamConfig)
// if err != nil {
// return fmt.Errorf("创建任务流失败: %w", err)
// }
//
// // 验证流是否创建成功
// if stream == nil {
// return fmt.Errorf("创建任务流失败:流对象为空")
// }
//
// g.Log().Infof(ctx, "✅ 任务流创建成功: %s (文件存储+工作队列策略+死信队列, 优先级: %s, 保留: %v, 副本: %d)",
// stream.CachedInfo().Config.Name, config.Priority, config.MaxAge, config.Replicas)
//
// // 记录配置信息
// g.Log().Infof(ctx, " - 主题列表: %v", config.Subjects)
// g.Log().Infof(ctx, " - 死信队列: %s", dlqSubject)
// g.Log().Infof(ctx, " - 最大消息数: %d", config.MaxMsgsPerSub)
// g.Log().Infof(ctx, " - 去重窗口: %v", config.Duplicates)
//
// return nil
//}
//
//// CreateOrUpdateTaskConsumer 创建或更新任务消费者(基于 JetStream 2.10+ API
////
//// 核心设计思路:
//// 1. 支持手动确认AckExplicit确保任务处理完成
//// 2. 通过 Nack() 方法实现消息重试,超限后进入死信队列
//// 3. 支持主题过滤,可订阅特定优先级任务
//// 4. 限制待确认消息数,防止消费者过载
//// 5. AckWait 设置消息处理超时时间
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
//// - consumerConfig: 消费者配置
////
//// 返回:
//// - jetstream.Consumer: 消费者对象
//// - error: 错误信息
//func CreateOrUpdateTaskConsumer(ctx context.Context, streamName string, consumerConfig TaskConsumerConfig) (jetstream.Consumer, error) {
// if !IsConnected() {
// return nil, fmt.Errorf("NATS 未连接")
// }
//
// // 设置默认值
// ackPolicy := jetstream.AckExplicitPolicy
// if consumerConfig.AckPolicy != nil {
// ackPolicy = *consumerConfig.AckPolicy
// }
//
// if consumerConfig.MaxDeliveries == 0 {
// consumerConfig.MaxDeliveries = 10 // 默认最多投递10次
// }
//
// if consumerConfig.AckWait == 0 {
// consumerConfig.AckWait = 30 * time.Second // 默认30秒等待确认
// }
//
// if consumerConfig.MaxAckPending == 0 {
// consumerConfig.MaxAckPending = 1000 // 默认最多1000条待确认消息
// }
//
// if consumerConfig.MaxWaiting == 0 {
// consumerConfig.MaxWaiting = 512 // 默认最多512条等待消息
// }
//
// replayPolicy := jetstream.ReplayInstantPolicy
// if consumerConfig.ReplayPolicy != nil {
// replayPolicy = *consumerConfig.ReplayPolicy
// }
//
// // 构建消费者配置
// config := jetstream.ConsumerConfig{
// Name: consumerConfig.ConsumerName,
// Durable: consumerConfig.ConsumerName, // 持久化消费者
// AckPolicy: ackPolicy,
// AckWait: consumerConfig.AckWait,
// MaxAckPending: consumerConfig.MaxAckPending,
// MaxWaiting: consumerConfig.MaxWaiting,
// ReplayPolicy: replayPolicy,
// FilterSubject: consumerConfig.FilterSubject,
// }
//
// // 使用 CreateOrUpdateConsumer 创建或更新消费者
// consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, config)
// if err != nil {
// return nil, fmt.Errorf("创建任务消费者失败: %w", err)
// }
//
// g.Log().Infof(ctx, "✅ 任务消费者已创建/更新: %s/%s (等待确认: %v)",
// streamName, consumerConfig.ConsumerName, consumerConfig.AckWait)
//
// // 获取消费者信息并记录
// info, err := consumer.Info(ctx)
// if err == nil {
// g.Log().Infof(ctx, " - 过滤主题: %s", info.Config.FilterSubject)
// g.Log().Infof(ctx, " - 最大待确认: %d", info.Config.MaxAckPending)
// g.Log().Infof(ctx, " - ACK策略: %s", info.Config.AckPolicy)
// }
//
// return consumer, nil
//}
//
//// CreateTaskStreamWithPriority 创建带优先级的任务流
////
//// 便捷方法,自动创建支持多优先级的任务流配置
////
//// 参数:
//// - ctx: 上下文
//// - streamPrefix: 流名称前缀(如 "tasks"
//// - priority: 默认优先级
////
//// 返回:
//// - error: 错误信息
//func CreateTaskStreamWithPriority(ctx context.Context, streamPrefix string, priority TaskPriority) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// // 构建支持多优先级的主题列表
// subjects := []string{
// fmt.Sprintf("%s.high.>", streamPrefix), // 高优先级任务
// fmt.Sprintf("%s.normal.>", streamPrefix), // 普通优先级任务
// fmt.Sprintf("%s.low.>", streamPrefix), // 低优先级任务
// }
//
// // 根据优先级设置不同的保留时长
// var maxAge time.Duration
// switch priority {
// case TaskPriorityHigh:
// maxAge = 30 * 24 * time.Hour // 高优先级保留30天
// case TaskPriorityNormal:
// maxAge = 7 * 24 * time.Hour // 普通优先级保留7天
// case TaskPriorityLow:
// maxAge = 24 * time.Hour // 低优先级保留1天
// default:
// maxAge = 7 * 24 * time.Hour
// }
//
// config := TaskStreamConfig{
// StreamName: streamPrefix,
// Subjects: subjects,
// Subject: fmt.Sprintf("%s.%s.>", streamPrefix, priority),
// Priority: priority,
// MaxAge: maxAge,
// MaxMsgsPerSub: 100000,
// Replicas: 1,
// Duplicates: 2 * time.Minute,
// }
//
// return CreateTaskStream(ctx, config)
//}
//
//// PublishTask 发布任务到指定流
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
//// - task: 任务数据会被JSON序列化
////
//// 返回:
//// - error: 错误信息
//func PublishTask(ctx context.Context, streamName string, task interface{}) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// // 使用 JsPublish 发布消息
// if err := JsPublish(ctx, streamName, task); err != nil {
// return fmt.Errorf("发布任务失败: %w", err)
// }
//
// return nil
//}
//
//// PublishTaskWithPriority 发布带优先级的任务
////
//// 参数:
//// - ctx: 上下文
//// - streamPrefix: 流名称前缀
//// - priority: 任务优先级
//// - taskType: 任务类型
//// - task: 任务数据会被JSON序列化
////
//// 返回:
//// - error: 错误信息
//func PublishTaskWithPriority(ctx context.Context, streamPrefix string, priority TaskPriority, taskType string, task interface{}) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// // 构建主题:{streamPrefix}.{priority}.{taskType}
// subject := fmt.Sprintf("%s.%s.%s", streamPrefix, priority, taskType)
//
// // 使用 JsPublish 发布消息
// if err := JsPublish(ctx, subject, task); err != nil {
// return fmt.Errorf("发布任务失败: %w", err)
// }
//
// g.Log().Debugf(ctx, "任务已发布: %s (优先级: %s, 类型: %s)", subject, priority, taskType)
//
// return nil
//}
//
//// GetTaskStreamInfo 获取任务流信息
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
////
//// 返回:
//// - *jetstream.StreamInfo: 流信息
//// - error: 错误信息
//func GetTaskStreamInfo(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) {
// if !IsConnected() {
// return nil, fmt.Errorf("NATS 未连接")
// }
//
// return GetStream(ctx, streamName)
//}
//
//// GetTaskConsumerInfo 获取任务消费者信息
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
//// - consumerName: 消费者名称
////
//// 返回:
//// - *jetstream.ConsumerInfo: 消费者信息
//// - error: 错误信息
//func GetTaskConsumerInfo(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) {
// if !IsConnected() {
// return nil, fmt.Errorf("NATS 未连接")
// }
//
// return GetConsumer(ctx, streamName, consumerName)
//}
//
//// DeleteTaskStream 删除任务流
////
//// 注意:此操作会删除流及其所有消息,请谨慎使用
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
////
//// 返回:
//// - error: 错误信息
//func DeleteTaskStream(ctx context.Context, streamName string) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// return DeleteStream(ctx, streamName)
//}
//
//// DeleteTaskConsumer 删除任务消费者
////
//// 参数:
//// - ctx: 上下文
//// - streamName: 流名称
//// - consumerName: 消费者名称
////
//// 返回:
//// - error: 错误信息
//func DeleteTaskConsumer(ctx context.Context, streamName, consumerName string) error {
// if !IsConnected() {
// return fmt.Errorf("NATS 未连接")
// }
//
// return DeleteConsumer(ctx, streamName, consumerName)
//}

View File

@@ -1,87 +0,0 @@
package nats
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel/trace"
)
// ============ 上下文元数据工具函数 ============
// 以下函数用于在 context 和 NATS 消息头之间互转元数据
// 定义常见的上下文元数据 key
const (
TraceIDKey = "trace_id"
TokenKey = "token"
)
func getTraceID(ctx context.Context) (traceID string, err error) {
// 提取 traceId首先尝试从 OpenTelemetry Span 中提取,从 context 中提取 TraceID
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
traceID = span.SpanContext().TraceID().String()
} else if tid := ctx.Value(TraceIDKey); tid != nil {
traceID = fmt.Sprintf("%v", tid)
}
if traceID == "" {
return traceID, fmt.Errorf("context 中没有 TraceID")
}
return
}
// contextToHeaders 将 context 中的元数据转换为 NATS 消息头
// 支持提取 user_id、tenant_id、trace_id、token 等常见字段
func contextToHeaders(ctx context.Context) (nats.Header, error) {
headers := make(nats.Header)
// 提取 traceId首先尝试从 OpenTelemetry Span 中提取
if traceID, err := getTraceID(ctx); err != nil {
return headers, err
} else {
headers.Set(TraceIDKey, traceID)
}
// 提取 token优先级context value > HTTP Authorization header
token := ""
if t := ctx.Value(TokenKey); t != nil {
token = fmt.Sprintf("%v", t)
} else if r := g.RequestFromCtx(ctx); r != nil {
// 从 HTTP 请求的 Authorization header 中提取 token
auth := r.GetHeader("Authorization")
if auth != "" {
// 移除 "Bearer " 前缀
if len(auth) > 7 && auth[:7] == "Bearer " {
token = auth[7:]
} else {
token = auth
}
}
}
if token != "" {
headers.Set(TokenKey, token)
}
return headers, nil
}
// headersToContext 从 NATS 消息头重建 context
// 支持还原 user_id、tenant_id、trace_id、token 等字段
func headersToContext(ctx context.Context, headers nats.Header) context.Context {
if headers == nil {
return ctx
}
// 恢复 trace_id
if traceID := headers.Get(TraceIDKey); traceID != "" {
ctx = context.WithValue(ctx, TraceIDKey, traceID)
}
// 恢复 token
if token := headers.Get(TokenKey); token != "" {
ctx = context.WithValue(ctx, TokenKey, token)
}
return ctx
}