diff --git a/backend/internal/gcp/client.go b/backend/internal/gcp/client.go index f6852b24cd..49a8b8cd30 100644 --- a/backend/internal/gcp/client.go +++ b/backend/internal/gcp/client.go @@ -1,7 +1,6 @@ package neosync_gcp import ( - "bufio" "compress/gzip" "context" "encoding/json" @@ -235,16 +234,12 @@ func getFirstRecordFromReader(reader io.Reader) (map[string]any, error) { } defer gzipReader.Close() - scanner := bufio.NewScanner(gzipReader) - if scanner.Scan() { - line := scanner.Text() - if err := json.Unmarshal([]byte(line), &result); err != nil { - return nil, fmt.Errorf("failed to unmarshal JSON: %w", err) - } - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading from gzip reader: %w", err) + decoder := json.NewDecoder(gzipReader) + err = decoder.Decode(&result) + if err != nil && err == io.EOF { + return result, nil + } else if err != nil { + return nil, err } return result, nil @@ -257,13 +252,16 @@ func streamRecordsFromReader(reader io.Reader, onRecord func(record map[string][ } defer gzipReader.Close() - scanner := bufio.NewScanner(gzipReader) - if scanner.Scan() { - line := scanner.Bytes() + decoder := json.NewDecoder(gzipReader) + for { var result map[string]any - if err := json.Unmarshal(line, &result); err != nil { - return fmt.Errorf("failed to unmarshal JSON: %w", err) + err = decoder.Decode(&result) + if err != nil && err == io.EOF { + break // End of file, stop the loop + } else if err != nil { + return err } + record, err := valToRecord(result) if err != nil { return fmt.Errorf("unable to convert record from map[string]any to map[string][]byte: %w", err) @@ -273,9 +271,6 @@ func streamRecordsFromReader(reader io.Reader, onRecord func(record map[string][ return err } } - if err := scanner.Err(); err != nil { - return fmt.Errorf("error reading from gzip reader: %w", err) - } return nil } diff --git a/backend/services/mgmt/v1alpha1/connection-data-service/connection-data.go b/backend/services/mgmt/v1alpha1/connection-data-service/connection-data.go index fd357c8592..0901cfb41d 100644 --- a/backend/services/mgmt/v1alpha1/connection-data-service/connection-data.go +++ b/backend/services/mgmt/v1alpha1/connection-data-service/connection-data.go @@ -1,12 +1,12 @@ package v1alpha1_connectiondataservice import ( - "bufio" "compress/gzip" "context" "encoding/json" "errors" "fmt" + "io" "log/slog" "sort" "strings" @@ -283,27 +283,30 @@ func (s *Service) GetConnectionDataStream( return fmt.Errorf("error creating gzip reader: %w", err) } - scanner := bufio.NewScanner(gzr) - for scanner.Scan() { - line := scanner.Bytes() + decoder := json.NewDecoder(gzr) + for { var data map[string]any - err = json.Unmarshal(line, &data) - if err != nil { + + // Decode the next JSON object + err = decoder.Decode(&data) + if err != nil && err == io.EOF { + break // End of file, stop the loop + } else if err != nil { result.Body.Close() gzr.Close() return err } - rowMap := make(map[string][]byte) for key, value := range data { var byteValue []byte - if str, ok := value.(string); ok { + switch v := value.(type) { + case string: // try converting string directly to []byte // prevents quoted strings - byteValue = []byte(str) - } else { + byteValue = []byte(v) + default: // if not a string use JSON encoding - byteValue, err = json.Marshal(value) + byteValue, err = json.Marshal(v) if err != nil { result.Body.Close() gzr.Close() @@ -321,11 +324,6 @@ func (s *Service) GetConnectionDataStream( return err } } - if err := scanner.Err(); err != nil { - result.Body.Close() - gzr.Close() - return err - } result.Body.Close() gzr.Close() } @@ -649,12 +647,14 @@ func (s *Service) GetConnectionSchema( return nil, fmt.Errorf("error creating gzip reader: %w", err) } - scanner := bufio.NewScanner(gzr) - if scanner.Scan() { - line := scanner.Bytes() + decoder := json.NewDecoder(gzr) + for { var data map[string]any - err = json.Unmarshal(line, &data) - if err != nil { + // Decode the next JSON object + err = decoder.Decode(&data) + if err != nil && err == io.EOF { + break // End of file, stop the loop + } else if err != nil { result.Body.Close() gzr.Close() return nil, err @@ -667,11 +667,7 @@ func (s *Service) GetConnectionSchema( Column: key, }) } - } - if err := scanner.Err(); err != nil { - result.Body.Close() - gzr.Close() - return nil, err + break // Only care about first record } result.Body.Close() gzr.Close() diff --git a/backend/services/mgmt/v1alpha1/integration_tests/transformers-service_integration_test.go b/backend/services/mgmt/v1alpha1/integration_tests/transformers-service_integration_test.go index cccdef5904..7e55874a44 100644 --- a/backend/services/mgmt/v1alpha1/integration_tests/transformers-service_integration_test.go +++ b/backend/services/mgmt/v1alpha1/integration_tests/transformers-service_integration_test.go @@ -41,7 +41,7 @@ func (s *IntegrationTestSuite) Test_TransformersService_GetTransformPiiRecognize t.Run("ok", func(t *testing.T) { allowed := []string{"foo", "bar"} - s.mocks.presidio.entities.On("GetSupportedEntitiesWithResponse", mock.Anything, mock.Anything). + s.mocks.presidio.entities.On("GetSupportedentitiesWithResponse", mock.Anything, mock.Anything). Once(). Return(&presidioapi.GetSupportedentitiesResponse{ JSON200: &allowed,