Skip to content

Commit

Permalink
Use json decoder instead of scanner when reading from document storage
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Oct 23, 2024
1 parent 4afdf98 commit 82f237a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 46 deletions.
33 changes: 14 additions & 19 deletions backend/internal/gcp/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package neosync_gcp

import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
Expand Down Expand Up @@ -235,16 +234,12 @@ func getFirstRecordFromReader(reader io.Reader) (map[string]any, error) {
}
defer gzipReader.Close()

scanner := bufio.NewScanner(gzipReader)
if scanner.Scan() {
line := scanner.Text()
if err := json.Unmarshal([]byte(line), &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}
}

if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error reading from gzip reader: %w", err)
decoder := json.NewDecoder(gzipReader)
err = decoder.Decode(&result)
if err != nil && err == io.EOF {
return result, nil
} else if err != nil {
return nil, err
}

return result, nil
Expand All @@ -257,13 +252,16 @@ func streamRecordsFromReader(reader io.Reader, onRecord func(record map[string][
}
defer gzipReader.Close()

scanner := bufio.NewScanner(gzipReader)
if scanner.Scan() {
line := scanner.Bytes()
decoder := json.NewDecoder(gzipReader)
for {
var result map[string]any
if err := json.Unmarshal(line, &result); err != nil {
return fmt.Errorf("failed to unmarshal JSON: %w", err)
err = decoder.Decode(&result)
if err != nil && err == io.EOF {
break // End of file, stop the loop
} else if err != nil {
return err
}

record, err := valToRecord(result)
if err != nil {
return fmt.Errorf("unable to convert record from map[string]any to map[string][]byte: %w", err)
Expand All @@ -273,9 +271,6 @@ func streamRecordsFromReader(reader io.Reader, onRecord func(record map[string][
return err
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("error reading from gzip reader: %w", err)
}

return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package v1alpha1_connectiondataservice

import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"sort"
"strings"
Expand Down Expand Up @@ -283,27 +283,30 @@ func (s *Service) GetConnectionDataStream(
return fmt.Errorf("error creating gzip reader: %w", err)
}

scanner := bufio.NewScanner(gzr)
for scanner.Scan() {
line := scanner.Bytes()
decoder := json.NewDecoder(gzr)
for {
var data map[string]any
err = json.Unmarshal(line, &data)
if err != nil {

// Decode the next JSON object
err = decoder.Decode(&data)
if err != nil && err == io.EOF {
break // End of file, stop the loop
} else if err != nil {
result.Body.Close()
gzr.Close()
return err
}

rowMap := make(map[string][]byte)
for key, value := range data {
var byteValue []byte
if str, ok := value.(string); ok {
switch v := value.(type) {
case string:
// try converting string directly to []byte
// prevents quoted strings
byteValue = []byte(str)
} else {
byteValue = []byte(v)
default:
// if not a string use JSON encoding
byteValue, err = json.Marshal(value)
byteValue, err = json.Marshal(v)
if err != nil {
result.Body.Close()
gzr.Close()
Expand All @@ -321,11 +324,6 @@ func (s *Service) GetConnectionDataStream(
return err
}
}
if err := scanner.Err(); err != nil {
result.Body.Close()
gzr.Close()
return err
}
result.Body.Close()
gzr.Close()
}
Expand Down Expand Up @@ -649,12 +647,14 @@ func (s *Service) GetConnectionSchema(
return nil, fmt.Errorf("error creating gzip reader: %w", err)
}

scanner := bufio.NewScanner(gzr)
if scanner.Scan() {
line := scanner.Bytes()
decoder := json.NewDecoder(gzr)
for {
var data map[string]any
err = json.Unmarshal(line, &data)
if err != nil {
// Decode the next JSON object
err = decoder.Decode(&data)
if err != nil && err == io.EOF {
break // End of file, stop the loop
} else if err != nil {
result.Body.Close()
gzr.Close()
return nil, err
Expand All @@ -667,11 +667,7 @@ func (s *Service) GetConnectionSchema(
Column: key,
})
}
}
if err := scanner.Err(); err != nil {
result.Body.Close()
gzr.Close()
return nil, err
break // Only care about first record
}
result.Body.Close()
gzr.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *IntegrationTestSuite) Test_TransformersService_GetTransformPiiRecognize

t.Run("ok", func(t *testing.T) {
allowed := []string{"foo", "bar"}
s.mocks.presidio.entities.On("GetSupportedEntitiesWithResponse", mock.Anything, mock.Anything).
s.mocks.presidio.entities.On("GetSupportedentitiesWithResponse", mock.Anything, mock.Anything).
Once().
Return(&presidioapi.GetSupportedentitiesResponse{
JSON200: &allowed,
Expand Down

0 comments on commit 82f237a

Please sign in to comment.