Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Feat: Improve ingestion speed by running more routinues
Browse files Browse the repository at this point in the history
Signed-off-by: Daishan Peng <[email protected]>
  • Loading branch information
StrongMonkey committed Jun 17, 2024
1 parent d6593be commit 641c494
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 44 deletions.
28 changes: 14 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.0

replace (
github.com/philippgille/chromem-go => github.com/iwilltry42/chromem-go v0.0.0-20240513080122-88f1efa639f5 // Azure OpenAI support
github.com/tmc/langchaingo => github.com/iwilltry42/langchaingo v0.0.0-20240516095223-8cf46ac74799 // Context-Aware Markdown Splitting
github.com/tmc/langchaingo => ../langchaingo // Context-Aware Markdown Splitting
)

require (
Expand Down Expand Up @@ -37,21 +37,21 @@ require (
)

require (
cloud.google.com/go/ai v0.4.0 // indirect
cloud.google.com/go/ai v0.5.0 // indirect
github.com/AssemblyAI/assemblyai-go-sdk v1.3.0 // indirect
github.com/EndFirstCorp/peekingReader v0.0.0-20171012052444-257fb6f1a1a6 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/andybalholm/cascadia v1.3.2 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/aws/aws-sdk-go-v2 v1.26.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.4 // indirect
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2 v1.26.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/textract v1.30.4 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/bytedance/sonic v1.11.3 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
Expand Down Expand Up @@ -92,7 +92,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/ledongthuc/pdf v0.0.0-20240201131950-da5b75280b06 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
Expand Down Expand Up @@ -139,11 +139,11 @@ require (
golang.org/x/sys v0.20.0 // indirect
golang.org/x/tools v0.20.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/api v0.176.1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/api v0.180.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240506185236-b8a5c65736ae // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240506185236-b8a5c65736ae // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/libc v1.22.5 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM=
cloud.google.com/go/ai v0.4.0 h1:hoF8+joXKfW2Ug7MKssoffXCVUSxUqMUJL0hJxVtO1Q=
cloud.google.com/go/ai v0.4.0/go.mod h1:iX72tmUodGXVDxRDCGUZEPiB9HaMeERXkOdgCkUi8sA=
cloud.google.com/go/ai v0.5.0/go.mod h1:96VBphk70e0zdXZrbtgPuKYRZsQ3UktSUXhuojwiKA8=
cloud.google.com/go/aiplatform v1.66.0 h1:bbFYY4JInclG10czRFUYj2rjD+obhh3Gi9zVlyoMgEc=
cloud.google.com/go/aiplatform v1.66.0/go.mod h1:bPQS0UjaXaTAq57UgP3XWDCtYFOIbXXpkMsl6uP4JAc=
cloud.google.com/go/longrunning v0.5.6 h1:xAe8+0YaWoCKr9t1+aWe+OeQgN/iJK1fEgZSXmjuEaE=
Expand Down Expand Up @@ -38,20 +39,26 @@ github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHS
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-sdk-go-v2 v1.26.0 h1:/Ce4OCiM3EkpW7Y+xUnfAFpchU78K7/Ug01sZni9PgA=
github.com/aws/aws-sdk-go-v2 v1.26.0/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I=
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1/go.mod h1:sxpLb+nZk7tIfCWChfd+h4QwHNUR57d8hA1cleTkjJo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.4 h1:0ScVK/4qZ8CIW0k8jOeFVsyS/sAiXpYxRBLolMkuLQM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.4/go.mod h1:84KyjNZdHC6QZW08nfHI6yZgPd+qRgaWcYsyLUo3QY8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.4 h1:sHmMWWX5E7guWEFQ9SVo6A3S4xpPrWnd77a6y4WM6PU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.4/go.mod h1:WjpDrhWisWOIoS9n3nk67A3Ll1vfULJ9Kq6h29HTD48=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.7.3 h1:Ch31Jl96ULFDPPe7nLHnxmImjO9I8bjlPAECb1Wrx6E=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.7.3/go.mod h1:yHTz9jyvT5dT3sUHovWTWjv332k7FvD8MPk7xWEkbnQ=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.8.1/go.mod h1:nZspkhg+9p8iApLFoyAqfyuMP0F38acy2Hm3r5r95Cg=
github.com/aws/aws-sdk-go-v2/service/sagemakerruntime v1.27.3 h1:bvRsyhuEYCvyg3i89U0gRqdoJqtRoH5RK1hwOmmCCsM=
github.com/aws/aws-sdk-go-v2/service/sagemakerruntime v1.27.3/go.mod h1:ljK0mx70pj1+eBVh2tyJM+VzZooiZZgQLMIT+PHJSho=
github.com/aws/aws-sdk-go-v2/service/textract v1.30.4 h1:rz8lRkt7F0iKCGRbpL3NwbqGKUyX6rPrw1Nk1bO9tKc=
github.com/aws/aws-sdk-go-v2/service/textract v1.30.4/go.mod h1:PFUGOzwfe10atRNGR9abVqiKPAPSfKgu/LF0oDBflsY=
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
Expand Down Expand Up @@ -200,6 +207,7 @@ github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
Expand Down Expand Up @@ -437,16 +445,21 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSm
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
google.golang.org/api v0.176.1 h1:DJSXnV6An+NhJ1J+GWtoF2nHEuqB1VNoTfnIbjNvwD4=
google.golang.org/api v0.176.1/go.mod h1:j2MaSDYcvYV1lkZ1+SMW4IeF90SrEyFA+tluDYWRrFg=
google.golang.org/api v0.180.0/go.mod h1:51AiyoEg1MJPSZ9zvklA8VnRILPXxn1iVen9v25XHAE=
google.golang.org/genproto v0.0.0-20240325203815-454cdb8f5daa h1:ePqxpG3LVx+feAUOx8YmR5T7rc0rdzK8DyxM8cQ9zq0=
google.golang.org/genproto v0.0.0-20240325203815-454cdb8f5daa/go.mod h1:CnZenrTdRJb7jc+jOm0Rkywq+9wh0QC4U8tyiRbEPPM=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be h1:Zz7rLWqp0ApfsR/l7+zSHhY3PMiH2xqgxlfYfAfNpoU=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be/go.mod h1:dvdCTIoAGbkWbcIKBniID56/7XHTt6WfxXNMxuziJ+w=
google.golang.org/genproto/googleapis/api v0.0.0-20240506185236-b8a5c65736ae/go.mod h1:FfiGhwUm6CJviekPrc0oJ+7h29e+DmWU6UtjX0ZvI7Y=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240506185236-b8a5c65736ae/go.mod h1:I7Y+G38R2bu5j1aLzfFmQfTcU/WnFuqDwLZAbvKTKpM=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
75 changes: 47 additions & 28 deletions pkg/datastore/documentloader/pdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@ import (
"context"
"io"
"strings"
"sync"

md "github.com/JohannesKaufmann/html-to-markdown"
"github.com/PuerkitoBio/goquery"
"github.com/gen2brain/go-fitz"
"github.com/gptscript-ai/knowledge/pkg/datastore/types"
"github.com/gptscript-ai/knowledge/pkg/env"
vs "github.com/gptscript-ai/knowledge/pkg/vectorstore"
"golang.org/x/sync/errgroup"
)

// PDF_LOAD_PARALLEL_THREAD can be set as an environment variable that controls the number of goroutines to load pdf pages
const PDF_LOAD_PARALLEL_THREAD = "PDF_LOAD_PARALLEL_THREAD"

// Compile time check to ensure PDF satisfies the DocumentLoader interface.
var _ types.DocumentLoader = (*PDF)(nil)

Expand Down Expand Up @@ -41,6 +47,7 @@ type PDF struct {
opts PDFOptions
document *fitz.Document
converter *md.Converter
lock *sync.Mutex
}

// NewPDFFromFile creates a new PDF loader with the given options.
Expand All @@ -67,47 +74,59 @@ func NewPDF(r io.Reader, optFns ...func(o *PDFOptions)) (*PDF, error) {
opts: opts,
document: doc,
converter: converter,
lock: &sync.Mutex{},
}, nil
}

// Load loads the PDF document and returns a slice of vs.Document containing the page contents and metadata.
func (l *PDF) Load(ctx context.Context) ([]vs.Document, error) {
docs := make([]vs.Document, 0, l.document.NumPage())
numPages := l.document.NumPage()

for pageNum := 0; pageNum < l.document.NumPage(); pageNum++ {
g, childCtx := errgroup.WithContext(ctx)
g.SetLimit(env.GetIntFromEnvOrDefault(PDF_LOAD_PARALLEL_THREAD, 100))
for pageNum := 0; pageNum < numPages; pageNum++ {
html, err := l.document.HTML(pageNum, true)
if err != nil {
return nil, err
}

htmlDoc, err := goquery.NewDocumentFromReader(strings.NewReader(html))
if err != nil {
return nil, err
}
htmlDoc.Find("img").Remove()

ret, err := htmlDoc.First().Html()
if err != nil {
return nil, err
}

markdown, err := l.converter.ConvertString(ret)
if err != nil {
return nil, err
}

doc := vs.Document{
Content: strings.TrimSpace(markdown),
Metadata: map[string]any{
"page": pageNum + 1,
"totalPages": l.document.NumPage(),
},
}

docs = append(docs, doc)
g.Go(func() error {
select {
case <-childCtx.Done():
return context.Canceled
default:
htmlDoc, err := goquery.NewDocumentFromReader(strings.NewReader(html))
if err != nil {
return err
}
htmlDoc.Find("img").Remove()

ret, err := htmlDoc.First().Html()
if err != nil {
return err
}

markdown, err := l.converter.ConvertString(ret)
if err != nil {
return err
}

doc := vs.Document{
Content: strings.TrimSpace(markdown),
Metadata: map[string]any{
"page": pageNum + 1,
"totalPages": numPages,
},
}
l.lock.Lock()
docs = append(docs, doc)
l.lock.Unlock()
return nil
}
})
}

return docs, nil
return docs, g.Wait()
}

// LoadAndSplit loads PDF documents from the provided reader and splits them using the specified text splitter.
Expand Down
15 changes: 15 additions & 0 deletions pkg/env/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package env

import (
"os"
"strconv"
)

func GetIntFromEnvOrDefault(env string, def int) int {
v, _ := strconv.Atoi(os.Getenv(env))
if v != 0 {
return v
}

return def
}
7 changes: 5 additions & 2 deletions pkg/vectorstore/chromem/chromem.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"log/slog"
"maps"
"runtime"
"strconv"

"github.com/google/uuid"
"github.com/gptscript-ai/knowledge/pkg/env"
vs "github.com/gptscript-ai/knowledge/pkg/vectorstore"
"github.com/philippgille/chromem-go"
)

// EMBEDDING_PARALLEL_THREAD can be set as an environment variable to control the number of parallel API calls to create embedding for documents. Default is 100
const EMBEDDING_PARALLEL_THREAD = "EMBEDDING_PARALLEL_THREAD"

type Store struct {
db *chromem.DB
embeddingFunc chromem.EmbeddingFunc
Expand Down Expand Up @@ -58,7 +61,7 @@ func (s *Store) AddDocuments(ctx context.Context, docs []vs.Document, collection
return nil, vs.ErrCollectionNotFound{Collection: collection}
}

err := col.AddDocuments(ctx, chromemDocs, runtime.NumCPU()/2)
err := col.AddDocuments(ctx, chromemDocs, env.GetIntFromEnvOrDefault(EMBEDDING_PARALLEL_THREAD, 100))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 641c494

Please sign in to comment.