-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #32 from maazmaqsood/dynamic-base-image
Enabling Dynamic Addition of Images for Dockerization
- Loading branch information
Showing
9 changed files
with
327 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
apiVersion: argoproj.io/v1alpha1 | ||
kind: Workflow | ||
metadata: | ||
generateName: argo-train- | ||
spec: | ||
entrypoint: DAG-etl-pipeline | ||
templates: | ||
- name: create-spark-session-template | ||
container: | ||
image: maaz112233/pirlib | ||
command: | ||
- python | ||
- -m | ||
- pirlib.backends.argo_batch | ||
- node | ||
- gASVdgIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMFGNyZWF0ZV9zcGFya19zZXNzaW9ulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjC5leGFtcGxlcy5ldGxfcGlwZWxpbmUuZXRsOmNyZWF0ZV9zcGFya19zZXNzaW9ulIwHcnVudGltZZSMCnB5dGhvbjozLjiUjAdjb2RldXJslE6MBWltYWdllIwRbWFhejExMjIzMy9waXJsaWKUdWJzjAlmcmFtZXdvcmuUTowGY29uZmlnlH2UjAV0aW1lcpSJc4wGaW5wdXRzlF2UaACMBUlucHV0lJOUKYGUfZQoaAWMB2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjAZzb3VyY2WUaACMCkRhdGFTb3VyY2WUk5QpgZR9lCiMB25vZGVfaWSUTowLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklE6MDmdyYXBoX2lucHV0X2lklIwHZGF0YXNldJR1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCKMCURJUkVDVE9SWZRoLmgwKYGUfZQoaDOMBnJldHVybpRoNU51YnViYWguaDApgZR9lChoM4wUY3JlYXRlX3NwYXJrX3Nlc3Npb26UaDVOdWJ1Yi4= | ||
- gASVigAAAAAAAABdlIwKcGlybGliLnBpcpSMCkdyYXBoSW5wdXSUk5QpgZR9lCiMAmlklIwHZGF0YXNldJSMBmlvdHlwZZSMCURJUkVDVE9SWZSMBG1ldGGUaAGMCE1ldGFkYXRhlJOUKYGUfZQojARuYW1llIwHZGF0YXNldJSMC2Fubm90YXRpb25zlE51YnViYS4= | ||
volumeMounts: | ||
- name: node-outputs | ||
mountPath: /mnt/node_outputs | ||
- name: dataset | ||
mountPath: /mnt/graph_inputs/dataset | ||
volumes: | ||
- name: node-outputs | ||
nfs: | ||
server: k8s-master.cm.cluster | ||
path: /home/maaz/pirlib/examples/etl_pipeline/outputs | ||
readOnly: no | ||
- name: dataset | ||
nfs: | ||
server: k8s-master.cm.cluster | ||
path: /home/maaz/pirlib/examples/etl_pipeline/inputs | ||
readOnly: yes | ||
- name: extract-transform-load-template | ||
container: | ||
image: maaz112233/pirlib | ||
command: | ||
- python | ||
- -m | ||
- pirlib.backends.argo_batch | ||
- node | ||
- gASVfAIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMFmV4dHJhY3RfdHJhbnNmb3JtX2xvYWSUjAtlbnRyeXBvaW50c5R9lIwEbWFpbpRoAIwKRW50cnlwb2ludJSTlCmBlH2UKIwHdmVyc2lvbpSMAnYxlIwHaGFuZGxlcpSMMGV4YW1wbGVzLmV0bF9waXBlbGluZS5ldGw6ZXh0cmFjdF90cmFuc2Zvcm1fbG9hZJSMB3J1bnRpbWWUjApweXRob246My44lIwHY29kZXVybJROjAVpbWFnZZSMEW1hYXoxMTIyMzMvcGlybGlilHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwFdGltZXKUiXOMBmlucHV0c5RdlGgAjAVJbnB1dJSTlCmBlH2UKGgFjAdkYXRhc2V0lIwGaW90eXBllIwJRElSRUNUT1JZlIwGc291cmNllGgAjApEYXRhU291cmNllJOUKYGUfZQojAdub2RlX2lklE6MC3N1YmdyYXBoX2lklE6MCW91dHB1dF9pZJROjA5ncmFwaF9pbnB1dF9pZJSMB2RhdGFzZXSUdWKMBG1ldGGUaACMCE1ldGFkYXRhlJOUKYGUfZQojARuYW1llIwHZGF0YXNldJSMC2Fubm90YXRpb25zlE51YnViYYwHb3V0cHV0c5RdlGgAjAZPdXRwdXSUk5QpgZR9lChoBYwGcmV0dXJulGgijAlESVJFQ1RPUlmUaC5oMCmBlH2UKGgzjAZyZXR1cm6UaDVOdWJ1YmFoLmgwKYGUfZQoaDOMFmV4dHJhY3RfdHJhbnNmb3JtX2xvYWSUaDVOdWJ1Yi4= | ||
- gASVigAAAAAAAABdlIwKcGlybGliLnBpcpSMCkdyYXBoSW5wdXSUk5QpgZR9lCiMAmlklIwHZGF0YXNldJSMBmlvdHlwZZSMCURJUkVDVE9SWZSMBG1ldGGUaAGMCE1ldGFkYXRhlJOUKYGUfZQojARuYW1llIwHZGF0YXNldJSMC2Fubm90YXRpb25zlE51YnViYS4= | ||
volumeMounts: | ||
- name: node-outputs | ||
mountPath: /mnt/node_outputs | ||
- name: dataset | ||
mountPath: /mnt/graph_inputs/dataset | ||
volumes: | ||
- name: node-outputs | ||
nfs: | ||
server: k8s-master.cm.cluster | ||
path: /home/maaz/pirlib/examples/etl_pipeline/outputs | ||
readOnly: no | ||
- name: dataset | ||
nfs: | ||
server: k8s-master.cm.cluster | ||
path: /home/maaz/pirlib/examples/etl_pipeline/inputs | ||
readOnly: yes | ||
- name: etl-pipeline-template | ||
container: | ||
image: maaz112233/pirlib | ||
command: | ||
- python | ||
- -m | ||
- pirlib.backends.argo_batch | ||
- graph | ||
- gASVBAEAAAAAAABdlIwKcGlybGliLnBpcpSMC0dyYXBoT3V0cHV0lJOUKYGUfZQojAJpZJSMBnJldHVybpSMBmlvdHlwZZSMCURJUkVDVE9SWZSMBnNvdXJjZZRoAYwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMFmV4dHJhY3RfdHJhbnNmb3JtX2xvYWSUjAtzdWJncmFwaF9pZJROjAlvdXRwdXRfaWSUjAZyZXR1cm6UjA5ncmFwaF9pbnB1dF9pZJROdWKMBG1ldGGUaAGMCE1ldGFkYXRhlJOUKYGUfZQojARuYW1llIwGcmV0dXJulIwLYW5ub3RhdGlvbnOUTnVidWJhLg== | ||
volumeMounts: | ||
- name: node-outputs | ||
mountPath: /mnt/node_outputs | ||
- name: dataset | ||
mountPath: /mnt/graph_inputs/dataset | ||
- name: graph-outputs | ||
mountPath: /mnt/graph_outputs | ||
volumes: | ||
- name: node-outputs | ||
nfs: | ||
server: k8s-master.cm.cluster | ||
path: /home/maaz/pirlib/examples/etl_pipeline/outputs | ||
readOnly: no | ||
- name: dataset | ||
nfs: | ||
server: k8s-master.cm.cluster | ||
path: /home/maaz/pirlib/examples/etl_pipeline/inputs | ||
readOnly: yes | ||
- name: graph-outputs | ||
nfs: | ||
server: k8s-master.cm.cluster | ||
path: /home/maaz/pirlib/examples/etl_pipeline/outputs | ||
readOnly: no | ||
- name: DAG-etl-pipeline | ||
dag: | ||
tasks: | ||
- name: create-spark-session | ||
template: create-spark-session-template | ||
dependencies: [] | ||
- name: extract-transform-load | ||
template: extract-transform-load-template | ||
dependencies: [] | ||
- name: etl-pipeline | ||
template: etl-pipeline-template | ||
dependencies: | ||
- create-spark-session | ||
- extract-transform-load |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
from dataclasses import asdict | ||
import requests | ||
import pandas as pd | ||
from pirlib.iotypes import DirectoryPath, FilePath | ||
from pirlib.pipeline import pipeline | ||
from pirlib.task import task | ||
from pyspark.sql import SparkSession | ||
|
||
""" | ||
Python Extract Transform Load Pipeline Example | ||
""" | ||
|
||
|
||
@task
def create_spark_session(dataset: DirectoryPath) -> DirectoryPath:
    """Start a Spark session, run a tiny smoke test, and pass ``dataset`` through.

    The function builds (or reuses) a Spark session, prints its version,
    creates and displays a three-row DataFrame, and then stops the session.
    The ``dataset`` argument is not read; it is returned unchanged.

    Args:
        dataset: Input directory handed to the task; returned as-is.

    Returns:
        The same ``dataset`` value that was passed in.
    """
    # Create a Spark session
    spark = SparkSession.builder.appName("Simple Spark Session").getOrCreate()
    try:
        # Check if the Spark session is successfully created
        print("Spark version:", spark.version)
        # Perform some simple operations using the Spark session
        data = [("Alice", 34), ("Bob", 45), ("Charlie", 29)]
        df = spark.createDataFrame(data, ["Name", "Age"])
        # Show the DataFrame
        df.show()
    finally:
        # Always stop the session, even if one of the calls above raises,
        # so a failed smoke test does not leave the session running.
        spark.stop()
    return dataset
|
||
|
||
@task
def extract_transform_load(dataset: DirectoryPath) -> DirectoryPath:
    """Fetch US universities from http://universities.hipolabs.com and save a CSV.

    Steps:
      1. Extract: GET the JSON list of universities in the United States.
      2. Transform: keep rows whose ``name`` contains "California", flatten
         the ``domains`` and ``web_pages`` list columns into comma-separated
         strings, and keep only the columns
         ``domains``, ``country``, ``web_pages``, ``name``.
      3. Load: write the result as a tab-separated ``file.csv`` in the
         task's output directory.

    Args:
        dataset: Input directory for the task. It is not read by this
            function.

    Returns:
        The task output directory (``task.context().output``) that contains
        ``file.csv``.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    API_URL = "http://universities.hipolabs.com/search?country=United+States"
    # A timeout keeps the task from hanging forever on an unresponsive server;
    # raise_for_status() surfaces HTTP errors before the body is parsed as JSON.
    response = requests.get(API_URL, timeout=30)
    response.raise_for_status()
    data = response.json()

    df = pd.DataFrame(data)
    print(f"Total Number of universities from API {len(data)}")

    # na=False treats records with a missing name as non-matching instead of
    # producing NaN in the mask; .copy() makes the filtered frame independent
    # so the column assignments below do not write to a view of the original.
    df = df[df["name"].str.contains("California", na=False)].copy()
    print(f"Number of universities in california {len(df)}")

    # Each cell holds a list; collapse it into one comma-separated string.
    df["domains"] = [",".join(map(str, items)) for items in df["domains"]]
    df["web_pages"] = [",".join(map(str, items)) for items in df["web_pages"]]
    df = df.reset_index(drop=True)
    df = df[["domains", "country", "web_pages", "name"]]

    # NOTE(review): task.context().output is presumably the per-task output
    # directory supplied by pirlib — confirm against pirlib.task.
    outdir = task.context().output
    file_name = outdir / "file.csv"
    df.to_csv(file_name, sep="\t", encoding="utf-8")

    return outdir
|
||
|
||
@pipeline
def etl_pipeline(dataset: DirectoryPath) -> DirectoryPath:
    """Wire the two ETL example tasks into a single pipeline.

    Both tasks receive the same ``dataset`` input. The result of
    ``create_spark_session`` is discarded; the pipeline's output is whatever
    ``extract_transform_load`` returns.

    Args:
        dataset: Input directory forwarded to both tasks.

    Returns:
        The value returned by ``extract_transform_load``.
    """
    # Invoked for its effects only; its return value is not used.
    create_spark_session(dataset)
    return extract_transform_load(dataset)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
graphs: | ||
- id: etl_pipeline | ||
nodes: | ||
- id: create_spark_session | ||
entrypoints: | ||
main: | ||
version: v1 | ||
handler: examples.etl_pipeline.etl:create_spark_session | ||
runtime: python:3.8 | ||
codeurl: null | ||
image: maaz112233/pirlib | ||
framework: null | ||
config: | ||
timer: false | ||
inputs: | ||
- id: dataset | ||
iotype: DIRECTORY | ||
source: | ||
node_id: null | ||
subgraph_id: null | ||
output_id: null | ||
graph_input_id: dataset | ||
meta: | ||
name: dataset | ||
annotations: null | ||
outputs: | ||
- id: return | ||
iotype: DIRECTORY | ||
meta: | ||
name: return | ||
annotations: null | ||
meta: | ||
name: create_spark_session | ||
annotations: null | ||
- id: extract_transform_load | ||
entrypoints: | ||
main: | ||
version: v1 | ||
handler: examples.etl_pipeline.etl:extract_transform_load | ||
runtime: python:3.8 | ||
codeurl: null | ||
image: maaz112233/pirlib | ||
framework: null | ||
config: | ||
timer: false | ||
inputs: | ||
- id: dataset | ||
iotype: DIRECTORY | ||
source: | ||
node_id: null | ||
subgraph_id: null | ||
output_id: null | ||
graph_input_id: dataset | ||
meta: | ||
name: dataset | ||
annotations: null | ||
outputs: | ||
- id: return | ||
iotype: DIRECTORY | ||
meta: | ||
name: return | ||
annotations: null | ||
meta: | ||
name: extract_transform_load | ||
annotations: null | ||
subgraphs: [] | ||
inputs: | ||
- id: dataset | ||
iotype: DIRECTORY | ||
meta: | ||
name: dataset | ||
annotations: null | ||
outputs: | ||
- id: return | ||
iotype: DIRECTORY | ||
source: | ||
node_id: extract_transform_load | ||
subgraph_id: null | ||
output_id: return | ||
graph_input_id: null | ||
meta: | ||
name: return | ||
annotations: null | ||
meta: | ||
name: etl_pipeline | ||
annotations: null |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
requests | ||
pandas |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
#!/bin/sh
# Package the ETL pipeline example, generate an Argo workflow from the
# package, and submit that workflow to the "argo" namespace.
#
# Abort on the first failing command (-e) and on use of an unset variable (-u)
# so a failed packaging/generation step never submits a stale workflow file.
set -eu

EXAMPLEDIR=$(dirname "$0")
ROOTDIR=$EXAMPLEDIR/../..

### Module 1: Docker_Packaging
python "$ROOTDIR/bin/pircli" dockerize \
    "$ROOTDIR" \
    --auto \
    --pipeline examples.etl_pipeline.etl:etl_pipeline \
    --output "$EXAMPLEDIR/package_argo.yml" \
    --flatten \
    --docker_base_image godatadriven/pyspark:latest

# Convert EXAMPLEDIR to absolute path since docker can't bind-mount relative paths.
# (POSIX `case` instead of bash-only `[[ ... ]]` so the script also runs under sh.)
case $EXAMPLEDIR in
    /*) ;;
    *) EXAMPLEDIR="$PWD/${EXAMPLEDIR#./}" ;;
esac

### Module 2: Argoize_Module
# INPUT_dataset, OUTPUT and NFS_SERVER are passed to pircli through its
# environment for this one invocation only.
INPUT_dataset="$EXAMPLEDIR/inputs" \
OUTPUT="$EXAMPLEDIR/outputs" \
NFS_SERVER=k8s-master.cm.cluster \
python "$ROOTDIR/bin/pircli" generate "$EXAMPLEDIR/package_argo.yml" \
    --target pirlib.backends.argo_batch:ArgoBatchBackend \
    --output "$EXAMPLEDIR/argo-train.yml"

# Run the Argo workflow
argo submit -n argo --watch "$EXAMPLEDIR/argo-train.yml"
|
Oops, something went wrong.