Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up making latest.zarr #221

Open
peterdudfield opened this issue Jan 9, 2025 · 3 comments
Open

Speed up making latest.zarr #221

peterdudfield opened this issue Jan 9, 2025 · 3 comments

Comments

@peterdudfield
Copy link
Contributor

peterdudfield commented Jan 9, 2025

Currently the NWP DAG copies the most recent zarr file to latest.zarr, but this can take 10 minutes as there could be 10,000 files.

  1. change the chunking, but this doesn't work so well if we are writing to the store in parallel. And we don't want to make too many differences between archive and consume. This could be done with an env var when collecting live data
  2. Use s3 batch jobs
  3. re chunk after pulling the data in the nwp-consumer
  4. use aws sync?
  5. save as zarr and as a zarr.zip, then dag could copy and unzip
  6. Try zarr3 and larger chunk sizes
@peterdudfield
Copy link
Contributor Author

peterdudfield commented Jan 9, 2025

2 could be done something like this

import json
import time
import uuid

import boto3

def create_manifest(bucket_name, source_prefix, manifest_file):
    """Generate and upload a CSV manifest listing all objects under *source_prefix*.

    S3 Batch Operations expects a CSV manifest — one ``bucket,key`` row per
    object — when the job declares the ``S3BatchOperations_CSV_20180820``
    format with ``Fields: ["Bucket", "Key"]``. The previous implementation
    uploaded a JSON document, which ``create_job`` would reject.

    Args:
        bucket_name: Bucket holding both the source objects and the manifest.
        source_prefix: Key prefix whose objects should be listed.
        manifest_file: File name for the manifest (stored under ``manifests/``).

    Returns:
        The key (path within the bucket) of the uploaded manifest object.
    """
    s3_client = boto3.client("s3")
    paginator = s3_client.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket_name, Prefix=source_prefix)

    # One "bucket,key" row per object, matching the ["Bucket", "Key"] fields
    # declared in the batch job's manifest Spec.
    # NOTE(review): zarr keys don't normally contain commas, but if a key ever
    # did it would need CSV quoting — confirm against the key naming scheme.
    rows = []
    for page in pages:
        for obj in page.get("Contents", []):
            rows.append(f"{bucket_name},{obj['Key']}")

    # Upload the manifest file to the bucket
    manifest_file_path = f"manifests/{manifest_file}"
    s3_client.put_object(
        Bucket=bucket_name,
        Key=manifest_file_path,
        Body="\n".join(rows),
    )

    print(f"Manifest file created at s3://{bucket_name}/{manifest_file_path}")
    return manifest_file_path

def create_batch_job(
    bucket_name,
    manifest_file_path,
    source_prefix,
    target_prefix,
    region_name=None,
    role_arn=None,
):
    """Create an S3 Batch Operations job to copy objects from source to target.

    The original snippet contained the literal placeholders ``<some-region>``
    and ``<some-arn>``, which are not valid Python; they are now explicit
    keyword arguments so the function can actually run.

    Args:
        bucket_name: Bucket containing both source objects and the manifest.
        manifest_file_path: Key of the CSV manifest within the bucket.
        source_prefix: Unused here (objects come from the manifest); kept for
            interface compatibility with existing callers.
        target_prefix: Key prefix the objects are copied under.
        region_name: AWS region for the s3control client (SDK default if None).
        role_arn: IAM role ARN the batch job runs as. Required.

    Returns:
        The ID of the created batch job.

    Raises:
        ValueError: If ``role_arn`` is not supplied.
    """
    if role_arn is None:
        raise ValueError("role_arn is required to create an S3 Batch Operations job")

    s3control_client = boto3.client("s3control", region_name=region_name)

    caller = boto3.client("sts").get_caller_identity()
    # Idempotency token: a repeat call with the same token won't create a
    # duplicate job.
    token = str(uuid.uuid4())

    operation = {
        "S3PutObjectCopy": {
            "TargetResource": f"arn:aws:s3:::{bucket_name}",
            "TargetKeyPrefix": target_prefix,
        }
    }

    job = s3control_client.create_job(
        AccountId=caller["Account"],
        ClientRequestToken=token,
        Operation=operation,
        Report={
            # The CreateJob API expects the bucket ARN here, not the bare name.
            "Bucket": f"arn:aws:s3:::{bucket_name}",
            "Format": "Report_CSV_20180820",
            "Enabled": True,
            "Prefix": f"reports/{token}/",
        },
        Manifest={
            "Spec": {
                "Format": "S3BatchOperations_CSV_20180820",
                "Fields": ["Bucket", "Key"],
            },
            "Location": {
                "ObjectArn": f"arn:aws:s3:::{bucket_name}/{manifest_file_path}",
                # create_job requires the manifest object's ETag (sans quotes).
                "ETag": boto3.client("s3").head_object(
                    Bucket=bucket_name, Key=manifest_file_path
                )["ETag"].strip('"'),
            },
        },
        Priority=1,
        RoleArn=role_arn,
        Description="Copy objects from one prefix to another.",
    )

    print(f"Batch job created with ID: {job['JobId']}")
    return job["JobId"]

def cleanup_files(bucket_name, manifest_file_path, report_prefix):
    """Remove the job manifest and any completion reports once the job succeeds."""
    s3 = boto3.client("s3")

    # The manifest is a single object; delete it directly.
    print(f"Deleting manifest file: {manifest_file_path}")
    s3.delete_object(Bucket=bucket_name, Key=manifest_file_path)

    # Reports may span multiple objects, so page through the prefix.
    print(f"Deleting reports in prefix: {report_prefix}")
    report_pages = s3.get_paginator("list_objects_v2").paginate(
        Bucket=bucket_name, Prefix=report_prefix
    )
    for report_page in report_pages:
        for entry in report_page.get("Contents", []):
            s3.delete_object(Bucket=bucket_name, Key=entry["Key"])

def main():
    """Copy a zarr prefix to another prefix via an S3 Batch Operations job.

    Creates a CSV manifest of the source prefix, submits the batch copy job,
    polls until the job reaches a terminal state, and cleans up the manifest
    and reports on success.
    """
    import time  # local import: only needed for the polling loop below

    bucket_name = "india-nwp-development"
    source_prefix = "ecmwf/data/2025010900.zarr/"
    target_prefix = "ecmwf/data/copy.zarr/"
    manifest_file = "source-prefix-manifest.json"

    manifest_file_path = create_manifest(bucket_name, source_prefix, manifest_file)
    batch_job_id = create_batch_job(bucket_name, manifest_file_path, source_prefix, target_prefix)

    print(f"S3 Batch Job ID: {batch_job_id}")

    # boto3's s3control client exposes no waiters ("job_complete" does not
    # exist and get_waiter would raise), so poll describe_job until the job
    # reaches a terminal state.
    s3control_client = boto3.client("s3control")
    account_id = boto3.client("sts").get_caller_identity()["Account"]
    terminal_states = {"Complete", "Failed", "Cancelled"}
    while True:
        response = s3control_client.describe_job(AccountId=account_id, JobId=batch_job_id)
        if response["Job"]["Status"] in terminal_states:
            break
        time.sleep(10)

    # Only clean up after a confirmed successful run; keep the manifest and
    # reports around for debugging otherwise.
    if response["Job"]["Status"] == "Complete":
        print("Job completed successfully. Cleaning up...")
        cleanup_files(bucket_name, manifest_file_path, response["Job"]["Report"]["Prefix"])
    else:
        print("Job did not complete successfully. Skipping cleanup.")

if __name__ == "__main__":
    main()

@devsjc
Copy link
Collaborator

devsjc commented Jan 10, 2025

  1. Use Zarr V3 and sharding to reduce number of files

@devsjc
Copy link
Collaborator

devsjc commented Jan 10, 2025

  1. (the quickest win) Update the number of chunks in the variable dimension to be 1, reducing the number of files by 18x and the copy time from 16min to < 1min.

This is safe to do on this dataset in particular as all the variables are present in every provided file, so that the entire chunk will always be written in its entirety.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants