Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

_match_messages #131

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,112 @@ def _match_events(self):

self.events = self.events.astype({"_matching_event": "Int32"})

def _match_messages(self):
movsesyanae marked this conversation as resolved.
Show resolved Hide resolved
"""
Matches corresponding MpiSend/MpiRecv and MpiIsend/MpiIrecv instant events
in a new column _matching_message_event
"""
if "_matching_message_event" not in self.events.columns:
self.events["_matching_message_event"] = None
else:
return

matching_events = list(self.events["_matching_message_event"])

# Filter by send/receive events
send_event_wrapper_names = ["MPI_Send"]
send_events_names = ["MpiSend", "MpiISend"]
send_events = self.events[self.events["Name"].isin(send_events_names)]

receive_event_wrapper_names = ["MPI_Recv"]
receive_events_names = ["MpiRecv", "MpiIrecv"]
receive_events = self.events[self.events["Name"].isin(receive_events_names)]

# Queue is a dictionary in which each receiving process (key) will have
# another dictionary of sending processes (key) that will have a list of
# tuple that each contain information about an associated send event
queue: dict[int, dict[int, (int, int)]] = {}
df_indices = list(send_events.index)
timestamps = list(send_events["Timestamp (ns)"])
attrs = list(send_events["Attributes"])
processes = list(send_events["Process"])

# First iterate over send events, adding each event's information
# to the receiver's list in the dictionary
for i in range(len(send_events)):
curr_df_index = df_indices[i]
curr_timestamp = timestamps[i]
curr_attrs = attrs[i]
curr_process = processes[i]

# Add current dataframe index, timestamp, and process to stack
if "receiver" in curr_attrs:
receiving_process = curr_attrs["receiver"]

# Add receiving process to queue if not already present
if receiving_process not in queue:
queue[receiving_process] = {}
# Add sending process to receiving process's queue
# if not already present
if curr_process not in queue[receiving_process]:
queue[receiving_process][curr_process] = []

# Add current dataframe index and timestamp to the correct queue
queue[receiving_process][curr_process].append(
(curr_df_index, curr_timestamp)
)

df_indices = list(receive_events.index)
attrs = list(receive_events["Attributes"])
processes = list(receive_events["Process"])

# Now iterate over receive events
for i in range(len(receive_events)):
curr_df_index = df_indices[i]
curr_attrs = attrs[i]
curr_process = processes[i]

if "sender" in curr_attrs:
# send_process = None
send_process = curr_attrs["sender"]

if len(queue[curr_process][send_process]) > 0:
# Get the corresponding "send" event
send_df_index, send_timestamp = queue[curr_process][
send_process
].pop(0)

# Fill in the lists with the matching values
matching_events[send_df_index] = curr_df_index
matching_events[curr_df_index] = send_df_index
# update matched message event for instant events
self.events["_matching_message_event"] = matching_events

# match send enter/leave events with their corresponding instant events
self.events.loc[
(self.events["Name"].isin(send_event_wrapper_names))
& (self.events["Event Type"] == "Enter"),
"_matching_message_event",
] = send_events.index
self.events.loc[
(self.events["Name"].isin(send_event_wrapper_names))
& (self.events["Event Type"] == "Leave"),
"_matching_message_event",
] = send_events.index
# match receive enter/leave events with their corresponding instant events
self.events.loc[
(self.events["Name"].isin(receive_event_wrapper_names))
& (self.events["Event Type"] == "Enter"),
"_matching_message_event",
] = receive_events.index
self.events.loc[
(self.events["Name"].isin(receive_event_wrapper_names))
& (self.events["Event Type"] == "Leave"),
"_matching_message_event",
] = receive_events.index

self.events = self.events.astype({"_matching_message_event": "Int32"})

def _match_caller_callee(self):
"""Matches callers (parents) to callees (children) and adds three
columns to the dataframe:
Expand Down
Loading