diff --git a/.github/workflows/docker-pg2pg.yaml b/.github/workflows/docker-pg2pg.yaml new file mode 100644 index 0000000..ebf9b71 --- /dev/null +++ b/.github/workflows/docker-pg2pg.yaml @@ -0,0 +1,65 @@ +name: Push to GHCR + +on: + push: + branches: [ "ods-replication-pg2pg" ] + +env: + # DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main + REGISTRY: ghcr.io + DOCKERFILE_PATH: shared/ods_replication_pg2pg + IMAGE_NAME: ${{ github.repository }}-ods-pg2pg + +jobs: + build: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Install cosign + if: github.event_name != 'pull_request' + uses: sigstore/cosign-installer@v3.3.0 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3.0.0 + + - name: Log into registry ${{ env.REGISTRY }} + if: github.event_name != 'pull_request' + uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0 + with: + # DF-NOTE: to help the action find the Dockerfile to build from + context: ${{ env.DOCKERFILE_PATH }}/ + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max + + #- name: Sign the published Docker image + #if: ${{ github.event_name != 'pull_request' }} + #env: + # https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable + #TAGS: ${{ steps.meta.outputs.tags }} + #DIGEST: ${{ steps.build-and-push.outputs.digest }} + + #run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST} diff --git a/shared/ods_replication_pg2pg/Dockerfile b/shared/ods_replication_pg2pg/Dockerfile new file mode 100644 index 0000000..aba69ca --- /dev/null +++ b/shared/ods_replication_pg2pg/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11.4-slim-buster + +WORKDIR /app + +# PostgreSQL library + +RUN apt-get update \ + && apt-get -y install libpq-dev gcc \ + && pip install psycopg2 + +ADD *.py . + +#COPY requirements.txt requirements.txt + +#RUN pip3 install -r requirements.txt + +CMD ["python3", "./data_replication_pg2pg.py"] \ No newline at end of file diff --git a/shared/ods_replication_pg2pg/data_replication_pg2pg.py b/shared/ods_replication_pg2pg/data_replication_pg2pg.py new file mode 100644 index 0000000..db61dec --- /dev/null +++ b/shared/ods_replication_pg2pg/data_replication_pg2pg.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python +# coding: utf-8 + +# In[1]: Imports +# refer if block at line 38, some imports are conditional +import psycopg2 +import psycopg2.pool +import psycopg2.extras +from psycopg2.extras import execute_batch +import configparser +import time +import json +import concurrent.futures +from datetime import datetime +import sys +import os +import argparse + + +start = time.time() + +# In[3]: Retrieve Oracle database configuration +src_postgres_username = os.environ['DB_USERNAME'] +src_postgres_password = os.environ['DB_PASSWORD'] +src_postgres_host = os.environ['DB_HOST'] +src_postgres_port = os.environ['DB_PORT'] +src_postgres_database = os.environ['DATABASE'] +# In[4]: Retrieve Postgres database configuration +postgres_username = os.environ['ODS_USERNAME'] +postgres_password = os.environ['ODS_PASSWORD'] +postgres_host = os.environ['ODS_HOST'] +postgres_port = os.environ['ODS_PORT'] +postgres_database = os.environ['ODS_DATABASE'] +# In[5]: Script parameters +mstr_schema = os.environ['MSTR_SCHEMA'] +app_name = os.environ['APP_NAME'] +concurrent_tasks = int(os.environ['CONCUR_TASKS']) +audit_table = 'audit_batch_status' +current_date = datetime.now().strftime('%Y-%m-%d') + +#concurrent_tasks = int(concurrent_tasks) +#In[5]: Concurrent tasks - number of tables to be replicated in parallel +#concurrent_tasks = 5 + +# In[6]: Set up Oracle connection pool +# In[7]: Setup Postgres Pool +SrcPgresPool = psycopg2.pool.ThreadedConnectionPool( + minconn = concurrent_tasks, maxconn = concurrent_tasks,host=src_postgres_host, port=src_postgres_port, dbname=src_postgres_database, user=src_postgres_username, password=src_postgres_password +) +print('Source Postgres Connection Successful') + +# In[7]: Setup Postgres Pool +PgresPool = psycopg2.pool.ThreadedConnectionPool( + minconn = concurrent_tasks, maxconn = concurrent_tasks,host=postgres_host, port=postgres_port, dbname=postgres_database, user=postgres_username, password=postgres_password +) +print('Target Postgres Connection Successful') + +def del_audit_entries_rerun(current_date): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + del_sql = f""" + DELETE FROM {mstr_schema}.{audit_table} c + where application_name='{app_name}' and batch_run_date='{current_date}' + """ + postgres_cursor.execute(del_sql) + postgres_connection.commit() + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + return print(del_sql) + +# Function to insert the audit batch status entry +def audit_batch_status_insert(table_name,status): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + try: + audit_batch_status_query = f"""INSERT INTO {mstr_schema}.{audit_table} VALUES ('{table_name}','{app_name}','replication','{status}',current_date)""" + print(audit_batch_status_query) + postgres_cursor.execute(audit_batch_status_query) + postgres_connection.commit() + print(f"Record inserted into audit batch status table") + return None + except Exception as e: + print(f"Error inserting record into to audit batch status table: {str(e)}") + return None + finally: + # Return the connection to the pool + if postgres_connection: + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + +# In[8]: Function to get active rows from master table +def get_active_tables(mstr_schema,app_name): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + list_sql = f""" + SELECT application_name,source_schema_name,source_table_name,target_schema_name,target_table_name,truncate_flag,cdc_flag,full_inc_flag,cdc_column,replication_order,customsql_ind,customsql_query + from {mstr_schema}.cdc_master_table_list c + where active_ind = 'Y' and application_name='{app_name}' + order by replication_order, source_table_name + """ + with postgres_connection.cursor() as curs: + curs.execute(list_sql) + rows = curs.fetchall() + postgres_connection.commit() + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + return rows + +# In[9]: Function to extract data from Oracle +def extract_from_srcpg(table_name,source_schema,customsql_ind,customsql_query): + # Acquire a connection from the pool + srcpostgres_connection = SrcPgresPool.getconn() + srcpostgres_cursor = srcpostgres_connection.cursor() + try: + if customsql_ind == "Y": + # Use placeholders in the query and bind the table name as a parameter + sql_query=customsql_query + print(sql_query) + srcpostgres_cursor.execute(sql_query) + rows = srcpostgres_cursor.fetchall() + #OrcPool.release(oracle_connection) + return rows + else: + sql_query = f'SELECT * FROM {source_schema}.{table_name}' + print(sql_query) + srcpostgres_cursor.execute(sql_query) + rows = srcpostgres_cursor.fetchall() + #OrcPool.release(oracle_connection) + return rows + + except Exception as e: + audit_batch_status_insert(table_name,'failed') + print(f"Error extracting data from SrcPostgres: {str(e)}") + #OrcPool.release(oracle_connection) #Temporary change + return [] + + finally: + # Return the connection to the pool + if srcpostgres_connection: + srcpostgres_cursor.close() + SrcPgresPool.putconn(srcpostgres_connection) +# In[10]: Function to load data into Target PostgreSQL using data from Source Oracle +def load_into_postgres(table_name, data,target_schema): + postgres_connection = PgresPool.getconn() + postgres_cursor = postgres_connection.cursor() + try: + # Delete existing data in the target table + delete_query = f'TRUNCATE TABLE {target_schema}.{table_name}' + postgres_cursor.execute(delete_query) + + # Build the INSERT query with placeholders + insert_query = f'INSERT INTO {target_schema}.{table_name} VALUES ({", ".join(["%s"] * len(data[0]))})' + #insert_query = f'INSERT INTO {target_schema}.{table_name} VALUES %s' + + # Use execute_batch for efficient batch insert + with postgres_connection.cursor() as cursor: + # Prepare the data as a list of tuples + data_to_insert = [(tuple(row)) for row in data] + execute_batch(cursor, insert_query, data_to_insert) + postgres_connection.commit() + # Insert record to audit batch table + audit_batch_status_insert(table_name,'success') + + + except Exception as e: + print(f"Error loading data into PostgreSQL: {str(e)}") + audit_batch_status_insert(table_name,'failed') + finally: + # Return the connection to the pool + if postgres_connection: + postgres_cursor.close() + PgresPool.putconn(postgres_connection) + +# In[11]: Function to call both extract and load functions +def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,customsql_query): + # Extract data from Oracle + print(f'Source: Thread {table_name} started at ' + datetime.now().strftime("%H:%M:%S")) + srcpg_data = extract_from_srcpg(table_name,source_schema,customsql_ind,customsql_query) # Ensure table name is in uppercase + print(f'Source: Extraction for {table_name} completed at ' + datetime.now().strftime("%H:%M:%S")) + + if srcpg_data: + # Load data into PostgreSQL + load_into_postgres(table_name, srcpg_data, target_schema) + print(f"Target: Data loaded into table: {table_name}") + print(f'Target: Thread {table_name} ended at ' + datetime.now().strftime("%H:%M:%S")) + +# In[12]: Initializing concurrency +if __name__ == '__main__': + # Main ETL process + active_tables_rows =get_active_tables(mstr_schema,app_name) + #print(active_tables_rows) + tables_to_extract = [(row[2],row[1],row[3],row[10],row[11]) for row in active_tables_rows] + + print(f"tables to extract are {tables_to_extract}") + print(f'No of concurrent tasks:{concurrent_tasks}') + #Delete audit entries for rerun on same day + del_audit_entries_rerun(current_date) + # Using ThreadPoolExecutor to run tasks concurrently + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_tasks) as executor: + # Submit tasks to the executor + future_to_table = {executor.submit(load_data_from_src_tgt, table[0],table[1],table[2],table[3],table[4]): table for table in tables_to_extract} + + # Wait for all tasks to complete + concurrent.futures.wait(future_to_table) + + # Print results + for future in future_to_table: + table_name = future_to_table[future] + try: + # Get the result of the task, if any + future.result() + except Exception as e: + # Handle exceptions that occurred during the task + print(f"Error replicating {table_name}: {e}") + audit_batch_status_insert(table_name[0],'failed') + + # record end time + end = time.time() + SrcPgresPool.closeall() + PgresPool.closeall() + + print("ETL process completed successfully.") + print("The time of execution of the program is:", (end - start) , "secs") +