Skip to content

Commit

Permalink
cmd: update create-elastic-logs script
Browse files Browse the repository at this point in the history
commit message
  • Loading branch information
cmoussa1 committed Jan 21, 2025
1 parent 2e95552 commit 97339d4
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 55 deletions.
154 changes: 99 additions & 55 deletions src/cmd/flux-account-create-elastic-logs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python3
#! /usr/bin/python3

import sys
import json
Expand All @@ -14,29 +14,31 @@

queue_timelimits = {}

OUTCOME_CONVERSION = {1: "COMPLETED", 2: "FAILED", 4: "CANCELLED", 8: "TIMEOUT"}

def get_username(uid) -> str:
    """Look up the username for a UID, falling back to the UID as a string."""
    try:
        entry = pwd.getpwuid(uid)
    except KeyError:
        # no passwd entry for this UID; report the numeric ID instead
        return str(uid)
    return entry.pw_name


def get_gid(uid) -> "int | str":
    """Return the primary group ID for a UID, or "" if the UID is unknown.

    NOTE: the original annotation claimed ``-> str``, but the success path
    returns ``pw_gid`` (an int); only the KeyError fallback is a string.
    The annotation is corrected here (as a string literal so it parses on
    any Python 3 version) without changing runtime behavior.
    """
    try:
        return pwd.getpwuid(uid).pw_gid
    except KeyError:
        # unknown UID; downstream treats "" as "no group information"
        return ""


def get_groupname(gid) -> str:
    """Resolve a GID to its group name; return "" when the group is unknown."""
    try:
        group_entry = grp.getgrgid(gid)
    except KeyError:
        # no group database entry for this GID
        return ""
    return group_entry.gr_name


def get_jobs(rpc_handle):
def get_jobs(rpc_handle) -> list:
try:
jobs = rpc_handle.get_jobs()
return jobs
Expand All @@ -45,14 +47,18 @@ def get_jobs(rpc_handle):
sys.exit(1)


def fetch_new_jobs(last_timestamp=time.time() - 3600):
def fetch_new_jobs(last_timestamp) -> list:
"""
Fetch new jobs using Flux's job-list and job-info interfaces. Return a
list of dictionaries that contain attribute information for inactive jobs.
last_timestamp: a timestamp field to filter to only look for jobs that have
finished since this time.
"""
if last_timestamp is None:
# a timestamp wasn't specified; default to gathering all jobs
# that finished in the last hour
last_timestamp = time.time() - 3600
handle = flux.Flux()

# get queue information
Expand All @@ -64,9 +70,9 @@ def fetch_new_jobs(last_timestamp=time.time() - 3600):

queue_info = qlist.get("queues")
if queue_info is not None:
for q in queue_info:
for queue in queue_info:
# place queue name and time limit in map
queue_timelimits[q] = queue_info[q]["policy"]["limits"]["duration"]
queue_timelimits[queue] = queue_info[queue]["policy"]["limits"]["duration"]

# construct and send RPC
rpc_handle = flux.job.job_list_inactive(handle, since=last_timestamp, max_entries=0)
Expand Down Expand Up @@ -101,7 +107,7 @@ def fetch_new_jobs(last_timestamp=time.time() - 3600):
return jobs


def create_job_dicts(jobs):
def create_job_dicts(jobs) -> list:
"""
Create a list of dictionaries where each dictionary represents info about
a single inactive job.
Expand All @@ -113,85 +119,110 @@ def create_job_dicts(jobs):
# the 'result' field represents a pre-defined set of values for a job,
# defined in libjob/job.h in flux-core
for job in jobs:
rec = {
key: job[key]
for key in [
"id",
"userid",
"name",
"priority",
"state",
"bank",
"queue",
"duration",
"nodelist",
"nnodes",
"ntasks",
"cwd",
"urgency",
"success",
"result",
"queue",
"project",
"eventlog",
"jobspec",
]
if job.get(key) is not None
}

if rec.get("queue") is not None:
# create dictionary for job
rec = {}

# create empty parent dictionaries
rec["event"] = {}
rec["job"] = {}
rec["job"]["node"] = {}
rec["job"]["task"] = {}
rec["job"]["proc"] = {}
rec["user"] = {}
rec["group"] = {}

rec["event"]["dataset"] = "flux.joblog"

# convert flux keys to defined common schema keys
rec["job"]["id"] = job.get("id")
rec["user"]["id"] = job.get("userid")
rec["job"]["name"] = job.get("name")
rec["job"]["priority"] = job.get("priority")
rec["job"]["state"] = job.get("state")
rec["job"]["bank"] = job.get("bank")
rec["job"]["queue"] = job.get("queue")
rec["job"]["project"] = job.get("project")
rec["job"]["jobspec"] = job.get("jobspec")
rec["job"]["eventlog"] = job.get("eventlog")
rec["event"]["duration"] = job.get("duration")
rec["job"]["node"]["list"] = job.get("nodelist")
rec["job"]["node"]["count"] = job.get("nnodes")
rec["job"]["task"]["count"] = job.get("ntasks")
rec["job"]["cwd"] = job.get("cwd")
rec["job"]["urgency"] = job.get("urgency")
rec["job"]["success"] = job.get("success")

if job.get("result") is not None:
# convert outcome code to a text value
rec["event"]["outcome"] = OUTCOME_CONVERSION[job.get("result")]

if rec.get("job", {}).get("queue") is not None:
# place max timelimit for queue in job record
rec["queue_max_timelimit"] = queue_timelimits[rec.get("queue")]
rec["job"]["queue_maxtimelimit"] = queue_timelimits[rec["job"]["queue"]]

if rec.get("userid") is not None:
if rec.get("user", {}).get("id") is not None:
# add username, gid, groupname
rec["username"] = get_username(rec["userid"])
rec["gid"] = get_gid(rec["userid"])
rec["groupname"] = get_groupname(rec["gid"])
rec["user"]["name"] = get_username(rec["user"]["id"])
rec["group"]["id"] = get_gid(rec["user"]["id"])
rec["group"]["name"] = get_groupname(rec["group"]["id"])

# convert timestamps to ISO8601
if job.get("t_submit") is not None:
rec["t_submit"] = datetime.datetime.fromtimestamp(
rec["job"]["submittime"] = datetime.datetime.fromtimestamp(
job["t_submit"], tz=datetime.timezone.utc
).isoformat()
if job.get("t_run") is not None:
rec["t_run"] = datetime.datetime.fromtimestamp(
rec["event"]["start"] = datetime.datetime.fromtimestamp(
job["t_run"], tz=datetime.timezone.utc
).isoformat()
if job.get("t_inactive") is not None:
rec["t_inactive"] = datetime.datetime.fromtimestamp(
rec["event"]["end"] = datetime.datetime.fromtimestamp(
job["t_inactive"], tz=datetime.timezone.utc
).isoformat()
if job.get("expiration") is not None:
# convert expiration to total seconds
rec["expiration"] = datetime.datetime.fromtimestamp(
rec["job"]["timelimit"] = datetime.datetime.fromtimestamp(
job.get("expiration"), tz=datetime.timezone.utc
).isoformat()

if job.get("t_depend") is not None and job.get("t_run") is not None:
# compute eligible time
rec["t_eligible"] = job.get("t_run") - job.get("t_depend")
# compute the timestamp of when the job first became eligible
t_eligible = job.get("t_run") - (job.get("t_run") - job.get("t_depend"))
rec["job"]["eligibletime"] = datetime.datetime.fromtimestamp(
t_eligible, tz=datetime.timezone.utc
).isoformat()
# compute the time spend in queue
rec["job"]["queue_time"] = round(job.get("t_run") - t_eligible, 1)

if job.get("t_inactive") is not None and job.get("t_run") is not None:
# compute actual execution time
rec["actual_duration"] = round(job.get("t_inactive") - job.get("t_run"), 1)
rec["event"]["duration_seconds"] = round(
job.get("t_inactive") - job.get("t_run"), 1
)

if job.get("nnodes") is not None and job.get("ntasks") is not None:
# compute number of processes * number of nodes
rec["proc.count"] = job.get("nnodes") * job.get("ntasks")
rec["job"]["proc"]["count"] = job.get("nnodes") * job.get("ntasks")

if job.get("exception_occurred") is not None and job.get("exception_occurred") == True:
print(f"exception occurred!")
if job.get("exception_type") is not None:
rec["job"]["exception_type"] = job.get("exception_type")
if job.get("exception_note") is not None:
rec["job"]["exception_note"] = job.get("exception_note")

# add scheduler used
rec["scheduler"] = "flux"
rec["job"]["scheduler"] = "flux"

job_dicts.append(rec)

return job_dicts


def write_to_file(job_records, output_file):
    """Append job records to *output_file*, one JSON object per line (NDJSON).

    job_records: list of dicts, one per inactive job.
    output_file: path to the NDJSON log file.

    The file is opened in append mode so repeated runs accumulate records
    instead of overwriting previously exported logs.
    """
    with open(output_file, "a") as fp:
        for record in job_records:
            fp.write(json.dumps(record) + "\n")


def main():
Expand All @@ -202,18 +233,31 @@ def main():
"""
)

parser.add_argument(
"--output-file",
type=str,
help="specify a file path to append logs to",
metavar="OUTPUT_FILE",
)
parser.add_argument(
"--since",
type=int,
help="fetch all jobs since a certain time (formatted in seconds since epoch)",
help=(
"fetch all jobs since a certain time (formatted in seconds since epoch); "
"by default, this script will fetch all jobs that have completed in the "
"last hour"
),
metavar="TIMESTAMP",
)
args = parser.parse_args()

jobs = fetch_new_jobs(args.since) if args.since is not None else fetch_new_jobs()
jobs = fetch_new_jobs(args.since)
job_records = create_job_dicts(jobs)

filename = f"{round(time.time())}_flux_jobs.ndjson"
if args.output_file is None:
filename = "flux_jobs.ndjson"
else:
filename = args.output_file
write_to_file(job_records, filename)


Expand Down
7 changes: 7 additions & 0 deletions t/t1044-elastic.t
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ test_expect_success 'submit some sleep 1 jobs under one user' '
jobid3=$(flux submit -n 2 -N 2 --setattr=system.bank=bankA sleep 1)
'

test_expect_success 'submit a job that gets canceled' '
jobid4=$(flux submit -N 1 --setattr=system.bank=bankA sleep 60) &&
sleep 5 &&
flux cancel ${jobid4} &&
flux job info ${jobid4} eventlog
'

test_expect_success 'wait for jobs to finish running' '
sleep 5
'
Expand Down

0 comments on commit 97339d4

Please sign in to comment.