Files
assets/service/stock/capacity_service.go
2026-03-18 10:18:03 +08:00

290 lines
9.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// 库位容量管理服务
// 职责:库位/库区/仓库三级容量计算与同步,支持整入整出换算
// 调用链PrivateStock.Create/Update/Delete → UpdateLocationCapacity → SyncCapacityToZone → SyncCapacityToWarehouse
// 紧密耦合dao.Location(更新容量)、dao.Zone(汇总)、dao.Warehouse(汇总)、dao.UnitConversion(单位换算)
// 注意使用Redis分布式锁防止并发重算覆盖锁key格式 lock:location:{id}:capacity
package service
import (
"assets/consts/public"
"assets/consts/stock"
dao "assets/dao/stock"
dto "assets/model/dto/stock"
entityAsset "assets/model/entity/asset"
"context"
"fmt"
"math"
"gitea.com/red-future/common/db/mongo"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/redis"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"go.mongodb.org/mongo-driver/v2/bson"
)
var Capacity = new(capacity)
type capacity struct{}
// UpdateLocationCapacity 更新库位容量入口方法带Redis分布式锁
func (s *capacity) UpdateLocationCapacity(ctx context.Context, locationId *bson.ObjectID) (err error) {
// Redis分布式锁防止并发入库/出库同一库位时重算覆盖)
lockKey := fmt.Sprintf("lock:location:%s:capacity", locationId.Hex())
expireSeconds := int64(30)
var zoneId *bson.ObjectID
var capacityUnitType stock.CapacityUnitType
success, err := redis.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error {
// 1. 查询库位信息
location, err := dao.Location.GetOne(ctx, &dto.GetLocationReq{Id: locationId})
if err != nil {
g.Log().Errorf(ctx, "查询库位失败: %v", err)
return err
}
zoneId = location.ZoneId
capacityUnitType = location.CapacityUnitType
// 2. 查询库位下所有库存记录
privateStocks, _, err := dao.PrivateStock.List(ctx, &dto.ListPrivateStockReq{
LocationId: locationId,
})
if err != nil {
g.Log().Errorf(ctx, "查询库位库存失败: %v", err)
return err
}
// 3. 批量查询PrivateSku避免N+1查询问题
skuIds := make([]*bson.ObjectID, 0, len(privateStocks))
for _, ps := range privateStocks {
if ps.PrivateSkuID != nil && ps.AvailableQty > 0 {
skuIds = append(skuIds, ps.PrivateSkuID)
}
}
// 批量查询PrivateSku并构建Map缓存
skuMap := make(map[string]*entityAsset.PrivateSku)
if len(skuIds) > 0 {
var skuList []*entityAsset.PrivateSku
filter := bson.M{"_id": bson.M{"$in": skuIds}}
_, err = mongo.DB().Find(ctx, filter, &skuList, public.PrivateSkuCollection, nil, nil)
if err != nil {
g.Log().Errorf(ctx, "批量查询PrivateSku失败: %v", err)
return err
}
// 构建Map缓存
for i := range skuList {
skuMap[skuList[i].Id.Hex()] = skuList[i]
}
}
// 4. 整入整出计算先按SKU聚合同库位的总数量同SKU可合箱再按库位单位换算
// 聚合同一SKU的总数量避免逐条取整导致容量虚高
// 例2批次各1瓶逐条取整=2箱(错误),聚合后取整=ceil(2/20)=1箱(正确)
skuQtyMap := make(map[string]int) // key: privateSkuId.Hex(), value: 总可用数量
for _, ps := range privateStocks {
if ps.PrivateSkuID == nil || ps.AvailableQty <= 0 {
continue
}
skuQtyMap[ps.PrivateSkuID.Hex()] += ps.AvailableQty
}
totalCapacity := 0
for skuIdHex, totalQty := range skuQtyMap {
// 从Map缓存中获取PrivateSku
privateSku, exists := skuMap[skuIdHex]
if !exists || privateSku == nil {
g.Log().Warningf(ctx, "PrivateSku不存在跳过: %s", skuIdHex)
continue
}
// 检查location和privateSku的Capacity是否为nil
if location.Capacity == nil || privateSku.Capacity.CapacityUnit == "" {
g.Log().Warningf(ctx, "库位或SKU容量信息不完整跳过")
continue
}
// 如果库存单位与库位单位相同,直接累加
if privateSku.Capacity.CapacityUnit == location.Capacity.CapacityUnit {
totalCapacity += totalQty
continue
}
// 不同单位需要换算
conversion, err := dao.UnitConversion.GetByUnits(ctx,
location.CapacityUnitType,
privateSku.Capacity.CapacityUnit,
location.Capacity.CapacityUnit,
)
if err != nil {
err = gerror.Newf("未找到单位换算规则 %s→%s请在系统中添加该换算规则",
privateSku.Capacity.CapacityUnit, location.Capacity.CapacityUnit)
jaeger.RecordError(ctx, err)
return err
}
// 检查换算系数是否为0防止除零错误
if conversion.ConversionFactor == 0 {
err = gerror.Newf("换算系数为0%s→%s请检查换算规则配置",
privateSku.Capacity.CapacityUnit, location.Capacity.CapacityUnit)
jaeger.RecordError(ctx, err)
return err
}
// 向上取整计算整入整出同SKU合箱后取整不足一箱按一箱计
convertedQty := int(math.Ceil(float64(totalQty) / conversion.ConversionFactor))
totalCapacity += convertedQty
g.Log().Debugf(ctx, "单位换算: %d%s ÷ %.2f = %d%s",
totalQty, privateSku.Capacity.CapacityUnit,
conversion.ConversionFactor, convertedQty, location.Capacity.CapacityUnit)
}
currentCapacity := totalCapacity
// 5. 更新库位容量
err = dao.Location.UpdateCapacity(ctx, locationId, currentCapacity)
if err != nil {
g.Log().Errorf(ctx, "更新库位容量失败: %v", err)
return err
}
g.Log().Infof(ctx, "库位容量更新成功: locationId=%s, 当前容量=%d",
locationId.Hex(), currentCapacity)
return nil
})
if !success {
return fmt.Errorf("获取库位容量锁失败: %v", err)
}
if err != nil {
return
}
// 6. 触发向上汇总到库区(在锁外执行,避免嵌套锁时间过长)
if zoneId != nil && !zoneId.IsZero() {
if syncErr := s.SyncCapacityToZone(ctx, zoneId, capacityUnitType); syncErr != nil {
g.Log().Errorf(ctx, "同步库区容量失败: %v", syncErr)
}
}
return
}
// SyncCapacityToZone 同步容量到库区带Redis分布式锁
func (s *capacity) SyncCapacityToZone(ctx context.Context, zoneId *bson.ObjectID, unitType stock.CapacityUnitType) (err error) {
// 1. Redis分布式锁
lockKey := fmt.Sprintf("lock:zone:%s:capacity:%s", zoneId.Hex(), unitType)
expireSeconds := int64(30) // 30秒超时
success, err := redis.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error {
// 2. 查询该库区下所有使用该单位类型的库位
locations, err := dao.Location.ListByZoneAndUnitType(ctx, zoneId, unitType)
if err != nil {
return fmt.Errorf("查询库位列表失败: %v", err)
}
// 3. 汇总所有库位的当前容量
totalCapacity := 0
maxCapacity := 0
var capacityUnit string
for _, loc := range locations {
if loc.Capacity != nil {
totalCapacity += loc.Capacity.CurrentCapacity
maxCapacity += loc.Capacity.MaxCapacity
if capacityUnit == "" {
capacityUnit = loc.Capacity.CapacityUnit
}
}
}
// 4. 查询库区信息获取warehouseId
zone, err := dao.Zone.GetOne(ctx, &dto.GetZoneReq{Id: zoneId})
if err != nil {
return fmt.Errorf("查询库区失败: %v", err)
}
// 5. 更新库区该单位类型的容量
err = dao.Zone.UpdateCapacityByUnitType(ctx, zoneId, unitType, totalCapacity, maxCapacity, capacityUnit)
if err != nil {
return fmt.Errorf("更新库区容量失败: %v", err)
}
g.Log().Infof(ctx, "库区容量同步成功: zoneId=%s, unitType=%s, 当前容量=%d",
zoneId.Hex(), unitType, totalCapacity)
// 6. 触发向上汇总到仓库
if zone.WarehouseId != "" {
warehouseObjId, hexErr := bson.ObjectIDFromHex(zone.WarehouseId)
if hexErr != nil {
g.Log().Errorf(ctx, "库区WarehouseId格式错误: %s, %v", zone.WarehouseId, hexErr)
} else {
if syncErr := s.SyncCapacityToWarehouse(ctx, &warehouseObjId, unitType); syncErr != nil {
g.Log().Errorf(ctx, "同步仓库容量失败: %v", syncErr)
}
}
}
return nil
})
if !success {
return fmt.Errorf("获取Redis锁失败: %v", err)
}
return
}
// SyncCapacityToWarehouse 同步容量到仓库带Redis分布式锁
func (s *capacity) SyncCapacityToWarehouse(ctx context.Context, warehouseId *bson.ObjectID, unitType stock.CapacityUnitType) (err error) {
// 1. Redis分布式锁
lockKey := fmt.Sprintf("lock:warehouse:%s:capacity:%s", warehouseId.Hex(), unitType)
expireSeconds := int64(30) // 30秒超时
success, err := redis.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error {
// 2. 查询该仓库下所有库区
zones, err := dao.Zone.ListByWarehouseAndUnitType(ctx, warehouseId.Hex())
if err != nil {
return fmt.Errorf("查询库区列表失败: %v", err)
}
// 3. 汇总所有库区该单位类型的容量
totalCapacity := 0
maxCapacity := 0
var capacityUnit string
for _, zone := range zones {
if zone.Capacity != nil {
if cap, exists := (*zone.Capacity)[unitType]; exists {
totalCapacity += cap.CurrentCapacity
maxCapacity += cap.MaxCapacity
if capacityUnit == "" {
capacityUnit = cap.CapacityUnit
}
}
}
}
// 4. 更新仓库该单位类型的容量
err = dao.Warehouse.UpdateCapacityByUnitType(ctx, warehouseId, unitType, totalCapacity, maxCapacity, capacityUnit)
if err != nil {
return fmt.Errorf("更新仓库容量失败: %v", err)
}
g.Log().Infof(ctx, "仓库容量同步成功: warehouseId=%s, unitType=%s, 当前容量=%d",
warehouseId.Hex(), unitType, totalCapacity)
return nil
})
if !success {
return fmt.Errorf("获取Redis锁失败: %v", err)
}
return
}
// ConvertWithCeil 向上取整换算(用于容量计算)
func (s *capacity) ConvertWithCeil(fromQty int, conversionFactor float64) int {
return int(math.Ceil(float64(fromQty) / conversionFactor))
}