Files
data-engine/service/sync/table_manager.go
2026-06-11 13:06:54 +08:00

115 lines
3.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"context"
"fmt"
"strings"
"gitea.redpowerfuture.com/red-future/common/db/gfdb"
"github.com/sirupsen/logrus"
)
// ColumnDef describes a single user-defined column of a synced table.
type ColumnDef struct {
	// Name is the column name; it is written into the generated DDL as-is.
	Name string `json:"name"`
	// Type is the SQL type text that follows the column name in the DDL.
	Type string `json:"type"`
	// Comment is optional. When non-empty, a COMMENT ON COLUMN statement is
	// generated for the column.
	Comment string `json:"comment,omitempty"`
}
// TableDefinition is the parsed form of a table_definition JSON object: the
// target table name, its user-defined columns, and the optional key columns
// used for the unique index.
type TableDefinition struct {
	// TableName is the name of the table to create.
	TableName string `json:"table_name"`
	// Columns lists the user-defined columns in declaration order.
	Columns []ColumnDef `json:"columns"`
	// ConflictKeys, when non-empty, names the columns of the composite unique
	// index that is created together with the table.
	ConflictKeys []string `json:"conflict_keys,omitempty"`
}
// ParseTableDefinition converts a decoded table_definition JSON object into a
// TableDefinition.
//
// Keys read from raw:
//   - "table_name": non-empty string, required.
//   - "columns": array of objects with string "name" and "type" and an
//     optional string "comment".
//   - "conflict_keys": optional array of column-name strings.
//
// Parsing is lenient. Column entries that are not objects, or that lack a
// non-empty name or type, are skipped. Conflict-key entries that are not
// non-empty strings are skipped as well, so that an empty key cannot end up as
// an empty element in the unique-index column list.
//
// An error is returned when table_name is missing or empty, or when no usable
// column remains after filtering.
//
// NOTE(review): names and types are not validated here, and buildCreateSQL
// places them into the DDL text verbatim. Confirm that table definitions only
// come from trusted sources.
func ParseTableDefinition(raw map[string]interface{}) (*TableDefinition, error) {
	name, _ := raw["table_name"].(string)
	if name == "" {
		return nil, fmt.Errorf("table_definition 缺少 table_name")
	}
	td := &TableDefinition{TableName: name}

	// A missing or wrongly typed "columns" value yields a nil slice, which
	// falls through to the "no columns" error below.
	colsRaw, _ := raw["columns"].([]interface{})
	for _, c := range colsRaw {
		cm, _ := c.(map[string]interface{})
		if cm == nil {
			continue
		}
		n, _ := cm["name"].(string)
		t, _ := cm["type"].(string)
		if n == "" || t == "" {
			continue
		}
		comment, _ := cm["comment"].(string)
		td.Columns = append(td.Columns, ColumnDef{Name: n, Type: t, Comment: comment})
	}
	if len(td.Columns) == 0 {
		return nil, fmt.Errorf("table_definition 列定义为空")
	}

	// Ranging over a nil slice is a no-op, so no explicit nil check is needed.
	keys, _ := raw["conflict_keys"].([]interface{})
	for _, k := range keys {
		// Empty keys are dropped for the same reason empty column names are:
		// they would produce an invalid column list such as "(a, , b)".
		if s, ok := k.(string); ok && s != "" {
			td.ConflictKeys = append(td.ConflictKeys, s)
		}
	}
	return td, nil
}
// EnsureTable runs the DDL script produced by buildCreateSQL for td in a
// single Exec call on the gfdb connection obtained for ctx. The script's
// CREATE statements are issued with IF NOT EXISTS.
//
// It logs before and after execution. On failure the database error is
// returned wrapped with the table name.
func EnsureTable(ctx context.Context, td *TableDefinition) error {
	ddl := buildCreateSQL(td)
	logrus.Infof("建表: %s", td.TableName)

	if _, execErr := gfdb.DB(ctx).Exec(ctx, ddl); execErr != nil {
		return fmt.Errorf("建表失败 [%s]: %w", td.TableName, execErr)
	}

	logrus.Infof("表 %s 已就绪", td.TableName)
	return nil
}
// buildCreateSQL renders the DDL script for td as one string made of
// semicolon-terminated statements, in this order:
//
//  1. CREATE TABLE IF NOT EXISTS with the fixed bookkeeping columns, then the
//     user-defined columns, then a trailing raw_data JSONB column.
//  2. CREATE UNIQUE INDEX IF NOT EXISTS uq_<table>_conflict over ConflictKeys,
//     emitted only when ConflictKeys is non-empty (intended for ON CONFLICT
//     upserts).
//  3. One COMMENT ON COLUMN statement per user column with a non-empty
//     comment; single quotes in the comment are doubled.
//
// NOTE(review): the table name, column names, column types and conflict keys
// are inserted into the SQL text verbatim, without quoting or validation.
// Confirm that callers only pass trusted definitions.
func buildCreateSQL(td *TableDefinition) string {
	columnLines := []string{
		"id BIGSERIAL PRIMARY KEY",
		"tenant_id BIGINT NOT NULL DEFAULT 0",
		"creator VARCHAR(64) DEFAULT ''",
		"created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()",
		"updater VARCHAR(64) DEFAULT ''",
		"updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()",
		"deleted_at TIMESTAMP WITH TIME ZONE",
	}
	for _, col := range td.Columns {
		columnLines = append(columnLines, fmt.Sprintf("%s %s", col.Name, col.Type))
	}
	columnLines = append(columnLines, "raw_data JSONB DEFAULT '{}'")

	var script strings.Builder
	fmt.Fprintf(&script, "CREATE TABLE IF NOT EXISTS %s (\n %s\n);\n", td.TableName, strings.Join(columnLines, ",\n "))

	// Composite unique index over the conflict keys.
	if len(td.ConflictKeys) > 0 {
		indexName := fmt.Sprintf("uq_%s_conflict", td.TableName)
		keyList := strings.Join(td.ConflictKeys, ", ")
		fmt.Fprintf(&script, "CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s)", indexName, td.TableName, keyList)
		script.WriteString(";\n")
	}

	// Column comments, only for columns that define one.
	for _, col := range td.Columns {
		if col.Comment == "" {
			continue
		}
		quoted := strings.ReplaceAll(col.Comment, "'", "''")
		fmt.Fprintf(&script, "COMMENT ON COLUMN %s.%s IS '%s';\n", td.TableName, col.Name, quoted)
	}
	return script.String()
}