diff --git a/mongo/link.go b/mongo/connection.go similarity index 100% rename from mongo/link.go rename to mongo/connection.go diff --git a/nats/connection.go b/nats/connection.go new file mode 100644 index 0000000..eb8078a --- /dev/null +++ b/nats/connection.go @@ -0,0 +1,318 @@ +package nats + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +var ( + nc *nats.Conn + js jetstream.JetStream + inited bool + mu sync.RWMutex + natsURL string + healthCtx context.Context + healthCancel context.CancelFunc + connected bool + reconnectChan chan struct{} + + // 连接状态变化监听器 + connStateListeners []ConnStateListener + connListenersMu sync.RWMutex + + // 监控指标 + metrics Metrics +) + +// Metrics 监控指标 +type Metrics struct { + PublishCount atomic.Int64 + PublishError atomic.Int64 + SubscribeCount atomic.Int64 + RequestCount atomic.Int64 + RequestError atomic.Int64 + ConsumeCount atomic.Int64 + ConsumeError atomic.Int64 +} + +// ConnState 连接状态 +type ConnState int + +const ( + ConnStateDisconnected ConnState = iota + ConnStateConnecting + ConnStateConnected + ConnStateReconnecting + ConnStateClosed +) + +// ConnStateListener 连接状态监听器 +type ConnStateListener func(state ConnState, err error) + +// GetMetrics 获取监控指标 +func GetMetrics() Metrics { + return metrics +} + +// RegisterConnStateListener 注册连接状态监听器 +func RegisterConnStateListener(listener ConnStateListener) { + connListenersMu.Lock() + defer connListenersMu.Unlock() + connStateListeners = append(connStateListeners, listener) +} + +// UnregisterConnStateListener 取消注册连接状态监听器 +func UnregisterConnStateListener(listener ConnStateListener) { + connListenersMu.Lock() + defer connListenersMu.Unlock() + for i, l := range connStateListeners { + if l != nil && &l == &listener { + connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...) + break + } + } +} + +// notifyConnState 通知所有监听器连接状态变化 +func notifyConnState(state ConnState, err error) { + connListenersMu.RLock() + listeners := make([]ConnStateListener, len(connStateListeners)) + copy(listeners, connStateListeners) + connListenersMu.RUnlock() + + for _, listener := range listeners { + if listener != nil { + listener(state, err) + } + } +} + +// init 初始化 NATS 连接 +func init() { + // 从配置文件读取 NATS 地址 + natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String() + if natsURL == "" { + // 默认使用本地地址 + natsURL = nats.DefaultURL + } + + // 创建健康检查上下文 + healthCtx, healthCancel = context.WithCancel(context.Background()) + + // 创建重连通知通道(增大缓冲区避免丢失通知) + reconnectChan = make(chan struct{}, 10) + + // 启动连接 + go initConnection() + + // 启动健康检查协程 + go healthCheck() +} + +// initConnection 初始化连接 +func initConnection() { + ctx := context.Background() + notifyConnState(ConnStateConnecting, nil) + if err := connect(ctx); err != nil { + g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err) + notifyConnState(ConnStateDisconnected, err) + } +} + +// connect 建立 NATS 连接 +func connect(ctx context.Context) error { + mu.Lock() + defer mu.Unlock() + + if nc != nil && !nc.IsClosed() { + nc.Close() + } + + // 连接选项配置 + opts := []nats.Option{ + nats.Name("goframe-nats-client"), + nats.ReconnectWait(2 * time.Second), + nats.MaxReconnects(-1), // 无限重连 + nats.PingInterval(10 * time.Second), + nats.MaxPingsOutstanding(5), + nats.ReconnectHandler(func(nc *nats.Conn) { + g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl()) + connected = true + + // 重新创建 JetStream 实例 + if newJS, err := jetstream.New(nc); err == nil { + js = newJS + } + + // 通知重连成功 + notifyConnState(ConnStateConnected, nil) + + // 使用非阻塞发送避免阻塞 + select { + case reconnectChan <- struct{}{}: + default: + // 通道已满,丢弃通知 + } + }), + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err) + connected = false + notifyConnState(ConnStateReconnecting, err) + }), + nats.ClosedHandler(func(nc *nats.Conn) { + g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl()) + connected = false + notifyConnState(ConnStateClosed, nil) + }), + nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + g.Log().Errorf(ctx, "NATS 错误: %v", err) + }), + } + + var err error + nc, err = nats.Connect(natsURL, opts...) + if err != nil { + return fmt.Errorf("NATS 连接失败: %w", err) + } + + // 等待连接就绪 + if nc.Status() != nats.CONNECTED { + select { + case <-time.After(5 * time.Second): + notifyConnState(ConnStateDisconnected, fmt.Errorf("连接超时")) + return fmt.Errorf("NATS 连接超时") + case <-nc.StatusChanged(nats.CONNECTED): + } + } + + // 创建 JetStream 实例 + js, err = jetstream.New(nc) + if err != nil { + return fmt.Errorf("创建 JetStream 失败: %w", err) + } + + connected = true + inited = true + g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl()) + notifyConnState(ConnStateConnected, nil) + return nil +} + +// healthCheck 健康检查协程(仅作为备用检查) +func healthCheck() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-healthCtx.Done(): + return + case <-ticker.C: + mu.RLock() + currentConnected := connected + currentConn := nc + mu.RUnlock() + + if !currentConnected || currentConn == nil || currentConn.IsClosed() { + // 仅记录日志,不尝试重连(NATS 已有自动重连机制) + g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...") + } + case <-reconnectChan: + // 重连成功的通知(仅记录日志) + g.Log().Info(context.Background(), "收到重连成功通知") + } + } +} + +// checkConnected 检查连接状态 +func checkConnected() bool { + mu.RLock() + defer mu.RUnlock() + return connected && nc != nil && !nc.IsClosed() +} + +// IsConnected 检查 NATS 是否已连接 +func IsConnected() bool { + return checkConnected() +} + +// GetConnState 获取当前连接状态 +func GetConnState() ConnState { + mu.RLock() + defer mu.RUnlock() + + if nc == nil { + return ConnStateDisconnected + } + + if nc.IsClosed() { + return ConnStateClosed + } + + if connected { + return ConnStateConnected + } + + return ConnStateDisconnected +} + +// Shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接 +func Shutdown() error { + ctx := context.Background() + g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...") + + // 注销所有单实例服务 + rpcServicesMu.Lock() + singleServiceCount := len(rpcServices) + for serviceName := range rpcServices { + if sub, exists := rpcSubs[serviceName]; exists { + if err := sub.Unsubscribe(); err != nil { + g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err) + } + } + delete(rpcSubs, serviceName) + delete(rpcServices, serviceName) + } + rpcServicesMu.Unlock() + + // 注销所有队列服务 + queueRPCMu.Lock() + queueServiceCount := 0 + for queueName, servicesMap := range queueRPCServices { + queueServiceCount += len(servicesMap) + for serviceName, sub := range queueRPCSubs[queueName] { + if err := sub.Unsubscribe(); err != nil { + g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err) + } + } + delete(queueRPCSubs, queueName) + delete(queueRPCServices, queueName) + } + queueRPCMu.Unlock() + + g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount) + + mu.Lock() + defer mu.Unlock() + + // 停止健康检查协程 + if healthCancel != nil { + healthCancel() + } + + // 关闭连接 + if nc != nil && !nc.IsClosed() { + nc.Close() + connected = false + inited = false + } + g.Log().Info(ctx, "NATS RPC 服务已优雅关闭") + return nil +} diff --git a/nats/nats.go b/nats/nats.go index f04bab1..bcc325f 100644 --- a/nats/nats.go +++ b/nats/nats.go @@ -6,7 +6,6 @@ import ( "fmt" "reflect" "sync" - "sync/atomic" "time" "github.com/gogf/gf/v2/frame/g" @@ -14,256 +13,19 @@ import ( "github.com/nats-io/nats.go/jetstream" ) +// RPC 服务注册表 var ( - nc *nats.Conn - js jetstream.JetStream - inited bool - mu sync.RWMutex - natsURL string - healthCtx context.Context - healthCancel context.CancelFunc - connected bool - reconnectChan chan struct{} - - // 连接状态变化监听器 - connStateListeners []ConnStateListener - connListenersMu sync.RWMutex - - // 监控指标 - metrics Metrics + rpcServices map[string]RPCHandler + rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅 + rpcServicesMu sync.RWMutex + queueRPCServices map[string]map[string]RPCHandler // queueName -> subject -> handler + queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅 + queueRPCMu sync.RWMutex ) -// Metrics 监控指标 -type Metrics struct { - PublishCount atomic.Int64 - PublishError atomic.Int64 - SubscribeCount atomic.Int64 - RequestCount atomic.Int64 - RequestError atomic.Int64 - ConsumeCount atomic.Int64 - ConsumeError atomic.Int64 -} - -// ConnState 连接状态 -type ConnState int - -const ( - ConnStateDisconnected ConnState = iota - ConnStateConnecting - ConnStateConnected - ConnStateReconnecting - ConnStateClosed -) - -// ConnStateListener 连接状态监听器 -type ConnStateListener func(state ConnState, err error) - -// GetMetrics 获取监控指标 -func GetMetrics() Metrics { - return metrics -} - -// RegisterConnStateListener 注册连接状态监听器 -func RegisterConnStateListener(listener ConnStateListener) { - connListenersMu.Lock() - defer connListenersMu.Unlock() - connStateListeners = append(connStateListeners, listener) -} - -// UnregisterConnStateListener 取消注册连接状态监听器 -func UnregisterConnStateListener(listener ConnStateListener) { - connListenersMu.Lock() - defer connListenersMu.Unlock() - for i, l := range connStateListeners { - if l != nil && &l == &listener { - connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...) - break - } - } -} - -// notifyConnState 通知所有监听器连接状态变化 -func notifyConnState(state ConnState, err error) { - connListenersMu.RLock() - listeners := make([]ConnStateListener, len(connStateListeners)) - copy(listeners, connStateListeners) - connListenersMu.RUnlock() - - for _, listener := range listeners { - if listener != nil { - listener(state, err) - } - } -} - -// init 初始化 NATS 连接 -func init() { - // 从配置文件读取 NATS 地址 - natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String() - if natsURL == "" { - // 默认使用本地地址 - natsURL = nats.DefaultURL - } - - // 创建健康检查上下文 - healthCtx, healthCancel = context.WithCancel(context.Background()) - - // 创建重连通知通道(增大缓冲区避免丢失通知) - reconnectChan = make(chan struct{}, 10) - - // 启动连接 - go initConnection() - - // 启动健康检查协程 - go healthCheck() -} - -// initConnection 初始化连接 -func initConnection() { - ctx := context.Background() - notifyConnState(ConnStateConnecting, nil) - if err := connect(ctx); err != nil { - g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err) - notifyConnState(ConnStateDisconnected, err) - } -} - -// connect 建立 NATS 连接 -func connect(ctx context.Context) error { - mu.Lock() - defer mu.Unlock() - - if nc != nil && !nc.IsClosed() { - nc.Close() - } - - // 连接选项配置 - opts := []nats.Option{ - nats.Name("goframe-nats-client"), - nats.ReconnectWait(2 * time.Second), - nats.MaxReconnects(-1), // 无限重连 - nats.PingInterval(10 * time.Second), - nats.MaxPingsOutstanding(5), - nats.ReconnectHandler(func(nc *nats.Conn) { - g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl()) - connected = true - - // 重新创建 JetStream 实例 - if newJS, err := jetstream.New(nc); err == nil { - js = newJS - } - - // 通知重连成功 - notifyConnState(ConnStateConnected, nil) - - // 使用非阻塞发送避免阻塞 - select { - case reconnectChan <- struct{}{}: - default: - // 通道已满,丢弃通知 - } - }), - nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { - g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err) - connected = false - notifyConnState(ConnStateReconnecting, err) - }), - nats.ClosedHandler(func(nc *nats.Conn) { - g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl()) - connected = false - notifyConnState(ConnStateClosed, nil) - }), - nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { - g.Log().Errorf(ctx, "NATS 错误: %v", err) - }), - } - - var err error - nc, err = nats.Connect(natsURL, opts...) - if err != nil { - return fmt.Errorf("NATS 连接失败: %w", err) - } - - // 等待连接就绪 - if nc.Status() != nats.CONNECTED { - select { - case <-time.After(5 * time.Second): - notifyConnState(ConnStateDisconnected, fmt.Errorf("连接超时")) - return fmt.Errorf("NATS 连接超时") - case <-nc.StatusChanged(nats.CONNECTED): - } - } - - // 创建 JetStream 实例 - js, err = jetstream.New(nc) - if err != nil { - return fmt.Errorf("创建 JetStream 失败: %w", err) - } - - connected = true - inited = true - g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl()) - notifyConnState(ConnStateConnected, nil) - return nil -} - -// healthCheck 健康检查协程(仅作为备用检查) -func healthCheck() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-healthCtx.Done(): - return - case <-ticker.C: - mu.RLock() - currentConnected := connected - currentConn := nc - mu.RUnlock() - - if !currentConnected || currentConn == nil || currentConn.IsClosed() { - // 仅记录日志,不尝试重连(NATS 已有自动重连机制) - g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...") - } - case <-reconnectChan: - // 重连成功的通知(仅记录日志) - g.Log().Info(context.Background(), "收到重连成功通知") - } - } -} - -// checkConnected 检查连接状态 -func checkConnected() bool { - mu.RLock() - defer mu.RUnlock() - return connected && nc != nil && !nc.IsClosed() -} - -// IsConnected 检查 NATS 是否已连接 -func IsConnected() bool { - return checkConnected() -} - -// GetConnState 获取当前连接状态 -func GetConnState() ConnState { - mu.RLock() - defer mu.RUnlock() - - if nc == nil { - return ConnStateDisconnected - } - - if nc.IsClosed() { - return ConnStateClosed - } - - if connected { - return ConnStateConnected - } - - return ConnStateDisconnected -} +// RPCHandler RPC 处理函数类型 +// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现 +type RPCHandler func(ctx context.Context, req []byte) ([]byte, error) // CreateTaskStream 创建任务消息队列流 // 存储策略: 文件存储 @@ -521,20 +283,6 @@ func CreateConsumer(ctx context.Context, streamName, consumerName string, config // 以下方法提供了完全抽象的 RPC 调用接口 // 调用方和响应方完全不需要知道底层使用的是 NATS 的发布订阅模式 -// RPC 服务注册表 -var ( - rpcServices map[string]RPCHandler - rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅 - rpcServicesMu sync.RWMutex - queueRPCServices map[string]map[string]RPCHandler // queueName -> subject -> handler - queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅 - queueRPCMu sync.RWMutex -) - -// RPCHandler RPC 处理函数类型 -// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现 -type RPCHandler func(ctx context.Context, req []byte) ([]byte, error) - // RegisterRPCService 注册 RPC 服务(单实例) // serviceName: 服务名称,调用方通过此名称调用服务 // handler: 服务处理函数,接收请求并返回响应 @@ -898,57 +646,3 @@ func AutoRegisterServices(serviceInstances map[string]interface{}, options ...Re g.Log().Infof(context.Background(), "✅ 共自动注册了 %d 个服务", totalRegistered) return nil } - -// Shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接 -func Shutdown() error { - ctx := context.Background() - g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...") - - // 注销所有单实例服务 - rpcServicesMu.Lock() - singleServiceCount := len(rpcServices) - for serviceName := range rpcServices { - if sub, exists := rpcSubs[serviceName]; exists { - if err := sub.Unsubscribe(); err != nil { - g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err) - } - } - delete(rpcSubs, serviceName) - delete(rpcServices, serviceName) - } - rpcServicesMu.Unlock() - - // 注销所有队列服务 - queueRPCMu.Lock() - queueServiceCount := 0 - for queueName, servicesMap := range queueRPCServices { - queueServiceCount += len(servicesMap) - for serviceName, sub := range queueRPCSubs[queueName] { - if err := sub.Unsubscribe(); err != nil { - g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err) - } - } - delete(queueRPCSubs, queueName) - delete(queueRPCServices, queueName) - } - queueRPCMu.Unlock() - - g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount) - - mu.Lock() - defer mu.Unlock() - - // 停止健康检查协程 - if healthCancel != nil { - healthCancel() - } - - // 关闭连接 - if nc != nil && !nc.IsClosed() { - nc.Close() - connected = false - inited = false - } - g.Log().Info(ctx, "NATS RPC 服务已优雅关闭") - return nil -} diff --git a/nats/nats_test.go b/nats/nats_test.go index e5a16ec..7cb5851 100644 --- a/nats/nats_test.go +++ b/nats/nats_test.go @@ -131,7 +131,7 @@ func TestNatsPublishRequest(t *testing.T) { } // RPC 请求 - response, err := Request(ctx, "test.request", []byte("request"), 5*time.Second) + response, err := CallRPC(ctx, "test.request", []byte("request"), 5*time.Second) if err != nil { t.Logf("RPC 请求失败: %v", err) } else {