数据引擎-快手平台数据抽取bug修复

This commit is contained in:
2026-06-16 10:44:10 +08:00
parent e5133eea34
commit b4fc6f54af
22 changed files with 1324 additions and 487 deletions

View File

@@ -132,13 +132,65 @@ func paramsInQuery(iface *entity.ApiInterface) bool {
return false
}
// pageResult holds the outcome of processing a single page of an API
// interface: either the parsed data or the error that stopped processing.
type pageResult struct {
raw []byte // raw response body (the hasMore pagination loop inspects it to decide whether another page exists)
// rows are the parsed records of this page.
rows []map[string]interface{}
// maxTime is the maximum time value parseRespExt reported for this page.
maxTime int64
// nextCursor is the cursor parseRespExt extracted for the following page.
nextCursor string
// NOTE(review): inserted is not populated by processPage in the code
// visible here — confirm whether it is still needed.
inserted int
// err is set when the request or the response parsing failed; the other
// fields are left at their zero values in that case.
err error
}
// processPage runs one page through the request → parse → inject pipeline.
//
// It builds the request body for the given page, sends it, parses the
// response with the interface's response config and injects request-derived
// fields into the parsed rows. On a request or parse failure the returned
// pageResult carries only err; otherwise it carries the raw body, the rows,
// the page's max time and the next cursor. The total-page value reported by
// parseRespExt is not propagated.
func processPage(ctx context.Context, api *ApiClient, iface *entity.ApiInterface,
	method string, inQuery bool, page, pageSize int, lastSyncTime int64,
	extraParams map[string]interface{}) pageResult {
	reqBody := buildReqBody(ctx, iface, page, pageSize, lastSyncTime, extraParams)

	resp, reqErr := api.Request(ctx, method, iface.Url, reqBody, inQuery)
	if reqErr != nil {
		return pageResult{err: fmt.Errorf("请求失败: %w", reqErr)}
	}

	var (
		out      pageResult
		parseErr error
	)
	out.rows, _, out.maxTime, out.nextCursor, parseErr = parseRespExt(resp.Body, iface.ResponseConfig)
	if parseErr != nil {
		// Return a fresh value so no partially parsed data leaks to the caller.
		return pageResult{err: fmt.Errorf("解析失败: %w", parseErr)}
	}

	injectRowFields(out.rows, reqBody, iface.RequestConfig)
	out.raw = resp.Body
	return out
}
// accumPage persists the rows of one processed page and folds the page's
// counters into result.
//
// A page without rows is ignored entirely: nothing is saved, no counter is
// changed and no throttling pause is taken. Otherwise the rows are handed to
// savePage, InsertedRows/TotalRows/TotalPages are updated, *maxTime is raised
// to pr.maxTime when that is larger, and the function pauses for 100ms before
// returning so consecutive page requests are throttled.
//
// The pause ends early when ctx is cancelled, so a cancelled sync does not
// keep sleeping between pages.
func accumPage(result *SyncResult, maxTime *int64, td *TableDefinition,
	ctx context.Context, pr pageResult) {
	if len(pr.rows) == 0 {
		return
	}

	// NOTE(review): savePage's second result is discarded, as before; a failed
	// write presumably only shows up as a lower InsertedRows count — confirm
	// that this best-effort behaviour is intended.
	inserted, _ := savePage(ctx, td, pr.rows)
	result.InsertedRows += inserted
	result.TotalRows += len(pr.rows)
	result.TotalPages++
	if pr.maxTime > *maxTime {
		*maxTime = pr.maxTime
	}

	// Throttle between pages, but stop waiting as soon as ctx is done.
	pause := time.NewTimer(100 * time.Millisecond)
	defer pause.Stop()
	select {
	case <-ctx.Done():
	case <-pause.C:
	}
}
// syncSingleAPI 单接口分页同步
func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) {
pageSize := GetSyncPageSize(ctx)
if ps, ok := iface.RequestConfig["page_size"].(float64); ok {
pageSize = int(ps)
} else if ps, ok := iface.RequestConfig["pageSize"].(float64); ok {
pageSize = int(ps)
if ps, ok := toInt(iface.RequestConfig["page_size"]); ok {
pageSize = ps
} else if ps, ok := toInt(iface.RequestConfig["pageSize"]); ok {
pageSize = ps
}
taskType := "incremental"
@@ -149,159 +201,123 @@ func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig
inQuery := paramsInQuery(iface)
method := string(iface.Method)
// 游标分页首次请求需要处理初始游标值
firstExtra := map[string]interface{}{}
if isCursorPagination(iface) {
cp := "cursor"
if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
cp = p
}
// 支持 initial_cursor 配置如钉钉HRM首次传 0
if icv, ok := iface.RequestConfig["initial_cursor"]; ok {
firstExtra[cp] = icv
} else {
firstExtra[cp] = ""
}
}
body := buildReqBody(iface, 1, pageSize, lastSyncTime, firstExtra)
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, err.Error())
return nil, fmt.Errorf("获取第一页失败: %w", err)
// 游标参数名
cursorParam := "cursor"
if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
cursorParam = p
}
cursorMode := isCursorPagination(iface)
rows, totalPages, maxTime, nextCursor, err := parseRespExt(resp.Body, iface.ResponseConfig)
if err != nil {
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("解析第一页响应失败: %v", err))
return nil, err
}
// Detect range mode (Kuaishou time slicing): a full sync is pulled slice by slice, 3 days per slice.
timeMode, _ := iface.RequestConfig["time_field_mode"].(string)
isRangeMode := timeMode == "range"
injectRowFields(rows, body, iface.RequestConfig)
result := &SyncResult{TableName: td.TableName, TotalPages: totalPages}
inserted, _ := savePage(ctx, td, rows)
result.InsertedRows += inserted
result.TotalRows += len(rows)
// 游标分页
if isCursorPagination(iface) {
for nextCursor != "" && nextCursor != "nomore" {
cp := "cursor"
if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
cp = p
}
body := buildReqBody(iface, 1, pageSize, lastSyncTime, map[string]interface{}{
cp: nextCursor,
})
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
logrus.Errorf("游标 %s 请求失败: %v", nextCursor, err)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("游标 %s 请求失败: %v", nextCursor, err))
break
}
rows, _, mt, nc, pe := parseRespExt(resp.Body, iface.ResponseConfig)
if pe != nil {
logrus.Errorf("游标 %s 解析失败: %v", nextCursor, pe)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("游标 %s 解析失败: %v", nextCursor, pe))
break
}
if len(rows) == 0 {
break
}
nextCursor = nc
injectRowFields(rows, body, iface.RequestConfig)
inserted, _ = savePage(ctx, td, rows)
result.InsertedRows += inserted
result.TotalRows += len(rows)
if mt > maxTime {
maxTime = mt
}
result.TotalPages++
time.Sleep(100 * time.Millisecond)
// 计算时间分片range 模式 + 全量同步)
var timeChunks [][2]int64
if isRangeMode && lastSyncTime <= 0 {
chunkMs := int64(3 * 24 * 3600 * 1000) // 3天
startMs := time.Now().Add(-time.Duration(GetDefaultLookbackDays(ctx)) * 24 * time.Hour).UnixMilli()
if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 {
startMs = int64(fst)
}
} else if iface.ResponseConfig != nil {
// hasMore 分页(如钉钉 offset/size + hasMore
if hf, _ := iface.ResponseConfig["has_more_field"].(string); hf != "" {
for page := 2; hasMoreCheck(resp.Body, hf); page++ {
body := buildReqBody(iface, page, pageSize, lastSyncTime, nil)
resp2, e2 := api.Request(ctx, method, iface.Url, body, inQuery)
if e2 != nil {
logrus.Errorf("第 %d 页请求失败: %v", page, e2)
break
}
rows2, _, mt2, _, pe2 := parseRespExt(resp2.Body, iface.ResponseConfig)
if pe2 != nil {
logrus.Errorf("第 %d 页解析失败: %v", page, pe2)
break
}
injectRowFields(rows2, body, iface.RequestConfig)
inserted2, _ := savePage(ctx, td, rows2)
result.InsertedRows += inserted2
result.TotalRows += len(rows2)
if mt2 > maxTime {
maxTime = mt2
}
resp = resp2
time.Sleep(100 * time.Millisecond)
}
} else {
// 普通分页
for page := 2; page <= totalPages; page++ {
body := buildReqBody(iface, page, pageSize, lastSyncTime, nil)
resp, err = api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
logrus.Errorf("第 %d 页请求失败: %v", page, err)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("第 %d 页请求失败: %v", page, err))
continue
}
rows, _, mt, _, pe := parseRespExt(resp.Body, iface.ResponseConfig)
if pe != nil {
logrus.Errorf("第 %d 页解析失败: %v", page, pe)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("第 %d 页解析失败: %v", page, pe))
continue
}
injectRowFields(rows, body, iface.RequestConfig)
inserted, _ = savePage(ctx, td, rows)
result.InsertedRows += inserted
result.TotalRows += len(rows)
if mt > maxTime {
maxTime = mt
}
time.Sleep(100 * time.Millisecond)
endMs := time.Now().UnixMilli()
for t := startMs; t < endMs; t += chunkMs {
chunkEnd := t + chunkMs
if chunkEnd > endMs {
chunkEnd = endMs
}
timeChunks = append(timeChunks, [2]int64{t, chunkEnd})
}
logrus.Infof("时间分片模式: %d 个分片(每片7天)", len(timeChunks))
} else {
// 普通分页(无 response_config
for page := 2; page <= totalPages; page++ {
body := buildReqBody(iface, page, pageSize, lastSyncTime, nil)
resp, err = api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
logrus.Errorf("第 %d 页请求失败: %v", page, err)
continue
}
rows, _, mt, _, pe := parseRespExt(resp.Body, iface.ResponseConfig)
if pe != nil {
logrus.Errorf("第 %d 页解析失败: %v", page, pe)
continue
}
injectRowFields(rows, body, iface.RequestConfig)
inserted, _ = savePage(ctx, td, rows)
result.InsertedRows += inserted
result.TotalRows += len(rows)
if mt > maxTime {
maxTime = mt
}
time.Sleep(100 * time.Millisecond)
}
timeChunks = [][2]int64{{0, 0}} // 单次,时间由 buildReqBody 决定
}
if maxTime <= 0 {
maxTime = time.Now().Unix()
result := &SyncResult{TableName: td.TableName}
globalMaxTime := int64(0)
for _, chunk := range timeChunks {
if isRangeMode && chunk[0] > 0 {
lastSyncTime = chunk[0]
}
// 构建第一页的额外参数
firstExtra := map[string]interface{}{}
if cursorMode {
if icv, ok := iface.RequestConfig["initial_cursor"]; ok {
firstExtra[cursorParam] = icv
} else {
firstExtra[cursorParam] = ""
}
}
// 处理第一页
pr := processPage(ctx, api, iface, method, inQuery, 1, pageSize, lastSyncTime, firstExtra)
if pr.err != nil {
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, pr.err.Error())
return nil, fmt.Errorf("获取第一页失败: %w", pr.err)
}
if len(pr.rows) == 0 {
return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
}
// result initialized above in time chunk loop
maxTime := pr.maxTime
accumPage(result, &maxTime, td, ctx, pr)
// 分页循环:三种模式仅循环控制和参数不同,内核复用 processPage + accumPage
switch {
case cursorMode:
nextCursor := pr.nextCursor
for nextCursor != "" && nextCursor != "nomore" {
pr := processPage(ctx, api, iface, method, inQuery, 1, pageSize, lastSyncTime, map[string]interface{}{
cursorParam: nextCursor,
})
if pr.err != nil {
logrus.Errorf("游标 %s 处理失败: %v", nextCursor, pr.err)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("游标 %s 失败: %v", nextCursor, pr.err))
break
}
if len(pr.rows) == 0 {
break
}
nextCursor = pr.nextCursor
accumPage(result, &maxTime, td, ctx, pr)
}
case iface.ResponseConfig != nil && iface.ResponseConfig["has_more_field"] != nil:
// hasMore 分页(如钉钉 offset/size + hasMore用上一页的 raw body 判断
hf, _ := iface.ResponseConfig["has_more_field"].(string)
lastRaw := pr.raw
for page := 2; hasMoreCheck(lastRaw, hf); page++ {
pr2 := processPage(ctx, api, iface, method, inQuery, page, pageSize, lastSyncTime, nil)
if pr2.err != nil {
logrus.Errorf("第 %d 页处理失败: %v", page, pr2.err)
break
}
accumPage(result, &maxTime, td, ctx, pr2)
lastRaw = pr2.raw
}
default:
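// Plain pagination: processPage discards the totalPages value returned by parseRespExt (its 2nd result), so the page count is re-derived from the first page's raw body.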
totalPages := getTotalPages(pr.raw)
for page := 2; page <= totalPages; page++ {
pr := processPage(ctx, api, iface, method, inQuery, page, pageSize, lastSyncTime, nil)
if pr.err != nil {
logrus.Errorf("第 %d 页请求失败: %v", page, pr.err)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("第 %d 页失败: %v", page, pr.err))
continue
}
accumPage(result, &maxTime, td, ctx, pr)
}
}
if globalMaxTime <= 0 {
globalMaxTime = time.Now().Unix()
}
}
updateSyncTime(ctx, platform.PlatformCode, iface.Code, maxTime)
updateSyncTime(ctx, platform.PlatformCode, iface.Code, globalMaxTime)
result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds())
logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
@@ -350,7 +366,7 @@ func collectPrefetchEntities(rows []map[string]interface{}, prefetch *PrefetchCo
if prefetch.ValueField == "" {
*allEntities = append(*allEntities, item)
} else if v, ok := item[prefetch.ValueField]; ok {
if f, ok := v.(float64); ok {
if f, ok := toFloat64(v); ok {
*allEntities = append(*allEntities, int64(f))
} else {
*allEntities = append(*allEntities, v)
@@ -392,8 +408,8 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
prefetchMethod := strings.ToUpper(prefetch.Method)
prefetchPageSize := 100
if prefetchIface != nil && prefetchIface.RequestConfig != nil {
if ps, ok := prefetchIface.RequestConfig["pageSize"].(float64); ok {
prefetchPageSize = int(ps)
if ps, ok := toInt(prefetchIface.RequestConfig["pageSize"]); ok {
prefetchPageSize = ps
}
}
@@ -409,6 +425,9 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
var prefetchRespCfg map[string]interface{}
if prefetchIface != nil {
prefetchRespCfg = prefetchIface.ResponseConfig
logrus.Debugf("预取接口配置: code=%s, success_field=%v, success_value=%v", prefetchIface.Code, prefetchRespCfg["success_field"], prefetchRespCfg["success_value"])
} else {
logrus.Warnf("未找到预取接口配置: URL=%s将使用默认配置", prefetch.URL)
}
allEntities := make([]interface{}, 0)
@@ -422,8 +441,8 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
if prefetchIface != nil && prefetchRecursiveCfg != nil {
// ----- 递归遍历预取(如钉钉部门树)-----
maxDepth := 20
if md, ok := prefetchIface.RequestConfig["max_recursive_depth"].(float64); ok {
maxDepth = int(md)
if md, ok := toInt(prefetchIface.RequestConfig["max_recursive_depth"]); ok {
maxDepth = md
}
processedKeys := make(map[string]bool)
type rItem struct {
@@ -449,7 +468,7 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
if item.keyVal != nil {
extra[prefetchRecursiveCfg.TargetParam] = item.keyVal
}
body := buildReqBody(prefetchReqIface, 1, prefetchPageSize, 0, extra)
body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, 0, extra)
r2, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
if err != nil {
logrus.Errorf("预取递归 [depth=%d] 请求失败: %v", item.depth, err)
@@ -465,7 +484,7 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
if prefetch.ValueField == "" {
allEntities = append(allEntities, row)
} else if v, ok := row[prefetch.ValueField]; ok {
if f, ok := v.(float64); ok {
if f, ok := toFloat64(v); ok {
allEntities = append(allEntities, int64(f))
} else {
allEntities = append(allEntities, v)
@@ -481,9 +500,15 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
// ----- 常规分页预取 -----
firstExtra := make(map[string]interface{})
if prefetchIsCursor {
firstExtra[prefetchPageParam] = ""
// 支持 initial_cursor 配置,如果没有则使用空字符串
if icv, ok := prefetchReqIface.RequestConfig["initial_cursor"]; ok {
firstExtra[prefetchPageParam] = icv
} else {
firstExtra[prefetchPageParam] = ""
}
}
body := buildReqBody(prefetchReqIface, 1, prefetchPageSize, lastSyncTime, firstExtra)
body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, lastSyncTime, firstExtra)
logrus.Debugf("预取请求 URL: %s, Method: %s, Body: %+v", prefetch.URL, prefetchMethod, body)
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
if err != nil {
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("预取第一页请求失败: %v", err))
@@ -499,7 +524,7 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
if prefetchIsCursor {
for nextCursor != "" && nextCursor != "nomore" {
body := buildReqBody(prefetchReqIface, 1, prefetchPageSize, lastSyncTime, map[string]interface{}{
body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, lastSyncTime, map[string]interface{}{
prefetchPageParam: nextCursor,
})
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
@@ -521,7 +546,7 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
}
} else {
for page := 2; page <= prefetchTotalPages; page++ {
body := buildReqBody(prefetchReqIface, page, prefetchPageSize, lastSyncTime, nil)
body := buildReqBody(ctx, prefetchReqIface, page, prefetchPageSize, lastSyncTime, nil)
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
if err != nil {
logrus.Errorf("预取第 %d 页请求失败: %v", page, err)
@@ -558,10 +583,10 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
// 并发处理每个实体的数据
result := &SyncResult{TableName: td.TableName}
pageSize := GetSyncPageSize(ctx)
if ps, ok := iface.RequestConfig["page_size"].(float64); ok {
pageSize = int(ps)
} else if ps, ok := iface.RequestConfig["pageSize"].(float64); ok {
pageSize = int(ps)
if ps, ok := toInt(iface.RequestConfig["page_size"]); ok {
pageSize = ps
} else if ps, ok := toInt(iface.RequestConfig["pageSize"]); ok {
pageSize = ps
}
dataMethod := string(iface.Method)
@@ -598,7 +623,7 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
} else {
firstExtra[cp] = ""
}
body := buildReqBody(iface, 1, pageSize, lastSyncTime, firstExtra)
body := buildReqBody(ctx, iface, 1, pageSize, lastSyncTime, firstExtra)
resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
if err != nil {
logrus.Errorf(" 实体 %v 首次请求失败: %v", val, err)
@@ -623,7 +648,7 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
}
nextCursor := nc
for nextCursor != "" && nextCursor != "nomore" {
body := buildReqBody(iface, 1, pageSize, lastSyncTime, map[string]interface{}{
body := buildReqBody(ctx, iface, 1, pageSize, lastSyncTime, map[string]interface{}{
cp: nextCursor,
prefetch.TargetParam: val,
})
@@ -660,7 +685,7 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
page := 1
totalPages := 1
for page <= totalPages {
body := buildReqBody(iface, page, pageSize, lastSyncTime, map[string]interface{}{
body := buildReqBody(ctx, iface, page, pageSize, lastSyncTime, map[string]interface{}{
prefetch.TargetParam: val,
})
resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
@@ -721,8 +746,8 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
// syncRecursive 递归遍历同步(如钉钉部门树:先查根级 → 对每个子部门递归查下级)
func syncRecursive(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, recursive *RecursiveConfig, start time.Time) (*SyncResult, error) {
maxDepth := 20
if md, ok := iface.RequestConfig["max_recursive_depth"].(float64); ok {
maxDepth = int(md)
if md, ok := toInt(iface.RequestConfig["max_recursive_depth"]); ok {
maxDepth = md
}
inQuery := paramsInQuery(iface)
@@ -760,7 +785,7 @@ func syncRecursive(ctx context.Context, api *ApiClient, platform *PlatformConfig
extraParams[recursive.TargetParam] = item.keyVal
}
body := buildReqBody(iface, 1, 100, 0, extraParams)
body := buildReqBody(ctx, iface, 1, 100, 0, extraParams)
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
logrus.Errorf("递归 [depth=%d] 请求失败: %v", item.depth, err)
@@ -819,6 +844,12 @@ func toFloat64(v interface{}) (float64, bool) {
return float64(val), true
case int64:
return float64(val), true
case json.Number:
f, err := val.Float64()
if err != nil {
return 0, false
}
return f, true
case string:
// 支持字符串类型的成功值(如钉钉智能薪酬返回 code: "200"
if f, err := strconv.ParseFloat(val, 64); err == nil {
@@ -830,6 +861,32 @@ func toFloat64(v interface{}) (float64, bool) {
}
}
// toInt converts a loosely typed configuration/response value to int.
//
// Supported inputs are float64, int, int64, json.Number and string. Per the
// original note, JSONB fields scanned through the pgx driver may arrive as
// json.Number rather than float64, which is why a plain float64 type
// assertion is not enough.
//
// Fractional values are truncated toward zero. A json.Number or string that
// spells an integral value in float notation ("20.0", "2e1") is accepted via
// a float fallback. NaN, ±Inf and values outside the int64 range are rejected.
//
// It returns (0, false) for nil, unsupported types and unparsable input.
func toInt(v interface{}) (int, bool) {
	// fromFloat truncates f to int; the positive range test also rejects NaN,
	// because every comparison with NaN is false.
	fromFloat := func(f float64) (int, bool) {
		const (
			lowerBound = -(1 << 63) // smallest int64, inclusive
			upperBound = 1 << 63    // one past the largest int64, exclusive
		)
		if f >= lowerBound && f < upperBound {
			return int(f), true
		}
		return 0, false
	}

	switch val := v.(type) {
	case float64:
		return fromFloat(val)
	case int:
		return val, true
	case int64:
		return int(val), true
	case json.Number:
		if i, err := val.Int64(); err == nil {
			return int(i), true
		}
		// Int64 rejects "20.0" / "2e1"; fall back to the float form.
		if f, err := val.Float64(); err == nil {
			return fromFloat(f)
		}
		return 0, false
	case string:
		if i, err := strconv.Atoi(val); err == nil {
			return i, true
		}
		// Same fallback for numeric strings written in float notation.
		if f, err := strconv.ParseFloat(val, 64); err == nil {
			return fromFloat(f)
		}
		return 0, false
	default:
		return 0, false
	}
}
// buildPrefetchParams 构建预取接口的请求参数
func buildPrefetchParams(iface *entity.ApiInterface) map[string]interface{} {
params := make(map[string]interface{})
@@ -960,8 +1017,39 @@ func extractValues(raw []byte, path, valueField string) ([]interface{}, error) {
return nil, fmt.Errorf("路径 %s 不完整", path)
}
// normalizeJSONNumbers returns a copy of v in which every json.Number,
// at any depth inside map[string]interface{} and []interface{} containers,
// is replaced by a native Go value: int64 when the number parses as an
// integer, otherwise float64, otherwise its raw string form.
//
// According to the original author's note, numeric JSONB fields scanned via
// the pgx driver may arrive as json.Number, and serialising them as-is broke
// the signature check of the "param" JSON on platforms such as Kuaishou.
// NOTE(review): encoding/json itself writes json.Number unquoted — confirm
// which encoder/value shape actually produced the quoted output.
//
// The input is never mutated; maps and slices are rebuilt. Nil maps and nil
// slices are returned unchanged so they still marshal as null rather than
// turning into {} or [], which would alter the serialised payload. Values of
// any other type (including other map/slice types) are returned as-is.
func normalizeJSONNumbers(v interface{}) interface{} {
	switch val := v.(type) {
	case json.Number:
		if i, err := val.Int64(); err == nil {
			return i
		}
		if f, err := val.Float64(); err == nil {
			return f
		}
		return val.String()
	case map[string]interface{}:
		if val == nil {
			// Preserve null-ness: a nil map must not become an empty object.
			return v
		}
		normalized := make(map[string]interface{}, len(val))
		for key, elem := range val {
			normalized[key] = normalizeJSONNumbers(elem)
		}
		return normalized
	case []interface{}:
		if val == nil {
			// Preserve null-ness: a nil slice must not become an empty array.
			return v
		}
		normalized := make([]interface{}, len(val))
		for idx, elem := range val {
			normalized[idx] = normalizeJSONNumbers(elem)
		}
		return normalized
	default:
		return v
	}
}
// buildReqBody 构建请求参数
func buildReqBody(iface *entity.ApiInterface, page, pageSize int, lastSyncTime int64, extraParams map[string]interface{}) map[string]interface{} {
func buildReqBody(ctx context.Context, iface *entity.ApiInterface, page, pageSize int, lastSyncTime int64, extraParams map[string]interface{}) map[string]interface{} {
body := make(map[string]interface{})
if iface.RequestConfig != nil {
for k, v := range iface.RequestConfig {
@@ -1017,15 +1105,23 @@ func buildReqBody(iface *entity.ApiInterface, page, pageSize int, lastSyncTime i
timeMs := lastSyncTime
if timeMs <= 0 {
// 全量:优先使用配置的 full_sync_start_time否则默认90天前
if fst, ok := iface.RequestConfig["full_sync_start_time"].(float64); ok && fst > 0 {
if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 {
timeMs = int64(fst)
} else {
timeMs = time.Now().Add(-90 * 24 * time.Hour).UnixMilli()
timeMs = time.Now().Add(-time.Duration(GetDefaultLookbackDays(ctx)) * 24 * time.Hour).UnixMilli()
}
}
body["queryType"] = 2
// 仅在配置未指定 queryType 时设默认值,尊重配置
if _, exists := body["queryType"]; !exists {
body["queryType"] = 2
}
body["beginTime"] = timeMs
body["endTime"] = time.Now().UnixMilli()
endTime := time.Now().UnixMilli()
maxRangeMs := int64(3 * 24 * 3600 * 1000)
if endTime-timeMs > maxRangeMs {
endTime = timeMs + maxRangeMs
}
body["endTime"] = endTime
} else if lastSyncTime > 0 {
// 腾讯 filtering 模式(仅增量时)
timeFilter := map[string]interface{}{
@@ -1038,7 +1134,7 @@ func buildReqBody(iface *entity.ApiInterface, page, pageSize int, lastSyncTime i
} else {
body["filtering"] = []interface{}{timeFilter}
}
} else if fst, ok := iface.RequestConfig["full_sync_start_time"].(float64); ok && fst > 0 {
} else if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 {
// 全量 filtering 模式:指定了 full_sync_start_time从该时间戳开始拉取
timeFilter := map[string]interface{}{
"field": tf,
@@ -1076,7 +1172,11 @@ func buildReqBody(iface *entity.ApiInterface, page, pageSize int, lastSyncTime i
delete(body, k)
}
}
b, err := json.Marshal(wrapperObj)
// 规范化 json.Number → Go 原生数值类型,避免 json.Marshal 将其序列化为字符串
// pgx 驱动扫描 JSONB 数值时可能返回 json.Number其底层是 string 类型)
normalized := normalizeJSONNumbers(wrapperObj)
b, err := json.Marshal(normalized)
logrus.Infof("body_wrapper param JSON (normalized): %s", string(b))
if err != nil {
logrus.Errorf("JSON序列化 wrapper 失败: %v", err)
} else {
@@ -1127,6 +1227,17 @@ func parseRespExt(raw []byte, rc map[string]interface{}) ([]map[string]interface
actual, _ := toFloat64(v)
if actual != successVal {
msg, _ := respMap[msgField].(string)
// 如果配置的消息字段为空,尝试通用的错误信息字段
if msg == "" {
for _, altField := range []string{"error_msg", "sub_msg", "msg"} {
if altMsg, _ := respMap[altField].(string); altMsg != "" {
msg = altMsg
break
}
}
}
respJSON, _ := json.Marshal(respMap)
logrus.Errorf("API响应校验失败: success_field=%s, expected=%v, actual=%v, message=%s, 完整响应: %s", successField, successVal, actual, msg, string(respJSON))
return nil, 0, 0, "", fmt.Errorf("API错误: %s=%v, %s=%s", successField, v, msgField, msg)
}
}
@@ -1202,7 +1313,7 @@ func parseRespExt(raw []byte, rc map[string]interface{}) ([]map[string]interface
j, _ := json.Marshal(m)
flat["raw_data"] = string(j)
for _, tf := range []string{"last_modified_time", "created_time", "update_time", "createTime", "updateTime", "lastModifiedTime"} {
if t, ok := flat[tf].(float64); ok && int64(t) > maxTime {
if t, ok := toFloat64(flat[tf]); ok && int64(t) > maxTime {
maxTime = int64(t)
}
}
@@ -1217,7 +1328,7 @@ func parseRespExt(raw []byte, rc map[string]interface{}) ([]map[string]interface
if i == len(cp)-1 {
if s, ok := cc[p].(string); ok {
nextCursor = s
} else if f, ok := cc[p].(float64); ok {
} else if f, ok := toFloat64(cc[p]); ok {
// 数字游标(如钉钉 next_cursor=10
nextCursor = fmt.Sprintf("%.0f", f)
}
@@ -1241,8 +1352,8 @@ func parseRespExt(raw []byte, rc map[string]interface{}) ([]map[string]interface
}
}
if pi, ok := dataContainer["page_info"].(map[string]interface{}); ok {
if tp, ok := pi["total_page"].(float64); ok {
totalPages = int(tp)
if tp, ok := toInt(pi["total_page"]); ok {
totalPages = tp
}
}
return rows, totalPages, maxTime, nextCursor, nil