Files
assets/service/stock/stock_manage_service.go

319 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
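// Inventory management service (Stock, the public/shared inventory).
// Responsibilities: stock-in / stock-out operations, supporting detail mode (StockDetails) and batch mode (StockBatch).
// Call chain: Controller → StockOperation → stockPublishMessage → NATS → AddStock (consumer).
// Tightly coupled with: dao.StockDetails, dao.StockBatch, dao.AssetSku (stock count updates), common/message (NATS publishing).
// Note: moving stock and transferring stock are PrivateStock-only operations and are not implemented here.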
package service
import (
"assets/consts/public"
"assets/consts/stock"
assetDao "assets/dao/asset"
dao "assets/dao/stock"
assetDto "assets/model/dto/asset"
stockDto "assets/model/dto/stock"
assetEntity "assets/model/entity/asset"
entity "assets/model/entity/stock"
"context"
"fmt"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/redis"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
// stockManage is the stateless receiver type that carries the public (shared)
// Stock inventory service methods.
type stockManage struct{}

// StockManage is the package-level instance of the public Stock inventory service.
//
// Responsibilities:
//  1. Stock-in / stock-out for detail mode (StockDetails) and batch mode (StockBatch).
//  2. Stock queries and generation of the stock operation form fields.
//
// Note: moving stock (MoveStock) and transferring stock (TransferStock) are
// PrivateStock-only operations; do not implement them here.
var StockManage = &stockManage{}
// GetStockFormFields returns the form field descriptors for a stock operation
// on a single SKU.
//
// It loads the SKU identified by req.AssetSkuId and always emits the "stock"
// quantity field. When the SKU is in batch mode it additionally emits the batch
// number field and the production / expiry / expiry-warning date fields.
//
// The response carries the SKU's stock mode together with the field list. An
// error from the SKU lookup is returned unchanged.
func (s *stockManage) GetStockFormFields(ctx context.Context, req *stockDto.GetStockFormFieldsReq) (*stockDto.GetStockFormFieldsRes, error) {
	// Load the asset SKU so we know which stock mode it uses.
	assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId})
	if err != nil {
		return nil, err
	}

	// The quantity field is shown in both stock modes.
	fields := []map[string]interface{}{
		{
			"name":     "stock",
			"label":    "库存数量",
			"type":     "number",
			"required": true,
			"min":      1,
		},
	}

	// Batch mode adds the batch number and the date fields.
	if !g.IsEmpty(assetSku.StockMode) && assetSku.StockMode == stock.StockModeBatch {
		// Default batch number is today's date (yyyymmdd) followed by "0001",
		// i.e. 12 digits, matching maxLength below.
		// gtime.Time.Format takes GoFrame's PHP-style pattern ("Ymd"), not a Go
		// reference layout: passing "20060102" would be emitted literally and
		// every default would become 200601020001.
		defaultBatchNo := gconv.Int(gtime.Now().Format("Ymd") + "0001")

		fields = append(fields,
			map[string]interface{}{
				"name":      "batchNo",
				"label":     "批次号",
				"type":      "number",
				"required":  true,
				"default":   defaultBatchNo,
				"maxLength": 12,
			},
			map[string]interface{}{
				"name":  "productionDate",
				"label": "生产日期",
				"type":  "date",
			},
			map[string]interface{}{
				"name":  "expiryDate",
				"label": "过期日期",
				"type":  "date",
			},
			map[string]interface{}{
				"name":  "expiryWarningDate",
				"label": "临期预警时间",
				"type":  "date",
			},
		)
	}

	return &stockDto.GetStockFormFieldsRes{
		StockMode: assetSku.StockMode,
		Fields:    fields,
	}, nil
}
// StockOperation is the entry point for a stock-in / stock-out request.
//
// req.Stock is treated as the target quantity: the method looks up the current
// quantity (the detail count for detail mode, the batch quantity for batch
// mode), computes the difference and hands an "add" or "del" change of that
// size to stockPublishMessage.
//
// Nothing is done, and nil is returned, when the SKU has unlimited stock, when
// req.Stock is negative, or when the target already equals the current
// quantity. In batch mode an empty req.BatchNo is rejected with an error.
func (s *stockManage) StockOperation(ctx context.Context, req *stockDto.StockOperationReq) (err error) {
	assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId})
	if err != nil {
		return err
	}
	// Unlimited-stock SKUs and negative targets are silently ignored.
	if assetSku.UnlimitedStock || req.Stock < 0 {
		return nil
	}

	var (
		batchId *bson.ObjectID // id of the existing batch, if one was found
		current int            // quantity currently on record
	)
	switch assetSku.StockMode {
	case stock.StockModeDetail:
		total, countErr := dao.StockDetails.GetStockCountBySkuId(ctx, assetSku.Id)
		if countErr != nil {
			return countErr
		}
		current = gconv.Int(total)
	case stock.StockModeBatch:
		if g.IsEmpty(req.BatchNo) {
			return gerror.New("批次号不能为空")
		}
		existing, getErr := dao.StockBatch.GetOne(ctx, req.BatchNo)
		if getErr != nil {
			return getErr
		}
		// A missing batch leaves current at 0 and batchId nil.
		if !g.IsEmpty(existing) {
			batchId = existing.Id
			current = existing.BatchQty
		}
	}

	// Turn the gap between the current and the target quantity into a change.
	var (
		delta int
		op    string
	)
	switch {
	case current > req.Stock:
		delta, op = current-req.Stock, "del"
	case current < req.Stock:
		delta, op = req.Stock-current, "add"
	}
	if op == "" || delta <= 0 {
		return nil
	}
	return s.stockPublishMessage(ctx, assetSku, batchId, delta, op, req)
}
// stockPublishMessage assembles the stock change message for one SKU from the
// current user, the SKU and the request.
//
// NOTE(review): every publish call at the end of this function is commented
// out, so the message is currently built and then dropped; the function
// returns nil without sending anything. Confirm whether this is intentional
// before relying on the asynchronous consumer (AddStock) being triggered.
//
// An error is returned only when the current user cannot be resolved from ctx.
func (s *stockManage) stockPublishMessage(ctx context.Context, assetSku *assetEntity.AssetSku, stockId *bson.ObjectID, stockCount int, operationType string, req *stockDto.StockOperationReq) (err error) {
	// Resolve the operating user; tenant and user name travel with the message.
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return err
	}

	publishMessage := stockDto.StockPublishMessage{
		// SKU identity and payload.
		AssetId:    assetSku.AssetId,
		AssetSkuId: assetSku.Id,
		Metadata:   gconv.Maps(assetSku.SpecValues),
		StockMode:  int(assetSku.StockMode),
		// Who triggered the change.
		TenantId: user.TenantId,
		UserName: user.UserName,
		// What changes.
		StockCount:    stockCount,
		OperationType: operationType,
		// Batch attributes taken from the request.
		BatchNo:           req.BatchNo,
		ProductionDate:    req.ProductionDate,
		ExpiryDate:        req.ExpiryDate,
		ExpiryWarningDate: req.ExpiryWarningDate,
	}
	// Only carry a stock id when an existing, non-zero one was supplied.
	if !g.IsEmpty(stockId) && !stockId.IsZero() {
		publishMessage.StockId = stockId.Hex()
	}

	// Disabled publish variants (NATS plugin, Redis stream, Redis plugin):
	//plugin, err := message.GetMsgPlugin(ctx, message.MessageNATS)
	//if err != nil {
	//	return gerror.Newf("NATS插件未就绪: %v", err)
	//}
	//err = plugin.Publish(ctx, &message.NatsPublishMsgConfig{
	//	QueueName: public.StockDetailGroupName,
	//	Durable:   true,
	//	Data:      publishMessage,
	//})
	//_, err = message.PublishMessage(ctx, &message.RedisMessageConfig{StreamKey: public.StockDetailStreamKey}, publishMessage)
	//plugin, err := message.GetMsgPlugin(message.MessageRedis)
	//if err != nil {
	//	return err
	//}
	//err = plugin.Publish(ctx, &message.RedisPublishMsgConfig{
	//	QueueName: public.StockDetailQueueName,
	//	Data:      publishMessage,
	//})
	return nil
}
// AddStock applies one stock change message (stock-in or stock-out); per the
// file header it is invoked by the message consumer.
//
// msg is read by key: assetId, assetSkuId, stockId, userName, tenantId,
// stockCount, operationType ("add" or "del"), metadata, stockMode, batchNo,
// productionDate, expiryDate and expiryWarningDate.
//
// The change runs inside redis.Lock, keyed by the SKU id, and consists of:
//   - "add" + batch mode:  update the batch given by stockId, or insert a new
//     batch when stockId is empty;
//   - "add" + detail mode: insert stockCount available StockDetails rows;
//   - "del" + batch mode:  update the batch with a negative quantity;
//   - "del" + detail mode: delete stockCount available StockDetails rows;
//
// followed by an AssetSku update carrying the signed quantity. For any other
// stock mode no inventory rows are touched and only the AssetSku update runs.
//
// It returns an error when stockCount is not positive, when operationType is
// neither "add" nor "del", when the lock cannot be obtained, when too few
// available detail rows exist, when the number of deleted rows does not match,
// or when any DAO call fails.
func (s *stockManage) AddStock(ctx context.Context, msg map[string]interface{}) error {
	assetId := gconv.Int64(msg["assetId"])
	assetSkuId := gconv.Int64(msg["assetSkuId"])
	// NOTE(review): stockPublishMessage fills StockId with ObjectID.Hex() (a hex
	// string) while this side converts it with gconv.Int64 — confirm the id
	// survives that conversion; if it becomes 0, "add" always inserts a new batch.
	stockId := gconv.Int64(msg["stockId"])
	userName := gconv.String(msg["userName"])
	tenantId := gconv.Float64(msg["tenantId"])
	stockCount := gconv.Int(msg["stockCount"])
	operationType := gconv.String(msg["operationType"])
	metadata := gconv.Maps(msg["metadata"])
	stockMode := stock.StockMode(gconv.Int(msg["stockMode"]))
	batchNo := gconv.String(msg["batchNo"])
	productionDate := gtime.New(msg["productionDate"])
	expiryDate := gtime.New(msg["expiryDate"])
	expiryWarningDate := gtime.New(msg["expiryWarningDate"])

	// Reject malformed messages before taking the lock. A non-positive count
	// would otherwise still delete a detail row in "del" mode, and an unknown
	// operation would change the SKU counter without touching any inventory.
	if stockCount <= 0 {
		return gerror.Newf("库存变更数量必须大于0: %d", stockCount)
	}
	if operationType != "add" && operationType != "del" {
		return gerror.Newf("未知的库存操作类型: %s", operationType)
	}

	// Expose the user name and tenant id through ctx. The plain string keys are
	// kept as-is because code reading them elsewhere presumably uses the same keys.
	ctx = context.WithValue(ctx, "userName", userName)
	ctx = context.WithValue(ctx, "tenantId", tenantId)

	// One lock per SKU; 60 is passed through to redis.Lock unchanged
	// (presumably the lock lifetime in seconds — confirm against redis.Lock).
	lockKey := fmt.Sprintf(public.StockDetailLockKey, assetSkuId)
	success, err := redis.Lock(ctx, lockKey, int64(60), func(ctx context.Context) error {
		// delta is the signed quantity applied to the batch row and the SKU.
		delta := stockCount
		if operationType == "del" {
			delta = -stockCount
		}

		switch operationType {
		case "add":
			switch stockMode {
			case stock.StockModeBatch:
				if stockId != 0 {
					// Existing batch: apply the quantity to it.
					batch := stockDto.UpdateBatchReq{
						Id:           stockId,
						BatchQty:     delta,
						AvailableQty: delta,
					}
					if err := dao.StockBatch.Update(ctx, &batch); err != nil {
						return err
					}
				} else {
					// No batch yet: create it with the full quantity.
					batch := stockDto.CreateBatchReq{
						AssetId:           assetId,
						AssetSkuId:        assetSkuId,
						Status:            stock.BatchStatusActive,
						Metadata:          metadata,
						BatchNo:           batchNo,
						BatchQty:          stockCount,
						AvailableQty:      stockCount,
						ProductionDate:    productionDate,
						ExpiryDate:        expiryDate,
						ExpiryWarningDate: expiryWarningDate,
					}
					if _, err := dao.StockBatch.Insert(ctx, &batch); err != nil {
						return err
					}
				}
			case stock.StockModeDetail:
				// One StockDetails row per unit, inserted in a single batch.
				rows := make([]interface{}, 0, stockCount)
				for i := 0; i < stockCount; i++ {
					rows = append(rows, entity.StockDetails{
						AssetId:    assetId,
						AssetSkuId: assetSkuId,
						Status:     stock.StockStatusAvailable,
						Metadata:   metadata,
					})
				}
				if _, err := dao.StockDetails.BatchInsert(ctx, rows); err != nil {
					return err
				}
			}
		case "del":
			switch stockMode {
			case stock.StockModeBatch:
				// Apply the negative quantity to the batch.
				batch := stockDto.UpdateBatchReq{
					Id:           stockId,
					BatchQty:     delta,
					AvailableQty: delta,
				}
				if err := dao.StockBatch.Update(ctx, &batch); err != nil {
					return err
				}
			case stock.StockModeDetail:
				ids, err := s.collectAvailableStockIds(ctx, assetSkuId, stockCount)
				if err != nil {
					return err
				}
				deleted, err := dao.StockDetails.DeleteManyByIds(ctx, ids)
				if err != nil {
					return err
				}
				if deleted != int64(stockCount) {
					return gerror.New("删除库存数量不匹配")
				}
			}
		}

		// Finally pass the signed quantity to the SKU update.
		_, err := assetDao.AssetSku.Update(ctx, &assetDto.UpdateAssetSkuReq{Id: assetSkuId, Stock: delta})
		return err
	})
	if err != nil {
		return err
	}
	if !success {
		// err is nil on this path, so report the lock key instead of "<nil>".
		return fmt.Errorf("获取库存操作锁失败: %s", lockKey)
	}
	return nil
}

// collectAvailableStockIds pages through the available StockDetails rows of a
// SKU (50 per page) and returns the first want usable ids.
//
// It fails with the insufficient-stock error when the total reported with the
// first page is below want, or when a page comes back empty before want ids
// were collected (rows with a nil or zero id are skipped and do not count).
// The empty-page check guarantees the paging loop terminates.
func (s *stockManage) collectAvailableStockIds(ctx context.Context, assetSkuId int64, want int) ([]*bson.ObjectID, error) {
	const pageSize = int64(50)

	var ids []*bson.ObjectID
	for pageNum := int64(1); len(ids) < want; pageNum++ {
		details, total, err := dao.StockDetails.List(ctx, &stockDto.ListStockDetailsReq{
			AssetSkuId: assetSkuId,
			Status:     stock.StockStatusAvailable,
			Page:       &beans.Page{PageNum: pageNum, PageSize: pageSize},
		})
		if err != nil {
			return nil, err
		}
		if pageNum == 1 && int(total) < want {
			return nil, gerror.New("可操作库存数量不足")
		}
		if len(details) == 0 {
			// Ran out of rows before reaching want; stop instead of paging forever.
			return nil, gerror.New("可操作库存数量不足")
		}
		for _, detail := range details {
			if detail.Id == nil || detail.Id.IsZero() {
				continue
			}
			ids = append(ids, detail.Id)
			if len(ids) >= want {
				break
			}
		}
	}
	return ids, nil
}