Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] plugin: add "age" factor to priority calculation #297

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 75 additions & 4 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extern "C" {
#include <cinttypes>
#include <vector>
#include <sstream>
#include <chrono>

// the plugin does not know about the association who submitted a job and will
// assign default values to the association until it receives information from
Expand All @@ -48,6 +49,7 @@ extern "C" {
std::map<int, std::map<std::string, struct bank_info>> users;
std::map<std::string, struct queue_info> queues;
std::map<int, std::string> users_def_bank;
std::map<std::string, int> priority_weights;

struct bank_info {
double fairshare;
Expand Down Expand Up @@ -96,11 +98,24 @@ int64_t priority_calculation (flux_plugin_t *p,
{
double fshare_factor = 0.0, priority = 0.0;
int queue_factor = 0;
int fshare_weight, queue_weight;
int fshare_weight, queue_weight, age_weight;
struct bank_info *b;

fshare_weight = 100000;
queue_weight = 10000;
double t_submit = 0.0;
double cur_time = 0.0;
double age_factor = 0.0;

fshare_weight = priority_weights["fshare_weight"];
queue_weight = priority_weights["queue_weight"];
age_weight = priority_weights["age_weight"];

// check values of priority factor weights; if not configured,
// these will be set to -1, so just use default weights
if (fshare_weight == -1)
fshare_weight = 100000;
if (queue_weight == -1)
queue_weight = 10000;
if (age_weight == -1)
age_weight = 1000;

if (urgency == FLUX_JOB_URGENCY_HOLD)
return FLUX_JOB_PRIORITY_MIN;
Expand All @@ -120,11 +135,31 @@ int64_t priority_calculation (flux_plugin_t *p,
return -1;
}

// fetch t_submit
flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:f}",
"t_submit", &t_submit) < 0) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority",
0, "job.state.priority: failed to get "
"t_submit");
return -1;
}

// get current time
cur_time = std::chrono::duration_cast <std::chrono::duration<double>> (
std::chrono::system_clock::now ().time_since_epoch ()).count ();

// calculate age of job
age_factor = cur_time - t_submit;

fshare_factor = b->fairshare;
queue_factor = b->queue_factor;

priority = round ((fshare_weight * fshare_factor) +
(queue_weight * queue_factor) +
(age_weight * age_factor) +
(urgency - 16));

if (priority < 0)
Expand Down Expand Up @@ -337,6 +372,41 @@ static json_t *user_to_json (
* *
*****************************************************************************/

/*
* Get config information about the various priority factor weights
* and assign them in the priority_weights map.
*/
static int conf_update_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
int fshare_weight = -1, queue_weight = -1, age_weight = -1;
flux_t *h = flux_jobtap_get_flux (p);

// unpack the various factors to be used in the multi-factor
// priority calculation
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s?{s?{s?i, s?i, s?i}}}",
"conf", "priority_factors",
"fshare_weight", &fshare_weight,
"queue_weight", &queue_weight,
"age_weight", &age_weight) < 0) {
flux_log_error (flux_jobtap_get_flux (p),
"mf_priority: conf.update: flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
}

// assign unpacked weights into priority_weights map
priority_weights["fshare_weight"] = fshare_weight;
priority_weights["queue_weight"] = queue_weight;
priority_weights["age_weight"] = age_weight;

return 0;
}


/*
* Get state of all user and bank information from plugin
*/
Expand Down Expand Up @@ -981,6 +1051,7 @@ static const struct flux_plugin_handler tab[] = {
{ "job.state.inactive", inactive_cb, NULL },
{ "job.state.depend", depend_cb, NULL },
{ "plugin.query", query_cb, NULL},
{ "conf.update", conf_update_cb, NULL},
{ 0 },
};

Expand Down
4 changes: 3 additions & 1 deletion t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ TESTSCRIPTS = \
t1016-export-db.t \
t1017-update-db.t \
t1018-mf-priority-disable-entry.t \
t1019-mf-priority-info-fetch.t
t1019-mf-priority-info-fetch.t \
t1020-mf-priority-config.t \
t1021-mf-priority-age.t

dist_check_SCRIPTS = \
$(TESTSCRIPTS) \
Expand Down
12 changes: 11 additions & 1 deletion t/t1001-mf-priority-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py

mkdir -p config

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job
test_under_flux 1 job -o,--config-path=$(pwd)/config

flux setattr log-stderr-level 1

Expand All @@ -21,6 +23,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >config/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'send an empty payload to make sure unpack fails' '
cat <<-EOF >bad_payload.py &&
import flux
Expand Down
12 changes: 11 additions & 1 deletion t/t1002-mf-priority-small-no-tie.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py

mkdir -p config

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job
test_under_flux 1 job -o,--config-path=$(pwd)/config

flux setattr log-stderr-level 1

Expand All @@ -21,6 +23,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >config/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'create a group of users with unique fairshare values' '
cat <<-EOF >fake_small_no_tie.json
{
Expand Down
12 changes: 11 additions & 1 deletion t/t1003-mf-priority-small-tie.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py

mkdir -p config

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job
test_under_flux 1 job -o,--config-path=$(pwd)/config

flux setattr log-stderr-level 1

Expand All @@ -21,6 +23,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >config/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'create a group of users with some ties in fairshare values' '
cat <<-EOF >fake_small_tie.json
{
Expand Down
12 changes: 11 additions & 1 deletion t/t1004-mf-priority-small-tie-all.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py

mkdir -p config

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job
test_under_flux 1 job -o,--config-path=$(pwd)/config

flux setattr log-stderr-level 1

Expand All @@ -21,6 +23,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >config/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'create a group of users with many ties in fairshare values' '
cat <<-EOF >fake_small_tie_all.json
{
Expand Down
12 changes: 11 additions & 1 deletion t/t1005-max-jobs-limits.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py

mkdir -p config

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job
test_under_flux 1 job -o,--config-path=$(pwd)/config

flux setattr log-stderr-level 1

Expand All @@ -21,6 +23,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >config/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'create fake_user.json' '
cat <<-EOF >fake_user.json
{
Expand Down
12 changes: 11 additions & 1 deletion t/t1008-mf-priority-update.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
DB_PATH=$(pwd)/FluxAccountingTest.db

mkdir -p config

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job
test_under_flux 1 job -o,--config-path=$(pwd)/config

flux setattr log-stderr-level 1

Expand All @@ -21,6 +23,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >config/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'create flux-accounting DB' '
flux account -p $(pwd)/FluxAccountingTest.db create-db
'
Expand Down
12 changes: 11 additions & 1 deletion t/t1012-mf-priority-load.t
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ test_description='Test multi-factor priority plugin and loading user information
MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py

mkdir -p config

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job
test_under_flux 1 job -o,--config-path=$(pwd)/config

flux setattr log-stderr-level 1

Expand All @@ -20,6 +22,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >config/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'create fake_payload.py' '
cat <<-EOF >fake_payload.py
import flux
Expand Down
10 changes: 10 additions & 0 deletions t/t1013-mf-priority-queues.t
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >conf.d/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'create flux-accounting DB' '
flux account -p $(pwd)/FluxAccountingTest.db create-db
'
Expand Down Expand Up @@ -82,6 +90,8 @@ test_expect_success 'configure flux with those queues' '
[queues.silver]
[queues.gold]
[queues.foo]
[priority_factors]
age_factor = 0
EOT
flux config reload
'
Expand Down
12 changes: 11 additions & 1 deletion t/t1014-mf-priority-dne.t
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ test_description='Test cancelling active jobs with a late user/bank info load'
. `dirname $0`/sharness.sh
MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so

mkdir -p config

export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job
test_under_flux 1 job -o,--config-path=$(pwd)/config

flux setattr log-stderr-level 1

Expand All @@ -18,6 +20,14 @@ test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'disable age factor in multi-factor priority plugin' '
cat >config/test.toml <<-EOT &&
[priority_factors]
age_weight = 0
EOT
flux config reload
'

test_expect_success 'submit a number of jobs with no user/bank info loaded to plugin' '
jobid1=$(flux mini submit --wait-event=depend hostname) &&
jobid2=$(flux mini submit --wait-event=depend hostname) &&
Expand Down
Loading