Files
common/message/nats_client.go

314 lines
7.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package message
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
var (
nc *nats.Conn
js jetstream.JetStream
inited bool
natsMu sync.RWMutex
natsURL string
healthCtx context.Context
healthCancel context.CancelFunc
connected bool
reconnectChan chan struct{}
// 连接状态变化监听器
connStateListeners []connStateListener
connListenersMu sync.RWMutex
// 监控指标
metrics metricsCounter
)
// Metrics 监控指标
type metricsCounter struct {
PublishCount atomic.Int64
PublishError atomic.Int64
SubscribeCount atomic.Int64
RequestCount atomic.Int64
RequestError atomic.Int64
ConsumeCount atomic.Int64
ConsumeError atomic.Int64
}
// ConnState 连接状态
type connState int
const (
connStateDisconnected connState = iota
connStateConnecting
connStateConnected
connStateReconnecting
connStateClosed
)
// ConnStateListener 连接状态监听器
type connStateListener func(state connState, err error)
// GetMetrics 获取监控指标
func getMetrics() metricsCounter {
return metrics
}
// registerConnStateListener 注册连接状态监听器
func registerConnStateListener(listener connStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
connStateListeners = append(connStateListeners, listener)
}
// unregisterConnStateListener 取消注册连接状态监听器
func unregisterConnStateListener(listener connStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
for i, l := range connStateListeners {
if l != nil && &l == &listener {
connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...)
break
}
}
}
// notifyConnState 通知所有监听器连接状态变化
func notifyConnState(state connState, err error) {
connListenersMu.RLock()
listeners := make([]connStateListener, len(connStateListeners))
copy(listeners, connStateListeners)
connListenersMu.RUnlock()
for _, listener := range listeners {
if listener != nil {
listener(state, err)
}
}
}
// init 初始化 NATS 连接
func init() {
// 从配置文件读取 NATS 地址
natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String()
if natsURL == "" {
// 默认使用本地地址
natsURL = nats.DefaultURL
}
// 创建健康检查上下文
healthCtx, healthCancel = context.WithCancel(context.Background())
// 创建重连通知通道(增大缓冲区避免丢失通知)
reconnectChan = make(chan struct{}, 10)
// 启动连接
go initConnection()
// 启动健康检查协程
go healthCheck()
}
// initConnection 初始化连接
func initConnection() {
ctx := context.Background()
notifyConnState(connStateConnecting, nil)
if err := connect(ctx); err != nil {
g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err)
notifyConnState(connStateDisconnected, err)
}
}
// connect 建立 NATS 连接
func connect(ctx context.Context) error {
natsMu.Lock()
defer natsMu.Unlock()
if nc != nil && !nc.IsClosed() {
nc.Close()
}
// 连接选项配置
opts := []nats.Option{
nats.Name("goframe-nats-client"),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1), // 无限重连
nats.PingInterval(10 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
connected = true
// 重新创建 JetStream 实例
if newJS, err := jetstream.New(nc); err == nil {
js = newJS
}
// 通知重连成功
notifyConnState(connStateConnected, nil)
// 使用非阻塞发送避免阻塞
select {
case reconnectChan <- struct{}{}:
default:
// 通道已满,丢弃通知
}
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
connected = false
notifyConnState(connStateReconnecting, err)
}),
nats.ClosedHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
connected = false
notifyConnState(connStateClosed, nil)
}),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
g.Log().Errorf(ctx, "NATS 错误: %v", err)
}),
}
var err error
nc, err = nats.Connect(natsURL, opts...)
if err != nil {
return fmt.Errorf("NATS 连接失败: %w", err)
}
// 等待连接就绪
if nc.Status() != nats.CONNECTED {
select {
case <-time.After(5 * time.Second):
notifyConnState(connStateDisconnected, fmt.Errorf("连接超时"))
return fmt.Errorf("NATS 连接超时")
case <-nc.StatusChanged(nats.CONNECTED):
}
}
// 创建 JetStream 实例
js, err = jetstream.New(nc)
if err != nil {
return fmt.Errorf("创建 JetStream 失败: %w", err)
}
connected = true
inited = true
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
notifyConnState(connStateConnected, nil)
return nil
}
// healthCheck 健康检查协程(仅作为备用检查)
func healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-healthCtx.Done():
return
case <-ticker.C:
natsMu.RLock()
currentConnected := connected
currentConn := nc
natsMu.RUnlock()
if !currentConnected || currentConn == nil || currentConn.IsClosed() {
// 仅记录日志不尝试重连NATS 已有自动重连机制)
g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...")
}
case <-reconnectChan:
// 重连成功的通知(仅记录日志)
g.Log().Info(context.Background(), "收到重连成功通知")
}
}
}
// checkConnected 检查连接状态
func checkConnected() bool {
natsMu.RLock()
defer natsMu.RUnlock()
return connected && nc != nil && !nc.IsClosed()
}
// getConnState 获取当前连接状态
func getConnState() connState {
natsMu.RLock()
defer natsMu.RUnlock()
if nc == nil {
return connStateDisconnected
}
if nc.IsClosed() {
return connStateClosed
}
if connected {
return connStateConnected
}
return connStateDisconnected
}
// shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接
func shutdown() error {
ctx := context.Background()
g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...")
// 注销所有单实例服务
rpcServicesMu.Lock()
singleServiceCount := len(rpcServices)
for serviceName := range rpcServices {
if sub, exists := rpcSubs[serviceName]; exists {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err)
}
}
delete(rpcSubs, serviceName)
delete(rpcServices, serviceName)
}
rpcServicesMu.Unlock()
// 注销所有队列服务
queueRPCMu.Lock()
queueServiceCount := 0
for queueName, servicesMap := range queueRPCServices {
queueServiceCount += len(servicesMap)
for serviceName, sub := range queueRPCSubs[queueName] {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err)
}
}
delete(queueRPCSubs, queueName)
delete(queueRPCServices, queueName)
}
queueRPCMu.Unlock()
g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount)
natsMu.Lock()
defer natsMu.Unlock()
// 停止健康检查协程
if healthCancel != nil {
healthCancel()
}
// 关闭连接
if nc != nil && !nc.IsClosed() {
nc.Close()
connected = false
inited = false
}
g.Log().Info(ctx, "NATS RPC 服务已优雅关闭")
return nil
}