添加消息队列消费者并优化租户存储总量更新逻辑
This commit is contained in:
@@ -17,16 +17,16 @@ type tenantOssTotal struct {
|
||||
}
|
||||
|
||||
// SaveOrUpdate 增加或更新
|
||||
func (d *tenantOssTotal) SaveOrUpdate(ctx context.Context, updateData []*entity.TenantOssTotal) (err error) {
|
||||
func (d *tenantOssTotal) SaveOrUpdate(ctx context.Context, updateData []*dto.UpdateUsedOssReq) (err error) {
|
||||
if !g.IsEmpty(updateData) {
|
||||
var filter, update []bson.M
|
||||
for _, v := range updateData {
|
||||
bsonm, err := mongo.EntityToBsonWithFilter(v, true)
|
||||
buildUpdateData, err := mongo.BuildUpdateData(ctx, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filter = append(filter, bson.M{"tenantId": v.TenantId})
|
||||
update = append(update, bson.M{"$set": bsonm})
|
||||
update = append(update, bson.M{"$set": buildUpdateData})
|
||||
}
|
||||
_, err = MongoDAO.SaveOrUpdate(ctx, filter, update, consts.TenantOssTotalCollection)
|
||||
if err != nil {
|
||||
|
||||
17
main.go
17
main.go
@@ -2,6 +2,9 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gitee.com/red-future---jilin-g/common/log/consts"
|
||||
"gitee.com/red-future---jilin-g/common/message"
|
||||
"gitee.com/red-future---jilin-g/common/mongo"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/os/gtimer"
|
||||
"oss/controller"
|
||||
@@ -10,7 +13,7 @@ import (
|
||||
|
||||
"gitee.com/red-future---jilin-g/common/http"
|
||||
"gitee.com/red-future---jilin-g/common/jaeger"
|
||||
_ "gitee.com/red-future---jilin-g/common/mongo"
|
||||
logService "gitee.com/red-future---jilin-g/common/log/service"
|
||||
_ "github.com/gogf/gf/contrib/nosql/redis/v2"
|
||||
)
|
||||
|
||||
@@ -30,6 +33,18 @@ func main() {
|
||||
}
|
||||
})
|
||||
|
||||
// 启动消息队列消费者
|
||||
if err := message.StartConsumers(ctx, &message.RedisMessageConfig{
|
||||
StreamKey: mongo.LogRedisKey,
|
||||
GroupName: consts.GroupName,
|
||||
ConsumerName: consts.ConsumerName,
|
||||
BatchSize: consts.BatchSize,
|
||||
AutoAck: consts.AutoAck,
|
||||
HandleFunc: logService.OperationLog.AddOperationLog,
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 保持应用运行
|
||||
select {}
|
||||
}
|
||||
|
||||
@@ -15,3 +15,17 @@ type GetByTenantIdReq struct {
|
||||
type GetByTenantIdRes struct {
|
||||
*entity.TenantOssTotal
|
||||
}
|
||||
|
||||
// UpdateUsedOssReq 更新使用存储总量请求
|
||||
type UpdateUsedOssReq struct {
|
||||
g.Meta `path:"/GetOneByTenantId" method:"get" tags:"租户存储总量管理" summary:"更新使用存储总量" dc:"更新使用存储总量"`
|
||||
|
||||
TenantId interface{} `json:"tenantId" v:"required#租户id不能为空"`
|
||||
UsedOssSize int `bson:"usedOssSize" json:"usedOssSize"`
|
||||
TotalOssSize int `bson:"totalOssSize" json:"totalOssSize"`
|
||||
Updater interface{} `json:"updater" v:"required#更新人不能为空"`
|
||||
}
|
||||
|
||||
// UpdateUsedOssRes 更新使用存储总量响应
|
||||
type UpdateUsedOssRes struct {
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"oss/consts"
|
||||
"oss/dao"
|
||||
"oss/model/dto"
|
||||
"oss/model/entity"
|
||||
)
|
||||
|
||||
type tenantOssTotal struct{}
|
||||
@@ -32,13 +31,13 @@ func (s *tenantOssTotal) UpdateUsedOssSize(ctx context.Context) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
updateData := make([]*entity.TenantOssTotal, 0)
|
||||
updateData := make([]*dto.UpdateUsedOssReq, 0)
|
||||
for _, key := range keys {
|
||||
get, err := g.Redis().Get(ctx, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e := &entity.TenantOssTotal{}
|
||||
e := new(dto.UpdateUsedOssReq)
|
||||
err = gconv.Struct(get, e)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user