Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieve I/O offset of a file #24

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 223 additions & 65 deletions prismio/readers/recorder_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,11 @@ def read(self):

all_records = []
for rank in range(self.reader.GM.total_ranks):
per_rank_records = []
for record_index in range(self.reader.LMs[rank].total_records):
per_rank_records.append(self.reader.records[rank][record_index])
per_rank_records = sorted(per_rank_records, key=lambda x: x.tstart)
all_records.append(per_rank_records)
record = self.reader.records[rank][record_index]
record.rank = rank
all_records.append(record)
all_records = sorted(all_records, key=lambda x: x.tstart)

records_as_dict = {
"rank": [],
Expand All @@ -427,75 +427,230 @@ def read(self):
"file_name": [],
"io_volume": [],
"function_type": [],
"offset": [],
"error": [],
}

fd_to_filenames = [
{0: "stdin", 1: "stdout", 2: "stderr"}
] * self.reader.GM.total_ranks
for _ in range(self.reader.GM.total_ranks)
]
fd_offsets = [{0: 0, 1: 0, 2: 0} for _ in range(self.reader.GM.total_ranks)]
end_of_files = {"stdin": 0, "stdout": 0, "stderr": 0}

for rank in range(self.reader.GM.total_ranks):
for record in all_records[rank]:
fd_to_filename = fd_to_filenames[rank]
for record in all_records:
rank = record.rank
fd_to_filename = fd_to_filenames[rank]
fd_offset = fd_offsets[rank]
func_name = self.reader.funcs[record.func_id]

filename = None
io_size = None
offset = None
error = None

try:
function_args = record.args_to_strs()
func_name = self.reader.funcs[record.func_id]
io_size = None
if "fdopen" in func_name:
fd = record.res
old_fd = int(function_args[0])
if old_fd not in fd_to_filename:
filename = "__unknown__"
else:
filename = fd_to_filename[old_fd]
fd_to_filename[fd] = filename
elif "fopen" in func_name or "open" in func_name:
fd = record.res
filename = function_args[0]
except UnicodeDecodeError:
function_args = None
error = "Error: cannot decode function arguments"
except AttributeError:
function_args = None
error = "Error: cannot decode function arguments"

if error is not None:
pass
elif "MPI" in func_name or "H5" in func_name:
pass
elif "fdopen" in func_name:
fd = record.res
old_fd = int(function_args[0])
if old_fd not in fd_to_filename:
error = "Error: fdopen a non-existing file descriptor \
(no previous open returns this file descriptor or already closed)"
print(error)
filename = "__unknown__"
else:
filename = fd_to_filename[old_fd]
fd_to_filename[fd] = filename
elif "fwrite" in func_name or "fread" in func_name:
io_size = int(function_args[1]) * int(function_args[2])
fd = int(function_args[3])
if fd not in fd_to_filename:
filename = "__unknown__"
else:
filename = fd_to_filename[fd]
elif (
"seek" in func_name
or "close" in func_name
or "sync" in func_name
or "fprintf" in func_name
):
try:
fd = int(function_args[0])
except ValueError:
fd = -1
if fd not in fd_to_filename:
filename = "__unknown__"
else:
filename = fd_to_filename[fd]
elif func_name and (
"writev" in func_name
or "readv" in func_name
or "pwrite" in func_name
or "pread" in func_name
or "write" in func_name
or "read" in func_name
):
try:
io_size = int(function_args[2])
except ValueError:
io_size = None
except IndexError:
io_size = None
try:
fd = int(function_args[0])
except ValueError:
fd = -1
if fd not in fd_to_filename:
filename = "__unknown__"
else:
filename = fd_to_filename[fd]
fd_offset[fd] = 0

elif "fopen" in func_name:
fd = record.res
filename = function_args[0]
fd_to_filename[fd] = filename
if filename not in end_of_files:
end_of_files[filename] = 0

fd_offset[fd] = 0
openMode = function_args[1]
if "a" in openMode:
fd_offset[fd] = end_of_files[filename]

elif "open" in func_name:
fd = record.res
filename = function_args[0]
fd_to_filename[fd] = filename
if filename not in end_of_files:
end_of_files[filename] = 0

fd_offset[fd] = 0

openMode = int(function_args[1])
if openMode == 2:
fd_offset[fd] = end_of_files[filename]

elif "seek" in func_name:
fd, offset, whence = (
int(function_args[0]),
int(function_args[1]),
int(function_args[2]),
)

if fd not in fd_to_filename:
error = "Error: seek a non-existing file descriptor \
(no open returns this file descriptor or already closed)"
print(error)
filename = "__unknown__"
offset = -1
else:
filename = fd_to_filename[fd]
if whence == 0: # SEEK_SET
fd_offset[fd] = offset
elif whence == 1: # SEEK_CUR
if fd_offset[fd] + offset > end_of_files[filename]:
error = "Warning: seek beyond end of file"
print(error)
fd_offset[fd] += offset
elif fd_offset[fd] + offset < 0:
error = "Error: seek beyond start of file"
print(error)
else:
fd_offset[fd] += offset
elif whence == 2: # SEEK_END
if offset > 0:
error = "Warning: seek beyond end of file"
print(error)
fd_offset[fd] = end_of_files[filename] + offset
elif end_of_files[filename] + offset < 0:
error = "Error: seek beyond start of file"
print(error)
else:
fd_offset[fd] = end_of_files[filename] + offset

elif "close" in func_name:
fd = int(function_args[0])
if fd not in fd_to_filename:
filename = "__unknown__"
error = "Error: close a non-existing file descriptor \
(no open returns this file descriptor or already closed)"
print(error)
else:
filename = None
filename = fd_to_filename[fd]
del fd_to_filename[fd]
del fd_offset[fd]

elif "sync" in func_name:
fd = int(function_args[0])
if fd not in fd_to_filename:
filename = "__unknown__"
error = "Error: sync a non-existing file descriptor \
(no open returns this file descriptor or already closed)"
print(error)
else:
filename = fd_to_filename[fd]

elif "fwrite" in func_name or "fread" in func_name:
io_size = int(function_args[1]) * int(function_args[2])
fd = int(function_args[3])

if fd not in fd_to_filename:
error = "Error: write or read a non-existing file descriptor \
(no open returns this file descriptor or already closed)"
print(error)
filename = "__unknown__"
offset = -1
else:
filename = fd_to_filename[fd]
offset = fd_offset[fd]
fd_offset[fd] += io_size
end_of_files[filename] = max(end_of_files[filename], fd_offset[fd])

elif "writev" in func_name or "readv" in func_name:
fd, io_size = int(function_args[0]), int(function_args[1])

if fd not in fd_to_filename:
error = "Error: write or read a non-existing file descriptor \
(no open returns this file descriptor or already closed)"
print(error)
filename = "__unknown__"
offset = -1
else:
filename = fd_to_filename[fd]
offset = fd_offset[fd]
fd_offset[fd] += io_size
end_of_files[filename] = max(end_of_files[filename], fd_offset[fd])

elif (
"pwrite" in func_name or "pread" in func_name
): # does not change offset
fd, io_size, offset = (
int(function_args[0]),
int(function_args[2]),
int(function_args[3]),
)

if fd not in fd_to_filename:
error = "Error: write or read a non-existing file descriptor \
(no open returns this file descriptor or already closed)"
print(error)
filename = "__unknown__"
else:
filename = fd_to_filename[fd]

if offset > end_of_files[filename]:
error = "Warning: pwrite or pread beyond end of file"
print(error)
elif offset < 0:
error = "Error: pwrite or pread beyond start of file"
print(error)
end_of_files[filename] = max(
end_of_files[filename], offset + io_size
)

elif (
("write" in func_name or "read" in func_name)
and func_name not in MPI_IO_functions
and func_name not in HDF5_IO_functions
):
fd, io_size = int(function_args[0]), int(function_args[2])

if fd not in fd_to_filename:
error = "Error: write or read a non-existing file descriptor \
(no open returns this file descriptor or already closed)"
print(error)
filename = "__unknown__"
offset = -1
else:
filename = fd_to_filename[fd]
offset = fd_offset[fd]
fd_offset[fd] += io_size
end_of_files[filename] = max(end_of_files[filename], fd_offset[fd])

elif "fprintf" in func_name:
fd, io_size = int(function_args[0]), int(function_args[1])

if fd not in fd_to_filename:
error = "Error: fprintf a non-existing file descriptor \
(no open returns this file descriptor or already closed)"
print(error)
filename = "__unknown__"
offset = -1
else:
filename = fd_to_filename[fd]
offset = fd_offset[fd]
fd_offset[fd] += io_size
end_of_files[filename] = max(end_of_files[filename], fd_offset[fd])

records_as_dict["rank"].append(rank)
records_as_dict["function_id"].append(record.func_id)
Expand All @@ -508,6 +663,9 @@ def read(self):
records_as_dict["return_value"].append(record.res)
records_as_dict["file_name"].append(filename)
records_as_dict["io_volume"].append(io_size)
records_as_dict["offset"].append(offset)
records_as_dict["error"].append(error)

if "write" in func_name:
records_as_dict["function_type"].append("write")
elif "read" in func_name:
Expand Down