Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

priority plugin: add support for unpacking priorities for queues in TOML config #541

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions doc/man5/flux-config-accounting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ flux-config-accounting(5)
DESCRIPTION
===========

accounting.factor-weights
-------------------------

The flux-accounting priority plugin can be configured to assign different
weights to the different factors used when calculating a job's priority.
Assigning a higher weight to a factor will result in it having more
Expand All @@ -24,7 +27,7 @@ The ``accounting.factor-weights`` sub-table may contain the following keys:


KEYS
====
^^^^

fairshare
Integer value that represents the weight to be associated with an
Expand All @@ -36,10 +39,36 @@ queue


EXAMPLE
=======
^^^^^^^

::

[accounting.factor-weights]
fairshare = 10000
queue = 1000

accounting.queue-priorities
---------------------------

The priority plugin's queues can also be configured to have a different
associated integer priority in the TOML file. Queues can positively or
negatively affect a job's calculated priority depending on the priority
assigned to each one. By default, queues have an associated priority of 0,
meaning they do not affect a job's priority at all.

If a queue defined in the config file is unknown to the priority plugin, it
will add the queue to its internal map. Otherwise, it will update the queue's
priority with the new value.

The ``accounting.queue-priorities`` sub-table should list any configured queue
as the key and its associated integer priority as the value.

EXAMPLE
^^^^^^^

::

[accounting.queue-priorities]
bronze=100
silver=200
gold=300
35 changes: 32 additions & 3 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,26 @@ static void add_special_association (flux_plugin_t *p, flux_t *h, int userid)

/*
* Get config information about the various priority factor weights
* and assign them in the priority_weights map.
* and assign them in the priority_weights map. Update the queues map with
* any defined integer priorities.
*/
static int conf_update_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
int fshare_weight = -1, queue_weight = -1;
json_t *queue_priorities = nullptr;
flux_t *h = flux_jobtap_get_flux (p);

// unpack the various factors to be used in job priority calculation
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s?{s?{s?{s?i, s?i}}}}",
"{s?{s?{s?{s?i, s?i}, s?o}}}",
"conf", "accounting", "factor-weights",
"fairshare", &fshare_weight,
"queue", &queue_weight) < 0) {
"queue", &queue_weight,
"queue-priorities", &queue_priorities) < 0) {
flux_log_error (flux_jobtap_get_flux (p),
"mf_priority: conf.update: flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
Expand All @@ -234,6 +237,32 @@ static int conf_update_cb (flux_plugin_t *p,
if (queue_weight != -1)
priority_weights["queue"] = queue_weight;

if (queue_priorities) {
const char *key;
json_t *value;

json_object_foreach (queue_priorities, key, value) {
if (!key || !json_is_integer (value)) {
flux_log_error (h, "mf_priority: invalid data in queue-priorities");
continue;
}

const std::string queue_name (key);
int priority = json_integer_value (value);

auto it = queues.find (queue_name);
if (it != queues.end ()) {
// update the priority for the existing queue
it->second.priority = priority;
} else {
// queue does not exist; create a new queue
Queue new_queue;
new_queue.priority = priority;
queues[queue_name] = new_queue;
}
}
}

return 0;
}

Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ TESTSCRIPTS = \
t1041-view-jobs-by-project.t \
t1042-issue508.t \
t1043-view-jobs-by-bank.t \
t1044-mf-priority-queue-config.t \
t5000-valgrind.t \
python/t1000-example.py \
python/t1001_db.py \
Expand Down
119 changes: 119 additions & 0 deletions t/t1044-mf-priority-queue-config.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#!/bin/bash

test_description='test configuring priorities of queues in priority plugin'

. `dirname $0`/sharness.sh

mkdir -p conf.d

MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py
DB_PATH=$(pwd)/FluxAccountingTest.db

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job -o,--config-path=$(pwd)/conf.d

flux setattr log-stderr-level 1

test_expect_success 'allow guest access to testexec' '
flux config load <<-EOF
[exec.testexec]
allow-guests = true
EOF
'

test_expect_success 'load priority plugin' '
flux jobtap load ${MULTI_FACTOR_PRIORITY}
'

test_expect_success 'create a flux-accounting DB' '
flux account -p ${DB_PATH} create-db
'

test_expect_success 'start flux-accounting service' '
flux account-service -p ${DB_PATH} -t
'

test_expect_success 'add some banks to the DB' '
flux account add-bank root 1 &&
flux account add-bank --parent-bank=root bankA 1
'

test_expect_success 'add some queues to the DB with configured priorities' '
flux account add-queue bronze --priority=100 &&
flux account add-queue silver --priority=500 &&
flux account add-queue gold --priority=1000
'

test_expect_success 'add a user to the DB' '
flux account add-user \
--username=user5001 \
--userid=5001 \
--bank=bankA \
--queues=bronze,silver
'

test_expect_success 'send flux-accounting DB information to the plugin' '
flux account-priority-update -p ${DB_PATH}
'

test_expect_success 'stop the queue' '
flux queue stop
'

test_expect_success 'configure flux with queues' '
cat >conf.d/queues.toml <<-EOT &&
[queues.bronze]
[queues.silver]
[queues.gold]
EOT
flux config reload &&
flux queue stop --all
'

test_expect_success 'submit a job to bronze queue' '
job=$(flux python ${SUBMIT_AS} 5001 -n1 --queue=bronze hostname) &&
flux job wait-event -f json ${job} priority | jq '.context.priority' > job.priority &&
test $(cat job.priority) -eq 1050000 &&
flux cancel ${job}
'

test_expect_success 'decrease priority for the bronze queue in config' '
cat >conf.d/test.toml <<-EOT &&
[accounting.queue-priorities]
bronze = 0
EOT
flux config reload &&
flux queue stop --all
'

test_expect_success 'submit another job to bronze queue; priority is negatively affected' '
job=$(flux python ${SUBMIT_AS} 5001 -n1 --queue=bronze hostname) &&
flux job wait-event -f json ${job} priority | jq '.context.priority' > job.priority &&
test $(cat job.priority) -eq 50000 &&
flux cancel ${job}
'

test_expect_success 'increase priority for the bronze queue in config' '
cat >conf.d/test.toml <<-EOT &&
[accounting.queue-priorities]
bronze = 123
EOT
flux config reload &&
flux queue stop --all
'

test_expect_success 'submit another job to bronze queue; priority is positively affected' '
job=$(flux python ${SUBMIT_AS} 5001 -n1 --queue=bronze hostname) &&
flux job wait-event -f json ${job} priority | jq '.context.priority' > job.priority &&
test $(cat job.priority) -eq 1280000 &&
flux cancel ${job}
'

test_expect_success 'shut down flux-accounting service' '
flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()"
'

test_done
Loading