Files
data-engine/service/sync/dynamic_sync.go

1507 lines
45 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"
consts "dataengine/consts/public"
dao "dataengine/dao/copydata"
taskDto "dataengine/model/dto/copydata"
entity "dataengine/model/entity/dict"
"gitea.redpowerfuture.com/red-future/common/db/gfdb"
"github.com/sirupsen/logrus"
)
// syncRunningMap is an in-process guard that keeps the same interface from
// being synced concurrently. SyncByConfig stores the key
// "platformCode/interfaceCode" with LoadOrStore before it starts working and
// deletes the key when it returns.
var syncRunningMap sync.Map
// SyncResult summarizes the outcome of one sync run.
type SyncResult struct {
// TableName is the target table name, copied from the table definition.
TableName string
// TotalPages is incremented by accumPage for every non-empty page it stores.
TotalPages int
// TotalRows is the number of rows received from the remote API.
TotalRows int
// InsertedRows is the sum of the row counts reported by savePage.
InsertedRows int
// Duration is the elapsed wall-clock time formatted as "%.1fs".
Duration string
}
// PrefetchConfig describes the "prefetch" section of an interface's request
// config: an entity list that is fetched first and then fanned out into one
// data request per entity.
type PrefetchConfig struct {
// URL is the endpoint of the entity list; parsePrefetchConfig requires it to be non-empty.
URL string `json:"url"`
// Method is the HTTP method of the prefetch call; parsePrefetchConfig defaults it to "GET".
Method string `json:"method"`
// ResponsePath is read from "response_path"; it is not consulted by the sync code visible in this file section.
ResponsePath string `json:"response_path"`
// TargetParam is the request parameter (and row field) that receives each entity value.
TargetParam string `json:"target_param"`
// ValueField names the field of a prefetched row used as the entity value; when empty the whole row is used.
ValueField string `json:"value_field"`
}
// RecursiveConfig describes a recursive (tree) traversal, e.g. a DingTalk
// department tree. Both fields are required by parseRecursiveConfig.
type RecursiveConfig struct {
// KeyField is the row field whose value is queued to request the next level.
KeyField string `json:"key_field"`
// TargetParam is the request parameter that receives the queued key value.
TargetParam string `json:"target_param"`
}
// SyncByConfig runs one synchronization for the interface identified by
// platformCode/interfaceCode.
//
// The run is bounded by a timeout of GetSyncTimeout(ctx) minutes and guarded
// by an in-memory lock, so an overlapping request for the same interface is
// rejected with an error. The target table is created if needed, the sync is
// marked as running, and the work is then dispatched to the prefetch,
// recursive or single-API strategy depending on the interface's request
// config. When isFullSync is false the last recorded sync time is used as the
// incremental starting point; if the previous run is still flagged "running"
// it is treated as interrupted and the starting point is reset to 0.
func SyncByConfig(ctx context.Context, platformCode, interfaceCode string, isFullSync bool) (*SyncResult, error) {
	// Bound the whole run so a single sync cannot hang forever.
	ctx, cancel := context.WithTimeout(ctx, time.Duration(GetSyncTimeout(ctx))*time.Minute)
	defer cancel()

	// In-memory lock: skip the request when two scheduling cycles overlap.
	lockKey := platformCode + "/" + interfaceCode
	_, alreadyRunning := syncRunningMap.LoadOrStore(lockKey, true)
	if alreadyRunning {
		logrus.Warnf("接口 [%s] 正在同步中,跳过重复请求", lockKey)
		return nil, fmt.Errorf("接口 [%s] 正在同步中,跳过", lockKey)
	}
	defer syncRunningMap.Delete(lockKey)

	start := time.Now()

	manager := &PlatformManager{}
	platform, ifaces, err := manager.GetPlatformWithInterfaces(ctx, platformCode)
	if err != nil {
		return nil, fmt.Errorf("读取平台配置失败: %w", err)
	}

	// Locate the requested interface; point into the slice instead of copying.
	var iface *entity.ApiInterface
	for idx := range ifaces {
		if ifaces[idx].Code != interfaceCode {
			continue
		}
		iface = &ifaces[idx]
		break
	}
	if iface == nil {
		return nil, fmt.Errorf("未找到接口 [%s]", interfaceCode)
	}
	// len of a nil value is 0, so this also covers a missing definition.
	if len(iface.TableDefinition) == 0 {
		return nil, fmt.Errorf("接口 [%s] 未配置 table_definition", interfaceCode)
	}

	td, err := ParseTableDefinition(iface.TableDefinition)
	if err != nil {
		return nil, fmt.Errorf("解析表结构失败: %w", err)
	}
	if err = EnsureTable(ctx, td); err != nil {
		return nil, fmt.Errorf("建表失败: %w", err)
	}

	// Read the previous status before this run marks itself as running.
	previous := getSyncStatus(ctx, platformCode, interfaceCode)
	var lastSyncTime int64
	if !isFullSync {
		lastSyncTime = getLastSyncTime(ctx, platformCode, interfaceCode)
	}
	if previous == "running" {
		logrus.Warnf("检测到上次同步异常中断 [%s/%s],将重新全量同步", platformCode, interfaceCode)
		lastSyncTime = 0
	}

	// Flag the sync as started; lastSyncTime is passed through unchanged.
	markSyncRunning(ctx, platformCode, interfaceCode, lastSyncTime)

	api := NewApiClient(platform)
	defer api.Close()

	if prefetch := parsePrefetchConfig(iface.RequestConfig); prefetch != nil {
		return syncWithPrefetch(ctx, api, platform, iface, ifaces, td, prefetch, isFullSync, lastSyncTime, start)
	}
	if recursive := parseRecursiveConfig(iface.RequestConfig); recursive != nil {
		return syncRecursive(ctx, api, platform, iface, td, recursive, start)
	}
	return syncSingleAPI(ctx, api, platform, iface, td, isFullSync, lastSyncTime, start)
}
// paramsInQuery reports whether request parameters belong in the URL query
// string: always for GET interfaces, otherwise only when the request config
// sets "parameters_location" to "query".
func paramsInQuery(iface *entity.ApiInterface) bool {
	if iface.Method == "GET" {
		return true
	}
	// Looking up a key in a nil map yields nil, so a missing config simply
	// fails the assertion and leaves location empty.
	location, _ := iface.RequestConfig["parameters_location"].(string)
	return location == "query"
}
// pageResult is the outcome of fetching and parsing a single page.
type pageResult struct {
raw []byte // raw response body (has-more pagination inspects it to decide whether another page exists)
// rows are the parsed records of the page.
rows []map[string]interface{}
// maxTime is the time value reported by parseRespExt for this page.
maxTime int64
// nextCursor is the cursor for the following page as reported by parseRespExt.
nextCursor string
// inserted is not populated by processPage in the code visible here.
inserted int
// err is set when the request or the parsing failed; the other fields are then zero.
err error
}
// processPage handles one page end to end: it builds the request body, sends
// the request, parses the response and injects the configured fields into
// every parsed row. Failures are reported through pageResult.err; the total
// page count returned by parseRespExt is not carried over.
func processPage(ctx context.Context, api *ApiClient, iface *entity.ApiInterface,
	method string, inQuery bool, page, pageSize int, lastSyncTime int64,
	extraParams map[string]interface{}) pageResult {
	var out pageResult

	reqBody := buildReqBody(ctx, iface, page, pageSize, lastSyncTime, extraParams)
	resp, reqErr := api.Request(ctx, method, iface.Url, reqBody, inQuery)
	if reqErr != nil {
		out.err = fmt.Errorf("请求失败: %w", reqErr)
		return out
	}

	parsed, _, newest, cursor, parseErr := parseRespExt(resp.Body, iface.ResponseConfig)
	if parseErr != nil {
		out.err = fmt.Errorf("解析失败: %w", parseErr)
		return out
	}
	injectRowFields(parsed, reqBody, iface.RequestConfig)

	out.raw = resp.Body
	out.rows = parsed
	out.maxTime = newest
	out.nextCursor = cursor
	return out
}
// accumPage persists one page and folds its counters into result.
//
// Empty pages are ignored. A savePage failure is logged and the run continues
// (best effort), so one bad page does not abort the whole sync. *maxTime is
// raised to the page's maxTime when that is larger. After storing the page the
// function pauses for 100ms to throttle requests; the pause ends early when
// ctx is cancelled.
func accumPage(result *SyncResult, maxTime *int64, td *TableDefinition,
	ctx context.Context, pr pageResult) {
	if len(pr.rows) == 0 {
		return
	}

	inserted, err := savePage(ctx, td, pr.rows)
	if err != nil {
		// Previously this error was dropped silently, hiding write failures.
		logrus.Errorf("保存数据失败 - 表:%s, %d条: %v", td.TableName, len(pr.rows), err)
	}
	result.InsertedRows += inserted
	result.TotalRows += len(pr.rows)
	if pr.maxTime > *maxTime {
		*maxTime = pr.maxTime
	}
	result.TotalPages++

	// Throttle, but do not keep sleeping once the sync has been cancelled.
	select {
	case <-ctx.Done():
	case <-time.After(100 * time.Millisecond):
	}
}
// syncSingleAPI synchronizes a single paginated interface.
//
// Pagination is driven by one of three modes, all sharing processPage and
// accumPage:
//   - cursor mode ("cursor_pagination": true): follows nextCursor until it is
//     empty or "nomore";
//   - has-more mode (response config "has_more_field"): keeps requesting pages
//     while the previous raw body reports more data;
//   - page-count mode (default): requests pages 2..totalPages.
//
// When "time_field_mode" is "range" and no lastSyncTime is known, the look-back
// window is split into fixed-size time chunks that are synced one after the
// other. Chunks whose first page is empty are skipped. If no page contained
// any rows, an empty result is returned and the sync time is left untouched.
// Otherwise the sync time is advanced to the largest maxTime seen, or to the
// current Unix time when the data carried no time value.
//
// A failure on the first page of a chunk aborts the sync with an error; later
// page failures are logged and the remaining work is handled best effort.
func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) {
	pageSize := GetSyncPageSize(ctx)
	if ps, ok := toInt(iface.RequestConfig["page_size"]); ok {
		pageSize = ps
	} else if ps, ok := toInt(iface.RequestConfig["pageSize"]); ok {
		pageSize = ps
	}

	taskType := "incremental"
	if isFullSync || lastSyncTime <= 0 {
		taskType = "full"
	}

	inQuery := paramsInQuery(iface)
	method := string(iface.Method)

	// Name of the request parameter that carries the cursor.
	cursorParam := "cursor"
	if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
		cursorParam = p
	}
	cursorMode := isCursorPagination(iface)

	// Range mode: the API takes a begin/end window, so a full sync is split
	// into consecutive time chunks.
	timeMode, _ := iface.RequestConfig["time_field_mode"].(string)
	isRangeMode := timeMode == "range"

	var timeChunks [][2]int64
	if isRangeMode && lastSyncTime <= 0 {
		// The chunk length matches the 3-day cap that buildReqBody applies to
		// endTime in range mode, so consecutive chunks leave no gap.
		const chunkDays = 3
		chunkMs := int64(chunkDays * 24 * 3600 * 1000)
		startMs := time.Now().Add(-time.Duration(GetDefaultLookbackDays(ctx)) * 24 * time.Hour).UnixMilli()
		if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 {
			startMs = int64(fst)
		}
		endMs := time.Now().UnixMilli()
		for t := startMs; t < endMs; t += chunkMs {
			chunkEnd := t + chunkMs
			if chunkEnd > endMs {
				chunkEnd = endMs
			}
			timeChunks = append(timeChunks, [2]int64{t, chunkEnd})
		}
		// The message used to claim 7-day chunks although the code uses 3.
		logrus.Infof("时间分片模式: %d 个分片(每片%d天)", len(timeChunks), chunkDays)
	} else {
		// Single pass; the time window is decided by buildReqBody.
		timeChunks = [][2]int64{{0, 0}}
	}

	result := &SyncResult{TableName: td.TableName}
	globalMaxTime := int64(0)

	for _, chunk := range timeChunks {
		if isRangeMode && chunk[0] > 0 {
			lastSyncTime = chunk[0]
		}

		// Extra parameters for the first page.
		firstExtra := map[string]interface{}{}
		if cursorMode {
			if icv, ok := iface.RequestConfig["initial_cursor"]; ok {
				firstExtra[cursorParam] = icv
			} else {
				firstExtra[cursorParam] = ""
			}
		}

		first := processPage(ctx, api, iface, method, inQuery, 1, pageSize, lastSyncTime, firstExtra)
		if first.err != nil {
			recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, first.err.Error())
			return nil, fmt.Errorf("获取第一页失败: %w", first.err)
		}
		if len(first.rows) == 0 {
			// An empty chunk must not discard what earlier chunks already
			// synced, nor prevent later chunks from running.
			continue
		}

		maxTime := first.maxTime
		accumPage(result, &maxTime, td, ctx, first)

		switch {
		case cursorMode:
			nextCursor := first.nextCursor
			for nextCursor != "" && nextCursor != "nomore" {
				next := processPage(ctx, api, iface, method, inQuery, 1, pageSize, lastSyncTime, map[string]interface{}{
					cursorParam: nextCursor,
				})
				if next.err != nil {
					logrus.Errorf("游标 %s 处理失败: %v", nextCursor, next.err)
					recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("游标 %s 失败: %v", nextCursor, next.err))
					break
				}
				if len(next.rows) == 0 {
					break
				}
				nextCursor = next.nextCursor
				accumPage(result, &maxTime, td, ctx, next)
			}

		case iface.ResponseConfig != nil && iface.ResponseConfig["has_more_field"] != nil:
			// Has-more pagination (e.g. offset/size + hasMore): the raw body
			// of the previous page decides whether another page exists.
			hf, _ := iface.ResponseConfig["has_more_field"].(string)
			lastRaw := first.raw
			for page := 2; hasMoreCheck(lastRaw, hf); page++ {
				next := processPage(ctx, api, iface, method, inQuery, page, pageSize, lastSyncTime, nil)
				if next.err != nil {
					logrus.Errorf("第 %d 页处理失败: %v", page, next.err)
					break
				}
				accumPage(result, &maxTime, td, ctx, next)
				lastRaw = next.raw
			}

		default:
			// Page-count pagination. processPage drops the total page count,
			// so re-parse the first page with the interface's own response
			// config; parsing with a nil config would apply the default
			// success/list paths and report 0 pages for custom layouts.
			_, totalPages, _, _, tpErr := parseRespExt(first.raw, iface.ResponseConfig)
			if tpErr != nil {
				totalPages = 0
			}
			for page := 2; page <= totalPages; page++ {
				next := processPage(ctx, api, iface, method, inQuery, page, pageSize, lastSyncTime, nil)
				if next.err != nil {
					logrus.Errorf("第 %d 页请求失败: %v", page, next.err)
					recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("第 %d 页失败: %v", page, next.err))
					continue
				}
				accumPage(result, &maxTime, td, ctx, next)
			}
		}

		// Carry the newest time of this chunk into the overall watermark;
		// it used to be computed and then thrown away.
		if maxTime > globalMaxTime {
			globalMaxTime = maxTime
		}
	}

	if result.TotalPages == 0 {
		// Nothing was returned at all: report an empty result and leave the
		// recorded sync time as it is.
		return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
	}

	if globalMaxTime <= 0 {
		globalMaxTime = time.Now().Unix()
	}
	updateSyncTime(ctx, platform.PlatformCode, iface.Code, globalMaxTime)
	result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds())
	logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
	return result, nil
}
// isCursorPagination reports whether the interface's request config enables
// cursor pagination, i.e. "cursor_pagination" is present and is the bool true.
func isCursorPagination(iface *entity.ApiInterface) bool {
	// A nil config yields a nil lookup result, which fails the assertion.
	enabled, isBool := iface.RequestConfig["cursor_pagination"].(bool)
	return isBool && enabled
}
// hasMoreCheck reads the has-more flag from a JSON response body.
//
// hasMorePath is a dot-separated path; every segment except the last must
// resolve to a JSON object. The final value counts as true when it is the
// boolean true or the string "true". Invalid JSON, a broken path or any other
// value type yields false.
func hasMoreCheck(raw []byte, hasMorePath string) bool {
	var decoded map[string]interface{}
	if json.Unmarshal(raw, &decoded) != nil {
		return false
	}

	segments := strings.Split(hasMorePath, ".")
	leafIdx := len(segments) - 1

	// Walk down through the intermediate objects.
	node := decoded
	for _, seg := range segments[:leafIdx] {
		child, isObject := node[seg].(map[string]interface{})
		if !isObject {
			return false
		}
		node = child
	}

	switch flag := node[segments[leafIdx]].(type) {
	case bool:
		return flag
	case string:
		return flag == "true"
	default:
		return false
	}
}
// collectPrefetchEntities appends every row to *allRows and the matching
// entity value to *allEntities.
//
// With an empty prefetch.ValueField the whole row is the entity. Otherwise the
// named field is used when the row has it; values that toFloat64 accepts are
// stored as int64, everything else is stored unchanged. Rows lacking the field
// contribute no entity.
func collectPrefetchEntities(rows []map[string]interface{}, prefetch *PrefetchConfig, allEntities *[]interface{}, allRows *[]map[string]interface{}) {
	for _, row := range rows {
		*allRows = append(*allRows, row)

		field := prefetch.ValueField
		if field == "" {
			*allEntities = append(*allEntities, row)
			continue
		}

		value, present := row[field]
		if !present {
			continue
		}
		var entityVal interface{} = value
		if num, numeric := toFloat64(value); numeric {
			entityVal = int64(num)
		}
		*allEntities = append(*allEntities, entityVal)
	}
}
// syncWithPrefetch synchronizes an interface that depends on a prefetched
// entity list (for example: fetch all accounts, then fetch data per account).
//
// Phase 1 collects the entities from prefetch.URL. If an interface with that
// URL is configured, its request/response config drives the prefetch
// (pagination style, parameter location, recursive traversal); otherwise the
// data interface's config is used as a fallback. The prefetched rows are also
// stored in the prefetch interface's own table when it defines one (best
// effort, failures are only logged).
//
// Phase 2 requests the data interface once per entity, passing the entity as
// prefetch.TargetParam, with at most GetSyncConcurrency(ctx) entities in
// flight (never fewer than one). Per-entity failures are logged and skipped.
// Finally the sync time is advanced to the largest time value seen (starting
// from lastSyncTime), or to the current Unix time when none is available.
func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, allIfaces []entity.ApiInterface, td *TableDefinition, prefetch *PrefetchConfig, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) {
	logrus.Infof("预取模式: %s -> %s", prefetch.URL, iface.Url)
	taskType := "incremental"
	if isFullSync || lastSyncTime <= 0 {
		taskType = "full"
	}

	// ====== 1. Prefetch phase: page through the complete entity list ======
	prefetchIface := findInterfaceByURL(allIfaces, prefetch.URL)

	// Does the prefetch source use recursive traversal (e.g. a department tree)?
	var prefetchRecursiveCfg *RecursiveConfig
	if prefetchIface != nil {
		prefetchRecursiveCfg = parseRecursiveConfig(prefetchIface.RequestConfig)
	}

	// Does the prefetch source use cursor pagination, and under which parameter?
	prefetchIsCursor := false
	prefetchPageParam := "page"
	if prefetchIface != nil && prefetchIface.RequestConfig != nil {
		if cp, ok := prefetchIface.RequestConfig["cursor_pagination"].(bool); ok {
			prefetchIsCursor = cp
		}
		if p, ok := prefetchIface.RequestConfig["page_param"].(string); ok && p != "" {
			prefetchPageParam = p
		}
	}
	prefetchMethod := strings.ToUpper(prefetch.Method)
	prefetchPageSize := 100
	if prefetchIface != nil && prefetchIface.RequestConfig != nil {
		if ps, ok := toInt(prefetchIface.RequestConfig["pageSize"]); ok {
			prefetchPageSize = ps
		}
	}

	// Parameter location follows the prefetch source's own config when known.
	var prefetchInQuery bool
	if prefetchIface != nil {
		prefetchInQuery = paramsInQuery(prefetchIface)
	} else {
		prefetchInQuery = paramsInQuery(iface)
	}

	// The prefetch source's response_config is needed to locate the list.
	var prefetchRespCfg map[string]interface{}
	if prefetchIface != nil {
		prefetchRespCfg = prefetchIface.ResponseConfig
		logrus.Debugf("预取接口配置: code=%s, success_field=%v, success_value=%v", prefetchIface.Code, prefetchRespCfg["success_field"], prefetchRespCfg["success_value"])
	} else {
		logrus.Warnf("未找到预取接口配置: URL=%s将使用默认配置", prefetch.URL)
	}

	allEntities := make([]interface{}, 0)
	allRows := make([]map[string]interface{}, 0)
	prefetchReqIface := prefetchIface
	if prefetchReqIface == nil {
		prefetchReqIface = iface
	}

	if prefetchIface != nil && prefetchRecursiveCfg != nil {
		// ----- Recursive prefetch (breadth-first over the tree) -----
		maxDepth := 20
		if md, ok := toInt(prefetchIface.RequestConfig["max_recursive_depth"]); ok {
			maxDepth = md
		}
		processedKeys := make(map[string]bool)
		type rItem struct {
			depth  int
			keyVal interface{} // nil marks the root request
		}
		queue := []rItem{{depth: 0, keyVal: nil}}
		for len(queue) > 0 {
			item := queue[0]
			queue = queue[1:]
			if item.depth > maxDepth {
				continue
			}
			if item.keyVal != nil {
				// Each key is requested only once.
				keyStr := fmt.Sprintf("%v", item.keyVal)
				if processedKeys[keyStr] {
					continue
				}
				processedKeys[keyStr] = true
			}
			extra := make(map[string]interface{})
			if item.keyVal != nil {
				extra[prefetchRecursiveCfg.TargetParam] = item.keyVal
			}
			body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, 0, extra)
			r2, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
			if err != nil {
				logrus.Errorf("预取递归 [depth=%d] 请求失败: %v", item.depth, err)
				continue
			}
			itemRows, _, _, _, pe := parseRespExt(r2.Body, prefetchRespCfg)
			if pe != nil {
				logrus.Errorf("预取递归 [depth=%d] 解析失败: %v", item.depth, pe)
				continue
			}
			// Same collection rules as the paginated prefetch below.
			collectPrefetchEntities(itemRows, prefetch, &allEntities, &allRows)
			for _, row := range itemRows {
				if v, ok := row[prefetchRecursiveCfg.KeyField]; ok {
					queue = append(queue, rItem{depth: item.depth + 1, keyVal: v})
				}
			}
			time.Sleep(100 * time.Millisecond)
		}
	} else {
		// ----- Regular paginated prefetch -----
		firstExtra := make(map[string]interface{})
		if prefetchIsCursor {
			// Honour a configured initial_cursor, default to an empty string.
			if icv, ok := prefetchReqIface.RequestConfig["initial_cursor"]; ok {
				firstExtra[prefetchPageParam] = icv
			} else {
				firstExtra[prefetchPageParam] = ""
			}
		}
		body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, lastSyncTime, firstExtra)
		logrus.Debugf("预取请求 URL: %s, Method: %s, Body: %+v", prefetch.URL, prefetchMethod, body)
		resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
		if err != nil {
			recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("预取第一页请求失败: %v", err))
			return nil, fmt.Errorf("预取第一页失败: %w", err)
		}
		rows, prefetchTotalPages, _, nextCursor, err := parseRespExt(resp.Body, prefetchRespCfg)
		if err != nil {
			recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("解析预取响应失败: %v", err))
			return nil, fmt.Errorf("解析预取响应失败: %w", err)
		}
		collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)

		if prefetchIsCursor {
			for nextCursor != "" && nextCursor != "nomore" {
				body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, lastSyncTime, map[string]interface{}{
					prefetchPageParam: nextCursor,
				})
				resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
				if err != nil {
					logrus.Errorf("预取游标 %s 请求失败: %v", nextCursor, err)
					break
				}
				rows, _, _, nc, pe := parseRespExt(resp.Body, prefetchRespCfg)
				if pe != nil {
					logrus.Errorf("预取游标 %s 解析失败: %v", nextCursor, pe)
					break
				}
				if len(rows) == 0 {
					break
				}
				nextCursor = nc
				collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)
				time.Sleep(100 * time.Millisecond)
			}
		} else {
			for page := 2; page <= prefetchTotalPages; page++ {
				body := buildReqBody(ctx, prefetchReqIface, page, prefetchPageSize, lastSyncTime, nil)
				resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
				if err != nil {
					logrus.Errorf("预取第 %d 页请求失败: %v", page, err)
					continue
				}
				rows, _, _, _, pe := parseRespExt(resp.Body, prefetchRespCfg)
				if pe != nil {
					logrus.Errorf("预取第 %d 页解析失败: %v", page, pe)
					continue
				}
				collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)
				time.Sleep(100 * time.Millisecond)
			}
		}
	}

	if len(allEntities) == 0 {
		logrus.Warn("预取结果为空列表,跳过同步")
		return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
	}
	logrus.Infof("预取到 %d 个实体", len(allEntities))

	// Also persist the prefetched rows themselves (e.g. the account list).
	// This is best effort: a failure here must not stop the data sync, but it
	// is logged instead of being dropped silently.
	if prefetchIface != nil && prefetchIface.TableDefinition != nil {
		prefetchTd, err := ParseTableDefinition(prefetchIface.TableDefinition)
		if err != nil {
			logrus.Warnf("预取数据存库跳过,解析表结构失败: %v", err)
		} else if ensureErr := EnsureTable(ctx, prefetchTd); ensureErr != nil {
			logrus.Warnf("预取数据存库跳过,建表失败: %v", ensureErr)
		} else {
			saved, saveErr := savePage(ctx, prefetchTd, allRows)
			if saveErr != nil {
				logrus.Errorf("预取数据存库失败: %s, %d 条: %v", prefetchTd.TableName, saved, saveErr)
			} else {
				logrus.Infof("预取数据已存库: %s, %d 条", prefetchTd.TableName, saved)
			}
		}
	}

	// ====== 2. Data phase: process the entities concurrently ======
	result := &SyncResult{TableName: td.TableName}
	pageSize := GetSyncPageSize(ctx)
	if ps, ok := toInt(iface.RequestConfig["page_size"]); ok {
		pageSize = ps
	} else if ps, ok := toInt(iface.RequestConfig["pageSize"]); ok {
		pageSize = ps
	}
	dataMethod := string(iface.Method)
	inQuery := paramsInQuery(iface)

	concurrency := GetSyncConcurrency(ctx)
	if concurrency < 1 {
		// A zero-capacity semaphore would block the dispatcher forever and a
		// negative capacity makes make() panic; fall back to serial work.
		concurrency = 1
	}

	var mu sync.Mutex
	var wg sync.WaitGroup
	sem := make(chan struct{}, concurrency)
	globalMaxTime := lastSyncTime

	// storeRows tags the rows with the entity value, injects the configured
	// fields, saves the page and adds the counters to result under the mutex.
	storeRows := func(val interface{}, rows []map[string]interface{}, body map[string]interface{}) {
		for i := range rows {
			rows[i][prefetch.TargetParam] = val
		}
		injectRowFields(rows, body, iface.RequestConfig)
		inserted, saveErr := savePage(ctx, td, rows)
		if saveErr != nil {
			logrus.Errorf(" 实体 %v 保存数据失败: %v", val, saveErr)
		}
		mu.Lock()
		result.InsertedRows += inserted
		result.TotalRows += len(rows)
		mu.Unlock()
	}

	for idx, entityVal := range allEntities {
		wg.Add(1)
		sem <- struct{}{} // blocks while `concurrency` entities are in flight
		go func(idx int, val interface{}) {
			defer wg.Done()
			defer func() { <-sem }()
			logrus.Infof(" 处理实体 [%d/%d]: %v", idx+1, len(allEntities), val)
			entityMaxTime := int64(0)

			if isCursorPagination(iface) {
				// ----- Cursor pagination -----
				cp := "cursor"
				if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
					cp = p
				}
				firstExtra := map[string]interface{}{
					prefetch.TargetParam: val,
				}
				if icv, ok := iface.RequestConfig["initial_cursor"]; ok {
					firstExtra[cp] = icv
				} else {
					firstExtra[cp] = ""
				}
				body := buildReqBody(ctx, iface, 1, pageSize, lastSyncTime, firstExtra)
				resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
				if err != nil {
					logrus.Errorf(" 实体 %v 首次请求失败: %v", val, err)
					return
				}
				rows, _, mt, nc, pe := parseRespExt(resp.Body, iface.ResponseConfig)
				if pe != nil {
					logrus.Errorf(" 实体 %v 解析首页失败: %v", val, pe)
					return
				}
				storeRows(val, rows, body)
				if mt > entityMaxTime {
					entityMaxTime = mt
				}

				nextCursor := nc
				for nextCursor != "" && nextCursor != "nomore" {
					body := buildReqBody(ctx, iface, 1, pageSize, lastSyncTime, map[string]interface{}{
						cp:                   nextCursor,
						prefetch.TargetParam: val,
					})
					resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
					if err != nil {
						logrus.Errorf(" 实体 %v 游标 %s 失败: %v", val, nextCursor, err)
						break
					}
					rows, _, mt, nc, pe := parseRespExt(resp.Body, iface.ResponseConfig)
					if pe != nil {
						logrus.Errorf(" 实体 %v 游标 %s 解析失败: %v", val, nextCursor, pe)
						break
					}
					if len(rows) == 0 {
						break
					}
					nextCursor = nc
					storeRows(val, rows, body)
					if mt > entityMaxTime {
						entityMaxTime = mt
					}
					time.Sleep(100 * time.Millisecond)
				}
			} else {
				// ----- Page-count pagination -----
				page := 1
				totalPages := 1
				for page <= totalPages {
					body := buildReqBody(ctx, iface, page, pageSize, lastSyncTime, map[string]interface{}{
						prefetch.TargetParam: val,
					})
					resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
					if err != nil {
						logrus.Errorf(" 实体 %v 第 %d 页失败: %v", val, page, err)
						page++
						time.Sleep(200 * time.Millisecond)
						continue
					}
					rows, tp, mt, parseErr := parseResp(resp.Body, iface.ResponseConfig)
					if parseErr != nil {
						logrus.Errorf(" 解析响应失败: %v", parseErr)
						page++
						continue
					}
					if page == 1 {
						// Only the first page defines how many pages exist.
						totalPages = tp
					}
					storeRows(val, rows, body)
					if mt > entityMaxTime {
						entityMaxTime = mt
					}
					page++
					time.Sleep(100 * time.Millisecond)
				}
			}

			if entityMaxTime > 0 {
				mu.Lock()
				if entityMaxTime > globalMaxTime {
					globalMaxTime = entityMaxTime
				}
				mu.Unlock()
			}
		}(idx, entityVal)
	}
	wg.Wait()

	if globalMaxTime <= 0 {
		globalMaxTime = time.Now().Unix()
	}
	updateSyncTime(ctx, platform.PlatformCode, iface.Code, globalMaxTime)
	result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds())
	logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
	return result, nil
}
// syncRecursive synchronizes a tree-shaped interface (e.g. a department tree)
// breadth-first: the root level is requested without a key, then every row's
// recursive.KeyField value is queued and requested as recursive.TargetParam.
//
// Traversal stops descending below "max_recursive_depth" (default 20) and
// every key is requested only once. Each level is requested as page 1 with a
// page size of 100. Request and parse failures are logged, recorded via
// recordFailure and skipped. All collected rows are saved in one savePage
// call; a save error is logged. Unless no rows were collected, the sync time
// is then set to the current Unix time.
func syncRecursive(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, recursive *RecursiveConfig, start time.Time) (*SyncResult, error) {
	maxDepth := 20
	if md, ok := toInt(iface.RequestConfig["max_recursive_depth"]); ok {
		maxDepth = md
	}
	inQuery := paramsInQuery(iface)
	method := string(iface.Method)

	allRows := make([]map[string]interface{}, 0)
	processedKeys := make(map[string]bool)

	type queueItem struct {
		depth  int
		keyVal interface{} // nil marks the root level
	}
	queue := []queueItem{{depth: 0, keyVal: nil}}

	for len(queue) > 0 {
		item := queue[0]
		queue = queue[1:]

		if item.depth > maxDepth {
			logrus.Warnf("递归已达最大深度 %d终止该分支", maxDepth)
			continue
		}

		extraParams := make(map[string]interface{})
		if item.keyVal != nil {
			// Skip keys that were already requested to avoid cycles/duplicates.
			keyStr := fmt.Sprintf("%v", item.keyVal)
			if processedKeys[keyStr] {
				continue
			}
			processedKeys[keyStr] = true
			extraParams[recursive.TargetParam] = item.keyVal
		}

		body := buildReqBody(ctx, iface, 1, 100, 0, extraParams)
		resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
		if err != nil {
			logrus.Errorf("递归 [depth=%d] 请求失败: %v", item.depth, err)
			recordFailure(ctx, platform.PlatformCode, iface.Code, "full", fmt.Sprintf("递归深度 %d 请求失败: %v", item.depth, err))
			continue
		}
		rows, _, _, _, err := parseRespExt(resp.Body, iface.ResponseConfig)
		if err != nil {
			logrus.Errorf("递归 [depth=%d] 解析失败: %v", item.depth, err)
			// Record parse failures the same way as request failures so a
			// silently missing subtree can be traced afterwards.
			recordFailure(ctx, platform.PlatformCode, iface.Code, "full", fmt.Sprintf("递归深度 %d 解析失败: %v", item.depth, err))
			continue
		}

		for _, row := range rows {
			allRows = append(allRows, row)
			if v, ok := row[recursive.KeyField]; ok {
				queue = append(queue, queueItem{depth: item.depth + 1, keyVal: v})
			}
		}
		time.Sleep(100 * time.Millisecond)
	}

	if len(allRows) == 0 {
		logrus.Warn("递归结果为空,跳过入库")
		return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
	}

	inserted, saveErr := savePage(ctx, td, allRows)
	if saveErr != nil {
		// The error used to be discarded; surface it while keeping the
		// best-effort behaviour of the original flow.
		logrus.Errorf("递归同步保存数据失败 - 表:%s, %d条: %v", td.TableName, len(allRows), saveErr)
	}
	updateSyncTime(ctx, platform.PlatformCode, iface.Code, time.Now().Unix())

	result := &SyncResult{
		TableName:    td.TableName,
		TotalRows:    len(allRows),
		InsertedRows: inserted,
		Duration:     fmt.Sprintf("%.1fs", time.Since(start).Seconds()),
	}
	logrus.Infof("递归同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
	return result, nil
}
// getTotalPages extracts the total page count from a raw response body.
// The body is parsed with a nil response config, i.e. with parseRespExt's
// default success field and list path. It returns 0 when parsing fails or the
// page has no rows.
func getTotalPages(raw []byte) int {
	rows, totalPages, _, _, err := parseRespExt(raw, nil)
	if err == nil && len(rows) > 0 {
		return totalPages
	}
	return 0
}
// toFloat64 converts a loosely typed value to float64.
// Supported inputs are float64, int, int64, json.Number and numeric strings
// (some APIs return codes such as "200" as strings). The second result is
// false for any other type or for text that does not parse as a float.
func toFloat64(v interface{}) (float64, bool) {
	var text string
	switch n := v.(type) {
	case float64:
		return n, true
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	case json.Number:
		// json.Number is decimal text; it is parsed like a plain string.
		text = n.String()
	case string:
		text = n
	default:
		return 0, false
	}

	parsed, err := strconv.ParseFloat(text, 64)
	if err != nil {
		return 0, false
	}
	return parsed, true
}
// toInt converts a loosely typed value to int.
// Supported inputs are float64 (truncated), int, int64, json.Number holding
// an integer, and base-10 integer strings. The second result is false for any
// other type or for text that does not parse as an integer.
// The original author notes that JSONB fields scanned through the pgx driver
// may arrive as json.Number rather than float64.
func toInt(v interface{}) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case int64:
		return int(n), true
	case float64:
		return int(n), true
	case json.Number:
		if wide, err := n.Int64(); err == nil {
			return int(wide), true
		}
	case string:
		if parsed, err := strconv.Atoi(n); err == nil {
			return parsed, true
		}
	}
	return 0, false
}
// buildPrefetchParams builds the request parameters for a prefetch call from
// the interface's request config: page 1 with a page size of 100 (under the
// configured parameter names, default "page"/"page_size") plus every config
// entry that is not a control key. A nil config yields an empty map.
func buildPrefetchParams(iface *entity.ApiInterface) map[string]interface{} {
	params := make(map[string]interface{})
	cfg := iface.RequestConfig
	if cfg == nil {
		return params
	}

	pageParam, psParam := "page", "page_size"
	if name, ok := cfg["page_param"].(string); ok {
		pageParam = name
	}
	if name, ok := cfg["page_size_param"].(string); ok {
		psParam = name
	}
	params[pageParam] = 1
	params[psParam] = 100

	for key, value := range cfg {
		switch key {
		case "headers", "prefetch", "page_param",
			"page_size_param", "time_field", "parameters_location",
			"filtering", "group_by", "date_range",
			"body_wrapper_field", "exclude_from_wrapper",
			"cursor_pagination", "time_field_mode",
			"recursive", "max_recursive_depth",
			"initial_cursor", "pagination_mode",
			"full_sync_start_time", "row_inject":
			// Control keys steer the sync itself and are never sent.
			continue
		case pageParam, psParam:
			// Keep the paging values set above.
			continue
		}
		params[key] = value
	}
	return params
}
// parsePrefetchConfig reads the "prefetch" section of a request config.
// It returns nil when the section is missing, is not a JSON object, or has no
// non-empty string "url". The method defaults to "GET"; the remaining fields
// are copied when they are strings and left empty otherwise.
func parsePrefetchConfig(requestConfig map[string]interface{}) *PrefetchConfig {
	// A nil config, a missing key and a nil value all fail this assertion.
	section, isObject := requestConfig["prefetch"].(map[string]interface{})
	if !isObject {
		return nil
	}

	url, _ := section["url"].(string)
	if url == "" {
		return nil
	}
	method, _ := section["method"].(string)
	if method == "" {
		method = "GET"
	}
	responsePath, _ := section["response_path"].(string)
	targetParam, _ := section["target_param"].(string)
	valueField, _ := section["value_field"].(string)

	return &PrefetchConfig{
		URL:          url,
		Method:       method,
		ResponsePath: responsePath,
		TargetParam:  targetParam,
		ValueField:   valueField,
	}
}
// parseRecursiveConfig reads the "recursive" section of a request config.
// It returns nil unless the section is a JSON object that carries non-empty
// string values for both "key_field" and "target_param".
func parseRecursiveConfig(requestConfig map[string]interface{}) *RecursiveConfig {
	// A nil config, a missing key and a nil value all fail this assertion.
	section, isObject := requestConfig["recursive"].(map[string]interface{})
	if !isObject {
		return nil
	}

	keyField, _ := section["key_field"].(string)
	targetParam, _ := section["target_param"].(string)
	if keyField == "" || targetParam == "" {
		return nil
	}
	return &RecursiveConfig{KeyField: keyField, TargetParam: targetParam}
}
// extractValues pulls a list of values out of a JSON response.
//
// path is a dot-separated path whose last segment must resolve to a JSON
// array and whose earlier segments must resolve to JSON objects. With an
// empty valueField the array elements are returned as they are; otherwise
// only elements that are objects containing valueField contribute that
// field's value. An error is returned for invalid JSON or a path that does
// not lead to an array.
func extractValues(raw []byte, path, valueField string) ([]interface{}, error) {
	var root map[string]interface{}
	if err := json.Unmarshal(raw, &root); err != nil {
		return nil, fmt.Errorf("JSON解析失败: %w", err)
	}

	segments := strings.Split(path, ".")
	if len(segments) == 0 {
		// Defensive only: strings.Split never returns an empty slice for a
		// non-empty separator.
		return nil, fmt.Errorf("路径 %s 不完整", path)
	}
	leafIdx := len(segments) - 1

	// Descend through the intermediate objects.
	node := root
	for _, seg := range segments[:leafIdx] {
		child, isObject := node[seg].(map[string]interface{})
		if !isObject {
			return nil, fmt.Errorf("路径 %s 在 %s 处中断", path, seg)
		}
		node = child
	}

	items, isList := node[segments[leafIdx]].([]interface{})
	if !isList {
		return nil, fmt.Errorf("路径 %s 不是数组", path)
	}

	var values []interface{}
	for _, item := range items {
		if valueField == "" {
			values = append(values, item)
			continue
		}
		obj, isObject := item.(map[string]interface{})
		if !isObject {
			continue
		}
		if v, exists := obj[valueField]; exists {
			values = append(values, v)
		}
	}
	return values, nil
}
// normalizeJSONNumbers returns a copy of v in which every json.Number, at any
// nesting depth of maps and slices, is replaced by a native Go value: int64
// when the number parses as an integer, otherwise float64, otherwise its
// string form. Maps and slices are rebuilt; all other values are returned
// unchanged.
//
// Per the original author's note, the pgx driver may hand back json.Number
// for numeric JSONB fields, and sending those on as-is broke the signature
// check of the wrapped "param" JSON on some platforms (e.g. Kuaishou).
func normalizeJSONNumbers(v interface{}) interface{} {
	if num, isNumber := v.(json.Number); isNumber {
		if asInt, err := num.Int64(); err == nil {
			return asInt
		}
		if asFloat, err := num.Float64(); err == nil {
			return asFloat
		}
		return num.String()
	}

	if obj, isObject := v.(map[string]interface{}); isObject {
		converted := make(map[string]interface{}, len(obj))
		for key, inner := range obj {
			converted[key] = normalizeJSONNumbers(inner)
		}
		return converted
	}

	if list, isList := v.([]interface{}); isList {
		converted := make([]interface{}, len(list))
		for idx := range list {
			converted[idx] = normalizeJSONNumbers(list[idx])
		}
		return converted
	}

	return v
}
// buildReqBody assembles the request parameters for one page.
//
// Steps, in order:
//  1. copy every request-config entry that is not a control key;
//  2. set the page parameter (page number, or (page-1)*pageSize when
//     "pagination_mode" is "offset") and the page-size parameter, under the
//     configured names (default "page"/"page_size");
//  3. apply the time filter when "time_field" is configured:
//     - "range" mode sets beginTime/endTime in milliseconds (window capped at
//     3 days) and a default queryType of 2; the start is lastSyncTime, else
//     "full_sync_start_time", else now minus GetDefaultLookbackDays(ctx);
//     - otherwise ("filtering" mode) a GREATER_EQUALS filter on the time
//     field is appended to "filtering", using lastSyncTime when positive,
//     else "full_sync_start_time" when configured;
//  4. overlay extraParams;
//  5. when "body_wrapper_field" is set, move all parameters except the
//     excluded ones into a JSON string stored under that field.
//
// The returned map is freshly allocated; the interface's request config is
// never modified.
func buildReqBody(ctx context.Context, iface *entity.ApiInterface, page, pageSize int, lastSyncTime int64, extraParams map[string]interface{}) map[string]interface{} {
	// Ranging over or indexing a nil map is safe, so no nil checks are needed.
	cfg := iface.RequestConfig

	body := make(map[string]interface{})
	for k, v := range cfg {
		switch k {
		case "time_field", "headers", "prefetch",
			"page_param", "page_size_param", "parameters_location",
			"cursor_pagination", "time_field_mode",
			"body_wrapper_field", "exclude_from_wrapper",
			"top_level_params", "recursive",
			"max_recursive_depth", "initial_cursor",
			"pagination_mode", "full_sync_start_time",
			"row_inject":
			// Control keys steer the sync itself and are never sent.
			continue
		}
		body[k] = v
	}

	pageParam, psParam := "page", "page_size"
	if p, ok := cfg["page_param"].(string); ok {
		pageParam = p
	}
	if p, ok := cfg["page_size_param"].(string); ok {
		psParam = p
	}

	// Offset pagination sends the row offset instead of the page number.
	if mode, _ := cfg["pagination_mode"].(string); mode == "offset" {
		body[pageParam] = (page - 1) * pageSize
	} else {
		body[pageParam] = page
	}
	body[psParam] = pageSize

	// appendTimeFilter adds a GREATER_EQUALS filter to body["filtering"].
	// body["filtering"] may alias the slice stored in the shared request
	// config, and appending in place could write into that slice's backing
	// array (also from concurrent callers), so the filters are copied first.
	appendTimeFilter := func(field string, since int64) {
		existing, _ := body["filtering"].([]interface{})
		filters := make([]interface{}, 0, len(existing)+1)
		filters = append(filters, existing...)
		body["filtering"] = append(filters, map[string]interface{}{
			"field":    field,
			"operator": "GREATER_EQUALS",
			"values":   []interface{}{fmt.Sprintf("%d", since)},
		})
	}

	if tf, ok := cfg["time_field"].(string); ok && tf != "" {
		timeMode := "filtering"
		if tm, ok := cfg["time_field_mode"].(string); ok && tm != "" {
			timeMode = tm
		}
		fullStart, hasFullStart := toFloat64(cfg["full_sync_start_time"])
		hasFullStart = hasFullStart && fullStart > 0

		switch {
		case timeMode == "range":
			// Range mode: beginTime/endTime as millisecond timestamps.
			timeMs := lastSyncTime
			if timeMs <= 0 {
				// Full sync: prefer the configured start, else the default
				// look-back window.
				if hasFullStart {
					timeMs = int64(fullStart)
				} else {
					timeMs = time.Now().Add(-time.Duration(GetDefaultLookbackDays(ctx)) * 24 * time.Hour).UnixMilli()
				}
			}
			// Respect a queryType supplied by the config.
			if _, exists := body["queryType"]; !exists {
				body["queryType"] = 2
			}
			body["beginTime"] = timeMs
			endTime := time.Now().UnixMilli()
			const maxRangeMs = int64(3 * 24 * 3600 * 1000)
			if endTime-timeMs > maxRangeMs {
				endTime = timeMs + maxRangeMs
			}
			body["endTime"] = endTime
		case lastSyncTime > 0:
			// Filtering mode, incremental sync.
			appendTimeFilter(tf, lastSyncTime)
		case hasFullStart:
			// Filtering mode, full sync with a configured start timestamp.
			appendTimeFilter(tf, int64(fullStart))
		}
	}

	for k, v := range extraParams {
		body[k] = v
	}

	// body_wrapper_field: wrap the business parameters into one JSON string.
	if wf, ok := cfg["body_wrapper_field"].(string); ok && wf != "" {
		excludeSet := map[string]bool{"method": true, "version": true, "signMethod": true}
		if excl, ok := cfg["exclude_from_wrapper"].([]interface{}); ok {
			for _, v := range excl {
				if s, ok := v.(string); ok {
					excludeSet[s] = true
				}
			}
		}
		wrapperObj := make(map[string]interface{})
		for k, v := range body {
			if !excludeSet[k] && k != wf {
				wrapperObj[k] = v
				delete(body, k) // deleting during range is permitted in Go
			}
		}
		// Convert json.Number values to native numbers before marshalling
		// (see normalizeJSONNumbers).
		b, err := json.Marshal(normalizeJSONNumbers(wrapperObj))
		if err != nil {
			logrus.Errorf("JSON序列化 wrapper 失败: %v", err)
		} else {
			// Log only a successfully marshalled payload.
			logrus.Infof("body_wrapper param JSON (normalized): %s", string(b))
			body[wf] = string(b)
		}
	}
	return body
}
// parseRespExt parses a raw JSON response body into flattened rows, honoring
// the optional response config rc for the success check and the data paths.
//
// Recognized rc keys (all optional; rc may be nil):
//   - success_field  (string): root field holding the status, default "code"
//   - success_value: expected status, converted via toFloat64, default 0
//   - message_field  (string): root field holding the error text, default "message"
//   - list_path      (string): dot-separated path to the record list, default "data"
//   - cursor_field   (string): dot-separated path to the next-page cursor
//   - single_record  (bool): list_path points at one object instead of a list
//   - has_more_field (string): dot-separated path to a boolean "has more" flag
//
// Returns, in order: the flattened rows (each carrying an added "raw_data" JSON
// string of the original item), the total page count (1 unless
// page_info.total_page is found), the largest value seen across the known time
// fields, the next cursor ("" when none was found, "nomore" when the has-more
// flag is false), and an error when JSON decoding fails, the success check
// fails, or an intermediate list_path segment is not an object.
func parseRespExt(raw []byte, rc map[string]interface{}) ([]map[string]interface{}, int, int64, string, error) {
var respMap map[string]interface{}
if err := json.Unmarshal(raw, &respMap); err != nil {
return nil, 0, 0, "", fmt.Errorf("JSON解析失败: %w", err)
}
// Defaults used when rc is nil or leaves a key unset.
successField, successVal := "code", float64(0)
msgField, listPath, cursorPath := "message", "data", ""
hasMorePath := ""
singleRecord := false
if rc != nil {
if sf, _ := rc["success_field"].(string); sf != "" {
successField = sf
}
// success_value only overrides the default when toFloat64 accepts it.
if sv, ok := rc["success_value"]; ok {
if f, ok := toFloat64(sv); ok {
successVal = f
}
}
if mf, _ := rc["message_field"].(string); mf != "" {
msgField = mf
}
if lp, _ := rc["list_path"].(string); lp != "" {
listPath = lp
}
if cf, _ := rc["cursor_field"].(string); cf != "" {
cursorPath = cf
}
if sr, _ := rc["single_record"].(bool); sr {
singleRecord = true
}
if hm, _ := rc["has_more_field"].(string); hm != "" {
hasMorePath = hm
}
}
// Success check: only performed when the success field exists at the root of
// the response; a response without that field is treated as successful.
if v, ok := respMap[successField]; ok {
// The conversion result flag is ignored, so a value toFloat64 rejects is
// compared using whatever number toFloat64 returned alongside it.
actual, _ := toFloat64(v)
if actual != successVal {
msg, _ := respMap[msgField].(string)
// If the configured message field is empty, try common error-message fields.
if msg == "" {
for _, altField := range []string{"error_msg", "sub_msg", "msg"} {
if altMsg, _ := respMap[altField].(string); altMsg != "" {
msg = altMsg
break
}
}
}
// Log the complete response to help diagnose the rejected call.
respJSON, _ := json.Marshal(respMap)
logrus.Errorf("API响应校验失败: success_field=%s, expected=%v, actual=%v, message=%s, 完整响应: %s", successField, successVal, actual, msg, string(respJSON))
return nil, 0, 0, "", fmt.Errorf("API错误: %s=%v, %s=%s", successField, v, msgField, msg)
}
}
// Resolve list_path; the last segment may itself be the array (e.g. data.orderList).
// dataContainer tracks the object that is later searched for page_info.
var listData []interface{}
var dataContainer map[string]interface{}
if listPath != "" {
parts := strings.Split(listPath, ".")
cur := respMap
for i, p := range parts {
if i == len(parts)-1 {
// Last segment: either the array itself, or a map that holds it under
// "list" or "orderList".
listData, _ = cur[p].([]interface{})
if listData == nil {
if m, ok := cur[p].(map[string]interface{}); ok {
dataContainer = m
if l, ok := m["list"]; ok {
listData, _ = l.([]interface{})
}
if listData == nil {
if ol, ok := m["orderList"]; ok {
listData, _ = ol.([]interface{})
}
}
}
} else {
// The array sits directly in cur, so cur is the container.
dataContainer = cur
}
} else {
// Intermediate segments must be objects; anything else aborts parsing.
next, ok := cur[p].(map[string]interface{})
if !ok {
return nil, 0, 0, "", fmt.Errorf("路径 %s 在 %s 处中断", listPath, p)
}
cur = next
}
}
}
if listData == nil {
if singleRecord && listPath != "" {
// Detail endpoint: list_path points at a single object, which is wrapped
// into a one-element list. Every segment must resolve to an object.
parts := strings.Split(listPath, ".")
cur := respMap
ok := true
for _, p := range parts {
if m, exists := cur[p].(map[string]interface{}); exists {
cur = m
} else {
ok = false
break
}
}
if ok {
listData = []interface{}{cur}
dataContainer = cur
}
}
}
if listData == nil {
// Fall back to root-level fields; the root also becomes the container.
listData, _ = respMap["list"].([]interface{})
if listData == nil {
listData, _ = respMap["orderList"].([]interface{})
}
dataContainer = respMap
}
var rows []map[string]interface{}
totalPages, maxTime := 1, int64(0)
// List items that are not JSON objects are skipped.
for _, item := range listData {
if m, ok := item.(map[string]interface{}); ok {
// Flatten nested maps: merge sub-map fields into the top level
// (e.g. orderBaseInfo.oid -> oid).
flat := flattenRow(m)
// raw_data keeps the original, unflattened item as a JSON string.
j, _ := json.Marshal(m)
flat["raw_data"] = string(j)
// Track the largest value among the known time fields of the flattened row.
for _, tf := range []string{"last_modified_time", "created_time", "update_time", "createTime", "updateTime", "lastModifiedTime"} {
if t, ok := toFloat64(flat[tf]); ok && int64(t) > maxTime {
maxTime = int64(t)
}
}
rows = append(rows, flat)
}
}
nextCursor := ""
if cursorPath != "" {
cp := strings.Split(cursorPath, ".")
cc := respMap
// NOTE(review): an intermediate segment that is not an object does not abort
// the walk; the lookup simply continues on the current map.
for i, p := range cp {
if i == len(cp)-1 {
if s, ok := cc[p].(string); ok {
nextCursor = s
} else if f, ok := toFloat64(cc[p]); ok {
// Numeric cursor (e.g. DingTalk next_cursor=10), rendered without decimals.
nextCursor = fmt.Sprintf("%.0f", f)
}
} else if m, ok := cc[p].(map[string]interface{}); ok {
cc = m
}
}
}
// has_more support: a boolean false marks the end of the cursor by replacing
// the cursor with the sentinel "nomore". Non-boolean values are ignored.
if hasMorePath != "" {
parts := strings.Split(hasMorePath, ".")
cc := respMap
for i, p := range parts {
if i == len(parts)-1 {
if b, ok := cc[p].(bool); ok && !b {
nextCursor = "nomore"
}
} else if m, ok := cc[p].(map[string]interface{}); ok {
cc = m
}
}
}
// The total page count is read from the container's page_info.total_page via
// toInt; it stays 1 when that field is absent or not convertible.
if pi, ok := dataContainer["page_info"].(map[string]interface{}); ok {
if tp, ok := toInt(pi["total_page"]); ok {
totalPages = tp
}
}
return rows, totalPages, maxTime, nextCursor, nil
}
// flattenRow flattens a nested map by recursively merging the fields of every
// sub-map into the top level (e.g. orderBaseInfo.oid -> oid). Values that are
// not maps, including arrays, are copied unchanged. The input is not modified.
//
// Key collisions are resolved deterministically instead of depending on Go's
// randomized map iteration order:
//   - a field at a shallower level always wins over a nested field with the
//     same name;
//   - when two sibling sub-maps provide the same field, the one whose parent
//     key sorts first wins.
func flattenRow(m map[string]interface{}) map[string]interface{} {
	result := make(map[string]interface{}, len(m))

	// owner records which parent key supplied each flattened field so sibling
	// collisions can be settled by key order rather than iteration order.
	owner := make(map[string]string)
	for k, v := range m {
		sub, ok := v.(map[string]interface{})
		if !ok {
			continue
		}
		for sk, sv := range flattenRow(sub) {
			if prev, seen := owner[sk]; seen && prev < k {
				continue
			}
			owner[sk] = k
			result[sk] = sv
		}
	}

	// Fields of this level are written last so they override nested ones.
	for k, v := range m {
		if _, nested := v.(map[string]interface{}); !nested {
			result[k] = v
		}
	}
	return result
}
// parseResp is the legacy four-value variant of parseRespExt: it delegates to
// parseRespExt and drops the next-cursor result.
func parseResp(raw []byte, responseConfig map[string]interface{}) ([]map[string]interface{}, int, int64, error) {
	records, pageCount, latest, _, parseErr := parseRespExt(raw, responseConfig)
	return records, pageCount, latest, parseErr
}
// savePage writes one page of parsed rows into the table described by td.
// For each row only the keys that match a column name in td.Columns are kept;
// a "raw_data" entry is carried over whenever the row has one, whether or not
// it is a declared column. An empty page returns (0, nil) without touching the
// database; otherwise the result of InsertRows is returned as-is.
func savePage(ctx context.Context, td *TableDefinition, rows []map[string]interface{}) (int, error) {
	if len(rows) == 0 {
		return 0, nil
	}

	known := make(map[string]bool)
	for _, col := range td.Columns {
		known[col.Name] = true
	}

	filtered := make([]map[string]interface{}, 0, len(rows))
	for _, src := range rows {
		dst := make(map[string]interface{})
		for name, value := range src {
			if !known[name] {
				continue
			}
			dst[name] = value
		}
		if raw, found := src["raw_data"]; found {
			dst["raw_data"] = raw
		}
		filtered = append(filtered, dst)
	}

	return InsertRows(ctx, td.TableName, td.ConflictKeys, filtered)
}
// getLastSyncTime reads last_sync_time from the sync tracker table for the
// given platform/interface pair. The outcome of Scan is not inspected, so the
// zero value is returned when nothing gets scanned into the variable.
func getLastSyncTime(ctx context.Context, platformCode, interfaceCode string) int64 {
	var lastSync int64

	tracker := gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable)
	query := tracker.Fields("last_sync_time").
		Where("platform_code", platformCode).
		Where("interface_code", interfaceCode)
	query.Scan(&lastSync)

	return lastSync
}
// getSyncStatus reads sync_status from the sync tracker table for the given
// platform/interface pair. The outcome of Scan is not inspected, so an empty
// string is returned when nothing gets scanned into the variable.
func getSyncStatus(ctx context.Context, platformCode, interfaceCode string) string {
	var status string

	tracker := gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable)
	query := tracker.Fields("sync_status").
		Where("platform_code", platformCode).
		Where("interface_code", interfaceCode)
	query.Scan(&status)

	return status
}
// markSyncRunning saves a tracker record for the platform/interface pair with
// sync_status "running" and the supplied last_sync_time, using
// (platform_code, interface_code) as the conflict columns (presumably an
// upsert; confirm against the gfdb wrapper). The result of Save is ignored.
func markSyncRunning(ctx context.Context, platformCode, interfaceCode string, lastSyncTime int64) {
	tracker := gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable)

	record := map[string]interface{}{
		"platform_code":  platformCode,
		"interface_code": interfaceCode,
		"last_sync_time": lastSyncTime,
		"sync_status":    "running",
	}

	tracker.Data(record).OnConflict("platform_code", "interface_code").Save()
}
// updateSyncTime saves a tracker record for the platform/interface pair with
// sync_status "success", last_sync_time set to t and last_sync_at set to the
// current wall-clock time, using (platform_code, interface_code) as the
// conflict columns (presumably an upsert; confirm against the gfdb wrapper).
// The result of Save is ignored.
func updateSyncTime(ctx context.Context, platformCode, interfaceCode string, t int64) {
	tracker := gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable)

	record := map[string]interface{}{
		"platform_code":  platformCode,
		"interface_code": interfaceCode,
		"last_sync_time": t,
		"last_sync_at":   time.Now(),
		"sync_status":    "success",
	}

	tracker.Data(record).OnConflict("platform_code", "interface_code").Save()
}
func recordFailure(ctx context.Context, platformCode, interfaceCode, taskType, errMsg string) {
dao.SyncTaskLog.Create(ctx, &taskDto.CreateSyncTaskLogReq{
TaskID: fmt.Sprintf("%s_%s_%d", platformCode, interfaceCode, time.Now().UnixNano()),
TaskType: taskType,
PlatformCode: platformCode,
InterfaceCode: interfaceCode,
Status: "failed",
MaxRetry: 3,
StartTime: time.Now(),
RequestParams: map[string]interface{}{
"error": errMsg,
},
})
}
// findInterfaceByURL returns a pointer to the first element of ifaces whose
// Url equals url exactly, or nil when no element matches. The pointer refers
// to the element inside the slice, not to a copy.
func findInterfaceByURL(ifaces []entity.ApiInterface, url string) *entity.ApiInterface {
	for idx := 0; idx < len(ifaces); idx++ {
		candidate := &ifaces[idx]
		if candidate.Url != url {
			continue
		}
		return candidate
	}
	return nil
}
// injectRowFields copies request parameters into every response row.
//
// requestConfig["row_inject"] must be a list; each string entry names a key of
// body whose value is written into all rows under the same key. This covers
// request parameters (e.g. statisticsMonth) that must be persisted although
// the response does not echo them. Non-string entries and names missing from
// body are skipped. The call is a no-op when requestConfig or body is nil, or
// when row_inject is absent or not a list. Rows are modified in place.
func injectRowFields(rows []map[string]interface{}, body map[string]interface{}, requestConfig map[string]interface{}) {
	if body == nil || requestConfig == nil {
		return
	}

	names, isList := requestConfig["row_inject"].([]interface{})
	if !isList {
		return
	}

	for _, entry := range names {
		name, isString := entry.(string)
		if !isString {
			continue
		}
		value, present := body[name]
		if !present {
			continue
		}
		for _, row := range rows {
			row[name] = value
		}
	}
}