diff --git a/.github/mergify.yml b/.github/mergify.yml index 3a589a8a..ff4f30bd 100644 --- a/.github/mergify.yml +++ b/.github/mergify.yml @@ -26,6 +26,7 @@ pull_request_rules: - name: Test passed for code changed-main conditions: + - check-success=Unit test go - check-success=Backup and restore cross version (docker-compose, standalone, standalone, 2.3-latest, master-latest) - check-success=Backup and restore cross version (docker-compose, standalone, standalone, 2.3-latest, 2.4-latest) - check-success=Backup and restore cross version (docker-compose, standalone, standalone, 2.4-latest, master-latest) @@ -113,7 +114,22 @@ pull_request_rules: - base=main - files~=^(?=.*((\.(go|h|cpp)|CMakeLists.txt))).*$ - or: - - check-failure=CI + - check-success!=Unit test go + - check-success!=Backup and restore cross version (docker-compose, standalone, standalone, 2.3-latest, master-latest) + - check-success!=Backup and restore cross version (docker-compose, standalone, standalone, 2.3-latest, 2.4-latest) + - check-success!=Backup and restore cross version (docker-compose, standalone, standalone, 2.4-latest, master-latest) + - check-success!=Backup and restore after upgrade (docker-compose, standalone, standalone, v2.4.17, master-latest) + - check-success!=Backup and restore after upgrade (docker-compose, standalone, standalone, v2.4.17, 2.4-latest) + - check-success!=Backup and restore after upgrade (docker-compose, standalone, standalone, 2.4-latest, master-latest) + - check-success!=Backup and restore cli (docker-compose, standalone, standalone, master-latest) + - check-success!=Backup and restore with rbac config (helm, standalone, master-latest) + - check-success!=Backup and restore api (docker-compose, standalone, master-latest, L0) + - check-success!=Backup and restore api (docker-compose, standalone, master-latest, L1) + - check-success!=Backup and restore api (docker-compose, standalone, master-latest, L2) + - check-success!=Backup and restore api (docker-compose, standalone, master-latest, MASTER) + - check-success!=Backup and restore api (docker-compose, standalone, 2.4-latest, L0) + - check-success!=Backup and restore api (docker-compose, standalone, 2.4-latest, L1) + - check-success!=Backup and restore api (docker-compose, standalone, 2.4-latest, L2) actions: label: remove: diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 5cc250b8..f5382a06 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -11,12 +11,30 @@ on: schedule: - cron: '0 0 * * *' + +env: + go-version: 1.23 + concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true jobs: + unit-test-go: + name: Unit test go + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Go ${{ env.go-version }} + uses: actions/setup-go@v3 + with: + go-version: ${{ env.go-version }} + cache: true + - name: Unit test + run: make test + test-backup-restore-cross-version: + needs: unit-test-go name: Backup and restore cross version runs-on: ubuntu-latest strategy: @@ -41,8 +59,9 @@ jobs: cache: pip - uses: actions/setup-go@v3 + name: Set up Go ${{ env.go-version }} with: - go-version: '1.23' + go-version: ${{ env.go-version }} cache: true - name: Build @@ -62,7 +81,6 @@ jobs: sudo chmod +x /usr/local/bin/docker-compose - name: Milvus deploy - timeout-minutes: 15 shell: bash working-directory: deployment/${{ matrix.milvus_mode }} @@ -157,6 +175,7 @@ jobs: python example/verify_data.py test-backup-restore-after-upgrade: + needs: unit-test-go name: Backup and restore after upgrade runs-on: ubuntu-latest strategy: @@ -181,8 +200,9 @@ jobs: cache: pip - uses: actions/setup-go@v3 + name: Set up Go ${{ env.go-version }} with: - go-version: '1.23' + go-version: ${{ env.go-version }} cache: true - name: Build @@ -251,6 +271,7 @@ jobs: test-backup-restore-cli: + needs: unit-test-go name: Backup and restore cli runs-on: ubuntu-latest strategy: @@ -272,8 +293,9 @@ jobs: cache: pip - uses: actions/setup-go@v3 + name: Set up Go ${{ env.go-version }} with: - go-version: '1.23' + go-version: ${{ env.go-version }} cache: true - name: Build @@ -386,6 +408,7 @@ jobs: python example/verify_data.py test-backup-restore-with-rbac-config: + needs: unit-test-go name: Backup and restore with rbac config runs-on: ubuntu-latest strategy: @@ -406,7 +429,7 @@ jobs: - uses: actions/setup-go@v3 with: - go-version: '1.23' + go-version: ${{ env.go-version }} cache: "true" - name: Creating kind cluster @@ -555,7 +578,7 @@ jobs: - uses: actions/setup-go@v3 with: - go-version: '1.23' + go-version: ${{ env.go-version }} cache: true - name: Build diff --git a/Makefile b/Makefile index ed04bf7d..e37d93d5 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,10 @@ LDFLAGS += -X "$(PKG)/version.Date=$(DATE)" # Default target all: gen build +test: + @echo "Running unit tests..." + @go test --race ./... + # Build the binary build: @echo "Building Backup binary..." diff --git a/core/backup_context.go b/core/backup_context.go index 1d176e29..f621b239 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -80,7 +80,7 @@ func paramsToCfg(params *paramtable.BackupParams) (*client.Cfg, error) { } cfg := &client.Cfg{ - Address: ep, + Host: ep, EnableTLS: enableTLS, Username: params.MilvusCfg.User, Password: params.MilvusCfg.Password, diff --git a/core/backup_context_test.go b/core/backup_context_test.go deleted file mode 100644 index 1168b91a..00000000 --- a/core/backup_context_test.go +++ /dev/null @@ -1,307 +0,0 @@ -package core - -import ( - "context" - "fmt" - "math/rand" - "testing" - - "github.com/zilliztech/milvus-backup/core/meta" - - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "github.com/zilliztech/milvus-backup/core/paramtable" - "github.com/zilliztech/milvus-backup/core/proto/backuppb" - "github.com/zilliztech/milvus-backup/core/utils" - "github.com/zilliztech/milvus-backup/internal/log" -) - -func TestCreateBackup(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backup := CreateBackupContext(ctx, ¶ms) - - req := &backuppb.CreateBackupRequest{ - BackupName: "test_21", - //CollectionNames: []string{"hello_milvus", "hello_milvus2"}, - DbCollections: utils.WrapDBCollections(""), - } - backup.CreateBackup(ctx, req) -} - -func TestCheck(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backup := CreateBackupContext(ctx, ¶ms) - - res := backup.Check(ctx) - println(res) -} - -func TestListBackups(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backupContext := CreateBackupContext(ctx, ¶ms) - - backupLists := backupContext.ListBackups(ctx, &backuppb.ListBackupsRequest{}) - assert.Equal(t, backupLists.GetCode(), backuppb.ResponseCode_Success) - - backupListsWithCollection := backupContext.ListBackups(ctx, &backuppb.ListBackupsRequest{ - //CollectionName: "hello_milvus", - }) - - for _, backup := range backupListsWithCollection.GetData() { - fmt.Println(backup.GetName()) - } - - //assert.NoError(t, err) - //assert.Equal(t, 1, len(backupListsWithCollection.BackupInfos)) - // - //backupListsWithCollection2, err := backupContext.ListBackups(context, &backuppb.ListBackupsRequest{ - // CollectionName: "hello_milvus2", - //}) - //assert.NoError(t, err) - //assert.Equal(t, 0, len(backupListsWithCollection2.BackupInfos)) -} - -func TestGetBackup(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backupContext := CreateBackupContext(ctx, ¶ms) - - backup := backupContext.GetBackup(ctx, &backuppb.GetBackupRequest{ - BackupName: "mybackup", - }) - assert.Equal(t, backup.GetCode(), backuppb.ResponseCode_Success) -} - -func TestDeleteBackup(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backupContext := CreateBackupContext(ctx, ¶ms) - - backup := backupContext.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: "test_backup6", - }) - assert.Equal(t, backup.GetCode(), backuppb.ResponseCode_Success) - - backupLists := backupContext.ListBackups(ctx, &backuppb.ListBackupsRequest{}) - assert.Equal(t, backupLists.GetCode(), backuppb.ResponseCode_Success) - - assert.Equal(t, 0, len(backupLists.GetData())) - -} - -func TestCreateBackupWithNoName(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backup := CreateBackupContext(ctx, ¶ms) - - randBackupName := "" - - req := &backuppb.CreateBackupRequest{ - BackupName: randBackupName, - } - resp := backup.CreateBackup(ctx, req) - assert.Equal(t, backuppb.ResponseCode_Success, resp.GetCode()) - - // clean - backup.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: randBackupName, - }) -} - -func TestCreateBackupWithUnexistCollection(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backup := CreateBackupContext(ctx, ¶ms) - - randBackupName := fmt.Sprintf("test_%d", rand.Int()) - - req := &backuppb.CreateBackupRequest{ - BackupName: randBackupName, - CollectionNames: []string{"not_exist"}, - } - resp := backup.CreateBackup(ctx, req) - assert.Equal(t, backuppb.ResponseCode_Fail, resp.GetCode()) - assert.Equal(t, "request backup collection does not exist: not_exist", resp.GetMsg()) - - // clean - backup.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: randBackupName, - }) -} - -func TestCreateBackupWithDuplicateName(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backup := CreateBackupContext(ctx, ¶ms) - - randBackupName := fmt.Sprintf("test_%d", rand.Int()) - - req := &backuppb.CreateBackupRequest{ - BackupName: randBackupName, - } - resp := backup.CreateBackup(ctx, req) - assert.Equal(t, backuppb.ResponseCode_Success, resp.GetCode()) - - req2 := &backuppb.CreateBackupRequest{ - BackupName: randBackupName, - } - resp2 := backup.CreateBackup(ctx, req2) - assert.Equal(t, backuppb.ResponseCode_Fail, resp2.GetCode()) - assert.Equal(t, fmt.Sprintf("backup already exist with the name: %s", req2.GetBackupName()), resp2.GetMsg()) - - // clean - backup.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: randBackupName, - }) -} - -func TestCreateBackupWithIllegalName(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backup := CreateBackupContext(ctx, ¶ms) - - randBackupName := "dahgg$%123" - - req := &backuppb.CreateBackupRequest{ - BackupName: randBackupName, - } - resp := backup.CreateBackup(ctx, req) - assert.Equal(t, backuppb.ResponseCode_Fail, resp.GetCode()) - - // clean - backup.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: randBackupName, - }) -} - -func TestGetBackupAfterCreate(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backupContext := CreateBackupContext(ctx, ¶ms) - - randBackupName := fmt.Sprintf("test_%d", rand.Int()) - - req := &backuppb.CreateBackupRequest{ - BackupName: randBackupName, - } - resp := backupContext.CreateBackup(ctx, req) - assert.Equal(t, backuppb.ResponseCode_Success, resp.GetCode()) - - backup := backupContext.GetBackup(ctx, &backuppb.GetBackupRequest{ - BackupName: randBackupName, - }) - assert.Equal(t, backuppb.ResponseCode_Success, backup.GetCode()) - - // clean - backupContext.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: randBackupName, - }) -} - -func TestGetBackupFaultBackup(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backupContext := CreateBackupContext(ctx, ¶ms) - backupContext.Start() - - randBackupName := fmt.Sprintf("test_%d", rand.Int()) - - backupContext.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: randBackupName, - }) - - req := &backuppb.CreateBackupRequest{ - BackupName: randBackupName, - } - resp := backupContext.CreateBackup(ctx, req) - assert.Equal(t, backuppb.ResponseCode_Success, resp.GetCode()) - - backupContext.getMilvusStorageClient().RemoveWithPrefix(ctx, params.MinioCfg.BackupBucketName, meta.BackupMetaPath(params.MinioCfg.BackupRootPath, resp.GetData().GetName())) - - backup := backupContext.GetBackup(ctx, &backuppb.GetBackupRequest{ - BackupName: randBackupName, - }) - assert.Equal(t, backuppb.ResponseCode_Fail, backup.GetCode()) - - // clean - backupContext.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: randBackupName, - }) -} - -func TestGetBackupUnexistBackupName(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backupContext := CreateBackupContext(ctx, ¶ms) - backupContext.Start() - - backup := backupContext.GetBackup(ctx, &backuppb.GetBackupRequest{ - BackupName: "un_exist", - }) - assert.Equal(t, backuppb.ResponseCode_Fail, backup.GetCode()) -} - -func TestRestoreBackup(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backup := CreateBackupContext(ctx, ¶ms) - backup.Start() - backupName := "demo" - //fmt.Sprintf("test_%d", rand.Int()) - - restoreResp := backup.RestoreBackup(ctx, &backuppb.RestoreBackupRequest{ - BackupName: backupName, - DbCollections: utils.WrapDBCollections("{\"default\":[]}"), - }) - log.Info("restore backup", zap.Any("resp", restoreResp)) -} - -func TestCreateAndRestoreBackup(t *testing.T) { - var params paramtable.BackupParams - params.Init() - ctx := context.Background() - backup := CreateBackupContext(ctx, ¶ms) - backup.Start() - randBackupName := "test" - //fmt.Sprintf("test_%d", rand.Int()) - - req := &backuppb.CreateBackupRequest{ - BackupName: randBackupName, - } - resp := backup.CreateBackup(ctx, req) - assert.Equal(t, backuppb.ResponseCode_Success, resp.GetCode()) - - getReq := &backuppb.GetBackupRequest{ - BackupName: randBackupName, - } - getResp := backup.GetBackup(ctx, getReq) - assert.Equal(t, backuppb.ResponseCode_Success, getResp.GetCode()) - - restoreResp := backup.RestoreBackup(ctx, &backuppb.RestoreBackupRequest{ - BackupName: randBackupName, - CollectionSuffix: "_recover", - }) - log.Info("restore backup", zap.Any("resp", restoreResp)) - - //clean - backup.DeleteBackup(ctx, &backuppb.DeleteBackupRequest{ - BackupName: randBackupName, - }) -} diff --git a/core/backup_meta_test.go b/core/backup_meta_test.go deleted file mode 100644 index 763b2fe0..00000000 --- a/core/backup_meta_test.go +++ /dev/null @@ -1,263 +0,0 @@ -package core - -import ( - "bufio" - "fmt" - "io" - "os" - "strconv" - "testing" - - "github.com/zilliztech/milvus-backup/core/meta" - - jsoniter "github.com/json-iterator/go" - "github.com/stretchr/testify/assert" - - "github.com/zilliztech/milvus-backup/core/proto/backuppb" - "github.com/zilliztech/milvus-backup/internal/log" - "github.com/zilliztech/milvus-backup/internal/util/funcutil" -) - -func TestBackupSerialize(t *testing.T) { - - constructCollectionSchema := func() *backuppb.CollectionSchema { - int64Field := "int64" - floatVecField := "fVec" - dim := 128 - prefix := "test_backup_" - collectionName := prefix + funcutil.GenRandomStr() - pk := &backuppb.FieldSchema{ - FieldID: 0, - Name: int64Field, - IsPrimaryKey: true, - Description: "", - DataType: backuppb.DataType_Int64, - TypeParams: nil, - IndexParams: nil, - AutoID: true, - } - fVec := &backuppb.FieldSchema{ - FieldID: 0, - Name: floatVecField, - IsPrimaryKey: false, - Description: "", - DataType: backuppb.DataType_FloatVector, - TypeParams: []*backuppb.KeyValuePair{ - { - Key: "dim", - Value: strconv.Itoa(dim), - }, - }, - IndexParams: nil, - AutoID: false, - } - return &backuppb.CollectionSchema{ - Name: collectionName, - Description: "", - AutoID: false, - Fields: []*backuppb.FieldSchema{ - pk, - fVec, - }, - } - } - schema := constructCollectionSchema() - - var collection_id int64 = 1 - - segement1 := &backuppb.SegmentBackupInfo{ - SegmentId: 1001, - CollectionId: collection_id, - PartitionId: 101, - NumOfRows: 3000, - Binlogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 3000}}}}, - Statslogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 6000}}}}, - Deltalogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 9000}}}}, - } - - segement2 := &backuppb.SegmentBackupInfo{ - SegmentId: 1002, - CollectionId: collection_id, - PartitionId: 101, - NumOfRows: 3000, - Binlogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 3000}}}}, - Statslogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 6000}}}}, - Deltalogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 9000}}}}, - } - - segement3 := &backuppb.SegmentBackupInfo{ - SegmentId: 1003, - CollectionId: collection_id, - PartitionId: 102, - NumOfRows: 3000, - Binlogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 3000}}}}, - Statslogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 6000}}}}, - Deltalogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 9000}}}}, - } - - segement4 := &backuppb.SegmentBackupInfo{ - SegmentId: 1004, - CollectionId: collection_id, - PartitionId: 102, - NumOfRows: 3000, - Binlogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 3000}}}}, - Statslogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 6000}}}}, - Deltalogs: []*backuppb.FieldBinlog{{Binlogs: []*backuppb.Binlog{{EntriesNum: 3000, TimestampFrom: 100, TimestampTo: 200, LogSize: 9000}}}}, - } - - partition1 := &backuppb.PartitionBackupInfo{ - PartitionId: 101, - PartitionName: "20220101", - CollectionId: collection_id, - SegmentBackups: []*backuppb.SegmentBackupInfo{segement1, segement2}, - } - - partition2 := &backuppb.PartitionBackupInfo{ - PartitionId: 102, - PartitionName: "20220102", - CollectionId: collection_id, - SegmentBackups: []*backuppb.SegmentBackupInfo{segement3, segement4}, - } - - collection := &backuppb.CollectionBackupInfo{ - CollectionId: collection_id, - DbName: "default", - CollectionName: "hello_milvus", - Schema: schema, - ShardsNum: 2, - ConsistencyLevel: backuppb.ConsistencyLevel_Strong, - BackupTimestamp: 0, - PartitionBackups: []*backuppb.PartitionBackupInfo{partition1, partition2}, - } - - backup := &backuppb.BackupInfo{ - Name: "backup", - BackupTimestamp: 0, - CollectionBackups: []*backuppb.CollectionBackupInfo{collection}, - } - - serData, err := meta.serialize(backup) - assert.NoError(t, err) - log.Info(string(serData.BackupMetaBytes)) - log.Info(string(serData.CollectionMetaBytes)) - log.Info(string(serData.PartitionMetaBytes)) - log.Info(string(serData.SegmentMetaBytes)) - - deserBackup, err := meta.deserialize(serData) - log.Info(deserBackup.String()) -} - -func TestDbCollectionJson(t *testing.T) { - dbCollection := meta.DbCollections{"db1": []string{"coll1", "coll2"}, "db2": []string{"coll3", "coll4"}} - jsonStr, err := jsoniter.MarshalToString(dbCollection) - assert.NoError(t, err) - println(jsonStr) - - var dbCollection2 meta.DbCollections - jsoniter.UnmarshalFromString(jsonStr, &dbCollection2) - println(dbCollection2) -} - -func readBackup(backupDir string) (*backuppb.BackupInfo, error) { - readByteFunc := func(filepath string) ([]byte, error) { - file, err := os.OpenFile(filepath, os.O_RDWR, 0666) - if err != nil { - fmt.Println("Open file error!", err) - return nil, err - } - - // Get the file size - stat, err := file.Stat() - if err != nil { - fmt.Println(err) - return nil, err - } - - bs := make([]byte, stat.Size()) - _, err = bufio.NewReader(file).Read(bs) - if err != nil && err != io.EOF { - fmt.Println(err) - return nil, err - } - return bs, nil - } - - backupPath := backupDir + "/backup_meta.json" - collectionPath := backupDir + "/collection_meta.json" - partitionPath := backupDir + "/partition_meta.json" - segmentPath := backupDir + "/segment_meta.json" - - backupMetaBytes, err := readByteFunc(backupPath) - if err != nil { - return nil, err - } - collectionBackupMetaBytes, err := readByteFunc(collectionPath) - if err != nil { - return nil, err - } - partitionBackupMetaBytes, err := readByteFunc(partitionPath) - if err != nil { - return nil, err - } - segmentBackupMetaBytes, err := readByteFunc(segmentPath) - if err != nil { - return nil, err - } - - completeBackupMetas := &meta.BackupMetaBytes{ - BackupMetaBytes: backupMetaBytes, - CollectionMetaBytes: collectionBackupMetaBytes, - PartitionMetaBytes: partitionBackupMetaBytes, - SegmentMetaBytes: segmentBackupMetaBytes, - } - - deserBackup, err := meta.deserialize(completeBackupMetas) - - return deserBackup, err -} - -func TestReadBackupFile(t *testing.T) { - filepath := "/tmp/hxs_meta" - - backupInfo, err := readBackup(filepath) - assert.NoError(t, err) - - levelBackupInfo, err := meta.treeToLevel(backupInfo) - assert.NoError(t, err) - assert.NotNil(t, levelBackupInfo) - - output, _ := meta.serialize(backupInfo) - BackupMetaStr := string(output.BackupMetaBytes) - segmentMetaStr := string(output.SegmentMetaBytes) - fmt.Sprintf(BackupMetaStr) - fmt.Sprintf(segmentMetaStr) - //log.Info("segment meta", zap.String("value", string(output.SegmentMetaBytes))) -} - -func TestSimpleBackupResponse(t *testing.T) { - info := &backuppb.BackupInfoResponse{ - RequestId: "abc", - Code: backuppb.ResponseCode_Success, - Msg: "not found", - Data: nil, - } - simpleInfo := meta.SimpleBackupResponse(info) - assert.Nil(t, simpleInfo.Data) - assert.Equal(t, info.Code, simpleInfo.Code) - assert.Equal(t, info.Msg, simpleInfo.Msg) - assert.Equal(t, info.RequestId, simpleInfo.RequestId) -} - -func TestSimpleRestoreResponse(t *testing.T) { - info := &backuppb.RestoreBackupResponse{ - RequestId: "abc", - Code: backuppb.ResponseCode_Success, - Msg: "not found", - Data: nil, - } - simpleInfo := meta.SimpleRestoreResponse(info) - assert.Nil(t, simpleInfo.Data) - assert.Equal(t, info.Code, simpleInfo.Code) - assert.Equal(t, info.Msg, simpleInfo.Msg) - assert.Equal(t, info.RequestId, simpleInfo.RequestId) -} diff --git a/core/backup_server_test.go b/core/backup_server_test.go deleted file mode 100644 index d14a71d6..00000000 --- a/core/backup_server_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package core - -import ( - "context" - "net/http" - "net/http/pprof" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/zilliztech/milvus-backup/core/paramtable" -) - -func TestBackupService(t *testing.T) { - var params paramtable.BackupParams - milvusYamlFile := "backup.yaml" - params.GlobalInitWithYaml(milvusYamlFile) - params.Init() - - context := context.Background() - server, err := NewServer(context, ¶ms) - assert.NoError(t, err) - server.Init() - server.Start() - time.Sleep(1000 * time.Second) - -} - -func TestProfileService(t *testing.T) { - go func() { - http.HandleFunc("/debug/pprof/heap", pprof.Index) - http.ListenAndServe("localhost:8089", nil) - }() - time.Sleep(1000 * time.Second) - -} diff --git a/core/client/cfg.go b/core/client/cfg.go index 29b44090..12d42c83 100644 --- a/core/client/cfg.go +++ b/core/client/cfg.go @@ -6,18 +6,14 @@ import ( "errors" "fmt" "net/url" - "regexp" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" ) -var _httpsScheme = regexp.MustCompile(`^https?://`) -var _httpScheme = regexp.MustCompile(`^http?://`) - type Cfg struct { - Address string // Remote address, "localhost:19530". + Host string // Remote address, "localhost:19530". EnableTLS bool // Enable TLS for connection. DialOpts []grpc.DialOption @@ -25,7 +21,7 @@ type Cfg struct { Password string // Password for auth. } -func (cfg *Cfg) parseAuth() string { +func (cfg *Cfg) parseGrpcAuth() string { if cfg.Username != "" || cfg.Password != "" { value := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password))) return value @@ -35,25 +31,19 @@ func (cfg *Cfg) parseAuth() string { } func (cfg *Cfg) parseGrpc() (*url.URL, []grpc.DialOption, error) { - var opts []grpc.DialOption - address := cfg.Address - if !_httpsScheme.MatchString(address) { - address = fmt.Sprintf("tcp://%s", address) - } - remoteURL, err := url.Parse(address) - if err != nil { - return nil, nil, errors.New("milvus address parse fail") - } + remoteURL := &url.URL{Host: cfg.Host} // Remote Host should never be empty. if remoteURL.Host == "" { - return nil, nil, errors.New("empty remote host of milvus address") + return nil, nil, errors.New("client: empty remote host of milvus address") } - if remoteURL.Scheme == "https" || cfg.EnableTLS { + opts := cfg.DialOpts + if cfg.EnableTLS { opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } else { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } + if remoteURL.Port() == "" && cfg.EnableTLS { remoteURL.Host += ":443" } @@ -61,15 +51,21 @@ func (cfg *Cfg) parseGrpc() (*url.URL, []grpc.DialOption, error) { return remoteURL, opts, nil } -func (cfg *Cfg) parseRestful() (*url.URL, error) { - address := cfg.Address - if !_httpsScheme.MatchString(address) && !_httpScheme.MatchString(address) { - if cfg.EnableTLS { - address = fmt.Sprintf("https://%s", address) - } else { - address = fmt.Sprintf("http://%s", address) - } +func (cfg *Cfg) parseRestfulAuth() string { + if cfg.Username != "" || cfg.Password != "" { + return fmt.Sprintf("%s:%s", cfg.Username, cfg.Password) + } + + return "" +} + +func (cfg *Cfg) parseRestful() *url.URL { + u := &url.URL{Host: cfg.Host} + if cfg.EnableTLS { + u.Scheme = "https" + } else { + u.Scheme = "http" } - return url.Parse(address) + return u } diff --git a/core/client/cfg_test.go b/core/client/cfg_test.go new file mode 100644 index 00000000..c3f7ec3d --- /dev/null +++ b/core/client/cfg_test.go @@ -0,0 +1,49 @@ +package client + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCfg_ParseAuth(t *testing.T) { + cfg := &Cfg{Username: "username", Password: "password"} + got := cfg.parseGrpcAuth() + assert.Equal(t, "dXNlcm5hbWU6cGFzc3dvcmQ=", got) + + cfg = &Cfg{} + got = cfg.parseGrpcAuth() + assert.Equal(t, "", got) +} + +func TestCfg_ParseGrpc(t *testing.T) { + cfg := &Cfg{Host: ""} + _, _, err := cfg.parseGrpc() + assert.Error(t, err) + + cfg = &Cfg{Host: "localhost:19530"} + u, _, err := cfg.parseGrpc() + assert.NoError(t, err) + assert.Equal(t, "localhost:19530", u.Host) + + cfg = &Cfg{Host: "localhost", EnableTLS: true} + u, _, err = cfg.parseGrpc() + assert.NoError(t, err) + assert.Equal(t, "localhost:443", u.Host) +} + +func TestCfg_ParseRestful(t *testing.T) { + cfg := &Cfg{Host: "localhost:19530", EnableTLS: false} + u := cfg.parseRestful() + assert.Equal(t, "http", u.Scheme) + + cfg = &Cfg{Host: "localhost:19530", EnableTLS: true} + u = cfg.parseRestful() + assert.Equal(t, "https", u.Scheme) +} + +func TestCfg_ParseRestfulAuth(t *testing.T) { + cfg := &Cfg{Username: "username", Password: "password"} + got := cfg.parseRestfulAuth() + assert.Equal(t, "username:password", got) +} diff --git a/core/client/grpc.go b/core/client/grpc.go index 7dd4e431..0c938855 100644 --- a/core/client/grpc.go +++ b/core/client/grpc.go @@ -9,16 +9,15 @@ import ( "strconv" "time" - grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/keepalive" - "github.com/golang/protobuf/proto" + grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -120,11 +119,12 @@ type GrpcClient struct { conn *grpc.ClientConn srv milvuspb.MilvusServiceClient - auth string + auth string + + // get from connect serverVersion string identifier string - - flags uint64 + flags uint64 } func NewGrpc(cfg *Cfg) (*GrpcClient, error) { @@ -132,10 +132,9 @@ func NewGrpc(cfg *Cfg) (*GrpcClient, error) { if err != nil { return nil, fmt.Errorf("client: parse address failed: %w", err) } - auth := cfg.parseAuth() + auth := cfg.parseGrpcAuth() opts = append(opts, defaultDialOpt()...) - conn, err := grpc.NewClient(addr.Host, opts...) if err != nil { return nil, fmt.Errorf("client: create grpc client failed: %w", err) diff --git a/core/client/grpc_test.go b/core/client/grpc_test.go new file mode 100644 index 00000000..bd09656a --- /dev/null +++ b/core/client/grpc_test.go @@ -0,0 +1,14 @@ +package client + +import ( + "testing" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/stretchr/testify/assert" +) + +func TestOk(t *testing.T) { + assert.True(t, ok(&commonpb.Status{Code: 0})) + + assert.False(t, ok(&commonpb.Status{Code: 1})) +} diff --git a/core/client/restful.go b/core/client/restful.go index f889761b..c4b1ce5d 100644 --- a/core/client/restful.go +++ b/core/client/restful.go @@ -154,12 +154,12 @@ func (r *RestfulClient) GetBulkInsertState(ctx context.Context, dbName, jobID st } func NewRestful(cfg *Cfg) (*RestfulClient, error) { - baseURL, err := cfg.parseRestful() - if err != nil { - return nil, fmt.Errorf("client: failed to parse restful address: %w", err) + baseURL := cfg.parseRestful() + cli := req.C().SetBaseURL(baseURL.String()) + + if auth := cfg.parseRestfulAuth(); len(auth) != 0 { + cli.SetCommonBearerAuthToken(auth) } - cli := req.C(). - SetBaseURL(baseURL.String()). - SetCommonBearerAuthToken(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password)) + return &RestfulClient{cli: cli}, nil } diff --git a/core/paramtable/base_table_test.go b/core/paramtable/base_table_test.go deleted file mode 100644 index ccd56730..00000000 --- a/core/paramtable/base_table_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package paramtable - -import ( - "os" - "testing" - - "github.com/stretchr/testify/assert" - - memkv "github.com/zilliztech/milvus-backup/internal/kv/mem" -) - -func TestParseDataSizeWithDefault(t *testing.T) { - base := &BaseTable{ - params: memkv.NewMemoryKV(), - } - base.Init() - sizeKey := "sizeKey" - - size2g, err := base.ParseDataSizeWithDefault(sizeKey, "2g") - assert.NoError(t, err) - assert.Equal(t, size2g, int64(2*1024*1024*1024)) - - size2G, err := base.ParseDataSizeWithDefault(sizeKey, "2g") - assert.NoError(t, err) - assert.Equal(t, size2G, int64(2*1024*1024*1024)) - - size3M, err := base.ParseDataSizeWithDefault(sizeKey, "3Mb") - assert.NoError(t, err) - assert.Equal(t, size3M, int64(3*1024*1024)) - - size3m, err := base.ParseDataSizeWithDefault(sizeKey, "3M") - assert.NoError(t, err) - assert.Equal(t, size3m, int64(3*1024*1024)) - - size1024k, err := base.ParseDataSizeWithDefault(sizeKey, "1024k") - assert.NoError(t, err) - assert.Equal(t, size1024k, int64(1024*1024)) - - size1024000, err := base.ParseDataSizeWithDefault(sizeKey, "1024000") - assert.NoError(t, err) - assert.Equal(t, size1024000, int64(1024000)) -} - -func TestTryLoadFromEnv(t *testing.T) { - base := &BaseTable{ - params: memkv.NewMemoryKV(), - } - os.Setenv("MILVUS_USER", "Marco") - base.tryLoadFromEnv() - - assert.Equal(t, "Marco", base.params.LoadWithDefault("milvus.user", "")) -} diff --git a/core/paramtable/params_test.go b/core/paramtable/params_test.go deleted file mode 100644 index 77adadd8..00000000 --- a/core/paramtable/params_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package paramtable - -import ( - "testing" -) - -func TestRootPathParams(t *testing.T) { - var params BackupParams - params.GlobalInitWithYaml("backup.yaml") - params.Init() - - //cfg := &MinioConfig{} - //cfg.initRootPath() - println(params.MinioCfg.RootPath) -} diff --git a/core/proto/backuppb/backup.pb.go b/core/proto/backuppb/backup.pb.go index eddeff63..93923fed 100644 --- a/core/proto/backuppb/backup.pb.go +++ b/core/proto/backuppb/backup.pb.go @@ -8,14 +8,15 @@ package backuppb import ( context "context" + reflect "reflect" + sync "sync" + _struct "github.com/golang/protobuf/ptypes/struct" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" ) const ( diff --git a/internal/common/error.go b/internal/common/error.go deleted file mode 100644 index 9e098234..00000000 --- a/internal/common/error.go +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -type IgnorableError struct { - msg string -} - -func (i *IgnorableError) Error() string { - return i.msg -} - -func NewIgnorableError(err error) error { - return &IgnorableError{ - msg: err.Error(), - } -} - -func IsIgnorableError(err error) bool { - _, ok := err.(*IgnorableError) - return ok -} diff --git a/internal/common/tuple.go b/internal/common/tuple.go deleted file mode 100644 index 85754784..00000000 --- a/internal/common/tuple.go +++ /dev/null @@ -1,5 +0,0 @@ -package common - -type Int64Tuple struct { - Key, Value int64 -} diff --git a/internal/util/funcutil/func.go b/internal/util/funcutil/func.go deleted file mode 100644 index 73bc4d28..00000000 --- a/internal/util/funcutil/func.go +++ /dev/null @@ -1,43 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package funcutil - -import ( - "context" - "errors" - - grpcStatus "google.golang.org/grpc/status" -) - -// CheckCtxValid check if the context is valid -func CheckCtxValid(ctx context.Context) bool { - return ctx.Err() != context.DeadlineExceeded && ctx.Err() != context.Canceled -} - -// IsGrpcErr checks whether err is instance of grpc status error. -func IsGrpcErr(err error) bool { - for { - if err == nil { - return false - } - _, ok := grpcStatus.FromError(err) - if ok { - return true - } - err = errors.Unwrap(err) - } -} diff --git a/internal/util/funcutil/random.go b/internal/util/funcutil/random.go deleted file mode 100644 index f3fde241..00000000 --- a/internal/util/funcutil/random.go +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package funcutil - -import ( - "fmt" - "math/rand" - "time" -) - -var r *rand.Rand - -func init() { - r = rand.New(rand.NewSource(time.Now().UnixNano())) -} - -var letterRunes = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - -// RandomBytes returns a batch of random string -func RandomBytes(n int) []byte { - b := make([]byte, n) - for i := range b { - b[i] = letterRunes[r.Intn(len(letterRunes))] - } - return b -} - -// RandomString returns a batch of random string -func RandomString(n int) string { - return string(RandomBytes(n)) -} - -// GenRandomBytes generates a random bytes. -func GenRandomBytes() []byte { - l := rand.Uint64()%10 + 1 - b := make([]byte, l) - if _, err := rand.Read(b); err != nil { - return nil - } - return b -} - -// GenRandomStr generates a random string. -func GenRandomStr() string { - return fmt.Sprintf("%X", GenRandomBytes()) -} diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go deleted file mode 100644 index 8bf0844c..00000000 --- a/internal/util/grpcclient/client.go +++ /dev/null @@ -1,269 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcclient - -import ( - "context" - "fmt" - "sync" - "time" - - "google.golang.org/grpc/backoff" - - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - - "github.com/zilliztech/milvus-backup/internal/log" - "github.com/zilliztech/milvus-backup/internal/util/funcutil" -) - -// GrpcClient abstracts client of grpc -type GrpcClient interface { - SetRole(string) - GetRole() string - SetGetAddrFunc(func() (string, error)) - SetNewGrpcClientFunc(func(cc *grpc.ClientConn) interface{}) - GetGrpcClient(ctx context.Context) (interface{}, error) - ReCall(ctx context.Context, caller func(client interface{}) (interface{}, error)) (interface{}, error) - Call(ctx context.Context, caller func(client interface{}) (interface{}, error)) (interface{}, error) - Close() error -} - -// ClientBase is a base of grpc client -type ClientBase struct { - getAddrFunc func() (string, error) - newGrpcClient func(cc *grpc.ClientConn) interface{} - - grpcClient interface{} - conn *grpc.ClientConn - grpcClientMtx sync.RWMutex - role string - ClientMaxSendSize int - ClientMaxRecvSize int - RetryServiceNameConfig string - - DialTimeout time.Duration - KeepAliveTime time.Duration - KeepAliveTimeout time.Duration - - MaxAttempts int - InitialBackoff float32 - MaxBackoff float32 - BackoffMultiplier float32 -} - -// SetRole sets role of client -func (c *ClientBase) SetRole(role string) { - c.role = role -} - -// GetRole returns role of client -func (c *ClientBase) GetRole() string { - return c.role -} - -// SetGetAddrFunc sets getAddrFunc of client -func (c *ClientBase) SetGetAddrFunc(f func() (string, error)) { - c.getAddrFunc = f -} - -// SetNewGrpcClientFunc sets newGrpcClient of client -func (c *ClientBase) SetNewGrpcClientFunc(f func(cc *grpc.ClientConn) interface{}) { - c.newGrpcClient = f -} - -// GetGrpcClient returns grpc client -func (c *ClientBase) GetGrpcClient(ctx context.Context) (interface{}, error) { - c.grpcClientMtx.RLock() - - if c.grpcClient != nil { - defer c.grpcClientMtx.RUnlock() - return c.grpcClient, nil - } - c.grpcClientMtx.RUnlock() - - c.grpcClientMtx.Lock() - defer c.grpcClientMtx.Unlock() - - if c.grpcClient != nil { - return c.grpcClient, nil - } - - err := c.connect(ctx) - if err != nil { - return nil, err - } - - return c.grpcClient, nil -} - -func (c *ClientBase) resetConnection(client interface{}) { - c.grpcClientMtx.Lock() - defer c.grpcClientMtx.Unlock() - if c.grpcClient == nil { - return - } - - if client != c.grpcClient { - return - } - if c.conn != nil { - _ = c.conn.Close() - } - c.conn = nil - c.grpcClient = nil -} - -func (c *ClientBase) connect(ctx context.Context) error { - addr, err := c.getAddrFunc() - if err != nil { - log.Error("failed to get client address", zap.Error(err)) - return err - } - - //opts := trace.GetInterceptorOpts() - dialContext, cancel := context.WithTimeout(ctx, c.DialTimeout) - - // refer to https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto - retryPolicy := fmt.Sprintf(`{ - "methodConfig": [{ - "name": [{"service": "%s"}], - "retryPolicy": { - "MaxAttempts": %d, - "InitialBackoff": "%fs", - "MaxBackoff": "%fs", - "BackoffMultiplier": %f, - "RetryableStatusCodes": [ "UNAVAILABLE" ] - } - }]}`, c.RetryServiceNameConfig, c.MaxAttempts, c.InitialBackoff, c.MaxBackoff, c.BackoffMultiplier) - - conn, err := grpc.DialContext( - dialContext, - addr, - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize), - grpc.MaxCallSendMsgSize(c.ClientMaxSendSize), - ), - //grpc.WithUnaryInterceptor(grpcopentracing.UnaryClientInterceptor(opts...)), - //grpc.WithStreamInterceptor(grpcopentracing.StreamClientInterceptor(opts...)), - grpc.WithDefaultServiceConfig(retryPolicy), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: c.KeepAliveTime, - Timeout: c.KeepAliveTimeout, - PermitWithoutStream: true, - }), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: 100 * time.Millisecond, - Multiplier: 1.6, - Jitter: 0.2, - MaxDelay: 3 * time.Second, - }, - MinConnectTimeout: c.DialTimeout, - }), - //grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}), - ) - cancel() - if err != nil { - return wrapErrConnect(addr, err) - } - if c.conn != nil { - _ = c.conn.Close() - } - - c.conn = conn - c.grpcClient = c.newGrpcClient(c.conn) - return nil -} - -func (c *ClientBase) callOnce(ctx context.Context, caller func(client interface{}) (interface{}, error)) (interface{}, error) { - client, err := c.GetGrpcClient(ctx) - if err != nil { - return nil, err - } - - ret, err2 := caller(client) - if err2 == nil { - return ret, nil - } - - if !funcutil.CheckCtxValid(ctx) { - return nil, err2 - } - if !funcutil.IsGrpcErr(err2) { - log.Debug("ClientBase:isNotGrpcErr", zap.Error(err2)) - return nil, err2 - } - log.Debug(c.GetRole()+" ClientBase grpc error, start to reset connection", zap.Error(err2)) - c.resetConnection(client) - return ret, err2 -} - -// Call does a grpc call -func (c *ClientBase) Call(ctx context.Context, caller func(client interface{}) (interface{}, error)) (interface{}, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } - - ret, err := c.callOnce(ctx, caller) - if err != nil { - //traceErr := fmt.Errorf("err: %w\n, %s", err, trace.StackTrace()) - //log.Error("ClientBase Call grpc first call get error", zap.String("role", c.GetRole()), zap.Error(traceErr)) - return nil, err - } - return ret, err -} - -// ReCall does the grpc call twice -func (c *ClientBase) ReCall(ctx context.Context, caller func(client interface{}) (interface{}, error)) (interface{}, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } - - ret, err := c.callOnce(ctx, caller) - if err == nil { - return ret, nil - } - - //traceErr := fmt.Errorf("err: %w\n, %s", err, trace.StackTrace()) - //log.Warn(c.GetRole()+" ClientBase ReCall grpc first call get error ", zap.Error(traceErr)) - - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } - - ret, err = c.callOnce(ctx, caller) - if err != nil { - //traceErr = fmt.Errorf("err: %w\n, %s", err, trace.StackTrace()) - //log.Error("ClientBase ReCall grpc second call get error", zap.String("role", c.GetRole()), zap.Error(traceErr)) - return nil, err - } - return ret, err -} - -// Close close the client connection -func (c *ClientBase) Close() error { - c.grpcClientMtx.Lock() - defer c.grpcClientMtx.Unlock() - if c.conn != nil { - return c.conn.Close() - } - return nil -} diff --git a/internal/util/grpcclient/errors.go b/internal/util/grpcclient/errors.go deleted file mode 100644 index 5f831e6f..00000000 --- a/internal/util/grpcclient/errors.go +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcclient - -import ( - "errors" - "fmt" -) - -// ErrConnect is the instance for errors.Is target usage. -var ErrConnect errConnect - -// make sure ErrConnect implements error. -var _ error = errConnect{} - -// errConnect error instance returned when dial error returned. -type errConnect struct { - addr string - err error -} - -// Error implements error interface. -func (e errConnect) Error() string { - return fmt.Sprintf("failed to connect %s, reason: %s", e.addr, e.err.Error()) -} - -// Is checks err is ErrConnect to make errors.Is work. -func (e errConnect) Is(err error) bool { - var ce errConnect - return errors.As(err, &ce) -} - -// wrapErrConnect wrap connection error and related address to ErrConnect. -func wrapErrConnect(addr string, err error) error { - return errConnect{addr: addr, err: err} -}