diff --git a/tm_admin/projects/projects.py b/tm_admin/projects/projects.py index 4cfa02de..27f14244 100755 --- a/tm_admin/projects/projects.py +++ b/tm_admin/projects/projects.py @@ -28,7 +28,6 @@ from dateutil.parser import parse import tm_admin.types_tm import geojson -import concurrent.futures from cpuinfo import get_cpu_info from shapely.geometry import shape from shapely import centroid @@ -37,19 +36,22 @@ from shapely import wkb, get_coordinates from tm_admin.dbsupport import DBSupport from tm_admin.generator import Generator -from osm_rawdata.postgres import uriParser, PostgresClient +from osm_rawdata.pgasync import PostgresClient import re # from progress import Bar, PixelBar from tqdm import tqdm +import tqdm.asyncio +from codetiming import Timer +import asyncio # The number of threads is based on the CPU cores info = get_cpu_info() -cores = info["count"] +cores = info["count"] * 2 # Instantiate logger log = logging.getLogger(__name__) -def updateThread( +async def updateThread( data: list, db: PostgresClient, ): @@ -85,52 +87,218 @@ def __init__(self, self.pg = None self.profile = ProjectsTable() self.types = dir(tm_admin.types_tm) - super().__init__('projects', dburi) + super().__init__('projects') - def mergeProjectInfo(self, - inuri: str, - ): + async def mergeInfo(self, + inpg: PostgresClient, + ): table = 'project_info' - pg = PostgresClient(inuri) - sql = f"SELECT row_to_json({table}) as row FROM {table}" - # print(sql) - try: - result = pg.dbcursor.execute(sql) - except: - log.error(f"Couldn't execute query! {sql}") - return False - - ignore = ['project_id', 'project_id_str', 'text_searchable', 'per_task_instructions'] - result = pg.dbcursor.fetchall() - index = 0 - pbar = tqdm(result) + timer = Timer(initial_text=f"Merging {table} table...", + text="merging table took {seconds:.0f}s", + logger=log.debug, + ) + log.info(f"Merging {table} table...") + + timer.start() + # columns = await inpg.getColumns(table) + columns = f"project_id, name, short_description, description, instructions, per_task_instructions" + + sql = f"SELECT {columns} FROM {table} ORDER BY project_id" + print(sql) + result = await inpg.execute(sql) + timer.stop() + + pbar = tqdm.tqdm(result) for record in pbar: - index += 1 - settings = "" - column = list() - for key, val in record[0].items(): - if key in ignore: - continue - if key == "project_id": - key = "id" - elif key == "locale": - key = "default_locale" - else: - if val is None: - continue - key = f"{key}" - if type(val) == int: - entry = f"{key}={val}" - else: - val = val.replace("'", "") - entry = f"{key}='{val}'" - settings += f"{entry}, " - - sql = f"UPDATE projects SET {settings[:-2]} WHERE id={record[0]['project_id']};" + # in Postgres, to escape a single quote, you use two single quotes. So + # we have to fix this before encoding it. + if record['name']: + name = record['name'].replace("'", "''").encode('utf-8') + if record['description']: + description = record['description'].replace("'", "''").encode('utf-8') + else: + description = "NULL".encode('utf-8') + if record['short_description']: + short = record['short_description'].replace("'", "''").encode('utf-8') + else: + short = "NULL".encode('utf-8') + if record['per_task_instructions']: + task = record['per_task_instructions'].replace("'", "''").encode('utf-8') + else: + task = "NULL".encode('utf-8') + if record['instructions']: + instructions = record['instructions'].replace("'", "''").encode('utf-8') + else: + instructions = "NULL".encode('utf-8') + # sql = f"UPDATE projects SET id={record['project_id']}, default_locale='{record['locale']}', name={name},short_description={short},description={description},instructions={instructions},per_task_instructions={task} WHERE id={record['project_id']};" + sql = f"UPDATE projects SET name='{name.decode('utf-8')}', short_description='{short.decode('utf-8')}', description='{description.decode('utf-8')}', instructions='{instructions.decode('utf-8')}' WHERE id={record['project_id']};" + await self.pg.execute(sql) + + async def mergeInterests(self, + inpg: PostgresClient, + ): + table = "project_interests" + log.error(f"mergeTeams() Unimplemented!") + timer = Timer(initial_text=f"Merging {table} table...", + text="merging table took {seconds:.0f}s", + logger=log.debug, + ) + log.info(f"Merging {table} table...") + timer.start() + sql = f"SELECT * FROM {table} ORDER BY project_id" + print(sql) + result = await inpg.execute(sql) + + entries = len(result) + log.debug(f"There are {entries} entries in {table}") + chunk = round(entries / cores) + pbar = tqdm.tqdm(result) + + # This table has a small amount of data, so threading would be overkill. + for record in pbar: + pid = record.get('project_id') + sql = f" UPDATE projects SET interests = {record['interest_id']} WHERE id={pid}" + #print(sql) + result = await self.pg.execute(sql) + + timer.stop() + return True + + async def mergeAuxTables(self, + inuri: str, + outuri: str, + ): + """ + Merge more tables from TM into the unified projects table. + + Args: + inuri (str): The input database + outuri (str): The output database + """ + await self.connect(outuri) + + inpg = PostgresClient() + await inpg.connect(inuri) + + await self.mergeInfo(inpg) + + await self.mergeChat(inpg) + + await self.mergeTeams(inpg) + + await self.mergeInterests(inpg) + + await self.mergePriorities(inpg) + + await self.mergeAllowed(inpg) + + # The project favorites table is imported into the users table instead. + # await self.mergeFavorites(inpg) + + async def mergeChat(self, + inpg: PostgresClient, + ): + table = "project_chat" + log.error(f"mergeChat() Unimplemented!") + timer = Timer(initial_text=f"Merging {table} table...", + text="merging table took {seconds:.0f}s", + logger=log.debug, + ) + log.info(f"Merging {table} table...") + timer.start() + sql = f"SELECT * FROM project_chat ORDER BY project_id" + # print(sql) + result = await inpg.execute(sql) + data = dict() + for record in tqdm.tqdm(result): + # The messages has embedded quotes. + message = record['message'].replace("'", "'") + sql = f" INSERT INTO chat(id, project_id, user_id, time_stamp, message) VALUES({record['id']}, {record['project_id']}, {record['user_id']}, '{record['time_stamp']}', '{message}')" + # print(sql) + result = await self.pg.execute(sql) + + async def mergeTeams(self, + inpg: PostgresClient, + ): + table = "project_teams" + log.error(f"mergeTeams() Unimplemented!") + timer = Timer(initial_text=f"Merging {table} table...", + text="merging table took {seconds:.0f}s", + logger=log.debug, + ) + log.info(f"Merging {table} table...") + + async def mergeAllowed(self, + inpg: PostgresClient, + ): + table = "project_allowed_users" + timer = Timer(initial_text=f"Merging {table} table...", + text="merging table took {seconds:.0f}s", + logger=log.debug, + ) + log.info(f"Merging {table} table...") + timer.start() + # It's faster to do this in Python than postgres + # sql = f"SELECT u.user_id,(SELECT ARRAY(SELECT c.project_id FROM {table} c WHERE c.user_id = u.user_id)) AS projects FROM {table} u;" + sql = f"SELECT * FROM project_allowed_users ORDER BY project_id" + print(sql) + result = await inpg.execute(sql) + data = dict() + for record in result: + if not record['project_id'] in data: + entry = list() + data[record['project_id']] = entry + entry.append(record['user_id']) + + entries = len(result) + log.debug(f"There are {entries} entries in {table}, and {len(data)} in the array") + chunk = round(entries / cores) + # This table has a small amount of data, so threading would be overkill. + # for record in pbar: + for pid, array in tqdm.tqdm(data.items()): + sql = f" UPDATE projects SET allowed_users = ARRAY{array} WHERE id={pid}" + # print(sql) + result = await self.pg.execute(sql) + + timer.stop() + return True + + async def mergePriorities(self, + inpg: PostgresClient, + ): + table = "project_priority_areas" + timer = Timer(initial_text=f"Merging {table} table...", + text="merging table took {seconds:.0f}s", + logger=log.debug, + ) + log.info(f"Merging {table} table...") + timer.start() + # It's faster to do this in Python than postgres + # sql = f"SELECT u.user_id,(SELECT ARRAY(SELECT c.project_id FROM {table} c WHERE c.user_id = u.user_id)) AS projects FROM {table} u;" + sql = f"SELECT * FROM {table} ORDER BY project_id" + print(sql) + result = await inpg.execute(sql) + data = dict() + for record in result: + if not record['project_id'] in data: + entry = list() + data[record['project_id']] = entry + entry.append(record['priority_area_id']) + + entries = len(result) + log.debug(f"There are {entries} entries in {table}, and {len(data)} in the array") + chunk = round(entries / cores) + # This table has a small amount of data, so threading would be overkill. + # for record in pbar: + for pid, array in tqdm.tqdm(data.items()): + sql = f" UPDATE projects SET priority_areas = ARRAY{array} WHERE id={pid}" # print(sql) - self.pg.dbcursor.execute(sql[:-2]) + result = await self.pg.execute(sql) + + timer.stop() + return True -def main(): +async def main(): """This main function lets this class be run standalone by a bash script.""" parser = argparse.ArgumentParser() parser.add_argument("-v", "--verbose", nargs="?", const="0", help="verbose output") @@ -163,7 +331,7 @@ def main(): # user.resetSequence() #all = proj.getAll() - proj.mergeProjectInfo(args.inuri) + await proj.mergeAuxTables(args.inuri, args.outuri) # file = open(args.boundary, 'r') # boundary = geojson.load(file) @@ -187,4 +355,6 @@ def main(): if __name__ == "__main__": """This is just a hook so this file can be run standalone during development.""" - main() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(main())