package sync import ( "context" "encoding/json" "fmt" "strconv" "strings" "sync" "time" consts "dataengine/consts/public" dao "dataengine/dao/copydata" taskDto "dataengine/model/dto/copydata" entity "dataengine/model/entity/dict" "gitea.redpowerfuture.com/red-future/common/db/gfdb" "github.com/sirupsen/logrus" ) // syncRunningMap 防止同一个接口被并发执行同步 var syncRunningMap sync.Map // SyncResult 同步结果 type SyncResult struct { TableName string TotalPages int TotalRows int InsertedRows int Duration string } // PrefetchConfig 预取配置 type PrefetchConfig struct { URL string `json:"url"` Method string `json:"method"` ResponsePath string `json:"response_path"` TargetParam string `json:"target_param"` ValueField string `json:"value_field"` } // RecursiveConfig 递归遍历配置(如钉钉部门树) type RecursiveConfig struct { KeyField string `json:"key_field"` TargetParam string `json:"target_param"` } // SyncByConfig 执行同步 func SyncByConfig(ctx context.Context, platformCode, interfaceCode string, isFullSync bool) (*SyncResult, error) { // 创建超时 context 防止单次同步卡死 timeoutMin := GetSyncTimeout(ctx) timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutMin)*time.Minute) defer cancel() ctx = timeoutCtx // 内存锁:防止同一个接口被并发执行(两个调度周期重叠时跳过) lockKey := platformCode + "/" + interfaceCode if _, loaded := syncRunningMap.LoadOrStore(lockKey, true); loaded { logrus.Warnf("接口 [%s] 正在同步中,跳过重复请求", lockKey) return nil, fmt.Errorf("接口 [%s] 正在同步中,跳过", lockKey) } defer syncRunningMap.Delete(lockKey) start := time.Now() pm := &PlatformManager{} platform, ifaces, err := pm.GetPlatformWithInterfaces(ctx, platformCode) if err != nil { return nil, fmt.Errorf("读取平台配置失败: %w", err) } var iface *entity.ApiInterface for i := range ifaces { if ifaces[i].Code == interfaceCode { iface = &ifaces[i] break } } if iface == nil { return nil, fmt.Errorf("未找到接口 [%s]", interfaceCode) } if iface.TableDefinition == nil || len(iface.TableDefinition) == 0 { return nil, fmt.Errorf("接口 [%s] 未配置 table_definition", interfaceCode) } td, err := ParseTableDefinition(iface.TableDefinition) if err != nil { return nil, fmt.Errorf("解析表结构失败: %w", err) } if err := EnsureTable(ctx, td); err != nil { return nil, fmt.Errorf("建表失败: %w", err) } // 检查上次同步状态(在标记 running 之前检查) prevStatus := getSyncStatus(ctx, platformCode, interfaceCode) lastSyncTime := int64(0) if !isFullSync { lastSyncTime = getLastSyncTime(ctx, platformCode, interfaceCode) } if prevStatus == "running" { logrus.Warnf("检测到上次同步异常中断 [%s/%s],将重新全量同步", platformCode, interfaceCode) lastSyncTime = 0 } // 标记同步开始(保留 last_sync_time 不变,状态设为 running) markSyncRunning(ctx, platformCode, interfaceCode, lastSyncTime) api := NewApiClient(platform) defer api.Close() prefetch := parsePrefetchConfig(iface.RequestConfig) if prefetch != nil { return syncWithPrefetch(ctx, api, platform, iface, ifaces, td, prefetch, isFullSync, lastSyncTime, start) } recursive := parseRecursiveConfig(iface.RequestConfig) if recursive != nil { return syncRecursive(ctx, api, platform, iface, td, recursive, start) } return syncSingleAPI(ctx, api, platform, iface, td, isFullSync, lastSyncTime, start) } // paramsInQuery 判断参数是否应放在 URL 查询字符串中 func paramsInQuery(iface *entity.ApiInterface) bool { if iface.Method == "GET" { return true } if iface.RequestConfig != nil { if loc, _ := iface.RequestConfig["parameters_location"].(string); loc == "query" { return true } } return false } // pageResult 单页处理结果 type pageResult struct { raw []byte // 原始响应体(hasMore 分页需用于判断是否还有下一页) rows []map[string]interface{} maxTime int64 nextCursor string inserted int err error } // processPage 处理单页数据:请求 → 解析 → 注入 func processPage(ctx context.Context, api *ApiClient, iface *entity.ApiInterface, method string, inQuery bool, page, pageSize int, lastSyncTime int64, extraParams map[string]interface{}) pageResult { body := buildReqBody(ctx, iface, page, pageSize, lastSyncTime, extraParams) resp, err := api.Request(ctx, method, iface.Url, body, inQuery) if err != nil { return pageResult{err: fmt.Errorf("请求失败: %w", err)} } rows, _, maxTime, nextCursor, err := parseRespExt(resp.Body, iface.ResponseConfig) if err != nil { return pageResult{err: fmt.Errorf("解析失败: %w", err)} } injectRowFields(rows, body, iface.RequestConfig) return pageResult{ raw: resp.Body, rows: rows, maxTime: maxTime, nextCursor: nextCursor, } } // accumPage 将单页结果累加到 SyncResult func accumPage(result *SyncResult, maxTime *int64, td *TableDefinition, ctx context.Context, pr pageResult) { if len(pr.rows) == 0 { return } inserted, _ := savePage(ctx, td, pr.rows) result.InsertedRows += inserted result.TotalRows += len(pr.rows) if pr.maxTime > *maxTime { *maxTime = pr.maxTime } result.TotalPages++ time.Sleep(100 * time.Millisecond) } // syncSingleAPI 单接口分页同步 func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) { pageSize := GetSyncPageSize(ctx) if ps, ok := toInt(iface.RequestConfig["page_size"]); ok { pageSize = ps } else if ps, ok := toInt(iface.RequestConfig["pageSize"]); ok { pageSize = ps } taskType := "incremental" if isFullSync || lastSyncTime <= 0 { taskType = "full" } inQuery := paramsInQuery(iface) method := string(iface.Method) // 游标参数名 cursorParam := "cursor" if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" { cursorParam = p } cursorMode := isCursorPagination(iface) // 检测 range 模式(快手时间分片),按7天分片循环拉取 timeMode, _ := iface.RequestConfig["time_field_mode"].(string) isRangeMode := timeMode == "range" // 计算时间分片(range 模式 + 全量同步) var timeChunks [][2]int64 if isRangeMode && lastSyncTime <= 0 { chunkMs := int64(3 * 24 * 3600 * 1000) // 3天 startMs := time.Now().Add(-time.Duration(GetDefaultLookbackDays(ctx)) * 24 * time.Hour).UnixMilli() if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 { startMs = int64(fst) } endMs := time.Now().UnixMilli() for t := startMs; t < endMs; t += chunkMs { chunkEnd := t + chunkMs if chunkEnd > endMs { chunkEnd = endMs } timeChunks = append(timeChunks, [2]int64{t, chunkEnd}) } logrus.Infof("时间分片模式: %d 个分片(每片7天)", len(timeChunks)) } else { timeChunks = [][2]int64{{0, 0}} // 单次,时间由 buildReqBody 决定 } result := &SyncResult{TableName: td.TableName} globalMaxTime := int64(0) for _, chunk := range timeChunks { if isRangeMode && chunk[0] > 0 { lastSyncTime = chunk[0] } // 构建第一页的额外参数 firstExtra := map[string]interface{}{} if cursorMode { if icv, ok := iface.RequestConfig["initial_cursor"]; ok { firstExtra[cursorParam] = icv } else { firstExtra[cursorParam] = "" } } // 处理第一页 pr := processPage(ctx, api, iface, method, inQuery, 1, pageSize, lastSyncTime, firstExtra) if pr.err != nil { recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, pr.err.Error()) return nil, fmt.Errorf("获取第一页失败: %w", pr.err) } if len(pr.rows) == 0 { return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil } // result initialized above in time chunk loop maxTime := pr.maxTime accumPage(result, &maxTime, td, ctx, pr) // 分页循环:三种模式仅循环控制和参数不同,内核复用 processPage + accumPage switch { case cursorMode: nextCursor := pr.nextCursor for nextCursor != "" && nextCursor != "nomore" { pr := processPage(ctx, api, iface, method, inQuery, 1, pageSize, lastSyncTime, map[string]interface{}{ cursorParam: nextCursor, }) if pr.err != nil { logrus.Errorf("游标 %s 处理失败: %v", nextCursor, pr.err) recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("游标 %s 失败: %v", nextCursor, pr.err)) break } if len(pr.rows) == 0 { break } nextCursor = pr.nextCursor accumPage(result, &maxTime, td, ctx, pr) } case iface.ResponseConfig != nil && iface.ResponseConfig["has_more_field"] != nil: // hasMore 分页(如钉钉 offset/size + hasMore):用上一页的 raw body 判断 hf, _ := iface.ResponseConfig["has_more_field"].(string) lastRaw := pr.raw for page := 2; hasMoreCheck(lastRaw, hf); page++ { pr2 := processPage(ctx, api, iface, method, inQuery, page, pageSize, lastSyncTime, nil) if pr2.err != nil { logrus.Errorf("第 %d 页处理失败: %v", page, pr2.err) break } accumPage(result, &maxTime, td, ctx, pr2) lastRaw = pr2.raw } default: // 普通分页:第一页 parseRespExt 返回 totalPages(parseRespExt 第4个返回值被忽略,需要从别处获取) totalPages := getTotalPages(pr.raw) for page := 2; page <= totalPages; page++ { pr := processPage(ctx, api, iface, method, inQuery, page, pageSize, lastSyncTime, nil) if pr.err != nil { logrus.Errorf("第 %d 页请求失败: %v", page, pr.err) recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("第 %d 页失败: %v", page, pr.err)) continue } accumPage(result, &maxTime, td, ctx, pr) } } if globalMaxTime <= 0 { globalMaxTime = time.Now().Unix() } } updateSyncTime(ctx, platform.PlatformCode, iface.Code, globalMaxTime) result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds()) logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration) return result, nil } func isCursorPagination(iface *entity.ApiInterface) bool { if iface.RequestConfig == nil { return false } cp, _ := iface.RequestConfig["cursor_pagination"].(bool) return cp } // hasMoreCheck 从响应体中提取 has_more_field 的值 func hasMoreCheck(raw []byte, hasMorePath string) bool { var respMap map[string]interface{} if err := json.Unmarshal(raw, &respMap); err != nil { return false } parts := strings.Split(hasMorePath, ".") cc := respMap for i, p := range parts { if i == len(parts)-1 { if b, ok := cc[p].(bool); ok { return b } if s, ok := cc[p].(string); ok { return s == "true" } return false } if m, ok := cc[p].(map[string]interface{}); ok { cc = m } else { return false } } return false } // collectPrefetchEntities 从 rows 中收集实体和行数据 func collectPrefetchEntities(rows []map[string]interface{}, prefetch *PrefetchConfig, allEntities *[]interface{}, allRows *[]map[string]interface{}) { for _, item := range rows { *allRows = append(*allRows, item) if prefetch.ValueField == "" { *allEntities = append(*allEntities, item) } else if v, ok := item[prefetch.ValueField]; ok { if f, ok := toFloat64(v); ok { *allEntities = append(*allEntities, int64(f)) } else { *allEntities = append(*allEntities, v) } } } } // syncWithPrefetch 预取模式同步(先分页拉取全部实体列表,再并发处理每个实体) func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, allIfaces []entity.ApiInterface, td *TableDefinition, prefetch *PrefetchConfig, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) { logrus.Infof("预取模式: %s -> %s", prefetch.URL, iface.Url) taskType := "incremental" if isFullSync || lastSyncTime <= 0 { taskType = "full" } // ====== 1. 预取阶段:分页拉取全部实体列表 ====== prefetchIface := findInterfaceByURL(allIfaces, prefetch.URL) // 判断预取来源是否有递归配置(如钉钉部门树) var prefetchRecursiveCfg *RecursiveConfig if prefetchIface != nil { prefetchRecursiveCfg = parseRecursiveConfig(prefetchIface.RequestConfig) } // 判断预取来源是否游标分页,以及分页参数名 prefetchIsCursor := false prefetchPageParam := "page" if prefetchIface != nil && prefetchIface.RequestConfig != nil { if cp, ok := prefetchIface.RequestConfig["cursor_pagination"].(bool); ok { prefetchIsCursor = cp } if p, ok := prefetchIface.RequestConfig["page_param"].(string); ok && p != "" { prefetchPageParam = p } } prefetchMethod := strings.ToUpper(prefetch.Method) prefetchPageSize := 100 if prefetchIface != nil && prefetchIface.RequestConfig != nil { if ps, ok := toInt(prefetchIface.RequestConfig["pageSize"]); ok { prefetchPageSize = ps } } // 使用 prefetch 来源接口自己的配置判断参数位置 var prefetchInQuery bool if prefetchIface != nil { prefetchInQuery = paramsInQuery(prefetchIface) } else { prefetchInQuery = paramsInQuery(iface) } // prefetch 来源接口的 response_config(用于正确解析列表路径) var prefetchRespCfg map[string]interface{} if prefetchIface != nil { prefetchRespCfg = prefetchIface.ResponseConfig logrus.Debugf("预取接口配置: code=%s, success_field=%v, success_value=%v", prefetchIface.Code, prefetchRespCfg["success_field"], prefetchRespCfg["success_value"]) } else { logrus.Warnf("未找到预取接口配置: URL=%s,将使用默认配置", prefetch.URL) } allEntities := make([]interface{}, 0) allRows := make([]map[string]interface{}, 0) prefetchReqIface := prefetchIface if prefetchReqIface == nil { prefetchReqIface = iface } if prefetchIface != nil && prefetchRecursiveCfg != nil { // ----- 递归遍历预取(如钉钉部门树)----- maxDepth := 20 if md, ok := toInt(prefetchIface.RequestConfig["max_recursive_depth"]); ok { maxDepth = md } processedKeys := make(map[string]bool) type rItem struct { depth int keyVal interface{} } queue := []rItem{{depth: 0, keyVal: nil}} for len(queue) > 0 { item := queue[0] queue = queue[1:] if item.depth > maxDepth { continue } if item.keyVal != nil { keyStr := fmt.Sprintf("%v", item.keyVal) if processedKeys[keyStr] { continue } processedKeys[keyStr] = true } extra := make(map[string]interface{}) if item.keyVal != nil { extra[prefetchRecursiveCfg.TargetParam] = item.keyVal } body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, 0, extra) r2, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery) if err != nil { logrus.Errorf("预取递归 [depth=%d] 请求失败: %v", item.depth, err) continue } itemRows, _, _, _, pe := parseRespExt(r2.Body, prefetchRespCfg) if pe != nil { logrus.Errorf("预取递归 [depth=%d] 解析失败: %v", item.depth, pe) continue } for _, row := range itemRows { allRows = append(allRows, row) if prefetch.ValueField == "" { allEntities = append(allEntities, row) } else if v, ok := row[prefetch.ValueField]; ok { if f, ok := toFloat64(v); ok { allEntities = append(allEntities, int64(f)) } else { allEntities = append(allEntities, v) } } if v, ok := row[prefetchRecursiveCfg.KeyField]; ok { queue = append(queue, rItem{depth: item.depth + 1, keyVal: v}) } } time.Sleep(100 * time.Millisecond) } } else { // ----- 常规分页预取 ----- firstExtra := make(map[string]interface{}) if prefetchIsCursor { // 支持 initial_cursor 配置,如果没有则使用空字符串 if icv, ok := prefetchReqIface.RequestConfig["initial_cursor"]; ok { firstExtra[prefetchPageParam] = icv } else { firstExtra[prefetchPageParam] = "" } } body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, lastSyncTime, firstExtra) logrus.Debugf("预取请求 URL: %s, Method: %s, Body: %+v", prefetch.URL, prefetchMethod, body) resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery) if err != nil { recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("预取第一页请求失败: %v", err)) return nil, fmt.Errorf("预取第一页失败: %w", err) } rows, prefetchTotalPages, _, nextCursor, err := parseRespExt(resp.Body, prefetchRespCfg) if err != nil { recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("解析预取响应失败: %v", err)) return nil, fmt.Errorf("解析预取响应失败: %w", err) } collectPrefetchEntities(rows, prefetch, &allEntities, &allRows) if prefetchIsCursor { for nextCursor != "" && nextCursor != "nomore" { body := buildReqBody(ctx, prefetchReqIface, 1, prefetchPageSize, lastSyncTime, map[string]interface{}{ prefetchPageParam: nextCursor, }) resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery) if err != nil { logrus.Errorf("预取游标 %s 请求失败: %v", nextCursor, err) break } rows, _, _, nc, pe := parseRespExt(resp.Body, prefetchRespCfg) if pe != nil { logrus.Errorf("预取游标 %s 解析失败: %v", nextCursor, pe) break } if len(rows) == 0 { break } nextCursor = nc collectPrefetchEntities(rows, prefetch, &allEntities, &allRows) time.Sleep(100 * time.Millisecond) } } else { for page := 2; page <= prefetchTotalPages; page++ { body := buildReqBody(ctx, prefetchReqIface, page, prefetchPageSize, lastSyncTime, nil) resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery) if err != nil { logrus.Errorf("预取第 %d 页请求失败: %v", page, err) continue } rows, _, _, _, pe := parseRespExt(resp.Body, prefetchRespCfg) if pe != nil { logrus.Errorf("预取第 %d 页解析失败: %v", page, pe) continue } collectPrefetchEntities(rows, prefetch, &allEntities, &allRows) time.Sleep(100 * time.Millisecond) } } } if len(allEntities) == 0 { logrus.Warn("预取结果为空列表,跳过同步") return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil } logrus.Infof("预取到 %d 个实体", len(allEntities)) // 将预取的数据也存入库(如账户列表存入 tencent_account_relation) if prefetchIface != nil && prefetchIface.TableDefinition != nil { prefetchTd, err := ParseTableDefinition(prefetchIface.TableDefinition) if err == nil { if ensureErr := EnsureTable(ctx, prefetchTd); ensureErr == nil { saved, _ := savePage(ctx, prefetchTd, allRows) logrus.Infof("预取数据已存库: %s, %d 条", prefetchTd.TableName, saved) } } } // 并发处理每个实体的数据 result := &SyncResult{TableName: td.TableName} pageSize := GetSyncPageSize(ctx) if ps, ok := toInt(iface.RequestConfig["page_size"]); ok { pageSize = ps } else if ps, ok := toInt(iface.RequestConfig["pageSize"]); ok { pageSize = ps } dataMethod := string(iface.Method) inQuery := paramsInQuery(iface) concurrency := GetSyncConcurrency(ctx) var mu sync.Mutex var wg sync.WaitGroup sem := make(chan struct{}, concurrency) globalMaxTime := lastSyncTime for idx, entityVal := range allEntities { wg.Add(1) sem <- struct{}{} go func(idx int, val interface{}) { defer wg.Done() defer func() { <-sem }() logrus.Infof(" 处理实体 [%d/%d]: %v", idx+1, len(allEntities), val) entityMaxTime := int64(0) if isCursorPagination(iface) { // ----- 游标分页(如钉钉 user_list)----- cp := "cursor" if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" { cp = p } firstExtra := map[string]interface{}{ prefetch.TargetParam: val, } if icv, ok := iface.RequestConfig["initial_cursor"]; ok { firstExtra[cp] = icv } else { firstExtra[cp] = "" } body := buildReqBody(ctx, iface, 1, pageSize, lastSyncTime, firstExtra) resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery) if err != nil { logrus.Errorf(" 实体 %v 首次请求失败: %v", val, err) return } rows, _, mt, nc, pe := parseRespExt(resp.Body, iface.ResponseConfig) if pe != nil { logrus.Errorf(" 实体 %v 解析首页失败: %v", val, pe) return } for i := range rows { rows[i][prefetch.TargetParam] = val } injectRowFields(rows, body, iface.RequestConfig) inserted, _ := savePage(ctx, td, rows) mu.Lock() result.InsertedRows += inserted result.TotalRows += len(rows) mu.Unlock() if mt > entityMaxTime { entityMaxTime = mt } nextCursor := nc for nextCursor != "" && nextCursor != "nomore" { body := buildReqBody(ctx, iface, 1, pageSize, lastSyncTime, map[string]interface{}{ cp: nextCursor, prefetch.TargetParam: val, }) resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery) if err != nil { logrus.Errorf(" 实体 %v 游标 %s 失败: %v", val, nextCursor, err) break } rows, _, mt, nc, pe := parseRespExt(resp.Body, iface.ResponseConfig) if pe != nil { logrus.Errorf(" 实体 %v 游标 %s 解析失败: %v", val, nextCursor, pe) break } if len(rows) == 0 { break } nextCursor = nc for i := range rows { rows[i][prefetch.TargetParam] = val } injectRowFields(rows, body, iface.RequestConfig) inserted, _ := savePage(ctx, td, rows) mu.Lock() result.InsertedRows += inserted result.TotalRows += len(rows) mu.Unlock() if mt > entityMaxTime { entityMaxTime = mt } time.Sleep(100 * time.Millisecond) } } else { // ----- 普通分页 ----- page := 1 totalPages := 1 for page <= totalPages { body := buildReqBody(ctx, iface, page, pageSize, lastSyncTime, map[string]interface{}{ prefetch.TargetParam: val, }) resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery) if err != nil { logrus.Errorf(" 实体 %v 第 %d 页失败: %v", val, page, err) page++ time.Sleep(200 * time.Millisecond) continue } rows, tp, mt, parseErr := parseResp(resp.Body, iface.ResponseConfig) if parseErr != nil { logrus.Errorf(" 解析响应失败: %v", parseErr) page++ continue } if page == 1 { totalPages = tp } for i := range rows { rows[i][prefetch.TargetParam] = val } injectRowFields(rows, body, iface.RequestConfig) inserted, _ := savePage(ctx, td, rows) mu.Lock() result.InsertedRows += inserted result.TotalRows += len(rows) mu.Unlock() if mt > entityMaxTime { entityMaxTime = mt } page++ time.Sleep(100 * time.Millisecond) } } if entityMaxTime > 0 { mu.Lock() if entityMaxTime > globalMaxTime { globalMaxTime = entityMaxTime } mu.Unlock() } }(idx, entityVal) } wg.Wait() if globalMaxTime <= 0 { globalMaxTime = time.Now().Unix() } updateSyncTime(ctx, platform.PlatformCode, iface.Code, globalMaxTime) result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds()) logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration) return result, nil } // syncRecursive 递归遍历同步(如钉钉部门树:先查根级 → 对每个子部门递归查下级) func syncRecursive(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, recursive *RecursiveConfig, start time.Time) (*SyncResult, error) { maxDepth := 20 if md, ok := toInt(iface.RequestConfig["max_recursive_depth"]); ok { maxDepth = md } inQuery := paramsInQuery(iface) method := string(iface.Method) allRows := make([]map[string]interface{}, 0) processedKeys := make(map[string]bool) type queueItem struct { depth int keyVal interface{} // nil 表示根级 } queue := []queueItem{{depth: 0, keyVal: nil}} for len(queue) > 0 { item := queue[0] queue = queue[1:] if item.depth > maxDepth { logrus.Warnf("递归已达最大深度 %d,终止该分支", maxDepth) continue } // 防重复处理 if item.keyVal != nil { keyStr := fmt.Sprintf("%v", item.keyVal) if processedKeys[keyStr] { continue } processedKeys[keyStr] = true } extraParams := make(map[string]interface{}) if item.keyVal != nil { extraParams[recursive.TargetParam] = item.keyVal } body := buildReqBody(ctx, iface, 1, 100, 0, extraParams) resp, err := api.Request(ctx, method, iface.Url, body, inQuery) if err != nil { logrus.Errorf("递归 [depth=%d] 请求失败: %v", item.depth, err) recordFailure(ctx, platform.PlatformCode, iface.Code, "full", fmt.Sprintf("递归深度 %d 请求失败: %v", item.depth, err)) continue } rows, _, _, _, err := parseRespExt(resp.Body, iface.ResponseConfig) if err != nil { logrus.Errorf("递归 [depth=%d] 解析失败: %v", item.depth, err) continue } for _, row := range rows { allRows = append(allRows, row) if v, ok := row[recursive.KeyField]; ok { queue = append(queue, queueItem{depth: item.depth + 1, keyVal: v}) } } time.Sleep(100 * time.Millisecond) } if len(allRows) == 0 { logrus.Warn("递归结果为空,跳过入库") return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil } inserted, _ := savePage(ctx, td, allRows) updateSyncTime(ctx, platform.PlatformCode, iface.Code, time.Now().Unix()) result := &SyncResult{ TableName: td.TableName, TotalRows: len(allRows), InsertedRows: inserted, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds()), } logrus.Infof("递归同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration) return result, nil } // getTotalPages 从响应中提取总页数 func getTotalPages(raw []byte) int { rows, tp, _, _, err := parseRespExt(raw, nil) if err != nil || len(rows) == 0 { return 0 } return tp } func toFloat64(v interface{}) (float64, bool) { switch val := v.(type) { case float64: return val, true case int: return float64(val), true case int64: return float64(val), true case json.Number: f, err := val.Float64() if err != nil { return 0, false } return f, true case string: // 支持字符串类型的成功值(如钉钉智能薪酬返回 code: "200") if f, err := strconv.ParseFloat(val, 64); err == nil { return f, true } return 0, false default: return 0, false } } // toInt 从 interface{} 安全转换为 int,支持 float64/int/int64/json.Number/string 类型 // JSONB 字段通过 pgx 驱动扫描时可能返回 json.Number 而非 float64 func toInt(v interface{}) (int, bool) { switch val := v.(type) { case float64: return int(val), true case int: return val, true case int64: return int(val), true case json.Number: i, err := val.Int64() if err != nil { return 0, false } return int(i), true case string: if i, err := strconv.Atoi(val); err == nil { return i, true } return 0, false default: return 0, false } } // buildPrefetchParams 构建预取接口的请求参数 func buildPrefetchParams(iface *entity.ApiInterface) map[string]interface{} { params := make(map[string]interface{}) if iface.RequestConfig != nil { pageParam := "page" psParam := "page_size" if p, ok := iface.RequestConfig["page_param"].(string); ok { pageParam = p } if p, ok := iface.RequestConfig["page_size_param"].(string); ok { psParam = p } params[pageParam] = 1 params[psParam] = 100 for k, v := range iface.RequestConfig { if k == "headers" || k == "prefetch" || k == "page_param" || k == "page_size_param" || k == "time_field" || k == "parameters_location" || k == "filtering" || k == "group_by" || k == "date_range" || k == "body_wrapper_field" || k == "exclude_from_wrapper" || k == "cursor_pagination" || k == "time_field_mode" || k == "recursive" || k == "max_recursive_depth" || k == "initial_cursor" || k == "pagination_mode" || k == "full_sync_start_time" || k == "row_inject" { continue } if k == pageParam || k == psParam { continue } params[k] = v } } return params } // parsePrefetchConfig 解析预取配置 func parsePrefetchConfig(requestConfig map[string]interface{}) *PrefetchConfig { if requestConfig == nil { return nil } raw, ok := requestConfig["prefetch"] if !ok || raw == nil { return nil } m, ok := raw.(map[string]interface{}) if !ok { return nil } pc := &PrefetchConfig{} if u, _ := m["url"].(string); u != "" { pc.URL = u } else { return nil } if method, _ := m["method"].(string); method != "" { pc.Method = method } else { pc.Method = "GET" } pc.ResponsePath, _ = m["response_path"].(string) pc.TargetParam, _ = m["target_param"].(string) pc.ValueField, _ = m["value_field"].(string) return pc } // parseRecursiveConfig 解析递归遍历配置 func parseRecursiveConfig(requestConfig map[string]interface{}) *RecursiveConfig { if requestConfig == nil { return nil } raw, ok := requestConfig["recursive"] if !ok || raw == nil { return nil } m, ok := raw.(map[string]interface{}) if !ok { return nil } rc := &RecursiveConfig{} if kf, _ := m["key_field"].(string); kf != "" { rc.KeyField = kf } else { return nil } if tp, _ := m["target_param"].(string); tp != "" { rc.TargetParam = tp } else { return nil } return rc } // extractValues 从 JSON 响应中提取值列表 func extractValues(raw []byte, path, valueField string) ([]interface{}, error) { var resp map[string]interface{} if err := json.Unmarshal(raw, &resp); err != nil { return nil, fmt.Errorf("JSON解析失败: %w", err) } parts := strings.Split(path, ".") current := resp for i, part := range parts { if i == len(parts)-1 { list, ok := current[part].([]interface{}) if !ok { return nil, fmt.Errorf("路径 %s 不是数组", path) } var values []interface{} for _, item := range list { if valueField == "" { values = append(values, item) } else if m, ok := item.(map[string]interface{}); ok { if v, exists := m[valueField]; exists { values = append(values, v) } } } return values, nil } next, ok := current[part].(map[string]interface{}) if !ok { return nil, fmt.Errorf("路径 %s 在 %s 处中断", path, part) } current = next } return nil, fmt.Errorf("路径 %s 不完整", path) } // normalizeJSONNumbers 递归将 json.Number 转换为 Go 原生数值类型 // pgx 驱动扫描 PostgreSQL JSONB 数值字段时可能返回 json.Number(底层是 string), // 直接用 json.Marshal 会将其序列化为带引号的字符串(如 "0" 而非 0), // 导致快手等平台的 param JSON 签名校验失败 func normalizeJSONNumbers(v interface{}) interface{} { switch val := v.(type) { case json.Number: if i, err := val.Int64(); err == nil { return i } if f, err := val.Float64(); err == nil { return f } return val.String() case map[string]interface{}: result := make(map[string]interface{}, len(val)) for mk, mv := range val { result[mk] = normalizeJSONNumbers(mv) } return result case []interface{}: result := make([]interface{}, len(val)) for i, item := range val { result[i] = normalizeJSONNumbers(item) } return result default: return v } } // buildReqBody 构建请求参数 func buildReqBody(ctx context.Context, iface *entity.ApiInterface, page, pageSize int, lastSyncTime int64, extraParams map[string]interface{}) map[string]interface{} { body := make(map[string]interface{}) if iface.RequestConfig != nil { for k, v := range iface.RequestConfig { if k == "time_field" || k == "headers" || k == "prefetch" || k == "page_param" || k == "page_size_param" || k == "parameters_location" || k == "cursor_pagination" || k == "time_field_mode" || k == "body_wrapper_field" || k == "exclude_from_wrapper" || k == "top_level_params" || k == "recursive" || k == "max_recursive_depth" || k == "initial_cursor" || k == "pagination_mode" || k == "full_sync_start_time" || k == "row_inject" { continue } body[k] = v } } pageParam := "page" psParam := "page_size" if iface.RequestConfig != nil { if p, ok := iface.RequestConfig["page_param"].(string); ok { pageParam = p } if p, ok := iface.RequestConfig["page_size_param"].(string); ok { psParam = p } } // 偏移量分页(如钉钉 offset):offset = (page-1) * pageSize paginationMode := "" if iface.RequestConfig != nil { if pm, ok := iface.RequestConfig["pagination_mode"].(string); ok { paginationMode = pm } } if paginationMode == "offset" { body[pageParam] = (page - 1) * pageSize } else { body[pageParam] = page } body[psParam] = pageSize // 时间过滤处理:支持两种模式 // 1. "filtering" 模式(默认):生成 filtering=[{"field":"...","operator":"GREATER_EQUALS","values":["..."]}](腾讯) // 2. "range" 模式:生成 beginTime/endTime + queryType(快手) if iface.RequestConfig != nil { if tf, ok := iface.RequestConfig["time_field"].(string); ok && tf != "" { timeMode := "filtering" if tm, ok := iface.RequestConfig["time_field_mode"].(string); ok && tm != "" { timeMode = tm } if timeMode == "range" { // 快手模式:beginTime/endTime(毫秒时间戳) timeMs := lastSyncTime if timeMs <= 0 { // 全量:优先使用配置的 full_sync_start_time,否则默认90天前 if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 { timeMs = int64(fst) } else { timeMs = time.Now().Add(-time.Duration(GetDefaultLookbackDays(ctx)) * 24 * time.Hour).UnixMilli() } } // 仅在配置未指定 queryType 时设默认值,尊重配置 if _, exists := body["queryType"]; !exists { body["queryType"] = 2 } body["beginTime"] = timeMs endTime := time.Now().UnixMilli() maxRangeMs := int64(3 * 24 * 3600 * 1000) if endTime-timeMs > maxRangeMs { endTime = timeMs + maxRangeMs } body["endTime"] = endTime } else if lastSyncTime > 0 { // 腾讯 filtering 模式(仅增量时) timeFilter := map[string]interface{}{ "field": tf, "operator": "GREATER_EQUALS", "values": []interface{}{fmt.Sprintf("%d", lastSyncTime)}, } if existing, ok := body["filtering"].([]interface{}); ok { body["filtering"] = append(existing, timeFilter) } else { body["filtering"] = []interface{}{timeFilter} } } else if fst, ok := toFloat64(iface.RequestConfig["full_sync_start_time"]); ok && fst > 0 { // 全量 filtering 模式:指定了 full_sync_start_time,从该时间戳开始拉取 timeFilter := map[string]interface{}{ "field": tf, "operator": "GREATER_EQUALS", "values": []interface{}{fmt.Sprintf("%d", int64(fst))}, } if existing, ok := body["filtering"].([]interface{}); ok { body["filtering"] = append(existing, timeFilter) } else { body["filtering"] = []interface{}{timeFilter} } } } } for k, v := range extraParams { body[k] = v } // body_wrapper_field 支持:将业务参数包装到指定字段(如快手 API 的 param JSON) if iface.RequestConfig != nil { if wf, ok := iface.RequestConfig["body_wrapper_field"].(string); ok && wf != "" { excludeSet := map[string]bool{"method": true, "version": true, "signMethod": true} if excl, ok := iface.RequestConfig["exclude_from_wrapper"].([]interface{}); ok { for _, v := range excl { if s, ok := v.(string); ok { excludeSet[s] = true } } } wrapperObj := make(map[string]interface{}) for k, v := range body { if !excludeSet[k] && k != wf { wrapperObj[k] = v delete(body, k) } } // 规范化 json.Number → Go 原生数值类型,避免 json.Marshal 将其序列化为字符串 // (pgx 驱动扫描 JSONB 数值时可能返回 json.Number,其底层是 string 类型) normalized := normalizeJSONNumbers(wrapperObj) b, err := json.Marshal(normalized) logrus.Infof("body_wrapper param JSON (normalized): %s", string(b)) if err != nil { logrus.Errorf("JSON序列化 wrapper 失败: %v", err) } else { body[wf] = string(b) } } } return body } // parseRespExt 解析响应,支持自定义成功判断和数据路径 func parseRespExt(raw []byte, rc map[string]interface{}) ([]map[string]interface{}, int, int64, string, error) { var respMap map[string]interface{} if err := json.Unmarshal(raw, &respMap); err != nil { return nil, 0, 0, "", fmt.Errorf("JSON解析失败: %w", err) } successField, successVal := "code", float64(0) msgField, listPath, cursorPath := "message", "data", "" hasMorePath := "" singleRecord := false if rc != nil { if sf, _ := rc["success_field"].(string); sf != "" { successField = sf } if sv, ok := rc["success_value"]; ok { if f, ok := toFloat64(sv); ok { successVal = f } } if mf, _ := rc["message_field"].(string); mf != "" { msgField = mf } if lp, _ := rc["list_path"].(string); lp != "" { listPath = lp } if cf, _ := rc["cursor_field"].(string); cf != "" { cursorPath = cf } if sr, _ := rc["single_record"].(bool); sr { singleRecord = true } if hm, _ := rc["has_more_field"].(string); hm != "" { hasMorePath = hm } } if v, ok := respMap[successField]; ok { actual, _ := toFloat64(v) if actual != successVal { msg, _ := respMap[msgField].(string) // 如果配置的消息字段为空,尝试通用的错误信息字段 if msg == "" { for _, altField := range []string{"error_msg", "sub_msg", "msg"} { if altMsg, _ := respMap[altField].(string); altMsg != "" { msg = altMsg break } } } respJSON, _ := json.Marshal(respMap) logrus.Errorf("API响应校验失败: success_field=%s, expected=%v, actual=%v, message=%s, 完整响应: %s", successField, successVal, actual, msg, string(respJSON)) return nil, 0, 0, "", fmt.Errorf("API错误: %s=%v, %s=%s", successField, v, msgField, msg) } } // 解析 list_path,支持最后一段是数组的情况(如 data.orderList) var listData []interface{} var dataContainer map[string]interface{} if listPath != "" { parts := strings.Split(listPath, ".") cur := respMap for i, p := range parts { if i == len(parts)-1 { // 最后一段:可能直接是数组,也可能是包含 list/orderList 的 map listData, _ = cur[p].([]interface{}) if listData == nil { if m, ok := cur[p].(map[string]interface{}); ok { dataContainer = m if l, ok := m["list"]; ok { listData, _ = l.([]interface{}) } if listData == nil { if ol, ok := m["orderList"]; ok { listData, _ = ol.([]interface{}) } } } } else { dataContainer = cur } } else { next, ok := cur[p].(map[string]interface{}) if !ok { return nil, 0, 0, "", fmt.Errorf("路径 %s 在 %s 处中断", listPath, p) } cur = next } } } if listData == nil { if singleRecord && listPath != "" { // 详情接口:list_path 指向单个对象,包装为单元素数组 parts := strings.Split(listPath, ".") cur := respMap ok := true for _, p := range parts { if m, exists := cur[p].(map[string]interface{}); exists { cur = m } else { ok = false break } } if ok { listData = []interface{}{cur} dataContainer = cur } } } if listData == nil { // 回退到根级字段 listData, _ = respMap["list"].([]interface{}) if listData == nil { listData, _ = respMap["orderList"].([]interface{}) } dataContainer = respMap } var rows []map[string]interface{} totalPages, maxTime := 1, int64(0) for _, item := range listData { if m, ok := item.(map[string]interface{}); ok { // 展平嵌套 map:将子 map 的字段合并到顶层(如 orderBaseInfo.oid → oid) flat := flattenRow(m) j, _ := json.Marshal(m) flat["raw_data"] = string(j) for _, tf := range []string{"last_modified_time", "created_time", "update_time", "createTime", "updateTime", "lastModifiedTime"} { if t, ok := toFloat64(flat[tf]); ok && int64(t) > maxTime { maxTime = int64(t) } } rows = append(rows, flat) } } nextCursor := "" if cursorPath != "" { cp := strings.Split(cursorPath, ".") cc := respMap for i, p := range cp { if i == len(cp)-1 { if s, ok := cc[p].(string); ok { nextCursor = s } else if f, ok := toFloat64(cc[p]); ok { // 数字游标(如钉钉 next_cursor=10) nextCursor = fmt.Sprintf("%.0f", f) } } else if m, ok := cc[p].(map[string]interface{}); ok { cc = m } } } // has_more 字段支持:false 时标记游标结束 if hasMorePath != "" { parts := strings.Split(hasMorePath, ".") cc := respMap for i, p := range parts { if i == len(parts)-1 { if b, ok := cc[p].(bool); ok && !b { nextCursor = "nomore" } } else if m, ok := cc[p].(map[string]interface{}); ok { cc = m } } } if pi, ok := dataContainer["page_info"].(map[string]interface{}); ok { if tp, ok := toInt(pi["total_page"]); ok { totalPages = tp } } return rows, totalPages, maxTime, nextCursor, nil } // flattenRow 展平嵌套 map:将子 map 的字段递归合并到顶层 // 数组类型的字段保持原样不展平 func flattenRow(m map[string]interface{}) map[string]interface{} { result := make(map[string]interface{}, len(m)) for k, v := range m { if sub, ok := v.(map[string]interface{}); ok { // 子 map 递归展平后合并到顶层 for sk, sv := range flattenRow(sub) { result[sk] = sv } } else { result[k] = v } } return result } // parseResp 兼容旧版,保持4个返回值 func parseResp(raw []byte, responseConfig map[string]interface{}) ([]map[string]interface{}, int, int64, error) { rows, tp, mt, _, err := parseRespExt(raw, responseConfig) return rows, tp, mt, err } func savePage(ctx context.Context, td *TableDefinition, rows []map[string]interface{}) (int, error) { if len(rows) == 0 { return 0, nil } colSet := make(map[string]bool) for _, c := range td.Columns { colSet[c.Name] = true } var clean []map[string]interface{} for _, row := range rows { c := make(map[string]interface{}) for k, v := range row { if colSet[k] { c[k] = v } } if r, ok := row["raw_data"]; ok { c["raw_data"] = r } clean = append(clean, c) } return InsertRows(ctx, td.TableName, td.ConflictKeys, clean) } func getLastSyncTime(ctx context.Context, platformCode, interfaceCode string) int64 { var t int64 gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable). Fields("last_sync_time"). Where("platform_code", platformCode). Where("interface_code", interfaceCode). Scan(&t) return t } func getSyncStatus(ctx context.Context, platformCode, interfaceCode string) string { var s string gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable). Fields("sync_status"). Where("platform_code", platformCode). Where("interface_code", interfaceCode). Scan(&s) return s } func markSyncRunning(ctx context.Context, platformCode, interfaceCode string, lastSyncTime int64) { gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable). Data(map[string]interface{}{ "platform_code": platformCode, "interface_code": interfaceCode, "last_sync_time": lastSyncTime, "sync_status": "running", }). OnConflict("platform_code", "interface_code"). Save() } func updateSyncTime(ctx context.Context, platformCode, interfaceCode string, t int64) { gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable). Data(map[string]interface{}{ "platform_code": platformCode, "interface_code": interfaceCode, "last_sync_time": t, "last_sync_at": time.Now(), "sync_status": "success", }). OnConflict("platform_code", "interface_code"). Save() } func recordFailure(ctx context.Context, platformCode, interfaceCode, taskType, errMsg string) { dao.SyncTaskLog.Create(ctx, &taskDto.CreateSyncTaskLogReq{ TaskID: fmt.Sprintf("%s_%s_%d", platformCode, interfaceCode, time.Now().UnixNano()), TaskType: taskType, PlatformCode: platformCode, InterfaceCode: interfaceCode, Status: "failed", MaxRetry: 3, StartTime: time.Now(), RequestParams: map[string]interface{}{ "error": errMsg, }, }) } // findInterfaceByURL 在所有接口中查找匹配 URL 的接口 func findInterfaceByURL(ifaces []entity.ApiInterface, url string) *entity.ApiInterface { for i := range ifaces { if ifaces[i].Url == url { return &ifaces[i] } } return nil } // injectRowFields 将请求参数中 row_inject 指定的字段注入到响应行中 // 用于需要将请求参数(如 statisticsMonth)持久化到表中,但响应不含该字段的场景 func injectRowFields(rows []map[string]interface{}, body map[string]interface{}, requestConfig map[string]interface{}) { if requestConfig == nil || body == nil { return } rawInject, ok := requestConfig["row_inject"] if !ok { return } injectList, ok := rawInject.([]interface{}) if !ok { return } for _, item := range injectList { fieldName, ok := item.(string) if !ok { continue } val, exists := body[fieldName] if !exists { continue } for i := range rows { rows[i][fieldName] = val } } }