diff --git a/build_language_container.sh b/build_language_container.sh
deleted file mode 100755
index 08abf1c1..00000000
--- a/build_language_container.sh
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/usr/bin/env bash
-
-set -eo pipefail
-
-# set release goal, set to test by default
-RELEASE_GOAL="--release-goal test"
-if [ ! -z $1 ]
-then
-  if [ "$1" = "release" ]
-  then
-    RELEASE_GOAL=""
-  else
-    echo "Invalid release goal: $1"
-    exit
-  fi
-fi
-
-
-
-# main package - release
-SCRIPT_DIR="$(dirname "$(readlink -f "${BASH_SOURCE[0]}")")"
-pushd $SCRIPT_DIR &> /dev/null
-poetry build
-
-FLAVOR_NAME=exasol_advanced_analytics_framework_container
-echo "$FLAVOR_NAME"
-FLAVOR_PATH="language_container/$FLAVOR_NAME"
-RELEASE_BUILD_STEP_DIST_DIRECTORY="$FLAVOR_PATH/flavor_base/release/dist"
-echo "Copy" dist/*.whl "$RELEASE_BUILD_STEP_DIST_DIRECTORY"
-mkdir -p "$RELEASE_BUILD_STEP_DIST_DIRECTORY" || true
-cp dist/*.whl "$RELEASE_BUILD_STEP_DIST_DIRECTORY"
-
-
-# test package - test
-if [ ! "$1" = "release" ]
-then
-  TEST_PACKAGE_SCRIPT_DIR="$SCRIPT_DIR/tests/test_package"
-  pushd $TEST_PACKAGE_SCRIPT_DIR &> /dev/null
-  poetry build
-
-  RELEASE_BUILD_STEP_DIST_DIRECTORY="$SCRIPT_DIR/$FLAVOR_PATH/flavor_base/test/dist"
-  echo "Copy" dist/*.whl "$RELEASE_BUILD_STEP_DIST_DIRECTORY"
-  mkdir -p "$RELEASE_BUILD_STEP_DIST_DIRECTORY" || true
-  cp dist/*.whl "$RELEASE_BUILD_STEP_DIST_DIRECTORY"
-
-
-  # switch to main directory
-  popd
-fi
-
-
-# build container
-echo "Build container"
-./language_container/exaslct export --flavor-path "$FLAVOR_PATH" $RELEASE_GOAL
-
-echo "Generate language activation"
-./language_container/exaslct generate-language-activation --flavor-path "$FLAVOR_PATH" --bucketfs-name bfsdefault --bucket-name default --path-in-bucket container --container-name "$FLAVOR_NAME"
diff --git a/doc/changes/changes_0.1.0.md b/doc/changes/changes_0.1.0.md
index 9454c4e6..f0da0875 100644
--- a/doc/changes/changes_0.1.0.md
+++ b/doc/changes/changes_0.1.0.md
@@ -15,15 +15,15 @@ Code name:
 * #24: Added integrations test for event loop
 * #28: Extended the EventHandlerContext to a scope-based system for handling temporary objects
 * #29: Fixed interface of EventContext and adapted implementation of UDFEventContext
-* #30: Sort cleanup queries in reverse order of their creation to ensure that temporary objects that depend on other are removed first
+* #30: Sorted cleanup queries in reverse order of their creation to ensure that temporary objects that depend on others are removed first
 * #34: Added MockEventContext
-* #35: Clean up after EventHandler exceptions and throw exceptions when a parent EventHandlerContext encounters an unreleased child during release
+* #35: Cleaned up after EventHandler exceptions and made a parent EventHandlerContext throw an exception when it encounters an unreleased child during release
 * #94: Prepare for release to PyPi
 * #17: Added vagrant setup
 * #97: Added SocketFactory wrapper which injects faults by losing messages
 * #98: Added more robust connection protocol
 * #99: Added multi/node udf discovery
-* #100: Add combined global and local UDF discovery
+* #100: Added combined global and local UDF discovery
 
 ### Bug Fixes
 
@@ -33,7 +33,7 @@ Code name:
 * #65: Fixed that the `ScopeQueryHandlerContext` might not `_release` all child contexts, if a grand-child-context wasn't released
 * #68: Fixed that methods called in BucketFSLocationProxy.cleanup can fail and stop the cleanup
 * #66: Fixed _ScopeQueryHandlerContextBase.get_temporary_path not being private
-* #116: Fix AbortTimeoutSender and add reason to Timeout messages
+* #116: Fixed AbortTimeoutSender and added a reason to Timeout messages
 
 ### Refactoring
 
@@ -41,12 +41,15 @@
 * #42: Updated dependencies
 * #72: Unified naming of released resources in QueryHandler
 * #88: Introduced an abstraction for ZMQ in UDF Communication
-* #95: Remove setup.py
+* #95: Removed setup.py
 * #114: Refactored BackgroundPeerState and introduced parameter objects
 * #173: Introduced Python Toolbox
 * #174: Replaced Language Container Stuff with PEC and SLC plugin
 * #183: Fixed warning on tests with `__init__` constructor
+* #180: Replaced `start_integration_test_environment.sh` with `pytest-backend-plugin`
 * #184: Updated micromamba to the latest version 2.0.0
+* #176: Updated usage of `exasol-bucketfs` to the new API
+* #185: Removed the directory and script for building the AAF SLC
 
 ### Documentation
 
diff --git a/exasol_advanced_analytics_framework/deploy.py b/exasol_advanced_analytics_framework/deploy.py
index 2e723d12..c35e07c8 100644
--- a/exasol_advanced_analytics_framework/deploy.py
+++ b/exasol_advanced_analytics_framework/deploy.py
@@ -6,7 +6,6 @@
 )
 from exasol_advanced_analytics_framework.deployment import (
     scripts_deployer_cli,
-    language_container_deployer_cli,
 )
 from exasol.python_extension_common.deployment.language_container_deployer_cli import (
     language_container_deployer_main,
@@ -24,7 +23,7 @@ def main():
 slc_parameter_formatters.set_formatter(CustomizableParameters.container_name, SLC_FILE_NAME)
 
 main.add_command(scripts_deployer_cli.scripts_deployer_main)
-main.add_command(language_container_deployer_cli.language_container_deployer_main)
+main.add_command(language_container_deployer_main)
 
 
 if __name__ == '__main__':
diff --git a/exasol_advanced_analytics_framework/query_handler/context/proxy/bucketfs_location_proxy.py b/exasol_advanced_analytics_framework/query_handler/context/proxy/bucketfs_location_proxy.py
index 33eaba07..6857bc71 100644
--- a/exasol_advanced_analytics_framework/query_handler/context/proxy/bucketfs_location_proxy.py
+++ b/exasol_advanced_analytics_framework/query_handler/context/proxy/bucketfs_location_proxy.py
@@ -1,19 +1,18 @@
 import logging
 
-from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation
-
 from exasol_advanced_analytics_framework.query_handler.context.proxy.object_proxy import ObjectProxy
+import exasol.bucketfs as bfs
 
 LOGGER = logging.getLogger(__file__)
 
 
 class BucketFSLocationProxy(ObjectProxy):
 
-    def __init__(self, bucketfs_location: AbstractBucketFSLocation):
+    def __init__(self, bucketfs_location: bfs.path.PathLike):
         super().__init__()
         self._bucketfs_location = bucketfs_location
 
-    def bucketfs_location(self) -> AbstractBucketFSLocation:
+    def bucketfs_location(self) -> bfs.path.PathLike:
         self._check_if_released()
         return self._bucketfs_location
 
@@ -26,16 +25,15 @@ def cleanup(self):
 
     def _remove_file(self, file):
         try:
-            self._bucketfs_location.delete_file_in_bucketfs(file)
+            file.rm()
         except Exception as e:
            LOGGER.error(f"Failed to remove {file}, got exception", exc_info=True)
 
    def _list_files(self):
-        files = []
        try:
-            files = self._bucketfs_location.list_files_in_bucketfs("")
+            return list(self._bucketfs_location.iterdir())
        except FileNotFoundError as e:
-            LOGGER.debug(f"File not found {self._bucketfs_location.get_complete_file_path_in_bucket()} during cleanup.")
+            LOGGER.debug(f"File not found {self._bucketfs_location.as_udf_path()} during cleanup.")
        except Exception as e:
            LOGGER.exception(f"Got exception during listing files in temporary BucketFSLocation")
-        return files
+        return []
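The proxy's cleanup now goes through the pathlib-style `exasol.bucketfs` API: `iterdir()` enumerates the entries below the temporary location, and each entry removes itself via `rm()`. A minimal sketch of those calls against the mounted test backend used elsewhere in this change (the directory names are illustrative, and the base path is assumed to exist):

import exasol.bucketfs as bfs

tmp = bfs.path.BucketPath("tmp", bfs.MountedBucket(base_path="/tmp/demo-bucketfs"))
(tmp / "leftover.txt").write(b"data")   # leave a file behind, as a query handler might
for entry in tmp.iterdir():             # what _list_files() returns
    entry.rm()                          # what _remove_file() calls, wrapped in try/except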
diff --git a/exasol_advanced_analytics_framework/query_handler/context/top_level_query_handler_context.py b/exasol_advanced_analytics_framework/query_handler/context/top_level_query_handler_context.py
index 46a25040..d4575d30 100644
--- a/exasol_advanced_analytics_framework/query_handler/context/top_level_query_handler_context.py
+++ b/exasol_advanced_analytics_framework/query_handler/context/top_level_query_handler_context.py
@@ -3,7 +3,7 @@
 from abc import ABC
 from typing import Set, List, Callable
 
-from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation
+import exasol.bucketfs as bfs
 from exasol_data_science_utils_python.schema.schema_name import SchemaName
 from exasol_data_science_utils_python.schema.table_name import TableName
 from exasol_data_science_utils_python.schema.table_name_builder import TableNameBuilder
@@ -62,7 +62,7 @@ def get_all_not_released_contexts(self):
 
 class _ScopeQueryHandlerContextBase(ScopeQueryHandlerContext, ABC):
     def __init__(self,
-                 temporary_bucketfs_location: AbstractBucketFSLocation,
+                 temporary_bucketfs_location: bfs.path.PathLike,
                  temporary_db_object_name_prefix: str,
                  temporary_schema_name: str,
                  connection_lookup: ConnectionLookup,
@@ -232,7 +232,7 @@ def get_connection(self, name: str) -> Connection:
 
 class TopLevelQueryHandlerContext(_ScopeQueryHandlerContextBase):
     def __init__(self,
-                 temporary_bucketfs_location: AbstractBucketFSLocation,
+                 temporary_bucketfs_location: bfs.path.PathLike,
                  temporary_db_object_name_prefix: str,
                  temporary_schema_name: str,
                  connection_lookup: ConnectionLookup,
@@ -283,7 +283,7 @@ def transfer_object_to(self, object_proxy: ObjectProxy,
 
 class _ChildQueryHandlerContext(_ScopeQueryHandlerContextBase):
     def __init__(self, parent: _ScopeQueryHandlerContextBase,
-                 temporary_bucketfs_location: AbstractBucketFSLocation,
+                 temporary_bucketfs_location: bfs.path.PathLike,
                  temporary_db_object_name_prefix: str,
                  temporary_schema_name: str,
                  connection_lookup: ConnectionLookup,
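With `bfs.path.PathLike` as the annotation, any BucketFS backend can back a context, including the mounted one used by the unit tests further down in this diff. A sketch of constructing a `TopLevelQueryHandlerContext` that way, using only constructor arguments that appear in this change (the connection lookup stub is illustrative, not the real `ConnectionLookup`):

import exasol.bucketfs as bfs
from exasol_advanced_analytics_framework.query_handler.context.top_level_query_handler_context import \
    TopLevelQueryHandlerContext

bucket = bfs.MountedBucket(base_path="/tmp/demo-bucketfs")  # assumes this directory exists
context = TopLevelQueryHandlerContext(
    temporary_bucketfs_location=bfs.path.BucketPath("", bucket),
    temporary_db_object_name_prefix="temp_db_object",
    temporary_schema_name="temp_schema_name",
    connection_lookup=lambda name: None,  # illustrative stub for the lookup callable
)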
diff --git a/exasol_advanced_analytics_framework/udf_framework/query_handler_runner_udf.py b/exasol_advanced_analytics_framework/udf_framework/query_handler_runner_udf.py
index ce8e3ee9..2f805234 100644
--- a/exasol_advanced_analytics_framework/udf_framework/query_handler_runner_udf.py
+++ b/exasol_advanced_analytics_framework/udf_framework/query_handler_runner_udf.py
@@ -1,22 +1,20 @@
 import dataclasses
 import importlib
+import json
 import logging
 import traceback
 from collections import OrderedDict
 from enum import Enum, auto
-from pathlib import PurePosixPath
-from typing import Tuple, List, Optional
-
-from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation
-from exasol_bucketfs_utils_python.bucketfs_factory import BucketFSFactory
-from exasol_data_science_utils_python.schema.column import \
-    Column
-from exasol_data_science_utils_python.schema.column_name \
-    import ColumnName
-from exasol_data_science_utils_python.schema.column_type \
-    import ColumnType
-from exasol_data_science_utils_python.schema.schema_name \
-    import SchemaName
+from io import BytesIO
+from typing import Any, Tuple, List, Optional
+
+import exasol.bucketfs as bfs
+import joblib
+
+from exasol_data_science_utils_python.schema.column import Column
+from exasol_data_science_utils_python.schema.column_name import ColumnName
+from exasol_data_science_utils_python.schema.column_type import ColumnType
+from exasol_data_science_utils_python.schema.schema_name import SchemaName
 from exasol_data_science_utils_python.schema.udf_name_builder import UDFNameBuilder
 
 from exasol_advanced_analytics_framework.query_handler.context.scope_query_handler_context import \
@@ -33,6 +31,27 @@
 from exasol_advanced_analytics_framework.udf_framework.udf_connection_lookup import UDFConnectionLookup
 
 
+def create_bucketfs_location_from_conn_object(bfs_conn_obj) -> bfs.path.PathLike:
+    bfs_params = json.loads(bfs_conn_obj.address)
+    bfs_params.update(json.loads(bfs_conn_obj.user))
+    bfs_params.update(json.loads(bfs_conn_obj.password))
+    return bfs.path.build_path(**bfs_params)
+
+
+def upload_via_joblib(location: bfs.path.PathLike, obj: Any):
+    buffer = BytesIO()
+    joblib.dump(obj, buffer)
+    location.write(buffer.getvalue())
+
+
+def read_via_joblib(location: bfs.path.PathLike) -> Any:
+    buffer = BytesIO()
+    for chunk in location.read():
+        buffer.write(chunk)
+    buffer.seek(0)
+    return joblib.load(buffer)
+
+
 @dataclasses.dataclass
 class UDFParameter:
     iter_num: int
@@ -65,7 +84,7 @@ class QueryHandlerRunnerUDF:
 
     def __init__(self, exa):
         self.exa = exa
-        self.bucketfs_location: Optional[AbstractBucketFSLocation] = None
+        self.bucketfs_location: Optional[bfs.path.PathLike] = None
         self.parameter: Optional[UDFParameter] = None
 
     def run(self, ctx) -> None:
@@ -173,10 +192,8 @@ def _get_parameter(self, ctx):
 
     def _create_bucketfs_location(self):
         bucketfs_connection_obj = self.exa.get_connection(self.parameter.temporary_bfs_location_conn)
-        bucketfs_location_from_con = BucketFSFactory().create_bucketfs_location(
-            url=bucketfs_connection_obj.address,
-            user=bucketfs_connection_obj.user,
-            pwd=bucketfs_connection_obj.password)
+        bucketfs_location_from_con = create_bucketfs_location_from_conn_object(
+            bucketfs_connection_obj)
         self.bucketfs_location = bucketfs_location_from_con \
             .joinpath(self.parameter.temporary_bfs_location_directory) \
             .joinpath(self.parameter.temporary_name_prefix)
@@ -207,20 +224,17 @@ def _create_state(self) -> QueryHandlerRunnerState:
         return query_handler_state
 
     def _load_latest_state(self) -> QueryHandlerRunnerState:
-        state_file_bucketfs_path = self._generate_state_file_bucketfs_path()
-        query_handler_state: QueryHandlerRunnerState = \
-            self.bucketfs_location.read_file_from_bucketfs_via_joblib(str(state_file_bucketfs_path))
-        query_handler_state.connection_lookup.exa = self.exa
-        return query_handler_state
+        path = self._state_file_bucketfs_location()
+        state = read_via_joblib(path)
+        state.connection_lookup.exa = self.exa
+        return state
 
     def _save_current_state(self, current_state: QueryHandlerRunnerState) -> None:
-        next_state_file_bucketfs_path = self._generate_state_file_bucketfs_path(1)
-        self.bucketfs_location.upload_object_to_bucketfs_via_joblib(
-            current_state, str(next_state_file_bucketfs_path))
+        path = self._state_file_bucketfs_location(1)
+        upload_via_joblib(path, current_state)
 
     def _remove_previous_state(self) -> None:
-        state_file_bucketfs_path = self._generate_state_file_bucketfs_path()
-        self.bucketfs_location.delete_file_in_bucketfs(str(state_file_bucketfs_path))
+        self._state_file_bucketfs_location().rm()
 
     def _create_udf_query_result(
             self, ctx, query_columns: List[Column]) -> UDFQueryResult:
@@ -265,9 +279,9 @@ def _get_query_columns(self):
                 Column(ColumnName(col_name), ColumnType(col_type)))
         return query_columns
 
-    def _generate_state_file_bucketfs_path(self, iter_offset: int = 0) -> PurePosixPath:
+    def _state_file_bucketfs_location(self, iter_offset: int = 0) -> bfs.path.PathLike:
         num_iter = self.parameter.iter_num + iter_offset
-        return PurePosixPath(f"state/{str(num_iter)}.pkl")
+        return self.bucketfs_location / f"state/{num_iter}.pkl"
 
     @staticmethod
     def emit_udf_result(ctx, udf_result: UDFResult):
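Together these helpers replace the `upload_object_to_bucketfs_via_joblib`/`read_file_from_bucketfs_via_joblib` methods of the old API; note that the buffer must be rewound with `seek(0)` before `joblib.load` can read what was written into it. A minimal round trip under an assumed mounted-backend location (the path names are illustrative):

import exasol.bucketfs as bfs

location = bfs.path.build_path(backend="mounted", base_path="/tmp/demo-bucketfs")
state_file = location / "state" / "0.pkl"
upload_via_joblib(state_file, {"iteration": 0})   # joblib.dump into a BytesIO, then write the bytes
state = read_via_joblib(state_file)               # stream the chunks back and joblib.load them
assert state == {"iteration": 0}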
create_env_file() { - touch "$tmpfile_env" - if [ -n "${TARGET_DOCKER_PASSWORD-}" ]; then - echo "TARGET_DOCKER_PASSWORD=$TARGET_DOCKER_PASSWORD" >> "$tmpfile_env" - fi - if [ -n "${SOURCE_DOCKER_PASSWORD-}" ]; then - echo "SOURCE_DOCKER_PASSWORD=$SOURCE_DOCKER_PASSWORD" >> "$tmpfile_env" - fi -} - -function create_env_file_debug_protected() { - shell_options="$-" - case $shell_options in - *x*) set +x ;; - *) echo &>/dev/null ;; - esac - - create_env_file "$1" - - case $shell_options in - *x*) set -x ;; - *) echo &>/dev/null ;; - esac -} - -old_umask=$(umask) -umask 277 -tmpfile_env=$(mktemp) -trap 'rm -f -- "$tmpfile_env"' INT TERM HUP EXIT - -create_env_file_debug_protected "$tmpfile_env" - -docker run --env-file "$tmpfile_env" --rm $terminal_parameter -v "$PWD:$PWD" -v "$DOCKER_SOCKET_MOUNT" -w "$PWD" "$RUNNER_IMAGE_NAME" bash -c "$RUN_COMMAND" - -umask "$old_umask" diff --git a/language_container/exaslct_scripts/exaslct_within_docker_container.sh.sha512sum b/language_container/exaslct_scripts/exaslct_within_docker_container.sh.sha512sum deleted file mode 100644 index 6e4c6bc9..00000000 --- a/language_container/exaslct_scripts/exaslct_within_docker_container.sh.sha512sum +++ /dev/null @@ -1 +0,0 @@ -0ab76d3064c67f6e9031a949b6855202fd57af6c6382fa39ced517deb3662ccd567199fab610a27956a7676995c14becd9e6d72b2ca08495d8bd044f5363a7f4 exaslct_within_docker_container.sh diff --git a/language_container/exaslct_scripts/exaslct_within_docker_container_without_container_build.sh b/language_container/exaslct_scripts/exaslct_within_docker_container_without_container_build.sh deleted file mode 100644 index 0dd06358..00000000 --- a/language_container/exaslct_scripts/exaslct_within_docker_container_without_container_build.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail - -SCRIPT_DIR="$(dirname "$(readlink -f "${BASH_SOURCE[0]}")")" - -RUNNER_IMAGE_NAME="$1" -shift 1 - -FIND_IMAGE_LOCALLY=$(docker images -q "$RUNNER_IMAGE_NAME") -if [ -z "$FIND_IMAGE_LOCALLY" ]; then - docker pull "$RUNNER_IMAGE_NAME" -fi - -bash "$SCRIPT_DIR/exaslct_within_docker_container.sh" "$RUNNER_IMAGE_NAME" "${@}" diff --git a/language_container/exaslct_scripts/exaslct_within_docker_container_without_container_build.sh.sha512sum b/language_container/exaslct_scripts/exaslct_within_docker_container_without_container_build.sh.sha512sum deleted file mode 100644 index 4140611b..00000000 --- a/language_container/exaslct_scripts/exaslct_within_docker_container_without_container_build.sh.sha512sum +++ /dev/null @@ -1 +0,0 @@ -468914d925bab35af2f4f848ea4cadab51e743aafbe8d7d705ea851f32024092977a011a7df93d41a505d712051899cc4fd1b07312d424483b59d7ae885d2ec9 exaslct_within_docker_container_without_container_build.sh diff --git a/language_container/exasol_advanced_analytics_framework_container/flavor_base/build_steps.py b/language_container/exasol_advanced_analytics_framework_container/flavor_base/build_steps.py deleted file mode 100644 index 47b77031..00000000 --- a/language_container/exasol_advanced_analytics_framework_container/flavor_base/build_steps.py +++ /dev/null @@ -1,26 +0,0 @@ -from pathlib import Path -from typing import Dict - -from exasol_script_languages_container_tool.lib.tasks.build.docker_flavor_image_task import DockerFlavorAnalyzeImageTask - - -class AnalyzeRelease(DockerFlavorAnalyzeImageTask): - def get_build_step(self) -> str: - return "release" - - def requires_tasks(self): - return {} - - def get_path_in_flavor(self): - return "flavor_base" - - -class AnalyzeTest(DockerFlavorAnalyzeImageTask): 
- def get_build_step(self) -> str: - return "test" - - def requires_tasks(self): - return {"release": AnalyzeRelease} - - def get_path_in_flavor(self): - return "flavor_base" diff --git a/language_container/exasol_advanced_analytics_framework_container/flavor_base/language_definition b/language_container/exasol_advanced_analytics_framework_container/flavor_base/language_definition deleted file mode 100644 index 6b0b6dcc..00000000 --- a/language_container/exasol_advanced_analytics_framework_container/flavor_base/language_definition +++ /dev/null @@ -1 +0,0 @@ -PYTHON3_AAF=localzmq+protobuf:///{{ bucketfs_name }}/{{ bucket_name }}/{{ path_in_bucket }}{{ release_name }}?lang=python#buckets/{{ bucketfs_name }}/{{ bucket_name }}/{{ path_in_bucket }}{{ release_name }}/exaudf/exaudfclient_py3 diff --git a/language_container/exasol_advanced_analytics_framework_container/flavor_base/release/Dockerfile b/language_container/exasol_advanced_analytics_framework_container/flavor_base/release/Dockerfile deleted file mode 100644 index 859e24b0..00000000 --- a/language_container/exasol_advanced_analytics_framework_container/flavor_base/release/Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -FROM exasol/script-language-container:template-Exasol-all-python-3.10-release_BFRSH344TDRPT7LK2FBOJK4KBIDW6A253FFPYEUYT4O2ERFMTCNA - -RUN apt-get update && \ - apt-get upgrade -y && \ - apt-get install -y git - -COPY release/dist /project -RUN python3.10 -m pip install --use-deprecated=legacy-resolver /project/*.whl - -RUN mkdir -p /build_info/actual_installed_packages/release && \ - /scripts/list_installed_scripts/list_installed_apt.sh > /build_info/actual_installed_packages/release/apt_get_packages && \ - /scripts/list_installed_scripts/list_installed_pip.sh python3.10 > /build_info/actual_installed_packages/release/python3_pip_packages diff --git a/language_container/exasol_advanced_analytics_framework_container/flavor_base/test/Dockerfile b/language_container/exasol_advanced_analytics_framework_container/flavor_base/test/Dockerfile deleted file mode 100644 index 74b7d65d..00000000 --- a/language_container/exasol_advanced_analytics_framework_container/flavor_base/test/Dockerfile +++ /dev/null @@ -1,8 +0,0 @@ -FROM {{release}} - -COPY test/dist /project -RUN python3.10 -m pip install --use-deprecated=legacy-resolver /project/*.whl - -RUN mkdir -p /build_info/actual_installed_packages/test && \ - /scripts/list_installed_scripts/list_installed_apt.sh > /build_info/actual_installed_packages/test/apt_get_packages && \ - /scripts/list_installed_scripts/list_installed_pip.sh python3.10 > /build_info/actual_installed_packages/test/python3_pip_packages diff --git a/language_container/install_or_update_exaslct.sh b/language_container/install_or_update_exaslct.sh deleted file mode 100644 index 2de662f1..00000000 --- a/language_container/install_or_update_exaslct.sh +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -die() { - echo "$*" 1>&2 - exit 1 -} -download_raw_file_from_github() { - local repo=$1 - local ref=$2 - local remote_file_path=$3 - local local_file_path=$4 - local url="https://api.github.com/repos/$repo/contents/$remote_file_path?ref=$ref" - local arguments=(-s -H 'Accept: application/vnd.github.v3.raw' -L "$url" -o "$local_file_path") - if [ -z "${GITHUB_TOKEN-}" ]; then - curl "${arguments[@]}" - else - curl -H "Authorization: token $GITHUB_TOKEN" "${arguments[@]}" - fi -} -download_and_verify_raw_file_from_github() { - local repo=$1 - local ref=$2 - local remote_file_path=$3 - local 
file_name=${remote_file_path##*/} - local dir_path=${remote_file_path%$file_name} - local checksum_file_name="${file_name}.sha512sum" - local remote_checksum_file_path="${dir_path}checksums/$checksum_file_name" - - download_raw_file_from_github "$repo" "$ref" "$remote_file_path" "$file_name" || - die "ERROR: Could not download '$remote_file_path' from the github repository '$repo' at ref '$ref'." - download_raw_file_from_github "$repo" "$ref" "$remote_checksum_file_path" "$checksum_file_name" || - die "ERROR: Could not download the checksum for '$remote_file_path' from the github repository '$repo' at ref '$ref'." - sha512sum --check "${checksum_file_name}" || - die "ERROR: Could not verify the checksum for '$remote_file_path' from the github repository '$repo' at ref '$ref'." - -} - -main() { - local exaslct_git_ref="latest" - if [ -n "${1-}" ]; then - exaslct_git_ref="$1" - fi - - local repo="exasol/script-languages-container-tool" - tmp_directory_for_installer="$(mktemp -d)" - trap 'rm -rf -- "$tmp_directory_for_installer"' EXIT - - local installer_file_name="exaslct_installer.sh" - - pushd "$tmp_directory_for_installer" &>/dev/null - - download_and_verify_raw_file_from_github "$repo" "$exaslct_git_ref" "installer/$installer_file_name" - - popd &>/dev/null - - bash "$tmp_directory_for_installer/$installer_file_name" "$exaslct_git_ref" - -} - -main "${@}" diff --git a/tests/integration_tests/with_db/fixtures/setup_database_fixture.py b/tests/integration_tests/with_db/fixtures/setup_database_fixture.py index 301c7789..bf2902c7 100644 --- a/tests/integration_tests/with_db/fixtures/setup_database_fixture.py +++ b/tests/integration_tests/with_db/fixtures/setup_database_fixture.py @@ -27,53 +27,12 @@ def deployed_scripts(pyexasol_connection, db_schema_name, language_alias) -> Non ).deploy_scripts() -# Can be removed after -# https://github.com/exasol/advanced-analytics-framework/issues/176 -def _bucket_address( - bucketfs_params: dict[str, Any], - path_in_bucket: str = "my-folder", -) -> str: - url = bucketfs_params["url"] - bucket_name = bucketfs_params["bucket_name"] - service_name = bucketfs_params["service_name"] - return ( f"{url}/{bucket_name}/" - f"{path_in_bucket};{service_name}" ) - - -# Can be removed after -# https://github.com/exasol/advanced-analytics-framework/issues/176 -@pytest.fixture(scope='session') -def my_bucketfs_connection_factory( - use_onprem, - pyexasol_connection, - backend_aware_bucketfs_params, -) -> Callable[[str, str|None], None]: - def create(name, path_in_bucket): - if not use_onprem: - return - bucketfs_params = backend_aware_bucketfs_params - uri = _bucket_address(bucketfs_params, path_in_bucket) - user = bucketfs_params["username"] - pwd = bucketfs_params["password"] - pyexasol_connection.execute( - f"CREATE OR REPLACE CONNECTION {name} TO '{uri}' " \ - f"USER '{user}' IDENTIFIED BY '{pwd}'" - ) - return create - - @pytest.fixture(scope="module") def database_with_slc( - pyexasol_connection, deployed_scripts, db_schema_name, bucketfs_connection_factory, - my_bucketfs_connection_factory, deployed_slc, -) -> Tuple[str|None, str]: - # this requires updating query_handler_runner_udf.py to the new bucketfs API, first, - # which is planned to be done in ticket - # https://github.com/exasol/advanced-analytics-framework/issues/176 - # bucketfs_connection_factory(BUCKETFS_CONNECTION_NAME, "my-folder") - my_bucketfs_connection_factory(BUCKETFS_CONNECTION_NAME, "my-folder") +) -> Tuple[str, str]: + bucketfs_connection_factory(BUCKETFS_CONNECTION_NAME, "my-folder") 
return BUCKETFS_CONNECTION_NAME, db_schema_name diff --git a/tests/unit_tests/query_handler/fixtures.py b/tests/unit_tests/query_handler/fixtures.py index b733db86..53250c47 100644 --- a/tests/unit_tests/query_handler/fixtures.py +++ b/tests/unit_tests/query_handler/fixtures.py @@ -1,7 +1,5 @@ import pytest -from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation -from exasol_bucketfs_utils_python.bucketfs_factory import BucketFSFactory -from exasol_bucketfs_utils_python.bucketfs_location import BucketFSLocation +import exasol.bucketfs as bfs from exasol_advanced_analytics_framework.query_handler.context.scope_query_handler_context import \ ScopeQueryHandlerContext, Connection @@ -59,17 +57,24 @@ def lookup(name: str) -> Connection: @pytest.fixture -def bucketfs_location(tmp_path) -> AbstractBucketFSLocation: - bucketfs_location = BucketFSFactory().create_bucketfs_location( - url=f"file://{tmp_path}/data", - user=None, - pwd=None) - return bucketfs_location +def sample_mounted_bucket(tmp_path): + return bfs.MountedBucket(base_path=str(tmp_path)) + + +@pytest.fixture +def bucketfs_location(sample_mounted_bucket): + return bfs.path.BucketPath("a/b", sample_mounted_bucket) + + +@pytest.fixture +def mocked_temporary_bucketfs_location(tmp_path): + mounted_bucket = bfs.MountedBucket(base_path=str(tmp_path / "bucketfs")) + return bfs.path.BucketPath("", mounted_bucket) @pytest.fixture def top_level_query_handler_context( - bucketfs_location: BucketFSLocation, + bucketfs_location: bfs.path.PathLike, prefix: str, schema: str, test_connection_lookup: ConnectionLookup) -> TopLevelQueryHandlerContext: diff --git a/tests/unit_tests/query_handler/test_scope_query_handler_context.py b/tests/unit_tests/query_handler/test_scope_query_handler_context.py index f831cf3e..dd226b63 100644 --- a/tests/unit_tests/query_handler/test_scope_query_handler_context.py +++ b/tests/unit_tests/query_handler/test_scope_query_handler_context.py @@ -1,7 +1,7 @@ from contextlib import contextmanager import pytest -from exasol_bucketfs_utils_python.bucketfs_location import BucketFSLocation +import exasol.bucketfs as bfs from exasol_data_science_utils_python.schema.column_builder import ColumnBuilder from exasol_data_science_utils_python.schema.column_name import ColumnName from exasol_data_science_utils_python.schema.column_type import ColumnType @@ -38,11 +38,11 @@ def test_temporary_view_temporary_schema(scope_query_handler_context: ScopeQuery assert proxy.schema_name.name == schema -def test_temporary_bucketfs_file_prefix_in_name(bucketfs_location: BucketFSLocation, +def test_temporary_bucketfs_file_prefix_in_name(bucketfs_location: bfs.path.PathLike, scope_query_handler_context: ScopeQueryHandlerContext): - proxy = scope_query_handler_context.get_temporary_bucketfs_location() - actual_path = proxy.bucketfs_location().get_complete_file_path_in_bucket() - expected_prefix_path = bucketfs_location.get_complete_file_path_in_bucket() + proxy = scope_query_handler_context.get_temporary_bucketfs_location() + actual_path = proxy.bucketfs_location().as_udf_path() + expected_prefix_path = bucketfs_location.as_udf_path() assert actual_path.startswith(expected_prefix_path) @@ -61,8 +61,8 @@ def test_two_temporary_view_are_not_equal(scope_query_handler_context: ScopeQuer def test_two_temporary_bucketfs_files_are_not_equal(scope_query_handler_context: ScopeQueryHandlerContext): proxy1 = scope_query_handler_context.get_temporary_bucketfs_location() proxy2 = 
scope_query_handler_context.get_temporary_bucketfs_location() - path1 = proxy1.bucketfs_location().get_complete_file_path_in_bucket() - path2 = proxy2.bucketfs_location().get_complete_file_path_in_bucket() + path1 = proxy1.bucketfs_location().as_udf_path() + path2 = proxy2.bucketfs_location().as_udf_path() assert path1 != path2 diff --git a/tests/unit_tests/query_handler/test_top_level_query_handler_context.py b/tests/unit_tests/query_handler/test_top_level_query_handler_context.py index 96408c17..84d281c2 100644 --- a/tests/unit_tests/query_handler/test_top_level_query_handler_context.py +++ b/tests/unit_tests/query_handler/test_top_level_query_handler_context.py @@ -1,5 +1,5 @@ import pytest -from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation +import exasol.bucketfs as bfs from exasol_advanced_analytics_framework.query_handler.context.top_level_query_handler_context import \ TopLevelQueryHandlerContext, ChildContextNotReleasedError @@ -30,30 +30,24 @@ def test_cleanup_released_temporary_view_proxies( def test_cleanup_released_bucketfs_object_with_uploaded_file_proxies( top_level_query_handler_context: TopLevelQueryHandlerContext, - bucketfs_location: AbstractBucketFSLocation, - prefix: str): + bucketfs_location: bfs.path.PathLike): proxy = top_level_query_handler_context.get_temporary_bucketfs_location() - bucket_file_name = "test_file.txt" - proxy.bucketfs_location().upload_string_to_bucketfs(bucket_file_name, "test") + # create dummy file with content "test" + (proxy.bucketfs_location() / "test_file.txt").write(b"test") top_level_query_handler_context.release() top_level_query_handler_context.cleanup_released_object_proxies() - file_list = bucketfs_location.list_files_in_bucketfs("") - assert file_list == [] + assert not bucketfs_location.is_dir() def test_cleanup_released_bucketfs_object_without_uploaded_file_proxies_after_release( - top_level_query_handler_context: TopLevelQueryHandlerContext, - bucketfs_location: AbstractBucketFSLocation, - prefix: str): + top_level_query_handler_context: TopLevelQueryHandlerContext): _ = top_level_query_handler_context.get_temporary_bucketfs_location() top_level_query_handler_context.release() top_level_query_handler_context.cleanup_released_object_proxies() def test_cleanup_release_in_reverse_order_at_top_level( - top_level_query_handler_context: TopLevelQueryHandlerContext, - bucketfs_location: AbstractBucketFSLocation, - prefix: str): + top_level_query_handler_context: TopLevelQueryHandlerContext): proxies = [top_level_query_handler_context.get_temporary_table_name() for _ in range(10)] table_names = [proxy.fully_qualified for proxy in proxies] top_level_query_handler_context.release() @@ -65,9 +59,7 @@ def test_cleanup_release_in_reverse_order_at_top_level( def test_cleanup_release_in_reverse_order_at_child( - top_level_query_handler_context: TopLevelQueryHandlerContext, - bucketfs_location: AbstractBucketFSLocation, - prefix: str): + top_level_query_handler_context: TopLevelQueryHandlerContext): parent_proxies = [top_level_query_handler_context.get_temporary_table_name() for _ in range(10)] child = top_level_query_handler_context.get_child_query_handler_context() @@ -109,4 +101,4 @@ def test_cleanup_parent_before_grand_child_with_temporary_objects( with pytest.raises(ChildContextNotReleasedError): top_level_query_handler_context.release() cleanup_queries = top_level_query_handler_context.cleanup_released_object_proxies() - assert len(cleanup_queries) == 7 \ No newline at end of file + assert 
len(cleanup_queries) == 7 diff --git a/tests/unit_tests/query_handler_runner/test_mock_query_handler_runner.py b/tests/unit_tests/query_handler_runner/test_mock_query_handler_runner.py index 14698bb4..b370db55 100644 --- a/tests/unit_tests/query_handler_runner/test_mock_query_handler_runner.py +++ b/tests/unit_tests/query_handler_runner/test_mock_query_handler_runner.py @@ -2,7 +2,6 @@ from typing import Union import pytest -from exasol_bucketfs_utils_python.localfs_mock_bucketfs_location import LocalFSMockBucketFSLocation from exasol_data_science_utils_python.schema.column import Column from exasol_data_science_utils_python.schema.column_name import ColumnName from exasol_data_science_utils_python.schema.column_type import ColumnType @@ -28,12 +27,12 @@ def temporary_schema_name(): return "temp_schema_name" -@pytest.fixture() -def top_level_query_handler_context(tmp_path, +@pytest.fixture +def top_level_query_handler_context(mocked_temporary_bucketfs_location, temporary_schema_name, test_connection_lookup): top_level_query_handler_context = TopLevelQueryHandlerContext( - temporary_bucketfs_location=LocalFSMockBucketFSLocation(base_path=PurePosixPath(tmp_path) / "bucketfs"), + temporary_bucketfs_location=mocked_temporary_bucketfs_location, temporary_db_object_name_prefix="temp_db_object", connection_lookup=test_connection_lookup, temporary_schema_name=temporary_schema_name, diff --git a/tests/unit_tests/udf_framework/test_json_udf_query_handler.py b/tests/unit_tests/udf_framework/test_json_udf_query_handler.py index a5ef46e0..8ae06ba5 100644 --- a/tests/unit_tests/udf_framework/test_json_udf_query_handler.py +++ b/tests/unit_tests/udf_framework/test_json_udf_query_handler.py @@ -1,18 +1,15 @@ import json +import pytest + from json import JSONDecodeError -from pathlib import PurePosixPath from typing import Union -import pytest -from exasol_bucketfs_utils_python.localfs_mock_bucketfs_location import LocalFSMockBucketFSLocation from exasol_data_science_utils_python.schema.column import Column from exasol_data_science_utils_python.schema.column_name import ColumnName from exasol_data_science_utils_python.schema.column_type import ColumnType from exasol_advanced_analytics_framework.query_handler.context.scope_query_handler_context import \ ScopeQueryHandlerContext -from exasol_advanced_analytics_framework.query_handler.context.top_level_query_handler_context import \ - TopLevelQueryHandlerContext from exasol_advanced_analytics_framework.query_handler.json_udf_query_handler import JSONQueryHandler, JSONType from exasol_advanced_analytics_framework.query_handler.result import Continue, Finish from exasol_advanced_analytics_framework.query_result.mock_query_result import MockQueryResult @@ -20,24 +17,6 @@ from exasol_advanced_analytics_framework.udf_framework.json_udf_query_handler_factory import JsonUDFQueryHandler -@pytest.fixture() -def temporary_schema_name(): - return "temp_schema_name" - - -@pytest.fixture() -def top_level_query_handler_context(tmp_path, - temporary_schema_name, - test_connection_lookup): - top_level_query_handler_context = TopLevelQueryHandlerContext( - temporary_bucketfs_location=LocalFSMockBucketFSLocation(base_path=PurePosixPath(tmp_path) / "bucketfs"), - temporary_db_object_name_prefix="temp_db_object", - connection_lookup=test_connection_lookup, - temporary_schema_name=temporary_schema_name, - ) - return top_level_query_handler_context - - class ConstructorTestJSONQueryHandler(JSONQueryHandler): def __init__(self, parameter: JSONType, query_handler_context: 
ScopeQueryHandlerContext): diff --git a/tests/unit_tests/udf_framework/test_json_udf_query_handler_factory.py b/tests/unit_tests/udf_framework/test_json_udf_query_handler_factory.py index 922dce5a..8cb90f25 100644 --- a/tests/unit_tests/udf_framework/test_json_udf_query_handler_factory.py +++ b/tests/unit_tests/udf_framework/test_json_udf_query_handler_factory.py @@ -1,17 +1,13 @@ import json -from pathlib import PurePosixPath + from typing import Union -import pytest -from exasol_bucketfs_utils_python.localfs_mock_bucketfs_location import LocalFSMockBucketFSLocation from exasol_data_science_utils_python.schema.column import Column from exasol_data_science_utils_python.schema.column_name import ColumnName from exasol_data_science_utils_python.schema.column_type import ColumnType from exasol_advanced_analytics_framework.query_handler.context.scope_query_handler_context import \ ScopeQueryHandlerContext -from exasol_advanced_analytics_framework.query_handler.context.top_level_query_handler_context import \ - TopLevelQueryHandlerContext from exasol_advanced_analytics_framework.query_handler.json_udf_query_handler import JSONQueryHandler, JSONType from exasol_advanced_analytics_framework.query_handler.result import Continue, Finish from exasol_advanced_analytics_framework.query_result.mock_query_result import MockQueryResult @@ -20,24 +16,6 @@ from exasol_advanced_analytics_framework.udf_framework.udf_query_handler import UDFQueryHandler -@pytest.fixture() -def temporary_schema_name(): - return "temp_schema_name" - - -@pytest.fixture() -def top_level_query_handler_context(tmp_path, - temporary_schema_name, - test_connection_lookup): - top_level_query_handler_context = TopLevelQueryHandlerContext( - temporary_bucketfs_location=LocalFSMockBucketFSLocation(base_path=PurePosixPath(tmp_path) / "bucketfs"), - temporary_db_object_name_prefix="temp_db_object", - connection_lookup=test_connection_lookup, - temporary_schema_name=temporary_schema_name, - ) - return top_level_query_handler_context - - class TestJSONQueryHandler(JSONQueryHandler): __test__ = False def __init__(self, parameter: JSONType, query_handler_context: ScopeQueryHandlerContext): diff --git a/tests/unit_tests/udf_framework/test_query_handler_runner_udf_mock.py b/tests/unit_tests/udf_framework/test_query_handler_runner_udf_mock.py index aef8b069..8847e772 100644 --- a/tests/unit_tests/udf_framework/test_query_handler_runner_udf_mock.py +++ b/tests/unit_tests/udf_framework/test_query_handler_runner_udf_mock.py @@ -1,7 +1,9 @@ +import json +import pytest import re -from tempfile import TemporaryDirectory -from exasol_bucketfs_utils_python.bucketfs_factory import BucketFSFactory +from typing import Any, Dict + from exasol_udf_mock_python.column import Column from exasol_udf_mock_python.connection import Connection from exasol_udf_mock_python.group import Group @@ -9,11 +11,15 @@ from exasol_udf_mock_python.mock_meta_data import MockMetaData from exasol_udf_mock_python.udf_mock_executor import UDFMockExecutor -from exasol_advanced_analytics_framework.udf_framework.query_handler_runner_udf import QueryHandlerStatus +from exasol_advanced_analytics_framework.udf_framework.query_handler_runner_udf import ( + QueryHandlerStatus, + create_bucketfs_location_from_conn_object, +) from tests.unit_tests.udf_framework import mock_query_handlers from tests.unit_tests.udf_framework.mock_query_handlers import TEST_CONNECTION from tests.utils.test_utils import pytest_regex + TEMPORARY_NAME_PREFIX = "temporary_name_prefix" BUCKETFS_DIRECTORY = "directory" 
@@ -21,6 +27,41 @@
 BUCKETFS_CONNECTION_NAME = "bucketfs_connection"
 
+
+def kwargs2dict(**kwargs):
+    return dict(kwargs)
+
+
+def udf_mock_connection(user=None, password=None, **kwargs) -> Connection:
+    """
+    For MountedBucket provide kwargs backend="mounted", and base_path.
+    """
+    return Connection(
+        address=json.dumps(kwargs2dict(**kwargs)),
+        user=json.dumps(kwargs2dict(username=user)) if user else "{}",
+        password=json.dumps(kwargs2dict(password=password)) if password else "{}",
+    )
+
+
+@pytest.fixture
+def query_handler_bfs_connection(tmp_path):
+    path = tmp_path / "query_handler"
+    path.mkdir()
+    return udf_mock_connection(
+        backend="mounted",
+        base_path=f"{path}",
+    )
+
+def create_mocked_exa_env(bfs_connection, connections: Dict[str, Any] = {}):
+    meta = create_mock_data()
+    connections = {**connections, BUCKETFS_CONNECTION_NAME: bfs_connection}
+    return MockExaEnvironment(metadata=meta, connections=connections)
+
+
+@pytest.fixture
+def mocked_exa_env(query_handler_bfs_connection):
+    return create_mocked_exa_env(query_handler_bfs_connection)
+
+
 def _udf_wrapper():
     from exasol_udf_mock_python.udf_context import UDFContext
     from exasol_advanced_analytics_framework.udf_framework. \
         query_handler_runner_udf import QueryHandlerRunnerUDF
@@ -55,128 +96,91 @@ def create_mock_data():
     return meta
 
 
-def test_query_handler_udf_with_one_iteration():
-    executor = UDFMockExecutor()
-    meta = create_mock_data()
+def test_query_handler_udf_with_one_iteration(mocked_exa_env):
+    input_data = (
+        0,
+        BUCKETFS_CONNECTION_NAME,
+        BUCKETFS_DIRECTORY,
+        TEMPORARY_NAME_PREFIX,
+        "temp_schema",
+        "MockQueryHandlerWithOneIterationFactory",
+        "tests.unit_tests.udf_framework.mock_query_handlers",
+        mock_query_handlers.TEST_INPUT
+    )
+    result = UDFMockExecutor().run([Group([input_data])], mocked_exa_env)
+    rows = [row[0] for row in result[0].rows]
+    expected_rows = [None, None, QueryHandlerStatus.FINISHED.name, mock_query_handlers.FINAL_RESULT]
+    assert rows == expected_rows
 
-    with TemporaryDirectory() as path:
-        bucketfs_connection = Connection(address=f"file://{path}/query_handler")
-        exa = MockExaEnvironment(
-            metadata=meta,
-            connections={"bucketfs_connection": bucketfs_connection})
-
-        input_data = (
-            0,
-            BUCKETFS_CONNECTION_NAME,
-            BUCKETFS_DIRECTORY,
-            TEMPORARY_NAME_PREFIX,
-            "temp_schema",
-            "MockQueryHandlerWithOneIterationFactory",
-            "tests.unit_tests.udf_framework.mock_query_handlers",
-            mock_query_handlers.TEST_INPUT
-        )
-        result = executor.run([Group([input_data])], exa)
-        rows = [row[0] for row in result[0].rows]
-        expected_rows = [None, None, QueryHandlerStatus.FINISHED.name, mock_query_handlers.FINAL_RESULT]
-        assert rows == expected_rows
-
-
-def test_query_handler_udf_with_one_iteration_with_not_released_child_query_handler_context():
-    executor = UDFMockExecutor()
-    meta = create_mock_data()
-    with TemporaryDirectory() as path:
-        bucketfs_connection = Connection(address=f"file://{path}/query_handler")
-        exa = MockExaEnvironment(
-            metadata=meta,
-            connections={"bucketfs_connection": bucketfs_connection})
-
-        input_data = (
-            0,
-            BUCKETFS_CONNECTION_NAME,
-            BUCKETFS_DIRECTORY,
-            TEMPORARY_NAME_PREFIX,
-            "temp_schema",
-            "MockQueryHandlerWithOneIterationWithNotReleasedChildQueryHandlerContextFactory",
-            "tests.unit_tests.udf_framework.mock_query_handlers",
-            "{}"
-        )
-        result = executor.run([Group([input_data])], exa)
-        rows = [row[0] for row in result[0].rows]
-        expected_rows = [None,
-                         None,
-                         QueryHandlerStatus.ERROR.name,
-                         pytest_regex(r".*The following child contexts were not released:*", re.DOTALL)]
-        assert rows == expected_rows
-
-
-def
test_query_handler_udf_with_one_iteration_with_not_released_temporary_object(): - executor = UDFMockExecutor() - meta = create_mock_data() +def test_query_handler_udf_with_one_iteration_with_not_released_child_query_handler_context(mocked_exa_env): + input_data = ( + 0, + BUCKETFS_CONNECTION_NAME, + BUCKETFS_DIRECTORY, + TEMPORARY_NAME_PREFIX, + "temp_schema", + "MockQueryHandlerWithOneIterationWithNotReleasedChildQueryHandlerContextFactory", + "tests.unit_tests.udf_framework.mock_query_handlers", + "{}" + ) + result = UDFMockExecutor().run([Group([input_data])], mocked_exa_env) + rows = [row[0] for row in result[0].rows] + expected_rows = [None, + None, + QueryHandlerStatus.ERROR.name, + pytest_regex(r".*The following child contexts were not released:*", re.DOTALL)] + assert rows == expected_rows - with TemporaryDirectory() as path: - bucketfs_connection = Connection(address=f"file://{path}/query_handler") - exa = MockExaEnvironment( - metadata=meta, - connections={"bucketfs_connection": bucketfs_connection}) - - input_data = ( - 0, - BUCKETFS_CONNECTION_NAME, - BUCKETFS_DIRECTORY, - TEMPORARY_NAME_PREFIX, - "temp_schema", - "MockQueryHandlerWithOneIterationWithNotReleasedTemporaryObjectFactory", - "tests.unit_tests.udf_framework.mock_query_handlers", - "{}" - ) - result = executor.run([Group([input_data])], exa) - rows = [row[0] for row in result[0].rows] - expected_rows = [None, - None, - QueryHandlerStatus.ERROR.name, - pytest_regex(r".*The following child contexts were not released.*", re.DOTALL), - 'DROP TABLE IF EXISTS "temp_schema"."temporary_name_prefix_2_1";'] - assert rows == expected_rows - - -def test_query_handler_udf_with_one_iteration_and_temp_table(): - executor = UDFMockExecutor() - meta = create_mock_data() - with TemporaryDirectory() as path: - bucketfs_connection = Connection(address=f"file://{path}/query_handler") - exa = MockExaEnvironment( - metadata=meta, - connections={"bucketfs_connection": bucketfs_connection}) - - input_data = ( - 0, - BUCKETFS_CONNECTION_NAME, - BUCKETFS_DIRECTORY, - TEMPORARY_NAME_PREFIX, - "temp_schema", - "QueryHandlerTestWithOneIterationAndTempTableFactory", - "tests.unit_tests.udf_framework.mock_query_handlers", - "{}" - ) - result = executor.run([Group([input_data])], exa) - rows = [row[0] for row in result[0].rows] - table_cleanup_query = 'DROP TABLE IF EXISTS "temp_schema"."temporary_name_prefix_1";' - expected_rows = [None, None, QueryHandlerStatus.FINISHED.name, mock_query_handlers.FINAL_RESULT, - table_cleanup_query] - assert rows == expected_rows - - -def test_query_handler_udf_with_two_iteration(tmp_path): - executor = UDFMockExecutor() - meta = create_mock_data() +def test_query_handler_udf_with_one_iteration_with_not_released_temporary_object(mocked_exa_env): + input_data = ( + 0, + BUCKETFS_CONNECTION_NAME, + BUCKETFS_DIRECTORY, + TEMPORARY_NAME_PREFIX, + "temp_schema", + "MockQueryHandlerWithOneIterationWithNotReleasedTemporaryObjectFactory", + "tests.unit_tests.udf_framework.mock_query_handlers", + "{}" + ) + result = UDFMockExecutor().run([Group([input_data])], mocked_exa_env) + rows = [row[0] for row in result[0].rows] + expected_rows = [None, + None, + QueryHandlerStatus.ERROR.name, + pytest_regex(r".*The following child contexts were not released.*", re.DOTALL), + 'DROP TABLE IF EXISTS "temp_schema"."temporary_name_prefix_2_1";'] + assert rows == expected_rows - bucketfs_connection = Connection(address=f"file://{tmp_path}/query_handler") - exa = MockExaEnvironment( - metadata=meta, - connections={BUCKETFS_CONNECTION_NAME: 
bucketfs_connection}) +def test_query_handler_udf_with_one_iteration_and_temp_table(mocked_exa_env): + input_data = ( + 0, + BUCKETFS_CONNECTION_NAME, + BUCKETFS_DIRECTORY, + TEMPORARY_NAME_PREFIX, + "temp_schema", + "QueryHandlerTestWithOneIterationAndTempTableFactory", + "tests.unit_tests.udf_framework.mock_query_handlers", + "{}" + ) + result = UDFMockExecutor().run([Group([input_data])], mocked_exa_env) + rows = [row[0] for row in result[0].rows] + table_cleanup_query = 'DROP TABLE IF EXISTS "temp_schema"."temporary_name_prefix_1";' + expected_rows = [None, None, QueryHandlerStatus.FINISHED.name, mock_query_handlers.FINAL_RESULT, + table_cleanup_query] + assert rows == expected_rows + + +def test_query_handler_udf_with_two_iteration(query_handler_bfs_connection): + def state_file_exists(iteration: int) -> bool: + bucketfs_location = create_bucketfs_location_from_conn_object(query_handler_bfs_connection) + bucketfs_path = f"{BUCKETFS_DIRECTORY}/{TEMPORARY_NAME_PREFIX}/state" + state_file = f"{str(iteration)}.pkl" + return (bucketfs_location / bucketfs_path / state_file).exists() + + exa = create_mocked_exa_env(query_handler_bfs_connection) input_data = ( 0, BUCKETFS_CONNECTION_NAME, @@ -187,6 +191,7 @@ def test_query_handler_udf_with_two_iteration(tmp_path): "tests.unit_tests.udf_framework.mock_query_handlers", "{}" ) + executor = UDFMockExecutor() result = executor.run([Group([input_data])], exa) rows = [row[0] for row in result[0].rows] expected_return_query_view = 'CREATE VIEW "temp_schema"."temporary_name_prefix_2_1" AS ' \ @@ -202,9 +207,9 @@ def test_query_handler_udf_with_two_iteration(tmp_path): [query.query_string for query in mock_query_handlers.QUERY_LIST] assert rows == expected_rows - prev_state_exist = _is_state_exist(0, bucketfs_connection) - current_state_exist = _is_state_exist(1, bucketfs_connection) - assert prev_state_exist == False and current_state_exist == True + previous = 0 + current = 1 + assert not state_file_exists(previous) and state_file_exists(current) exa = MockExaEnvironment( metadata=MockMetaData( @@ -222,7 +227,7 @@ def test_query_handler_udf_with_two_iteration(tmp_path): Column("outputs", str, "VARCHAR(2000000)") ], is_variadic_input=True), - connections={BUCKETFS_CONNECTION_NAME: bucketfs_connection}) + connections={BUCKETFS_CONNECTION_NAME: query_handler_bfs_connection}) input_data = ( 1, @@ -240,50 +245,35 @@ def test_query_handler_udf_with_two_iteration(tmp_path): assert rows == expected_rows -def test_query_handler_udf_using_connection(): - executor = UDFMockExecutor() - meta = create_mock_data() - - with TemporaryDirectory() as path: - bucketfs_connection = Connection(address=f"file://{path}/query_handler") - test_connection = Connection(address=f"test_connection", - user="test_connection_user", - password="test_connection_pwd") - - exa = MockExaEnvironment( - metadata=meta, - connections={ - "bucketfs_connection": bucketfs_connection, - TEST_CONNECTION: test_connection} - ) - - input_data = ( - 0, - BUCKETFS_CONNECTION_NAME, - BUCKETFS_DIRECTORY, - TEMPORARY_NAME_PREFIX, - "temp_schema", - "MockQueryHandlerUsingConnectionFactory", - "tests.unit_tests.udf_framework.mock_query_handlers", - "{}" - ) - result = executor.run([Group([input_data])], exa) - rows = [row[0] for row in result[0].rows] - expected_rows = [ - None, None, QueryHandlerStatus.FINISHED.name, - f"{TEST_CONNECTION},{test_connection.address},{test_connection.user},{test_connection.password}" - ] - assert rows == expected_rows - - -def _is_state_exist( - iter_num: int, - 
model_connection: Connection) -> bool: - bucketfs_location = BucketFSFactory().create_bucketfs_location( - url=model_connection.address, - user=model_connection.user, - pwd=model_connection.password) - bucketfs_path = f"{BUCKETFS_DIRECTORY}/{TEMPORARY_NAME_PREFIX}/state/" - state_file = f"{str(iter_num)}.pkl" - files = bucketfs_location.list_files_in_bucketfs(bucketfs_path) - return state_file in files +def test_query_handler_udf_using_connection(query_handler_bfs_connection): + test_connection = udf_mock_connection( + address="test_connection", + user="test_connection_user", + password="test_connection_pwd", + ) + exa = create_mocked_exa_env( + query_handler_bfs_connection, + { TEST_CONNECTION: test_connection }, + ) + input_data = ( + 0, + BUCKETFS_CONNECTION_NAME, + BUCKETFS_DIRECTORY, + TEMPORARY_NAME_PREFIX, + "temp_schema", + "MockQueryHandlerUsingConnectionFactory", + "tests.unit_tests.udf_framework.mock_query_handlers", + "{}" + ) + result = UDFMockExecutor().run([Group([input_data])], exa) + rows = [row[0] for row in result[0].rows] + expected_rows = [ + None, None, QueryHandlerStatus.FINISHED.name, + ",".join([ + TEST_CONNECTION, + test_connection.address, + test_connection.user, + test_connection.password, + ]) + ] + assert rows == expected_rows
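The test helpers above and the production code meet in the connection object: `udf_mock_connection` serializes the BucketFS parameters into the JSON fields of a `Connection`, and `create_bucketfs_location_from_conn_object` merges those fields back into keyword arguments for `bfs.path.build_path`. A condensed sketch of that round trip (the base path is illustrative and assumed to exist):

conn = udf_mock_connection(backend="mounted", base_path="/tmp/query_handler")
location = create_bucketfs_location_from_conn_object(conn)
state_file = location / "directory" / "temporary_name_prefix" / "state" / "0.pkl"
print(state_file.exists())  # the same check state_file_exists() performs in the two-iteration test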