redis使用stream方式
This commit is contained in:
@@ -88,57 +88,70 @@ func (w *WorkerPool) PrintStats(ctx context.Context) {
|
||||
glog.Infof(ctx, "协程池统计 - 池大小: %d, 等待任务: %d", stats.PoolSize, stats.Jobs)
|
||||
}
|
||||
|
||||
// QueueProcessor 队列处理器,从 Redis 队列中取出任务并提交到协程池
|
||||
// QueueProcessor Stream 处理器,从 Redis Stream 中取出任务并提交到协程池
|
||||
type QueueProcessor struct {
|
||||
pool *WorkerPool
|
||||
queueKey string
|
||||
timeout int
|
||||
stopChan chan struct{}
|
||||
handleFunc func(ctx context.Context, message string) error
|
||||
pool *WorkerPool
|
||||
streamKey string // Stream 键名
|
||||
groupName string // 消费者组名称
|
||||
consumerName string // 消费者名称
|
||||
timeout int64 // 阻塞超时时间(毫秒)
|
||||
batchSize int64 // 每次读取的消息数量
|
||||
stopChan chan struct{}
|
||||
handleFunc func(ctx context.Context, message map[string]interface{}) error
|
||||
}
|
||||
|
||||
// NewQueueProcessor 创建队列处理器
|
||||
// NewQueueProcessor 创建 Stream 处理器
|
||||
// 参数:
|
||||
// - pool: 协程池
|
||||
// - queueKey: Redis 队列键名
|
||||
// - timeout: 从队列取消息的超时时间(秒)
|
||||
// - streamKey: Redis Stream 键名
|
||||
// - groupName: 消费者组名称
|
||||
// - consumerName: 消费者名称(唯一标识)
|
||||
// - timeout: 从 Stream 取消息的超时时间(毫秒)
|
||||
// - batchSize: 每次读取的消息数量
|
||||
// - handleFunc: 消息处理函数
|
||||
func NewQueueProcessor(pool *WorkerPool, queueKey string, timeout int, handleFunc func(ctx context.Context, message string) error) *QueueProcessor {
|
||||
func NewQueueProcessor(pool *WorkerPool, streamKey, groupName, consumerName string, timeout int64, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor {
|
||||
return &QueueProcessor{
|
||||
pool: pool,
|
||||
queueKey: queueKey,
|
||||
timeout: timeout,
|
||||
stopChan: make(chan struct{}),
|
||||
handleFunc: handleFunc,
|
||||
pool: pool,
|
||||
streamKey: streamKey,
|
||||
groupName: groupName,
|
||||
consumerName: consumerName,
|
||||
timeout: timeout,
|
||||
batchSize: batchSize,
|
||||
stopChan: make(chan struct{}),
|
||||
handleFunc: handleFunc,
|
||||
}
|
||||
}
|
||||
|
||||
// Start 启动队列处理器
|
||||
// 会阻塞运行,持续从 Redis 队列中取出消息并提交到协程池处理
|
||||
// Start 启动 Stream 处理器
|
||||
// 会阻塞运行,持续从 Redis Stream 中取出消息并提交到协程池处理
|
||||
func (q *QueueProcessor) Start(ctx context.Context) error {
|
||||
glog.Infof(ctx, "队列处理器启动 - 队列: %s, 超时: %ds", q.queueKey, q.timeout)
|
||||
glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 超时: %dms",
|
||||
q.streamKey, q.groupName, q.consumerName, q.timeout)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-q.stopChan:
|
||||
glog.Info(ctx, "队列处理器收到停止信号")
|
||||
glog.Info(ctx, "Stream 处理器收到停止信号")
|
||||
return nil
|
||||
default:
|
||||
// 从 Redis 队列中取出消息
|
||||
message, err := q.fetchMessage(ctx)
|
||||
// 从 Redis Stream 中读取消息
|
||||
messages, err := q.fetchMessages(ctx)
|
||||
if err != nil {
|
||||
glog.Errorf(ctx, "从队列取消息失败: %v", err)
|
||||
glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// 队列为空,继续等待
|
||||
if message == "" {
|
||||
// 没有新消息,继续等待
|
||||
if len(messages) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// 提交到协程池处理
|
||||
if err := q.submitTask(ctx, message); err != nil {
|
||||
glog.Errorf(ctx, "提交任务到协程池失败: %v", err)
|
||||
// 处理每条消息
|
||||
for _, msg := range messages {
|
||||
// 提交到协程池处理
|
||||
if err := q.submitTask(ctx, msg); err != nil {
|
||||
glog.Errorf(ctx, "提交任务到协程池失败: %v, 消息ID: %s", err, msg.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -149,17 +162,26 @@ func (q *QueueProcessor) Stop() {
|
||||
close(q.stopChan)
|
||||
}
|
||||
|
||||
// fetchMessage 从 Redis 队列中取出消息
|
||||
func (q *QueueProcessor) fetchMessage(ctx context.Context) (string, error) {
|
||||
// 调用 Redis 队列的 PopFromQueue 方法从队列中取出消息
|
||||
return redis.PopFromQueue(ctx, q.queueKey, q.timeout)
|
||||
// fetchMessages 从 Redis Stream 中读取消息
|
||||
func (q *QueueProcessor) fetchMessages(ctx context.Context) ([]redis.StreamMessage, error) {
|
||||
// 从消费者组读取消息
|
||||
return redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout)
|
||||
}
|
||||
|
||||
// submitTask 将消息处理任务提交到协程池
|
||||
func (q *QueueProcessor) submitTask(ctx context.Context, message string) error {
|
||||
func (q *QueueProcessor) submitTask(ctx context.Context, message redis.StreamMessage) error {
|
||||
return q.pool.Submit(ctx, func(ctx context.Context) {
|
||||
if err := q.handleFunc(ctx, message); err != nil {
|
||||
glog.Errorf(ctx, "处理消息失败: %v, 消息: %s", err, message)
|
||||
// 处理消息
|
||||
if err := q.handleFunc(ctx, message.Values); err != nil {
|
||||
glog.Errorf(ctx, "处理消息失败: %v, 消息ID: %s", err, message.ID)
|
||||
return
|
||||
}
|
||||
|
||||
// 处理成功后确认消息
|
||||
if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil {
|
||||
glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID)
|
||||
} else {
|
||||
glog.Debugf(ctx, "消息处理完成并已确认: %s", message.ID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user