-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#190 enabled to generate a dynamic module for custom udf #202
Changes from 43 commits
a7902de
9d05c4c
e231ac1
69ad70d
237a0cb
e94b29a
24df407
c1b7068
0b7a4cd
3e60314
73db337
4c8bdbb
dcb07f1
b5314ab
2d8ac83
1a8e0c4
658d90b
d03f8ee
7c1b470
5a0c798
88daa71
dfe2319
443bde6
3384a78
ee0b129
f39824f
9baf2b1
647dfc1
4d97386
9502fff
645b8a0
ff940bb
76f0bf8
209352b
670c77a
4fdd329
35932a6
ff55086
ffd56f6
62a089c
2cdeb0d
7045dff
8656120
0877c1e
bfa6c44
75cd145
408ac95
008854a
cf8bd2b
9d94ad3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
name: Check Code Generation | ||
|
||
on: | ||
push: | ||
branches-ignore: | ||
- main | ||
|
||
jobs: | ||
check_code_generation: | ||
name: Lua Amalgate and Example in User Guide | ||
strategy: | ||
fail-fast: false | ||
matrix: | ||
python-version: [ "3.10" ] | ||
runs-on: ubuntu-latest | ||
|
||
steps: | ||
- uses: actions/checkout@v4 | ||
|
||
- name: Setup Python & Poetry Environment | ||
uses: exasol/python-toolbox/.github/actions/[email protected] | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
|
||
- name: Install Development Environment | ||
run: poetry run nox -s install_dev_env | ||
|
||
- name: Poetry install | ||
run: poetry run -- nox -s run_in_dev_env -- poetry install | ||
|
||
- name: Amalgate Lua Scripts | ||
run: poetry run nox -s amalgate_lua_scripts | ||
|
||
- name: Check if re-generated files differ from commit | ||
run: git diff --exit-code |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
--/ | ||
CREATE OR REPLACE PYTHON3_AAF SET SCRIPT "EXAMPLE_SCHEMA"."MY_QUERY_HANDLER_UDF"(...) | ||
EMITS (outputs VARCHAR(2000000)) AS | ||
|
||
from typing import Union | ||
from exasol_advanced_analytics_framework.udf_framework.udf_query_handler import UDFQueryHandler | ||
from exasol_advanced_analytics_framework.udf_framework.dynamic_modules import create_module | ||
from exasol_advanced_analytics_framework.query_handler.context.query_handler_context import QueryHandlerContext | ||
from exasol_advanced_analytics_framework.query_result.query_result import QueryResult | ||
from exasol_advanced_analytics_framework.query_handler.result import Result, Continue, Finish | ||
from exasol_advanced_analytics_framework.query_handler.query.select_query import SelectQuery, SelectQueryWithColumnDefinition | ||
from exasol_advanced_analytics_framework.query_handler.context.proxy.bucketfs_location_proxy import \ | ||
BucketFSLocationProxy | ||
from exasol_data_science_utils_python.schema.column import Column | ||
from exasol_data_science_utils_python.schema.column_name import ColumnName | ||
from exasol_data_science_utils_python.schema.column_type import ColumnType | ||
from datetime import datetime | ||
from exasol.bucketfs import as_string | ||
|
||
|
||
example_module = create_module("example_module") | ||
|
||
class ExampleQueryHandler(UDFQueryHandler):
    """Example query handler using a temporary BucketFS file and a temporary table.

    ``start()`` writes a sample text to a file in a temporary BucketFS
    location and returns queries that create and fill a temporary table.
    ``handle_query_result()`` combines the columns ``c1`` and ``c2`` of the
    query result with the content of the BucketFS file into the final result.
    """

    def __init__(self, parameter: str, query_handler_context: QueryHandlerContext):
        super().__init__(parameter, query_handler_context)
        self.parameter = parameter
        self.query_handler_context = query_handler_context
        # Both proxies are requested from the context in start().
        self.bfs_proxy = None
        self.db_table_proxy = None

    def _bfs_file(self, proxy: BucketFSLocationProxy):
        """Return the path ``temp_file.txt`` below the proxy's BucketFS location."""
        location = proxy.bucketfs_location()
        return location / "temp_file.txt"

    def start(self) -> Union[Continue, Finish[str]]:
        """Write the sample file and return the queries to execute next.

        Returns:
            A ``Continue`` whose ``query_list`` creates the temporary table and
            inserts one row, and whose ``input_query`` selects the columns
            ``c1`` and ``c2`` from that table.
        """

        def sample_content(key: str) -> str:
            # Current timestamp, the given key and the handler's parameter.
            stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            return f"{stamp} {key} {self.parameter}"

        def render(statement: str, **kwargs):
            # Fill in the fully qualified name of the temporary table.
            qualified = self.db_table_proxy._db_object_name.fully_qualified
            return statement.format(table_name=qualified, **kwargs)

        def as_query(statement: str, **kwargs):
            sql = render(statement, **kwargs)
            return SelectQuery(sql)

        context = self.query_handler_context
        self.bfs_proxy = context.get_temporary_bucketfs_location()
        bfs_file = self._bfs_file(self.bfs_proxy)
        bfs_file.write(sample_content("bucketfs"))
        self.db_table_proxy = context.get_temporary_table_name()

        create_table = as_query(
            'CREATE TABLE {table_name} ("c1" VARCHAR(100), "c2" INTEGER)')
        insert_row = as_query(
            "INSERT INTO {table_name} VALUES ('{value}', 4)",
            value=sample_content("table-insert"))

        select_sql = render('SELECT "c1", "c2" from {table_name}')
        columns = [
            Column(ColumnName("c1"), ColumnType("VARCHAR(100)")),
            Column(ColumnName("c2"), ColumnType("INTEGER")),
        ]
        input_query = SelectQueryWithColumnDefinition(
            query_string=select_sql,
            output_columns=columns)

        return Continue(
            query_list=[create_table, insert_row],
            input_query=input_query)

    def handle_query_result(self, query_result: QueryResult) -> Union[Continue, Finish[str]]:
        """Build the final result from the query result and the BucketFS file.

        Returns:
            A ``Finish`` whose result is a string containing the values of
            ``c1`` and ``c2`` and the content read from the BucketFS file.
        """
        first = query_result.c1
        second = query_result.c2
        raw = self._bfs_file(self.bfs_proxy).read()
        file_text = as_string(raw)
        message = f"Final result: from query '{first}', {second} and bucketfs: '{file_text}'"
        return Finish(result=message)
|
||
|
||
example_module.add_to_module(ExampleQueryHandler) | ||
|
||
class ExampleQueryHandlerFactory:
    """Factory creating instances of the ``ExampleQueryHandler`` registered in ``example_module``."""

    def create(self, parameter: str, query_handler_context: QueryHandlerContext):
        """Return a new ``ExampleQueryHandler`` for the given parameter and context."""
        # Look the class up via the dynamic module rather than the local name.
        handler_class = example_module.ExampleQueryHandler
        return handler_class(parameter, query_handler_context)
|
||
example_module.add_to_module(ExampleQueryHandlerFactory) | ||
|
||
from exasol_advanced_analytics_framework.udf_framework.query_handler_runner_udf \ | ||
import QueryHandlerRunnerUDF | ||
|
||
udf = QueryHandlerRunnerUDF(exa) | ||
|
||
def run(ctx):
    """Forward ``ctx`` to the module-level ``QueryHandlerRunnerUDF`` instance and return its result."""
    outcome = udf.run(ctx)
    return outcome
|
||
/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
EXECUTE SCRIPT "AAF_DB_SCHEMA"."AAF_RUN_QUERY_HANDLER"('{ | ||
"query_handler": { | ||
"factory_class": { | ||
"module": "example_module", | ||
"name": "ExampleQueryHandlerFactory" | ||
}, | ||
"parameter": "bla-bla", | ||
"udf": { | ||
"schema": "EXAMPLE_SCHEMA", | ||
"name": "MY_QUERY_HANDLER_UDF" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we already run with the word example, should we use it everywhere (half a joke) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, during my experiments, I switched from MY_SCHEMA to MY_EXAMPLE to avoid false positive test results due to my test database being already partially initialized. Should I replace the name |
||
} | ||
}, | ||
"temporary_output": { | ||
"bucketfs_location": { | ||
"connection_name": "BFS_CON", | ||
"directory": "temp" | ||
}, | ||
"schema_name": "EXAMPLE_TEMP_SCHEMA" | ||
} | ||
}') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
## AAF Proxies | ||
|
||
The Advanced Analytics Framework (AAF) uses _Object Proxies_ to manage temporary objects. | ||
|
||
An _Object Proxy_ | ||
* Encapsulates a temporary object | ||
* Provides a reference for using the object, i.e. its name including the database schema, or its path in the BucketFS | ||
* Ensures the object is removed when leaving the current scope, e.g. the Query Handler. | ||
|
||
All Object Proxies are derived from class `exasol_advanced_analytics_framework.query_handler.context.proxy.object_proxy.ObjectProxy`: | ||
* `BucketFSLocationProxy` encapsulates a location in the BucketFS | ||
* `DBObjectNameProxy` encapsulates a database object, e.g. a table | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -108,10 +108,12 @@ python -m exasol_advanced_analytics_framework.deploy scripts \ | |
--dsn "$DB_HOST:$DB_PORT" \ | ||
--db-user "$DB_USER" \ | ||
--db-pass "$DB_PASSWORD" \ | ||
--schema "$DB_SCHEMA" \ | ||
--schema "$AAF_DB_SCHEMA" \ | ||
--language-alias "$LANGUAGE_ALIAS" | ||
``` | ||
|
||
The name of the database schema specified here must be the same schema you use later when executing the script `AAF_RUN_QUERY_HANDLER`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is unclear There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See next push. |
||
|
||
## Usage | ||
|
||
The entry point of this framework is the `AAF_RUN_QUERY_HANDLER` script. This script is simply a query loop which is responsible for executing the implemented algorithm. | ||
|
@@ -124,7 +126,7 @@ This script takes the necessary parameters to execute the desired algorithm in s | |
The following SQL statement shows how to call an AAF query handler: | ||
|
||
```sql | ||
EXECUTE SCRIPT AAF_RUN_QUERY_HANDLER('{ | ||
EXECUTE SCRIPT <AAF_DB_SCHEMA>.AAF_RUN_QUERY_HANDLER('{ | ||
"query_handler": { | ||
"factory_class": { | ||
"module": "<CLASS_MODULE>", | ||
|
@@ -152,6 +154,7 @@ See [Implementing a Custom Algorithm as Example Query Handler](#implementing-a-c | |
|
||
| Parameter | Required? | Description | | ||
|------------------------------|-----------|-------------------------------------------------------------------------------| | ||
| `<AAF_DB_SCHEMA>` | yes | Name of the database schema containing the default Query Handler, see [Additional Scripts](#additional-scripts) | | ||
| `<CLASS_NAME>` | yes | Name of the query handler class | | ||
| `<CLASS_MODULE>` | yes | Module name of the query handler class | | ||
| `<CLASS_PARAMETERS>` | yes | Parameters of the query handler class encoded as string | | ||
|
@@ -188,88 +191,31 @@ Each algorithm should extend the `UDFQueryHandler` abstract class and then imple | |
|
||
### Concrete Example Using an Adhoc Implementation Within the UDF | ||
|
||
The example uses the module `builtins` and dynamically adds `ExampleQueryHandler` and `ExampleQueryHandlerFactory` to it. | ||
|
||
```python | ||
--/ | ||
CREATE OR REPLACE PYTHON3_AAF SET SCRIPT "MY_SCHEMA"."MY_QUERY_HANDLER_UDF"(...) | ||
EMITS (outputs VARCHAR(2000000)) AS | ||
|
||
from typing import Union | ||
from exasol_advanced_analytics_framework.udf_framework.udf_query_handler import UDFQueryHandler | ||
from exasol_advanced_analytics_framework.query_handler.context.query_handler_context import QueryHandlerContext | ||
from exasol_advanced_analytics_framework.query_result.query_result import QueryResult | ||
from exasol_advanced_analytics_framework.query_handler.result import Result, Continue, Finish | ||
from exasol_advanced_analytics_framework.query_handler.query.select_query import SelectQuery, SelectQueryWithColumnDefinition | ||
from exasol_data_science_utils_python.schema.column import Column | ||
from exasol_data_science_utils_python.schema.column_name import ColumnName | ||
from exasol_data_science_utils_python.schema.column_type import ColumnType | ||
|
||
|
||
class ExampleQueryHandler(UDFQueryHandler): | ||
def __init__(self, parameter: str, query_handler_context: QueryHandlerContext): | ||
super().__init__(parameter, query_handler_context) | ||
self.parameter = parameter | ||
self.query_handler_context = query_handler_context | ||
|
||
def start(self) -> Union[Continue, Finish[str]]: | ||
query_list = [ | ||
SelectQuery("SELECT 1 FROM DUAL"), | ||
SelectQuery("SELECT 2 FROM DUAL")] | ||
query_handler_return_query = SelectQueryWithColumnDefinition( | ||
query_string="SELECT 5 AS 'return_column' FROM DUAL", | ||
output_columns=[ | ||
Column(ColumnName("return_column"), ColumnType("INTEGER"))]) | ||
|
||
return Continue( | ||
query_list=query_list, | ||
input_query=query_handler_return_query) | ||
The example dynamically creates a Python module `example_module` and adds classes `ExampleQueryHandler` and `ExampleQueryHandlerFactory` to it. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would give the module a more meaningful name than "xyz". If you were doing this manually you would probably call the module "example_query_handler.py" or "example_qh.py", or something like that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about |
||
|
||
def handle_query_result(self, query_result: QueryResult) -> Union[Continue, Finish[str]]: | ||
return_value = query_result.return_column | ||
result = 2 ** return_value | ||
return Finish(result=result) | ||
In order to execute the example successfully you need to | ||
1. [Create a BucketFS connection](#bucketfs-connection) | ||
2. Activate the AAF's SLC | ||
3. Make sure the database schemas used in the example exist. | ||
|
||
import builtins | ||
builtins.ExampleQueryHandler=ExampleQueryHandler # required for pickle | ||
The example assumes | ||
* the name for the BucketFS Connection `<CONNECTION_NAME>` to be `BFS_CON` | ||
* the name for the AAF database schema `<AAF_DB_SCHEMA>` to be `AAF_DB_SCHEMA`, see [Additional Scripts](#additional-scripts) | ||
|
||
class ExampleQueryHandlerFactory: | ||
def create(self, parameter: str, query_handler_context: QueryHandlerContext): | ||
return builtins.ExampleQueryHandler(parameter, query_handler_context) | ||
The following SQL statements activate the AAF's SLC and create the required database schemas unless they already exist: | ||
|
||
builtins.ExampleQueryHandlerFactory=ExampleQueryHandlerFactory | ||
|
||
from exasol_advanced_analytics_framework.udf_framework.query_handler_runner_udf \ | ||
import QueryHandlerRunnerUDF | ||
```sql | ||
ALTER SESSION SET SCRIPT_LANGUAGES='R=builtin_r JAVA=builtin_java PYTHON3=builtin_python3 PYTHON3_AAF=localzmq+protobuf:///bfsdefault/default/temp/exasol_advanced_analytics_framework_container_release?lang=python#/buckets/bfsdefault/default/temp/exasol_advanced_analytics_framework_container_release/exaudf/exaudfclient_py3'; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I applied your proposal, @tkilias. Thinking about it: maybe we can omit the ALTER SESSION statement, as all this is already done by the AAF deploy command? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See next push without ALTER SESSION command. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets remove it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
udf = QueryHandlerRunnerUDF(exa) | ||
create schema IF NOT EXISTS "EXAMPLE_SCHEMA"; | ||
create schema IF NOT EXISTS "EXAMPLE_TEMP_SCHEMA"; | ||
``` | ||
|
||
def run(ctx): | ||
return udf.run(ctx) | ||
/ | ||
The following files contain the SQL statements for creating and executing the UDF script | ||
* [example-udf-script/create.sql](example-udf-script/create.sql) | ||
* [example-udf-script/execute.sql](example-udf-script/execute.sql) | ||
|
||
|
||
EXECUTE SCRIPT MY_SCHEMA.AAF_RUN_QUERY_HANDLER('{ | ||
"query_handler": { | ||
"factory_class": { | ||
"module": "builtins", | ||
"name": "ExampleQueryHandlerFactory" | ||
}, | ||
"parameter": "bla-bla", | ||
"udf": { | ||
"schema": "MY_SCHEMA", | ||
"name": "MY_QUERY_HANDLER_UDF" | ||
} | ||
}, | ||
"temporary_output": { | ||
"bucketfs_location": { | ||
"connection_name": "BFS_CON", | ||
"directory": "temp" | ||
}, | ||
"schema_name": "TEMP_SCHEMA" | ||
} | ||
}'); | ||
``` | ||
### Sequence Diagram | ||
|
||
The figure below illustrates the execution of this algorithm implemented in class `ExampleQueryHandler`. | ||
* When method `start()` is called, it executes two queries and an additional `input_query` to obtain the input for the next iteration. | ||
|
@@ -278,3 +224,7 @@ The figure below illustrates the execution of this algorithm implemented in clas | |
In this example, the algorithm is finished at this iteration and returns 2<sup>_return value_</sup> as final result. | ||
|
||
![Sample Execution](../images/sample_execution.png "Sample Execution") | ||
|
||
## Additional Information | ||
|
||
* [Object Proxies](proxies.md) for managing temporary locations in the database and BucketFS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this still true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks - I removed the main instructions but forgot to update the bullet list.
See next push fixing the bullet list, too.