// 库存管理服务(Stock公共库存) // 职责:入库/出库操作,支持明细模式(StockDetails)和批次模式(StockBatch) // 调用链:Controller → StockOperation → stockPublishMessage → NATS → AddStock(消费者) // 紧密耦合:dao.StockDetails、dao.StockBatch、dao.AssetSku(更新库存数)、common/message(NATS发布) // 注意:移库/调拨是PrivateStock专属操作,不在此实现 package service import ( "assets/consts/public" "assets/consts/stock" assetDao "assets/dao/asset" dao "assets/dao/stock" assetDto "assets/model/dto/asset" stockDto "assets/model/dto/stock" assetEntity "assets/model/entity/asset" entity "assets/model/entity/stock" "context" "fmt" "gitea.com/red-future/common/beans" "gitea.com/red-future/common/redis" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/v2/bson" ) type stockManage struct{} // StockManage 库存管理服务(Stock公共库存) // 职责: // 1. 入库/出库:StockDetails(明细模式)和 StockBatch(批次模式)的库存操作 // 2. 库存查询表单字段生成 // 注意:移库(MoveStock)和调拨(TransferStock)是PrivateStock专属操作,不要在这里实现 var StockManage = new(stockManage) // GetStockFormFields 获取库存操作表单字段 func (s *stockManage) GetStockFormFields(ctx context.Context, req *stockDto.GetStockFormFieldsReq) (*stockDto.GetStockFormFieldsRes, error) { // 获取资产SKU信息 assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId}) if err != nil { return nil, err } fields := make([]map[string]interface{}, 0) // Stock 字段在两种模式下都显示 fields = append(fields, map[string]interface{}{ "name": "stock", "label": "库存数量", "type": "number", "required": true, "min": 1, }) // 如果是批次模式(2),添加批次相关字段 if !g.IsEmpty(assetSku.StockMode) && assetSku.StockMode == stock.StockModeBatch { fields = append(fields, []map[string]interface{}{ { "name": "batchNo", "label": "批次号", "type": "number", "required": true, "default": gconv.Int(gtime.Now().Format("20060102") + "0001"), "maxLength": 12, }, { "name": "productionDate", "label": "生产日期", "type": "date", }, { "name": "expiryDate", "label": "过期日期", "type": "date", }, { "name": "expiryWarningDate", "label": "临期预警时间", "type": "date", }, }...) } return &stockDto.GetStockFormFieldsRes{ StockMode: assetSku.StockMode, Fields: fields, }, nil } // StockOperation 库存操作入口(入库/出库) // 根据SKU的StockMode区分明细模式和批次模式,计算差值后发布消息到NATS func (s *stockManage) StockOperation(ctx context.Context, req *stockDto.StockOperationReq) (err error) { assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId}) if err != nil { return } if !assetSku.UnlimitedStock && req.Stock >= 0 { var stockId *bson.ObjectID count := 0 if assetSku.StockMode == stock.StockModeDetail { _count, err := dao.StockDetails.GetStockCountBySkuId(ctx, assetSku.Id) if err != nil { return err } count = gconv.Int(_count) } if assetSku.StockMode == stock.StockModeBatch { if g.IsEmpty(req.BatchNo) { return gerror.New("批次号不能为空") } getOne, err := dao.StockBatch.GetOne(ctx, req.BatchNo) if err != nil { return err } if !g.IsEmpty(getOne) { stockId = getOne.Id count = getOne.BatchQty } } stockCount := 0 operationType := "" if count != req.Stock { if count > req.Stock { stockCount = count - req.Stock operationType = "del" } else { stockCount = req.Stock - count operationType = "add" } } if !g.IsEmpty(operationType) && stockCount > 0 { if err = s.stockPublishMessage(ctx, assetSku, stockId, stockCount, operationType, req); err != nil { return err } } } return } // stockPublishMessage 发布库存变更消息到NATS // 消费者接收后执行实际的入库/出库操作(异步解耦) func (s *stockManage) stockPublishMessage(ctx context.Context, assetSku *assetEntity.AssetSku, stockId *bson.ObjectID, stockCount int, operationType string, req *stockDto.StockOperationReq) (err error) { // 用户信息 user, err := utils.GetUserInfo(ctx) if err != nil { return } publishMessage := stockDto.StockPublishMessage{ AssetId: assetSku.AssetId, AssetSkuId: assetSku.Id, TenantId: user.TenantId, UserName: user.UserName, StockCount: stockCount, OperationType: operationType, Metadata: gconv.Maps(assetSku.SpecValues), StockMode: int(assetSku.StockMode), BatchNo: req.BatchNo, ProductionDate: req.ProductionDate, ExpiryDate: req.ExpiryDate, ExpiryWarningDate: req.ExpiryWarningDate, } if !g.IsEmpty(stockId) && !stockId.IsZero() { publishMessage.StockId = stockId.Hex() } // 发布到 NATS //plugin, err := message.GetMsgPlugin(ctx, message.MessageNATS) //if err != nil { // return gerror.Newf("NATS插件未就绪: %v", err) //} //err = plugin.Publish(ctx, &message.NatsPublishMsgConfig{ // QueueName: public.StockDetailGroupName, // Durable: true, // Data: publishMessage, //}) //_, err = message.PublishMessage(ctx, &message.RedisMessageConfig{StreamKey: public.StockDetailStreamKey}, publishMessage) //plugin, err := message.GetMsgPlugin(message.MessageRedis) //if err != nil { // return err //} //err = plugin.Publish(ctx, &message.RedisPublishMsgConfig{ // QueueName: public.StockDetailQueueName, // Data: publishMessage, //}) return } // AddStock NATS消费者调用,执行实际的入库/出库操作 // 使用Redis分布式锁防止并发冲突,支持明细模式和批次模式 func (s *stockManage) AddStock(ctx context.Context, msg map[string]interface{}) error { assetId := gconv.Int64(msg["assetId"]) assetSkuId := gconv.Int64(msg["assetSkuId"]) stockId := gconv.Int64(msg["stockId"]) userName := gconv.String(msg["userName"]) tenantId := gconv.Float64(msg["tenantId"]) stockCount := gconv.Int(msg["stockCount"]) operationType := gconv.String(msg["operationType"]) metadata := gconv.Maps(msg["metadata"]) stockMode := stock.StockMode(gconv.Int(msg["stockMode"])) batchNo := gconv.String(msg["batchNo"]) productionDate := gtime.New(msg["productionDate"]) expiryDate := gtime.New(msg["expiryDate"]) expiryWarningDate := gtime.New(msg["expiryWarningDate"]) // 设置 userId 和 tenantId 到 ctx ctx = context.WithValue(ctx, "userName", userName) ctx = context.WithValue(ctx, "tenantId", tenantId) // 获取redis-租户存储-锁key fileLockKey := fmt.Sprintf(public.StockDetailLockKey, assetSkuId) success, err := redis.Lock(ctx, fileLockKey, int64(60), func(ctx context.Context) error { if operationType == "add" { if stockMode == stock.StockModeBatch { if !g.IsEmpty(stockId) { batch := stockDto.UpdateBatchReq{ Id: stockId, BatchQty: stockCount, AvailableQty: stockCount, } if err := dao.StockBatch.Update(ctx, &batch); err != nil { return err } } else { batch := stockDto.CreateBatchReq{ AssetId: assetId, AssetSkuId: assetSkuId, Status: stock.BatchStatusActive, Metadata: metadata, BatchNo: batchNo, BatchQty: stockCount, AvailableQty: stockCount, ProductionDate: productionDate, ExpiryDate: expiryDate, ExpiryWarningDate: expiryWarningDate, } if _, err := dao.StockBatch.Insert(ctx, &batch); err != nil { return err } } } if stockMode == stock.StockModeDetail { // 创建指定数量的库存 var stockInterfaces []interface{} for i := 0; i < stockCount; i++ { stockInterfaces = append(stockInterfaces, entity.StockDetails{ AssetId: assetId, AssetSkuId: assetSkuId, Status: stock.StockStatusAvailable, Metadata: metadata, }) } // 批量插入数据库 if _, err := dao.StockDetails.BatchInsert(ctx, stockInterfaces); err != nil { return err } } } if operationType == "del" { if stockMode == stock.StockModeBatch { stockCount = 0 - stockCount // 更新批次 batch := stockDto.UpdateBatchReq{ Id: stockId, BatchQty: stockCount, AvailableQty: stockCount, } if err := dao.StockBatch.Update(ctx, &batch); err != nil { return err } } if stockMode == stock.StockModeDetail { // 分页查询所有库存明细,收集所有ID var allStockIds []*bson.ObjectID pageSize := int64(50) for pageNum := int64(1); ; pageNum++ { details, total, err := dao.StockDetails.List(ctx, &stockDto.ListStockDetailsReq{ AssetSkuId: assetSkuId, Status: stock.StockStatusAvailable, Page: &beans.Page{PageNum: pageNum, PageSize: pageSize}, }) if err != nil { return err } if pageNum == 1 && int(total) < stockCount { return gerror.New("可操作库存数量不足") } // 收集当前页的ID for _, detail := range details { if detail.Id != nil && !detail.Id.IsZero() { allStockIds = append(allStockIds, detail.Id) if len(allStockIds) >= stockCount { break } } } if len(allStockIds) >= stockCount { break } } // 根据ID批量删除库存 delCount, err := dao.StockDetails.DeleteManyByIds(ctx, allStockIds) if err != nil { return err } if delCount != int64(stockCount) { return gerror.New("删除库存数量不匹配") } stockCount = 0 - stockCount } } _, err := assetDao.AssetSku.Update(ctx, &assetDto.UpdateAssetSkuReq{Id: assetSkuId, Stock: stockCount}) return err }) if err != nil { return err } if !success { return fmt.Errorf("获取库存操作锁失败: %v", err) } return nil }