add log_task_usage plugin #26

Merged (4 commits), Feb 5, 2024
2 changes: 1 addition & 1 deletion Dockerfile
@@ -15,7 +15,7 @@ RUN sh -c 'cd /tmp && unzip awscliv2.zip' && sh /tmp/aws/install

# miniwdl-aws (and PyPI dependencies listed in setup.py)
COPY ./ /tmp/miniwdl-aws/
-RUN bash -c 'cd /tmp/miniwdl-aws && pip3 install .'
+RUN bash -c 'cd /tmp/miniwdl-aws && pip3 install . && pip3 install ./plugin_log_task_usage'

# cleanup (for squashed image)
RUN yum clean all && rm -rf /tmp/miniwdl* /tmp/aws*
2 changes: 2 additions & 0 deletions README.md
@@ -109,6 +109,8 @@ Each task job's log is also forwarded to [CloudWatch Logs](https://docs.aws.amaz

Misconfigured infrastructure might prevent logs from being written to EFS or CloudWatch at all. In that case, use the AWS Batch console/API to find status messages for the workflow or task jobs.

Tasks can self-report their CPU & memory usage in their standard error logs, by setting `MINIWDL__LOG_TASK_USAGE__PERIOD=60` to report every 60 seconds (or as desired). Submit with `--verbose`, or look at the `stderr.txt` files in the task run directories, to see the "container usage" log messages.
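For illustration only (not part of the committed README text), a submission with the plugin enabled might look like the following, where `my_workflow.wdl` stands in for any workflow:

```
MINIWDL__LOG_TASK_USAGE__PERIOD=60 miniwdl-aws-submit my_workflow.wdl --verbose --follow
```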

## GPU jobs

Miniwdl-aws recognizes the `gpu: true` setting in a task `runtime{}` section, and translates that to a [GPU resource requirement](https://docs.aws.amazon.com/batch/latest/userguide/gpu-jobs.html) for AWS Batch. For the job to be scheduled, the Batch compute environment must of course make GPU instance types available.
36 changes: 36 additions & 0 deletions plugin_log_task_usage/StressTest.wdl
@@ -0,0 +1,36 @@
version 1.1
# MINIWDL__LOG_TASK_USAGE__PERIOD=2 miniwdl run examples/plugin_log_task_usage/StressTest.wdl --dir /tmp --verbose
# MINIWDL__LOG_TASK_USAGE__PERIOD=2 miniwdl-aws-submit plugin_log_task_usage/StressTest.wdl --verbose --follow

task StressTest {
    input {
        Int cpu = 4
        Int memory_G = 2
        Int cpu_memory_duration_s = 10
        Int disk_load_G = 2

        String docker = "polinux/stress" # Docker image with stress tool
    }

    command <<<
        set -euxo pipefail

        >&2 ls -l /sys/fs/cgroup

        stress --cpu ~{cpu} --vm 1 --vm-bytes ~{memory_G}G --vm-hang 0 --timeout ~{cpu_memory_duration_s}s || true
        dd if=/dev/zero of=testfile bs=1G count=~{disk_load_G}
        sync
        cat testfile > /dev/null &
        sleep 5
    >>>

    runtime {
        docker: docker
        memory: "${memory_G*2}G"
        cpu: cpu
    }

    output {
        File stderr_txt = stderr()
    }
}
87 changes: 87 additions & 0 deletions plugin_log_task_usage/miniwdl_log_task_usage.py
@@ -0,0 +1,87 @@
"""
miniwdl plugin instrumenting each task container to log its own CPU & memory resource usage
periodically. The logs are written to the task's standard error stream, so they'll appear on the
console only with --verbose logging (but are always recorded in each task's stderr.txt).

To enable, install this plugin (`pip3 install .` & confirm listed by `miniwdl --version`) and
set configuration [log_task_usage] period (or the environment variable
MINIWDL__LOG_TASK_USAGE__PERIOD) to the desired logging period in seconds.

YMMV because host OS version & configuration may affect availability of the cgroup counters read
from pseudo-files under /sys/fs/cgroup.
"""


def main(cfg, logger, run_id, run_dir, task, **recv):
    # do nothing with inputs
    recv = yield recv

    # inject logger into command script
    if cfg.has_option("log_task_usage", "period"):
        period = cfg["log_task_usage"].get_int("period")
        recv["command"] = _logger_sh + f"_miniwdl_log_task_usage {period} &\n\n" + recv["command"]
    recv = yield recv

    # do nothing with outputs
    yield recv


_logger_sh = r"""
_miniwdl_log_task_usage() {
    set +ex
    local PERIOD_SECS=${1:-10} # logging period (default 10s)

    # detect whether host provides cgroup v2 or v1, and helper functions to read CPU & memory usage
    # counters from the appropriate pseudo-files
    local cgroup_version=""
    if [ -f /sys/fs/cgroup/cpu.stat ]; then
        cgroup_version=2
    elif [ -f /sys/fs/cgroup/cpuacct/cpuacct.stat ]; then
        cgroup_version=1
    else
        >&2 echo "miniwdl_log_task_usage unable to report: cgroup CPU usage counters not found"
        exit 1
    fi

    cpu_secs() {
        local ans
        if [ $cgroup_version -eq 2 ]; then
            ans=$(awk '/^usage_usec/ {print $2}' /sys/fs/cgroup/cpu.stat)
            echo $(( ans / 1000000 ))
        else
            ans=$(cut -f2 -d ' ' /sys/fs/cgroup/cpuacct/cpuacct.stat | head -n 1)
            echo $(( ans / 100 )) # 100 "jiffies" per second
        fi
    }

    mem_bytes() {
        if [ $cgroup_version -eq 2 ]; then
            awk '$1 == "anon" { print $2 }' /sys/fs/cgroup/memory.stat
        else
            awk -F ' ' '$1 == "total_rss" { print $2 }' /sys/fs/cgroup/memory/memory.stat
        fi
    }

    local T_0=$(date +%s)
    local t_last=$T_0
    local cpu_secs_0=$(cpu_secs)
    local cpu_secs_last=$cpu_secs_0

    while true; do
        sleep "$PERIOD_SECS"
        local t=$(date +%s)
        local wall_secs=$(( t - T_0 ))

        local cpu_secs_current=$(cpu_secs)
        local cpu_total_secs=$(( cpu_secs_current - cpu_secs_0 ))
        local cpu_period_secs=$(( cpu_secs_current - cpu_secs_last ))

        local mem_bytes_current=$(mem_bytes)

        >&2 echo "container usage :: cpu_pct: $(( 100 * cpu_period_secs / PERIOD_SECS )), mem_MiB: $(( mem_bytes_current/1048576 )), cpu_total_s: ${cpu_total_secs}, elapsed_s: ${wall_secs}"

        cpu_secs_last=$cpu_secs_current
        t_last=$t
    done
}
"""
16 changes: 16 additions & 0 deletions plugin_log_task_usage/setup.py
@@ -0,0 +1,16 @@
from setuptools import setup

setup(
name="miniwdl_log_task_usage",
version="0.1.0",
description="miniwdl task plugin to log container cpu/mem usage",
author="Wid L. Hacker",
py_modules=["miniwdl_log_task_usage"],
python_requires=">=3.6",
setup_requires=["reentry"],
install_requires=["miniwdl"],
reentry_register=True,
entry_points={
"miniwdl.plugin.task": ["log_task_usage = miniwdl_log_task_usage:main"],
},
)
20 changes: 20 additions & 0 deletions test/test.py
@@ -472,3 +472,23 @@ def test_shipping_local_wdl_error(aws_batch, tmp_path, test_s3_folder):
        ],
    )
    assert rslt["exit_code"] == 123


def test_log_task_usage(aws_batch, test_s3_folder):
    env = dict(os.environ)
    env["MINIWDL__LOG_TASK_USAGE__PERIOD"] = "2"
    rslt = batch_miniwdl(
        aws_batch,
        [
            os.path.join(os.path.dirname(__file__), "../plugin_log_task_usage/StressTest.wdl"),
            "--dir",
            "/mnt/efs/miniwdl_aws_tests",
            "--verbose",
            "--delete-after",
            "always",
        ],
        upload=test_s3_folder + "test_log_task_usage/",
        env=env,
    )
    assert rslt["success"]
    assert "container usage ::" in get_s3uri(rslt["outputs"]["StressTest.stderr_txt"]).decode()