fix: Import all the other project tables
rsavoye committed Feb 17, 2024
1 parent f5a94d7 commit d419d20
Showing 1 changed file with 217 additions and 47 deletions: tm_admin/projects/projects.py
@@ -28,7 +28,6 @@
from dateutil.parser import parse
import tm_admin.types_tm
import geojson
import concurrent.futures
from cpuinfo import get_cpu_info
from shapely.geometry import shape
from shapely import centroid
@@ -37,19 +36,22 @@
from shapely import wkb, get_coordinates
from tm_admin.dbsupport import DBSupport
from tm_admin.generator import Generator
from osm_rawdata.postgres import uriParser, PostgresClient
from osm_rawdata.pgasync import PostgresClient
import re
# from progress import Bar, PixelBar
from tqdm import tqdm
import tqdm.asyncio
from codetiming import Timer
import asyncio

# The number of threads is twice the CPU core count
info = get_cpu_info()
cores = info["count"]
cores = info["count"] * 2

# Instantiate logger
log = logging.getLogger(__name__)

def updateThread(
async def updateThread(
data: list,
db: PostgresClient,
):
@@ -85,52 +87,218 @@ def __init__(self,
self.pg = None
self.profile = ProjectsTable()
self.types = dir(tm_admin.types_tm)
super().__init__('projects', dburi)
super().__init__('projects')

def mergeProjectInfo(self,
inuri: str,
):
async def mergeInfo(self,
inpg: PostgresClient,
):
table = 'project_info'
pg = PostgresClient(inuri)
sql = f"SELECT row_to_json({table}) as row FROM {table}"
# print(sql)
try:
result = pg.dbcursor.execute(sql)
except:
log.error(f"Couldn't execute query! {sql}")
return False

ignore = ['project_id', 'project_id_str', 'text_searchable', 'per_task_instructions']
result = pg.dbcursor.fetchall()
index = 0
pbar = tqdm(result)
timer = Timer(initial_text=f"Merging {table} table...",
text="merging table took {seconds:.0f}s",
logger=log.debug,
)
log.info(f"Merging {table} table...")

timer.start()
# columns = await inpg.getColumns(table)
columns = f"project_id, name, short_description, description, instructions, per_task_instructions"

sql = f"SELECT {columns} FROM {table} ORDER BY project_id"
print(sql)
result = await inpg.execute(sql)
timer.stop()

pbar = tqdm.tqdm(result)
for record in pbar:
index += 1
settings = ""
column = list()
for key, val in record[0].items():
if key in ignore:
continue
if key == "project_id":
key = "id"
elif key == "locale":
key = "default_locale"
else:
if val is None:
continue
key = f"{key}"
if type(val) == int:
entry = f"{key}={val}"
else:
val = val.replace("'", "")
entry = f"{key}='{val}'"
settings += f"{entry}, "

sql = f"UPDATE projects SET {settings[:-2]} WHERE id={record[0]['project_id']};"
# In Postgres, a single quote is escaped by doubling it, so fix
# that before encoding.
if record['name']:
name = record['name'].replace("'", "''").encode('utf-8')
else:
name = "NULL".encode('utf-8')
if record['description']:
description = record['description'].replace("'", "''").encode('utf-8')
else:
description = "NULL".encode('utf-8')
if record['short_description']:
short = record['short_description'].replace("'", "''").encode('utf-8')
else:
short = "NULL".encode('utf-8')
if record['per_task_instructions']:
task = record['per_task_instructions'].replace("'", "''").encode('utf-8')
else:
task = "NULL".encode('utf-8')
if record['instructions']:
instructions = record['instructions'].replace("'", "''").encode('utf-8')
else:
instructions = "NULL".encode('utf-8')
# sql = f"UPDATE projects SET id={record['project_id']}, default_locale='{record['locale']}', name={name},short_description={short},description={description},instructions={instructions},per_task_instructions={task} WHERE id={record['project_id']};"
sql = f"UPDATE projects SET name='{name.decode('utf-8')}', short_description='{short.decode('utf-8')}', description='{description.decode('utf-8')}', instructions='{instructions.decode('utf-8')}' WHERE id={record['project_id']};"
await self.pg.execute(sql)
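
The quote handling above doubles embedded single quotes before interpolating values into SQL. A minimal standalone sketch of that rule (the helper name is illustrative, not part of the commit):

def pg_quote(val):
    # NULL for missing values, otherwise a quoted literal with any
    # embedded single quotes doubled, per Postgres escaping rules.
    if val is None:
        return "NULL"
    return "'" + str(val).replace("'", "''") + "'"

# pg_quote("it's mapped") -> "'it''s mapped'"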

async def mergeInterests(self,
inpg: PostgresClient,
):
table = "project_interests"
log.error(f"mergeTeams() Unimplemented!")
timer = Timer(initial_text=f"Merging {table} table...",
text="merging table took {seconds:.0f}s",
logger=log.debug,
)
log.info(f"Merging {table} table...")
timer.start()
sql = f"SELECT * FROM {table} ORDER BY project_id"
print(sql)
result = await inpg.execute(sql)

entries = len(result)
log.debug(f"There are {entries} entries in {table}")
chunk = round(entries / cores)
pbar = tqdm.tqdm(result)

# This table has a small amount of data, so threading would be overkill.
for record in pbar:
pid = record.get('project_id')
sql = f" UPDATE projects SET interests = {record['interest_id']} WHERE id={pid}"
#print(sql)
result = await self.pg.execute(sql)

timer.stop()
return True
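
Note that the UPDATE above overwrites interests on each row, so a project with several interests keeps only the last one read. A hedged variant that aggregates first, mirroring mergeAllowed() below (this assumes the interests column is an array type):

# Aggregate interest IDs per project before writing, inside the
# same coroutine.
data = dict()
for record in result:
    data.setdefault(record['project_id'], list()).append(record['interest_id'])
for pid, array in data.items():
    sql = f"UPDATE projects SET interests = ARRAY{array} WHERE id={pid}"
    await self.pg.execute(sql)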

async def mergeAuxTables(self,
inuri: str,
outuri: str,
):
"""
Merge more tables from TM into the unified projects table.
Args:
inuri (str): The input database
outuri (str): The output database
"""
await self.connect(outuri)

inpg = PostgresClient()
await inpg.connect(inuri)

await self.mergeInfo(inpg)

await self.mergeChat(inpg)

await self.mergeTeams(inpg)

await self.mergeInterests(inpg)

await self.mergePriorities(inpg)

await self.mergeAllowed(inpg)

# The project favorites table is imported into the users table instead.
# await self.mergeFavorites(inpg)
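
A hedged usage sketch of the new entry point; the class name is hypothetical (the diff elides it) and the URIs are placeholders:

# Hypothetical driver; substitute the real class name and database URIs.
async def demo():
    proj = ProjectsDB()  # hypothetical name, not shown in this diff
    await proj.mergeAuxTables("localhost/tm4", "localhost/tm_admin")

asyncio.run(demo())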

async def mergeChat(self,
inpg: PostgresClient,
):
table = "project_chat"
log.error(f"mergeChat() Unimplemented!")
timer = Timer(initial_text=f"Merging {table} table...",
text="merging table took {seconds:.0f}s",
logger=log.debug,
)
log.info(f"Merging {table} table...")
timer.start()
sql = f"SELECT * FROM project_chat ORDER BY project_id"
# print(sql)
result = await inpg.execute(sql)
data = dict()
for record in tqdm.tqdm(result):
# Messages can contain embedded single quotes, which Postgres
# escapes by doubling them.
message = record['message'].replace("'", "''")
sql = f" INSERT INTO chat(id, project_id, user_id, time_stamp, message) VALUES({record['id']}, {record['project_id']}, {record['user_id']}, '{record['time_stamp']}', '{message}')"
# print(sql)
result = await self.pg.execute(sql)
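
String-built INSERTs like this one depend on the quote doubling above. If the underlying driver is asyncpg (an assumption about what PostgresClient wraps), the same insert with query parameters avoids manual escaping entirely:

# Hedged sketch with asyncpg-style $n parameters; the actual
# PostgresClient API may not expose this form.
await conn.execute(
    "INSERT INTO chat(id, project_id, user_id, time_stamp, message) "
    "VALUES($1, $2, $3, $4, $5)",
    record['id'], record['project_id'], record['user_id'],
    record['time_stamp'], record['message'],
)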

async def mergeTeams(self,
inpg: PostgresClient,
):
table = "project_teams"
log.error(f"mergeTeams() Unimplemented!")
timer = Timer(initial_text=f"Merging {table} table...",
text="merging table took {seconds:.0f}s",
logger=log.debug,
)
log.info(f"Merging {table} table...")

async def mergeAllowed(self,
inpg: PostgresClient,
):
table = "project_allowed_users"
timer = Timer(initial_text=f"Merging {table} table...",
text="merging table took {seconds:.0f}s",
logger=log.debug,
)
log.info(f"Merging {table} table...")
timer.start()
# It's faster to do this in Python than in Postgres
# sql = f"SELECT u.user_id,(SELECT ARRAY(SELECT c.project_id FROM {table} c WHERE c.user_id = u.user_id)) AS projects FROM {table} u;"
sql = f"SELECT * FROM project_allowed_users ORDER BY project_id"
print(sql)
result = await inpg.execute(sql)
data = dict()
for record in result:
# setdefault avoids relying on rows arriving sorted by project_id
data.setdefault(record['project_id'], list()).append(record['user_id'])

entries = len(result)
log.debug(f"There are {entries} entries in {table}, and {len(data)} in the array")
chunk = round(entries / cores)
# This table has a small amount of data, so threading would be overkill.
# for record in pbar:
for pid, array in tqdm.tqdm(data.items()):
sql = f" UPDATE projects SET allowed_users = ARRAY{array} WHERE id={pid}"
# print(sql)
result = await self.pg.execute(sql)

timer.stop()
return True
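
The commented-out query above aggregates in the opposite direction (projects per user). For reference, a hedged single-statement form of what the Python loop does, using array_agg:

# Hedged alternative: let Postgres build the per-project arrays itself.
sql = """UPDATE projects p SET allowed_users = sub.users
FROM (SELECT project_id, array_agg(user_id) AS users
      FROM project_allowed_users GROUP BY project_id) AS sub
WHERE p.id = sub.project_id"""
# result = await self.pg.execute(sql)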

async def mergePriorities(self,
inpg: PostgresClient,
):
table = "project_priority_areas"
timer = Timer(initial_text=f"Merging {table} table...",
text="merging table took {seconds:.0f}s",
logger=log.debug,
)
log.info(f"Merging {table} table...")
timer.start()
# It's faster to do this in Python than in Postgres
# sql = f"SELECT u.user_id,(SELECT ARRAY(SELECT c.project_id FROM {table} c WHERE c.user_id = u.user_id)) AS projects FROM {table} u;"
sql = f"SELECT * FROM {table} ORDER BY project_id"
print(sql)
result = await inpg.execute(sql)
data = dict()
for record in result:
# setdefault avoids relying on rows arriving sorted by project_id
data.setdefault(record['project_id'], list()).append(record['priority_area_id'])

entries = len(result)
log.debug(f"There are {entries} entries in {table}, and {len(data)} in the array")
chunk = round(entries / cores)
# This table has a small amount of data, so threading would be overkill.
# for record in pbar:
for pid, array in tqdm.tqdm(data.items()):
sql = f" UPDATE projects SET priority_areas = ARRAY{array} WHERE id={pid}"
# print(sql)
result = await self.pg.execute(sql)

timer.stop()
return True
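
Both array-merging functions compute a chunk size and then loop serially, since these tables are small. For a larger table, a hedged sketch of the chunked path using the updateThread() coroutine declared at the top of the file:

# Hedged sketch: fan rows out to the worker coroutine in `cores`-sized
# slices; assumes updateThread() applies a list of rows to the database.
async def fanout(result, db):
    chunk = max(1, round(len(result) / cores))
    tasks = [updateThread(result[i:i + chunk], db)
             for i in range(0, len(result), chunk)]
    await asyncio.gather(*tasks)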

def main():
async def main():
"""This main function lets this class be run standalone by a bash script."""
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", nargs="?", const="0", help="verbose output")
@@ -163,7 +331,7 @@ def main():
# user.resetSequence()
#all = proj.getAll()

proj.mergeProjectInfo(args.inuri)
await proj.mergeAuxTables(args.inuri, args.outuri)

# file = open(args.boundary, 'r')
# boundary = geojson.load(file)
@@ -187,4 +355,6 @@

if __name__ == "__main__":
"""This is just a hook so this file can be run standalone during development."""
main()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())
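
On Python 3.7+ the explicit loop management is equivalent to the shorter form:

# asyncio.run(main())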
