Skip to content

Commit

Permalink
Merge pull request #3 from adidas/release/5.3.0
Browse files Browse the repository at this point in the history
Integrate release/5.3.0 changes into master
  • Loading branch information
bemu authored Aug 27, 2020
2 parents f2c0e1b + df4d3b7 commit 49bea6f
Show file tree
Hide file tree
Showing 80 changed files with 1,401 additions and 904 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
*.yml
*.iml
*.pyc
config/credentials/*
config/keytab/*
config/aws/*
.idea/*
.cache/*
.pytest_cache/*
Expand Down
20 changes: 6 additions & 14 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,18 @@ RUN yum update -y && \
which \
gcc-c++ \
python3 \
python3-devel \
unixODBC-devel

# Installing Oracle client
RUN wget -q -O /etc/yum.repos.d/public-yum-ol7.repo http://yum.oracle.com/public-yum-ol7.repo && \
wget -q -O /tmp/RPM-GPG-KEY-oracle-ol7 http://yum.oracle.com/RPM-GPG-KEY-oracle-ol7 && \
rpm --import /tmp/RPM-GPG-KEY-oracle-ol7 && \
rm /tmp/RPM-GPG-KEY-oracle-ol7 && \
yum install -y yum-utils && \
yum-config-manager --enable ol7_oracle_instantclient && \
yum -y install oracle-instantclient18.3-basiclite && \
yum clean all
python3-devel

# Installing Python dependencies
COPY requirements.txt /tmp/m3d-api-requirements.txt
RUN pip3 install -r /tmp/m3d-api-requirements.txt --ignore-installed chardet && \
pip3 install awscli==1.16.96 && \
rm /tmp/m3d-api-requirements.txt

# Setting environment variables
ENV LD_LIBRARY_PATH=/usr/lib/oracle/18.3/client64/lib:$LD_LIBRARY_PATH
RUN groupadd -r m3d && \
useradd -r -g m3d m3d && \
mkdir -p /home/m3d && \
chown m3d:m3d /home/m3d
USER m3d

CMD ["/bin/bash"]
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
M3D API
=======

![](static/images/m3d_logo.png)
![M3D logo](/static/images/m3d_logo.png)

**M3D** stands for _Metadata Driven Development_ and is a cloud and platform agnostic framework for the automated creation, management and governance of metadata and data flows from multiple source systems to multiple target systems. The main features and design goals of M3D are:

Expand Down Expand Up @@ -41,7 +41,7 @@ These are the layers defined in the M3D architecture:

Graphically, the architecture of M3D looks like this:

![](static/images/m3d_layers.svg)
![M3D Architecture](/static/images/m3d_layers.png)


### AWS Prerequisites for Out of the Box Usage
Expand Down Expand Up @@ -117,7 +117,6 @@ For advanced users, you may use [conda](https://conda.io) for installing M3D by
"upload": "upload",
"pushdown": "pushdown",
"aws": "aws",
"hdfs": "hdfs",
"file": "file"
},
"data_dict_delimiter": "|"
Expand Down Expand Up @@ -173,6 +172,7 @@ The steps are the following:
-destination_database emr_database \
-destination_environment test \
-destination_table table_name \
-destination_table_location_prefix table_location_prefix \
-emr_cluster_id id-of-started-cluster
```

Expand Down
4 changes: 2 additions & 2 deletions common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ function exec_command_within_container() {

if [[ -z "$LOCAL_IS_INTERACTIVE" ]]; then
echo "Executing command within container: $LOCAL_CMD"
docker exec "$LOCAL_CONTAINER_INSTANCE_NAME" bash -c "cd /root/workspace/${LOCAL_PROJECT_NAME} && ${LOCAL_CMD}"
docker exec "$LOCAL_CONTAINER_INSTANCE_NAME" bash -c "cd /m3d/workspace/${LOCAL_PROJECT_NAME} && ${LOCAL_CMD}"
else
echo "Executing command within container in interactive mode: $LOCAL_CMD"
docker exec -it "$LOCAL_CONTAINER_INSTANCE_NAME" bash -c "cd /root/workspace/${LOCAL_PROJECT_NAME} && ${LOCAL_CMD}"
docker exec -it "$LOCAL_CONTAINER_INSTANCE_NAME" bash -c "cd /m3d/workspace/${LOCAL_PROJECT_NAME} && ${LOCAL_CMD}"
fi
}

Expand Down
3 changes: 2 additions & 1 deletion config/m3d/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"tags": {
"full_load": "full_load",
"delta_load": "delta_load",
"delta_lake_load": "delta_lake_load",
"append_load": "append_load",
"decom_gzip": "gzip_decompressor",
"table_suffix_stage": "_stg1",
"table_suffix_swap": "_swap",
"config": "config",
Expand All @@ -25,7 +27,6 @@
"upload": "upload",
"pushdown": "pushdown",
"aws": "aws",
"hdfs": "hdfs",
"file": "file"
},
"data_dict_delimiter": "|"
Expand Down
25 changes: 13 additions & 12 deletions config/system/scon-bdp-emr_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,17 @@
"api_long_timeout_seconds": 43200,
"aws_region": "eu-west-1",
"packages_to_deploy": [
"config",
"exceptions",
"hadoop",
"hadoop/core",
"hadoop/emr",
"hadoop/dataset",
"hadoop/algorithm",
"hadoop/load",
"system",
"oracle",
"util"
],
"config",
"exceptions",
"hadoop",
"hadoop/core",
"hadoop/emr",
"hadoop/dataset",
"hadoop/algorithm",
"hadoop/load",
"system",
"util"
],
"configs_to_deploy": [
"api",
"m3d",
Expand All @@ -96,6 +95,7 @@
"s3_dir_base": "/bdp/",
"subdir": {
"data": "data/",
"delta_table": "delta_table/",
"error": "error/",
"log": "log/",
"work": "work/",
Expand All @@ -107,6 +107,7 @@
"loading": "loading/",
"full_load": "full_load/",
"delta_load": "delta_load/",
"delta_lake_load": "delta_lake_load/",
"append_load": "append_load/",
"black_whole": "black_whole/",
"credentials": "credentials/",
Expand Down
6 changes: 3 additions & 3 deletions dev-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ CONTAINER_IMAGE_NAME="$PROJECT_NAME"

PARAM_WORKSPACE=( "workspace" "w" "m3d-engine code directory (must be the same within the container life-cycle)")
PARAM_TEST_TYPE=( "test-type" "t" "type of tests to run, possible values are [unit|integration|all]")
PARAM_TEST_MARK=( "test-mark" "m" "pytest mark for filtering tests, possible values are [bdp|emr|algo|oracle]")
PARAM_TEST_MARK=( "test-mark" "m" "pytest mark for filtering tests, possible values are [bdp|emr|algo]")
OPTION_HELP=( "help" "h" "show help message for the command")
OPTION_INTERACTIVE=( "interactive" "i" "use interactive mode and allocate pseudo-TTY when executing a command inside the container")

Expand Down Expand Up @@ -152,7 +152,7 @@ elif [[ "$ACTION" == "$ARG_ACTION_CONTAINER_RUN" ]]; then
echo "Running the container $CONTAINER_INSTANCE_NAME ..."
validate_args_are_empty "$HELP_STRING" "${OTHER_ARGS[@]}"

docker run -t -d --name "$CONTAINER_INSTANCE_NAME" -v "${WORKSPACE}:/root/workspace/${PROJECT_NAME}" "$CONTAINER_IMAGE_NAME"
docker run -t -d --name "$CONTAINER_INSTANCE_NAME" -v "${WORKSPACE}:/m3d/workspace/${PROJECT_NAME}" "$CONTAINER_IMAGE_NAME"

# clean pyc-files in the project directory
elif [[ "$ACTION" == "$ARG_ACTION_PROJECT_CLEAN" ]]; then
Expand Down Expand Up @@ -199,7 +199,7 @@ elif [[ "$ACTION" == "$ARG_ACTION_PROJECT_TEST" ]]; then
validate_args_are_empty "$HELP_STRING" "${OTHER_ARGS[@]}"

AVAILABLE_TEST_TYPES=("all" "unit" "integration")
AVAILABLE_TEST_MARKS=("bdp" "emr" "algo" "oracle")
AVAILABLE_TEST_MARKS=("bdp" "emr" "algo")

if [[ -z "$TEST_TYPE" ]]; then
RUN_TESTS_CMD="python3 ./test/test_runner.py all"
Expand Down
3 changes: 0 additions & 3 deletions m3d/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
# __init__.py file in a directory indicates to the Python interpreter that the directory should be treated
# like a Python package
# flake8: noqa

from .m3d import M3D
29 changes: 2 additions & 27 deletions m3d/config/config_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@ class ConfigService(object):
class Prefixes(object):
ACON = "acon"
SCON = "scon"
DDIC = "ddic"
TCONX = "tconx"

class Protocols(object):
S3A = "s3a://"
S3 = "s3://"

class Extensions(object):
JSON = ".json"
SQL = ".sql"
HQL = ".hql"
SH = ".sh"

def __init__(self, config):
# store parameters
Expand All @@ -43,7 +39,9 @@ def __init__(self, config):
# prefixes for algorithm configuration files
self.tag_full_load = params["tags"]["full_load"]
self.tag_delta_load = params["tags"]["delta_load"]
self.tag_delta_lake_load = params["tags"]["delta_lake_load"]
self.tag_append_load = params["tags"]["append_load"]
self.tag_decom_gzip = params["tags"]["decom_gzip"]

# suffixes for staging and swap tables
self.tag_table_suffix_stage = params["tags"]["table_suffix_stage"]
Expand All @@ -60,7 +58,6 @@ def __init__(self, config):
self.tag_aws = params["tags"]["aws"]

# protocol tags and required constants for them
# TODO: Remove hdfs tag from config
self.tag_file = params["tags"]["file"]

def get_scon_path(self, source_system, database):
Expand Down Expand Up @@ -181,25 +178,3 @@ def get_acon_path(self, destination_database, destination_environment, algorithm
)

return base_path

def get_ddic_path(self, source_system, src_database, source_schema, source_table):
"""
Return ddic path for upload system export
:param source_system source system code
:param src_database source database code
:param source_schema upload schema code
:param source_table: upload table code
:return: ddic file for upload system export
"""
filename = "-".join([
ConfigService.Prefixes.DDIC,
source_system,
src_database,
source_schema,
source_table
]) + ConfigService.Extensions.CSV

base_path = os.path.join(self.tag_config, self.tag_table, self.tag_upload, source_system, filename)

return base_path
19 changes: 2 additions & 17 deletions m3d/exceptions/m3d_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __init__(self, load_type, message=None):
class M3DUnsupportedSystemException(M3DUnsupportedDataTypeException):
"""
This exception is thrown when the requested service does not support the system/technology.
For example, some supported systems/technologies are Hive, Oracle, Exasol, etc.
For example: Hive...
"""

def __init__(self, system, message=None):
Expand All @@ -89,8 +89,7 @@ def __init__(self, system, message=None):
class M3DUnsupportedStorageException(M3DUnsupportedDataTypeException):
"""
This exception is thrown when the requested service does not support the system/technology.
For example, it can be raised while calling the hdfs_table.HDFSTable().drop_tables()
over a non HDFS storage.
For example, it can be raised while calling the drop_tables over a non HDFS compatible storage.
"""

def __init__(self, storage, message=None):
Expand Down Expand Up @@ -190,17 +189,3 @@ def __init__(self, message=None):
message = "Error integration with EMR API"
self.message = message
super(M3DEMRApiException, self).__init__(message)


class M3DReconciliationDeviationException(M3DIOException):
"""
A general exception to be thrown if at least one reconciliation task for a table
returned a deviation from the expected result
The programmer is responsible to throw an appropriate message.
"""

def __init__(self, message=None):
if message is None:
message = "Reconciliation failed."
self.message = message
super(M3DReconciliationDeviationException, self).__init__(message)
4 changes: 2 additions & 2 deletions m3d/hadoop/algorithm/algorithm_configuration_hadoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def __init__(self, algorithm_instance, acon_dict):
self._algorithm_instance = algorithm_instance

environment_section = acon_dict[self.Sections.ENVIRONMENT]
# TODO: this should not be a class field because all the parameters should be contained in algorithm/parameters,
# at the moment it is used for compatibility with Reconciliation and GzipDecompressor
# This should not be an instance variable because all the parameters should be contained in
# algorithm/parameters. Used for compatibility with GzipDecompressor
self._algorithm_params = acon_dict[self.Sections.ALGORITHM]

self._python_class = self._algorithm_params[self.Keys.PYTHON_CLASS]
Expand Down
14 changes: 8 additions & 6 deletions m3d/hadoop/algorithm/algorithm_executor_hadoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from m3d.hadoop.algorithm.algorithm_algorithm_template import AlgorithmAlgorithmTemplate
from m3d.hadoop.algorithm.algorithm_configuration_hadoop import AlgorithmConfigurationHadoop
from m3d.hadoop.algorithm.algorithm_fixed_length_string_extractor import AlgorithmFixedLengthStringExtractor
from m3d.hadoop.algorithm.algorithm_gzip_decompression_emr import AlgorithmGzipDecompressionEMR
from m3d.hadoop.algorithm.algorithm_gzip_decompressor import AlgorithmGzipDecompressor
from m3d.hadoop.algorithm.algorithm_nested_flattener import AlgorithmNestedFlattener
from m3d.hadoop.algorithm.algorithm_partition_materialization import AlgorithmPartitionMaterialization
from m3d.hadoop.algorithm.algorithm_materialization import AlgorithmMaterialization
from m3d.hadoop.algorithm.algorithm_transpose import AlgorithmTranspose
from m3d.hadoop.algorithm.algorithm_scala_runner import AlgorithmScalaRunner
from m3d.hadoop.core.spark_executor import SparkExecutor
from m3d.hadoop.emr.emr_system import EMRSystem
Expand Down Expand Up @@ -52,13 +53,14 @@ def _get_supported_emr_algorithms():
"""

return {
"AlgorithmGzipDecompressionBytesEMR": AlgorithmGzipDecompressionEMR,
"AlgorithmGzipDecompressorBytes": AlgorithmGzipDecompressor,
"AlgorithmScalaRunner": AlgorithmScalaRunner,
"AlgorithmPartitionFullMaterialization": AlgorithmPartitionMaterialization.FullPartitionMaterialization,
"AlgorithmPartitionQueryMaterialization": AlgorithmPartitionMaterialization.QueryPartitionMaterialization,
"AlgorithmPartitionRangeMaterialization": AlgorithmPartitionMaterialization.RangePartitionMaterialization,
"AlgorithmFullMaterialization": AlgorithmMaterialization.FullMaterialization,
"AlgorithmQueryMaterialization": AlgorithmMaterialization.QueryMaterialization,
"AlgorithmRangeMaterialization": AlgorithmMaterialization.RangeMaterialization,
"AlgorithmFixedLengthStringExtractor": AlgorithmFixedLengthStringExtractor,
"AlgorithmNestedFlattener": AlgorithmNestedFlattener,
"AlgorithmTranspose": AlgorithmTranspose,
"AlgorithmAlgorithmTemplate": AlgorithmAlgorithmTemplate
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,42 @@
from m3d.hadoop.emr.s3_table import S3Table


class AlgorithmGzipDecompressionEMR(AlgorithmHadoop):
class AlgorithmGzipDecompressor(AlgorithmHadoop):

def __init__(self, execution_system, algorithm_instance, algorithm_params):
"""
Initialize Algorithm Decompression
Initialize Algorithm GzipDecompressor
:param execution_system: an instance of EMRSystem object
:param algorithm_instance: name of the algorithm instance
:param algorithm_params: algorithm configuration
"""

super(AlgorithmGzipDecompressionEMR, self).__init__(execution_system, algorithm_instance, algorithm_params)
super(AlgorithmGzipDecompressor, self).__init__(execution_system, algorithm_instance, algorithm_params)

destination_table_name = algorithm_params["destination_table"]
self._table = S3Table(execution_system, destination_table_name)

if 'format' in algorithm_params:
self._format = algorithm_params["format"]
else:
self._format = "csv"

self._thread_pool_size = self._parameters["thread_pool_size"]

def get_scala_class(self):
return ScalaClasses.GZIP_DECOMPRESSOR

def build_params(self):
return GzipDecompressionParams(self._table.dir_landing_final, self._thread_pool_size).__dict__
return GzipDecompressorParams(self._table.dir_landing_final, self._format, self._thread_pool_size).__dict__


class GzipDecompressionParams(object):
class GzipDecompressorParams(object):
"""
Class resembling the contents of the algorithms parameter file
"""
def __init__(self, directory, thread_pool_size):
self.format = "csv" # TODO: Make this dynamic in the future. As of now, we are only dealing with csv files.

def __init__(self, directory, format, thread_pool_size):
self.format = format
self.directory = directory
self.thread_pool_size = thread_pool_size
1 change: 0 additions & 1 deletion m3d/hadoop/algorithm/algorithm_hadoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

class AlgorithmHadoop(object):

# TODO: "algorithm_params" should contain items from algorithm/parameters
def __init__(self, execution_system, algorithm_instance, algorithm_params):
"""
Initialize generic Algorithm class
Expand Down
Loading

0 comments on commit 49bea6f

Please sign in to comment.