diff --git a/db/gfdb/gfdb.go b/db/gfdb/gfdb.go index d3c0aaa..ea5d844 100644 --- a/db/gfdb/gfdb.go +++ b/db/gfdb/gfdb.go @@ -373,6 +373,7 @@ var ( ) type Gfdb interface { + Exec(ctx context.Context, sql string, args ...any) (sql.Result, error) Model(ctx context.Context, tableNameOrStruct ...any) *model Transaction(ctx context.Context, f func(ctx context.Context, tx gdb.TX) error) error } @@ -389,17 +390,45 @@ type dataBase struct { gdb.DB } -func DB(ctx context.Context) Gfdb { - var dbName []string +func GetTablePrefix(ctx context.Context) (prefix string, err error) { + tenantId, config, err := checkSchemaConfig(ctx) + if err != nil { + glog.Errorf(ctx, "[DB] checkSchemaConfig error: %v", err) + return + } + if config { + sprintf := fmt.Sprintf("database.%s%v.0.prefix", schemaPrefix, tenantId) + prefix = g.Cfg().MustGet(ctx, sprintf).String() + return + } + prefix = g.Cfg().MustGet(ctx, "database.default.0.prefix").String() + return +} + +func checkSchemaConfig(ctx context.Context) (uint64, bool, error) { user, err := utils.GetUserInfo(ctx) if err != nil { glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err) - return nil + return 0, false, err } - var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId) sprintf := fmt.Sprintf("database.%s", schema) if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() { + return user.TenantId, true, nil + } + return user.TenantId, false, nil +} + +func DB(ctx context.Context) Gfdb { + tenantId, config, err := checkSchemaConfig(ctx) + if err != nil { + glog.Errorf(ctx, "[DB] checkSchemaConfig error: %v", err) + return nil + } + var schema = fmt.Sprintf("%s%v", schemaPrefix, tenantId) + + var dbName []string + if config { dbName = append(dbName, schema) } else { dbName = append(dbName, "default") @@ -418,26 +447,24 @@ func DB(ctx context.Context) Gfdb { } func (d *dataBase) Model(ctx context.Context, tableNameOrStruct ...any) *model { - user, err := utils.GetUserInfo(ctx) - if err != nil { - glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err) - return nil - } m := d.DB.Model(tableNameOrStruct...).Ctx(ctx) - var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId) - sprintf := fmt.Sprintf("database.%s", schema) - if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() { + tenantId, config, err := checkSchemaConfig(ctx) + if err != nil { + glog.Errorf(ctx, "[DB] checkSchemaConfig error: %v", err) + return nil + } + if config { // 创建按地区分库的配置 shardingConfig := gdb.ShardingConfig{ Schema: gdb.ShardingSchemaConfig{ - Enable: true, // 启用分库 - Prefix: schemaPrefix, // 分库前缀 - Rule: &RegionShardingRule{RegionMapping: user.TenantId}, // 自定义分库规则 + Enable: true, // 启用分库 + Prefix: schemaPrefix, // 分库前缀 + Rule: &RegionShardingRule{RegionMapping: tenantId}, // 自定义分库规则 }, } - m.Sharding(shardingConfig).ShardingValue(user.TenantId) + m.Sharding(shardingConfig).ShardingValue(tenantId) } m.OmitNil().Hook(catchSQLHook()) diff --git a/go.mod b/go.mod index 786103d..c1b66a2 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,10 @@ require ( github.com/cloudwego/eino-ext/components/document/parser/xlsx v0.0.0-20260323112355-f061db7e8419 github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419 github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419 + github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1 github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419 + github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419 + github.com/go-ego/gse v1.0.2 github.com/gogf/gf/contrib/registry/consul/v2 v2.9.5 github.com/gogf/gf/contrib/trace/otlphttp/v2 v2.9.5 github.com/gogf/gf/v2 v2.9.5 @@ -55,7 +58,7 @@ require ( github.com/clbanning/mxj/v2 v2.7.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect github.com/cloudwego/eino-ext/components/document/parser/html v0.0.0-20241224063832-9fbcc0e56c28 // indirect - github.com/cloudwego/eino-ext/libs/acl/openai v0.1.2 // indirect + github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14 // indirect github.com/dgraph-io/badger/v4 v4.2.0 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-jump v0.0.0-20211018200510-ba001c3ffce0 // indirect @@ -100,6 +103,7 @@ require ( github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/serf v0.10.1 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/juju/ratelimit v1.0.2 // indirect @@ -115,7 +119,7 @@ require ( github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect - github.com/meguminnnnnnnnn/go-openai v0.1.0 // indirect + github.com/meguminnnnnnnnn/go-openai v0.1.1 // indirect github.com/microcosm-cc/bluemonday v1.0.27 // indirect github.com/miekg/dns v1.1.63 // indirect github.com/minio/crc64nvme v1.1.0 // indirect @@ -163,8 +167,11 @@ require ( github.com/tklauser/numcpus v0.2.2 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/valyala/fastrand v1.1.0 // indirect + github.com/vcaesar/cedar v0.30.0 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/volcengine/volc-sdk-golang v1.0.23 // indirect + github.com/volcengine/volcengine-go-sdk v1.0.181 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect diff --git a/go.sum b/go.sum index ffd4195..9a70e50 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,7 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= @@ -67,8 +68,8 @@ github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgIS github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= -github.com/bytedance/mockey v1.2.14 h1:KZaFgPdiUwW+jOWFieo3Lr7INM1P+6adO3hxZhDswY8= -github.com/bytedance/mockey v1.2.14/go.mod h1:1BPHF9sol5R1ud/+0VEHGQq/+i2lN+GTsr3O2Q9IENY= +github.com/bytedance/mockey v1.3.0 h1:ONLRdvhqmCfr9rTasUB8ZKCfvbdD2tohOg4u+4Q/ed0= +github.com/bytedance/mockey v1.3.0/go.mod h1:1BPHF9sol5R1ud/+0VEHGQq/+i2lN+GTsr3O2Q9IENY= github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w= github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= @@ -109,10 +110,14 @@ github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419/go.mod h1:9R0RQrQSpg1JaNnRtw7+RfRAAv0HgdE348YnrlZ6coo= github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419 h1:XsvQmwMKMD/w/YIPK0Y9pfBv0UH6rJ5ZiedUuZiA9Vo= github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419/go.mod h1:Ov33JMUewdOoUgJbYNJt3qL7KQDVHYpoVBCjJXsz8sw= +github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1 h1:PM/+XAvJtrBqFlBY15ws0pb0+92XKHQv0ei3M7PIJcQ= +github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1/go.mod h1:6O6x0fHfM3uCLr3lX1DnB/my7fC3WRUA5hpkCkrkZrg= github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419 h1:gGnohcgEaHqp5V826Ay0H3fi4TpK8ReWlUPePAnzvA4= github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419/go.mod h1:ekJmA+GLD9vJyZNeODZDBFMiJ92Suy6nF0OY42X3sao= -github.com/cloudwego/eino-ext/libs/acl/openai v0.1.2 h1:r9Id2wzJ05PoHl+Km7jQgNMgciaZI93TVnUYso89esM= -github.com/cloudwego/eino-ext/libs/acl/openai v0.1.2/go.mod h1:S4OkvglPY9hsm9tXeShODrf/WN1Cgu4bqu4nn/CnIic= +github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419 h1:eM29lyMShtFZNoAhE5g96+zHg9PBLckRyd2HtVeeY4E= +github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419/go.mod h1:SajSFFRIXJXIbxadAAlSUIS5KTY8R/jzJg9RNSOXCCI= +github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14 h1:yOZII6VYaL00CVZYba+HUixFygsW0Xz/1QjQ5htj1Ls= +github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14/go.mod h1:1xMQZ8eE11pkEoTAEy8UlaAY817qGVMvjpDPGSIO3Ns= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= @@ -182,6 +187,8 @@ github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49P github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= +github.com/go-ego/gse v1.0.2 h1:+27lYFPhQEhA9igtdOsJPRKYL/k3TwYsxBF5jr6KFv4= +github.com/go-ego/gse v1.0.2/go.mod h1:Fy35G+q7VV7Et1zIKO8o/sW1kkugV3znXap/lF/11zc= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -272,6 +279,7 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/goph/emperror v0.17.2 h1:yLapQcmEsO0ipe9p5TaN22djm3OFV/TfM/fcYP0/J18= @@ -365,6 +373,10 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= @@ -439,8 +451,8 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/meguminnnnnnnnn/go-openai v0.1.0 h1:BGzB1PlS2Epq0mBB2TGLwzMihbR7BANrlMH3w4ZnY88= -github.com/meguminnnnnnnnn/go-openai v0.1.0/go.mod h1:qs96ysDmxhE4BZoU45I43zcyfnaYxU3X+aRzLko/htY= +github.com/meguminnnnnnnnn/go-openai v0.1.1 h1:u/IMMgrj/d617Dh/8BKAwlcstD74ynOJzCtVl+y8xAs= +github.com/meguminnnnnnnnn/go-openai v0.1.1/go.mod h1:qs96ysDmxhE4BZoU45I43zcyfnaYxU3X+aRzLko/htY= github.com/meilisearch/meilisearch-go v0.36.1 h1:mJTCJE5g7tRvaqKco6DfqOuJEjX+rRltDEnkEC02Y0M= github.com/meilisearch/meilisearch-go v0.36.1/go.mod h1:hWcR0MuWLSzHfbz9GGzIr3s9rnXLm1jqkmHkJPbUSvM= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= @@ -668,6 +680,7 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -698,10 +711,18 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= +github.com/vcaesar/cedar v0.30.0 h1:9fSDpM7FTjjUdPiBUUa0MWYMRGSEcqgFXvppZcZ4d7Y= +github.com/vcaesar/cedar v0.30.0/go.mod h1:lyuGvALuZZDPNXwpzv/9LyxW+8Y6faN7zauFezNsnik= +github.com/vcaesar/tt v0.20.1 h1:D/jUeeVCNbq3ad8M7hhtB3J9x5RZ6I1n1eZ0BJp7M+4= +github.com/vcaesar/tt v0.20.1/go.mod h1:cH2+AwGAJm19Wa6xvEa+0r+sXDJBT0QgNQey6mwqLeU= github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/volcengine/volc-sdk-golang v1.0.23 h1:anOslb2Qp6ywnsbyq9jqR0ljuO63kg9PY+4OehIk5R8= +github.com/volcengine/volc-sdk-golang v1.0.23/go.mod h1:AfG/PZRUkHJ9inETvbjNifTDgut25Wbkm2QoYBTbvyU= +github.com/volcengine/volcengine-go-sdk v1.0.181 h1:/3PB4M1N4fjMqiSKTJwX43EZ5Nn1HUOtQrSCk+22+wI= +github.com/volcengine/volcengine-go-sdk v1.0.181/go.mod h1:gfEDc1s7SYaGoY+WH2dRrS3qiuDJMkwqyfXWCa7+7oA= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= diff --git a/middleware/circuit_breaker.go b/middleware/circuit_breaker.go index e60bd45..c1b8618 100644 --- a/middleware/circuit_breaker.go +++ b/middleware/circuit_breaker.go @@ -1081,7 +1081,7 @@ func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, sta } // 使用common/redis中的Lock方法获取分布式锁 - success, err := redis.Lock(ctx, lockKey, lockTimeout, func(ctx context.Context) error { + success, err := utils.Lock(ctx, lockKey, lockTimeout, func(ctx context.Context) error { // 设置熔断器状态 _, err := redisClient.Do(ctx, "SETEX", stateKey, ttl, state) if err != nil { @@ -1335,7 +1335,7 @@ func resetSingleResource(r *ghttp.Request, resourceName string) error { if redisClient != nil { lockKey := "circuit_breaker:" + resourceName + ":lock" // 使用较短的锁超时时间 - success, err := redis.Lock(r.GetCtx(), lockKey, int64(3), func(ctx context.Context) error { + success, err := utils.Lock(r.GetCtx(), lockKey, int64(3), func(ctx context.Context) error { _, err := redisClient.Del(ctx, "circuit_breaker:"+resourceName+":state") if err != nil { g.Log().Warningf(ctx, "清除分布式熔断状态失败: %s, error: %v", resourceName, err) diff --git a/rag/eino/consts.go b/rag/eino/consts.go new file mode 100644 index 0000000..e766233 --- /dev/null +++ b/rag/eino/consts.go @@ -0,0 +1,8 @@ +package eino + +const ( + providerArk = "ark" + providerOpenai = "openai" + providerQianfan = "qianfan" + providerDashscope = "dashscope" +) diff --git a/rag/eino/document_semantic.go b/rag/eino/document_semantic.go index 1fa53bd..2f39a1a 100644 --- a/rag/eino/document_semantic.go +++ b/rag/eino/document_semantic.go @@ -5,59 +5,60 @@ import ( "github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive" "github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic" - "github.com/cloudwego/eino/components/document" "github.com/cloudwego/eino/schema" "github.com/gogf/gf/v2/frame/g" ) -// 全局只初始化一次 -var ( - splitter document.Transformer -) - // SemanticSplitDocument 语义分割文档 func SemanticSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) { - if g.IsEmpty(splitter) { - // 默认分隔符(支持中英文) - separators := []string{"\n\n", "\n", "。", "!", "?", ";", ".", "!", "?", ";"} - // 读取配置,使用合理的默认值 - bufferSize := g.Cfg().MustGet(ctx, "eino.splitter.bufferSize").Int() - percentile := g.Cfg().MustGet(ctx, "eino.splitter.percentile").Float64() - batchSize := g.Cfg().MustGet(ctx, "eino.splitter.batchSize").Int() - if batchSize <= 0 { - batchSize = 10 // doubao-embedding-vision 限制每批最多 10 个 - } + // 默认分隔符(支持中英文) + separators := []string{"\n\n", "\n", "。", "!", "?", ";", ".", "!", "?", ";"} + // 读取配置,使用合理的默认值 + bufferSize := g.Cfg().MustGet(ctx, "eino.splitter.bufferSize").Int() + minChunkSize := g.Cfg().MustGet(ctx, "eino.splitter.minChunkSize").Int() + percentile := g.Cfg().MustGet(ctx, "eino.splitter.percentile").Float64() + batchSize := g.Cfg().MustGet(ctx, "eino.splitter.batchSize").Int() + if batchSize <= 0 { + batchSize = 10 // doubao-embedding-vision 限制每批最多 10 个 + } - // 使用批量包装器 - batchEmbedder := NewBatchEmbedder(Embedder, batchSize) + // 使用批量包装器 + var batchEmbedder *BatchEmbedder + provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String() + switch provider { + case providerArk: + batchEmbedder = NewBatchEmbedder(EmbedderArk, batchSize) + case providerOpenai: + batchEmbedder = NewBatchEmbedder(EmbedderOpenAI, batchSize) + case providerDashscope: + batchEmbedder = NewBatchEmbedder(EmbedderDashscope, batchSize) + } - splitter, err = semantic.NewSplitter(ctx, &semantic.Config{ - Embedding: batchEmbedder, - BufferSize: bufferSize, - Percentile: percentile, - Separators: separators, - }) - if err != nil { - return - } + splitter, err := semantic.NewSplitter(ctx, &semantic.Config{ + Embedding: batchEmbedder, + BufferSize: bufferSize, + MinChunkSize: minChunkSize, + Percentile: percentile, + Separators: separators, + }) + if err != nil { + return } return splitter.Transform(ctx, docs) } // RecursiveSplitDocument 递归分割文档 func RecursiveSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) { - if g.IsEmpty(splitter) { - // 默认分隔符(支持中英文) - separators := []string{"\n\n", "\n", "。", "!", "?", ";", ".", "!", "?", ";"} - splitter, err = recursive.NewSplitter(ctx, &recursive.Config{ - ChunkSize: 1500, - OverlapSize: 300, - KeepType: recursive.KeepTypeNone, - Separators: separators, - }) - if err != nil { - return - } + // 默认分隔符(支持中英文) + separators := []string{"\n\n", "\n", "。", "!", "?", ";", ".", "!", "?", ";"} + splitter, err := recursive.NewSplitter(ctx, &recursive.Config{ + ChunkSize: 512, + OverlapSize: 100, + KeepType: recursive.KeepTypeNone, + Separators: separators, + }) + if err != nil { + return } return splitter.Transform(ctx, docs) } diff --git a/rag/eino/embedding.go b/rag/eino/embedding.go index 218eb59..7af67e3 100644 --- a/rag/eino/embedding.go +++ b/rag/eino/embedding.go @@ -2,45 +2,68 @@ package eino import ( "context" + "fmt" + "github.com/cloudwego/eino-ext/components/embedding/ark" "github.com/cloudwego/eino-ext/components/embedding/dashscope" + "github.com/cloudwego/eino-ext/components/embedding/openai" "github.com/gogf/gf/v2/frame/g" "github.com/golang/glog" ) // 全局只初始化一次 var ( - Embedder *dashscope.Embedder // 导出供其他模块使用 + EmbedderArk *ark.Embedder + EmbedderDashscope *dashscope.Embedder + EmbedderOpenAI *openai.Embedder ) -// init:程序启动时自动执行一次 func init() { ctx := context.Background() if !g.Cfg().MustGet(ctx, "eino.embedding").IsEmpty() { var err error - cfg := &dashscope.EmbeddingConfig{ - APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(), - Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(), + provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String() + switch provider { + case providerArk: + cfg := &ark.EmbeddingConfig{ + APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(), + Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(), + } + if apiType := g.Cfg().MustGet(ctx, "eino.embedding.apiType").String(); apiType != "" { + apiTypeVal := ark.APIType(apiType) + cfg.APIType = &apiTypeVal + } + EmbedderArk, err = ark.NewEmbedder(ctx, cfg) + case providerOpenai: + chatModelConfig := &openai.EmbeddingConfig{ + APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(), + Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(), + } + EmbedderOpenAI, err = openai.NewEmbedder(ctx, chatModelConfig) + case providerDashscope: + cfg := &dashscope.EmbeddingConfig{ + APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(), + Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(), + } + EmbedderDashscope, err = dashscope.NewEmbedder(ctx, cfg) } - // 检查是否配置了 APIType,支持 "text_api" 和 "multi_modal_api" - //if apiType := g.Cfg().MustGet(ctx, "eino.embedding.apiType").String(); apiType != "" { - // apiTypeVal := dashscope.APIType(apiType) - // cfg.APIType = &apiTypeVal - //} - Embedder, err = dashscope.NewEmbedder(ctx, cfg) if err != nil { - glog.Fatalf("NewEmbedder of ark error: %v", err) + glog.Fatalf("NewEmbedder of %v error: %v", provider, err) } - //embedding, err := embedder.EmbedStrings(ctx, []string{"hello world", "bye bye"}) - //if err != nil { - // log.Printf("embedding error: %v\n", err) - // return - //} - // - //log.Printf("embedding: %v\n", embedding) } + + return } func EmbedStrings(ctx context.Context, texts []string) (embeddings [][]float64, err error) { - return Embedder.EmbedStrings(ctx, texts) + provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String() + switch provider { + case providerArk: + return EmbedderArk.EmbedStrings(ctx, texts) + case providerOpenai: + return EmbedderOpenAI.EmbedStrings(ctx, texts) + case providerDashscope: + return EmbedderDashscope.EmbedStrings(ctx, texts) + } + return nil, fmt.Errorf("unsupported provider: %v", provider) } diff --git a/rag/gse/utils.go b/rag/gse/utils.go new file mode 100644 index 0000000..aea4b38 --- /dev/null +++ b/rag/gse/utils.go @@ -0,0 +1,114 @@ +package gse + +import ( + "context" + "sort" + + "github.com/go-ego/gse" + "github.com/go-ego/gse/hmm/extracker" + "github.com/go-ego/gse/hmm/segment" + "github.com/gogf/gf/v2/os/glog" +) + +var GseTool *gseTool + +// 初始化函数:程序启动时执行一次 +func init() { + var err error + GseTool, err = newGseTool() + if err != nil { + glog.Error(context.Background(), err) + } +} + +// gseTool 关键词提取工具(gse v1.0.2 标准) +type gseTool struct { + seg gse.Segmenter + tfidf *extracker.TagExtracter + tr *extracker.TextRanker +} + +// newGseTool 初始化工具(内置词典 + 停用词) +func newGseTool() (tool *gseTool, err error) { + // 1. 初始化分词器 + var seg gse.Segmenter + // 内置词典(无外部文件) + err = seg.LoadDictEmbed() + if err != nil { + return + } + // 内置停用词(v1.0.2 标准) + err = seg.LoadStopEmbed() + if err != nil { + return + } + + // 2. 初始化 TF-IDF 提取器 + tfidf := &extracker.TagExtracter{} + tfidf.WithGse(seg) + err = tfidf.LoadIdf() + if err != nil { + return + } + + // 3. 初始化 TextRank 提取器 + tr := &extracker.TextRanker{} + tr.WithGse(seg) + + tool = &gseTool{ + seg: seg, + tfidf: tfidf, + tr: tr, + } + return +} + +// Cut 分词(关键词提取唯一正确模式:精确模式 + HMM) +func (k *gseTool) Cut(text string) []string { + return k.seg.Cut(text, true) +} + +// Keyword 最终输出:关键词 + 权重 +type Keyword struct { + Word string `json:"word"` + Score float64 `json:"score"` +} + +func (k *gseTool) Extract(text string, topN int) []Keyword { + // 1. 提取 TF-IDF + tfTags := k.extractTFIDF(text, topN) + + // 2. 提取 TextRank + trTags := k.extractTextRank(text, topN) + + // 3. 合并成最终关键词(业务最常用) + scoreMap := make(map[string]float64) + for _, tag := range tfTags { + scoreMap[tag.Text] = tag.Weight + } + for _, tag := range trTags { + scoreMap[tag.Text] = tag.Weight + } + + // 转成切片并排序(高分在前) + res := make([]Keyword, 0, len(scoreMap)) + for word, score := range scoreMap { + res = append(res, Keyword{Word: word, Score: score}) + } + + sort.Slice(res, func(i, j int) bool { + return res[i].Score > res[j].Score + }) + + return res +} + +// ExtractTFIDF TF-IDF 关键词(带权重)90% 业务:文章标签、搜索、关键词 +func (k *gseTool) extractTFIDF(text string, topN int) segment.Segments { + return k.tfidf.ExtractTags(text, topN) +} + +// ExtractTextRank TextRank 关键词(带权重)长文本、摘要、语义理解 +func (k *gseTool) extractTextRank(text string, topN int) segment.Segments { + return k.tr.TextRank(text, topN) +} diff --git a/rpc/rpcx.go b/rpc/rpcx.go index 1843f0d..5c7e1c3 100644 --- a/rpc/rpcx.go +++ b/rpc/rpcx.go @@ -1,364 +1,347 @@ package rpc -import ( - "context" - "encoding/json" - "errors" - "strings" - "sync" - "time" - - "gitea.com/red-future/common/consul" - "gitea.com/red-future/common/jaeger" - "github.com/gogf/gf/v2/frame/g" - rpcxClient "github.com/smallnest/rpcx/client" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" -) - -var ( - // pluginsContainer rpcx插件容器(全局统一设置) - // init()中添加链路追踪插件,所有client共用此容器 - pluginsContainer = rpcxClient.NewPluginContainer() - - // clientPool 连接池缓存,key为服务名,value为客户端实例 - clientPool = make(map[string]*rpcxClient.OneClient) - - // poolMutex 连接池锁 - poolMutex sync.RWMutex - - // healthCheckInterval 健康检查间隔(秒) - healthCheckInterval = 30 - - // lastHealthCheckTime 上次健康检查时间,key为服务名 - lastHealthCheckTime = make(map[string]time.Time) - - // serviceAddrCache 服务地址缓存,key为服务名,value为地址 - serviceAddrCache = make(map[string]string) -) - -func init() { - // 全局设置链路追踪插件,所有client共用 - pluginsContainer.Add(&TracingPlugin{}) - - // 启动后台健康检查协程 - go healthCheckLoop() -} - -// healthCheckLoop 后台健康检查循环 -func healthCheckLoop() { - ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second) - defer ticker.Stop() - - for range ticker.C { - checkAllConnections() - } -} - -// checkAllConnections 检查所有缓存连接的健康状态 -func checkAllConnections() { - poolMutex.Lock() - defer poolMutex.Unlock() - - now := time.Now() - for serviceName, client := range clientPool { - // 检查连接是否需要健康检查 - if lastCheck, ok := lastHealthCheckTime[serviceName]; ok { - if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second { - continue - } - } - - ctx := context.Background() - - // 检查连接健康状态(心跳检测) - if !isClientHealthy(ctx, client, serviceName) { - g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName) - client.Close() - delete(clientPool, serviceName) - delete(lastHealthCheckTime, serviceName) - delete(serviceAddrCache, serviceName) - continue - } - - // 连接健康,检查服务地址是否发生变化 - currentAddr, err := consul.GetInstanceAddr(ctx, serviceName) - if err != nil { - g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v,保持现有连接", serviceName, err) - lastHealthCheckTime[serviceName] = now - continue - } - - // 检查地址是否发生变化 - if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr { - g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s,重建连接", serviceName, oldAddr, currentAddr) - // 关闭旧连接并从连接池移除,下次请求时会创建新连接 - client.Close() - delete(clientPool, serviceName) - delete(lastHealthCheckTime, serviceName) - // 更新缓存的新地址 - serviceAddrCache[serviceName] = currentAddr - } else { - // 地址未变化,更新检查时间 - if !ok { - serviceAddrCache[serviceName] = currentAddr - } - g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName) - } - - lastHealthCheckTime[serviceName] = now - } -} - -// isClientHealthy 检查client是否健康 -// 使用心跳检测方式:尝试调用服务的心跳方法 -func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool { - if client == nil { - return false - } - - // 设置较短的超时时间,避免阻塞 - pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - - // 尝试调用健康检查方法 - // 大多数服务都会提供 Ping 或 Health 方法 - // 如果服务没有提供这些方法,会返回错误,我们认为是健康的 - // 因为连接本身是正常的,只是方法不存在 - var reply interface{} - err := client.Call(pingCtx, serviceName, "Ping", nil, &reply) - - // 如果调用成功,连接肯定健康 - if err == nil { - return true - } - - // 如果是方法不存在的错误,说明连接是健康的,只是服务没有Ping方法 - // 这种情况下我们认为是健康的 - if isMethodNotFoundError(err) || isServiceNotFoundError(err) { - return true - } - - // 其他错误(网络错误、超时等)说明连接不健康 - g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err) - return false -} - -// isMethodNotFoundError 判断是否是方法未找到错误 -func isMethodNotFoundError(err error) bool { - if err == nil { - return false - } - errStr := err.Error() - // rpcx 方法不存在的常见错误信息 - return strings.Contains(errStr, "not found") || - strings.Contains(errStr, "no such") || - strings.Contains(errStr, "service not found") || - strings.Contains(errStr, "method not found") -} - -// isServiceNotFoundError 判断是否是服务未找到错误 -func isServiceNotFoundError(err error) bool { - if err == nil { - return false - } - errStr := err.Error() - return strings.Contains(errStr, "no service") || - strings.Contains(errStr, "service not registered") -} - -// getOrCreateClient 从连接池获取或创建客户端(带连接池) -func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) { - if g.IsEmpty(serviceName) { - return nil, errors.New("服务名称不能为空") - } - - // 先尝试从连接池获取 - poolMutex.RLock() - client, exists := clientPool[serviceName] - poolMutex.RUnlock() - - // 如果存在且健康,直接返回 - if exists && isClientHealthy(ctx, client, serviceName) { - g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName) - return client, nil - } - - // 不存在或不健康,重新创建 - poolMutex.Lock() - defer poolMutex.Unlock() - - // 双重检查,防止并发时重复创建 - if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) { - return client, nil - } - - // 获取服务实例地址 - addr, err := consul.GetInstanceAddr(ctx, serviceName) - if err != nil { - g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err) - return nil, err - } - - g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr) - - // 缓存服务地址,用于健康检查时对比 - serviceAddrCache[serviceName] = addr - - // 创建服务发现 - discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "") - if err != nil { - g.Log().Errorf(ctx, "创建服务发现失败: %v", err) - return nil, err - } - - // 创建新客户端 - newClient := rpcxClient.NewOneClient( - rpcxClient.Failtry, - rpcxClient.RandomSelect, - discovery, - rpcxClient.DefaultOption, - ) - newClient.SetPlugins(pluginsContainer) - - // 更新连接池 - if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil { - oldClient.Close() - } - clientPool[serviceName] = newClient - lastHealthCheckTime[serviceName] = time.Now() - - g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName) - - return newClient, nil -} - -// Call 调用rpcx服务方法 -// serviceName: 服务名称 -// serviceMethod: 服务方法 -// args: 请求参数 -// reply: 响应结果 -func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error { - // 从连接池获取客户端(不再关闭连接) - client, err := getOrCreateClient(ctx, serviceName) - if err != nil { - g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err) - return err - } - - // 设置超时 - callCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - - // 调用服务方法 - err = client.Call(callCtx, serviceName, serviceMethod, args, reply) - if err != nil { - g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err) - - // 如果调用失败,检查连接是否需要重新创建 - poolMutex.Lock() - if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client { - // 标记为不健康,下次请求时会重新创建 - delete(lastHealthCheckTime, serviceName) - } - poolMutex.Unlock() - - return err - } - - return nil -} - -// Close 关闭指定服务的连接(用于清理连接池) -func Close(serviceName string) { - poolMutex.Lock() - defer poolMutex.Unlock() - - if client, ok := clientPool[serviceName]; ok { - client.Close() - delete(clientPool, serviceName) - delete(lastHealthCheckTime, serviceName) - delete(serviceAddrCache, serviceName) - g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName) - } -} - -// CloseAll 关闭所有连接(用于优雅停机) -func CloseAll() { - poolMutex.Lock() - defer poolMutex.Unlock() - - for serviceName, client := range clientPool { - client.Close() - g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName) - } - clientPool = make(map[string]*rpcxClient.OneClient) - lastHealthCheckTime = make(map[string]time.Time) - serviceAddrCache = make(map[string]string) -} - -// TracingPlugin rpcx链路追踪插件 -// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口 -type TracingPlugin struct{} - -// PreCall 调用前拦截 - 创建jaeger span -func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) { - // 创建span,名称格式: ServiceName.Method - spanName := serviceName + "." + serviceMethod - ctx, span := jaeger.NewSpan(ctx, spanName) - - // 记录服务和方法信息 - span.SetAttributes( - attribute.String("rpc.service", serviceName), - attribute.String("rpc.method", serviceMethod), - attribute.String("rpc.system", "rpcx"), - ) - var data []byte - // 记录请求参数(序列化为JSON) - if args != nil { - if data, err = json.Marshal(args); err == nil { - argsStr := string(data) - // 限制长度,避免过大 - if len(argsStr) > 2000 { - argsStr = argsStr[:2000] + "... (truncated)" - } - span.SetAttributes(attribute.String("rpc.request", argsStr)) - } - } - - g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod) - - return -} - -// PostCall 调用后拦截 - 记录结果和错误 -func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error { - span := trace.SpanFromContext(ctx) - if span != nil && span.IsRecording() { - defer span.End() - - // 记录响应结果 - if reply != nil { - if data, err := json.Marshal(reply); err == nil { - replyStr := string(data) - // 限制长度,避免过大 - if len(replyStr) > 2000 { - replyStr = replyStr[:2000] + "... (truncated)" - } - span.SetAttributes(attribute.String("rpc.response", replyStr)) - } - } - - // 处理错误 - if err != nil { - jaeger.RecordError(ctx, err, "rpcx调用失败") - span.SetStatus(codes.Error, err.Error()) - g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err) - } else { - g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod) - } - } - - return nil -} +//var ( +// // pluginsContainer rpcx插件容器(全局统一设置) +// // init()中添加链路追踪插件,所有client共用此容器 +// pluginsContainer = rpcxClient.NewPluginContainer() +// +// // clientPool 连接池缓存,key为服务名,value为客户端实例 +// clientPool = make(map[string]*rpcxClient.OneClient) +// +// // poolMutex 连接池锁 +// poolMutex sync.RWMutex +// +// // healthCheckInterval 健康检查间隔(秒) +// healthCheckInterval = 30 +// +// // lastHealthCheckTime 上次健康检查时间,key为服务名 +// lastHealthCheckTime = make(map[string]time.Time) +// +// // serviceAddrCache 服务地址缓存,key为服务名,value为地址 +// serviceAddrCache = make(map[string]string) +//) +// +//func init() { +// // 全局设置链路追踪插件,所有client共用 +// pluginsContainer.Add(&TracingPlugin{}) +// +// // 启动后台健康检查协程 +// go healthCheckLoop() +//} +// +//// healthCheckLoop 后台健康检查循环 +//func healthCheckLoop() { +// ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second) +// defer ticker.Stop() +// +// for range ticker.C { +// checkAllConnections() +// } +//} +// +//// checkAllConnections 检查所有缓存连接的健康状态 +//func checkAllConnections() { +// poolMutex.Lock() +// defer poolMutex.Unlock() +// +// now := time.Now() +// for serviceName, client := range clientPool { +// // 检查连接是否需要健康检查 +// if lastCheck, ok := lastHealthCheckTime[serviceName]; ok { +// if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second { +// continue +// } +// } +// +// ctx := context.Background() +// +// // 检查连接健康状态(心跳检测) +// if !isClientHealthy(ctx, client, serviceName) { +// g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName) +// client.Close() +// delete(clientPool, serviceName) +// delete(lastHealthCheckTime, serviceName) +// delete(serviceAddrCache, serviceName) +// continue +// } +// +// // 连接健康,检查服务地址是否发生变化 +// currentAddr, err := consul.GetInstanceAddr(ctx, serviceName) +// if err != nil { +// g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v,保持现有连接", serviceName, err) +// lastHealthCheckTime[serviceName] = now +// continue +// } +// +// // 检查地址是否发生变化 +// if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr { +// g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s,重建连接", serviceName, oldAddr, currentAddr) +// // 关闭旧连接并从连接池移除,下次请求时会创建新连接 +// client.Close() +// delete(clientPool, serviceName) +// delete(lastHealthCheckTime, serviceName) +// // 更新缓存的新地址 +// serviceAddrCache[serviceName] = currentAddr +// } else { +// // 地址未变化,更新检查时间 +// if !ok { +// serviceAddrCache[serviceName] = currentAddr +// } +// g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName) +// } +// +// lastHealthCheckTime[serviceName] = now +// } +//} +// +//// isClientHealthy 检查client是否健康 +//// 使用心跳检测方式:尝试调用服务的心跳方法 +//func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool { +// if client == nil { +// return false +// } +// +// // 设置较短的超时时间,避免阻塞 +// pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second) +// defer cancel() +// +// // 尝试调用健康检查方法 +// // 大多数服务都会提供 Ping 或 Health 方法 +// // 如果服务没有提供这些方法,会返回错误,我们认为是健康的 +// // 因为连接本身是正常的,只是方法不存在 +// var reply interface{} +// err := client.Call(pingCtx, serviceName, "Ping", nil, &reply) +// +// // 如果调用成功,连接肯定健康 +// if err == nil { +// return true +// } +// +// // 如果是方法不存在的错误,说明连接是健康的,只是服务没有Ping方法 +// // 这种情况下我们认为是健康的 +// if isMethodNotFoundError(err) || isServiceNotFoundError(err) { +// return true +// } +// +// // 其他错误(网络错误、超时等)说明连接不健康 +// g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err) +// return false +//} +// +//// isMethodNotFoundError 判断是否是方法未找到错误 +//func isMethodNotFoundError(err error) bool { +// if err == nil { +// return false +// } +// errStr := err.Error() +// // rpcx 方法不存在的常见错误信息 +// return strings.Contains(errStr, "not found") || +// strings.Contains(errStr, "no such") || +// strings.Contains(errStr, "service not found") || +// strings.Contains(errStr, "method not found") +//} +// +//// isServiceNotFoundError 判断是否是服务未找到错误 +//func isServiceNotFoundError(err error) bool { +// if err == nil { +// return false +// } +// errStr := err.Error() +// return strings.Contains(errStr, "no service") || +// strings.Contains(errStr, "service not registered") +//} +// +//// getOrCreateClient 从连接池获取或创建客户端(带连接池) +//func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) { +// if g.IsEmpty(serviceName) { +// return nil, errors.New("服务名称不能为空") +// } +// +// // 先尝试从连接池获取 +// poolMutex.RLock() +// client, exists := clientPool[serviceName] +// poolMutex.RUnlock() +// +// // 如果存在且健康,直接返回 +// if exists && isClientHealthy(ctx, client, serviceName) { +// g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName) +// return client, nil +// } +// +// // 不存在或不健康,重新创建 +// poolMutex.Lock() +// defer poolMutex.Unlock() +// +// // 双重检查,防止并发时重复创建 +// if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) { +// return client, nil +// } +// +// // 获取服务实例地址 +// addr, err := consul.GetInstanceAddr(ctx, serviceName) +// if err != nil { +// g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err) +// return nil, err +// } +// +// g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr) +// +// // 缓存服务地址,用于健康检查时对比 +// serviceAddrCache[serviceName] = addr +// +// // 创建服务发现 +// discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "") +// if err != nil { +// g.Log().Errorf(ctx, "创建服务发现失败: %v", err) +// return nil, err +// } +// +// // 创建新客户端 +// newClient := rpcxClient.NewOneClient( +// rpcxClient.Failtry, +// rpcxClient.RandomSelect, +// discovery, +// rpcxClient.DefaultOption, +// ) +// newClient.SetPlugins(pluginsContainer) +// +// // 更新连接池 +// if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil { +// oldClient.Close() +// } +// clientPool[serviceName] = newClient +// lastHealthCheckTime[serviceName] = time.Now() +// +// g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName) +// +// return newClient, nil +//} +// +//// Call 调用rpcx服务方法 +//// serviceName: 服务名称 +//// serviceMethod: 服务方法 +//// args: 请求参数 +//// reply: 响应结果 +//func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error { +// // 从连接池获取客户端(不再关闭连接) +// client, err := getOrCreateClient(ctx, serviceName) +// if err != nil { +// g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err) +// return err +// } +// +// // 设置超时 +// callCtx, cancel := context.WithTimeout(ctx, 30*time.Second) +// defer cancel() +// +// // 调用服务方法 +// err = client.Call(callCtx, serviceName, serviceMethod, args, reply) +// if err != nil { +// g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err) +// +// // 如果调用失败,检查连接是否需要重新创建 +// poolMutex.Lock() +// if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client { +// // 标记为不健康,下次请求时会重新创建 +// delete(lastHealthCheckTime, serviceName) +// } +// poolMutex.Unlock() +// +// return err +// } +// +// return nil +//} +// +//// Close 关闭指定服务的连接(用于清理连接池) +//func Close(serviceName string) { +// poolMutex.Lock() +// defer poolMutex.Unlock() +// +// if client, ok := clientPool[serviceName]; ok { +// client.Close() +// delete(clientPool, serviceName) +// delete(lastHealthCheckTime, serviceName) +// delete(serviceAddrCache, serviceName) +// g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName) +// } +//} +// +//// CloseAll 关闭所有连接(用于优雅停机) +//func CloseAll() { +// poolMutex.Lock() +// defer poolMutex.Unlock() +// +// for serviceName, client := range clientPool { +// client.Close() +// g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName) +// } +// clientPool = make(map[string]*rpcxClient.OneClient) +// lastHealthCheckTime = make(map[string]time.Time) +// serviceAddrCache = make(map[string]string) +//} +// +//// TracingPlugin rpcx链路追踪插件 +//// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口 +//type TracingPlugin struct{} +// +//// PreCall 调用前拦截 - 创建jaeger span +//func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) { +// // 创建span,名称格式: ServiceName.Method +// spanName := serviceName + "." + serviceMethod +// ctx, span := jaeger.NewSpan(ctx, spanName) +// +// // 记录服务和方法信息 +// span.SetAttributes( +// attribute.String("rpc.service", serviceName), +// attribute.String("rpc.method", serviceMethod), +// attribute.String("rpc.system", "rpcx"), +// ) +// var data []byte +// // 记录请求参数(序列化为JSON) +// if args != nil { +// if data, err = json.Marshal(args); err == nil { +// argsStr := string(data) +// // 限制长度,避免过大 +// if len(argsStr) > 2000 { +// argsStr = argsStr[:2000] + "... (truncated)" +// } +// span.SetAttributes(attribute.String("rpc.request", argsStr)) +// } +// } +// +// g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod) +// +// return +//} +// +//// PostCall 调用后拦截 - 记录结果和错误 +//func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error { +// span := trace.SpanFromContext(ctx) +// if span != nil && span.IsRecording() { +// defer span.End() +// +// // 记录响应结果 +// if reply != nil { +// if data, err := json.Marshal(reply); err == nil { +// replyStr := string(data) +// // 限制长度,避免过大 +// if len(replyStr) > 2000 { +// replyStr = replyStr[:2000] + "... (truncated)" +// } +// span.SetAttributes(attribute.String("rpc.response", replyStr)) +// } +// } +// +// // 处理错误 +// if err != nil { +// jaeger.RecordError(ctx, err, "rpcx调用失败") +// span.SetStatus(codes.Error, err.Error()) +// g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err) +// } else { +// g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod) +// } +// } +// +// return nil +//}