Skip to content

Commit

Permalink
#176 updated bucketfs python usage to new api (#186)
Browse files Browse the repository at this point in the history
* #176 Updated usage of `exasol-bucketfs` to new API
* Apply suggestions from code review
* #185: Removed directory and script for building SLC AAF

Co-authored-by: Nicola Coretti <[email protected]>
  • Loading branch information
ckunki and Nicoretti authored Oct 8, 2024
1 parent 991f72e commit e608398
Show file tree
Hide file tree
Showing 28 changed files with 262 additions and 614 deletions.
57 changes: 0 additions & 57 deletions build_language_container.sh

This file was deleted.

13 changes: 8 additions & 5 deletions doc/changes/changes_0.1.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ Code name:
* #24: Added integrations test for event loop
* #28: Extended the EventHandlerContext to a scope-based system for handling temporary objects
* #29: Fixed interface of EventContext and adapted implementation of UDFEventContext
* #30: Sort cleanup queries in reverse order of their creation to ensure that temporary objects that depend on other are removed first
* #30: Sorted cleanup queries in reverse order of their creation to ensure that temporary objects that depend on other are removed first
* #34: Added MockEventContext
* #35: Clean up after EventHandler exceptions and throw exceptions when a parent EventHandlerContext encounters an unreleased child during release
* #35: Cleaned up after EventHandler exceptions and throw exceptions when a parent EventHandlerContext encounters an unreleased child during release
* #94: Prepare for release to PyPi
* #17: Added vagrant setup
* #97: Added SocketFactory wrapper which injects faults by losing messages
* #98: Added more robust connection protocol
* #99: Added multi/node udf discovery
* #100: Add combined global and local UDF discovery
* #100: Added combined global and local UDF discovery

### Bug Fixes

Expand All @@ -33,20 +33,23 @@ Code name:
* #65: Fixed that the `ScopeQueryHandlerContext` might not `_release` all child contexts, if a grand-child-context wasn't released
* #68: Fixed that methods called in BucketFSLocationProxy.cleanup can fail and stop the cleanup
* #66: Fixed _ScopeQueryHandlerContextBase.get_temporary_path not being private
* #116: Fix AbortTimeoutSender and add reason to Timeout messages
* #116: Fixed AbortTimeoutSender and add reason to Timeout messages

### Refactoring

* #171: Updated poetry dependencies
* #42: Updated dependencies
* #72: Unified naming of released resources in QueryHandler
* #88: Introduced an abstraction for ZMQ in UDF Communication
* #95: Remove setup.py
* #95: Removed setup.py
* #114: Refactored BackgroundPeerState and introduced parameter objects
* #173: Introduced Python Toolbox
* #174: Replaced Language Container Stuff with PEC and SLC plugin
* #183: Fixed warning on tests with `__init__` constructor
* #180: Replaced `start_integration_test_environment.sh` with `pytest-backend-plugin`
* #184: Updated micromamba to the latest version 2.0.0
* #176: Updated usage of `exasol-bucketfs` to new API
* #185: Removed directory and script for building SLC AAF

### Documentation

Expand Down
3 changes: 1 addition & 2 deletions exasol_advanced_analytics_framework/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
)
from exasol_advanced_analytics_framework.deployment import (
scripts_deployer_cli,
language_container_deployer_cli,
)
from exasol.python_extension_common.deployment.language_container_deployer_cli import (
language_container_deployer_main,
Expand All @@ -24,7 +23,7 @@ def main():
slc_parameter_formatters.set_formatter(CustomizableParameters.container_name, SLC_FILE_NAME)

main.add_command(scripts_deployer_cli.scripts_deployer_main)
main.add_command(language_container_deployer_cli.language_container_deployer_main)
main.add_command(language_container_deployer_main)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import logging

from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation

from exasol_advanced_analytics_framework.query_handler.context.proxy.object_proxy import ObjectProxy
import exasol.bucketfs as bfs

LOGGER = logging.getLogger(__file__)


class BucketFSLocationProxy(ObjectProxy):

def __init__(self, bucketfs_location: AbstractBucketFSLocation):
def __init__(self, bucketfs_location: bfs.path.PathLike):
super().__init__()
self._bucketfs_location = bucketfs_location

def bucketfs_location(self) -> AbstractBucketFSLocation:
def bucketfs_location(self) -> bfs.path.PathLike:
self._check_if_released()
return self._bucketfs_location

Expand All @@ -26,16 +25,15 @@ def cleanup(self):

def _remove_file(self, file):
try:
self._bucketfs_location.delete_file_in_bucketfs(file)
file.rm()
except Exception as e:
LOGGER.error(f"Failed to remove {file}, got exception", exc_info=True)

def _list_files(self):
files = []
try:
files = self._bucketfs_location.list_files_in_bucketfs("")
return list(self._bucketfs_location.iterdir())
except FileNotFoundError as e:
LOGGER.debug(f"File not found {self._bucketfs_location.get_complete_file_path_in_bucket()} during cleanup.")
LOGGER.debug(f"File not found {self._bucketfs_location.as_udf_path} during cleanup.")
except Exception as e:
LOGGER.exception(f"Got exception during listing files in temporary BucketFSLocation")
return files
return []
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from abc import ABC
from typing import Set, List, Callable

from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation
import exasol.bucketfs as bfs
from exasol_data_science_utils_python.schema.schema_name import SchemaName
from exasol_data_science_utils_python.schema.table_name import TableName
from exasol_data_science_utils_python.schema.table_name_builder import TableNameBuilder
Expand Down Expand Up @@ -62,7 +62,7 @@ def get_all_not_released_contexts(self):

class _ScopeQueryHandlerContextBase(ScopeQueryHandlerContext, ABC):
def __init__(self,
temporary_bucketfs_location: AbstractBucketFSLocation,
temporary_bucketfs_location: bfs.path.PathLike,
temporary_db_object_name_prefix: str,
temporary_schema_name: str,
connection_lookup: ConnectionLookup,
Expand Down Expand Up @@ -232,7 +232,7 @@ def get_connection(self, name: str) -> Connection:

class TopLevelQueryHandlerContext(_ScopeQueryHandlerContextBase):
def __init__(self,
temporary_bucketfs_location: AbstractBucketFSLocation,
temporary_bucketfs_location: bfs.path.PathLike,
temporary_db_object_name_prefix: str,
temporary_schema_name: str,
connection_lookup: ConnectionLookup,
Expand Down Expand Up @@ -283,7 +283,7 @@ def transfer_object_to(self, object_proxy: ObjectProxy,

class _ChildQueryHandlerContext(_ScopeQueryHandlerContextBase):
def __init__(self, parent: _ScopeQueryHandlerContextBase,
temporary_bucketfs_location: AbstractBucketFSLocation,
temporary_bucketfs_location: bfs.path.PathLike,
temporary_db_object_name_prefix: str,
temporary_schema_name: str,
connection_lookup: ConnectionLookup,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import dataclasses
import importlib
import json
import joblib
import logging
import traceback
from collections import OrderedDict
from enum import Enum, auto
from pathlib import PurePosixPath
from typing import Tuple, List, Optional

from exasol_bucketfs_utils_python.abstract_bucketfs_location import AbstractBucketFSLocation
from exasol_bucketfs_utils_python.bucketfs_factory import BucketFSFactory
from exasol_data_science_utils_python.schema.column import \
Column
from exasol_data_science_utils_python.schema.column_name \
import ColumnName
from exasol_data_science_utils_python.schema.column_type \
import ColumnType
from exasol_data_science_utils_python.schema.schema_name \
import SchemaName
from typing import Any, Tuple, List, Optional

import exasol.bucketfs as bfs
from io import BytesIO

from exasol_data_science_utils_python.schema.column import Column
from exasol_data_science_utils_python.schema.column_name import ColumnName
from exasol_data_science_utils_python.schema.column_type import ColumnType
from exasol_data_science_utils_python.schema.schema_name import SchemaName
from exasol_data_science_utils_python.schema.udf_name_builder import UDFNameBuilder

from exasol_advanced_analytics_framework.query_handler.context.scope_query_handler_context import \
Expand All @@ -33,6 +31,26 @@
from exasol_advanced_analytics_framework.udf_framework.udf_connection_lookup import UDFConnectionLookup


def create_bucketfs_location_from_conn_object(bfs_conn_obj) -> bfs.path.PathLike:
bfs_params = json.loads(bfs_conn_obj.address)
bfs_params.update(json.loads(bfs_conn_obj.user))
bfs_params.update(json.loads(bfs_conn_obj.password))
return bfs.path.build_path(**bfs_params)


def upload_via_joblib(location: bfs.path.PathLike, object: Any):
buffer = BytesIO()
joblib.dump(object, buffer)
location.write(buffer.getvalue())


def read_via_joblib(location: bfs.path.PathLike) -> Any:
buffer = BytesIO()
for chunk in location.read():
buffer.write(chunk)
return joblib.load(buffer)


@dataclasses.dataclass
class UDFParameter:
iter_num: int
Expand Down Expand Up @@ -65,7 +83,7 @@ class QueryHandlerRunnerUDF:

def __init__(self, exa):
self.exa = exa
self.bucketfs_location: Optional[AbstractBucketFSLocation] = None
self.bucketfs_location: Optional[bfs.path.PathLike] = None
self.parameter: Optional[UDFParameter] = None

def run(self, ctx) -> None:
Expand Down Expand Up @@ -173,10 +191,8 @@ def _get_parameter(self, ctx):

def _create_bucketfs_location(self):
bucketfs_connection_obj = self.exa.get_connection(self.parameter.temporary_bfs_location_conn)
bucketfs_location_from_con = BucketFSFactory().create_bucketfs_location(
url=bucketfs_connection_obj.address,
user=bucketfs_connection_obj.user,
pwd=bucketfs_connection_obj.password)
bucketfs_location_from_con = create_bucketfs_location_from_conn_object(
bucketfs_connection_obj)
self.bucketfs_location = bucketfs_location_from_con \
.joinpath(self.parameter.temporary_bfs_location_directory) \
.joinpath(self.parameter.temporary_name_prefix)
Expand Down Expand Up @@ -207,20 +223,17 @@ def _create_state(self) -> QueryHandlerRunnerState:
return query_handler_state

def _load_latest_state(self) -> QueryHandlerRunnerState:
state_file_bucketfs_path = self._generate_state_file_bucketfs_path()
query_handler_state: QueryHandlerRunnerState = \
self.bucketfs_location.read_file_from_bucketfs_via_joblib(str(state_file_bucketfs_path))
query_handler_state.connection_lookup.exa = self.exa
return query_handler_state
path = self._state_file_bucketfs_location()
state = read_via_joblib(path)
state.connection_lookup.exa = self.exa
return state

def _save_current_state(self, current_state: QueryHandlerRunnerState) -> None:
next_state_file_bucketfs_path = self._generate_state_file_bucketfs_path(1)
self.bucketfs_location.upload_object_to_bucketfs_via_joblib(
current_state, str(next_state_file_bucketfs_path))
path = self._state_file_bucketfs_location(1)
upload_via_joblib(path, current_state)

def _remove_previous_state(self) -> None:
state_file_bucketfs_path = self._generate_state_file_bucketfs_path()
self.bucketfs_location.delete_file_in_bucketfs(str(state_file_bucketfs_path))
self._state_file_bucketfs_location().rm()

def _create_udf_query_result(
self, ctx, query_columns: List[Column]) -> UDFQueryResult:
Expand Down Expand Up @@ -265,9 +278,9 @@ def _get_query_columns(self):
Column(ColumnName(col_name), ColumnType(col_type)))
return query_columns

def _generate_state_file_bucketfs_path(self, iter_offset: int = 0) -> PurePosixPath:
def _state_file_bucketfs_location(self, iter_offset: int = 0) -> bfs.path.PathLike:
num_iter = self.parameter.iter_num + iter_offset
return PurePosixPath(f"state/{str(num_iter)}.pkl")
return self.bucketfs_location / f"state/{str(num_iter)}.pkl"

@staticmethod
def emit_udf_result(ctx, udf_result: UDFResult):
Expand Down
1 change: 0 additions & 1 deletion language_container/exaslct

This file was deleted.

This file was deleted.

This file was deleted.

11 changes: 0 additions & 11 deletions language_container/exaslct_scripts/exaslct.sh

This file was deleted.

This file was deleted.

Loading

0 comments on commit e608398

Please sign in to comment.