package message import ( "context" "fmt" "github.com/gogf/gf/v2/os/glog" "strings" "time" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" ) type RedisPublishMsgConfig struct { QueueName string Data any } type RedisPublishDelayMsgConfig struct { } type RedisSubscribeMsgConfig struct { QueueName string ConsumerName string AutoAck bool PrefetchCount int HandleFunc func(ctx context.Context, message map[string]interface{}) error } func (*RedisPublishMsgConfig) GetPublishMsgType() { } func (*RedisPublishDelayMsgConfig) GetPublishDelayMsgType() {} func (*RedisSubscribeMsgConfig) GetSubscribeMsgType() { } type redis struct { name string // 数据源名称 } func init() { // 注册 Redis 插件(默认数据源) RegisterPlugin(context.Background(), "default", MessageRedis, func() messageUtil { return &redis{name: "default"} }) } // RedisStreamMessage Redis Stream 消息结构 type redisStreamMessage struct { ID string Values map[string]interface{} } // Connect 连接 Redis func (c *redis) Connect(ctx context.Context) error { return redisConnect(ctx, c.name) } // Ping 检测 Redis 连接状态 func (c *redis) Ping(ctx context.Context) bool { return redisPing(ctx, c.name) } // Close 关闭 Redis 连接 func (c *redis) Close(ctx context.Context) error { return redisClose(ctx, c.name) } // Publish 发布消息 func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) error { cfg, ok := msgConfig.(*RedisPublishMsgConfig) if !ok { return fmt.Errorf("无效的 Redis 配置类型") } if g.IsEmpty(cfg.QueueName) { return fmt.Errorf("队列名称不能为空") } if g.IsEmpty(cfg.Data) { return fmt.Errorf("数据不能为空") } rc := getRedisConn(c.name) if !c.Ping(ctx) { if err := commonConnect(ctx, MessageRedis, c.name, func(ctx context.Context) error { return c.Connect(ctx) }, func(ctx context.Context) error { return c.Close(ctx) }); err != nil { g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRedis, c.name, err) return err } } values := gconv.Map(cfg.Data) args := make([]interface{}, 0, len(values)*2+2) args = append(args, cfg.QueueName, "*") for key, val := range values { args = append(args, key, val) } result, err := rc.Do(ctx, "XADD", args...) if err != nil { g.Log().Errorf(ctx, "❌ Redis 发布消息失败: key=%s, err=%v", cfg.QueueName, err) return err } g.Log().Infof(ctx, "✅ Redis 发布消息成功: key=%s, messageID=%s", cfg.QueueName, gconv.String(result)) return nil } // PublishDelay 发布延迟消息 func (c *redis) PublishDelay(ctx context.Context, _ messagePublishDelayConfig) error { g.Log().Errorf(ctx, "❌ Redis 不支持延迟消息") return fmt.Errorf("❌ Redis 不支持延迟消息") } // Subscribe 订阅消息 func (c *redis) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error { cfg, ok := msgConfig.(*RedisSubscribeMsgConfig) if !ok { return fmt.Errorf("无效的 Redis 配置类型") } if g.IsEmpty(cfg.QueueName) { return fmt.Errorf("队列名称不能为空") } if g.IsEmpty(cfg.ConsumerName) { return fmt.Errorf("消费者名称不能为空") } if g.IsEmpty(cfg.HandleFunc) { return fmt.Errorf("处理函数不能为空") } return c.createSubscribe(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.HandleFunc) } // createSubscribe 内部订阅消息 func (c *redis) createSubscribe(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { LOOP: err := c.consumeMessages(ctx, key, consumerName, prefetchCount, autoAck, handler) if err != nil { // 对于超时错误,返回nil继续循环,而不是返回错误 if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") { time.Sleep(time.Second) goto LOOP } else { g.Log().Errorf(ctx, "❌ 严重错误: %v", err) } } time.Sleep(time.Second) goto LOOP } // consumeMessages 消费消息 func (c *redis) consumeMessages(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { if !c.Ping(ctx) { if err := commonConnect(ctx, MessageRedis, c.name, func(ctx context.Context) error { return c.Connect(ctx) }, func(ctx context.Context) error { return c.Close(ctx) }); err != nil { g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRedis, c.name, err) return err } } rc := getRedisConn(c.name) if rc == nil { g.Log().Errorf(ctx, "❌ Redis [%s] 连接不存在", c.name) return fmt.Errorf("Redis 连接不存在") } // 检查消费者组是否存在 groupName := "default" _, err := rc.Do(ctx, "XGROUP", "CREATE", key, groupName, "0", "MKSTREAM") if err != nil { errStr := err.Error() if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") { glog.Infof(ctx, "✅ Redis [%s] 消费者组已存在: %s", c.name, key) return nil } g.Log().Errorf(ctx, "❌ 创建消费组失败: key=%s, err=%v", key, err) return err } glog.Infof(ctx, "✅ Redis [%s] 消费者组创建成功: %s", c.name, key) // 使用带重试的命令执行 result, err := rc.Do(ctx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", prefetchCount, "BLOCK", 0, "STREAMS", key, ">") if err != nil { return err } messages, err := c.parseStreamResult(result) if err != nil { g.Log().Errorf(ctx, "❌ 解析消息失败: %v", err) return err } for _, msg := range messages { // 处理消息 if err := handler(ctx, msg.Values); err != nil { g.Log().Errorf(ctx, "❌ 消息处理失败: messageID=%s, err=%v", msg.ID, err) // 如果不是自动ACK,则跳过当前消息 if !autoAck { continue } } else { g.Log().Infof(ctx, "✅ 消息处理成功: messageID=%s", msg.ID) } // ACK 消息 args := make([]interface{}, 0, len(msg.ID)+2) args = append(args, key, groupName, msg.ID) _, err = rc.Do(ctx, "XACK", args...) if err != nil { g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err) } else { g.Log().Infof(ctx, "✅ ACK 消息成功: messageID=%s", msg.ID) } } return nil } // parseStreamResult 解析 Stream 结果 func (c *redis) parseStreamResult(result interface{}) ([]redisStreamMessage, error) { if result == nil { return []redisStreamMessage{}, nil } var resultVal interface{} // 尝试获取 Val() 方法 if valuer, ok := result.(interface{ Val() interface{} }); ok { resultVal = valuer.Val() } else { resultVal = result } // 检查是否为空 if resultVal == nil { return []redisStreamMessage{}, nil } // 预分配切片容量,避免多次扩容 messages := make([]redisStreamMessage, 0) if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok { for _, streamData := range streamsMap { msgArray, ok := streamData.([]interface{}) if !ok { continue } for _, msgData := range msgArray { msgArray, ok := msgData.([]interface{}) if !ok || len(msgArray) < 2 { continue } msgID := gconv.String(msgArray[0]) fieldsArray, ok := msgArray[1].([]interface{}) if !ok { continue } values := make(map[string]interface{}, len(fieldsArray)/2) for i := 0; i < len(fieldsArray); i += 2 { if i+1 < len(fieldsArray) { key := gconv.String(fieldsArray[i]) values[key] = fieldsArray[i+1] } } messages = append(messages, redisStreamMessage{ ID: msgID, Values: values, }) } } } return messages, nil }