Skip to content

Commit

Permalink
feat: Improve chunk analysis performance (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreSilveiraAzion authored Aug 4, 2024
1 parent 32ca128 commit 98ae5bc
Show file tree
Hide file tree
Showing 14 changed files with 429 additions and 326 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ bin
lib/
__pycache__/
.vscode/
kaggle_datasets/
43 changes: 27 additions & 16 deletions commands/db_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ def do_databases(shell, arg):
databases = db_list.get('databases')
columns = db_list.get('columns')
if databases and columns:
shell.query_output(databases,columns)
shell.query_output(databases, columns)
else:
utils.write_output("Error: Invalid database information.")
else:
utils.write_output("No databases found.")
except Exception as e:
raise RuntimeError(f"Error: {e}") from e
raise RuntimeError(f"Error listing databases: {e}") from e

#command .use
def do_use(shell, arg):
Expand All @@ -34,9 +34,14 @@ def do_use(shell, arg):
utils.write_output("Invalid database name.")
return

if shell.edgeSql.set_current_database(database_name):
shell.update_prompt()
utils.write_output(f"Switched to database '{arg}'.")
try:
if shell.edgeSql.set_current_database(database_name):
shell.update_prompt()
utils.write_output(f"Switched to database '{arg}'.")
else:
utils.write_output(f"Failed to switch to database '{arg}'.")
except Exception as e:
raise RuntimeError(f"Error switching database: {e}") from e

#command .dbinfo
def do_dbinfo(shell, arg):
Expand All @@ -50,12 +55,13 @@ def do_dbinfo(shell, arg):
if db_info:
data = db_info.get('table_data')
columns = db_info.get('columns')
shell.query_output(data,columns)
shell.query_output(data, columns)
else:
utils.write_output("Error: Unable to fetch database information.")
except Exception as e:
raise RuntimeError(f"Error: {e}") from e
raise RuntimeError(f"Error fetching database info: {e}") from e

# Command .dbsize
def do_dbsize(shell, arg):
"""Get the size of the current database in MB."""
if not shell.edgeSql.get_current_database_id():
Expand All @@ -64,13 +70,14 @@ def do_dbsize(shell, arg):

try:
output = shell.edgeSql.get_database_size()
size = output.get('rows')
column = output.get('columns')
shell.query_output(size,column)
if not output['success']:
utils.write_output(f"{output['error']}")
return
shell.query_output(output['data']['rows'], output['data']['columns'])
except Exception as e:
raise RuntimeError(f"Error: {e}") from e
#Command .create
raise RuntimeError(f"Error fetching database size: {e}") from e

#command .create
def do_create(shell, arg):
"""
Create a new database.
Expand Down Expand Up @@ -99,10 +106,11 @@ def do_create(shell, arg):

try:
shell.edgeSql.create_database(database_name)
utils.write_output(f"Database '{database_name}' created successfully.")
except Exception as e:
raise RuntimeError(f"Error creating database: {e}") from e

#Command .destroy
#command .destroy
def do_destroy(shell, arg):
"""
Destroy a database by name.
Expand Down Expand Up @@ -131,7 +139,10 @@ def do_destroy(shell, arg):
return

try:
shell.edgeSql.destroy_database(database_name)
utils.write_output(f"Database '{database_name}' has been successfully destroyed.")
status = shell.edgeSql.destroy_database(database_name)
if status:
utils.write_output(f"Database '{database_name}' has been successfully destroyed.")
else:
utils.write_output(f"Failed to destroy database '{database_name}'.")
except Exception as e:
raise RuntimeError(f"Error destroying database: {e}") from e
raise RuntimeError(f"Error destroying database: {e}") from e
53 changes: 28 additions & 25 deletions commands/db_basic.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,51 @@
import utils

def do_tables(shell, arg):
"""List all tables."""
"""
List all tables in the current database.
Args:
shell (EdgeSQLShell): The shell instance.
arg (str): Additional arguments (not used).
"""
try:
output = shell.edgeSql.list_tables()
except Exception as e:
raise RuntimeError(f"Error: {e}") from e
if not output['success']:
utils.write_output(f"Error: {output['error']}")
return
except RuntimeError as e:
utils.write_output(f"Error: {e}")
return

if output is not None:
rows = output.get('rows')
columns = output.get('columns')
if rows and columns:
shell.query_output(rows, columns)
else:
utils.write_output("No tables available.")
if output['data'] and len(output['data']['rows']) > 0:
shell.query_output(output['data']['rows'], output['data']['columns'])
else:
utils.write_output("Error listing tables.")

utils.write_output("No tables available.")

def do_schema(shell, arg):
"""
Describe table.
Describe the schema of a table.
Args:
shell (EdgeSQLShell): The shell instance.
arg (str): The name of the table to describe.
"""
if not arg:
utils.write_output("Usage: .schema <table_name>")
return
else:
table_name = arg.strip()

table_name = arg.strip()

try:
output = shell.edgeSql.describe_table(table_name)
except Exception as e:
raise RuntimeError(f"Error: {e}") from e
if not output['success']:
utils.write_output(f"Error: {output['error']}")
return
except RuntimeError as e:
utils.write_output(f"Error: {e}")
return

if output is not None:
rows = output.get('rows')
columns = output.get('columns')
if rows and columns:
shell.query_output(rows, columns)
else:
utils.write_output(f"No schema information found for table '{table_name}'.")
if output['data'] and len(output['data']['rows']) > 0:
shell.query_output(output['data']['rows'], output['data']['columns'])
else:
utils.write_output("Error describing table schema.")
utils.write_output(f"No schema information found for table '{table_name}'.")
53 changes: 37 additions & 16 deletions commands/db_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def dump_table(shell, table_name, dump=DUMP_ALL, batch_size=512):
Args:
table_name (str): Name of the table to dump.
dump (int, optional): Flag indicating what to dump (schema only, data only, or both). Defaults to DUMP_ALL.
batch_size (int, optional): Size of each data batch for fetching. Defaults to 1000.
batch_size (int, optional): Size of each data batch for fetching. Defaults to 512.
"""
try:
# Check if the table exists
Expand All @@ -25,22 +25,30 @@ def dump_table(shell, table_name, dump=DUMP_ALL, batch_size=512):
# Dump schema if requested
if dump & DUMP_SCHEMA_ONLY:
# Table structure
table_output = shell.edgeSql.execute(f"SELECT sql FROM sqlite_schema WHERE type like 'table' \
table_output = shell.edgeSql.execute(f"SELECT sql FROM sqlite_schema WHERE type = 'table' \
AND sql NOT NULL \
AND name like '{table_name}' \
AND name = '{table_name}' \
ORDER BY tbl_name='sqlite_sequence', rowid;")
if table_output:
statement = table_output['rows'][0][0]
if not table_output['success']:
utils.write_output(f"{table_output['error']}")
return

if table_output['data']:
statement = table_output['data']['rows'][0][0]
formatted_query = sql.format_sql(f'CREATE TABLE IF NOT EXISTS {statement[13:]};')
utils.write_output(formatted_query, shell.output)

# Indexes, Triggers, and Views
additional_objects_output = shell.edgeSql.execute(f"SELECT sql FROM sqlite_schema \
WHERE sql NOT NULL \
AND tbl_name like '{table_name}' \
AND tbl_name = '{table_name}' \
AND type IN ('index','trigger','view');")
if additional_objects_output:
additional_objects = additional_objects_output['rows']
if not additional_objects_output['success']:
utils.write_output(f"{additional_objects_output['error']}")
return

if additional_objects_output['data']:
additional_objects = additional_objects_output['data']['rows']
for obj in additional_objects:
statement = obj[0]
if 'INDEX' in statement.upper():
Expand All @@ -59,16 +67,24 @@ def dump_table(shell, table_name, dump=DUMP_ALL, batch_size=512):
if dump & DUMP_DATA_ONLY:
# Get total count of rows in the table
count_output = shell.edgeSql.execute(f'SELECT COUNT(*) FROM {table_name};')
if count_output and count_output['rows']:
total_rows = count_output['rows'][0][0]
if not count_output['success']:
utils.write_output(f"{count_output['error']}")
return

if count_output['data']['rows']:
total_rows = count_output['data']['rows'][0][0]

# Fetch data in batches and generate SQL insert statements
offset = 0
while offset < total_rows:
limit = min(batch_size, total_rows - offset) # Calculate limit for this batch
data_output = shell.edgeSql.execute(f'SELECT * FROM {table_name} LIMIT {limit} OFFSET {offset};')
if data_output:
df = pd.DataFrame(data_output['rows'], columns=data_output['columns'])
if not data_output['success']:
utils.write_output(f"{data_output['error']}")
return

if data_output['data']:
df = pd.DataFrame(data_output['data']['rows'], columns=data_output['data']['columns'])
sql_commands = sql.generate_insert_sql(df, table_name)
for cmd in sql_commands:
utils.write_output(cmd, shell.output)
Expand All @@ -93,9 +109,13 @@ def _dump(shell, arg=False, dump=DUMP_ALL):

# Dump all tables if no specific tables are provided
if not arg or len(arg) == 0:
tables_output = shell.edgeSql.execute("SELECT name FROM sqlite_schema WHERE type like 'table';")
if tables_output:
table_lst = tables_output['rows']
tables_output = shell.edgeSql.execute("SELECT name FROM sqlite_schema WHERE type = 'table';")
if not tables_output['success']:
utils.write_output(f"{tables_output['error']}")
return

if tables_output['data']:
table_lst = tables_output['data']['rows']
for table in table_lst:
table_name = table[0]
if table_name == "sqlite_sequence":
Expand Down Expand Up @@ -145,4 +165,5 @@ def do_dump(shell, arg):
if dump_type == DUMP_NONE:
_dump(shell, arg=args, dump=DUMP_ALL)
else:
_dump(shell, arg=args,dump=dump_type)
_dump(shell, arg=args, dump=dump_type)

26 changes: 14 additions & 12 deletions commands/import.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def _import_data(edgeSql, dataset_generator, table_name, chunk_size=512):
bool: True if the import is successful, False otherwise.
"""
try:
# Initialize variables
total_chunks = 0
chunks = []

Expand All @@ -30,30 +29,33 @@ def _import_data(edgeSql, dataset_generator, table_name, chunk_size=512):
total_chunks += 1

utils.write_output('Importing data...')
# Initialize tqdm for progress tracking
progress_bar = tqdm(total=total_chunks, desc="Progress", unit="chunk", dynamic_ncols=True)

# Reset generator
# Import chunks
dataset_generator = iter(chunks)
# Iterate over chunks
for chunk in dataset_generator:
# Check if the table exists and create it if necessary
if not edgeSql.exist_table(table_name):
create_sql = sql.generate_create_table_sql(chunk, table_name)
edgeSql.execute(create_sql)

# Generate SQL for data insertion
result = edgeSql.execute(create_sql)
if not result['success']:
utils.write_output(f"Error creating table: {result['error']}")
return False

# Generate SQL for data insertion if the table does not exist
insert_sql = sql.generate_insert_sql(chunk, table_name)
edgeSql.execute(insert_sql)
result = edgeSql.execute(insert_sql)
if not result['success']:
utils.write_output(f"Error inserting data: {result['error']}")
return False

# Update progress bar
progress_bar.update(1)

progress_bar.close()

return True # Import successful
return True
except Exception as e:
raise RuntimeError(f'Error inserting data into database: {e}') from e
raise RuntimeError(f'{e}') from e
return False

def do_import(shell, arg):
Expand Down Expand Up @@ -140,7 +142,7 @@ def do_import(shell, arg):
return

status = _import_data(shell.edgeSql, dataset_generator, table_name)
if status == True:
if status:
utils.write_output(f"Data imported successfully into table '{table_name}'.")
else:
utils.write_output("Error: No data to import or import failed.")
Expand Down
Loading

0 comments on commit 98ae5bc

Please sign in to comment.