add metadata blob_storage_total_files and blob_storage_file_index on azure blob storage input #89

Open · wants to merge 1 commit into base: main
33 changes: 31 additions & 2 deletions internal/impl/azure/input_blob_storage.go
@@ -391,21 +391,43 @@ type azureBlobStorage struct {
	object *azurePendingObject

	log *service.Logger

	totalFiles   int
	currentIndex int
}

func newAzureBlobStorage(conf bsiConfig, log *service.Logger) (*azureBlobStorage, error) {
	a := &azureBlobStorage{
		conf:              conf,
		objectScannerCtor: conf.Codec,
		log:               log,

		totalFiles:   0,
		currentIndex: 0,
	}
	return a, nil
}

func (a *azureBlobStorage) Connect(ctx context.Context) error {
	var err error
	a.keyReader, err = newAzureTargetReader(ctx, a.log, a.conf)

	// Count total files
	for {
Collaborator:
What about if a file is added after the connection is made? Won't this information then become stale?

Author:
I wrote it considering only batch mode. After testing, it seems that in batch mode, the file list is fetched at the time of connection.

func newAzureTargetReader(ctx context.Context, logger *service.Logger, conf bsiConfig) (azureTargetReader, error) {
	if conf.FileReader == nil {
		return newAzureTargetBatchReader(ctx, conf)
	}
	return &azureTargetStreamReader{
		input: conf.FileReader,
		log:   logger,
	}, nil
}

Looking at this, it seems that azureTargetStreamReader is distinguished by FileReader.

Author (@mrchypark, Aug 10, 2024):

I still don't know whether the Azure SDK holds the list at the point of input generation, or whether it updates it every time the pager operates. Rather than relying on this approach, it would be better if there were a way for the pager to indicate when it has reached the end.
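The "pager indicates the end" idea the author asks for resembles the shape of the Azure SDK's `runtime.Pager`, whose `More()` method reports whether further pages exist. Below is a minimal, self-contained sketch of that contract — the `pager` and `page` types here are hypothetical stand-ins, not the actual SDK types:

```go
package main

import "fmt"

// page is a hypothetical page of blob names, mimicking the shape of a
// paged listing response.
type page struct {
	names []string
}

// pager mimics a runtime.Pager-style contract: More reports whether
// another page exists, NextPage returns it and advances.
type pager struct {
	pages []page
	next  int
}

func (p *pager) More() bool { return p.next < len(p.pages) }

func (p *pager) NextPage() page {
	pg := p.pages[p.next]
	p.next++
	return pg
}

// countBlobs drains a pager and returns the total number of blobs.
// No io.EOF sentinel is needed: the pager itself signals the end.
func countBlobs(p *pager) int {
	total := 0
	for p.More() {
		total += len(p.NextPage().names)
	}
	return total
}

func main() {
	p := &pager{pages: []page{
		{names: []string{"a.csv", "b.csv"}},
		{names: []string{"c.csv"}},
	}}
	fmt.Println(countBlobs(p)) // prints 3
}
```

With this shape, the counting loop terminates when `More()` turns false rather than when `Pop` surfaces `io.EOF`.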

		_, err := a.keyReader.Pop(ctx)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		a.totalFiles++
	}

	// Reset the keyReader
	a.keyReader, err = newAzureTargetReader(ctx, a.log, a.conf)
	return err
}
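The `Connect` change above drains the reader once to learn the total, then rebuilds it for the real pass. A self-contained sketch of that count-then-reset pattern, using a mock reader with hypothetical names (the real code uses `newAzureTargetReader` and the `service` types):

```go
package main

import (
	"fmt"
	"io"
)

// sliceReader is a mock stand-in for azureTargetReader: Pop returns the
// next key, or io.EOF once the listing is exhausted.
type sliceReader struct {
	keys []string
	pos  int
}

func (r *sliceReader) Pop() (string, error) {
	if r.pos >= len(r.keys) {
		return "", io.EOF
	}
	k := r.keys[r.pos]
	r.pos++
	return k, nil
}

// countThenReset drains one reader to learn the total, then returns a
// fresh reader for actual consumption, mirroring the PR's Connect logic.
func countThenReset(keys []string) (total int, reader *sliceReader, err error) {
	r := &sliceReader{keys: keys}
	for {
		if _, e := r.Pop(); e == io.EOF {
			break
		} else if e != nil {
			return 0, nil, e
		}
		total++
	}
	// "Reset" by constructing a new reader, as Connect re-calls
	// newAzureTargetReader.
	return total, &sliceReader{keys: keys}, nil
}

func main() {
	total, r, _ := countThenReset([]string{"x.json", "y.json"})
	first, _ := r.Pop()
	fmt.Println(total, first) // prints: 2 x.json
}
```

Note the trade-off the reviewer raises: the count reflects the listing at connect time, so it can go stale if blobs are added afterwards.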

func (a *azureBlobStorage) getObjectTarget(ctx context.Context) (*azurePendingObject, error) {

@@ -438,7 +460,7 @@ func (a *azureBlobStorage) getObjectTarget(ctx context.Context) (*azurePendingOb
	return object, nil
}

-func blobStorageMetaToBatch(p *azurePendingObject, containerName string, parts service.MessageBatch) {
+func blobStorageMetaToBatch(a *azureBlobStorage, p *azurePendingObject, containerName string, parts service.MessageBatch) {
	for _, part := range parts {
		part.MetaSetMut("blob_storage_key", p.target.key)
		part.MetaSetMut("blob_storage_container", containerName)
@@ -456,6 +478,10 @@ func blobStorageMetaToBatch(p *azurePendingObject, containerName string, parts s
		for k, v := range p.obj.Metadata {
			part.MetaSetMut(k, v)
		}

		// Add total files count and current file index
		part.MetaSetMut("blob_storage_total_files", a.totalFiles)
		part.MetaSetMut("blob_storage_file_index", a.currentIndex)
	}
}
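Downstream consumers can combine the two new metadata keys to report progress through the listing. A hedged sketch, assuming the values arrive as plain ints (in the real pipeline they would be read back off a `service.Message`); the `progress` helper is hypothetical:

```go
package main

import "fmt"

// progress formats a human-readable position from the two metadata values
// this PR adds. The metadata index is 0-based (currentIndex starts at 0
// and increments after each file); the 1-based display is a presentation
// choice here.
func progress(fileIndex, totalFiles int) string {
	if totalFiles <= 0 {
		return "unknown progress"
	}
	return fmt.Sprintf("file %d of %d (%.0f%%)",
		fileIndex+1, totalFiles, float64(fileIndex+1)/float64(totalFiles)*100)
}

func main() {
	fmt.Println(progress(0, 4)) // prints: file 1 of 4 (25%)
	fmt.Println(progress(3, 4)) // prints: file 4 of 4 (100%)
}
```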

@@ -500,7 +526,10 @@ func (a *azureBlobStorage) ReadBatch(ctx context.Context) (msg service.MessageBa
		}
	}

-	blobStorageMetaToBatch(object, a.conf.Container, parts)
+	blobStorageMetaToBatch(a, object, a.conf.Container, parts)
+
+	// Increment the current index after processing a file
+	a.currentIndex++

	return parts, func(rctx context.Context, res error) error {
		return scnAckFn(rctx, res)