1507 lines
45 KiB
Go
1507 lines
45 KiB
Go
package sync
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
consts "dataengine/consts/public"
|
||
dao "dataengine/dao/copydata"
|
||
taskDto "dataengine/model/dto/copydata"
|
||
entity "dataengine/model/entity/dict"
|
||
|
||
"gitea.redpowerfuture.com/red-future/common/db/gfdb"
|
||
"github.com/sirupsen/logrus"
|
||
)
|
||
|
||
// syncRunningMap holds one entry per "platformCode/interfaceCode" key while a
// sync of that interface is in flight. SyncByConfig claims the key with
// LoadOrStore, rejects the call when the key is already present, and deletes
// the key when it returns, so the same interface is never synced concurrently
// within this process.
var syncRunningMap sync.Map
|
||
|
||
// Result and configuration types shared by the sync entry points.
type (
	// SyncResult summarises one sync run.
	SyncResult struct {
		// TableName is the name of the table the rows were written to.
		TableName string
		// TotalPages counts the non-empty pages that were accumulated.
		TotalPages int
		// TotalRows counts the rows parsed from upstream responses.
		TotalRows int
		// InsertedRows sums the counts reported by the save step.
		InsertedRows int
		// Duration is the elapsed time formatted with "%.1fs".
		Duration string
	}

	// PrefetchConfig is the "prefetch" section of an interface's request
	// config: an entity list is fetched first, then the data interface is
	// called once per entity.
	PrefetchConfig struct {
		// URL of the interface that returns the entity list.
		URL string `json:"url"`
		// Method is the HTTP method used for the prefetch request.
		Method string `json:"method"`
		// ResponsePath is read from the "response_path" key.
		ResponsePath string `json:"response_path"`
		// TargetParam is the request parameter that receives each entity value.
		TargetParam string `json:"target_param"`
		// ValueField is the row field used as the entity value; when empty the
		// whole row is used.
		ValueField string `json:"value_field"`
	}

	// RecursiveConfig is the "recursive" section of an interface's request
	// config, used to walk a tree breadth-first (for example a department tree).
	RecursiveConfig struct {
		// KeyField is the row field whose value identifies the next node to query.
		KeyField string `json:"key_field"`
		// TargetParam is the request parameter that carries that value.
		TargetParam string `json:"target_param"`
	}
)
|
||
|
||
// SyncByConfig 执行同步
|
||
func SyncByConfig(ctx context.Context, platformCode, interfaceCode string, isFullSync bool) (*SyncResult, error) {
|
||
// 创建超时 context 防止单次同步卡死
|
||
timeoutMin := GetSyncTimeout(ctx)
|
||
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutMin)*time.Minute)
|
||
defer cancel()
|
||
ctx = timeoutCtx
|
||
|
||
// 内存锁:防止同一个接口被并发执行(两个调度周期重叠时跳过)
|
||
lockKey := platformCode + "/" + interfaceCode
|
||
if _, loaded := syncRunningMap.LoadOrStore(lockKey, true); loaded {
|
||
logrus.Warnf("接口 [%s] 正在同步中,跳过重复请求", lockKey)
|
||
return nil, fmt.Errorf("接口 [%s] 正在同步中,跳过", lockKey)
|
||
}
|
||
defer syncRunningMap.Delete(lockKey)
|
||
|
||
start := time.Now()
|
||
pm := &PlatformManager{}
|
||
|
||
platform, ifaces, err := pm.GetPlatformWithInterfaces(ctx, platformCode)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("读取平台配置失败: %w", err)
|
||
}
|
||
|
||
var iface *entity.ApiInterface
|
||
for i := range ifaces {
|
||
if ifaces[i].Code == interfaceCode {
|
||
iface = &ifaces[i]
|
||
break
|
||
}
|
||
}
|
||
if iface == nil {
|
||
return nil, fmt.Errorf("未找到接口 [%s]", interfaceCode)
|
||
}
|
||
if iface.TableDefinition == nil || len(iface.TableDefinition) == 0 {
|
||
return nil, fmt.Errorf("接口 [%s] 未配置 table_definition", interfaceCode)
|
||
}
|
||
|
||
td, err := ParseTableDefinition(iface.TableDefinition)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("解析表结构失败: %w", err)
|
||
}
|
||
if err := EnsureTable(ctx, td); err != nil {
|
||
return nil, fmt.Errorf("建表失败: %w", err)
|
||
}
|
||
|
||
// 检查上次同步状态(在标记 running 之前检查)
|
||
prevStatus := getSyncStatus(ctx, platformCode, interfaceCode)
|
||
lastSyncTime := int64(0)
|
||
if !isFullSync {
|
||
lastSyncTime = getLastSyncTime(ctx, platformCode, interfaceCode)
|
||
}
|
||
if prevStatus == "running" {
|
||
logrus.Warnf("检测到上次同步异常中断 [%s/%s],将重新全量同步", platformCode, interfaceCode)
|
||
lastSyncTime = 0
|
||
}
|
||
|
||
// 标记同步开始(保留 last_sync_time 不变,状态设为 running)
|
||
markSyncRunning(ctx, platformCode, interfaceCode, lastSyncTime)
|
||
|
||
api := NewApiClient(platform)
|
||
defer api.Close()
|
||
|
||
prefetch := parsePrefetchConfig(iface.RequestConfig)
|
||
if prefetch != nil {
|
||
return syncWithPrefetch(ctx, api, platform, iface, ifaces, td, prefetch, isFullSync, lastSyncTime, start)
|
||
}
|
||
recursive := parseRecursiveConfig(iface.RequestConfig)
|
||
if recursive != nil {
|
||
return syncRecursive(ctx, api, platform, iface, td, recursive, start)
|
||
}
|
||
return syncSingleAPI(ctx, api, platform, iface, td, isFullSync, lastSyncTime, start)
|
||
}
|
||
|
||
// paramsInQuery 判断参数是否应放在 URL 查询字符串中
|
||
func paramsInQuery(iface *entity.ApiInterface) bool {
|
||
if iface.Method == "GET" {
|
||
return true
|
||
}
|
||
if iface.RequestConfig != nil {
|
||
if loc, _ := iface.RequestConfig["parameters_location"].(string); loc == "query" {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// pageResult is the outcome of fetching and parsing a single page.
// When err is non-nil processPage leaves every other field at its zero value.
type pageResult struct {
	raw        []byte                   // raw response body; hasMore pagination inspects it to decide whether another page exists
	rows       []map[string]interface{} // rows parsed from the response
	maxTime    int64                    // max-time value returned by parseRespExt for this page
	nextCursor string                   // cursor for the following page, as returned by parseRespExt
	inserted   int                      // NOTE(review): not populated by processPage in the code visible here
	err        error                    // request or parse failure, already wrapped with context
}
|
||
|
||
// processPage 处理单页数据:请求 → 解析 → 注入
|
||
func processPage(ctx context.Context, api *ApiClient, iface *entity.ApiInterface,
|
||
method string, inQuery bool, page, pageSize int, lastSyncTime int64,
|
||
extraParams map[string]interface{}) pageResult {
|
||
|
||
body := buildReqBody(ctx, iface, page, pageSize, lastSyncTime, extraParams)
|
||
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
|
||
if err != nil {
|
||
return pageResult{err: fmt.Errorf("请求失败: %w", err)}
|
||
}
|
||
|
||
rows, _, maxTime, nextCursor, err := parseRespExt(resp.Body, iface.ResponseConfig)
|
||
if err != nil {
|
||
return pageResult{err: fmt.Errorf("解析失败: %w", err)}
|
||
}
|
||
|
||
injectRowFields(rows, body, iface.RequestConfig)
|
||
return pageResult{
|
||
raw: resp.Body,
|
||
rows: rows,
|
||
maxTime: maxTime,
|
||
nextCursor: nextCursor,
|
||
}
|
||
}
|
||
|
||
// accumPage 将单页结果累加到 SyncResult
|
||
func accumPage(result *SyncResult, maxTime *int64, td *TableDefinition,
|
||
ctx context.Context, pr pageResult) {
|
||
|
||
if len(pr.rows) == 0 {
|
||
return
|
||
}
|
||
inserted, _ := savePage(ctx, td, pr.rows)
|
||
result.InsertedRows += inserted
|
||
result.TotalRows += len(pr.rows)
|
||
if pr.maxTime > *maxTime {
|
||
*maxTime = pr.maxTime
|
||
}
|
||
result.TotalPages++
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
// syncSingleAPI 单接口分页同步
|
||
func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) {
|
||
pageSize := GetSyncPageSize(ctx)
|
||
if ps, ok := toInt(iface.RequestConfig["page_size"]); ok {
|
||
pageSize = ps
|
||
} else if ps, ok := toInt(iface.RequestConfig["pageSize"]); ok {
|
||
pageSize = ps
|
||
}
|
||
|
||
taskType := "incremental"
|
||
if isFullSync || lastSyncTime <= 0 {
|
||
taskType = "full"
|
||
}
|
||
|
||
inQuery := paramsInQuery(iface)
|
||
method := string(iface.Method)
|
||
|
||
// 游标参数名
|
||
cursorParam := "cursor"
|
||
if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
|
||
cursorParam = p
|
||
}
|
||
cursorMode := isCursorPagination(iface)
|
||
|
||
// 检测 range 模式(快手时间分片),按7天分片循环拉取
|
||
timeMode, _ := iface.RequestConfig["time_field_mode"].(string)
|
||
isRangeMode := timeMode == "range"
|
||
|
||
// 计算时间分片(range 模式 + 全量同步)
|
||
var timeChunks [][2]int64
|
||
if isRangeMode && lastSyncTime <= 0 {
|
||
chunkMs := int64(3 * 24 * 3600 * 1000) // 3天
|
||
startMs := time.Now().Add(-time.Duration(GetDefaultLookbackDays(ctx)) * 24 * time.Hour).UnixMilli()
|
||
if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 {
|
||
startMs = int64(fst)
|
||
}
|
||
endMs := time.Now().UnixMilli()
|
||
for t := startMs; t < endMs; t += chunkMs {
|
||
chunkEnd := t + chunkMs
|
||
if chunkEnd > endMs {
|
||
chunkEnd = endMs
|
||
}
|
||
timeChunks = append(timeChunks, [2]int64{t, chunkEnd})
|
||
}
|
||
logrus.Infof("时间分片模式: %d 个分片(每片7天)", len(timeChunks))
|
||
} else {
|
||
timeChunks = [][2]int64{{0, 0}} // 单次,时间由 buildReqBody 决定
|
||
}
|
||
|
||
result := &SyncResult{TableName: td.TableName}
|
||
globalMaxTime := int64(0)
|
||
|
||
for _, chunk := range timeChunks {
|
||
if isRangeMode && chunk[0] > 0 {
|
||
lastSyncTime = chunk[0]
|
||
}
|
||
|
||
// 构建第一页的额外参数
|
||
firstExtra := map[string]interface{}{}
|
||
if cursorMode {
|
||
if icv, ok := iface.RequestConfig["initial_cursor"]; ok {
|
||
firstExtra[cursorParam] = icv
|
||
} else {
|
||
firstExtra[cursorParam] = ""
|
||
}
|
||
}
|
||
|
||
// 处理第一页
|
||
pr := processPage(ctx, api, iface, method, inQuery, 1, pageSize, lastSyncTime, firstExtra)
|
||
if pr.err != nil {
|
||
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, pr.err.Error())
|
||
return nil, fmt.Errorf("获取第一页失败: %w", pr.err)
|
||
}
|
||
if len(pr.rows) == 0 {
|
||
return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
|
||
}
|
||
|
||
// result initialized above in time chunk loop
|
||
maxTime := pr.maxTime
|
||
accumPage(result, &maxTime, td, ctx, pr)
|
||
|
||
// 分页循环:三种模式仅循环控制和参数不同,内核复用 processPage + accumPage
|
||
switch {
|
||
case cursorMode:
|
||
nextCursor := pr.nextCursor
|
||
for nextCursor != "" && nextCursor != "nomore" {
|
||
pr := processPage(ctx, api, iface, method, inQuery, 1, pageSize, lastSyncTime, map[string]interface{}{
|
||
cursorParam: nextCursor,
|
||
})
|
||
if pr.err != nil {
|
||
logrus.Errorf("游标 %s 处理失败: %v", nextCursor, pr.err)
|
||
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("游标 %s 失败: %v", nextCursor, pr.err))
|
||
break
|
||
}
|
||
if len(pr.rows) == 0 {
|
||
break
|
||
}
|
||
nextCursor = pr.nextCursor
|
||
accumPage(result, &maxTime, td, ctx, pr)
|
||
}
|
||
|
||
case iface.ResponseConfig != nil && iface.ResponseConfig["has_more_field"] != nil:
|
||
// hasMore 分页(如钉钉 offset/size + hasMore):用上一页的 raw body 判断
|
||
hf, _ := iface.ResponseConfig["has_more_field"].(string)
|
||
lastRaw := pr.raw
|
||
for page := 2; hasMoreCheck(lastRaw, hf); page++ {
|
||
pr2 := processPage(ctx, api, iface, method, inQuery, page, pageSize, lastSyncTime, nil)
|
||
if pr2.err != nil {
|
||
logrus.Errorf("第 %d 页处理失败: %v", page, pr2.err)
|
||
break
|
||
}
|
||
accumPage(result, &maxTime, td, ctx, pr2)
|
||
lastRaw = pr2.raw
|
||
}
|
||
|
||
default:
|
||
// 普通分页:第一页 parseRespExt 返回 totalPages(parseRespExt 第4个返回值被忽略,需要从别处获取)
|
||
totalPages := getTotalPages(pr.raw)
|
||
for page := 2; page <= totalPages; page++ {
|
||
pr := processPage(ctx, api, iface, method, inQuery, page, pageSize, lastSyncTime, nil)
|
||
if pr.err != nil {
|
||
logrus.Errorf("第 %d 页请求失败: %v", page, pr.err)
|
||
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("第 %d 页失败: %v", page, pr.err))
|
||
continue
|
||
}
|
||
accumPage(result, &maxTime, td, ctx, pr)
|
||
}
|
||
}
|
||
|
||
if globalMaxTime <= 0 {
|
||
globalMaxTime = time.Now().Unix()
|
||
}
|
||
}
|
||
updateSyncTime(ctx, platform.PlatformCode, iface.Code, globalMaxTime)
|
||
|
||
result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds())
|
||
logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
|
||
return result, nil
|
||
}
|
||
|
||
func isCursorPagination(iface *entity.ApiInterface) bool {
|
||
if iface.RequestConfig == nil {
|
||
return false
|
||
}
|
||
cp, _ := iface.RequestConfig["cursor_pagination"].(bool)
|
||
return cp
|
||
}
|
||
|
||
// hasMoreCheck reads the "has more" flag from a JSON response body.
//
// hasMorePath is a dot-separated path of object keys. The value at the end of
// the path counts as true when it is the boolean true or the string "true".
// Anything else yields false: invalid JSON, a path that leaves the object
// tree, a missing key, or a value of another type.
func hasMoreCheck(raw []byte, hasMorePath string) bool {
	var doc map[string]interface{}
	if json.Unmarshal(raw, &doc) != nil {
		return false
	}

	// strings.Split never returns an empty slice for a non-empty separator.
	keys := strings.Split(hasMorePath, ".")
	leaf := keys[len(keys)-1]

	// Walk every segment except the last one; each must be a JSON object.
	node := doc
	for _, key := range keys[:len(keys)-1] {
		child, isObject := node[key].(map[string]interface{})
		if !isObject {
			return false
		}
		node = child
	}

	switch flag := node[leaf].(type) {
	case bool:
		return flag
	case string:
		return flag == "true"
	}
	return false
}
|
||
|
||
// collectPrefetchEntities 从 rows 中收集实体和行数据
|
||
func collectPrefetchEntities(rows []map[string]interface{}, prefetch *PrefetchConfig, allEntities *[]interface{}, allRows *[]map[string]interface{}) {
|
||
for _, item := range rows {
|
||
*allRows = append(*allRows, item)
|
||
if prefetch.ValueField == "" {
|
||
*allEntities = append(*allEntities, item)
|
||
} else if v, ok := item[prefetch.ValueField]; ok {
|
||
if f, ok := toFloat64(v); ok {
|
||
*allEntities = append(*allEntities, int64(f))
|
||
} else {
|
||
*allEntities = append(*allEntities, v)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// syncWithPrefetch 预取模式同步(先分页拉取全部实体列表,再并发处理每个实体)
|
||
func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, allIfaces []entity.ApiInterface, td *TableDefinition, prefetch *PrefetchConfig, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) {
|
||
logrus.Infof("预取模式: %s -> %s", prefetch.URL, iface.Url)
|
||
|
||
taskType := "incremental"
|
||
if isFullSync || lastSyncTime <= 0 {
|
||
taskType = "full"
|
||
}
|
||
|
||
// ====== 1. 预取阶段:分页拉取全部实体列表 ======
|
||
prefetchIface := findInterfaceByURL(allIfaces, prefetch.URL)
|
||
|
||
// 判断预取来源是否有递归配置(如钉钉部门树)
|
||
var prefetchRecursiveCfg *RecursiveConfig
|
||
if prefetchIface != nil {
|
||
prefetchRecursiveCfg = parseRecursiveConfig(prefetchIface.RequestConfig)
|
||
}
|
||
|
||
// 判断预取来源是否游标分页,以及分页参数名
|
||
prefetchIsCursor := false
|
||
prefetchPageParam := "page"
|
||
if prefetchIface != nil && prefetchIface.RequestConfig != nil {
|
||
if cp, ok := prefetchIface.RequestConfig["cursor_pagination"].(bool); ok {
|
||
prefetchIsCursor = cp
|
||
}
|
||
if p, ok := prefetchIface.RequestConfig["page_param"].(string); ok && p != "" {
|
||
prefetchPageParam = p
|
||
}
|
||
}
|
||
|
||
prefetchMethod := strings.ToUpper(prefetch.Method)
|
||
prefetchPageSize := 100
|
||
if prefetchIface != nil && prefetchIface.RequestConfig != nil {
|
||
if ps, ok := toInt(prefetchIface.RequestConfig["pageSize"]); ok {
|
||
prefetchPageSize = ps
|
||
}
|
||
}
|
||
|
||
// 使用 prefetch 来源接口自己的配置判断参数位置
|
||
var prefetchInQuery bool
|
||
if prefetchIface != nil {
|
||
prefetchInQuery = paramsInQuery(prefetchIface)
|
||
} else {
|
||
prefetchInQuery = paramsInQuery(iface)
|
||
}
|
||
|
||
// prefetch 来源接口的 response_config(用于正确解析列表路径)
|
||
var prefetchRespCfg map[string]interface{}
|
||
if prefetchIface != nil {
|
||
prefetchRespCfg = prefetchIface.ResponseConfig
|
||
logrus.Debugf("预取接口配置: code=%s, success_field=%v, success_value=%v", prefetchIface.Code, prefetchRespCfg["success_field"], prefetchRespCfg["success_value"])
|
||
} else {
|
||
logrus.Warnf("未找到预取接口配置: URL=%s,将使用默认配置", prefetch.URL)
|
||
}
|
||
|
||
allEntities := make([]interface{}, 0)
|
||
allRows := make([]map[string]interface{}, 0)
|
||
|
||
prefetchReqIface := prefetchIface
|
||
if prefetchReqIface == nil {
|
||
prefetchReqIface = iface
|
||
}
|
||
|
||
if prefetchIface != nil && prefetchRecursiveCfg != nil {
|
||
// ----- 递归遍历预取(如钉钉部门树)-----
|
||
maxDepth := 20
|
||
if md, ok := toInt(prefetchIface.RequestConfig["max_recursive_depth"]); ok {
|
||
maxDepth = md
|
||
}
|
||
processedKeys := make(map[string]bool)
|
||
type rItem struct {
|
||
depth int
|
||
keyVal interface{}
|
||
}
|
||
queue := []rItem{{depth: 0, keyVal: nil}}
|
||
|
||
for len(queue) > 0 {
|
||
item := queue[0]
|
||
queue = queue[1:]
|
||
if item.depth > maxDepth {
|
||
continue
|
||
}
|
||
if item.keyVal != nil {
|
||
keyStr := fmt.Sprintf("%v", item.keyVal)
|
||
if processedKeys[keyStr] {
|
||
continue
|
||
}
|
||
processedKeys[keyStr] = true
|
||
}
|
||
extra := make(map[string]interface{})
|
||
if item.keyVal != nil {
|
||
extra[prefetchRecursiveCfg.TargetParam] = item.keyVal
|
||
}
|
||
body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, 0, extra)
|
||
r2, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
|
||
if err != nil {
|
||
logrus.Errorf("预取递归 [depth=%d] 请求失败: %v", item.depth, err)
|
||
continue
|
||
}
|
||
itemRows, _, _, _, pe := parseRespExt(r2.Body, prefetchRespCfg)
|
||
if pe != nil {
|
||
logrus.Errorf("预取递归 [depth=%d] 解析失败: %v", item.depth, pe)
|
||
continue
|
||
}
|
||
for _, row := range itemRows {
|
||
allRows = append(allRows, row)
|
||
if prefetch.ValueField == "" {
|
||
allEntities = append(allEntities, row)
|
||
} else if v, ok := row[prefetch.ValueField]; ok {
|
||
if f, ok := toFloat64(v); ok {
|
||
allEntities = append(allEntities, int64(f))
|
||
} else {
|
||
allEntities = append(allEntities, v)
|
||
}
|
||
}
|
||
if v, ok := row[prefetchRecursiveCfg.KeyField]; ok {
|
||
queue = append(queue, rItem{depth: item.depth + 1, keyVal: v})
|
||
}
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
} else {
|
||
// ----- 常规分页预取 -----
|
||
firstExtra := make(map[string]interface{})
|
||
if prefetchIsCursor {
|
||
// 支持 initial_cursor 配置,如果没有则使用空字符串
|
||
if icv, ok := prefetchReqIface.RequestConfig["initial_cursor"]; ok {
|
||
firstExtra[prefetchPageParam] = icv
|
||
} else {
|
||
firstExtra[prefetchPageParam] = ""
|
||
}
|
||
}
|
||
body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, lastSyncTime, firstExtra)
|
||
logrus.Debugf("预取请求 URL: %s, Method: %s, Body: %+v", prefetch.URL, prefetchMethod, body)
|
||
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
|
||
if err != nil {
|
||
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("预取第一页请求失败: %v", err))
|
||
return nil, fmt.Errorf("预取第一页失败: %w", err)
|
||
}
|
||
|
||
rows, prefetchTotalPages, _, nextCursor, err := parseRespExt(resp.Body, prefetchRespCfg)
|
||
if err != nil {
|
||
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("解析预取响应失败: %v", err))
|
||
return nil, fmt.Errorf("解析预取响应失败: %w", err)
|
||
}
|
||
collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)
|
||
|
||
if prefetchIsCursor {
|
||
for nextCursor != "" && nextCursor != "nomore" {
|
||
body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, lastSyncTime, map[string]interface{}{
|
||
prefetchPageParam: nextCursor,
|
||
})
|
||
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
|
||
if err != nil {
|
||
logrus.Errorf("预取游标 %s 请求失败: %v", nextCursor, err)
|
||
break
|
||
}
|
||
rows, _, _, nc, pe := parseRespExt(resp.Body, prefetchRespCfg)
|
||
if pe != nil {
|
||
logrus.Errorf("预取游标 %s 解析失败: %v", nextCursor, pe)
|
||
break
|
||
}
|
||
if len(rows) == 0 {
|
||
break
|
||
}
|
||
nextCursor = nc
|
||
collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
} else {
|
||
for page := 2; page <= prefetchTotalPages; page++ {
|
||
body := buildReqBody(ctx, prefetchReqIface, page, prefetchPageSize, lastSyncTime, nil)
|
||
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
|
||
if err != nil {
|
||
logrus.Errorf("预取第 %d 页请求失败: %v", page, err)
|
||
continue
|
||
}
|
||
rows, _, _, _, pe := parseRespExt(resp.Body, prefetchRespCfg)
|
||
if pe != nil {
|
||
logrus.Errorf("预取第 %d 页解析失败: %v", page, pe)
|
||
continue
|
||
}
|
||
collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
}
|
||
}
|
||
|
||
if len(allEntities) == 0 {
|
||
logrus.Warn("预取结果为空列表,跳过同步")
|
||
return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
|
||
}
|
||
logrus.Infof("预取到 %d 个实体", len(allEntities))
|
||
|
||
// 将预取的数据也存入库(如账户列表存入 tencent_account_relation)
|
||
if prefetchIface != nil && prefetchIface.TableDefinition != nil {
|
||
prefetchTd, err := ParseTableDefinition(prefetchIface.TableDefinition)
|
||
if err == nil {
|
||
if ensureErr := EnsureTable(ctx, prefetchTd); ensureErr == nil {
|
||
saved, _ := savePage(ctx, prefetchTd, allRows)
|
||
logrus.Infof("预取数据已存库: %s, %d 条", prefetchTd.TableName, saved)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 并发处理每个实体的数据
|
||
result := &SyncResult{TableName: td.TableName}
|
||
pageSize := GetSyncPageSize(ctx)
|
||
if ps, ok := toInt(iface.RequestConfig["page_size"]); ok {
|
||
pageSize = ps
|
||
} else if ps, ok := toInt(iface.RequestConfig["pageSize"]); ok {
|
||
pageSize = ps
|
||
}
|
||
|
||
dataMethod := string(iface.Method)
|
||
inQuery := paramsInQuery(iface)
|
||
concurrency := GetSyncConcurrency(ctx)
|
||
|
||
var mu sync.Mutex
|
||
var wg sync.WaitGroup
|
||
sem := make(chan struct{}, concurrency)
|
||
globalMaxTime := lastSyncTime
|
||
|
||
for idx, entityVal := range allEntities {
|
||
wg.Add(1)
|
||
sem <- struct{}{}
|
||
|
||
go func(idx int, val interface{}) {
|
||
defer wg.Done()
|
||
defer func() { <-sem }()
|
||
|
||
logrus.Infof(" 处理实体 [%d/%d]: %v", idx+1, len(allEntities), val)
|
||
entityMaxTime := int64(0)
|
||
|
||
if isCursorPagination(iface) {
|
||
// ----- 游标分页(如钉钉 user_list)-----
|
||
cp := "cursor"
|
||
if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
|
||
cp = p
|
||
}
|
||
firstExtra := map[string]interface{}{
|
||
prefetch.TargetParam: val,
|
||
}
|
||
if icv, ok := iface.RequestConfig["initial_cursor"]; ok {
|
||
firstExtra[cp] = icv
|
||
} else {
|
||
firstExtra[cp] = ""
|
||
}
|
||
body := buildReqBody(ctx, iface, 1, pageSize, lastSyncTime, firstExtra)
|
||
resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
|
||
if err != nil {
|
||
logrus.Errorf(" 实体 %v 首次请求失败: %v", val, err)
|
||
return
|
||
}
|
||
rows, _, mt, nc, pe := parseRespExt(resp.Body, iface.ResponseConfig)
|
||
if pe != nil {
|
||
logrus.Errorf(" 实体 %v 解析首页失败: %v", val, pe)
|
||
return
|
||
}
|
||
for i := range rows {
|
||
rows[i][prefetch.TargetParam] = val
|
||
}
|
||
injectRowFields(rows, body, iface.RequestConfig)
|
||
inserted, _ := savePage(ctx, td, rows)
|
||
mu.Lock()
|
||
result.InsertedRows += inserted
|
||
result.TotalRows += len(rows)
|
||
mu.Unlock()
|
||
if mt > entityMaxTime {
|
||
entityMaxTime = mt
|
||
}
|
||
nextCursor := nc
|
||
for nextCursor != "" && nextCursor != "nomore" {
|
||
body := buildReqBody(ctx, iface, 1, pageSize, lastSyncTime, map[string]interface{}{
|
||
cp: nextCursor,
|
||
prefetch.TargetParam: val,
|
||
})
|
||
resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
|
||
if err != nil {
|
||
logrus.Errorf(" 实体 %v 游标 %s 失败: %v", val, nextCursor, err)
|
||
break
|
||
}
|
||
rows, _, mt, nc, pe := parseRespExt(resp.Body, iface.ResponseConfig)
|
||
if pe != nil {
|
||
logrus.Errorf(" 实体 %v 游标 %s 解析失败: %v", val, nextCursor, pe)
|
||
break
|
||
}
|
||
if len(rows) == 0 {
|
||
break
|
||
}
|
||
nextCursor = nc
|
||
for i := range rows {
|
||
rows[i][prefetch.TargetParam] = val
|
||
}
|
||
injectRowFields(rows, body, iface.RequestConfig)
|
||
inserted, _ := savePage(ctx, td, rows)
|
||
mu.Lock()
|
||
result.InsertedRows += inserted
|
||
result.TotalRows += len(rows)
|
||
mu.Unlock()
|
||
if mt > entityMaxTime {
|
||
entityMaxTime = mt
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
} else {
|
||
// ----- 普通分页 -----
|
||
page := 1
|
||
totalPages := 1
|
||
for page <= totalPages {
|
||
body := buildReqBody(ctx, iface, page, pageSize, lastSyncTime, map[string]interface{}{
|
||
prefetch.TargetParam: val,
|
||
})
|
||
resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
|
||
if err != nil {
|
||
logrus.Errorf(" 实体 %v 第 %d 页失败: %v", val, page, err)
|
||
page++
|
||
time.Sleep(200 * time.Millisecond)
|
||
continue
|
||
}
|
||
rows, tp, mt, parseErr := parseResp(resp.Body, iface.ResponseConfig)
|
||
if parseErr != nil {
|
||
logrus.Errorf(" 解析响应失败: %v", parseErr)
|
||
page++
|
||
continue
|
||
}
|
||
if page == 1 {
|
||
totalPages = tp
|
||
}
|
||
for i := range rows {
|
||
rows[i][prefetch.TargetParam] = val
|
||
}
|
||
injectRowFields(rows, body, iface.RequestConfig)
|
||
inserted, _ := savePage(ctx, td, rows)
|
||
mu.Lock()
|
||
result.InsertedRows += inserted
|
||
result.TotalRows += len(rows)
|
||
mu.Unlock()
|
||
if mt > entityMaxTime {
|
||
entityMaxTime = mt
|
||
}
|
||
page++
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
}
|
||
|
||
if entityMaxTime > 0 {
|
||
mu.Lock()
|
||
if entityMaxTime > globalMaxTime {
|
||
globalMaxTime = entityMaxTime
|
||
}
|
||
mu.Unlock()
|
||
}
|
||
}(idx, entityVal)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
if globalMaxTime <= 0 {
|
||
globalMaxTime = time.Now().Unix()
|
||
}
|
||
updateSyncTime(ctx, platform.PlatformCode, iface.Code, globalMaxTime)
|
||
|
||
result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds())
|
||
logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
|
||
return result, nil
|
||
}
|
||
|
||
// syncRecursive 递归遍历同步(如钉钉部门树:先查根级 → 对每个子部门递归查下级)
|
||
func syncRecursive(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, recursive *RecursiveConfig, start time.Time) (*SyncResult, error) {
|
||
maxDepth := 20
|
||
if md, ok := toInt(iface.RequestConfig["max_recursive_depth"]); ok {
|
||
maxDepth = md
|
||
}
|
||
|
||
inQuery := paramsInQuery(iface)
|
||
method := string(iface.Method)
|
||
|
||
allRows := make([]map[string]interface{}, 0)
|
||
processedKeys := make(map[string]bool)
|
||
|
||
type queueItem struct {
|
||
depth int
|
||
keyVal interface{} // nil 表示根级
|
||
}
|
||
queue := []queueItem{{depth: 0, keyVal: nil}}
|
||
|
||
for len(queue) > 0 {
|
||
item := queue[0]
|
||
queue = queue[1:]
|
||
|
||
if item.depth > maxDepth {
|
||
logrus.Warnf("递归已达最大深度 %d,终止该分支", maxDepth)
|
||
continue
|
||
}
|
||
|
||
// 防重复处理
|
||
if item.keyVal != nil {
|
||
keyStr := fmt.Sprintf("%v", item.keyVal)
|
||
if processedKeys[keyStr] {
|
||
continue
|
||
}
|
||
processedKeys[keyStr] = true
|
||
}
|
||
|
||
extraParams := make(map[string]interface{})
|
||
if item.keyVal != nil {
|
||
extraParams[recursive.TargetParam] = item.keyVal
|
||
}
|
||
|
||
body := buildReqBody(ctx, iface, 1, 100, 0, extraParams)
|
||
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
|
||
if err != nil {
|
||
logrus.Errorf("递归 [depth=%d] 请求失败: %v", item.depth, err)
|
||
recordFailure(ctx, platform.PlatformCode, iface.Code, "full", fmt.Sprintf("递归深度 %d 请求失败: %v", item.depth, err))
|
||
continue
|
||
}
|
||
|
||
rows, _, _, _, err := parseRespExt(resp.Body, iface.ResponseConfig)
|
||
if err != nil {
|
||
logrus.Errorf("递归 [depth=%d] 解析失败: %v", item.depth, err)
|
||
continue
|
||
}
|
||
|
||
for _, row := range rows {
|
||
allRows = append(allRows, row)
|
||
if v, ok := row[recursive.KeyField]; ok {
|
||
queue = append(queue, queueItem{depth: item.depth + 1, keyVal: v})
|
||
}
|
||
}
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
if len(allRows) == 0 {
|
||
logrus.Warn("递归结果为空,跳过入库")
|
||
return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
|
||
}
|
||
|
||
inserted, _ := savePage(ctx, td, allRows)
|
||
updateSyncTime(ctx, platform.PlatformCode, iface.Code, time.Now().Unix())
|
||
|
||
result := &SyncResult{
|
||
TableName: td.TableName,
|
||
TotalRows: len(allRows),
|
||
InsertedRows: inserted,
|
||
Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds()),
|
||
}
|
||
logrus.Infof("递归同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
|
||
return result, nil
|
||
}
|
||
|
||
// getTotalPages 从响应中提取总页数
|
||
func getTotalPages(raw []byte) int {
|
||
rows, tp, _, _, err := parseRespExt(raw, nil)
|
||
if err != nil || len(rows) == 0 {
|
||
return 0
|
||
}
|
||
return tp
|
||
}
|
||
|
||
// toFloat64 converts v to a float64 when it holds a float64, int, int64,
// json.Number or a string that strconv.ParseFloat accepts (some APIs return
// numeric codes as strings, e.g. code: "200"). The boolean is false, and the
// number 0, for every other type and for unparsable text.
func toFloat64(v interface{}) (float64, bool) {
	var (
		parsed float64
		err    error
	)
	switch n := v.(type) {
	case float64:
		return n, true
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	case json.Number:
		parsed, err = n.Float64()
	case string:
		parsed, err = strconv.ParseFloat(n, 64)
	default:
		return 0, false
	}
	if err != nil {
		return 0, false
	}
	return parsed, true
}
|
||
|
||
// toInt converts v to an int when it holds a float64 (truncated), int, int64,
// json.Number with an integer value, or a string strconv.Atoi accepts.
// The boolean is false, and the number 0, for anything else.
//
// json.Number is supported because, per the original author's note, JSONB
// columns scanned through the pgx driver may yield json.Number instead of
// float64.
func toInt(v interface{}) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case int64:
		return int(n), true
	case float64:
		return int(n), true
	case json.Number:
		if wide, err := n.Int64(); err == nil {
			return int(wide), true
		}
	case string:
		if parsed, err := strconv.Atoi(n); err == nil {
			return parsed, true
		}
	}
	return 0, false
}
|
||
|
||
// buildPrefetchParams 构建预取接口的请求参数
|
||
func buildPrefetchParams(iface *entity.ApiInterface) map[string]interface{} {
|
||
params := make(map[string]interface{})
|
||
|
||
if iface.RequestConfig != nil {
|
||
pageParam := "page"
|
||
psParam := "page_size"
|
||
if p, ok := iface.RequestConfig["page_param"].(string); ok {
|
||
pageParam = p
|
||
}
|
||
if p, ok := iface.RequestConfig["page_size_param"].(string); ok {
|
||
psParam = p
|
||
}
|
||
params[pageParam] = 1
|
||
params[psParam] = 100
|
||
|
||
for k, v := range iface.RequestConfig {
|
||
if k == "headers" || k == "prefetch" || k == "page_param" ||
|
||
k == "page_size_param" || k == "time_field" || k == "parameters_location" ||
|
||
k == "filtering" || k == "group_by" || k == "date_range" ||
|
||
k == "body_wrapper_field" || k == "exclude_from_wrapper" ||
|
||
k == "cursor_pagination" || k == "time_field_mode" ||
|
||
k == "recursive" || k == "max_recursive_depth" ||
|
||
k == "initial_cursor" || k == "pagination_mode" ||
|
||
k == "full_sync_start_time" || k == "row_inject" {
|
||
continue
|
||
}
|
||
if k == pageParam || k == psParam {
|
||
continue
|
||
}
|
||
params[k] = v
|
||
}
|
||
}
|
||
|
||
return params
|
||
}
|
||
|
||
// parsePrefetchConfig 解析预取配置
|
||
func parsePrefetchConfig(requestConfig map[string]interface{}) *PrefetchConfig {
|
||
if requestConfig == nil {
|
||
return nil
|
||
}
|
||
raw, ok := requestConfig["prefetch"]
|
||
if !ok || raw == nil {
|
||
return nil
|
||
}
|
||
m, ok := raw.(map[string]interface{})
|
||
if !ok {
|
||
return nil
|
||
}
|
||
pc := &PrefetchConfig{}
|
||
if u, _ := m["url"].(string); u != "" {
|
||
pc.URL = u
|
||
} else {
|
||
return nil
|
||
}
|
||
if method, _ := m["method"].(string); method != "" {
|
||
pc.Method = method
|
||
} else {
|
||
pc.Method = "GET"
|
||
}
|
||
pc.ResponsePath, _ = m["response_path"].(string)
|
||
pc.TargetParam, _ = m["target_param"].(string)
|
||
pc.ValueField, _ = m["value_field"].(string)
|
||
return pc
|
||
}
|
||
|
||
// parseRecursiveConfig 解析递归遍历配置
|
||
func parseRecursiveConfig(requestConfig map[string]interface{}) *RecursiveConfig {
|
||
if requestConfig == nil {
|
||
return nil
|
||
}
|
||
raw, ok := requestConfig["recursive"]
|
||
if !ok || raw == nil {
|
||
return nil
|
||
}
|
||
m, ok := raw.(map[string]interface{})
|
||
if !ok {
|
||
return nil
|
||
}
|
||
rc := &RecursiveConfig{}
|
||
if kf, _ := m["key_field"].(string); kf != "" {
|
||
rc.KeyField = kf
|
||
} else {
|
||
return nil
|
||
}
|
||
if tp, _ := m["target_param"].(string); tp != "" {
|
||
rc.TargetParam = tp
|
||
} else {
|
||
return nil
|
||
}
|
||
return rc
|
||
}
|
||
|
||
// extractValues pulls a list of values out of a JSON response.
//
// path is a dot-separated chain of object keys that must end at an array. With
// an empty valueField the array elements are returned as they are; otherwise
// only elements that are objects containing valueField contribute, and the
// value of that field is returned for each.
//
// An error is returned when raw is not a JSON object, when the path leaves the
// object tree, or when the final key does not hold an array. An empty array
// yields a nil slice and a nil error.
func extractValues(raw []byte, path, valueField string) ([]interface{}, error) {
	var doc map[string]interface{}
	if err := json.Unmarshal(raw, &doc); err != nil {
		return nil, fmt.Errorf("JSON解析失败: %w", err)
	}

	segments := strings.Split(path, ".")
	if len(segments) == 0 {
		// Defensive only: strings.Split never returns an empty slice here.
		return nil, fmt.Errorf("路径 %s 不完整", path)
	}
	lastIdx := len(segments) - 1

	// Descend through every segment but the last; each must be an object.
	node := doc
	for _, segment := range segments[:lastIdx] {
		child, isObject := node[segment].(map[string]interface{})
		if !isObject {
			return nil, fmt.Errorf("路径 %s 在 %s 处中断", path, segment)
		}
		node = child
	}

	list, isArray := node[segments[lastIdx]].([]interface{})
	if !isArray {
		return nil, fmt.Errorf("路径 %s 不是数组", path)
	}

	var values []interface{}
	for _, element := range list {
		if valueField == "" {
			values = append(values, element)
			continue
		}
		obj, isObject := element.(map[string]interface{})
		if !isObject {
			continue
		}
		if fieldVal, exists := obj[valueField]; exists {
			values = append(values, fieldVal)
		}
	}
	return values, nil
}
|
||
|
||
// normalizeJSONNumbers returns a copy of v in which every json.Number, at any
// depth inside map[string]interface{} and []interface{} containers, is replaced
// by a native Go value: int64 when the number parses as an integer, otherwise
// float64, otherwise its string form. Maps and slices are rebuilt; every other
// value is returned unchanged.
//
// NOTE(review): per the original author's note, numeric JSONB values scanned
// through the pgx driver may arrive as json.Number, and leaving them in place
// broke the signed "param" JSON of some platforms (e.g. Kuaishou) — that
// cannot be verified from this file.
func normalizeJSONNumbers(v interface{}) interface{} {
	if num, isNumber := v.(json.Number); isNumber {
		if asInt, err := num.Int64(); err == nil {
			return asInt
		}
		if asFloat, err := num.Float64(); err == nil {
			return asFloat
		}
		return num.String()
	}

	if object, isObject := v.(map[string]interface{}); isObject {
		converted := make(map[string]interface{}, len(object))
		for key, element := range object {
			converted[key] = normalizeJSONNumbers(element)
		}
		return converted
	}

	if array, isArray := v.([]interface{}); isArray {
		converted := make([]interface{}, len(array))
		for idx := range array {
			converted[idx] = normalizeJSONNumbers(array[idx])
		}
		return converted
	}

	return v
}
|
||
|
||
// buildReqBody 构建请求参数
|
||
func buildReqBody(ctx context.Context, iface *entity.ApiInterface, page, pageSize int, lastSyncTime int64, extraParams map[string]interface{}) map[string]interface{} {
|
||
body := make(map[string]interface{})
|
||
if iface.RequestConfig != nil {
|
||
for k, v := range iface.RequestConfig {
|
||
if k == "time_field" || k == "headers" || k == "prefetch" ||
|
||
k == "page_param" || k == "page_size_param" || k == "parameters_location" ||
|
||
k == "cursor_pagination" || k == "time_field_mode" ||
|
||
k == "body_wrapper_field" || k == "exclude_from_wrapper" ||
|
||
k == "top_level_params" || k == "recursive" ||
|
||
k == "max_recursive_depth" || k == "initial_cursor" ||
|
||
k == "pagination_mode" || k == "full_sync_start_time" ||
|
||
k == "row_inject" {
|
||
continue
|
||
}
|
||
body[k] = v
|
||
}
|
||
}
|
||
pageParam := "page"
|
||
psParam := "page_size"
|
||
if iface.RequestConfig != nil {
|
||
if p, ok := iface.RequestConfig["page_param"].(string); ok {
|
||
pageParam = p
|
||
}
|
||
if p, ok := iface.RequestConfig["page_size_param"].(string); ok {
|
||
psParam = p
|
||
}
|
||
}
|
||
// 偏移量分页(如钉钉 offset):offset = (page-1) * pageSize
|
||
paginationMode := ""
|
||
if iface.RequestConfig != nil {
|
||
if pm, ok := iface.RequestConfig["pagination_mode"].(string); ok {
|
||
paginationMode = pm
|
||
}
|
||
}
|
||
if paginationMode == "offset" {
|
||
body[pageParam] = (page - 1) * pageSize
|
||
} else {
|
||
body[pageParam] = page
|
||
}
|
||
body[psParam] = pageSize
|
||
|
||
// 时间过滤处理:支持两种模式
|
||
// 1. "filtering" 模式(默认):生成 filtering=[{"field":"...","operator":"GREATER_EQUALS","values":["..."]}](腾讯)
|
||
// 2. "range" 模式:生成 beginTime/endTime + queryType(快手)
|
||
if iface.RequestConfig != nil {
|
||
if tf, ok := iface.RequestConfig["time_field"].(string); ok && tf != "" {
|
||
timeMode := "filtering"
|
||
if tm, ok := iface.RequestConfig["time_field_mode"].(string); ok && tm != "" {
|
||
timeMode = tm
|
||
}
|
||
|
||
if timeMode == "range" {
|
||
// 快手模式:beginTime/endTime(毫秒时间戳)
|
||
timeMs := lastSyncTime
|
||
if timeMs <= 0 {
|
||
// 全量:优先使用配置的 full_sync_start_time,否则默认90天前
|
||
if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 {
|
||
timeMs = int64(fst)
|
||
} else {
|
||
timeMs = time.Now().Add(-time.Duration(GetDefaultLookbackDays(ctx)) * 24 * time.Hour).UnixMilli()
|
||
}
|
||
}
|
||
// 仅在配置未指定 queryType 时设默认值,尊重配置
|
||
if _, exists := body["queryType"]; !exists {
|
||
body["queryType"] = 2
|
||
}
|
||
body["beginTime"] = timeMs
|
||
endTime := time.Now().UnixMilli()
|
||
maxRangeMs := int64(3 * 24 * 3600 * 1000)
|
||
if endTime-timeMs > maxRangeMs {
|
||
endTime = timeMs + maxRangeMs
|
||
}
|
||
body["endTime"] = endTime
|
||
} else if lastSyncTime > 0 {
|
||
// 腾讯 filtering 模式(仅增量时)
|
||
timeFilter := map[string]interface{}{
|
||
"field": tf,
|
||
"operator": "GREATER_EQUALS",
|
||
"values": []interface{}{fmt.Sprintf("%d", lastSyncTime)},
|
||
}
|
||
if existing, ok := body["filtering"].([]interface{}); ok {
|
||
body["filtering"] = append(existing, timeFilter)
|
||
} else {
|
||
body["filtering"] = []interface{}{timeFilter}
|
||
}
|
||
} else if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 {
|
||
// 全量 filtering 模式:指定了 full_sync_start_time,从该时间戳开始拉取
|
||
timeFilter := map[string]interface{}{
|
||
"field": tf,
|
||
"operator": "GREATER_EQUALS",
|
||
"values": []interface{}{fmt.Sprintf("%d", int64(fst))},
|
||
}
|
||
if existing, ok := body["filtering"].([]interface{}); ok {
|
||
body["filtering"] = append(existing, timeFilter)
|
||
} else {
|
||
body["filtering"] = []interface{}{timeFilter}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
for k, v := range extraParams {
|
||
body[k] = v
|
||
}
|
||
|
||
// body_wrapper_field 支持:将业务参数包装到指定字段(如快手 API 的 param JSON)
|
||
if iface.RequestConfig != nil {
|
||
if wf, ok := iface.RequestConfig["body_wrapper_field"].(string); ok && wf != "" {
|
||
excludeSet := map[string]bool{"method": true, "version": true, "signMethod": true}
|
||
if excl, ok := iface.RequestConfig["exclude_from_wrapper"].([]interface{}); ok {
|
||
for _, v := range excl {
|
||
if s, ok := v.(string); ok {
|
||
excludeSet[s] = true
|
||
}
|
||
}
|
||
}
|
||
wrapperObj := make(map[string]interface{})
|
||
for k, v := range body {
|
||
if !excludeSet[k] && k != wf {
|
||
wrapperObj[k] = v
|
||
delete(body, k)
|
||
}
|
||
}
|
||
// 规范化 json.Number → Go 原生数值类型,避免 json.Marshal 将其序列化为字符串
|
||
// (pgx 驱动扫描 JSONB 数值时可能返回 json.Number,其底层是 string 类型)
|
||
normalized := normalizeJSONNumbers(wrapperObj)
|
||
b, err := json.Marshal(normalized)
|
||
logrus.Infof("body_wrapper param JSON (normalized): %s", string(b))
|
||
if err != nil {
|
||
logrus.Errorf("JSON序列化 wrapper 失败: %v", err)
|
||
} else {
|
||
body[wf] = string(b)
|
||
}
|
||
}
|
||
}
|
||
|
||
return body
|
||
}
|
||
|
||
// parseRespExt 解析响应,支持自定义成功判断和数据路径
|
||
func parseRespExt(raw []byte, rc map[string]interface{}) ([]map[string]interface{}, int, int64, string, error) {
|
||
var respMap map[string]interface{}
|
||
if err := json.Unmarshal(raw, &respMap); err != nil {
|
||
return nil, 0, 0, "", fmt.Errorf("JSON解析失败: %w", err)
|
||
}
|
||
successField, successVal := "code", float64(0)
|
||
msgField, listPath, cursorPath := "message", "data", ""
|
||
hasMorePath := ""
|
||
singleRecord := false
|
||
if rc != nil {
|
||
if sf, _ := rc["success_field"].(string); sf != "" {
|
||
successField = sf
|
||
}
|
||
if sv, ok := rc["success_value"]; ok {
|
||
if f, ok := toFloat64(sv); ok {
|
||
successVal = f
|
||
}
|
||
}
|
||
if mf, _ := rc["message_field"].(string); mf != "" {
|
||
msgField = mf
|
||
}
|
||
if lp, _ := rc["list_path"].(string); lp != "" {
|
||
listPath = lp
|
||
}
|
||
if cf, _ := rc["cursor_field"].(string); cf != "" {
|
||
cursorPath = cf
|
||
}
|
||
if sr, _ := rc["single_record"].(bool); sr {
|
||
singleRecord = true
|
||
}
|
||
if hm, _ := rc["has_more_field"].(string); hm != "" {
|
||
hasMorePath = hm
|
||
}
|
||
}
|
||
if v, ok := respMap[successField]; ok {
|
||
actual, _ := toFloat64(v)
|
||
if actual != successVal {
|
||
msg, _ := respMap[msgField].(string)
|
||
// 如果配置的消息字段为空,尝试通用的错误信息字段
|
||
if msg == "" {
|
||
for _, altField := range []string{"error_msg", "sub_msg", "msg"} {
|
||
if altMsg, _ := respMap[altField].(string); altMsg != "" {
|
||
msg = altMsg
|
||
break
|
||
}
|
||
}
|
||
}
|
||
respJSON, _ := json.Marshal(respMap)
|
||
logrus.Errorf("API响应校验失败: success_field=%s, expected=%v, actual=%v, message=%s, 完整响应: %s", successField, successVal, actual, msg, string(respJSON))
|
||
return nil, 0, 0, "", fmt.Errorf("API错误: %s=%v, %s=%s", successField, v, msgField, msg)
|
||
}
|
||
}
|
||
|
||
// 解析 list_path,支持最后一段是数组的情况(如 data.orderList)
|
||
var listData []interface{}
|
||
var dataContainer map[string]interface{}
|
||
if listPath != "" {
|
||
parts := strings.Split(listPath, ".")
|
||
cur := respMap
|
||
for i, p := range parts {
|
||
if i == len(parts)-1 {
|
||
// 最后一段:可能直接是数组,也可能是包含 list/orderList 的 map
|
||
listData, _ = cur[p].([]interface{})
|
||
if listData == nil {
|
||
if m, ok := cur[p].(map[string]interface{}); ok {
|
||
dataContainer = m
|
||
if l, ok := m["list"]; ok {
|
||
listData, _ = l.([]interface{})
|
||
}
|
||
if listData == nil {
|
||
if ol, ok := m["orderList"]; ok {
|
||
listData, _ = ol.([]interface{})
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
dataContainer = cur
|
||
}
|
||
} else {
|
||
next, ok := cur[p].(map[string]interface{})
|
||
if !ok {
|
||
return nil, 0, 0, "", fmt.Errorf("路径 %s 在 %s 处中断", listPath, p)
|
||
}
|
||
cur = next
|
||
}
|
||
}
|
||
}
|
||
if listData == nil {
|
||
if singleRecord && listPath != "" {
|
||
// 详情接口:list_path 指向单个对象,包装为单元素数组
|
||
parts := strings.Split(listPath, ".")
|
||
cur := respMap
|
||
ok := true
|
||
for _, p := range parts {
|
||
if m, exists := cur[p].(map[string]interface{}); exists {
|
||
cur = m
|
||
} else {
|
||
ok = false
|
||
break
|
||
}
|
||
}
|
||
if ok {
|
||
listData = []interface{}{cur}
|
||
dataContainer = cur
|
||
}
|
||
}
|
||
}
|
||
if listData == nil {
|
||
// 回退到根级字段
|
||
listData, _ = respMap["list"].([]interface{})
|
||
if listData == nil {
|
||
listData, _ = respMap["orderList"].([]interface{})
|
||
}
|
||
dataContainer = respMap
|
||
}
|
||
var rows []map[string]interface{}
|
||
totalPages, maxTime := 1, int64(0)
|
||
for _, item := range listData {
|
||
if m, ok := item.(map[string]interface{}); ok {
|
||
// 展平嵌套 map:将子 map 的字段合并到顶层(如 orderBaseInfo.oid → oid)
|
||
flat := flattenRow(m)
|
||
j, _ := json.Marshal(m)
|
||
flat["raw_data"] = string(j)
|
||
for _, tf := range []string{"last_modified_time", "created_time", "update_time", "createTime", "updateTime", "lastModifiedTime"} {
|
||
if t, ok := toFloat64(flat[tf]); ok && int64(t) > maxTime {
|
||
maxTime = int64(t)
|
||
}
|
||
}
|
||
rows = append(rows, flat)
|
||
}
|
||
}
|
||
nextCursor := ""
|
||
if cursorPath != "" {
|
||
cp := strings.Split(cursorPath, ".")
|
||
cc := respMap
|
||
for i, p := range cp {
|
||
if i == len(cp)-1 {
|
||
if s, ok := cc[p].(string); ok {
|
||
nextCursor = s
|
||
} else if f, ok := toFloat64(cc[p]); ok {
|
||
// 数字游标(如钉钉 next_cursor=10)
|
||
nextCursor = fmt.Sprintf("%.0f", f)
|
||
}
|
||
} else if m, ok := cc[p].(map[string]interface{}); ok {
|
||
cc = m
|
||
}
|
||
}
|
||
}
|
||
// has_more 字段支持:false 时标记游标结束
|
||
if hasMorePath != "" {
|
||
parts := strings.Split(hasMorePath, ".")
|
||
cc := respMap
|
||
for i, p := range parts {
|
||
if i == len(parts)-1 {
|
||
if b, ok := cc[p].(bool); ok && !b {
|
||
nextCursor = "nomore"
|
||
}
|
||
} else if m, ok := cc[p].(map[string]interface{}); ok {
|
||
cc = m
|
||
}
|
||
}
|
||
}
|
||
if pi, ok := dataContainer["page_info"].(map[string]interface{}); ok {
|
||
if tp, ok := toInt(pi["total_page"]); ok {
|
||
totalPages = tp
|
||
}
|
||
}
|
||
return rows, totalPages, maxTime, nextCursor, nil
|
||
}
|
||
|
||
// flattenRow returns a new map in which the fields of nested objects are
// merged, recursively, into the top level. Values that are not
// map[string]interface{} (including arrays) are copied as-is. The input map
// is not modified.
//
// Key collisions are resolved deterministically (previously the winner
// depended on Go's random map iteration order):
//  1. a non-object value stored directly at this level always wins over a
//     same-named field coming from a nested object;
//  2. between nested objects at the same level, the field from the object
//     whose own key sorts first lexicographically wins.
func flattenRow(m map[string]interface{}) map[string]interface{} {
	result := make(map[string]interface{}, len(m))

	// Pass 1: direct (non-object) values. Keys are unique, so order is irrelevant.
	for k, v := range m {
		if _, nested := v.(map[string]interface{}); !nested {
			result[k] = v
		}
	}

	// Pass 2: merge nested objects. owner remembers which parent key supplied
	// each merged field so a later, lower-priority parent cannot replace it.
	owner := make(map[string]string)
	for k, v := range m {
		sub, nested := v.(map[string]interface{})
		if !nested {
			continue
		}
		for sk, sv := range flattenRow(sub) {
			if direct, exists := m[sk]; exists {
				if _, isObject := direct.(map[string]interface{}); !isObject {
					continue // rule 1: the direct value wins
				}
			}
			if prev, claimed := owner[sk]; claimed && prev < k {
				continue // rule 2: an earlier-sorting parent already supplied it
			}
			owner[sk] = k
			result[sk] = sv
		}
	}
	return result
}
|
||
|
||
// parseResp 兼容旧版,保持4个返回值
|
||
func parseResp(raw []byte, responseConfig map[string]interface{}) ([]map[string]interface{}, int, int64, error) {
|
||
rows, tp, mt, _, err := parseRespExt(raw, responseConfig)
|
||
return rows, tp, mt, err
|
||
}
|
||
|
||
func savePage(ctx context.Context, td *TableDefinition, rows []map[string]interface{}) (int, error) {
|
||
if len(rows) == 0 {
|
||
return 0, nil
|
||
}
|
||
colSet := make(map[string]bool)
|
||
for _, c := range td.Columns {
|
||
colSet[c.Name] = true
|
||
}
|
||
var clean []map[string]interface{}
|
||
for _, row := range rows {
|
||
c := make(map[string]interface{})
|
||
for k, v := range row {
|
||
if colSet[k] {
|
||
c[k] = v
|
||
}
|
||
}
|
||
if r, ok := row["raw_data"]; ok {
|
||
c["raw_data"] = r
|
||
}
|
||
clean = append(clean, c)
|
||
}
|
||
return InsertRows(ctx, td.TableName, td.ConflictKeys, clean)
|
||
}
|
||
|
||
func getLastSyncTime(ctx context.Context, platformCode, interfaceCode string) int64 {
|
||
var t int64
|
||
gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable).
|
||
Fields("last_sync_time").
|
||
Where("platform_code", platformCode).
|
||
Where("interface_code", interfaceCode).
|
||
Scan(&t)
|
||
return t
|
||
}
|
||
|
||
func getSyncStatus(ctx context.Context, platformCode, interfaceCode string) string {
|
||
var s string
|
||
gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable).
|
||
Fields("sync_status").
|
||
Where("platform_code", platformCode).
|
||
Where("interface_code", interfaceCode).
|
||
Scan(&s)
|
||
return s
|
||
}
|
||
|
||
func markSyncRunning(ctx context.Context, platformCode, interfaceCode string, lastSyncTime int64) {
|
||
gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable).
|
||
Data(map[string]interface{}{
|
||
"platform_code": platformCode,
|
||
"interface_code": interfaceCode,
|
||
"last_sync_time": lastSyncTime,
|
||
"sync_status": "running",
|
||
}).
|
||
OnConflict("platform_code", "interface_code").
|
||
Save()
|
||
}
|
||
|
||
func updateSyncTime(ctx context.Context, platformCode, interfaceCode string, t int64) {
|
||
gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable).
|
||
Data(map[string]interface{}{
|
||
"platform_code": platformCode,
|
||
"interface_code": interfaceCode,
|
||
"last_sync_time": t,
|
||
"last_sync_at": time.Now(),
|
||
"sync_status": "success",
|
||
}).
|
||
OnConflict("platform_code", "interface_code").
|
||
Save()
|
||
}
|
||
|
||
func recordFailure(ctx context.Context, platformCode, interfaceCode, taskType, errMsg string) {
|
||
dao.SyncTaskLog.Create(ctx, &taskDto.CreateSyncTaskLogReq{
|
||
TaskID: fmt.Sprintf("%s_%s_%d", platformCode, interfaceCode, time.Now().UnixNano()),
|
||
TaskType: taskType,
|
||
PlatformCode: platformCode,
|
||
InterfaceCode: interfaceCode,
|
||
Status: "failed",
|
||
MaxRetry: 3,
|
||
StartTime: time.Now(),
|
||
RequestParams: map[string]interface{}{
|
||
"error": errMsg,
|
||
},
|
||
})
|
||
}
|
||
|
||
// findInterfaceByURL 在所有接口中查找匹配 URL 的接口
|
||
func findInterfaceByURL(ifaces []entity.ApiInterface, url string) *entity.ApiInterface {
|
||
for i := range ifaces {
|
||
if ifaces[i].Url == url {
|
||
return &ifaces[i]
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// injectRowFields copies selected request parameters into every response row.
//
// requestConfig["row_inject"] must be a list; each string entry names a field.
// For every named field that exists in body, the body value is written into
// each row under the same name, overwriting any existing value. Non-string
// entries and fields absent from body are ignored. Nothing happens when
// requestConfig or body is nil, or when "row_inject" is missing or not a list.
//
// This serves cases where a request parameter (e.g. statisticsMonth) must be
// persisted with the rows although the response does not echo it back.
func injectRowFields(rows []map[string]interface{}, body map[string]interface{}, requestConfig map[string]interface{}) {
	if requestConfig == nil || body == nil {
		return
	}
	// A missing key yields a nil interface, which fails the assertion too.
	names, isList := requestConfig["row_inject"].([]interface{})
	if !isList {
		return
	}
	for _, entry := range names {
		field, isString := entry.(string)
		if !isString {
			continue
		}
		if value, present := body[field]; present {
			for _, row := range rows {
				row[field] = value
			}
		}
	}
}
|