From d44ed8bf35bb2141505671a3c319749e7089372e Mon Sep 17 00:00:00 2001 From: qhd <1766646056@qq.com> Date: Fri, 9 Jan 2026 10:19:31 +0800 Subject: [PATCH] =?UTF-8?q?redis=20mq=20=E6=B6=88=E6=81=AF=E5=B0=81?= =?UTF-8?q?=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- message/message.go | 179 +++++++++++++++++++++++ message/rabbit.go | 351 +++++++++++++++++++++++++++++++++++++++++++++ message/redis.go | 232 ++++++++++++++++++++++++++++++ 3 files changed, 762 insertions(+) create mode 100644 message/message.go create mode 100644 message/rabbit.go create mode 100644 message/redis.go diff --git a/message/message.go b/message/message.go new file mode 100644 index 0000000..e2c72c1 --- /dev/null +++ b/message/message.go @@ -0,0 +1,179 @@ +package message + +import ( + "context" + "github.com/gogf/gf/v2/database/gredis" + + "github.com/gogf/gf/v2/errors/gerror" +) + +// GetRedisClient 获取 Redis 客户端(供外部使用) +func GetRedisClient() *gredis.Redis { + return getRedisClient() +} + +func GetRedisClientTest(name string) *gredis.Redis { + return getRedisClientTest(name) +} + +// GetLock 获取分布式锁 +func GetLock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) { + return lock(ctx, key, expireSeconds, fn) +} + +// MessageConfig 消息配置接口 +type MessageConfig interface { + start(ctx context.Context) error + publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) +} + +// RedisMessageConfig Redis Stream 消息配置 +type RedisMessageConfig struct { + StreamKey string // Stream 键名 + GroupName string // 消费者组名称 + ConsumerName string // 消费者名称 + BatchSize int64 // 最大并发数(信号量容量) + AutoAck bool // ACK确认,true自动确认,false手动确认 + HandleFunc func(ctx context.Context, message map[string]interface{}) error +} + +func (r *RedisMessageConfig) start(ctx context.Context) error { + return readFromStream(ctx, QueueMessage{ + StreamKey: r.StreamKey, + GroupName: r.GroupName, + ConsumerName: r.ConsumerName, + BatchSize: r.BatchSize, + AutoAck: r.AutoAck, + HandleFunc: r.HandleFunc, + }) +} + +func (r *RedisMessageConfig) publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) { + return publishToRedis(ctx, r.StreamKey, data) +} + +// RabbitMQMessageConfig RabbitMQ 消息配置 +type RabbitMQMessageConfig struct { + Queue string // 队列名称 + Exchange string // 交换器名称 + RoutingKey string // 路由键 + PrefetchCount int // QoS: 预取数量(并发控制) + WorkerCount int // worker 数量 + ConsumerTag string // 消费者标签 + HandleFunc func(ctx context.Context, message map[string]interface{}) error +} + +func (r *RabbitMQMessageConfig) start(ctx context.Context) error { + return startRabbitMQConsumer(ctx, QueueMessage{ + Queue: r.Queue, + Exchange: r.Exchange, + RoutingKey: r.RoutingKey, + PrefetchCount: r.PrefetchCount, + WorkerCount: r.WorkerCount, + ConsumerTag: r.ConsumerTag, + AutoAck: true, + HandleFunc: r.HandleFunc, + }) +} + +func (r *RabbitMQMessageConfig) publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) { + opts := make(map[string]interface{}) + if len(options) > 0 { + opts = options[0] + } + exchange := r.Exchange + routingKey := r.RoutingKey + delay := 0 + + if v, ok := opts["exchange"].(string); ok { + exchange = v + } + if v, ok := opts["routingKey"].(string); ok { + routingKey = v + } + if v, ok := opts["delay"].(int); ok { + delay = v + } + + if delay > 0 { + return publishDelayedToRabbitMQ(ctx, exchange, routingKey, data, delay) + } + return publishToRabbitMQ(ctx, exchange, routingKey, data) +} + +// QueueMessage 统一消息队列配置结构体(内部使用) +type QueueMessage struct { + // Redis Stream 配置 + StreamKey string + GroupName string + ConsumerName string + BatchSize int64 + AutoAck bool + HandleFunc func(ctx context.Context, message map[string]interface{}) error + + // RabbitMQ 配置 + Queue string + Exchange string + RoutingKey string + PrefetchCount int + WorkerCount int + ConsumerTag string +} + +// StartConsumers 启动消息消费者(统一入口) +// 支持同时启动多个消费者,包括 Redis Stream 和 RabbitMQ +func StartConsumers(ctx context.Context, configs ...MessageConfig) error { + for _, cfg := range configs { + if err := cfg.start(ctx); err != nil { + return gerror.Wrap(err, "启动消费者失败") + } + } + return nil +} + +// PublishMessage 发布消息(统一入口) +// 根据配置类型选择发布到 Redis Stream 或 RabbitMQ +func PublishMessage(ctx context.Context, cfg MessageConfig, data interface{}, options ...map[string]interface{}) (messageID string, err error) { + return cfg.publish(ctx, data, options...) +} + +// ========== Redis Stream 公共方法(方便迁移) ========== + +// AddToStream 将消息添加到 Redis Stream +//func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) { +// return addToStream(ctx, streamKey, msg) +//} + +// ReadFromStream 从 Redis Stream 读取消息(已废弃) +// 请使用 RedisMessageConfig.StartConsumers 启动消费者 +// 此方法保留用于向后兼容,但实际不会返回消息(异步消费模式) +func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64) ([]StreamMessage, error) { + return nil, gerror.New("ReadFromStream 已废弃,请使用 RedisMessageConfig.StartConsumers 启动消费者") +} + +// AckMessage 确认 Redis Stream 消息 +func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { + return ackMessage(ctx, streamKey, groupName, messageIDs...) +} + +// InitStreamGroup 初始化 Redis Stream 消费者组 +func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { + return initStreamGroup(ctx, streamKey, groupName) +} + +// ========== RabbitMQ 公共方法(方便迁移) ========== + +// InitRabbitMQ 初始化 RabbitMQ 连接 +func InitRabbitMQ(ctx context.Context) error { + return initRabbitMQ(ctx) +} + +// PublishToRabbitMQ 发布消息到 RabbitMQ +//func PublishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) error { +// return publishToRabbitMQ(ctx, exchange, routingKey, message) +//} + +// PublishDelayedToRabbitMQ 发布延时消息到 RabbitMQ +//func PublishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) error { +// return publishDelayedToRabbitMQ(ctx, exchange, routingKey, message, delaySeconds) +//} diff --git a/message/rabbit.go b/message/rabbit.go new file mode 100644 index 0000000..bd6e010 --- /dev/null +++ b/message/rabbit.go @@ -0,0 +1,351 @@ +package message + +import ( + "context" + "sync" + "time" + + "github.com/gogf/gf/v2/encoding/gjson" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" + amqp "github.com/rabbitmq/amqp091-go" +) + +var ( + rabbitConn *amqp.Connection + rabbitChannel *amqp.Channel + rabbitOnce sync.Once + rabbitMu sync.RWMutex + rabbitCloseWatcher chan struct{} + rabbitWatcherStarted bool +) + +// Config RabbitMQ 配置 +type RabbitMQConfig struct { + Host string + Port int + Username string + Password string + VHost string +} + +// rabbitMQConfig 默认配置 +func getRabbitMQConfig() *RabbitMQConfig { + return &RabbitMQConfig{ + Host: g.Cfg().MustGet(context.Background(), "rabbitmq.host").String(), + Port: g.Cfg().MustGet(context.Background(), "rabbitmq.port").Int(), + Username: g.Cfg().MustGet(context.Background(), "rabbitmq.username").String(), + Password: g.Cfg().MustGet(context.Background(), "rabbitmq.password").String(), + VHost: g.Cfg().MustGet(context.Background(), "rabbitmq.vhost", "/").String(), + } +} + +// initRabbitMQ 初始化 RabbitMQ 连接 +func initRabbitMQ(ctx context.Context) error { + var err error + rabbitOnce.Do(func() { + cfg := getRabbitMQConfig() + url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost + + rabbitConn, err = amqp.Dial(url) + if err != nil { + g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err) + return + } + + rabbitChannel, err = rabbitConn.Channel() + if err != nil { + g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err) + return + } + + rabbitCloseWatcher = make(chan struct{}) + + if !rabbitWatcherStarted { + go handleRabbitMQConnectionClose(ctx) + rabbitWatcherStarted = true + } + + g.Log().Info(ctx, "RabbitMQ 连接成功") + }) + + return err +} + +// getRabbitMQChannel 获取 RabbitMQ Channel +func getRabbitMQChannel() (*amqp.Channel, error) { + rabbitMu.RLock() + defer rabbitMu.RUnlock() + + if rabbitChannel == nil || rabbitChannel.IsClosed() { + return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭") + } + + return rabbitChannel, nil +} + +// getRabbitMQConnection 获取 RabbitMQ 连接 +func getRabbitMQConnection() (*amqp.Connection, error) { + rabbitMu.RLock() + defer rabbitMu.RUnlock() + + if rabbitConn == nil || rabbitConn.IsClosed() { + return nil, gerror.New("RabbitMQ 连接未初始化或已关闭") + } + + return rabbitConn, nil +} + +// handleRabbitMQConnectionClose 监听连接关闭并重连 +func handleRabbitMQConnectionClose(ctx context.Context) { + for { + select { + case <-rabbitCloseWatcher: + g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") + return + default: + } + + rabbitMu.RLock() + currentConn := rabbitConn + rabbitMu.RUnlock() + + if currentConn == nil { + return + } + + closeErr := make(chan *amqp.Error, 1) + currentConn.NotifyClose(closeErr) + + select { + case err := <-closeErr: + if err != nil { + g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v,尝试重连...", err) + reconnectRabbitMQ(ctx) + } + case <-rabbitCloseWatcher: + g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") + return + } + } +} + +// reconnectRabbitMQ 重新连接 +func reconnectRabbitMQ(ctx context.Context) { + rabbitMu.Lock() + defer rabbitMu.Unlock() + + for i := 0; i < 10; i++ { + time.Sleep(time.Duration(i+1) * time.Second) + + cfg := getRabbitMQConfig() + url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost + + var err error + rabbitConn, err = amqp.Dial(url) + if err != nil { + g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err) + continue + } + + rabbitChannel, err = rabbitConn.Channel() + if err != nil { + g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err) + continue + } + + g.Log().Info(ctx, "RabbitMQ 重连成功") + return + } + + g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数") +} + +// startRabbitMQConsumer 启动 RabbitMQ 消费者 +func startRabbitMQConsumer(ctx context.Context, msg QueueMessage) error { + // 初始化连接 + if err := initRabbitMQ(ctx); err != nil { + return gerror.Wrap(err, "初始化 RabbitMQ 连接失败") + } + + // 创建独立 Channel(避免并发冲突) + conn, err := getRabbitMQConnection() + if err != nil { + return gerror.Wrap(err, "获取RabbitMQ连接失败") + } + + ch, err := conn.Channel() + if err != nil { + return gerror.Wrap(err, "创建独立Channel失败") + } + + // 声明队列 + _, err = ch.QueueDeclare( + msg.Queue, // name + true, // durable + false, // autoDelete + false, // exclusive + false, // noWait + nil, // arguments + ) + if err != nil { + return gerror.Newf("声明队列失败: %v", err) + } + + // 设置 QoS(并发控制) + prefetchCount := msg.PrefetchCount + if prefetchCount == 0 { + prefetchCount = 1 + } + err = ch.Qos( + prefetchCount, // prefetchCount + 0, // prefetchSize + false, // global + ) + if err != nil { + return gerror.Newf("设置 QoS 失败: %v", err) + } + + // 开始消费 + msgs, err := ch.Consume( + msg.Queue, // queue + msg.ConsumerTag, // consumer tag + msg.AutoAck, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return gerror.Newf("开始消费失败: %v", err) + } + + workerCount := msg.WorkerCount + if workerCount == 0 { + workerCount = 1 + } + + g.Log().Infof(ctx, "RabbitMQ 消费者已启动: queue=%s, prefetch=%d, workers=%d", + msg.Queue, prefetchCount, workerCount) + + // 启动多个 worker + for i := 0; i < workerCount; i++ { + go rabbitMQWorker(ctx, i, msgs, msg) + } + + return nil +} + +// rabbitMQWorker RabbitMQ 工作协程 +func rabbitMQWorker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery, msg QueueMessage) { + g.Log().Debugf(ctx, "RabbitMQ Worker %d 已启动", workerID) + + for { + select { + case <-ctx.Done(): + g.Log().Infof(ctx, "RabbitMQ Worker %d 收到停止信号,正在退出", workerID) + return + case delivery, ok := <-msgs: + if !ok { + g.Log().Infof(ctx, "RabbitMQ Worker %d 消息通道已关闭,退出", workerID) + return + } + + // 反序列化消息 + var message map[string]interface{} + if err := gjson.DecodeTo(delivery.Body, &message); err != nil { + g.Log().Errorf(ctx, "RabbitMQ Worker %d 反序列化消息失败: %v", workerID, err) + if !msg.AutoAck { + delivery.Nack(false, false) + } + continue + } + + // 处理消息 + err := msg.HandleFunc(ctx, message) + if err != nil { + g.Log().Errorf(ctx, "RabbitMQ Worker %d 处理消息失败: %v", workerID, err) + if !msg.AutoAck { + delivery.Nack(false, false) + } + } else { + if !msg.AutoAck { + delivery.Ack(false) + } + g.Log().Debugf(ctx, "RabbitMQ Worker %d 处理消息成功", workerID) + } + } + } +} + +// publishToRabbitMQ 发布消息到 RabbitMQ +func publishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) (messageID string, err error) { + ch, err := getRabbitMQChannel() + if err != nil { + return + } + + body, err := gjson.Encode(message) + if err != nil { + return "", gerror.Newf("消息序列化失败: %v", err) + } + + err = ch.PublishWithContext( + ctx, + exchange, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "application/json", + Body: body, + }, + ) + + if err != nil { + g.Log().Errorf(ctx, "发布消息失败: exchange=%s, routingKey=%s, err=%v", exchange, routingKey, err) + return + } + + g.Log().Debugf(ctx, "消息发布成功: exchange=%s, routingKey=%s", exchange, routingKey) + return messageID, nil +} + +// publishDelayedToRabbitMQ 发布延时消息到 RabbitMQ +func publishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) (messageID string, err error) { + ch, err := getRabbitMQChannel() + if err != nil { + return + } + + body, err := gjson.Encode(message) + if err != nil { + return "", gerror.Newf("消息序列化失败: %v", err) + } + + err = ch.PublishWithContext( + ctx, + exchange, // exchange(必须是 x-delayed-message 类型) + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "application/json", + Body: body, + Headers: amqp.Table{ + "x-delay": delaySeconds * 1000, // 延时(毫秒) + }, + }, + ) + + if err != nil { + g.Log().Errorf(ctx, "发布延时消息失败: exchange=%s, routingKey=%s, delay=%ds, err=%v", exchange, routingKey, delaySeconds, err) + return + } + + g.Log().Debugf(ctx, "延时消息发布成功: exchange=%s, routingKey=%s, delay=%ds", exchange, routingKey, delaySeconds) + return messageID, nil +} diff --git a/message/redis.go b/message/redis.go new file mode 100644 index 0000000..9f28ebe --- /dev/null +++ b/message/redis.go @@ -0,0 +1,232 @@ +package message + +import ( + "context" + "errors" + "strings" + "time" + + "github.com/gogf/gf/v2/database/gredis" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/util/gconv" +) + +// StreamMessage Redis Stream 消息结构 +type StreamMessage struct { + ID string // 消息ID(自动生成) + Values map[string]interface{} // 消息内容 +} + +// getClient 获取 Redis 客户端 +func getRedisClient() *gredis.Redis { + return g.Redis() +} + +// getClient 获取 Redis 客户端 +func getRedisClientTest(name string) *gredis.Redis { + return g.Redis(name) +} + +// getRedisClientByDB 根据DB获取Redis客户端,如果db<=0则返回默认客户端 +func getRedisClientByDB(db int) *gredis.Redis { + if db <= 0 { + return g.Redis() + } + // 创建连接到指定DB的Redis客户端 + client, err := gredis.New(&gredis.Config{ + Address: g.Cfg().MustGet(context.Background(), "redis.default.address").String(), + Db: db, + }) + if err != nil { + glog.Errorf(context.Background(), "创建Redis客户端失败: %v", err) + return g.Redis() + } + return client +} + +// lock 分布式锁 +func lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) { + limit := 3 +LOOP: + if limit < 0 { + return false, errors.New("锁重试次数耗尽") + } + limit-- + if val, err := getRedisClient().Set(ctx, key, true, gredis.SetOption{ + TTLOption: gredis.TTLOption{ + EX: &expireSeconds, + }, + NX: true, + }); err != nil { + return false, err + } else { + if val.Bool() { + defer func(RedisClient *gredis.Redis, ctx context.Context, key string) { + if _, err = RedisClient.Del(ctx, key); err != nil { + glog.Errorf(ctx, "RedisClient.Del error: %v", err) + } + }(getRedisClient(), ctx, key) + if err = fn(ctx); err != nil { + return false, err + } + return true, nil + } else { + time.Sleep(time.Second) + goto LOOP + } + } +} + +// publishToRedis 将消息添加到 Redis Stream +func publishToRedis(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) { + values := gconv.Map(msg) + args := make([]interface{}, 0, len(values)*2+2) + args = append(args, streamKey, "*") + for key, val := range values { + args = append(args, key, val) + } + result, err := getRedisClient().Do(ctx, "XADD", args...) + if err != nil { + return + } + messageID = result.String() + return +} + +// initStreamGroup 初始化 Stream 和消费者组 +func initStreamGroup(ctx context.Context, streamKey, groupName string) error { + _, err := getRedisClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") + if err != nil { + // 如果组已存在,忽略错误 + errStr := err.Error() + // 检查错误是否是 "BUSYGROUP Consumer Group name already exists" + if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") { + // 这是一个预期的情况,说明消费者组已经存在,无需处理 + return nil + } + // 这是一个真正的错误,需要记录或处理 + return err + } + return nil +} + +// readFromStream 从 Stream 读取消息 +func readFromStream(ctx context.Context, msg QueueMessage) error { + // 初始化 Stream 和消费者组 + if err := initStreamGroup(ctx, msg.StreamKey, msg.GroupName); err != nil { + return err + } + go func() { + RECONNECT: + for { + result, err := getRedisClient().Do(ctx, "XREADGROUP", "GROUP", msg.GroupName, msg.ConsumerName, "COUNT", msg.BatchSize, "BLOCK", 0, "STREAMS", msg.StreamKey, ">") + if err != nil { + select { + case <-ctx.Done(): + return + } + time.Sleep(time.Second) + goto RECONNECT + } + // 检查返回结果是否为空 + if result == nil || result.IsEmpty() { + continue + } + messages := make([]StreamMessage, 0, int(msg.BatchSize)) + // 尝试 map 格式(GoFrame gredis 返回) + if streamsMap, ok := result.Val().(map[interface{}]interface{}); ok { + for _, streamMsgs := range streamsMap { + msgsArray, ok := streamMsgs.([]interface{}) + if !ok { + continue + } + for _, msgData := range msgsArray { + msgArray, ok := msgData.([]interface{}) + if !ok || len(msgArray) < 2 { + continue + } + msgID := gconv.String(msgArray[0]) + fieldsArray, ok := msgArray[1].([]interface{}) + if !ok { + continue + } + values := make(map[string]interface{}, len(fieldsArray)/2) + for i := 0; i < len(fieldsArray); i += 2 { + if i+1 < len(fieldsArray) { + key := gconv.String(fieldsArray[i]) + values[key] = fieldsArray[i+1] + } + } + messages = append(messages, StreamMessage{ + ID: msgID, + Values: values, + }) + } + } + } + // 尝试数组格式(标准 Redis 返回) + if streamsArray, ok := result.Val().([]interface{}); ok && len(streamsArray) > 0 { + for _, streamData := range streamsArray { + streamArray, ok := streamData.([]interface{}) + if !ok || len(streamArray) < 2 { + continue + } + messagesArray, ok := streamArray[1].([]interface{}) + if !ok { + continue + } + for _, msgData := range messagesArray { + msgArray, ok := msgData.([]interface{}) + if !ok || len(msgArray) < 2 { + continue + } + msgID := gconv.String(msgArray[0]) + fieldsArray, ok := msgArray[1].([]interface{}) + if !ok { + continue + } + values := make(map[string]interface{}, len(fieldsArray)/2) + for i := 0; i < len(fieldsArray); i += 2 { + if i+1 < len(fieldsArray) { + key := gconv.String(fieldsArray[i]) + values[key] = fieldsArray[i+1] + } + } + messages = append(messages, StreamMessage{ + ID: msgID, + Values: values, + }) + } + } + } + // 处理消息 + for _, streamMsg := range messages { + // 业务处理 + if err := msg.HandleFunc(ctx, streamMsg.Values); err != nil { + glog.Infof(ctx, "业务处理失败-> err:%v\n", err) + continue + } + // 确认消息 + if msg.AutoAck { + err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID) + if err != nil { + glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err) + } + } + } + } + }() + return nil +} + +// ackMessage 确认消息已处理 +func ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { + args := make([]interface{}, 0, len(messageIDs)+2) + args = append(args, streamKey, groupName) + for _, id := range messageIDs { + args = append(args, id) + } + _, err := getRedisClient().Do(ctx, "XACK", args...) + return err +}