Commit 488139e

feat(breaking): add structured json log + make filename mandatory for ingestion

iwilltry42 committed Sep 13, 2024
1 parent 58ed962 commit 488139e

Showing 10 changed files with 109 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pkg/client/client.go
@@ -30,7 +30,7 @@ type Client interface {
DeleteDataset(ctx context.Context, datasetID string) error
GetDataset(ctx context.Context, datasetID string) (*index.Dataset, error)
ListDatasets(ctx context.Context) ([]types.Dataset, error)
Ingest(ctx context.Context, datasetID string, data []byte, opts datastore.IngestOpts) ([]string, error)
Ingest(ctx context.Context, datasetID string, name string, data []byte, opts datastore.IngestOpts) ([]string, error)
IngestPaths(ctx context.Context, datasetID string, opts *IngestPathsOpts, paths ...string) (int, error) // returns number of files ingested
AskDirectory(ctx context.Context, path string, query string, opts *IngestPathsOpts, ropts *datastore.RetrieveOpts) (*dstypes.RetrievalResponse, error)
PrunePath(ctx context.Context, datasetID string, path string, keep []string) ([]index.File, error)
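Note: with this change the document name moves out of IngestOpts and becomes a required argument on Ingest. A minimal sketch of the new call shape (ingestFile is an illustrative helper, not part of this commit):

```go
package example

import (
	"context"
	"os"
	"path/filepath"

	"github.com/gptscript-ai/knowledge/pkg/client"
	"github.com/gptscript-ai/knowledge/pkg/datastore"
)

// ingestFile reads a file and ingests it under its base name, which is now a
// mandatory positional argument rather than the optional IngestOpts.Filename.
func ingestFile(ctx context.Context, c client.Client, datasetID, path string) ([]string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	return c.Ingest(ctx, datasetID, filepath.Base(path), data, datastore.IngestOpts{})
}
```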
11 changes: 6 additions & 5 deletions pkg/client/default.go
@@ -6,14 +6,15 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
dstypes "github.com/gptscript-ai/knowledge/pkg/datastore/types"
"io"
"net/http"
"os"
"path/filepath"
"strings"

"github.com/acorn-io/z"
dstypes "github.com/gptscript-ai/knowledge/pkg/datastore/types"

"github.com/gptscript-ai/knowledge/pkg/datastore"
"github.com/gptscript-ai/knowledge/pkg/index"
"github.com/gptscript-ai/knowledge/pkg/server/types"
@@ -88,9 +89,9 @@ func (c *DefaultClient) ListDatasets(_ context.Context) ([]types.Dataset, error)
return datasets, nil
}

func (c *DefaultClient) Ingest(_ context.Context, datasetID string, data []byte, opts datastore.IngestOpts) ([]string, error) {
func (c *DefaultClient) Ingest(_ context.Context, datasetID string, name string, data []byte, opts datastore.IngestOpts) ([]string, error) {
payload := types.Ingest{
Filename: opts.Filename,
Filename: z.Pointer(name),
Content: base64.StdEncoding.EncodeToString(data),
}
if opts.FileMetadata != nil {
@@ -138,8 +139,8 @@ func (c *DefaultClient) IngestPaths(ctx context.Context, datasetID string, opts
return fmt.Errorf("failed to get absolute path for %s: %w", path, err)
}

filename := filepath.Base(path)
payload := datastore.IngestOpts{
Filename: z.Pointer(filepath.Base(path)),
FileMetadata: &index.FileMetadata{
Name: filepath.Base(path),
AbsolutePath: abspath,
@@ -151,7 +152,7 @@
if opts != nil {
payload.TextSplitterOpts = opts.TextSplitterOpts
}
_, err = c.Ingest(ctx, datasetID, content, payload)
_, err = c.Ingest(ctx, datasetID, filename, content, payload)
return err
}

14 changes: 7 additions & 7 deletions pkg/client/standalone.go
@@ -7,12 +7,11 @@ import (
"os"
"path/filepath"

"github.com/gptscript-ai/knowledge/pkg/datastore"
"github.com/gptscript-ai/knowledge/pkg/datastore/documentloader"
dstypes "github.com/gptscript-ai/knowledge/pkg/datastore/types"

"github.com/acorn-io/z"
"github.com/gptscript-ai/knowledge/pkg/datastore"
"github.com/gptscript-ai/knowledge/pkg/index"
"github.com/gptscript-ai/knowledge/pkg/log"
"github.com/gptscript-ai/knowledge/pkg/server/types"
)

@@ -59,8 +58,8 @@ func (c *StandaloneClient) ListDatasets(ctx context.Context) ([]types.Dataset, e
return r, nil
}

func (c *StandaloneClient) Ingest(ctx context.Context, datasetID string, data []byte, opts datastore.IngestOpts) ([]string, error) {
return c.Datastore.Ingest(ctx, datasetID, data, opts)
func (c *StandaloneClient) Ingest(ctx context.Context, datasetID string, name string, data []byte, opts datastore.IngestOpts) ([]string, error) {
return c.Datastore.Ingest(ctx, datasetID, name, data, opts)
}

func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, opts *IngestPathsOpts, paths ...string) (int, error) {
@@ -86,8 +85,9 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op
return fmt.Errorf("failed to open file %s: %w", path, err)
}

filename := filepath.Base(path)

iopts := datastore.IngestOpts{
Filename: z.Pointer(filepath.Base(path)),
FileMetadata: &index.FileMetadata{
Name: filepath.Base(path),
AbsolutePath: abspath,
@@ -102,7 +102,7 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op
iopts.IngestionFlows = opts.IngestionFlows
}

_, err = c.Ingest(ctx, datasetID, file, iopts)
_, err = c.Ingest(log.ToCtx(ctx, log.FromCtx(ctx).With("filepath", path)), datasetID, filename, file, iopts)

if err != nil && !opts.ErrOnUnsupportedFile && errors.Is(err, &documentloader.UnsupportedFileTypeError{}) {
err = nil
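Note the enriched context passed to Ingest above: a "filepath" attribute is layered onto whatever logger the context already carries before re-attaching it, so every record logged downstream is tagged per file. A standalone sketch of that step (withFile is an illustrative helper, not part of this commit):

```go
package main

import (
	"context"

	"github.com/gptscript-ai/knowledge/pkg/log"
)

// withFile layers a "filepath" attribute onto the context's logger and
// re-attaches the result, so downstream code emits per-file fields without
// threading the path through every call.
func withFile(ctx context.Context, path string) context.Context {
	return log.ToCtx(ctx, log.FromCtx(ctx).With("filepath", path))
}

func main() {
	ctx := withFile(context.Background(), "/tmp/manual.pdf")
	log.FromCtx(ctx).Info("opened file") // record includes filepath=/tmp/manual.pdf
}
```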
4 changes: 3 additions & 1 deletion pkg/cmd/ingest.go
@@ -8,6 +8,7 @@ import (
"strings"

"github.com/acorn-io/z"
"github.com/gptscript-ai/knowledge/pkg/log"
"github.com/spf13/cobra"

"github.com/gptscript-ai/knowledge/pkg/client"
@@ -112,7 +113,8 @@ func (s *ClientIngest) Run(cmd *cobra.Command, args []string) error {
slog.Debug("Loaded ingestion flows from config", "flows_file", s.FlowsFile, "dataset", datasetID, "flows", len(ingestOpts.IngestionFlows))
}

filesIngested, err := c.IngestPaths(cmd.Context(), datasetID, ingestOpts, filePath)
ctx := log.ToCtx(cmd.Context(), slog.With("flow", "ingestion").With("rootPath", filePath))
filesIngested, err := c.IngestPaths(ctx, datasetID, ingestOpts, filePath)
if err != nil {
return err
}
13 changes: 12 additions & 1 deletion pkg/cmd/main.go
@@ -37,6 +37,7 @@ func New() *cobra.Command {

type Knowledge struct {
Debug bool `usage:"Enable debug logging" env:"DEBUG" hidden:"true"`
Json bool `usage:"Output JSON" env:"KNOW_JSON" hidden:"true"`
}

func (c *Knowledge) Run(cmd *cobra.Command, _ []string) error {
@@ -45,8 +46,18 @@ func (c *Knowledge) Run(cmd *cobra.Command, _ []string) error {

func (c *Knowledge) Customize(cmd *cobra.Command) {
cmd.PersistentPreRun = func(cmd *cobra.Command, _ []string) {
lvl := slog.LevelInfo

if c.Debug {
_ = slog.SetLogLoggerLevel(slog.LevelDebug)
lvl = slog.LevelDebug
slog.SetLogLoggerLevel(lvl)
}

if c.Json {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
AddSource: false,
Level: lvl,
})))
}
}
}
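Note: the new hidden flag (env KNOW_JSON) swaps the default text logger for slog's JSON handler at the selected level. A self-contained sketch of the equivalent setup and the kind of record it produces (timestamp illustrative):

```go
package main

import (
	"log/slog"
	"os"
)

func main() {
	lvl := slog.LevelInfo // slog.LevelDebug when --debug is set
	// Replace the default text logger with single-line JSON on stdout.
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		AddSource: false,
		Level:     lvl,
	})))
	slog.Info("Ingested document", "filename", "manual.pdf", "status", "completed")
	// Output (illustrative):
	// {"time":"2024-09-13T12:00:00Z","level":"INFO","msg":"Ingested document","filename":"manual.pdf","status":"completed"}
}
```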
42 changes: 25 additions & 17 deletions pkg/datastore/ingest.go
@@ -8,8 +8,8 @@ import (

"github.com/gptscript-ai/knowledge/pkg/datastore/documentloader"
"github.com/gptscript-ai/knowledge/pkg/datastore/embeddings"
"github.com/gptscript-ai/knowledge/pkg/log"

"github.com/acorn-io/z"
"github.com/google/uuid"
"github.com/gptscript-ai/knowledge/pkg/datastore/filetypes"
"github.com/gptscript-ai/knowledge/pkg/datastore/textsplitter"
@@ -19,7 +19,6 @@
)

type IngestOpts struct {
Filename *string
FileMetadata *index.FileMetadata
IsDuplicateFuncName string
IsDuplicateFunc IsDuplicateFunc
@@ -28,7 +27,14 @@ type IngestOpts struct {
}

// Ingest loads a document from a reader and adds it to the dataset.
func (s *Datastore) Ingest(ctx context.Context, datasetID string, content []byte, opts IngestOpts) ([]string, error) {
func (s *Datastore) Ingest(ctx context.Context, datasetID string, name string, content []byte, opts IngestOpts) ([]string, error) {

if name == "" {
return nil, fmt.Errorf("name is required")
}

statusLog := log.FromCtx(ctx).With("phase", "store")

// Get dataset
ds, err := s.GetDataset(ctx, datasetID)
if err != nil {
@@ -86,7 +92,7 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, content []byte
isDuplicate = opts.IsDuplicateFunc
}

filename := z.Dereference(opts.Filename)
filename := name

// Generate ID
fUUID, err := uuid.NewUUID()
@@ -105,13 +111,7 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, content []byte
return nil, err
}

/*
* Set filename if not provided
*/
if filename == "" {
filename = "<unnamed_document>"
*opts.Filename = filename
}
statusLog = statusLog.With("filename", filename, "filetype", filetype)

slog.Debug("Loading data", "type", filetype, "filename", filename, "size", len(content))

@@ -120,11 +120,11 @@
*/
isDupe, err := isDuplicate(ctx, s, datasetID, nil, opts)
if err != nil {
slog.Error("Failed to check for duplicates", "error", err)
statusLog.With("status", "failed").Error("Failed to check for duplicates", "error", err)
return nil, fmt.Errorf("failed to check for duplicates: %w", err)
}
if isDupe {
slog.Debug("Ignoring duplicate document", "filename", filename, "absolute_path", opts.FileMetadata.AbsolutePath)
statusLog.With("status", "skipped").With("reason", "duplicate").Info("Ignoring duplicate document")
return nil, nil
}

@@ -153,24 +153,30 @@

docs, err := ingestionFlow.Run(ctx, bytes.NewReader(content))
if err != nil {
slog.Error("Ingestion Flow failed", "error", err, "filename", filename)
return nil, fmt.Errorf("ingestion Flow failed for file %q: %w", filename, err)
statusLog.With("status", "failed").Error("Ingestion Flow failed", "error", err)
return nil, fmt.Errorf("ingestion flow failed for file %q: %w", filename, err)
}

if len(docs) == 0 {
statusLog.With("status", "skipped").With("reason", "no documents").Debug("No documents loaded")
return nil, nil
}

// Before adding doc, we need to remove the existing documents for duplicates or old contents
statusLog.With("component", "vectorstore").With("action", "remove").Debug("Removing existing documents")
where := map[string]string{
"absPath": opts.FileMetadata.AbsolutePath,
}
if err := s.Vectorstore.RemoveDocument(ctx, "", datasetID, where, nil); err != nil {
statusLog.With("status", "failed").With("component", "vectorstore").Error("Failed to remove existing documents", "error", err)
return nil, err
}

// Add documents to VectorStore -> This generates the embeddings
slog.Debug("Ingesting documents", "count", len(docs))

log.ToCtx(ctx, log.FromCtx(ctx).With("phase", "store").With("num_documents", len(docs)))

docIDs, err := s.Vectorstore.AddDocuments(ctx, docs, datasetID)
if err != nil {
slog.Error("Failed to add documents", "error", err)
@@ -202,13 +208,15 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, content []byte
dbFile.FileMetadata.ModifiedAt = opts.FileMetadata.ModifiedAt
}

iLog := statusLog.With("component", "index")
iLog.Info("Inserting file and documents into index")
tx := s.Index.WithContext(ctx).Create(&dbFile)
if tx.Error != nil {
slog.Error("Failed to create file", "error", tx.Error)
iLog.Error("Failed to create file", "error", tx.Error)
return nil, fmt.Errorf("failed to create file: %w", tx.Error)
}

slog.Info("Ingested document", "filename", filename, "count", len(docIDs), "absolute_path", dbFile.FileMetadata.AbsolutePath)
statusLog.With("status", "completed").Info("Ingested document", "num_documents", len(docIDs), "absolute_path", dbFile.FileMetadata.AbsolutePath)

return docIDs, nil
}
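Note: the scattered slog calls in Ingest are replaced by one per-file statusLog derived from the context, with terminal states reported through uniform fields: status ("completed", "skipped", or "failed") plus a reason for skips. A condensed sketch of the convention (reportStatus is illustrative, not part of this commit):

```go
package main

import (
	"context"
	"errors"

	"github.com/gptscript-ai/knowledge/pkg/log"
)

// reportStatus shows the field convention this commit introduces: every
// terminal record carries phase, filename, and a machine-readable status,
// plus a reason when a file is skipped.
func reportStatus(ctx context.Context, filename string, dupe bool, err error) {
	statusLog := log.FromCtx(ctx).With("phase", "store", "filename", filename)
	switch {
	case err != nil:
		statusLog.With("status", "failed").Error("Ingestion failed", "error", err)
	case dupe:
		statusLog.With("status", "skipped").With("reason", "duplicate").Info("Ignoring duplicate document")
	default:
		statusLog.With("status", "completed").Info("Ingested document")
	}
}

func main() {
	ctx := context.Background()
	reportStatus(ctx, "manual.pdf", false, nil)
	reportStatus(ctx, "manual.pdf", true, nil)
	reportStatus(ctx, "manual.pdf", false, errors.New("embedding backend unreachable"))
}
```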
26 changes: 22 additions & 4 deletions pkg/flows/flows.go
@@ -9,6 +9,7 @@ import (

"github.com/acorn-io/z"
"github.com/gptscript-ai/knowledge/pkg/datastore/store"
"github.com/gptscript-ai/knowledge/pkg/log"
"github.com/mitchellh/mapstructure"
"github.com/philippgille/chromem-go"

@@ -36,11 +37,15 @@

func (f *IngestionFlow) Transform(ctx context.Context, docs []vs.Document) ([]vs.Document, error) {
var err error
for _, t := range f.Transformations {
for i, t := range f.Transformations {
l := log.FromCtx(ctx).With("transformer", t.Name()).With("progress", fmt.Sprintf("%d/%d", i+1, len(f.Transformations))).With("progress_unit", "transformations")
l.Info("Running transformer")
docs, err = t.Transform(ctx, docs)
if err != nil {
l.With("status", "failed").Error("Failed to transform documents", "error", err)
return nil, err
}
l.With("status", "completed").Info("Transformed documents", "num_documents", len(docs))
}
return docs, nil
}
@@ -91,39 +96,52 @@ func (f *IngestionFlow) Run(ctx context.Context, reader io.Reader) ([]vs.Documen
var err error
var docs []vs.Document

phaseLog := log.FromCtx(ctx).With("phase", "parse")

/*
* Load documents from the content
* For now, we're using documentloaders from both langchaingo and golc
* and translate them to our document schema.
*/

loaderLog := phaseLog.With("stage", "documentloader")
loaderLog.With("status", "starting").Info("Starting document loader")
if f.Load == nil {
loaderLog.With("status", "skipped").With("reason", "missing documentloader").Info("No documentloader available")
return nil, nil
}

docs, err = f.Load(ctx, reader)
if err != nil {
slog.Error("Failed to load documents", "error", err)
loaderLog.With("status", "failed").Error("Failed to load documents", "error", err)
return nil, fmt.Errorf("failed to load documents: %w", err)
}
loaderLog.With("status", "completed").Info("Loaded documents", "num_documents", len(docs))

/*
* Split documents - Chunking
*/
splitterLog := phaseLog.With("stage", "textsplitter").With(slog.Int("num_documents", len(docs)))
splitterLog.With("status", "starting").Info("Starting text splitter")

docs, err = f.Splitter.SplitDocuments(docs)
if err != nil {
slog.Error("Failed to split documents", "error", err)
splitterLog.With("status", "failed").Error("Failed to split documents", "error", err)
return nil, fmt.Errorf("failed to split documents: %w", err)
}
splitterLog.With("status", "completed").Info("Split documents", "new_num_documents", len(docs))

/*
* Transform documents
*/
transformerLog := phaseLog.With("stage", "transformer").With(slog.Int("num_documents", len(docs))).With(slog.Int("num_transformers", len(f.Transformations)))
transformerLog.With("status", "starting").Info("Starting document transformers")
docs, err = f.Transform(ctx, docs)
if err != nil {
slog.Error("Failed to transform documents", "error", err)
transformerLog.With("progress", "failed").Error("Failed to transform documents", "error", err)
return nil, fmt.Errorf("failed to transform documents: %w", err)
}
transformerLog.With("status", "completed").Info("Transformed documents", "new_num_documents", len(docs))

return docs, nil
}
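Note: each transformer in a flow now emits a start record and a completion record with a progress counter. A simplified stand-in for the loop in Transform (transformer names here are invented for illustration):

```go
package main

import (
	"fmt"
	"log/slog"
)

func main() {
	transformers := []string{"keywords", "summarize"} // hypothetical names
	for i, name := range transformers {
		l := slog.With("transformer", name).
			With("progress", fmt.Sprintf("%d/%d", i+1, len(transformers))).
			With("progress_unit", "transformations")
		l.Info("Running transformer")
		// t.Transform(ctx, docs) would run here
		l.With("status", "completed").Info("Transformed documents")
	}
}
```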
22 changes: 22 additions & 0 deletions pkg/log/log.go
@@ -0,0 +1,22 @@
package log

import (
"context"
"log/slog"
)

type contextKey string

const loggerKey = contextKey("logger")

func ToCtx(ctx context.Context, logger *slog.Logger) context.Context {
return context.WithValue(ctx, loggerKey, logger)
}

func FromCtx(ctx context.Context) *slog.Logger {
v := ctx.Value(loggerKey)
if v == nil {
return slog.Default()
}
return v.(*slog.Logger)
}
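Note: the new pkg/log package threads a *slog.Logger through context.Context; retrieval falls back to slog.Default() when nothing was attached, so callers never receive nil. A quick usage sketch:

```go
package main

import (
	"context"
	"log/slog"

	"github.com/gptscript-ai/knowledge/pkg/log"
)

func work(ctx context.Context) {
	// Uses the attached logger, or slog.Default() as a fallback.
	log.FromCtx(ctx).Info("processing", "step", 1)
}

func main() {
	ctx := log.ToCtx(context.Background(), slog.With("flow", "ingestion"))
	work(ctx)                  // tagged with flow=ingestion
	work(context.Background()) // falls back to the default logger
}
```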
3 changes: 1 addition & 2 deletions pkg/server/routes.go
@@ -123,8 +123,7 @@ func (s *Server) IngestIntoDS(c *gin.Context) {

// ingest content
// TODO: support ingestion flows
docIDs, err := s.Ingest(c, id, data, datastore.IngestOpts{
Filename: ingest.Filename,
docIDs, err := s.Ingest(c, id, z.Dereference(ingest.Filename), data, datastore.IngestOpts{
FileMetadata: ingest.FileMetadata,
TextSplitterOpts: ingest.TextSplitterOpts,
})