diff --git a/pkg/client/common.go b/pkg/client/common.go index 5a9ab53..cef0d45 100644 --- a/pkg/client/common.go +++ b/pkg/client/common.go @@ -1,7 +1,6 @@ package client import ( - "bufio" "context" "crypto/sha1" "encoding/hex" @@ -20,38 +19,7 @@ import ( "golang.org/x/sync/semaphore" ) -func isIgnored(ignore gitignore.Matcher, path string) bool { - return ignore.Match(strings.Split(path, string(filepath.Separator)), false) -} - -func readIgnoreFile(path string) ([]gitignore.Pattern, error) { - stat, err := os.Stat(path) - if err != nil { - return nil, fmt.Errorf("failed to checkout ignore file %q: %w", path, err) - } - - if stat.IsDir() { - return nil, fmt.Errorf("ignore file %q is a directory", path) - } - - var ps []gitignore.Pattern - f, err := os.Open(path) - if err != nil { - return nil, fmt.Errorf("failed to open ignore file %q: %w", path, err) - } - - scanner := bufio.NewScanner(f) - for scanner.Scan() { - s := scanner.Text() - if !strings.HasPrefix(s, "#") && len(strings.TrimSpace(s)) > 0 { - ps = append(ps, gitignore.ParsePattern(s, nil)) - } - } - - return ps, nil -} - -func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID string, ingestionFunc func(path string) error, paths ...string) (int, error) { +func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID string, ingestionFunc func(path string, metadata map[string]any) error, paths ...string) (int, error) { ingestedFilesCount := 0 var ignorePatterns []gitignore.Pattern @@ -72,7 +40,7 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID } } - slog.Debug("Ignore patterns", "patterns", ignorePatterns, "len", len(ignorePatterns)) + ignorePatterns = append(ignorePatterns, DefaultIgnorePatterns...) ignore := gitignore.NewMatcher(ignorePatterns) @@ -83,6 +51,9 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID g, ctx := errgroup.WithContext(ctx) + // Stack to store metadata when entering nested directories + var metadataStack []Metadata + for _, p := range paths { path := p var touchedFilePaths []string @@ -109,6 +80,13 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID } if fileInfo.IsDir() { + initialMetadata := &Metadata{Metadata: map[string]FileMetadata{}} + directoryMetadata, err := loadAndMergeMetadata(path, initialMetadata) + if err != nil { + return ingestedFilesCount, err + } + metadataStack = append(metadataStack, *directoryMetadata) + // Process directory err = filepath.WalkDir(path, func(subPath string, d os.DirEntry, err error) error { if err != nil { @@ -121,6 +99,14 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID if !opts.Recursive { return filepath.SkipDir // Skip subdirectories if not recursive } + + // One dir level deeper -> load new metadata + parentMetadata := metadataStack[len(metadataStack)-1] + newMetadata, err := loadAndMergeMetadata(subPath, &parentMetadata) + if err != nil { + return err + } + metadataStack = append(metadataStack, *newMetadata) return nil } if isIgnored(ignore, subPath) { @@ -128,12 +114,16 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID return nil } + // Process the file sp := subPath absPath, err := filepath.Abs(sp) if err != nil { return fmt.Errorf("failed to get absolute path for %s: %w", sp, err) } touchedFilePaths = append(touchedFilePaths, absPath) + + currentMetadata := metadataStack[len(metadataStack)-1] + g.Go(func() error { if err := sem.Acquire(ctx, 1); err != nil { return err @@ -141,14 +131,16 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID defer sem.Release(1) ingestedFilesCount++ - slog.Debug("Ingesting file", "path", absPath) - return ingestionFunc(sp) + slog.Debug("Ingesting file", "path", absPath, "metadata", currentMetadata) + return ingestionFunc(sp, currentMetadata.Metadata[filepath.Base(sp)]) // FIXME: metadata }) return nil }) if err != nil { return ingestedFilesCount, err } + // Directory processed, pop metadata + metadataStack = metadataStack[:len(metadataStack)-1] } else { if isIgnored(ignore, path) { slog.Debug("Ignoring file", "path", path, "ignorefile", opts.IgnoreFile, "ignoreExtensions", opts.IgnoreExtensions) @@ -168,7 +160,8 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID defer sem.Release(1) ingestedFilesCount++ - return ingestionFunc(path) + currentMetadata := metadataStack[len(metadataStack)-1] + return ingestionFunc(path, currentMetadata.Metadata[filepath.Base(path)]) // FIXME: metadata }) } diff --git a/pkg/client/default.go b/pkg/client/default.go index 078858a..46644b2 100644 --- a/pkg/client/default.go +++ b/pkg/client/default.go @@ -122,7 +122,7 @@ func (c *DefaultClient) IngestPaths(ctx context.Context, datasetID string, opts return 0, err } - ingestFile := func(path string) error { + ingestFile := func(path string, extraMetadata map[string]any) error { content, err := os.ReadFile(path) if err != nil { return fmt.Errorf("failed to read file %s: %w", path, err) @@ -148,6 +148,7 @@ func (c *DefaultClient) IngestPaths(ctx context.Context, datasetID string, opts ModifiedAt: finfo.ModTime(), }, IsDuplicateFuncName: "file_metadata", + ExtraMetadata: extraMetadata, } if opts != nil { payload.TextSplitterOpts = opts.TextSplitterOpts diff --git a/pkg/client/ignore.go b/pkg/client/ignore.go new file mode 100644 index 0000000..58c29f8 --- /dev/null +++ b/pkg/client/ignore.go @@ -0,0 +1,48 @@ +package client + +import ( + "bufio" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/go-git/go-git/v5/plumbing/format/gitignore" +) + +var DefaultIgnorePatterns = []gitignore.Pattern{ + gitignore.ParsePattern(MetadataFilename, nil), // Knowledge Metadata file + gitignore.ParsePattern("~$*", nil), // MS Office temp files + gitignore.ParsePattern("$*", nil), // Likely hidden/tempfiles +} + +func isIgnored(ignore gitignore.Matcher, path string) bool { + return ignore.Match(strings.Split(path, string(filepath.Separator)), false) +} + +func readIgnoreFile(path string) ([]gitignore.Pattern, error) { + stat, err := os.Stat(path) + if err != nil { + return nil, fmt.Errorf("failed to checkout ignore file %q: %w", path, err) + } + + if stat.IsDir() { + return nil, fmt.Errorf("ignore file %q is a directory", path) + } + + var ps []gitignore.Pattern + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("failed to open ignore file %q: %w", path, err) + } + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + s := scanner.Text() + if !strings.HasPrefix(s, "#") && len(strings.TrimSpace(s)) > 0 { + ps = append(ps, gitignore.ParsePattern(s, nil)) + } + } + + return ps, nil +} diff --git a/pkg/client/metadata.go b/pkg/client/metadata.go new file mode 100644 index 0000000..4ee5513 --- /dev/null +++ b/pkg/client/metadata.go @@ -0,0 +1,61 @@ +package client + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" +) + +const MetadataFilename = ".knowledge.json" + +type Metadata struct { + Metadata map[string]FileMetadata `json:"metadata"` // Map of file paths to metadata + // TODO (idea): add other fields like description here, so we can hierarchically build a dataset description? Challenge is pruning and merging. +} + +type FileMetadata map[string]any + +func loadAndMergeMetadata(dirPath string, parentMetadata *Metadata) (*Metadata, error) { + metadataPath := filepath.Join(dirPath, MetadataFilename) + dirName := filepath.Base(dirPath) + if _, err := os.Stat(metadataPath); err == nil { // Metadata file exists + fileContent, err := os.ReadFile(metadataPath) + if err != nil { + return nil, fmt.Errorf("failed to read metadata file %s: %w", metadataPath, err) + } + + var newMetadata Metadata + if err := json.Unmarshal(fileContent, &newMetadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata file %s: %w", metadataPath, err) + } + + // Merge with parent metadata, overriding existing keys + mergedMetadata := &Metadata{Metadata: make(map[string]FileMetadata, len(parentMetadata.Metadata)+len(newMetadata.Metadata))} + for filename, fileMetadata := range parentMetadata.Metadata { + if !strings.HasPrefix(filename, dirName) { + // skip entries which are not meant for this (sub-)directory + continue + } + fname := strings.TrimPrefix(strings.TrimPrefix(filename, dirName), string(filepath.Separator)) + mergedMetadata.Metadata[fname] = fileMetadata + } + + if newMetadata.Metadata != nil { + for filename, fileMetadata := range newMetadata.Metadata { + for k, v := range fileMetadata { + if mergedMetadata.Metadata[filename] == nil { + mergedMetadata.Metadata[filename] = make(FileMetadata, len(fileMetadata)) + } + mergedMetadata.Metadata[filename][k] = v + } + } + } + + return mergedMetadata, nil + } + + // No metadata file, return parent metadata as is + return parentMetadata, nil +} diff --git a/pkg/client/standalone.go b/pkg/client/standalone.go index f0129ae..5b56f4d 100644 --- a/pkg/client/standalone.go +++ b/pkg/client/standalone.go @@ -68,7 +68,7 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op return 0, err } - ingestFile := func(path string) error { + ingestFile := func(path string, extraMetadata map[string]any) error { // Gather metadata finfo, err := os.Stat(path) if err != nil { @@ -95,6 +95,7 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op ModifiedAt: finfo.ModTime(), }, IsDuplicateFuncName: opts.IsDuplicateFuncName, + ExtraMetadata: extraMetadata, } if opts != nil { diff --git a/pkg/datastore/ingest.go b/pkg/datastore/ingest.go index d8922c5..e9c029a 100644 --- a/pkg/datastore/ingest.go +++ b/pkg/datastore/ingest.go @@ -24,11 +24,11 @@ type IngestOpts struct { IsDuplicateFunc IsDuplicateFunc TextSplitterOpts *textsplitter.TextSplitterOpts IngestionFlows []flows.IngestionFlow + ExtraMetadata map[string]any } // Ingest loads a document from a reader and adds it to the dataset. func (s *Datastore) Ingest(ctx context.Context, datasetID string, name string, content []byte, opts IngestOpts) ([]string, error) { - if name == "" { return nil, fmt.Errorf("name is required") } @@ -147,8 +147,14 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, name string, c return nil, fmt.Errorf("%w (file %q)", &documentloader.UnsupportedFileTypeError{FileType: filetype}, opts.FileMetadata.AbsolutePath) } - // Mandatory Transformation: Add filename to metadata - em := &transformers.ExtraMetadata{Metadata: map[string]any{"filename": filename, "absPath": opts.FileMetadata.AbsolutePath}} + // Mandatory Transformation: Add filename to metadata -> append extraMetadata, but do not override filename or absPath + metadata := map[string]any{"filename": filename, "absPath": opts.FileMetadata.AbsolutePath} + for k, v := range opts.ExtraMetadata { + if _, ok := metadata[k]; !ok { + metadata[k] = v + } + } + em := &transformers.ExtraMetadata{Metadata: metadata} ingestionFlow.Transformations = append(ingestionFlow.Transformations, em) docs, err := ingestionFlow.Run(ctx, bytes.NewReader(content))