Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
change: by default, do not exit on a failing file ingestion, but make sure we log the failed status (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
iwilltry42 authored Oct 23, 2024
1 parent c91d365 commit 8a7aea3
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 8 deletions.
1 change: 1 addition & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type IngestPathsOpts struct {
IsDuplicateFuncName string
Prune bool // Prune deleted files
ErrOnUnsupportedFile bool
ExitOnFailedFile bool
}

type Client interface {
Expand Down
14 changes: 10 additions & 4 deletions pkg/client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,12 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
}
defer sem.Release(1)

ingestedFilesCount++
slog.Debug("Ingesting file", "path", absPath, "metadata", currentMetadata)
return ingestionFunc(sp, currentMetadata.Metadata[filepath.Base(sp)]) // FIXME: metadata
err = ingestionFunc(sp, currentMetadata.Metadata[filepath.Base(sp)]) // FIXME: metadata
if err == nil {
ingestedFilesCount++
}
return err
})
return nil
})
Expand All @@ -178,13 +181,16 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
}
defer sem.Release(1)

ingestedFilesCount++
var fileMetadata FileMetadata
if len(metadataStack) > 0 {
currentMetadata := metadataStack[len(metadataStack)-1]
fileMetadata = currentMetadata.Metadata[filepath.Base(path)]
}
return ingestionFunc(path, fileMetadata)
err = ingestionFunc(path, fileMetadata)
if err == nil {
ingestedFilesCount++
}
return err
})
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/client/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ func (c *StandaloneClient) ListDatasets(ctx context.Context) ([]types.Dataset, e
}

// Ingest hands the given data to the underlying Datastore for ingestion into
// the dataset identified by datasetID and returns the string slice and error
// produced by that call unchanged.
//
// When the datastore reports an error, it is additionally logged through the
// logger carried in ctx, tagged with status=failed and the error text, so the
// failure is recorded even if the caller chooses to continue with other files.
func (c *StandaloneClient) Ingest(ctx context.Context, datasetID string, name string, data []byte, opts datastore.IngestOpts) ([]string, error) {
	ids, err := c.Datastore.Ingest(ctx, datasetID, name, data, opts)
	if err != nil {
		// Log here rather than relying on the caller: the caller may swallow
		// the error (e.g. for unsupported file types) and keep going.
		log.FromCtx(ctx).With("status", "failed").With("error", err.Error()).Error("Ingest failed")
	}
	return ids, err
}

func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, opts *IngestPathsOpts, paths ...string) (int, error) {
Expand Down Expand Up @@ -111,7 +115,7 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op
iopts.IngestionFlows = opts.IngestionFlows
}

_, err = c.Ingest(log.ToCtx(ctx, log.FromCtx(ctx).With("filepath", path)), datasetID, filename, file, iopts)
_, err = c.Ingest(log.ToCtx(ctx, log.FromCtx(ctx).With("filepath", path).With("absolute_path", iopts.FileMetadata.AbsolutePath)), datasetID, filename, file, iopts)

if err != nil && !opts.ErrOnUnsupportedFile && errors.Is(err, &documentloader.UnsupportedFileTypeError{}) {
err = nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ClientIngestOpts struct {
NoCreateDataset bool `usage:"Do NOT create the dataset if it doesn't exist" default:"true" env:"KNOW_INGEST_NO_CREATE_DATASET"`
DeduplicationFuncName string `usage:"Name of the deduplication function to use" name:"dedupe-func" env:"KNOW_INGEST_DEDUPE_FUNC"`
ErrOnUnsupportedFile bool `usage:"Error on unsupported file types" default:"false" env:"KNOW_INGEST_ERR_ON_UNSUPPORTED_FILE"`
ExitOnFailedFile bool `usage:"Exit directly on failed file" default:"false" env:"KNOW_INGEST_EXIT_ON_FAILED_FILE"`
}

func (s *ClientIngest) Customize(cmd *cobra.Command) {
Expand Down Expand Up @@ -80,6 +81,7 @@ func (s *ClientIngest) Run(cmd *cobra.Command, args []string) error {
IsDuplicateFuncName: s.DeduplicationFuncName,
Prune: s.Prune,
ErrOnUnsupportedFile: s.ErrOnUnsupportedFile,
ExitOnFailedFile: s.ExitOnFailedFile,
}

if s.FlowsFile != "" {
Expand Down Expand Up @@ -119,7 +121,7 @@ func (s *ClientIngest) Run(cmd *cobra.Command, args []string) error {

filesIngested, err := c.IngestPaths(ctx, datasetID, ingestOpts, filePath)
if err != nil {
return err
return fmt.Errorf("ingested %d files but encountered at least one error: %w", filesIngested, err)
}

fmt.Printf("Ingested %d files from %q into dataset %q (took: %s)\n", filesIngested, filePath, datasetID, time.Since(startTime))
Expand Down
2 changes: 1 addition & 1 deletion pkg/datastore/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, name string, c
startTime := time.Now()
docIDs, err := s.Vectorstore.AddDocuments(ctx, docs, datasetID)
if err != nil {
statusLog.With("component", "vectorstore").Error("Failed to add documents", "error", err)
statusLog.With("component", "vectorstore").With("status", "failed").With("error", err.Error()).Error("Failed to add documents")
return nil, fmt.Errorf("failed to add documents from file %q: %w", opts.FileMetadata.AbsolutePath, err)
}
statusLog.Debug("Added documents to vectorstore", "duration", time.Since(startTime))
Expand Down

0 comments on commit 8a7aea3

Please sign in to comment.