293 lines
8.3 KiB
Go
293 lines
8.3 KiB
Go
package service
|
||
|
||
import (
|
||
consts "assets/consts/public"
|
||
dao "assets/dao/sync"
|
||
dto "assets/model/dto/sync"
|
||
entity "assets/model/entity/sync"
|
||
"context"
|
||
|
||
"gitea.com/red-future/common/utils"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/grpool"
|
||
"go.mongodb.org/mongo-driver/v2/bson"
|
||
)
|
||
|
||
type syncService struct{}
|
||
|
||
// Sync 同步服务
|
||
var Sync = new(syncService)
|
||
|
||
// PlatformFactory 平台服务工厂实例
|
||
var platformFactory = NewPlatformServiceFactory()
|
||
|
||
// SyncPool 同步任务协程池,限制并发数避免goroutine爆炸
|
||
var SyncPool = grpool.New(20)
|
||
|
||
// CreateSyncTask 创建同步任务
|
||
func (s *syncService) CreateSyncTask(ctx context.Context, req *dto.CreateSyncTaskReq) (*bson.ObjectID, error) {
|
||
task := &entity.SyncTask{
|
||
Platform: req.Platform,
|
||
SyncType: req.SyncType,
|
||
Status: consts.SyncStatusPending,
|
||
AssetID: req.AssetID,
|
||
AssetSKUID: req.AssetSKUID,
|
||
StockID: req.StockID,
|
||
ErrorMessage: "",
|
||
ErrorCount: 0,
|
||
}
|
||
|
||
err := dao.SyncTask.Insert(ctx, task)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return task.Id, nil
|
||
}
|
||
|
||
// GetSyncTask 获取同步任务详情
|
||
func (s *syncService) GetSyncTask(ctx context.Context, id *bson.ObjectID) (res *dto.GetSyncTaskRes, err error) {
|
||
task, err := dao.SyncTask.GetOne(ctx, id)
|
||
if err != nil {
|
||
return
|
||
}
|
||
res = &dto.GetSyncTaskRes{}
|
||
err = utils.Struct(task, &res.SyncTaskItem)
|
||
return
|
||
}
|
||
|
||
// ListSyncTasks 获取同步任务列表
|
||
func (s *syncService) ListSyncTasks(ctx context.Context, req *dto.ListSyncTaskReq) (list []*dto.SyncTaskItem, total int64, err error) {
|
||
tasks, total, err := dao.SyncTask.List(ctx, req)
|
||
if err != nil {
|
||
return
|
||
}
|
||
err = utils.Struct(tasks, &list)
|
||
return
|
||
}
|
||
|
||
// UpdateSyncTaskStatus 更新同步任务状态
|
||
func (s *syncService) UpdateSyncTaskStatus(ctx context.Context, req *dto.UpdateSyncTaskStatusReq) error {
|
||
return dao.SyncTask.UpdateStatus(ctx, req.ID, req.Status, req.ErrorMessage)
|
||
}
|
||
|
||
// SyncAsset 同步资产
|
||
func (s *syncService) SyncAsset(ctx context.Context, req *dto.SyncAssetReq) (*bson.ObjectID, error) {
|
||
// 创建同步任务
|
||
taskReq := &dto.CreateSyncTaskReq{
|
||
Platform: req.Platform,
|
||
SyncType: consts.SyncTypeIncremental,
|
||
AssetID: req.AssetID,
|
||
}
|
||
|
||
taskID, err := s.CreateSyncTask(ctx, taskReq)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 异步执行同步任务(使用协程池限制并发)
|
||
asyncCtx := context.WithoutCancel(ctx)
|
||
SyncPool.Add(asyncCtx, func(ctx context.Context) {
|
||
s.executeAssetSync(ctx, taskID, req.AssetID, req.Platform)
|
||
})
|
||
|
||
return taskID, nil
|
||
}
|
||
|
||
// SyncAssetSku 同步资产SKU
|
||
func (s *syncService) SyncAssetSku(ctx context.Context, req *dto.SyncAssetSkuReq) (*bson.ObjectID, error) {
|
||
taskReq := &dto.CreateSyncTaskReq{
|
||
Platform: req.Platform,
|
||
SyncType: consts.SyncTypeIncremental,
|
||
AssetSKUID: req.AssetSKUID,
|
||
}
|
||
|
||
taskID, err := s.CreateSyncTask(ctx, taskReq)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 异步执行同步任务(使用协程池限制并发)
|
||
asyncCtx := context.WithoutCancel(ctx)
|
||
SyncPool.Add(asyncCtx, func(ctx context.Context) {
|
||
s.executeAssetSkuSync(ctx, taskID, req.AssetSKUID, req.Platform)
|
||
})
|
||
|
||
return taskID, nil
|
||
}
|
||
|
||
// SyncStock 同步库存
|
||
func (s *syncService) SyncStock(ctx context.Context, req *dto.SyncStockReq) (*bson.ObjectID, error) {
|
||
taskReq := &dto.CreateSyncTaskReq{
|
||
Platform: req.Platform,
|
||
SyncType: consts.SyncTypeIncremental,
|
||
StockID: req.StockID,
|
||
}
|
||
|
||
taskID, err := s.CreateSyncTask(ctx, taskReq)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 异步执行同步任务(使用协程池限制并发)
|
||
asyncCtx := context.WithoutCancel(ctx)
|
||
SyncPool.Add(asyncCtx, func(ctx context.Context) {
|
||
s.executeStockSync(ctx, taskID, req.StockID, req.Platform)
|
||
})
|
||
|
||
return taskID, nil
|
||
}
|
||
|
||
// BatchSyncAssets 批量同步资产
|
||
func (s *syncService) BatchSyncAssets(ctx context.Context, req *dto.BatchSyncAssetsReq) ([]*bson.ObjectID, error) {
|
||
var taskIDs []*bson.ObjectID
|
||
for _, assetID := range req.AssetIDs {
|
||
taskReq := &dto.CreateSyncTaskReq{
|
||
Platform: req.Platform,
|
||
SyncType: consts.SyncTypeIncremental,
|
||
AssetID: assetID,
|
||
}
|
||
|
||
taskID, err := s.CreateSyncTask(ctx, taskReq)
|
||
if err != nil {
|
||
return taskIDs, err
|
||
}
|
||
taskIDs = append(taskIDs, taskID)
|
||
|
||
// 异步执行同步任务(使用协程池限制并发)
|
||
asyncCtx := context.WithoutCancel(ctx)
|
||
currentAssetID := assetID
|
||
SyncPool.Add(asyncCtx, func(ctx context.Context) {
|
||
s.executeAssetSync(ctx, taskID, currentAssetID, req.Platform)
|
||
})
|
||
}
|
||
|
||
return taskIDs, nil
|
||
}
|
||
|
||
// GetPlatformSyncStatus 获取平台同步状态
|
||
func (s *syncService) GetPlatformSyncStatus(ctx context.Context, req *dto.GetPlatformSyncStatusReq) (*dto.GetPlatformSyncStatusRes, error) {
|
||
// 统计各状态任务数量
|
||
totalReq := &dto.ListSyncTaskReq{Platform: req.Platform}
|
||
totalReq.PageNum = 1
|
||
totalReq.PageSize = 1
|
||
_, total, err := dao.SyncTask.List(ctx, totalReq)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
successReq := &dto.ListSyncTaskReq{Platform: req.Platform, Status: consts.SyncStatusSuccess}
|
||
successReq.PageNum = 1
|
||
successReq.PageSize = 1
|
||
_, successCount, err := dao.SyncTask.List(ctx, successReq)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
failedReq := &dto.ListSyncTaskReq{Platform: req.Platform, Status: consts.SyncStatusFailed}
|
||
failedReq.PageNum = 1
|
||
failedReq.PageSize = 1
|
||
_, failedCount, err := dao.SyncTask.List(ctx, failedReq)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return &dto.GetPlatformSyncStatusRes{
|
||
Platform: req.Platform,
|
||
IsEnabled: true,
|
||
SyncCount: total,
|
||
SuccessCount: successCount,
|
||
FailedCount: failedCount,
|
||
}, nil
|
||
}
|
||
|
||
// executeAssetSync 执行资产同步
|
||
func (s *syncService) executeAssetSync(ctx context.Context, taskID, assetID *bson.ObjectID, platform consts.SyncPlatform) {
|
||
// 更新任务状态为同步中
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, "")
|
||
|
||
// 获取平台服务
|
||
assetService := platformFactory.CreateAssetService(platform)
|
||
|
||
// 执行同步
|
||
err := assetService.SyncAsset(ctx, assetID)
|
||
if err != nil {
|
||
// 同步失败
|
||
g.Log().Error(ctx, "资产同步失败", g.Map{
|
||
"task_id": taskID,
|
||
"asset_id": assetID,
|
||
"platform": string(platform),
|
||
"error": err.Error(),
|
||
})
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error())
|
||
} else {
|
||
// 同步成功
|
||
g.Log().Info(ctx, "资产同步成功", g.Map{
|
||
"task_id": taskID,
|
||
"asset_id": assetID,
|
||
"platform": string(platform),
|
||
})
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, "")
|
||
}
|
||
}
|
||
|
||
// executeAssetSkuSync 执行资产SKU同步
|
||
func (s *syncService) executeAssetSkuSync(ctx context.Context, taskID, assetSkuID *bson.ObjectID, platform consts.SyncPlatform) {
|
||
// 更新任务状态为同步中
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, "")
|
||
|
||
// 获取平台服务
|
||
assetSkuService := platformFactory.CreateAssetSkuService(platform)
|
||
|
||
// 执行同步
|
||
err := assetSkuService.SyncAssetSku(ctx, assetSkuID)
|
||
if err != nil {
|
||
// 同步失败
|
||
g.Log().Error(ctx, "资产SKU同步失败", g.Map{
|
||
"task_id": taskID,
|
||
"asset_sku_id": assetSkuID,
|
||
"platform": string(platform),
|
||
"error": err.Error(),
|
||
})
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error())
|
||
} else {
|
||
// 同步成功
|
||
g.Log().Info(ctx, "资产SKU同步成功", g.Map{
|
||
"task_id": taskID,
|
||
"asset_sku_id": assetSkuID,
|
||
"platform": string(platform),
|
||
})
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, "")
|
||
}
|
||
}
|
||
|
||
// executeStockSync 执行库存同步
|
||
func (s *syncService) executeStockSync(ctx context.Context, taskID, stockID *bson.ObjectID, platform consts.SyncPlatform) {
|
||
// 更新任务状态为同步中
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, "")
|
||
|
||
// 获取平台服务
|
||
stockService := platformFactory.CreateStockService(platform)
|
||
|
||
// 执行同步
|
||
err := stockService.SyncStock(ctx, stockID)
|
||
if err != nil {
|
||
// 同步失败
|
||
g.Log().Error(ctx, "库存同步失败", g.Map{
|
||
"task_id": taskID,
|
||
"stock_id": stockID,
|
||
"platform": string(platform),
|
||
"error": err.Error(),
|
||
})
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error())
|
||
} else {
|
||
// 同步成功
|
||
g.Log().Info(ctx, "库存同步成功", g.Map{
|
||
"task_id": taskID,
|
||
"stock_id": stockID,
|
||
"platform": string(platform),
|
||
})
|
||
dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, "")
|
||
}
|
||
}
|