-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
210 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
32 changes: 32 additions & 0 deletions
32
bin/migrate-oats-data/srw/sql/submission/primary_contact/srw_primary_contact.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
WITH ranked_contacts AS ( | ||
SELECT oaap.alr_application_id, | ||
oaap.alr_application_party_id, | ||
op.person_id, | ||
op.first_name, | ||
op.last_name, | ||
op.middle_name, | ||
op.title, | ||
oo.organization_id, | ||
oo.organization_name, | ||
oo.alias_name, | ||
opo.phone_number, | ||
opo.cell_phone_number, | ||
opo.email_address, | ||
oaap.alr_appl_role_code, | ||
ns."uuid" AS notification_submission_uuid, | ||
ROW_NUMBER() OVER ( | ||
PARTITION BY alr_application_id | ||
ORDER BY alr_appl_role_code, | ||
oaap.when_created | ||
) AS rn | ||
FROM oats.oats_alr_application_parties oaap | ||
JOIN oats.oats_person_organizations opo ON opo.person_organization_id = oaap.person_organization_id | ||
LEFT JOIN oats.oats_organizations oo ON oo.organization_id = opo.organization_id | ||
LEFT JOIN oats.oats_persons op ON op.person_id = opo.person_id | ||
JOIN alcs.notification_submission ns ON ns.file_number = oaap.alr_application_id::TEXT | ||
AND ns.type_code = 'SRW' | ||
WHERE oaap.alr_appl_role_code in ('AGENT', 'APPL') | ||
) | ||
SELECT * | ||
FROM ranked_contacts | ||
WHERE rn = 1 |
31 changes: 31 additions & 0 deletions
31
bin/migrate-oats-data/srw/sql/submission/primary_contact/srw_primary_contact_count.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
WITH ranked_contacts AS ( | ||
SELECT oaap.alr_application_id, | ||
oaap.alr_application_party_id, | ||
op.person_id, | ||
op.first_name, | ||
op.last_name, | ||
op.middle_name, | ||
oo.organization_id, | ||
oo.organization_name, | ||
oo.alias_name, | ||
opo.phone_number, | ||
opo.cell_phone_number, | ||
opo.email_address, | ||
oaap.alr_appl_role_code, | ||
ns."uuid" AS notification_submission_uuid, | ||
ROW_NUMBER() OVER ( | ||
PARTITION BY alr_application_id | ||
ORDER BY alr_appl_role_code, | ||
oaap.when_created | ||
) AS rn | ||
FROM oats.oats_alr_application_parties oaap | ||
JOIN oats.oats_person_organizations opo ON opo.person_organization_id = oaap.person_organization_id | ||
LEFT JOIN oats.oats_organizations oo ON oo.organization_id = opo.organization_id | ||
LEFT JOIN oats.oats_persons op ON op.person_id = opo.person_id | ||
JOIN alcs.notification_submission ns ON ns.file_number = oaap.alr_application_id::TEXT | ||
AND ns.type_code = 'SRW' | ||
WHERE oaap.alr_appl_role_code in ('AGENT', 'APPL') | ||
) | ||
SELECT count(*) | ||
FROM ranked_contacts | ||
WHERE rn = 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 change: 1 addition & 0 deletions
1
bin/migrate-oats-data/srw/submission/primary_contact/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .srw_process_primary_contact import process_alcs_srw_primary_contact |
140 changes: 140 additions & 0 deletions
140
bin/migrate-oats-data/srw/submission/primary_contact/srw_process_primary_contact.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
from common import BATCH_UPLOAD_SIZE, setup_and_get_logger | ||
from db import inject_conn_pool | ||
from psycopg2.extras import RealDictCursor, execute_batch | ||
|
||
etl_name = "process_alcs_srw_primary_contact" | ||
logger = setup_and_get_logger(etl_name) | ||
|
||
|
||
@inject_conn_pool | ||
def process_alcs_srw_primary_contact(conn=None, batch_size=BATCH_UPLOAD_SIZE): | ||
""" | ||
This function is responsible for populating the primary contact details notification_submission | ||
Args: | ||
conn (psycopg2.extensions.connection): PostgreSQL database connection. Provided by the decorator. | ||
batch_size (int): The number of items to process at once. Defaults to BATCH_UPLOAD_SIZE. | ||
""" | ||
|
||
logger.info(f"Start {etl_name}") | ||
with conn.cursor(cursor_factory=RealDictCursor) as cursor: | ||
with open( | ||
"srw/sql/submission/primary_contact/srw_primary_contact_count.sql", | ||
"r", | ||
encoding="utf-8", | ||
) as sql_file: | ||
count_query = sql_file.read() | ||
cursor.execute(count_query) | ||
count_total = dict(cursor.fetchone())["count"] | ||
logger.info(f"Total SRW data to update: {count_total}") | ||
|
||
failed_inserts = 0 | ||
successful_updates_count = 0 | ||
last_application_id = 0 | ||
|
||
with open( | ||
"srw/sql/submission/primary_contact/srw_primary_contact.sql", | ||
"r", | ||
encoding="utf-8", | ||
) as sql_file: | ||
application_sql = sql_file.read() | ||
while True: | ||
cursor.execute( | ||
f""" | ||
{application_sql} | ||
AND alr_application_id > {last_application_id} ORDER BY alr_application_id; | ||
""" | ||
) | ||
|
||
rows = cursor.fetchmany(batch_size) | ||
|
||
if not rows: | ||
break | ||
try: | ||
records_to_be_updated_count = len(rows) | ||
|
||
_update_records(conn, batch_size, cursor, rows) | ||
|
||
successful_updates_count = ( | ||
successful_updates_count + records_to_be_updated_count | ||
) | ||
last_application_id = dict(rows[-1])["alr_application_id"] | ||
|
||
logger.debug( | ||
f"retrieved/updated items count: {records_to_be_updated_count}; total successfully updated SRWs so far {successful_updates_count}; last updated alr_application_id: {last_application_id}" | ||
) | ||
except Exception as err: | ||
# this is NOT going to be caused by actual data update failure. This code is only executed when the code error appears or connection to DB is lost | ||
logger.exception(err) | ||
conn.rollback() | ||
failed_inserts = count_total - successful_updates_count | ||
last_application_id = last_application_id + 1 | ||
|
||
logger.info( | ||
f"Finished {etl_name}: total amount of successful updates {successful_updates_count}, total failed updates {failed_inserts}" | ||
) | ||
|
||
|
||
def _update_records(conn, batch_size, cursor, rows): | ||
parsed_data_list = _prepare_oats_data(rows) | ||
|
||
if len(parsed_data_list) > 0: | ||
execute_batch( | ||
cursor, | ||
_update_query, | ||
parsed_data_list, | ||
page_size=batch_size, | ||
) | ||
|
||
conn.commit() | ||
|
||
|
||
_update_query = """ | ||
UPDATE | ||
alcs.notification_submission | ||
SET | ||
contact_email = %(contact_email)s, | ||
contact_first_name = %(contact_first_name)s, | ||
contact_last_name= %(contact_last_name)s, | ||
contact_organization = %(contact_organization)s, | ||
contact_phone = %(contact_phone)s | ||
WHERE | ||
alcs.notification_submission.file_number = %(file_number)s::TEXT | ||
""" | ||
|
||
|
||
def _prepare_oats_data(row_data_list): | ||
data_list = [] | ||
for row in row_data_list: | ||
data_list.append(_map_fields(dict(row))) | ||
return data_list | ||
|
||
|
||
def _map_fields(data): | ||
return { | ||
"contact_email": data["email_address"], | ||
"contact_first_name": _get_name(data), | ||
"contact_last_name": data["last_name"], | ||
"contact_organization": _get_organization_name(data), | ||
"contact_phone": data.get("phone_number", "cell_phone_number"), | ||
"file_number": data["alr_application_id"], | ||
} | ||
|
||
|
||
def _get_organization_name(row): | ||
organization_name = (row.get("organization_name") or "").strip() | ||
alias_name = (row.get("alias_name") or "").strip() | ||
|
||
if not organization_name and not alias_name: | ||
return row["title"] | ||
|
||
return f"{organization_name} {alias_name}".strip() | ||
|
||
|
||
def _get_name(row): | ||
first_name = row.get("first_name", None) | ||
middle_name = row.get("middle_name", None) | ||
|
||
return " ".join( | ||
[name for name in (first_name, middle_name) if name is not None] | ||
).strip() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters