Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

export-db/pop-db: extend functionality to return more data, allow import of any table in database #534

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 90 additions & 81 deletions src/bindings/python/fluxacct/accounting/db_info_subcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,90 +10,99 @@
# SPDX-License-Identifier: LGPL-3.0
###############################################################
import csv
import sqlite3

from fluxacct.accounting import bank_subcommands as b
from fluxacct.accounting import user_subcommands as u

def export_db_info(conn):
"""
Export all of the information from the tables in the flux-accounting DB into
separate .csv files.
"""
try:
cur = conn.cursor()
# get all tables from DB
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cur.fetchall()

# loop through each table and export it to a separate .csv file
for table_name in tables:
output_csv = f"{table_name[0]}.csv"
cur.execute(f"SELECT * FROM {table_name[0]}")
rows = cur.fetchall()
column_names = [description[0] for description in cur.description]

# write data to .csv file
with open(output_csv, "w", newline="") as csv_file:
writer = csv.writer(csv_file)
writer.writerow(column_names)
writer.writerows(rows)
except sqlite3.OperationalError as exc:
raise sqlite3.OperationalError(f"export-db: {exc}")
except IOError as exc:
raise IOError(f"export-db: {exc}")


def populate_db(conn, csv_file, columns_included=None):
"""
Populate an existing table from a single .csv file with an option
to specify columns to include. The .csv file must have the column names in
the first line to indicate which columns to insert into the table.

Args:
csv_file: Path to the .csv file. The name of the .csv file must match the
name of the table in the flux-accounting DB.

columns_included (list, optional): List of columns to include from the .csv
file. If None, it will include all columns listed in the .csv file.

def export_db_info(conn, users=None, banks=None):
Raises:
ValueError: If the table derived from the .csv file name does not match
any of the tables in the flux-accounting DB.
"""
try:
cur = conn.cursor()
select_users_stmt = """
SELECT username, userid, bank, shares, max_running_jobs, max_active_jobs,
max_nodes, queues FROM association_table
"""
cur.execute(select_users_stmt)
table = cur.fetchall()

# open a .csv file for writing
users_filepath = users if users else "users.csv"
users_file = open(users_filepath, "w")
with users_file:
writer = csv.writer(users_file)

for row in table:
writer.writerow(row)

select_banks_stmt = """
SELECT bank, parent_bank, shares FROM bank_table
"""
cur.execute(select_banks_stmt)
table = cur.fetchall()

banks_filepath = banks if banks else "banks.csv"
banks_file = open(banks_filepath, "w")
with banks_file:
writer = csv.writer(banks_file)

for row in table:
writer.writerow(row)
except IOError as err:
print(err)


def populate_db(conn, users=None, banks=None):
if banks is not None:
try:
with open(banks) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")

for row in csv_reader:
b.add_bank(
conn,
bank=row[0],
parent_bank=row[1],
shares=row[2],
)
except IOError as err:
print(err)

if users is not None:
try:
with open(users) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")

# assign default values to fields if
# their slot is empty in the csv file
for row in csv_reader:
username = row[0]
uid = row[1]
bank = row[2]
shares = row[3] if row[3] != "" else 1
max_running_jobs = row[4] if row[4] != "" else 5
max_active_jobs = row[5] if row[5] != "" else 7
max_nodes = row[6] if row[6] != "" else 2147483647
queues = row[7]

u.add_user(
conn,
username,
bank,
uid,
shares,
max_running_jobs,
max_active_jobs,
max_nodes,
queues,

# extract table name from .csv filename; check if it exists in DB
table_name = csv_file.split("/")[-1].replace(".csv", "")
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = [table[0] for table in cur.fetchall()]

if table_name not in tables:
raise ValueError(
f'pop-db: table "{table_name}" does not exist in the database'
)

with open(csv_file, "r", newline="") as file:
reader = csv.reader(file)
all_columns = next(reader) # column names

if columns_included:
# filter only the columns specified
columns = [col for col in all_columns if col in columns_included]
else:
columns = all_columns

for row in reader:
# build a list of (column, value) pairs for columns with non-empty values
column_value_pairs = [
(columns[i], row[i]) for i in range(len(columns)) if row[i] != ""
]
if column_value_pairs:
# separate columns and values for the SQL statement
cols_to_insert = [pair[0] for pair in column_value_pairs]
vals_to_insert = [pair[1] for pair in column_value_pairs]
insert_sql = (
f"INSERT INTO {table_name} "
f"({', '.join(cols_to_insert)}) "
f"VALUES ({', '.join(['?' for _ in vals_to_insert])})"
)
except IOError as err:
print(err)

# execute the insertion with only the non-empty values
cur.execute(insert_sql, vals_to_insert)

conn.commit()
except sqlite3.OperationalError as exc:
conn.rollback()
raise sqlite3.OperationalError(f"pop-db: {exc}")
except IOError as exc:
raise IOError(f"pop-db: {exc}")
20 changes: 13 additions & 7 deletions src/cmd/flux-account-service.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,17 +547,17 @@ def scrub_old_jobs(self, handle, watcher, msg, arg):

def export_db(self, handle, watcher, msg, arg):
try:
val = d.export_db_info(
self.conn,
msg.payload["users"],
msg.payload["banks"],
)
val = d.export_db_info(self.conn)

payload = {"export_db": val}

handle.respond(msg, payload)
except KeyError as exc:
handle.respond_error(msg, 0, f"missing key in payload: {exc}")
except sqlite3.OperationalError as exc:
handle.respond_error(msg, 0, f"a SQLite error occurred: {exc}")
except IOError as exc:
handle.respond_error(msg, 0, f"an IO error occurred: {exc}")
except Exception as exc:
handle.respond_error(
msg, 0, f"a non-OSError exception was caught: {str(exc)}"
Expand All @@ -567,15 +567,21 @@ def pop_db(self, handle, watcher, msg, arg):
try:
val = d.populate_db(
self.conn,
msg.payload["users"],
msg.payload["banks"],
msg.payload["csv_file"],
msg.payload["fields"],
)

payload = {"pop_db": val}

handle.respond(msg, payload)
except KeyError as exc:
handle.respond_error(msg, 0, f"missing key in payload: {exc}")
except sqlite3.OperationalError as exc:
handle.respond_error(msg, 0, f"a SQLite error occurred: {exc}")
except IOError as exc:
handle.respond_error(msg, 0, f"an IO error occurred: {exc}")
except ValueError as exc:
handle.respond_error(msg, 0, f"{exc}")
except Exception as exc:
handle.respond_error(
msg, 0, f"a non-OSError exception was caught: {str(exc)}"
Expand Down
53 changes: 4 additions & 49 deletions src/cmd/flux-account.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,69 +577,24 @@ def add_scrub_job_records_arg(subparsers):
def add_export_db_arg(subparsers):
subparser = subparsers.add_parser(
"export-db",
help="""
Extract flux-accounting database information into two .csv files.

Order of columns extracted from association_table:

Username,UserID,Bank,Shares,MaxRunningJobs,MaxActiveJobs,MaxNodes,Queues

If no custom path is specified, this will create a file in the
current working directory called users.csv.

----------------

Order of columns extracted from bank_table:

Bank,ParentBank,Shares

If no custom path is specified, this will create a file in the
current working directory called banks.csv.

Use these two files to populate a new flux-accounting DB with:

flux account pop-db -b banks.csv -u users.csv
""",
help="extract flux-accounting DB information into separate .csv files",
formatter_class=flux.util.help_formatter(),
)
subparser.set_defaults(func="export_db")
subparser.add_argument(
"-u", "--users", help="path to a .csv file containing user information"
)
subparser.add_argument(
"-b", "--banks", help="path to a .csv file containing bank information"
)


def add_pop_db_arg(subparsers):
subparser = subparsers.add_parser(
"pop-db",
help="""
Description: Populate a flux-accounting database with a .csv file.

Order of elements required for populating association_table:

Username,UserID,Bank,Shares,MaxRunningJobs,MaxActiveJobs,MaxNodes,Queues

[Shares], [MaxRunningJobs], [MaxActiveJobs], and [MaxNodes] can be left
blank ('') in the .csv file for a given row.

----------------

Order of elements required for populating bank_table:

Bank,ParentBank,Shares

[ParentBank] can be left blank ('') in .csv file for a given row.
""",
help="populate a table in the flux-accounting DB with a .csv file",
formatter_class=flux.util.help_formatter(),
)
subparser.set_defaults(func="pop_db")
subparser.add_argument(
"-u", "--users", help="path to a .csv file containing user information"
"-c", "--csv-file", help="path to a .csv file containing table information"
)
subparser.add_argument(
"-b", "--banks", help="path to a .csv file containing bank information"
"-f", "--fields", help="which fields to insert into the table"
)


Expand Down
53 changes: 31 additions & 22 deletions t/t1009-pop-db.t
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ test_expect_success 'start flux-accounting service' '
flux account-service -p ${DB_PATH} -t
'

test_expect_success 'create a banks.csv file containing bank information' '
cat <<-EOF >banks.csv
test_expect_success 'try to populate flux-accounting DB with a bad filename' '
touch foo.csv &&
test_must_fail flux account pop-db -c foo.csv > error.out 2>&1 &&
grep "table \"foo\" does not exist in the database" error.out
'

test_expect_success 'create bank_table.csv' '
cat <<-EOF >bank_table.csv
bank,parent_bank,shares
root,,1
A,root,1
B,root,1
Expand All @@ -30,41 +37,43 @@ test_expect_success 'create a banks.csv file containing bank information' '
EOF
'

test_expect_success 'populate flux-accounting DB with banks.csv' '
flux account pop-db -b banks.csv
test_expect_success 'populate flux-accounting DB with bank_table.csv' '
flux account pop-db -c bank_table.csv
'

test_expect_success 'create a users.csv file containing user information' '
cat <<-EOF >users.csv
user1000,1000,A,1,10,15,5,""
user1001,1001,A,1,10,15,5,""
user1002,1002,A,1,10,15,5,""
user1003,1003,A,1,10,15,5,""
user1004,1004,A,1,10,15,5,""
test_expect_success 'create association_table.csv' '
cat <<-EOF >association_table.csv
creation_time,username,userid,bank,default_bank,shares,max_running_jobs,max_active_jobs,max_nodes,queues
0,user1000,1000,A,A,1,10,15,5,""
0,user1001,1001,A,A,1,10,15,5,""
0,user1002,1002,A,A,1,10,15,5,""
0,user1003,1003,A,A,1,10,15,5,""
0,user1004,1004,A,A,1,10,15,5,""
EOF
'

test_expect_success 'populate flux-accounting DB with users.csv' '
flux account pop-db -u users.csv
test_expect_success 'populate association_table with association_table.csv' '
flux account pop-db -c association_table.csv
'

test_expect_success 'check database hierarchy to make sure all banks & users were added' '
flux account view-bank root -t > db_hierarchy_base.test &&
test_cmp ${EXPECTED_FILES}/db_hierarchy_base.expected db_hierarchy_base.test
'

test_expect_success 'create a users.csv file with some missing optional user information' '
cat <<-EOF >users_optional_vals.csv
user1005,1005,B,1,5,,5,""
user1006,1006,B,,,,5,""
user1007,1007,B,1,7,,,""
user1008,1008,B,,,,5,""
user1009,1009,B,1,9,,,""
test_expect_success 'create association_table.csv with some missing user information' '
cat <<-EOF >association_table.csv
creation_time,username,userid,bank,default_bank,shares,max_running_jobs,max_active_jobs,max_nodes,queues
0,user1005,1005,B,B,1,5,,5,""
0,user1006,1006,B,B,,,,5,""
0,user1007,1007,B,B,1,7,,,""
0,user1008,1008,B,B,,,,5,""
0,user1009,1009,B,B,1,9,,,""
EOF
'

test_expect_success 'populate flux-accounting DB with users_optional_vals.csv' '
flux account pop-db -u users_optional_vals.csv
test_expect_success 'populate association_table' '
flux account pop-db -c association_table.csv
'

test_expect_success 'check database hierarchy to make sure new users were added' '
Expand Down
Loading
Loading