This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit 4cae60d

Merge branch 'improvement/error-handling-task-creation' of https://github.com/PrefectHQ/prefect-aws into improvement/error-handling-task-creation

jeanluciano committed Apr 10, 2024
2 parents a5743ce + 91bcc24
Showing 11 changed files with 422 additions and 391 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
@@ -13,6 +13,7 @@ jobs:
           - "3.9"
           - "3.10"
           - "3.11"
+          - "3.12"
         fail-fast: false
     steps:
       - uses: actions/checkout@v4
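The hunk above adds one entry to the workflow's test matrix; with `fail-fast: false`, each listed version runs as an independent job even if a sibling fails. Conceptually, a one-dimensional matrix expands like this (a plain-Python illustration, not Actions' actual implementation):

```python
# Illustrative expansion of a one-dimensional GitHub Actions strategy matrix
matrix = {"python-version": ["3.9", "3.10", "3.11", "3.12"]}

# One job per value; a multi-key matrix would need a cartesian product instead
jobs = [{key: value} for key, values in matrix.items() for value in values]
print(len(jobs))  # one job per listed Python version
```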
406 changes: 224 additions & 182 deletions docs/ecs_guide.md

Large diffs are not rendered by default.

Binary file added docs/img/Workpool_UI.png
209 changes: 38 additions & 171 deletions docs/index.md
@@ -12,180 +12,61 @@
<br>
<a href="https://prefect-community.slack.com" alt="Slack">
<img src="https://img.shields.io/badge/slack-join_community-red.svg?color=26272B&labelColor=090422&logo=slack" /></a>
<a href="https://discourse.prefect.io/" alt="Discourse">
<img src="https://img.shields.io/badge/discourse-browse_forum-red.svg?color=26272B&labelColor=090422&logo=discourse" /></a>
</p>

## Welcome

`prefect-aws` makes it easy to leverage the capabilities of AWS in your workflows, featuring support for ECS, S3, Secrets Manager, Batch, and more.

## Getting started

### Saving credentials to a block

You will need an AWS account and credentials in order to use `prefect-aws`.

1. Refer to the [AWS Configuration documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-creds) on how to retrieve your access key ID and secret access key
2. Copy the access key ID and secret access key
3. Create a short script and replace the placeholders with your credential information and desired block name:

```python
from prefect_aws import AwsCredentials

AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    aws_session_token=None,  # replace this with token if necessary
    region_name="us-east-2"
).save("BLOCK-NAME-PLACEHOLDER")
```

Congrats! You can now load the saved block to use your credentials in your Python code:

```python
from prefect_aws import AwsCredentials
AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
```

!!! info "Registering blocks"

    Register blocks in this module to
    [view and edit them](https://docs.prefect.io/ui/blocks/)
    on Prefect Cloud:

    ```bash
    prefect block register -m prefect_aws
    ```

### Using Prefect with AWS ECS

`prefect_aws` allows you to use [AWS ECS](https://aws.amazon.com/ecs/) as infrastructure for your deployments. Using ECS for scheduled flow runs enables the dynamic provisioning of infrastructure for containers and unlocks greater scalability.

The snippets below show how you can use `prefect_aws` to run a task on ECS. The first example uses the `ECSTask` block as [infrastructure](https://docs.prefect.io/concepts/infrastructure/) and the second example shows using ECS within a flow.

#### As deployment Infrastructure
### Installation

Prefect requires Python 3.8 or newer.

We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.

Install `prefect-aws`:

```bash
pip install prefect-aws
```

##### Set variables

To expedite copy/paste without needing to update placeholders manually, update and execute the following.

```bash
export CREDENTIALS_BLOCK_NAME="aws-credentials"
export VPC_ID="vpc-id"
export ECS_TASK_BLOCK_NAME="ecs-task-example"
export S3_BUCKET_BLOCK_NAME="ecs-task-bucket-example"
```

##### Save an infrastructure and storage block

Save a custom infrastructure and storage block by executing the following snippet.

```python
import os
from prefect_aws import AwsCredentials, ECSTask, S3Bucket

aws_credentials = AwsCredentials.load(os.environ["CREDENTIALS_BLOCK_NAME"])

ecs_task = ECSTask(
    image="prefecthq/prefect:2-python3.10",
    aws_credentials=aws_credentials,
    vpc_id=os.environ["VPC_ID"],
)
ecs_task.save(os.environ["ECS_TASK_BLOCK_NAME"], overwrite=True)

bucket_name = "ecs-task-bucket-example"
s3_client = aws_credentials.get_s3_client()
s3_client.create_bucket(
    Bucket=bucket_name,
    CreateBucketConfiguration={"LocationConstraint": aws_credentials.region_name}
)
s3_bucket = S3Bucket(
    bucket_name=bucket_name,
    credentials=aws_credentials,
)
s3_bucket.save(os.environ["S3_BUCKET_BLOCK_NAME"], overwrite=True)
```
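One caveat worth noting about the `create_bucket` call above: S3 rejects a `LocationConstraint` of `us-east-1`, so the configuration argument must be omitted entirely when that is your region. A small helper (hypothetical, not part of `prefect-aws`) can build the right keyword arguments:

```python
def create_bucket_kwargs(bucket_name: str, region_name: str) -> dict:
    """Build boto3 create_bucket kwargs, omitting the LocationConstraint
    for us-east-1, which S3 refuses to accept explicitly."""
    kwargs = {"Bucket": bucket_name}
    if region_name != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region_name}
    return kwargs
```

You would then call `s3_client.create_bucket(**create_bucket_kwargs(bucket_name, aws_credentials.region_name))`.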

##### Write a flow

Then, use an existing flow to create a deployment, or use the flow below if you don't have one handy.

```python
from prefect import flow

@flow(log_prints=True)
def ecs_task_flow():
    print("Hello, Prefect!")

if __name__ == "__main__":
    ecs_task_flow()
```

##### Create a deployment

If the script was named "ecs_task_script.py", build a deployment manifest with the following command.

```bash
prefect deployment build ecs_task_script.py:ecs_task_flow \
    -n ecs-task-deployment \
    -ib ecs-task/${ECS_TASK_BLOCK_NAME} \
    -sb s3-bucket/${S3_BUCKET_BLOCK_NAME} \
    --override env.EXTRA_PIP_PACKAGES=prefect-aws
```

### Registering blocks

Register [blocks](https://docs.prefect.io/ui/blocks/) in this module to make them available for use.

```bash
prefect block register -m prefect_aws
```

Now apply the deployment!

```bash
prefect deployment apply ecs_task_flow-deployment.yaml
```

##### Test the deployment

Start an [agent](https://docs.prefect.io/latest/concepts/work-pools/) in a separate terminal. The agent will poll the Prefect API's work pool for scheduled flow runs.

```bash
prefect agent start -q 'default'
```

Run the deployment once to test it:

```bash
prefect deployment run ecs-task-flow/ecs-task-deployment
```

A list of available blocks in `prefect-aws` and their setup instructions can be found [here](https://PrefectHQ.github.io/prefect-aws/#blocks-catalog).

### Saving credentials to a block

You will need an AWS account and credentials to use `prefect-aws`.

1. Refer to the [AWS Configuration documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-creds) on how to retrieve your access key ID and secret access key
2. Copy the access key ID and secret access key
3. Create an `AwsCredentials` block in the Prefect UI or use a Python script like the one below, replacing the placeholders with your credential information and desired block name:

```python
from prefect_aws import AwsCredentials

AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    aws_session_token=None,  # replace this with token if necessary
    region_name="us-east-2"
).save("BLOCK-NAME-PLACEHOLDER")
```

Once the flow run has completed, you will see `Hello, Prefect!` logged in the CLI and the Prefect UI.

!!! info "No class found for dispatch key"

    If you encounter an error message like `KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'."`,
    ensure `prefect-aws` is installed in the environment in which your agent is running!

Another tutorial on `ECSTask` can be found [here](https://towardsdatascience.com/prefect-aws-ecs-fargate-github-actions-make-serverless-dataflows-as-easy-as-py-f6025335effc).

#### Within Flow

You can also execute commands with an `ECSTask` block directly within a Prefect flow. Running containers via ECS in your flows is useful for executing non-Python code in a distributed manner while using Prefect.

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.ecs import ECSTask

@flow
def ecs_task_flow():
    ecs_task = ECSTask(
        image="prefecthq/prefect:2-python3.10",
        credentials=AwsCredentials.load("BLOCK-NAME-PLACEHOLDER"),
        region="us-east-2",
        command=["echo", "Hello, Prefect!"],
    )
    return ecs_task.run()
```

This setup gives you all of the observation and orchestration benefits of Prefect, while also providing you the scalability of ECS.

### Using Prefect with AWS S3

`prefect_aws` allows you to read and write objects with AWS S3 within your Prefect flows.
@@ -208,7 +89,7 @@ def s3_flow():
     aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
     s3_bucket = S3Bucket(
         bucket_name="BUCKET-NAME-PLACEHOLDER",
-        aws_credentials=aws_credentials
+        credentials=aws_credentials
     )
 
     s3_bucket_path = s3_bucket.upload_from_path(file_path)
@@ -242,36 +123,22 @@ def secrets_manager_flow():
secrets_manager_flow()
```

### Using Prefect with AWS ECS

`prefect_aws` allows you to use [AWS ECS](https://aws.amazon.com/ecs/) as infrastructure for your deployments. Using ECS for scheduled flow runs enables the dynamic provisioning of infrastructure for containers and unlocks greater scalability. This setup gives you all of the observation and orchestration benefits of Prefect, while also providing you the scalability of ECS.

See the [ECS guide](/ecs_guide/) for a full walkthrough.

## Resources

Refer to the API documentation on the sidebar to explore all the capabilities of Prefect AWS!

For more tips on how to use blocks and tasks in Prefect integration libraries, check out the [docs](https://docs.prefect.io/integrations/usage/)!

For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/).

### Recipes

For additional recipes and examples, check out [`prefect-recipes`](https://github.com/PrefectHQ/prefect-recipes).

### Feedback

If you encounter any bugs while using `prefect-aws`, feel free to open an issue in the [`prefect-aws`](https://github.com/PrefectHQ/prefect-aws) repository.

If you have any questions or issues while using `prefect-aws`, you can find help in the [Prefect Slack community](https://prefect.io/slack).

Feel free to check out the source code and give [`prefect-aws`](https://github.com/PrefectHQ/prefect-aws) a ⭐️!
40 changes: 23 additions & 17 deletions prefect_aws/s3.py
@@ -594,8 +594,8 @@ async def read_path(self, path: str) -> bytes:
             s3_bucket_block = S3Bucket(
                 bucket_name="bucket",
-                aws_credentials=aws_creds,
-                basepath="subfolder"
+                credentials=aws_creds,
+                bucket_folder="subfolder"
             )
             key_contents = s3_bucket_block.read_path(path="subfolder/file1")
@@ -645,7 +645,7 @@ async def write_path(self, path: str, content: bytes) -> str:
             s3_bucket_block = S3Bucket(
                 bucket_name="bucket",
                 minio_credentials=minio_creds,
-                basepath="dogs/smalldogs",
+                bucket_folder="dogs/smalldogs",
                 endpoint_url="http://localhost:9000",
             )
             s3_havanese_path = s3_bucket_block.write_path(path="havanese", content=data)
@@ -1234,20 +1234,23 @@ async def copy_object(
         """
         s3_client = self.credentials.get_s3_client()
 
+        source_bucket_name = self.bucket_name
         source_path = self._resolve_path(Path(from_path).as_posix())
-        target_path = self._resolve_path(Path(to_path).as_posix())
 
-        source_bucket_name = self.bucket_name
-        target_bucket_name = self.bucket_name
+        # Default to copying within the same bucket
+        to_bucket = to_bucket or self
+
+        target_bucket_name: str
+        target_path: str
         if isinstance(to_bucket, S3Bucket):
             target_bucket_name = to_bucket.bucket_name
-            target_path = to_bucket._resolve_path(target_path)
+            target_path = to_bucket._resolve_path(Path(to_path).as_posix())
         elif isinstance(to_bucket, str):
             target_bucket_name = to_bucket
-        elif to_bucket is not None:
+            target_path = Path(to_path).as_posix()
+        else:
             raise TypeError(
-                "to_bucket must be a string or S3Bucket, not"
-                f" {type(target_bucket_name)}"
+                f"to_bucket must be a string or S3Bucket, not {type(to_bucket)}"
             )
 
         self.logger.info(
@@ -1316,20 +1319,23 @@ async def move_object(
         """
         s3_client = self.credentials.get_s3_client()
 
+        source_bucket_name = self.bucket_name
         source_path = self._resolve_path(Path(from_path).as_posix())
-        target_path = self._resolve_path(Path(to_path).as_posix())
 
-        source_bucket_name = self.bucket_name
-        target_bucket_name = self.bucket_name
+        # Default to moving within the same bucket
+        to_bucket = to_bucket or self
+
+        target_bucket_name: str
+        target_path: str
         if isinstance(to_bucket, S3Bucket):
             target_bucket_name = to_bucket.bucket_name
-            target_path = to_bucket._resolve_path(target_path)
+            target_path = to_bucket._resolve_path(Path(to_path).as_posix())
         elif isinstance(to_bucket, str):
             target_bucket_name = to_bucket
-        elif to_bucket is not None:
+            target_path = Path(to_path).as_posix()
+        else:
             raise TypeError(
-                "to_bucket must be a string or S3Bucket, not"
-                f" {type(target_bucket_name)}"
+                f"to_bucket must be a string or S3Bucket, not {type(to_bucket)}"
             )
 
         self.logger.info(
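The change applied to both `copy_object` and `move_object` above replaces an `elif to_bucket is not None` guard with an upfront `to_bucket = to_bucket or self` default, so the type dispatch only needs two branches plus a `TypeError`. That control flow can be sketched in isolation (a hypothetical standalone function with a stand-in `Bucket` class, not the library's actual API):

```python
from pathlib import PurePosixPath
from typing import Optional, Tuple, Union

class Bucket:
    """Minimal stand-in for an S3Bucket block (illustration only)."""

    def __init__(self, bucket_name: str, bucket_folder: str = ""):
        self.bucket_name = bucket_name
        self.bucket_folder = bucket_folder

    def resolve_path(self, path: str) -> str:
        # Join the configured folder with the key, POSIX-style
        if self.bucket_folder:
            return str(PurePosixPath(self.bucket_folder) / path)
        return path

def resolve_target(
    source: Bucket, to_path: str, to_bucket: Optional[Union[Bucket, str]] = None
) -> Tuple[str, str]:
    # Default to copying/moving within the same bucket
    to_bucket = to_bucket or source
    if isinstance(to_bucket, Bucket):
        # Bucket block: use its name and resolve the path against its folder
        return to_bucket.bucket_name, to_bucket.resolve_path(to_path)
    elif isinstance(to_bucket, str):
        # Bare bucket name: use the path as given
        return to_bucket, to_path
    raise TypeError(f"to_bucket must be a string or S3Bucket, not {type(to_bucket)}")
```

Note how the `or self` default also fixes the error message: the old code formatted `type(target_bucket_name)` (always `str`) into the `TypeError`, while the new code reports `type(to_bucket)`, the value that was actually rejected.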