redis消费队列接口优化
This commit is contained in:
@@ -8,7 +8,7 @@ type QueueMessage struct {
|
|||||||
ConsumerName string // 消费者名称
|
ConsumerName string // 消费者名称
|
||||||
Timeout int64 // 阻塞超时时间(毫秒)
|
Timeout int64 // 阻塞超时时间(毫秒)
|
||||||
BatchSize int64 // 最大并发数(信号量容量)
|
BatchSize int64 // 最大并发数(信号量容量)
|
||||||
BlockMs int64
|
BlockMs int64 // 阻塞时间
|
||||||
Block bool
|
AutoAck bool //ACK确认,true自动确认,false手动确认
|
||||||
HandleFunc func(ctx context.Context, message map[string]interface{}) error
|
HandleFunc func(ctx context.Context, message map[string]interface{}) error
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package redis
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -73,16 +72,17 @@ LOOP:
|
|||||||
|
|
||||||
func GetReadStream(ctx context.Context, msg ...QueueMessage) error {
|
func GetReadStream(ctx context.Context, msg ...QueueMessage) error {
|
||||||
for _, t := range msg {
|
for _, t := range msg {
|
||||||
err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.Block, t.HandleFunc)
|
err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.AutoAck, t.HandleFunc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
glog.Infof(ctx, "读取ReadFromStream数据失败-> 键名: %s, 消费者组: %s, 消费者名称%v\n, 失败err:%v\n", t.StreamKey, t.GroupName, t.ConsumerName, err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetReadFromStream 读取ReadFromStream数据
|
// GetReadFromStream 读取ReadFromStream数据
|
||||||
func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64, Block bool, fn func(ctx context.Context, message map[string]interface{}) error) (err error) {
|
func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64, autoAck bool, fn func(ctx context.Context, message map[string]interface{}) error) (err error) {
|
||||||
glog.Infof(ctx, "初始化 Stream: %s, 消费者组: %s", streamKey, groupName)
|
glog.Infof(ctx, "初始化 Stream: %s, 消费者组: %s", streamKey, groupName)
|
||||||
err = InitStreamGroup(ctx, streamKey, groupName)
|
err = InitStreamGroup(ctx, streamKey, groupName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -97,13 +97,14 @@ func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName s
|
|||||||
}
|
}
|
||||||
// 处理消息
|
// 处理消息
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
fmt.Printf("消费者 '%s' -> 接收到消息 ID: %s, 内容: %v\n", consumerName, msg.ID, msg.Values)
|
glog.Infof(ctx, "消费者 '%s' -> 接收到消息 ID: %s, 内容: %v\n", consumerName, msg.ID, msg.Values)
|
||||||
// 业务处理
|
// 业务处理
|
||||||
if err = fn(ctx, msg.Values); err != nil {
|
if err = fn(ctx, msg.Values); err != nil {
|
||||||
return err
|
glog.Infof(ctx, "业务处理失败-> err:%v\n", err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
// 确认消息 (ACK)
|
// 确认消息 (ACK)
|
||||||
if Block {
|
if autoAck {
|
||||||
// 处理成功后,必须调用 XAck,否则消息会一直留在 PEL 中
|
// 处理成功后,必须调用 XAck,否则消息会一直留在 PEL 中
|
||||||
err = AckMessage(ctx, streamKey, groupName, msg.ID)
|
err = AckMessage(ctx, streamKey, groupName, msg.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -94,6 +94,8 @@ func GetUserInfo(ctx context.Context) (user do.User, err error) {
|
|||||||
user.TenantId = dataMap["tenantId"]
|
user.TenantId = dataMap["tenantId"]
|
||||||
} else {
|
} else {
|
||||||
user.TenantId = ctx.Value("tenantId")
|
user.TenantId = ctx.Value("tenantId")
|
||||||
|
user.UserName = ctx.Value("userName")
|
||||||
|
fmt.Println("user.UserName==================", user.UserName)
|
||||||
}
|
}
|
||||||
if user.TenantId == nil {
|
if user.TenantId == nil {
|
||||||
return user, gerror.New("租户信息为空")
|
return user, gerror.New("租户信息为空")
|
||||||
|
|||||||
Reference in New Issue
Block a user