Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

feat: ingestion - include metadata from .knowledge.json on dir level #124

Merged
merged 4 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 30 additions & 37 deletions pkg/client/common.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package client

import (
"bufio"
"context"
"crypto/sha1"
"encoding/hex"
Expand All @@ -20,38 +19,7 @@ import (
"golang.org/x/sync/semaphore"
)

func isIgnored(ignore gitignore.Matcher, path string) bool {
return ignore.Match(strings.Split(path, string(filepath.Separator)), false)
}

func readIgnoreFile(path string) ([]gitignore.Pattern, error) {
stat, err := os.Stat(path)
if err != nil {
return nil, fmt.Errorf("failed to checkout ignore file %q: %w", path, err)
}

if stat.IsDir() {
return nil, fmt.Errorf("ignore file %q is a directory", path)
}

var ps []gitignore.Pattern
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("failed to open ignore file %q: %w", path, err)
}

scanner := bufio.NewScanner(f)
for scanner.Scan() {
s := scanner.Text()
if !strings.HasPrefix(s, "#") && len(strings.TrimSpace(s)) > 0 {
ps = append(ps, gitignore.ParsePattern(s, nil))
}
}

return ps, nil
}

func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID string, ingestionFunc func(path string) error, paths ...string) (int, error) {
func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID string, ingestionFunc func(path string, metadata map[string]any) error, paths ...string) (int, error) {
ingestedFilesCount := 0

var ignorePatterns []gitignore.Pattern
Expand All @@ -72,7 +40,7 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
}
}

slog.Debug("Ignore patterns", "patterns", ignorePatterns, "len", len(ignorePatterns))
ignorePatterns = append(ignorePatterns, DefaultIgnorePatterns...)

ignore := gitignore.NewMatcher(ignorePatterns)

Expand All @@ -83,6 +51,9 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID

g, ctx := errgroup.WithContext(ctx)

// Stack to store metadata when entering nested directories
var metadataStack []Metadata

for _, p := range paths {
path := p
var touchedFilePaths []string
Expand All @@ -109,6 +80,13 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
}

if fileInfo.IsDir() {
initialMetadata := &Metadata{Metadata: map[string]FileMetadata{}}
directoryMetadata, err := loadAndMergeMetadata(path, initialMetadata)
if err != nil {
return ingestedFilesCount, err
}
metadataStack = append(metadataStack, *directoryMetadata)

// Process directory
err = filepath.WalkDir(path, func(subPath string, d os.DirEntry, err error) error {
if err != nil {
Expand All @@ -121,34 +99,48 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
if !opts.Recursive {
return filepath.SkipDir // Skip subdirectories if not recursive
}

// One dir level deeper -> load new metadata
parentMetadata := metadataStack[len(metadataStack)-1]
newMetadata, err := loadAndMergeMetadata(subPath, &parentMetadata)
if err != nil {
return err
}
metadataStack = append(metadataStack, *newMetadata)
return nil
}
if isIgnored(ignore, subPath) {
slog.Debug("Ignoring file", "path", subPath, "ignorefile", opts.IgnoreFile, "ignoreExtensions", opts.IgnoreExtensions)
return nil
}

// Process the file
sp := subPath
absPath, err := filepath.Abs(sp)
if err != nil {
return fmt.Errorf("failed to get absolute path for %s: %w", sp, err)
}
touchedFilePaths = append(touchedFilePaths, absPath)

currentMetadata := metadataStack[len(metadataStack)-1]

g.Go(func() error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)

ingestedFilesCount++
slog.Debug("Ingesting file", "path", absPath)
return ingestionFunc(sp)
slog.Debug("Ingesting file", "path", absPath, "metadata", currentMetadata)
return ingestionFunc(sp, currentMetadata.Metadata[filepath.Base(sp)]) // FIXME: metadata
})
return nil
})
if err != nil {
return ingestedFilesCount, err
}
// Directory processed, pop metadata
metadataStack = metadataStack[:len(metadataStack)-1]
} else {
if isIgnored(ignore, path) {
slog.Debug("Ignoring file", "path", path, "ignorefile", opts.IgnoreFile, "ignoreExtensions", opts.IgnoreExtensions)
Expand All @@ -168,7 +160,8 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
defer sem.Release(1)

ingestedFilesCount++
return ingestionFunc(path)
currentMetadata := metadataStack[len(metadataStack)-1]
return ingestionFunc(path, currentMetadata.Metadata[filepath.Base(path)]) // FIXME: metadata
})
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/client/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c *DefaultClient) IngestPaths(ctx context.Context, datasetID string, opts
return 0, err
}

ingestFile := func(path string) error {
ingestFile := func(path string, extraMetadata map[string]any) error {
content, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("failed to read file %s: %w", path, err)
Expand All @@ -148,6 +148,7 @@ func (c *DefaultClient) IngestPaths(ctx context.Context, datasetID string, opts
ModifiedAt: finfo.ModTime(),
},
IsDuplicateFuncName: "file_metadata",
ExtraMetadata: extraMetadata,
}
if opts != nil {
payload.TextSplitterOpts = opts.TextSplitterOpts
Expand Down
48 changes: 48 additions & 0 deletions pkg/client/ignore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package client

import (
"bufio"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/go-git/go-git/v5/plumbing/format/gitignore"
)

var DefaultIgnorePatterns = []gitignore.Pattern{
gitignore.ParsePattern(MetadataFilename, nil), // Knowledge Metadata file
gitignore.ParsePattern("~$*", nil), // MS Office temp files
gitignore.ParsePattern("$*", nil), // Likely hidden/tempfiles
}

func isIgnored(ignore gitignore.Matcher, path string) bool {
return ignore.Match(strings.Split(path, string(filepath.Separator)), false)
}

func readIgnoreFile(path string) ([]gitignore.Pattern, error) {
stat, err := os.Stat(path)
if err != nil {
return nil, fmt.Errorf("failed to checkout ignore file %q: %w", path, err)
}

if stat.IsDir() {
return nil, fmt.Errorf("ignore file %q is a directory", path)
}

var ps []gitignore.Pattern
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("failed to open ignore file %q: %w", path, err)
}

scanner := bufio.NewScanner(f)
for scanner.Scan() {
s := scanner.Text()
if !strings.HasPrefix(s, "#") && len(strings.TrimSpace(s)) > 0 {
ps = append(ps, gitignore.ParsePattern(s, nil))
}
}

return ps, nil
}
61 changes: 61 additions & 0 deletions pkg/client/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package client

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
)

const MetadataFilename = ".knowledge.json"

type Metadata struct {
Metadata map[string]FileMetadata `json:"metadata"` // Map of file paths to metadata
// TODO (idea): add other fields like description here, so we can hierarchically build a dataset description? Challenge is pruning and merging.
}

type FileMetadata map[string]any

func loadAndMergeMetadata(dirPath string, parentMetadata *Metadata) (*Metadata, error) {
metadataPath := filepath.Join(dirPath, MetadataFilename)
dirName := filepath.Base(dirPath)
if _, err := os.Stat(metadataPath); err == nil { // Metadata file exists
fileContent, err := os.ReadFile(metadataPath)
if err != nil {
return nil, fmt.Errorf("failed to read metadata file %s: %w", metadataPath, err)
}

var newMetadata Metadata
if err := json.Unmarshal(fileContent, &newMetadata); err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata file %s: %w", metadataPath, err)
}

// Merge with parent metadata, overriding existing keys
mergedMetadata := &Metadata{Metadata: make(map[string]FileMetadata, len(parentMetadata.Metadata)+len(newMetadata.Metadata))}
for filename, fileMetadata := range parentMetadata.Metadata {
if !strings.HasPrefix(filename, dirName) {
// skip entries which are not meant for this (sub-)directory
continue
}
fname := strings.TrimPrefix(strings.TrimPrefix(filename, dirName), string(filepath.Separator))
mergedMetadata.Metadata[fname] = fileMetadata
}

if newMetadata.Metadata != nil {
for filename, fileMetadata := range newMetadata.Metadata {
for k, v := range fileMetadata {
if mergedMetadata.Metadata[filename] == nil {
mergedMetadata.Metadata[filename] = make(FileMetadata, len(fileMetadata))
}
mergedMetadata.Metadata[filename][k] = v
}
}
}

return mergedMetadata, nil
}

// No metadata file, return parent metadata as is
return parentMetadata, nil
}
3 changes: 2 additions & 1 deletion pkg/client/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op
return 0, err
}

ingestFile := func(path string) error {
ingestFile := func(path string, extraMetadata map[string]any) error {
// Gather metadata
finfo, err := os.Stat(path)
if err != nil {
Expand All @@ -95,6 +95,7 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op
ModifiedAt: finfo.ModTime(),
},
IsDuplicateFuncName: opts.IsDuplicateFuncName,
ExtraMetadata: extraMetadata,
}

if opts != nil {
Expand Down
12 changes: 9 additions & 3 deletions pkg/datastore/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ type IngestOpts struct {
IsDuplicateFunc IsDuplicateFunc
TextSplitterOpts *textsplitter.TextSplitterOpts
IngestionFlows []flows.IngestionFlow
ExtraMetadata map[string]any
}

// Ingest loads a document from a reader and adds it to the dataset.
func (s *Datastore) Ingest(ctx context.Context, datasetID string, name string, content []byte, opts IngestOpts) ([]string, error) {

if name == "" {
return nil, fmt.Errorf("name is required")
}
Expand Down Expand Up @@ -147,8 +147,14 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, name string, c
return nil, fmt.Errorf("%w (file %q)", &documentloader.UnsupportedFileTypeError{FileType: filetype}, opts.FileMetadata.AbsolutePath)
}

// Mandatory Transformation: Add filename to metadata
em := &transformers.ExtraMetadata{Metadata: map[string]any{"filename": filename, "absPath": opts.FileMetadata.AbsolutePath}}
// Mandatory Transformation: Add filename to metadata -> append extraMetadata, but do not override filename or absPath
metadata := map[string]any{"filename": filename, "absPath": opts.FileMetadata.AbsolutePath}
for k, v := range opts.ExtraMetadata {
if _, ok := metadata[k]; !ok {
metadata[k] = v
}
}
em := &transformers.ExtraMetadata{Metadata: metadata}
ingestionFlow.Transformations = append(ingestionFlow.Transformations, em)

docs, err := ingestionFlow.Run(ctx, bytes.NewReader(content))
Expand Down
Loading