Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace: construct a CCT #29

Merged
merged 43 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
4c1e111
feat: initial commit for working logic of trace to cct
adityaranjan Sep 11, 2022
7d9a707
chore: moved over cct code to the reader
adityaranjan Sep 12, 2022
4df681c
style: formatting changes
adityaranjan Sep 12, 2022
9ef0464
style: minor formatting change
adityaranjan Sep 12, 2022
62d3caf
Feature: Communication Matrix (#12)
adityaranjan Sep 26, 2022
200f5ce
feature: add cct function to trace class
adityaranjan Sep 28, 2022
da2e817
Merge branch 'develop' into feat/otf2-cct
adityaranjan Sep 29, 2022
cc50cc0
Merge branch 'develop' into feat/otf2-cct
adityaranjan Dec 2, 2022
7f1b7f6
chore: made cct function consistent with recent changes
adityaranjan Dec 2, 2022
93252aa
Merge branch 'develop' into feat/otf2-cct
adityaranjan Feb 17, 2023
bb29923
chore: modify cct function to reflect recent changes
adityaranjan Feb 17, 2023
fdd5997
Merge branch 'feat/otf2-cct' of https://github.com/hpcgroup/pipit int…
adityaranjan Feb 17, 2023
96d7d67
Merge branch 'develop' into feat/otf2-cct
adityaranjan Feb 28, 2023
28d9638
chore: remove df indices as calling context ids
adityaranjan Feb 28, 2023
c8d2ce4
Merge branch 'develop' into feat/otf2-cct
adityaranjan Mar 6, 2023
e6f5ffd
chore: only store node in enter rows
adityaranjan Mar 6, 2023
9dd3366
style: minor
adityaranjan Mar 6, 2023
50fff4b
chore: remove depth column from df and restructure node class
adityaranjan Mar 10, 2023
742cc82
style: minor
adityaranjan Mar 10, 2023
2765f0c
Merge branch 'develop' into feat/otf2-cct
adityaranjan Mar 10, 2023
0240751
chore: return cct for all readers
adityaranjan Mar 10, 2023
89c9c5d
chore: revert any changes in other prs already
adityaranjan Mar 11, 2023
7074ac7
style: minor
adityaranjan Mar 11, 2023
5a4cae3
feat: make cct compatible with projections
adityaranjan Mar 11, 2023
17c3bcb
Merge branch 'develop' into feat/otf2-cct
adityaranjan Mar 11, 2023
8b30e0f
Merge branch 'develop' into feat/otf2-cct
adityaranjan Mar 29, 2023
1a33fec
Merge branch 'develop' into feat/otf2-cct
adityaranjan Apr 23, 2023
540e0de
revert graph changes
adityaranjan Apr 23, 2023
6fb5d52
minor
adityaranjan Apr 23, 2023
3fd498c
minor
adityaranjan Apr 23, 2023
83693c3
minor
adityaranjan Apr 23, 2023
04890c2
move cct function into util
adityaranjan Apr 23, 2023
8677038
minor
adityaranjan Apr 23, 2023
2440b4e
style
adityaranjan Apr 23, 2023
2393c52
minor
adityaranjan Apr 23, 2023
1025dda
Merge branch 'develop' into feat/otf2-cct
adityaranjan May 5, 2023
82f06c8
merge
Oct 22, 2023
d4db686
chore: Merge
Oct 22, 2023
32c16dc
temp
Nov 3, 2023
4acf0f8
fixed cct function to handle unmatched events
Nov 10, 2023
4f544c9
minor
Nov 10, 2023
f1d80c0
update year
bhatele Nov 14, 2023
0444c7a
Merge branch 'develop' into feat/otf2-cct
bhatele Nov 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pipit/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ class Node:
referenced by any calling_context_id directly under it
"""

def __init__(self, name_id, name, parent) -> None:
def __init__(self, name_id, name, parent, level=None) -> None:
self.calling_context_ids = []
self.name_id = name_id
self.name = name
self.children = []
self.parent = parent
self.level = self.__calculate_level()

if level is None:
self.level = self.__calculate_level()
else:
self.level = level

def add_child(self, child_node):
self.children.append(child_node)
Expand Down
2 changes: 1 addition & 1 deletion pipit/readers/hpctoolkit_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,4 @@ def read(self):
by="Time", axis=0, ascending=True, inplace=True, ignore_index=True
)
self.trace_df = trace_df
return pipit.trace.Trace(None, trace_df)
return pipit.trace.Trace(None, trace_df, graph)
7 changes: 6 additions & 1 deletion pipit/readers/otf2_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
# SPDX-License-Identifier: MIT


import math
import otf2
import pandas as pd
Expand Down Expand Up @@ -375,4 +376,8 @@ def read(self):
# close the trace and open it later per process
trace.close()
self.events = self.read_events() # events
return pipit.trace.Trace(self.definitions, self.events)

trace = pipit.trace.Trace(self.definitions, self.events, None)
trace.create_cct() # create cct

return trace
226 changes: 223 additions & 3 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@
#
# SPDX-License-Identifier: MIT

import numpy as np


from pipit.graph import Graph, Node


class Trace:
"""A trace dataset is read into an object of this type, which includes one
or more dataframes.
"""
A trace dataset is read into an object of this type, which
includes one or more dataframes and a calling context tree.
"""

def __init__(self, definitions, events):
def __init__(self, definitions, events, cct):
"""Create a new Trace object."""
self.definitions = definitions
self.events = events
self.cct = cct

@staticmethod
def from_otf2(dirname):
Expand All @@ -29,3 +36,216 @@ def from_hpctoolkit(dirname):
from .readers.hpctoolkit_reader import HPCToolkitReader

return HPCToolkitReader(dirname).read()

def create_cct(self):
    """
    Build a calling context tree (CCT) from the trace events.

    Walks the events of every "Location ID" with a stack and builds the
    tree out of pipit's Graph/Node objects. When it finishes, the tree is
    stored in ``self.cct`` and a "Graph_Node" column is added to
    ``self.events`` that holds, for every processed row, a reference to
    its node in the CCT (rows that are not processed hold ``None``).

    Does nothing if ``self.cct`` is already set.

    Thoughts/Concerns:
        Currently, the DataFrame index of the entry row is being stored
        in the node's calling context ids. This doesn't really have much of a
        purpose right now. What should we be storing as calling context ids
        for OTF2? Perhaps there should also be a way to map entry rows to
        corresponding exit rows.

        NOTE(review): a reviewer asked whether this is resolved by the
        match_rows() function from #44 -- confirm.
    """

    # only create the cct if it doesn't exist already
    if self.cct is not None:
        return

    graph = Graph()
    callpath_to_node = {}  # call path string -> existing node
    node_by_row = {}  # events index label -> node of that row
    node_id = 0  # each node has a unique id

    # Iterate through each Location ID, which is analogous to a thread,
    # and walk its events using a stack to add to the cct.
    for location_id in set(self.events["Location ID"]):
        # Keep only this location's rows that carry a function name
        # (rows whose Name is "N/A" are dropped).
        location_df = self.events.loc[
            (self.events["Name"] != "N/A")
            & (self.events["Location ID"] == location_id)
        ]

        # stacks of currently open functions and their nodes
        functions_stack, nodes_stack = [], []

        # Iterating the columns directly is cheaper than iterating
        # DataFrame rows.
        for curr_df_index, evt_type, function_name in zip(
            location_df.index, location_df["Event Type"], location_df["Name"]
        ):
            if evt_type == "Enter":
                # the depth of the new call is the number of open calls
                depth = len(nodes_stack)
                parent_node = nodes_stack[-1] if nodes_stack else None

                # add the function to the stack and get the call path
                functions_stack.append(function_name)
                callpath = "->".join(functions_stack)

                curr_node = callpath_to_node.get(callpath)
                if curr_node is None:
                    # first time this call path is seen: create its node
                    curr_node = Node(node_id, function_name, parent_node, depth)
                    callpath_to_node[callpath] = curr_node
                    node_id += 1

                    if parent_node is None:
                        graph.add_root(curr_node)
                    else:
                        parent_node.add_child(curr_node)

                # The Enter row's index acts as a calling context id
                # (several invocations can share one call path).
                curr_node.add_calling_context_id(curr_df_index)

                # NOTE: possibly redundant, since the node is also stored
                # in the row's Graph_Node column.
                graph.add_to_map(curr_df_index, curr_node)

                nodes_stack.append(curr_node)
                node_by_row[curr_df_index] = curr_node
            else:
                if not nodes_stack:
                    # Unmatched Leave (no open Enter on this location):
                    # skip it instead of popping an empty stack.
                    continue

                # the node of a Leave row is the most recently opened call
                node_by_row[curr_df_index] = nodes_stack.pop()
                functions_stack.pop()

    # Update the Trace with the generated cct. The column is built in row
    # order from the index labels, so it does not rely on the events
    # having a default 0..n-1 index.
    self.events["Graph_Node"] = [node_by_row.get(idx) for idx in self.events.index]
    self.cct = graph

def comm_matrix(self, comm_type="bytes"):
    """
    Communication Matrix for Peer-to-Peer (P2P) MPI messages

    Arguments:

    1) comm_type -
    string to choose whether the communication volume should be measured
    by bytes transferred between two processes or the number of messages
    sent (two choices - "bytes" or "counts")

    Returns:
    A 2D Numpy Array that represents the communication matrix for all P2P
    messages of the given trace

    Raises:
    ValueError if comm_type is neither "bytes" nor "counts"

    Note:
    The first dimension of the returned 2d array
    is senders and the second dimension is receivers
    ex) comm_matrix[sender_rank][receiver_rank]

    The rank ids are used directly as indices into a matrix whose side is
    the number of distinct process ids, so they are expected to be
    0..n-1.
    """

    # Reject unknown modes up front; otherwise the volumes would never be
    # defined and the failure would surface later (or not at all when the
    # trace has no send events).
    if comm_type not in ("bytes", "counts"):
        raise ValueError(
            'comm_type must be "bytes" or "counts", got {!r}'.format(comm_type)
        )

    # get the set of ranks/process ids
    # (mpi messages are sent between processes)
    ranks = set(
        self.events.loc[self.events["Location Group Type"] == "PROCESS"][
            "Location Group ID"
        ]
    )

    # 2d numpy array that is returned at the end of the function
    communication_matrix = np.zeros(shape=(len(ranks), len(ranks)))

    # filter the dataframe by MPI Send and Isend events
    sender_dataframe = self.events.loc[
        self.events["Event Type"].isin(["MpiSend", "MpiIsend"]),
        ["Location Group ID", "Attributes"],
    ]

    # mpi ranks of the senders and of their corresponding receivers
    sender_ranks = sender_dataframe["Location Group ID"].to_list()
    attributes = sender_dataframe["Attributes"].to_list()
    receiver_ranks = [attr_dict["receiver"] for attr_dict in attributes]

    if comm_type == "bytes":
        # number of bytes communicated
        # (1 communication is a single row in the sender dataframe)
        message_volumes = [attr_dict["msg_length"] for attr_dict in attributes]
    else:
        # "counts": 1 message between the pair of processes
        # for each row in the sender dataframe
        message_volumes = [1] * len(sender_ranks)

    # add every message's volume to the (sender, receiver) entry
    for sender, receiver, volume in zip(
        sender_ranks, receiver_ranks, message_volumes
    ):
        communication_matrix[sender, receiver] += volume

    return communication_matrix