Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource allocation colormap #3453

Open
wants to merge 15 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions notebooks/resource-allocation/upload_df.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this file? Can it be deleted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can delete this file.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, thank you; then please rm.

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import pandas as pd

# Helper script: from an example jobs dump, pick the rows with the largest
# memory / wall-time per `samples * columns` group and write two SQL VALUES
# lists (separated by a "Split" line) to sql.txt.

# Example data loading
filename = './data/jobs_2024-02-21.tsv.gz'
df = pd.read_csv(filename, sep='\t', dtype={'extra_info': str})

# Convert string to timedelta, then to total seconds (vectorized).
df['ElapsedRawTime'] = pd.to_timedelta(
    df['ElapsedRawTime']).dt.total_seconds()

cname = "Validate"
sname = "Diversity types - alpha_vector"
# .copy() so the column assignments below write to an independent frame
# instead of a slice of the full table (avoids SettingWithCopyWarning).
df = df[(df.cName == cname) & (df.sName == sname)].copy()

# Missing values become 0 so every numeric column can be cast to int.
for int_col in ('samples', 'columns', 'input_size',
                'MaxRSSRaw', 'ElapsedRawTime'):
    df[int_col] = df[int_col].fillna(0).astype(int)

COL_NAME = 'samples * columns'
df[COL_NAME] = df['samples'] * df['columns']
columns = ["MaxRSSRaw", "ElapsedRawTime"]

# For each metric keep the rows whose value equals the maximum within
# their COL_NAME group. The string 'max' is passed rather than the builtin
# because newer pandas deprecates builtin callables in transform().
max_rows = [
    df[df[curr] == df.groupby(COL_NAME)[curr].transform('max')]
    for curr in columns
]

# A row can be the maximum for both metrics; keep it only once.
filtered_df = pd.concat(max_rows).drop_duplicates().reset_index(drop=True)

# INSERT INTO qiita.processing_job(processing_job_id, email, command_id,
# command_parameters, processing_job_status_id)
# VALUES('ca27ddbc-a678-4b09-8a1d-b65f52f8eb49',
# '[email protected]', 1, '""'::json, 1);

# INSERT INTO qiita.slurm_resource_allocations(processing_job_id, samples,
# columns, input_size, extra_info, memory_used, walltime_used)
# VALUES('ca27ddbc-a678-4b09-8a1d-b65f52f8eb49', 39, 81, 2, 'nan',
# 327036000, 91);

# processing_job_id uuid NOT NULL,
# samples integer,
# columns integer,
# input_size bigint,
# extra_info varchar DEFAULT NULL,
# memory_used bigint,
# walltime_used integer,

# Collect both VALUES lists in a single pass and join once at the end.
job_values = []
allocation_values = []
for _, row in filtered_df.iterrows():
    job_values.append(
        f"""('{row['QiitaID']}', '[email protected]', 1, '""'::json, 1),\n"""
    )
    allocation_values.append(
        f"('{row['QiitaID']}', {int(row['samples'])}, "
        f"{int(row['columns'])}, {int(row['input_size'])}, "
        f"'{row['extra_info']}', {int(row['MaxRSSRaw'])}, "
        f"{int(row['ElapsedRawTime'])}),\n"
    )

# Layout: processing_job tuples, ";", "Split", allocation tuples, ";".
# NOTE(review): each list keeps its trailing comma before the ";" exactly
# as before, so the text still needs a manual touch-up before it is run.
res = (
    "".join(job_values) + ";\n" + "Split\n" +
    "".join(allocation_values) + ";\n"
)

# Use a dedicated handle name; do not rebind `filename` (the input path).
with open("sql.txt", 'w') as out_file:
    out_file.write(res)
23 changes: 10 additions & 13 deletions qiita_db/meta_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ def update_resource_allocation_redis(active=True):
if len(df) == 0:
continue

fig, axs = resource_allocation_plot(df, cname, sname, col_name)
fig, axs = resource_allocation_plot(df, col_name)
titles = [0, 0]
images = [0, 0]

Expand All @@ -605,21 +605,18 @@ def update_resource_allocation_redis(active=True):
# only time
new_fig = plt.figure()
new_ax = new_fig.add_subplot(111)

scatter_data = ax.collections[0]
new_ax.scatter(scatter_data.get_offsets()[:, 0],
scatter_data.get_offsets()[:, 1],
s=scatter_data.get_sizes(), label="data")

line = ax.lines[0]
new_ax.plot(line.get_xdata(), line.get_ydata(),
linewidth=1, color='orange')

if len(ax.collections) > 1:
failure_data = ax.collections[1]
new_ax.scatter(failure_data.get_offsets()[:, 0],
failure_data.get_offsets()[:, 1],
color='red', s=3, label="failures")
handles, labels = ax.get_legend_handles_labels()
for handle, label, scatter_data in zip(handles,
labels,
ax.collections):
color = handle.get_facecolor()
new_ax.scatter(scatter_data.get_offsets()[:, 0],
scatter_data.get_offsets()[:, 1],
s=scatter_data.get_sizes(), label=label,
color=color)

new_ax.set_xscale('log')
new_ax.set_yscale('log')
Expand Down
8 changes: 8 additions & 0 deletions qiita_db/support_files/patches/93.sql
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,11 @@ CREATE INDEX IF NOT EXISTS processing_job_command_parameters_payload ON qiita.pr
-- Adding constraints for the slurm_reservation column
ALTER TABLE qiita.analysis DROP CONSTRAINT IF EXISTS analysis_slurm_reservation_valid_chars;
ALTER TABLE qiita.analysis ADD CONSTRAINT analysis_slurm_reservation_valid_chars CHECK ( slurm_reservation ~ '^[a-zA-Z0-9_]*$' );

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we did a release this morning so these lines need to be moved to 94.sql.

-- Jan 7, 2025
-- Adding a table for formulas for resource allocations
-- IF NOT EXISTS keeps this statement safe to re-run, in line with the
-- other IF [NOT] EXISTS statements in this patch file.
CREATE TABLE IF NOT EXISTS qiita.allocation_equations (
    equation_id SERIAL PRIMARY KEY,
    equation_name TEXT NOT NULL,
    expression TEXT NOT NULL
);
17 changes: 7 additions & 10 deletions qiita_db/test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1327,8 +1327,7 @@ def setUp(self):

def test_plot_return(self):
# check the plot returns correct objects
fig1, axs1 = qdb.util.resource_allocation_plot(
self.df, self.cname, self.sname, self.col_name)
fig1, axs1 = qdb.util.resource_allocation_plot(self.df, self.col_name)
self.assertIsInstance(
fig1, Figure,
"Returned object fig1 is not a Matplotlib Figure")
Expand All @@ -1345,13 +1344,12 @@ def test_minimize_const(self):
fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False)

bm, options = qdb.util._resource_allocation_plot_helper(
self.df, axs[0], self.cname, self.sname, 'MaxRSSRaw',
qdb.util.MODELS_MEM, self.col_name)
self.df, axs[0], 'MaxRSSRaw', qdb.util.MODELS_MEM, self.col_name)
# check that the algorithm chooses correct model for MaxRSSRaw and
# has 0 failures
k, a, b = options.x
failures_df = qdb.util._resource_allocation_failures(
self.df, k, a, b, bm, self.col_name, 'MaxRSSRaw')
failures_df = qdb.util._resource_allocation_success_failures(
self.df, k, a, b, bm, self.col_name, 'MaxRSSRaw')[-1]
failures = failures_df.shape[0]
self.assertEqual(bm, qdb.util.mem_model3,
msg=f"""Best memory model
Expand All @@ -1367,11 +1365,10 @@ def test_minimize_const(self):
# check that the algorithm chooses correct model for ElapsedRaw and
# has 1 failure
bm, options = qdb.util._resource_allocation_plot_helper(
self.df, axs[1], self.cname, self.sname, 'ElapsedRaw',
qdb.util.MODELS_TIME, self.col_name)
self.df, axs[1], 'ElapsedRaw', qdb.util.MODELS_TIME, self.col_name)
k, a, b = options.x
failures_df = qdb.util._resource_allocation_failures(
self.df, k, a, b, bm, self.col_name, 'ElapsedRaw')
failures_df = qdb.util._resource_allocation_success_failures(
self.df, k, a, b, bm, self.col_name, 'ElapsedRaw')[-1]
failures = failures_df.shape[0]

self.assertEqual(bm, qdb.util.time_model1,
Expand Down
59 changes: 35 additions & 24 deletions qiita_db/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from email.mime.text import MIMEText

import matplotlib.pyplot as plt
from matplotlib import colormaps
import numpy as np
import pandas as pd
from io import StringIO
Expand All @@ -93,8 +94,8 @@
time_model1 = (lambda x, k, a, b: a + b + np.log(x) * k)
time_model2 = (lambda x, k, a, b: a + b * x + np.log(x) * k)
time_model3 = (lambda x, k, a, b: a + b * np.log(x)**2 + np.log(x) * k)
time_model4 = (lambda x, k, a, b: a * np.log(x)**3 + b * np.log(x)**2
+ np.log(x) * k)
time_model4 = (lambda x, k, a, b: a * np.log(x)**3 + b * np.log(x)**2 +
np.log(x) * k)

MODELS_TIME = [time_model1, time_model2, time_model3, time_model4]

Expand Down Expand Up @@ -2363,17 +2364,13 @@ def send_email(to, subject, body):
smtp.close()


def resource_allocation_plot(df, cname, sname, col_name):
def resource_allocation_plot(df, col_name):
"""Builds resource allocation plot for given filename and jobs

Parameters
----------
df : pandas.DataFrame, required
Job data to plot; must contain col_name, MaxRSSRaw and ElapsedRaw columns
cname: str, required
Specified job type
sname: str, required
Specified job sub type.
col_name: str, required
Specifies x axis for the graph

Expand All @@ -2392,12 +2389,12 @@ def resource_allocation_plot(df, cname, sname, col_name):
ax = axs[0]
# models for memory
_resource_allocation_plot_helper(
df, ax, cname, sname, "MaxRSSRaw", MODELS_MEM, col_name)
df, ax, "MaxRSSRaw", MODELS_MEM, col_name)

ax = axs[1]
# models for time
_resource_allocation_plot_helper(
df, ax, cname, sname, "ElapsedRaw", MODELS_TIME, col_name)
df, ax, "ElapsedRaw", MODELS_TIME, col_name)

return fig, axs

Expand Down Expand Up @@ -2442,7 +2439,7 @@ def retrieve_resource_data(cname, sname, version, columns):


def _resource_allocation_plot_helper(
df, ax, cname, sname, curr, models, col_name):
df, ax, curr, models, col_name):
"""Helper function for resource allocation plot. Builds plot for MaxRSSRaw
and ElapsedRaw

Expand All @@ -2459,14 +2456,14 @@ def _resource_allocation_plot_helper(
col_name: str, required
Specifies x axis for the graph
curr: str, required
Either MaxRSSRaw or ElapsedRaw
Either MaxRSSRaw or ElapsedRaw (y axis)
models: list, required
List of functions that will be used for visualization

"""

x_data, y_data = df[col_name], df[curr]
ax.scatter(x_data, y_data, s=2, label="data")
# ax.scatter(x_data, y_data, s=2, label="data")
d = dict()
for index, row in df.iterrows():
x_value = row[col_name]
Expand Down Expand Up @@ -2518,13 +2515,21 @@ def _resource_allocation_plot_helper(
str(timedelta(seconds=round(cmin_value, 2))).rstrip('0').rstrip('.')

x_plot = np.array(df[col_name])
failures_df = _resource_allocation_failures(
success_df, failures_df = _resource_allocation_success_failures(
df, k, a, b, best_model, col_name, curr)
failures = failures_df.shape[0]

ax.scatter(failures_df[col_name], failures_df[curr], color='red', s=3,
label="failures")

# Label jobs with no recorded node so they are still plotted. assign()
# returns a new frame, so nothing is written into a slice of df.
success_df = success_df.assign(
    node_name=success_df['node_name'].fillna('unknown'))
# unique() keeps first-appearance order, which makes the host -> color
# mapping (and legend order) reproducible; iterating a set of strings is
# not ordered consistently between runs.
slurm_hosts = success_df['node_name'].unique().tolist()
n_hosts = len(slurm_hosts)
# Guard the resample size and the divisor: with a single host the original
# `i / (n - 1)` divided by zero, and with no successful jobs a zero-size
# colormap was requested.
cmap = colormaps.get_cmap('Accent').resampled(max(n_hosts, 1))
colors = [cmap(i / max(n_hosts - 1, 1)) for i in range(n_hosts)]

# One scatter series per host so each node gets its own legend entry.
for host, color in zip(slurm_hosts, colors):
    host_df = success_df[success_df['node_name'] == host]
    ax.scatter(host_df[col_name], host_df[curr], color=color, s=3,
               label=host)
ax.set_title(
f'k||a||b: {k}||{a}||{b}\n'
f'model: {get_model_name(best_model)}\n'
Expand Down Expand Up @@ -2590,8 +2595,10 @@ def _resource_allocation_calculate(
options = minimize(_resource_allocation_custom_loss, init,
args=(x, y, model, middle))
k, a, b = options.x
failures_df = _resource_allocation_failures(
df, k, a, b, model, col_name, type_)
# important: here we take the 2nd (last) value of tuple since
# the helper function returns success, then failures.
failures_df = _resource_allocation_success_failures(
df, k, a, b, model, col_name, type_)[-1]
y_plot = model(x, k, a, b)
if not any(y_plot):
continue
Expand Down Expand Up @@ -2676,9 +2683,9 @@ def _resource_allocation_custom_loss(params, x, y, model, p):
return np.mean(weighted_residuals)


def _resource_allocation_failures(df, k, a, b, model, col_name, type_):
def _resource_allocation_success_failures(df, k, a, b, model, col_name, type_):
"""Helper function for resource allocation plot. Creates a dataframe with
failures.
successes and failures given current model.

Parameters
----------
Expand All @@ -2699,14 +2706,18 @@ def _resource_allocation_failures(df, k, a, b, model, col_name, type_):

Returns
----------
pandas.Dataframe
Dataframe containing failures for current type.
tuple with:
pandas.Dataframe
Dataframe containing successes for current type.
pandas.Dataframe
Dataframe containing failures for current type.
"""

x_plot = np.array(df[col_name])
df[f'c{type_}'] = model(x_plot, k, a, b)
success_df = df[df[type_] <= df[f'c{type_}']]
failures_df = df[df[type_] > df[f'c{type_}']]
return failures_df
return (success_df, failures_df)


def MaxRSS_helper(x):
Expand Down Expand Up @@ -2834,8 +2845,8 @@ def update_resource_allocation_table(weeks=1, test=None):
def merge_rows(rows):
date_fmt = '%Y-%m-%dT%H:%M:%S'
wait_time = (
datetime.strptime(rows.iloc[0]['Start'], date_fmt)
- datetime.strptime(rows.iloc[0]['Submit'], date_fmt))
datetime.strptime(rows.iloc[0]['Start'], date_fmt) -
datetime.strptime(rows.iloc[0]['Submit'], date_fmt))
if rows.shape[0] >= 2:
tmp = rows.iloc[1].copy()
else:
Expand Down
Loading
Loading