Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
change: simplify metadata file lookup and assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
iwilltry42 committed Oct 25, 2024
1 parent 81fccfb commit 4ee3008
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 50 deletions.
37 changes: 19 additions & 18 deletions pkg/client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
}

if fileInfo.IsDir() {
initialMetadata := &Metadata{Metadata: map[string]FileMetadata{}}
directoryMetadata, err := loadAndMergeMetadata(path, initialMetadata)
directoryMetadata, err := loadDirMetadata(path)
if err != nil {
return ingestedFilesCount, err
}
metadataStack = append(metadataStack, *directoryMetadata)
if directoryMetadata != nil {
metadataStack = append(metadataStack, *directoryMetadata)
}

// Process directory
err = filepath.WalkDir(path, func(subPath string, d os.DirEntry, err error) error {
Expand All @@ -115,12 +116,13 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
}

// One dir level deeper -> load new metadata
parentMetadata := metadataStack[len(metadataStack)-1]
newMetadata, err := loadAndMergeMetadata(subPath, &parentMetadata)
newMetadata, err := loadDirMetadata(subPath)
if err != nil {
return err
}
metadataStack = append(metadataStack, *newMetadata)
if newMetadata != nil {
metadataStack = append(metadataStack, *newMetadata)
}
return nil
}

Expand All @@ -141,17 +143,19 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
}
touchedFilePaths = append(touchedFilePaths, absPath)

currentMetadata := metadataStack[len(metadataStack)-1]
slog.Debug("metadata stack", "stack", metadataStack, "path", sp, "absPath", absPath, "metadata", currentMetadata.Metadata)

g.Go(func() error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)

slog.Debug("Ingesting file", "path", absPath, "metadata", currentMetadata, "metadataForFile", currentMetadata.Metadata[filepath.Base(sp)], "lookup", filepath.Base(sp))
err = ingestionFunc(sp, currentMetadata.Metadata[filepath.Base(sp)]) // FIXME: metadata
fileMeta, err := findMetadata(absPath, metadataStack)
if err != nil {
return fmt.Errorf("failed to find metadata for %s: %w", absPath, err)
}
slog.Debug("Ingesting file", "absPath", absPath, "metadata", fileMeta)

err = ingestionFunc(sp, fileMeta)
if err == nil {
ingestedFilesCount++
}
Expand All @@ -162,8 +166,6 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
if err != nil {
return ingestedFilesCount, err
}
// Directory processed, pop metadata
metadataStack = metadataStack[:len(metadataStack)-1]
} else {
if isIgnored(ignore, path) {
slog.Debug("Ignoring file", "path", path, "ignorefile", opts.IgnoreFile, "ignoreExtensions", opts.IgnoreExtensions)
Expand All @@ -183,12 +185,11 @@ func ingestPaths(ctx context.Context, c Client, opts *IngestPathsOpts, datasetID
defer sem.Release(1)

ingestedFilesCount++
var fileMetadata FileMetadata
if len(metadataStack) > 0 {
currentMetadata := metadataStack[len(metadataStack)-1]
fileMetadata = currentMetadata.Metadata[filepath.Base(path)]
fileMeta, err := findMetadata(absPath, metadataStack)
if err != nil {
return fmt.Errorf("failed to find metadata for %s: %w", absPath, err)
}
return ingestionFunc(path, fileMetadata)
return ingestionFunc(path, fileMeta)
})
}

Expand Down
80 changes: 48 additions & 32 deletions pkg/client/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
Expand All @@ -18,45 +19,60 @@ type Metadata struct {

type FileMetadata map[string]any

func loadAndMergeMetadata(dirPath string, parentMetadata *Metadata) (*Metadata, error) {
// loadAndMergeMetadata checks if the given directory contains a metadata file.
// If so, it reads it in and merges it with the previous level of metadata.
// Doing so, the parentMetadata is trimmed down to only the entries relevant to this directory.
func loadDirMetadata(dirPath string) (*Metadata, error) {
metadataPath := filepath.Join(dirPath, MetadataFilename)
dirName := filepath.Base(dirPath)
if _, err := os.Stat(metadataPath); err == nil { // Metadata file exists
fileContent, err := os.ReadFile(metadataPath)
if err != nil {
return nil, fmt.Errorf("failed to read metadata file %s: %w", metadataPath, err)
}
metaAbsPath, err := filepath.Abs(metadataPath)
if err != nil {
return nil, fmt.Errorf("failed to get absolute path for %s: %w", metadataPath, err)
}
dirPath = filepath.Dir(metadataPath)
if _, err := os.Stat(metadataPath); err != nil {
return nil, nil
}
// Metadata file exists
fileContent, err := os.ReadFile(metadataPath)
if err != nil {
return nil, fmt.Errorf("failed to read metadata file %s: %w", metadataPath, err)
}

var newMetadata Metadata
if err := json.Unmarshal(fileContent, &newMetadata); err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata file %s: %w", metadataPath, err)
}
metadata := &Metadata{
MetadataFileAbsPath: metaAbsPath,
}
if err := json.Unmarshal(fileContent, &metadata); err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata file %s: %w", metadataPath, err)
}

// Merge with parent metadata, overriding existing keys
mergedMetadata := &Metadata{Metadata: make(map[string]FileMetadata, len(parentMetadata.Metadata)+len(newMetadata.Metadata))}
for filename, fileMetadata := range parentMetadata.Metadata {
if !strings.HasPrefix(filename, dirName) {
// skip entries which are not meant for this (sub-)directory
continue
}
fname := strings.TrimPrefix(strings.TrimPrefix(filename, dirName), string(filepath.Separator))
mergedMetadata.Metadata[fname] = fileMetadata
}
slog.Info("Loaded metadata", "path", metadataPath, "metadata", metadata.Metadata)

return metadata, nil

if newMetadata.Metadata != nil {
for filename, fileMetadata := range newMetadata.Metadata {
for k, v := range fileMetadata {
if mergedMetadata.Metadata[filename] == nil {
mergedMetadata.Metadata[filename] = make(FileMetadata, len(fileMetadata))
}
mergedMetadata.Metadata[filename][k] = v
}
}

func findMetadata(path string, metadataStack []Metadata) (FileMetadata, error) {

absPath, err := filepath.Abs(path)
if err != nil {
return nil, err
}

metadata := make(map[string]any)

for _, metadataEntry := range metadataStack {
target := strings.TrimPrefix(strings.TrimPrefix(absPath, filepath.Dir(metadataEntry.MetadataFileAbsPath)), string(filepath.Separator))

if m, ok := metadataEntry.Metadata[target]; ok {
for k, v := range m {
metadata[k] = v
}
}

return mergedMetadata, nil
}

// No metadata file, return parent metadata as is
return parentMetadata, nil
slog.Debug("Found metadata", "path", path, "metadata", metadata)

return metadata, nil

}

0 comments on commit 4ee3008

Please sign in to comment.