345 lines
9.4 KiB
Go
345 lines
9.4 KiB
Go
package message
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/glog"
|
||
"github.com/gogf/gf/v2/util/gconv"
|
||
)
|
||
|
||
// redisMessageClient Redis 实现
|
||
type redisMessageClient struct {
|
||
clientType messageClientType
|
||
}
|
||
|
||
// RedisStreamMessage Redis Stream 消息结构
|
||
type RedisStreamMessage struct {
|
||
ID string
|
||
Values map[string]interface{}
|
||
}
|
||
|
||
// StreamGroup 创建消费组(支持单个或批量)
|
||
func (q *redisMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error {
|
||
if len(configs) == 0 {
|
||
return fmt.Errorf("配置不能为空")
|
||
}
|
||
for _, config := range configs {
|
||
cfg, ok := config.(*RedisConfig)
|
||
if !ok {
|
||
return fmt.Errorf("无效的 Redis 配置类型")
|
||
}
|
||
if err := q.createStreamGroup(ctx, cfg); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// streamGroup 内部单个创建消费组
|
||
func (q *redisMessageClient) createStreamGroup(ctx context.Context, cfg *RedisConfig) error {
|
||
// 获取默认数据源
|
||
ds, err := GetManager().GetDefaultDataSource()
|
||
if err != nil {
|
||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||
}
|
||
|
||
// 检查连接状态,未连接则自动重连
|
||
if !ds.IsConnected() {
|
||
if err := ds.Reconnect(ctx); err != nil {
|
||
return fmt.Errorf("redis重连失败: %w", err)
|
||
}
|
||
}
|
||
|
||
_, err = ds.Redis().Do(ctx, "XGROUP", "CREATE", cfg.Stream, cfg.Group, "0", "MKSTREAM")
|
||
if err != nil {
|
||
errStr := err.Error()
|
||
if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") {
|
||
glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", cfg.Group)
|
||
return nil
|
||
}
|
||
return fmt.Errorf("初始化消费者组失败: %w", err)
|
||
}
|
||
glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", cfg.Group)
|
||
return nil
|
||
}
|
||
|
||
// Publish 内部单个发布消息
|
||
func (q *redisMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error {
|
||
ds, err := GetManager().GetDefaultDataSource()
|
||
if err != nil {
|
||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||
}
|
||
|
||
if !ds.IsConnected() {
|
||
if err := ds.Reconnect(ctx); err != nil {
|
||
return fmt.Errorf("redis重连失败: %w", err)
|
||
}
|
||
}
|
||
|
||
cfg, ok := config.(*RedisConfig)
|
||
if !ok {
|
||
return fmt.Errorf("无效的redis配置类型")
|
||
}
|
||
values := gconv.Map(data)
|
||
args := make([]interface{}, 0, len(values)*2+2)
|
||
args = append(args, cfg.Stream, "*")
|
||
for key, val := range values {
|
||
args = append(args, key, val)
|
||
}
|
||
result, err := ds.Redis().Do(ctx, "XADD", args...)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "❌ Redis 发布消息失败: topic=%s, err=%v", cfg.Stream, err)
|
||
return err
|
||
}
|
||
g.Log().Infof(ctx, "✅ Redis 发布消息成功: topic=%s, messageID=%s", cfg.Stream, gconv.String(result))
|
||
return nil
|
||
}
|
||
|
||
// PublishDelayed 发布延迟消息(使用 ZSET)
|
||
func (q *redisMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error {
|
||
ds, err := GetManager().GetDefaultDataSource()
|
||
if err != nil {
|
||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||
}
|
||
|
||
if !ds.IsConnected() {
|
||
if err := ds.Reconnect(ctx); err != nil {
|
||
return fmt.Errorf("redis重连失败: %w", err)
|
||
}
|
||
}
|
||
|
||
cfg, ok := config.(*RedisConfig)
|
||
if !ok {
|
||
return fmt.Errorf("无效的redis配置类型")
|
||
}
|
||
payload, err := json.Marshal(data)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化数据失败: %w", err)
|
||
}
|
||
score := float64(time.Now().Add(time.Duration(delay)).UnixMilli())
|
||
delayedKey := fmt.Sprintf("delayed:%s", cfg.Stream)
|
||
|
||
// ZADD delayedKey score payload
|
||
_, err = ds.Redis().Do(ctx, "ZADD", delayedKey, score, string(payload))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
g.Log().Infof(ctx, "✅ Redis 延迟消息已发布: topic=%s, delay=%v", cfg.Stream, delay)
|
||
return nil
|
||
}
|
||
|
||
// Subscribe 订阅消息(支持单个或批量)
|
||
func (q *redisMessageClient) subscribe(ctx context.Context, configs ...interface{}) error {
|
||
if len(configs) == 0 {
|
||
return fmt.Errorf("配置不能为空")
|
||
}
|
||
for _, config := range configs {
|
||
cfg, ok := config.(*RedisConfig)
|
||
if !ok {
|
||
return fmt.Errorf("无效的 Redis 配置类型")
|
||
}
|
||
handler := cfg.HandleFunc
|
||
if handler == nil {
|
||
return fmt.Errorf("必须提供处理函数")
|
||
}
|
||
if err := q.createSubscribe(ctx, cfg, handler); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// subscribe 内部单个订阅消息
|
||
func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
g.Log().Errorf(ctx, "❌ Redis 消费者 panic: %v", r)
|
||
}
|
||
}()
|
||
|
||
retryTicker := time.NewTicker(time.Second)
|
||
defer retryTicker.Stop()
|
||
|
||
// 重试计数器
|
||
var consecutiveErrors int
|
||
const maxConsecutiveErrors = 3
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", cfg.Stream)
|
||
return
|
||
case <-retryTicker.C:
|
||
err := q.consumeMessages(ctx, cfg, handler)
|
||
if err != nil {
|
||
// 对于超时错误,返回nil继续循环,而不是返回错误
|
||
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
|
||
strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") {
|
||
|
||
consecutiveErrors++
|
||
if consecutiveErrors > maxConsecutiveErrors {
|
||
g.Log().Errorf(ctx, "Max retries exceeded, giving up")
|
||
return
|
||
}
|
||
backoffTime := 5 * time.Second
|
||
g.Log().Warningf(ctx, "⚠️ 等待 %v 后重试...", backoffTime)
|
||
|
||
time.Sleep(backoffTime)
|
||
} else {
|
||
// 非超时错误(严重错误)
|
||
consecutiveErrors = 0 // 重置计数
|
||
g.Log().Errorf(ctx, "严重错误,立即重试: %v", err)
|
||
|
||
// 短暂等待后重试
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-time.After(time.Second):
|
||
// 继续循环
|
||
}
|
||
}
|
||
} else {
|
||
// 成功时重置错误计数器
|
||
consecutiveErrors = 0
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
return nil
|
||
}
|
||
|
||
// consumeMessages 消费消息
|
||
func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
|
||
ds, err := GetManager().GetDefaultDataSource()
|
||
if err != nil {
|
||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||
}
|
||
|
||
if !ds.IsConnected() {
|
||
if err := ds.Reconnect(ctx); err != nil {
|
||
return fmt.Errorf("redis重连失败: %w", err)
|
||
}
|
||
}
|
||
|
||
// 检查消费者组是否存在
|
||
if err := q.createStreamGroup(ctx, cfg); err != nil {
|
||
return fmt.Errorf("create stream group failed: %w", err)
|
||
}
|
||
|
||
// 使用带重试的命令执行
|
||
result, err := ds.Redis().Do(ctx, "XREADGROUP", "GROUP", cfg.Group, cfg.Consumer, "COUNT", cfg.Count, "BLOCK", 0, "STREAMS", cfg.Stream, ">")
|
||
if err != nil {
|
||
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
|
||
strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") {
|
||
|
||
}
|
||
return err
|
||
}
|
||
messages, err := q.parseStreamResult(result)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, msg := range messages {
|
||
// 处理消息
|
||
if err := handler(ctx, msg.Values); err != nil {
|
||
g.Log().Errorf(ctx, "❌ 消息处理失败: messageID=%s, err=%v", msg.ID, err)
|
||
continue
|
||
}
|
||
|
||
// ACK 消息
|
||
if cfg.AutoAck {
|
||
if err := q.ackMessage(ctx, cfg.Stream, cfg.Group, msg.ID); err != nil {
|
||
g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// ackMessage ACK 消息
|
||
func (q *redisMessageClient) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
|
||
ds, err := GetManager().GetDefaultDataSource()
|
||
if err != nil {
|
||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||
}
|
||
|
||
if !ds.IsConnected() {
|
||
if err := ds.Reconnect(ctx); err != nil {
|
||
return fmt.Errorf("redis重连失败: %w", err)
|
||
}
|
||
}
|
||
|
||
args := make([]interface{}, 0, len(messageIDs)+2)
|
||
args = append(args, streamKey, groupName)
|
||
for _, id := range messageIDs {
|
||
args = append(args, id)
|
||
}
|
||
_, err = ds.Redis().Do(ctx, "XACK", args...)
|
||
return err
|
||
}
|
||
|
||
// parseStreamResult 解析 Stream 结果
|
||
func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStreamMessage, error) {
|
||
if result == nil {
|
||
return []RedisStreamMessage{}, nil
|
||
}
|
||
|
||
var resultVal interface{}
|
||
|
||
// 尝试获取 Val() 方法
|
||
if valuer, ok := result.(interface{ Val() interface{} }); ok {
|
||
resultVal = valuer.Val()
|
||
} else {
|
||
resultVal = result
|
||
}
|
||
|
||
// 检查是否为空
|
||
if resultVal == nil {
|
||
return []RedisStreamMessage{}, nil
|
||
}
|
||
|
||
// 预分配切片容量,避免多次扩容
|
||
messages := make([]RedisStreamMessage, 0)
|
||
|
||
if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok {
|
||
for _, streamMsg := range streamsMap {
|
||
msgArray, ok := streamMsg.([]interface{})
|
||
if !ok {
|
||
continue
|
||
}
|
||
for _, msgData := range msgArray {
|
||
msgArray, ok := msgData.([]interface{})
|
||
if !ok || len(msgArray) < 2 {
|
||
continue
|
||
}
|
||
msgID := gconv.String(msgArray[0])
|
||
fieldsArray, ok := msgArray[1].([]interface{})
|
||
if !ok {
|
||
continue
|
||
}
|
||
values := make(map[string]interface{}, len(fieldsArray)/2)
|
||
for i := 0; i < len(fieldsArray); i += 2 {
|
||
if i+1 < len(fieldsArray) {
|
||
key := gconv.String(fieldsArray[i])
|
||
values[key] = fieldsArray[i+1]
|
||
}
|
||
}
|
||
messages = append(messages, RedisStreamMessage{
|
||
ID: msgID,
|
||
Values: values,
|
||
})
|
||
}
|
||
}
|
||
}
|
||
|
||
return messages, nil
|
||
}
|