Files
data-engine/service/sync/data_writer.go
2026-06-11 13:06:54 +08:00

67 lines
1.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"context"
"time"
"gitea.redpowerfuture.com/red-future/common/db/gfdb"
"github.com/sirupsen/logrus"
)
// InsertRows writes rows into tableName in batches of 100 and returns the
// number of rows that were saved successfully.
//
// Every row has its "updated_at" field overwritten with the current time.
// "created_at" is deliberately not set here, so the database keeps the value
// from the first insert and an upsert does not overwrite it. The caller's maps
// are modified in place, and nil entries in rows are replaced with a map that
// holds only "updated_at".
//
// When conflictKeys is non-empty, the keys are handed to OnConflict before
// Save (presumably turning the write into an upsert on those columns; see
// gfdb for the exact semantics). If saving a batch fails, the error is logged
// and every row of that batch is retried on its own, so that a single bad row
// does not discard the rest of the batch. Rows that still fail are logged and
// left out of the returned count.
//
// Write failures are never returned as an error. The only non-nil error is
// ctx.Err(), returned together with the count reached so far once ctx has been
// cancelled or has passed its deadline; no further writes are attempted after
// that point.
func InsertRows(ctx context.Context, tableName string, conflictKeys []string, rows []map[string]interface{}) (int, error) {
	if len(rows) == 0 {
		return 0, nil
	}

	// Use one timestamp for the whole call so all rows agree on updated_at.
	now := time.Now()
	for i := range rows {
		if rows[i] == nil {
			rows[i] = make(map[string]interface{}, 1)
		}
		rows[i]["updated_at"] = now
	}

	// OnConflict is called with variadic interface{} values, so the string
	// keys are converted once here instead of once per batch and per row.
	var keys []interface{}
	if len(conflictKeys) > 0 {
		keys = make([]interface{}, len(conflictKeys))
		for i, k := range conflictKeys {
			keys[i] = k
		}
	}

	// save writes either a whole batch or a single row; both paths share the
	// same Model/OnConflict/Save chain.
	save := func(data interface{}) error {
		m := gfdb.DB(ctx).Model(ctx, tableName).Data(data)
		if len(keys) > 0 {
			m = m.OnConflict(keys...)
		}
		_, err := m.Save()
		return err
	}

	const batchSize = 100
	total := 0
	for start := 0; start < len(rows); start += batchSize {
		// Stop early on cancellation instead of issuing writes that cannot
		// succeed and logging one error per remaining batch and row.
		if err := ctx.Err(); err != nil {
			return total, err
		}

		end := start + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		batch := rows[start:end]

		err := save(batch)
		if err == nil {
			total += len(batch)
			continue
		}
		logrus.Errorf("批量写入 %s 失败: %v", tableName, err)

		// Fall back to row-by-row writes so the good rows of a failed batch
		// are still persisted.
		for _, row := range batch {
			if err := ctx.Err(); err != nil {
				return total, err
			}
			if err := save(row); err != nil {
				logrus.Errorf("逐条写入失败: %v", err)
				continue
			}
			total++
		}
	}
	return total, nil
}