Files
common/nats/nats.go

632 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package nats
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
var (
nc *nats.Conn
js jetstream.JetStream
inited bool
mu sync.RWMutex
natsURL string
healthCtx context.Context
healthCancel context.CancelFunc
connected bool
reconnectChan chan struct{}
// 连接状态变化监听器
connStateListeners []ConnStateListener
connListenersMu sync.RWMutex
// 监控指标
metrics Metrics
)
// Metrics 监控指标
type Metrics struct {
PublishCount atomic.Int64
PublishError atomic.Int64
SubscribeCount atomic.Int64
RequestCount atomic.Int64
RequestError atomic.Int64
ConsumeCount atomic.Int64
ConsumeError atomic.Int64
}
// ConnState 连接状态
type ConnState int
const (
ConnStateDisconnected ConnState = iota
ConnStateConnecting
ConnStateConnected
ConnStateReconnecting
ConnStateClosed
)
// ConnStateListener 连接状态监听器
type ConnStateListener func(state ConnState, err error)
// GetMetrics 获取监控指标
func GetMetrics() Metrics {
return metrics
}
// RegisterConnStateListener 注册连接状态监听器
func RegisterConnStateListener(listener ConnStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
connStateListeners = append(connStateListeners, listener)
}
// UnregisterConnStateListener 取消注册连接状态监听器
func UnregisterConnStateListener(listener ConnStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
for i, l := range connStateListeners {
if l != nil && &l == &listener {
connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...)
break
}
}
}
// notifyConnState 通知所有监听器连接状态变化
func notifyConnState(state ConnState, err error) {
connListenersMu.RLock()
listeners := make([]ConnStateListener, len(connStateListeners))
copy(listeners, connStateListeners)
connListenersMu.RUnlock()
for _, listener := range listeners {
if listener != nil {
listener(state, err)
}
}
}
// init 初始化 NATS 连接
func init() {
// 从配置文件读取 NATS 地址
natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String()
if natsURL == "" {
// 默认使用本地地址
natsURL = nats.DefaultURL
}
// 创建健康检查上下文
healthCtx, healthCancel = context.WithCancel(context.Background())
// 创建重连通知通道(增大缓冲区避免丢失通知)
reconnectChan = make(chan struct{}, 10)
// 启动连接
go initConnection()
// 启动健康检查协程
go healthCheck()
}
// initConnection 初始化连接
func initConnection() {
ctx := context.Background()
notifyConnState(ConnStateConnecting, nil)
if err := connect(ctx); err != nil {
g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err)
notifyConnState(ConnStateDisconnected, err)
}
}
// connect 建立 NATS 连接
func connect(ctx context.Context) error {
mu.Lock()
defer mu.Unlock()
if nc != nil && !nc.IsClosed() {
nc.Close()
}
// 连接选项配置
opts := []nats.Option{
nats.Name("goframe-nats-client"),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1), // 无限重连
nats.PingInterval(10 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
connected = true
// 重新创建 JetStream 实例
if newJS, err := jetstream.New(nc); err == nil {
js = newJS
}
// 通知重连成功
notifyConnState(ConnStateConnected, nil)
// 使用非阻塞发送避免阻塞
select {
case reconnectChan <- struct{}{}:
default:
// 通道已满,丢弃通知
}
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
connected = false
notifyConnState(ConnStateReconnecting, err)
}),
nats.ClosedHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
connected = false
notifyConnState(ConnStateClosed, nil)
}),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
g.Log().Errorf(ctx, "NATS 错误: %v", err)
}),
}
var err error
nc, err = nats.Connect(natsURL, opts...)
if err != nil {
return fmt.Errorf("NATS 连接失败: %w", err)
}
// 等待连接就绪
if nc.Status() != nats.CONNECTED {
select {
case <-time.After(5 * time.Second):
notifyConnState(ConnStateDisconnected, fmt.Errorf("连接超时"))
return fmt.Errorf("NATS 连接超时")
case <-nc.StatusChanged(nats.CONNECTED):
}
}
// 创建 JetStream 实例
js, err = jetstream.New(nc)
if err != nil {
return fmt.Errorf("创建 JetStream 失败: %w", err)
}
connected = true
inited = true
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
notifyConnState(ConnStateConnected, nil)
return nil
}
// healthCheck 健康检查协程(仅作为备用检查)
func healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-healthCtx.Done():
return
case <-ticker.C:
mu.RLock()
currentConnected := connected
currentConn := nc
mu.RUnlock()
if !currentConnected || currentConn == nil || currentConn.IsClosed() {
// 仅记录日志不尝试重连NATS 已有自动重连机制)
g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...")
}
case <-reconnectChan:
// 重连成功的通知(仅记录日志)
g.Log().Info(context.Background(), "收到重连成功通知")
}
}
}
// checkConnected 检查连接状态
func checkConnected() bool {
mu.RLock()
defer mu.RUnlock()
return connected && nc != nil && !nc.IsClosed()
}
// IsConnected 检查 NATS 是否已连接
func IsConnected() bool {
return checkConnected()
}
// GetConnState 获取当前连接状态
func GetConnState() ConnState {
mu.RLock()
defer mu.RUnlock()
if nc == nil {
return ConnStateDisconnected
}
if nc.IsClosed() {
return ConnStateClosed
}
if connected {
return ConnStateConnected
}
return ConnStateDisconnected
}
// CreateTaskStream 创建任务消息队列流
// 存储策略: 文件存储
// 工作队列模式: 工作队列策略
func CreateTaskStream(ctx context.Context, streamName string, subjects []string) error {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
stream, err := js.Stream(ctx, streamName)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.FileStorage,
Retention: jetstream.WorkQueuePolicy,
})
if err != nil {
return fmt.Errorf("更新任务流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.FileStorage,
Retention: jetstream.WorkQueuePolicy,
})
if err != nil {
return fmt.Errorf("创建任务流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (文件存储+工作队列策略)", stream.CachedInfo().Config.Name)
return nil
}
// CreateLogStream 创建日志流
// 存储策略: 内存存储
// 副本数: 单副本 (1)
// 消息留存: 短时留存 (1小时)
func CreateLogStream(ctx context.Context, streamName string, subjects []string) error {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
maxAge := 1 * time.Hour
stream, err := js.Stream(ctx, streamName)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.MemoryStorage,
Replicas: 1,
MaxAge: maxAge,
})
if err != nil {
return fmt.Errorf("更新日志流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 日志流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.MemoryStorage,
Replicas: 1,
MaxAge: maxAge,
})
if err != nil {
return fmt.Errorf("创建日志流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 日志流创建成功: %s (内存存储+单副本+短时留存1小时)", stream.CachedInfo().Config.Name)
return nil
}
// CreateTradeStream 创建交易业务流
// 存储策略: 文件存储
// 副本数: 3副本
// 同步刷盘: 启用
func CreateTradeStream(ctx context.Context, streamName string, subjects []string) error {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
stream, err := js.Stream(ctx, streamName)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.FileStorage,
Replicas: 3,
RePublish: nil,
Duplicates: 0,
})
if err != nil {
return fmt.Errorf("更新交易流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 交易业务流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: subjects,
Storage: jetstream.FileStorage,
Replicas: 3,
RePublish: nil,
Duplicates: 0,
})
if err != nil {
return fmt.Errorf("创建交易流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 交易业务流创建成功: %s (文件存储+3副本+同步刷盘)", stream.CachedInfo().Config.Name)
return nil
}
// Publish 发布消息到指定主题
func Publish(ctx context.Context, subject string, data []byte) error {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
metrics.PublishCount.Add(1)
_, err := js.Publish(ctx, subject, data)
if err != nil {
metrics.PublishError.Add(1)
return fmt.Errorf("发布消息失败: %w", err)
}
return nil
}
// GetStream 获取流信息
func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
stream, err := js.Stream(ctx, streamName)
if err != nil {
return nil, fmt.Errorf("获取流失败: %w", err)
}
info, err := stream.Info(ctx)
if err != nil {
return nil, fmt.Errorf("获取流信息失败: %w", err)
}
return info, nil
}
// ListStreams 列出所有流(简化实现)
// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现
func ListStreams(ctx context.Context) ([]string, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
// TODO: 根据实际 NATS 版本实现完整的流列表功能
return []string{}, nil
}
// DeleteStream 删除流
func DeleteStream(ctx context.Context, streamName string) error {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
if err := js.DeleteStream(ctx, streamName); err != nil {
return fmt.Errorf("删除流失败: %w", err)
}
g.Log().Infof(ctx, "✅ 流已删除: %s", streamName)
return nil
}
// GetConsumer 获取消费者信息
func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
consumer, err := js.Consumer(ctx, streamName, consumerName)
if err != nil {
return nil, fmt.Errorf("获取消费者失败: %w", err)
}
info, err := consumer.Info(ctx)
if err != nil {
return nil, fmt.Errorf("获取消费者信息失败: %w", err)
}
return info, nil
}
// ListConsumers 列出指定流的所有消费者(简化实现)
// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现
func ListConsumers(ctx context.Context, streamName string) ([]string, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
// TODO: 根据实际 NATS 版本实现完整的消费者列表功能
return []string{}, nil
}
// DeleteConsumer 删除消费者
func DeleteConsumer(ctx context.Context, streamName, consumerName string) error {
if !checkConnected() {
return fmt.Errorf("NATS 未连接")
}
if err := js.DeleteConsumer(ctx, streamName, consumerName); err != nil {
return fmt.Errorf("删除消费者失败: %w", err)
}
g.Log().Infof(ctx, "✅ 消费者已删除: %s/%s", streamName, consumerName)
return nil
}
// CreateConsumer 创建消费者
func CreateConsumer(ctx context.Context, streamName, consumerName string, config jetstream.ConsumerConfig) (jetstream.Consumer, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
// 尝试获取现有消费者
consumer, err := js.Consumer(ctx, streamName, consumerName)
if err == nil {
return consumer, nil
}
// 创建新消费者
consumer, err = js.CreateConsumer(ctx, streamName, config)
if err != nil {
return nil, fmt.Errorf("创建消费者失败: %w", err)
}
return consumer, nil
}
// SubscribeRequest 订阅 RPC 请求
// B服务作为服务提供者订阅主题并响应请求时使用此方法
// subject: 订阅的主题名,与 Request 调用时使用相同的 subject
func SubscribeRequest(subject string, handler func(subject string, data []byte) ([]byte, error)) (*nats.Subscription, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
sub, err := nc.Subscribe(subject, func(msg *nats.Msg) {
// 处理请求
response, err := handler(msg.Subject, msg.Data)
if err != nil {
// 处理错误,发送错误响应
errMsg := fmt.Sprintf("处理失败: %v", err)
if err = msg.Respond([]byte(errMsg)); err != nil {
g.Log().Errorf(context.Background(), "RPC 错误响应失败: %v", err)
}
return
}
// 发送成功响应
if err = msg.Respond(response); err != nil {
g.Log().Errorf(context.Background(), "RPC 响应失败: %v", err)
}
})
if err != nil {
return nil, fmt.Errorf("订阅 RPC 请求失败: %w", err)
}
return sub, nil
}
// SubscribeQueueRequest 订阅队列模式的 RPC 请求(负载均衡)
// 多个服务实例订阅同一主题,实现负载均衡
// subject: 订阅的主题名,与 Request 调用时使用相同的 subject
// queueName: 队列组名,同一队列组的实例之间实现负载均衡
func SubscribeQueueRequest(subject, queueName string, handler func(subject string, data []byte) ([]byte, error)) (*nats.Subscription, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) {
// 处理请求
response, err := handler(msg.Subject, msg.Data)
if err != nil {
// 处理错误,发送错误响应
errMsg := fmt.Sprintf("处理失败: %v", err)
if err = msg.Respond([]byte(errMsg)); err != nil {
g.Log().Errorf(context.Background(), "RPC 错误响应失败: %v", err)
}
return
}
// 发送成功响应
if err := msg.Respond(response); err != nil {
g.Log().Errorf(context.Background(), "RPC 响应失败: %v", err)
}
})
if err != nil {
return nil, fmt.Errorf("订阅队列 RPC 请求失败: %w", err)
}
return sub, nil
}
// Request RPC 请求-响应模式
// A服务调用B服务查询接口时使用此方法
func Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error) {
if !checkConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
metrics.RequestCount.Add(1)
// 使用 timeout 参数创建超时上下文
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
msg, err := nc.RequestWithContext(timeoutCtx, subject, data)
if err != nil {
metrics.RequestError.Add(1)
return nil, fmt.Errorf("RPC 请求失败: %w", err)
}
if msg == nil {
metrics.RequestError.Add(1)
return nil, fmt.Errorf("RPC 响应为空")
}
return msg.Data, nil
}
// Close 关闭 NATS 连接
func Close() error {
mu.Lock()
defer mu.Unlock()
// 停止健康检查协程
if healthCancel != nil {
healthCancel()
}
// 关闭连接
if nc != nil && !nc.IsClosed() {
nc.Close()
connected = false
inited = false
g.Log().Info(context.Background(), "NATS 连接已关闭")
}
return nil
}