From 4c1e1112c39a528040e7e3265a125a71237609b0 Mon Sep 17 00:00:00 2001 From: Aditya Ranjan Date: Sun, 11 Sep 2022 15:36:58 -0400 Subject: [PATCH 01/30] feat: initial commit for working logic of trace to cct --- pipit/graph.py | 8 +- pipit/readers/experiment.ipynb | 141 +++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 pipit/readers/experiment.ipynb diff --git a/pipit/graph.py b/pipit/graph.py index 46a6ea3e..bf41d289 100644 --- a/pipit/graph.py +++ b/pipit/graph.py @@ -9,13 +9,17 @@ class Node: referenced by any calling_context_id directly under it """ - def __init__(self, name_id, name, parent) -> None: + def __init__(self, name_id, name, parent, level = None) -> None: self.calling_context_ids = [] self.name_id = name_id self.name = name self.children = [] self.parent = parent - self.level = self.__calculate_level() + + if level is None: + self.level = self.__calculate_level() + else: + self.level = level def add_child(self, child_node): self.children.append(child_node) diff --git a/pipit/readers/experiment.ipynb b/pipit/readers/experiment.ipynb new file mode 100644 index 00000000..0f48ad29 --- /dev/null +++ b/pipit/readers/experiment.ipynb @@ -0,0 +1,141 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.append(\"/Users/adi/School/Research/pipit\")\n", + "\n", + "import pipit.trace\n", + "from pipit.graph import Graph, Node" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "trace = pipit.trace.Trace.from_otf2(\"/Users/adi/School/Research/otf2_tracing/traces/Large_Score-P\")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# can either add leave row index as calling context id or having a matching df indices column\n", + "\n", + "graph = Graph()\n", + "callpath_to_node = dict()\n", + "graph_nodes = [None for i in range(len(trace.events))]\n", + "node_id = 0\n", + "\n", + "for location_id in set(trace.events[\"Location ID\"]):\n", + " location_df = trace.events.loc[\n", + " (trace.events[\"Name\"] != \"N/A\")\n", + " & (trace.events[\"Location ID\"] == location_id)\n", + " ]\n", + "\n", + " curr_depth = 0\n", + " callpath = \"\"\n", + " df_indices = list(location_df.index)\n", + " function_names = list(location_df[\"Name\"])\n", + " event_types = list(location_df[\"Event Type\"])\n", + " functions_stack, nodes_stack = [], []\n", + "\n", + " for i in range(len(location_df)):\n", + " curr_df_index, evt_type, function_name = df_indices[i], event_types[i], function_names[i]\n", + "\n", + " if evt_type == \"Enter\":\n", + " functions_stack.append(function_name)\n", + " callpath = \"->\".join(functions_stack)\n", + "\n", + " if curr_depth == 0:\n", + " parent_node = None\n", + " else:\n", + " parent_node = nodes_stack[-1]\n", + "\n", + " if callpath in callpath_to_node:\n", + " curr_node = callpath_to_node[callpath]\n", + " else:\n", + " curr_node = Node(node_id, function_name, parent_node, curr_depth)\n", + " callpath_to_node[callpath] = curr_node\n", + " node_id += 1\n", + "\n", + " if curr_depth == 0:\n", + " graph.add_root(curr_node)\n", + " else:\n", + " parent_node.add_child(curr_node)\n", + "\n", + " curr_node.add_calling_context_id(curr_df_index)\n", + " graph.add_to_map(curr_df_index, curr_node)\n", + " \n", + " nodes_stack.append(curr_node)\n", + " graph_nodes[curr_df_index] = curr_node\n", + " curr_depth += 1\n", + " else:\n", + " curr_node = nodes_stack.pop()\n", + " graph_nodes[curr_df_index] = curr_node\n", + "\n", + " functions_stack.pop()\n", + " curr_depth -= 1" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "def print_helper(list_of_nodes, depth):\n", + " for node in list_of_nodes:\n", + " print((\"---\" * depth) + node.name)\n", + " print_helper(node.children, depth + 1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "think about optimizing space by converting some of the attributes to None if dict is empty,\n", + "or string if dict only has one key/value, or strings, if only dicts of len 1 exist, etc\n", + "\"\"\"" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.9.13 64-bit", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + }, + "orig_nbformat": 4, + "vscode": { + "interpreter": { + "hash": "949777d72b0d2535278d3dc13498b2535136f6dfe0678499012e853ee9abcab1" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 7d9a707efb4f06a0211728b0c68abf33c077ac8c Mon Sep 17 00:00:00 2001 From: Aditya Ranjan Date: Mon, 12 Sep 2022 13:14:06 -0400 Subject: [PATCH 02/30] chore: moved over cct code to the reader --- pipit/readers/experiment.ipynb | 141 --------------------------------- pipit/readers/otf2_reader.py | 75 ++++++++++++++++++ 2 files changed, 75 insertions(+), 141 deletions(-) delete mode 100644 pipit/readers/experiment.ipynb diff --git a/pipit/readers/experiment.ipynb b/pipit/readers/experiment.ipynb deleted file mode 100644 index 0f48ad29..00000000 --- a/pipit/readers/experiment.ipynb +++ /dev/null @@ -1,141 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [], - "source": [ - "import sys\n", - "sys.path.append(\"/Users/adi/School/Research/pipit\")\n", - "\n", - "import pipit.trace\n", - "from pipit.graph import Graph, Node" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "trace = pipit.trace.Trace.from_otf2(\"/Users/adi/School/Research/otf2_tracing/traces/Large_Score-P\")" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [], - "source": [ - "# can either add leave row index as calling context id or having a matching df indices column\n", - "\n", - "graph = Graph()\n", - "callpath_to_node = dict()\n", - "graph_nodes = [None for i in range(len(trace.events))]\n", - "node_id = 0\n", - "\n", - "for location_id in set(trace.events[\"Location ID\"]):\n", - " location_df = trace.events.loc[\n", - " (trace.events[\"Name\"] != \"N/A\")\n", - " & (trace.events[\"Location ID\"] == location_id)\n", - " ]\n", - "\n", - " curr_depth = 0\n", - " callpath = \"\"\n", - " df_indices = list(location_df.index)\n", - " function_names = list(location_df[\"Name\"])\n", - " event_types = list(location_df[\"Event Type\"])\n", - " functions_stack, nodes_stack = [], []\n", - "\n", - " for i in range(len(location_df)):\n", - " curr_df_index, evt_type, function_name = df_indices[i], event_types[i], function_names[i]\n", - "\n", - " if evt_type == \"Enter\":\n", - " functions_stack.append(function_name)\n", - " callpath = \"->\".join(functions_stack)\n", - "\n", - " if curr_depth == 0:\n", - " parent_node = None\n", - " else:\n", - " parent_node = nodes_stack[-1]\n", - "\n", - " if callpath in callpath_to_node:\n", - " curr_node = callpath_to_node[callpath]\n", - " else:\n", - " curr_node = Node(node_id, function_name, parent_node, curr_depth)\n", - " callpath_to_node[callpath] = curr_node\n", - " node_id += 1\n", - "\n", - " if curr_depth == 0:\n", - " graph.add_root(curr_node)\n", - " else:\n", - " parent_node.add_child(curr_node)\n", - "\n", - " curr_node.add_calling_context_id(curr_df_index)\n", - " graph.add_to_map(curr_df_index, curr_node)\n", - " \n", - " nodes_stack.append(curr_node)\n", - " graph_nodes[curr_df_index] = curr_node\n", - " curr_depth += 1\n", - " else:\n", - " curr_node = nodes_stack.pop()\n", - " graph_nodes[curr_df_index] = curr_node\n", - "\n", - " functions_stack.pop()\n", - " curr_depth -= 1" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [], - "source": [ - "def print_helper(list_of_nodes, depth):\n", - " for node in list_of_nodes:\n", - " print((\"---\" * depth) + node.name)\n", - " print_helper(node.children, depth + 1)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - "think about optimizing space by converting some of the attributes to None if dict is empty,\n", - "or string if dict only has one key/value, or strings, if only dicts of len 1 exist, etc\n", - "\"\"\"" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3.9.13 64-bit", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.13" - }, - "orig_nbformat": 4, - "vscode": { - "interpreter": { - "hash": "949777d72b0d2535278d3dc13498b2535136f6dfe0678499012e853ee9abcab1" - } - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index cd4118a6..16ff0d54 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -8,6 +8,7 @@ import pandas as pd import multiprocessing as mp import pipit.trace +from pipit.graph import Graph, Node class OTF2Reader: @@ -363,6 +364,79 @@ def read_events(self): return events_dataframe + def create_cct(self): + """ + To Do: + Add Comments + + Question: + What are calling context ids for hpctoolkit? Both entry and exit rows have the same node, but do + calling context ids correspond to something in the DataFrame or something outside of the trace like + the original hpctoolkit data? + + Currently, the DataFrame index of the entry row is being stored in the node's calling context ids. + This doesn't really have much of a purpose right now. What should we be storing as calling context ids + for OTF2? Perhaps there should also be a way to map entry rows to corresponding exit rows (could either + have a separate matching indices column or store both entry and exit indices as calling context ids). + """ + + graph = Graph() + callpath_to_node = dict() + graph_nodes = [None for i in range(len(self.events))] + node_id = 0 + + for location_id in set(self.events["Location ID"]): + location_df = self.events.loc[ + (self.events["Name"] != "N/A") + & (self.events["Location ID"] == location_id) + ] + + curr_depth = 0 + callpath = "" + df_indices = list(location_df.index) + function_names = list(location_df["Name"]) + event_types = list(location_df["Event Type"]) + functions_stack, nodes_stack = [], [] + + for i in range(len(location_df)): + curr_df_index, evt_type, function_name = df_indices[i], event_types[i], function_names[i] + + if evt_type == "Enter": + functions_stack.append(function_name) + callpath = "->".join(functions_stack) + + if curr_depth == 0: + parent_node = None + else: + parent_node = nodes_stack[-1] + + if callpath in callpath_to_node: + curr_node = callpath_to_node[callpath] + else: + curr_node = Node(node_id, function_name, parent_node, curr_depth) + callpath_to_node[callpath] = curr_node + node_id += 1 + + if curr_depth == 0: + graph.add_root(curr_node) + else: + parent_node.add_child(curr_node) + + curr_node.add_calling_context_id(curr_df_index) + graph.add_to_map(curr_df_index, curr_node) + + nodes_stack.append(curr_node) + graph_nodes[curr_df_index] = curr_node + curr_depth += 1 + else: + curr_node = nodes_stack.pop() + graph_nodes[curr_df_index] = curr_node + + functions_stack.pop() + curr_depth -= 1 + + self.events["Graph_Node"] = graph_nodes + def read(self): """ Returns a TraceData object for the otf2 file @@ -375,4 +449,5 @@ def read(self): # close the trace and open it later per process trace.close() self.events = self.read_events() # events + self.create_cct() # cct return pipit.trace.Trace(self.definitions, self.events) From 4df681c77f5e941b7ccd2f0d74ff487f8e1cdd8f Mon Sep 17 00:00:00 2001 From: Aditya Ranjan Date: Mon, 12 Sep 2022 13:18:21 -0400 Subject: [PATCH 03/30] style: formatting changes --- pipit/readers/otf2_reader.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index 16ff0d54..c7e15600 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -370,14 +370,17 @@ def create_cct(self): Add Comments Question: - What are calling context ids for hpctoolkit? Both entry and exit rows have the same node, but do - calling context ids correspond to something in the DataFrame or something outside of the trace like + What are calling context ids for hpctoolkit? Both entry and exit rows + have the same node, but do calling context ids correspond to something + in the DataFrame or something outside of the trace like the original hpctoolkit data? - Currently, the DataFrame index of the entry row is being stored in the node's calling context ids. - This doesn't really have much of a purpose right now. What should we be storing as calling context ids - for OTF2? Perhaps there should also be a way to map entry rows to corresponding exit rows (could either - have a separate matching indices column or store both entry and exit indices as calling context ids). + Currently, the DataFrame index of the entry row is being stored + in the node's calling context ids. This doesn't really have much of a + purpose right now. What should we be storing as calling context ids + for OTF2? Perhaps there should also be a way to map entry rows to + corresponding exit rows (could either have a separate matching indices + column or store both entry and exit indices as calling context ids). """ graph = Graph() @@ -396,10 +399,14 @@ def create_cct(self): df_indices = list(location_df.index) function_names = list(location_df["Name"]) event_types = list(location_df["Event Type"]) - functions_stack, nodes_stack = [], [] + functions_stack, nodes_stack = [], [] for i in range(len(location_df)): - curr_df_index, evt_type, function_name = df_indices[i], event_types[i], function_names[i] + curr_df_index, evt_type, function_name = ( + df_indices[i], + event_types[i], + function_names[i], + ) if evt_type == "Enter": functions_stack.append(function_name) @@ -413,7 +420,9 @@ def create_cct(self): if callpath in callpath_to_node: curr_node = callpath_to_node[callpath] else: - curr_node = Node(node_id, function_name, parent_node, curr_depth) + curr_node = Node( + node_id, function_name, parent_node, curr_depth + ) callpath_to_node[callpath] = curr_node node_id += 1 @@ -424,7 +433,7 @@ def create_cct(self): curr_node.add_calling_context_id(curr_df_index) graph.add_to_map(curr_df_index, curr_node) - + nodes_stack.append(curr_node) graph_nodes[curr_df_index] = curr_node curr_depth += 1 @@ -449,5 +458,5 @@ def read(self): # close the trace and open it later per process trace.close() self.events = self.read_events() # events - self.create_cct() # cct + self.create_cct() # cct return pipit.trace.Trace(self.definitions, self.events) From 9ef0464113351f3b788b121854123da1ce69d068 Mon Sep 17 00:00:00 2001 From: Aditya Ranjan Date: Mon, 12 Sep 2022 13:23:26 -0400 Subject: [PATCH 04/30] style: minor formatting change --- pipit/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipit/graph.py b/pipit/graph.py index bf41d289..f89c53ce 100644 --- a/pipit/graph.py +++ b/pipit/graph.py @@ -9,7 +9,7 @@ class Node: referenced by any calling_context_id directly under it """ - def __init__(self, name_id, name, parent, level = None) -> None: + def __init__(self, name_id, name, parent, level=None) -> None: self.calling_context_ids = [] self.name_id = name_id self.name = name From 62d3caf6953b89043752418c4557f6197a3f2b1f Mon Sep 17 00:00:00 2001 From: Aditya Ranjan Date: Mon, 26 Sep 2022 16:19:39 -0400 Subject: [PATCH 05/30] Feature: Communication Matrix (#12) * tracedata schema * renaming column * tracedata schema * renaming column * move p2p to trace.py * remove .pyc files * cleaned up communication matrix function and added documentation * docs: specify sender/receiver dimensions in top level function comment Co-authored-by: Abhinav Bhatele --- pipit/trace.py | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/pipit/trace.py b/pipit/trace.py index 7442a833..ddd278f9 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -3,6 +3,8 @@ # # SPDX-License-Identifier: MIT +import numpy as np + class Trace: """A trace dataset is read into an object of this type, which includes one @@ -29,3 +31,77 @@ def from_hpctoolkit(dirname): from .readers.hpctoolkit_reader import HPCToolkitReader return HPCToolkitReader(dirname).read() + + def comm_matrix(self, comm_type="bytes"): + """ + Communication Matrix for Peer-to-Peer (P2P) MPI messages + + Arguments: + + 1) comm_type - + string to choose whether the communication volume should be measured + by bytes transferred between two processes or the number of messages + sent (two choices - "bytes" or "counts") + + Returns: + A 2D Numpy Array that represents the communication matrix for all P2P + messages of the given trace + + Note: + The first dimension of the returned 2d array + is senders and the second dimension is receivers + ex) comm_matrix[sender_rank][receiver_rank] + """ + + # get the list of ranks/process ids + # (mpi messages are sent between processes) + ranks = set( + self.events.loc[self.events["Location Group Type"] == "PROCESS"][ + "Location Group ID" + ] + ) + + # create a 2d numpy array that will be returned + # at the end of the function + communication_matrix = np.zeros(shape=(len(ranks), len(ranks))) + + # filter the dataframe by MPI Send and Isend events + sender_dataframe = self.events.loc[ + self.events["Event Type"].isin(["MpiSend", "MpiIsend"]), + ["Location Group ID", "Attributes"], + ] + + # get the mpi ranks of all the sender processes + sender_ranks = sender_dataframe["Location Group ID"].to_list() + + # get the corresponding mpi ranks of the receivers + receiver_ranks = ( + sender_dataframe["Attributes"] + .apply(lambda attrDict: attrDict["receiver"]) + .to_list() + ) + + # number of bytes communicated + if comm_type == "bytes": + # (1 communication is a single row in the sender dataframe) + message_volumes = ( + sender_dataframe["Attributes"] + .apply(lambda attrDict: attrDict["msg_length"]) + .to_list() + ) + elif comm_type == "counts": + # 1 message between the pairs of processes + # for each row in the sender dataframe + message_volumes = np.full(len(sender_dataframe), 1) + + for i in range(len(sender_ranks)): + """ + loops through all the communication events and adds the + message volumes to the corresponding entry of the 2d array + using the sender and receiver ranks + """ + communication_matrix[sender_ranks[i], receiver_ranks[i]] += message_volumes[ + i + ] + + return communication_matrix From 200f5ce9425daf801454663bc4f8cd0c3b88ea3a Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Wed, 28 Sep 2022 16:09:13 -0400 Subject: [PATCH 06/30] feature: add cct function to trace class --- pipit/readers/hpctoolkit_reader.py | 2 +- pipit/readers/otf2_reader.py | 91 ++--------------- pipit/trace.py | 150 ++++++++++++++++++++++++++++- 3 files changed, 154 insertions(+), 89 deletions(-) diff --git a/pipit/readers/hpctoolkit_reader.py b/pipit/readers/hpctoolkit_reader.py index 4b4c7522..4f6a75f3 100644 --- a/pipit/readers/hpctoolkit_reader.py +++ b/pipit/readers/hpctoolkit_reader.py @@ -307,4 +307,4 @@ def read(self): by="Time", axis=0, ascending=True, inplace=True, ignore_index=True ) self.trace_df = trace_df - return pipit.trace.Trace(None, trace_df) + return pipit.trace.Trace(None, trace_df, graph) diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index c7e15600..d223909b 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -3,12 +3,12 @@ # # SPDX-License-Identifier: MIT + import math import otf2 import pandas as pd import multiprocessing as mp import pipit.trace -from pipit.graph import Graph, Node class OTF2Reader: @@ -364,88 +364,6 @@ def read_events(self): return events_dataframe - def create_cct(self): - """ - To Do: - Add Comments - - Question: - What are calling context ids for hpctoolkit? Both entry and exit rows - have the same node, but do calling context ids correspond to something - in the DataFrame or something outside of the trace like - the original hpctoolkit data? - - Currently, the DataFrame index of the entry row is being stored - in the node's calling context ids. This doesn't really have much of a - purpose right now. What should we be storing as calling context ids - for OTF2? Perhaps there should also be a way to map entry rows to - corresponding exit rows (could either have a separate matching indices - column or store both entry and exit indices as calling context ids). - """ - - graph = Graph() - callpath_to_node = dict() - graph_nodes = [None for i in range(len(self.events))] - node_id = 0 - - for location_id in set(self.events["Location ID"]): - location_df = self.events.loc[ - (self.events["Name"] != "N/A") - & (self.events["Location ID"] == location_id) - ] - - curr_depth = 0 - callpath = "" - df_indices = list(location_df.index) - function_names = list(location_df["Name"]) - event_types = list(location_df["Event Type"]) - functions_stack, nodes_stack = [], [] - - for i in range(len(location_df)): - curr_df_index, evt_type, function_name = ( - df_indices[i], - event_types[i], - function_names[i], - ) - - if evt_type == "Enter": - functions_stack.append(function_name) - callpath = "->".join(functions_stack) - - if curr_depth == 0: - parent_node = None - else: - parent_node = nodes_stack[-1] - - if callpath in callpath_to_node: - curr_node = callpath_to_node[callpath] - else: - curr_node = Node( - node_id, function_name, parent_node, curr_depth - ) - callpath_to_node[callpath] = curr_node - node_id += 1 - - if curr_depth == 0: - graph.add_root(curr_node) - else: - parent_node.add_child(curr_node) - - curr_node.add_calling_context_id(curr_df_index) - graph.add_to_map(curr_df_index, curr_node) - - nodes_stack.append(curr_node) - graph_nodes[curr_df_index] = curr_node - curr_depth += 1 - else: - curr_node = nodes_stack.pop() - graph_nodes[curr_df_index] = curr_node - - functions_stack.pop() - curr_depth -= 1 - - self.events["Graph_Node"] = graph_nodes - def read(self): """ Returns a TraceData object for the otf2 file @@ -458,5 +376,8 @@ def read(self): # close the trace and open it later per process trace.close() self.events = self.read_events() # events - self.create_cct() # cct - return pipit.trace.Trace(self.definitions, self.events) + + trace = pipit.trace.Trace(self.definitions, self.events, None) + trace.create_cct() # create cct + + return trace diff --git a/pipit/trace.py b/pipit/trace.py index 7442a833..44d1ca3f 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -4,15 +4,20 @@ # SPDX-License-Identifier: MIT +from pipit.graph import Graph, Node + + class Trace: - """A trace dataset is read into an object of this type, which includes one - or more dataframes. + """ + A trace dataset is read into an object of this type, which + includes one or more dataframes and a calling context tree. """ - def __init__(self, definitions, events): + def __init__(self, definitions, events, cct): """Create a new Trace object.""" self.definitions = definitions self.events = events + self.cct = cct @staticmethod def from_otf2(dirname): @@ -29,3 +34,142 @@ def from_hpctoolkit(dirname): from .readers.hpctoolkit_reader import HPCToolkitReader return HPCToolkitReader(dirname).read() + + def create_cct(self): + """ + Generic function to iterate through the trace events and create a CCT. + Uses pipit's graph data structure for this. Populates the trace's CCT + field and creates a new column in the trace's Events DataFrame that stores + a reference to each row's corresponding node in the CCT. + + Thoughts/Concerns: + Currently, the DataFrame index of the entry row is being stored + in the node's calling context ids. This doesn't really have much of a + purpose right now. What should we be storing as calling context ids + for OTF2? Perhaps there should also be a way to map entry rows to + corresponding exit rows. + """ + + # only create the cct if it doesn't exist already + if self.cct is None: + graph = Graph() + callpath_to_node = dict() # used to determine the existence of a node + graph_nodes = [ + None for i in range(len(self.events)) + ] # list of nodes in the DataFrame + node_id = 0 # each node has a unique id + + """ + Iterate through each Location ID which is analagous to a thread + and iterate over its events using a stack to add to the cct + """ + for location_id in set(self.events["Location ID"]): + """ + Filter the DataFrame by Location ID and + events that are only of type Enter/Leave. + """ + location_df = self.events.loc[ + (self.events["Name"] != "N/A") + & (self.events["Location ID"] == location_id) + ] + + curr_depth = 0 + callpath = "" + + """ + Instead of iterating over the DataFrame columns, + we save them as lists and iterate over those as + that is more efficient. + """ + df_indices = list(location_df.index) + function_names = list(location_df["Name"]) + event_types = list(location_df["Event Type"]) + + # stacks used to iterate through the trace and add nodes to the cct + functions_stack, nodes_stack = [], [] + + # iterating over the events of the current location id's trace + for i in range(len(location_df)): + curr_df_index, evt_type, function_name = ( + df_indices[i], + event_types[i], + function_names[i], + ) + + # encounter a new function through its entry point. + if evt_type == "Enter": + # add the function to the stack and get the call path + functions_stack.append(function_name) + callpath = "->".join(functions_stack) + + # get the parent node of the function if it exists + if curr_depth == 0: + parent_node = None + else: + parent_node = nodes_stack[-1] + + if callpath in callpath_to_node: + """ + if a node with the call path + exists, don't create a new one + """ + curr_node = callpath_to_node[callpath] + else: + """ + create a new node with the call path + if it doesn't exist yet + """ + curr_node = Node( + node_id, function_name, parent_node, curr_depth + ) + callpath_to_node[callpath] = curr_node + node_id += 1 + + if curr_depth == 0: + """ + add the newly created node as a + root to the cct if the depth is 0 + """ + graph.add_root(curr_node) + else: + """ + add the newly created node as a child + of its parent if it is not a root + """ + parent_node.add_child(curr_node) + + """ + add the Enter DataFrame index as a calling context id + to the node (multiple function invocations with the + same call path) + """ + curr_node.add_calling_context_id(curr_df_index) + + """ + maps the Enter DataFrame index to the node + + note: + this seems redundant because the node will already + exist in the row's Graph_Node column + """ + graph.add_to_map(curr_df_index, curr_node) + + # Update nodes stack, column, and current depth + nodes_stack.append(curr_node) + graph_nodes[curr_df_index] = curr_node + curr_depth += 1 + else: + """ + Once you encounter the Leave event for a function, + get the corresponding node from the top of the nodes stack. + """ + curr_node = nodes_stack.pop() + graph_nodes[curr_df_index] = curr_node + + # Update functions stack and current depth + functions_stack.pop() + curr_depth -= 1 + + # Update the Trace with the generated cct + self.events["Graph_Node"] = graph_nodes + self.cct = graph From 7f1b7f646c972371bc78f570932d02229435b2a0 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Fri, 2 Dec 2022 15:26:51 -0500 Subject: [PATCH 07/30] chore: made cct function consistent with recent changes --- pipit/trace.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index 9836feed..39687df4 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -34,7 +34,7 @@ def from_hpctoolkit(dirname): from .readers.hpctoolkit_reader import HPCToolkitReader return HPCToolkitReader(dirname).read() - + @staticmethod def from_projections(dirname): """Read a Projections trace into a new Trace object.""" @@ -101,7 +101,7 @@ def comm_matrix(self, output="size"): .apply(lambda attrDict: attrDict["receiver"]) .to_list() ) - + # the length of the message_volume list created below # is the total number of messages sent @@ -154,19 +154,20 @@ def create_cct(self): ] # list of nodes in the DataFrame node_id = 0 # each node has a unique id + # Filter the DataFrame to only Enter/Leave + enter_leave_df = self.events.loc[ + self.events["Event Type"].isin(["Enter", "Leave"]) + ] + """ - Iterate through each Location ID which is analagous to a thread - and iterate over its events using a stack to add to the cct + Iterate through each Thread and iterate over + its events using a stack to add to the cct """ - for location_id in set(self.events["Location ID"]): + for thread_id in set(self.events["Thread"]): """ - Filter the DataFrame by Location ID and - events that are only of type Enter/Leave. + Filter the DataFrame by Thread """ - location_df = self.events.loc[ - (self.events["Name"] != "N/A") - & (self.events["Location ID"] == location_id) - ] + thread_df = enter_leave_df.loc[enter_leave_df["Thread"] == thread_id] curr_depth = 0 callpath = "" @@ -176,15 +177,15 @@ def create_cct(self): we save them as lists and iterate over those as that is more efficient. """ - df_indices = list(location_df.index) - function_names = list(location_df["Name"]) - event_types = list(location_df["Event Type"]) + df_indices = list(thread_df.index) + function_names = list(thread_df["Name"]) + event_types = list(thread_df["Event Type"]) # stacks used to iterate through the trace and add nodes to the cct functions_stack, nodes_stack = [], [] - # iterating over the events of the current location id's trace - for i in range(len(location_df)): + # iterating over the events of the current thread's trace + for i in range(len(thread_df)): curr_df_index, evt_type, function_name = ( df_indices[i], event_types[i], From bb2992356d390174962d5e393581289635eecc13 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Fri, 17 Feb 2023 00:23:49 -0500 Subject: [PATCH 08/30] chore: modify cct function to reflect recent changes --- pipit/readers/hpctoolkit_reader.py | 1 - pipit/readers/nsight_reader.py | 1 - pipit/readers/otf2_reader.py | 2 - pipit/readers/projections_reader.py | 3 - pipit/trace.py | 204 +++++++++++++--------------- 5 files changed, 93 insertions(+), 118 deletions(-) diff --git a/pipit/readers/hpctoolkit_reader.py b/pipit/readers/hpctoolkit_reader.py index 946ab4a1..e74ed58c 100644 --- a/pipit/readers/hpctoolkit_reader.py +++ b/pipit/readers/hpctoolkit_reader.py @@ -240,7 +240,6 @@ def read(self): # if it was then we skip it, as to not have multiple sets of # open/close events for a function that it's still in if last_id != calling_context_id: - # updating the trace_db node = graph.get_node(calling_context_id) # the node in the Graph diff --git a/pipit/readers/nsight_reader.py b/pipit/readers/nsight_reader.py index a5271ef4..2205eb27 100644 --- a/pipit/readers/nsight_reader.py +++ b/pipit/readers/nsight_reader.py @@ -29,7 +29,6 @@ def read(self): # check if PID and TID are NOT the same. singlethreaded or multithreaded if self.df["PID"].equals(self.df["TID"]) is False: - # Group the pids together and give each process it's own set of threads for i in pid: # Seeing where the rows of the PIDs match. Grabbing the rows in mask diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index 649192ff..4c8e90ae 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -231,11 +231,9 @@ def events_reader(self, rank_size): # iterates through the event's attributes # (ex: region, bytes sent, etc) for key, value in vars(event).items(): - # only adds non-empty attributes # and ignores time so there isn't a duplicate time if value is not None and key != "time": - # uses field_to_val to convert all data types # and ensure that there are no pickling errors attributes_dict[ diff --git a/pipit/readers/projections_reader.py b/pipit/readers/projections_reader.py index e10c4fd9..ac279b45 100644 --- a/pipit/readers/projections_reader.py +++ b/pipit/readers/projections_reader.py @@ -167,7 +167,6 @@ def read_sts_file(self): # add to self.entries elif line_arr[0] == "ENTRY": - # Need to concat entry_name while not line_arr[3].endswith('"'): line_arr[3] = line_arr[3] + " " + line_arr[4] @@ -269,7 +268,6 @@ def __create_empty_dict() -> dict: } def read(self): - if self.num_pes < 1: return None @@ -284,7 +282,6 @@ def read(self): return pipit.trace.Trace(None, trace_df) def __read_log_file(self, pe_num: int) -> pandas.DataFrame: - # has information needed in sts file sts_reader = self.sts_reader diff --git a/pipit/trace.py b/pipit/trace.py index 39687df4..5f33e94e 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -136,22 +136,17 @@ def create_cct(self): Uses pipit's graph data structure for this. Populates the trace's CCT field and creates a new column in the trace's Events DataFrame that stores a reference to each row's corresponding node in the CCT. - - Thoughts/Concerns: - Currently, the DataFrame index of the entry row is being stored - in the node's calling context ids. This doesn't really have much of a - purpose right now. What should we be storing as calling context ids - for OTF2? Perhaps there should also be a way to map entry rows to - corresponding exit rows. """ # only create the cct if it doesn't exist already if self.cct is None: + # CCT and list of nodes in DataFrame graph = Graph() - callpath_to_node = dict() # used to determine the existence of a node - graph_nodes = [ - None for i in range(len(self.events)) - ] # list of nodes in the DataFrame + graph_nodes = [None for i in range(len(self.events))] + + # determines whether a node exists or not + callpath_to_node = dict() + node_id = 0 # each node has a unique id # Filter the DataFrame to only Enter/Leave @@ -159,112 +154,99 @@ def create_cct(self): self.events["Event Type"].isin(["Enter", "Leave"]) ] - """ - Iterate through each Thread and iterate over - its events using a stack to add to the cct - """ - for thread_id in set(self.events["Thread"]): - """ - Filter the DataFrame by Thread - """ - thread_df = enter_leave_df.loc[enter_leave_df["Thread"] == thread_id] - - curr_depth = 0 - callpath = "" - - """ - Instead of iterating over the DataFrame columns, - we save them as lists and iterate over those as - that is more efficient. - """ - df_indices = list(thread_df.index) - function_names = list(thread_df["Name"]) - event_types = list(thread_df["Event Type"]) - - # stacks used to iterate through the trace and add nodes to the cct - functions_stack, nodes_stack = [], [] - - # iterating over the events of the current thread's trace - for i in range(len(thread_df)): - curr_df_index, evt_type, function_name = ( - df_indices[i], - event_types[i], - function_names[i], - ) - - # encounter a new function through its entry point. - if evt_type == "Enter": - # add the function to the stack and get the call path - functions_stack.append(function_name) - callpath = "->".join(functions_stack) - - # get the parent node of the function if it exists - if curr_depth == 0: - parent_node = None - else: - parent_node = nodes_stack[-1] + # Iterating over process & threads to + # read callstack sequentially + for process in set(enter_leave_df["Process"]): + curr_process_df = enter_leave_df.loc[ + enter_leave_df["Process"] == process + ] + + for thread in set(curr_process_df["Thread"]): + # filter by both process and thread + filtered_df = curr_process_df.loc[ + curr_process_df["Thread"] == thread + ] + + curr_depth, callpath = 0, "" + + """ + Iterating over lists instead of + DataFrame columns is more efficient + """ + df_indices = filtered_df.index.to_list() + function_names = filtered_df["Name"].to_list() + event_types = filtered_df["Event Type"].to_list() + + # stacks used to iterate through the trace and add nodes to the cct + functions_stack, nodes_stack = [], [] + + # iterating over the events of the current thread's trace + for i in range(len(filtered_df)): + curr_df_index, evt_type, function_name = ( + df_indices[i], + event_types[i], + function_names[i], + ) + + # encounter a new function through its entry point. + if evt_type == "Enter": + # add the function to the stack and get the call path + functions_stack.append(function_name) + callpath = "->".join(functions_stack) + + # get the parent node of the function if it exists + parent_node = None if curr_depth == 0 else nodes_stack[-1] + + if callpath in callpath_to_node: + # don't create new node if callpath is in map + curr_node = callpath_to_node[callpath] + else: + # create new node if callpath isn't in map + curr_node = Node( + node_id, function_name, parent_node, curr_depth + ) + callpath_to_node[callpath] = curr_node + node_id += 1 + + # add node as root or child of its + # parent depending on current depth + graph.add_root( + curr_node + ) if curr_depth == 0 else parent_node.add_child( + curr_node + ) - if callpath in callpath_to_node: """ - if a node with the call path - exists, don't create a new one + add the Enter DataFrame index as a calling context id """ - curr_node = callpath_to_node[callpath] + curr_node.add_calling_context_id(curr_df_index) + + """ + maps the Enter DataFrame index to the node + + note: + this seems redundant because the node will already + exist in the row's Graph_Node column + """ + graph.add_to_map(curr_df_index, curr_node) + + # Update nodes stack, column, and current depth + nodes_stack.append(curr_node) + graph_nodes[curr_df_index] = curr_node + curr_depth += 1 else: """ - create a new node with the call path - if it doesn't exist yet + Get the corresponding node from top of stack + once you encounter the Leave event for a function """ - curr_node = Node( - node_id, function_name, parent_node, curr_depth - ) - callpath_to_node[callpath] = curr_node - node_id += 1 - - if curr_depth == 0: - """ - add the newly created node as a - root to the cct if the depth is 0 - """ - graph.add_root(curr_node) - else: - """ - add the newly created node as a child - of its parent if it is not a root - """ - parent_node.add_child(curr_node) - - """ - add the Enter DataFrame index as a calling context id - to the node (multiple function invocations with the - same call path) - """ - curr_node.add_calling_context_id(curr_df_index) - - """ - maps the Enter DataFrame index to the node - - note: - this seems redundant because the node will already - exist in the row's Graph_Node column - """ - graph.add_to_map(curr_df_index, curr_node) - - # Update nodes stack, column, and current depth - nodes_stack.append(curr_node) - graph_nodes[curr_df_index] = curr_node - curr_depth += 1 - else: - """ - Once you encounter the Leave event for a function, - get the corresponding node from the top of the nodes stack. - """ - curr_node = nodes_stack.pop() - graph_nodes[curr_df_index] = curr_node - - # Update functions stack and current depth - functions_stack.pop() - curr_depth -= 1 + curr_node = nodes_stack.pop() + + # do we want to store node reference in leave row too? + graph_nodes[curr_df_index] = curr_node + + # Update functions stack and current depth + functions_stack.pop() + curr_depth -= 1 # Update the Trace with the generated cct self.events["Graph_Node"] = graph_nodes From 28d9638265e27c0878ed0f346272d3adf634f4c3 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Mon, 27 Feb 2023 19:59:55 -0500 Subject: [PATCH 09/30] chore: remove df indices as calling context ids --- pipit/trace.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index 5f33e94e..0098ec19 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -155,7 +155,7 @@ def create_cct(self): ] # Iterating over process & threads to - # read callstack sequentially + # read call stack sequentially for process in set(enter_leave_df["Process"]): curr_process_df = enter_leave_df.loc[ enter_leave_df["Process"] == process @@ -216,20 +216,6 @@ def create_cct(self): curr_node ) - """ - add the Enter DataFrame index as a calling context id - """ - curr_node.add_calling_context_id(curr_df_index) - - """ - maps the Enter DataFrame index to the node - - note: - this seems redundant because the node will already - exist in the row's Graph_Node column - """ - graph.add_to_map(curr_df_index, curr_node) - # Update nodes stack, column, and current depth nodes_stack.append(curr_node) graph_nodes[curr_df_index] = curr_node From e6f5ffd25f7a035ba6456e2ddcd2cfc67a2f2c2b Mon Sep 17 00:00:00 2001 From: Aditya Ranjan Date: Mon, 6 Mar 2023 18:51:08 -0500 Subject: [PATCH 10/30] chore: only store node in enter rows --- pipit/trace.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index 7bfe04ba..00017670 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -337,9 +337,6 @@ def create_cct(self): """ curr_node = nodes_stack.pop() - # do we want to store node reference in leave row too? - graph_nodes[curr_df_index] = curr_node - # Update functions stack and current depth functions_stack.pop() curr_depth -= 1 From 9dd336677cd55a0ccb8db9e8fea41da891f2c910 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Mon, 6 Mar 2023 18:57:17 -0500 Subject: [PATCH 11/30] style: minor --- pipit/trace.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index 00017670..2326f0dd 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -239,7 +239,7 @@ def calc_exc_time(self): exc_times[curr_parent_idx] -= inc_times[child_idx] self.events["time.exc"] = exc_times - + def create_cct(self): """ Generic function to iterate through the trace events and create a CCT. @@ -423,7 +423,7 @@ def comm_matrix(self, output="size"): ] return communication_matrix - + def message_histogram(self, bins=20, **kwargs): """Generates histogram of message frequency by size.""" From 50fff4b0fd675234478ea4d07e4de30c930cd8a2 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Fri, 10 Mar 2023 13:35:46 -0500 Subject: [PATCH 12/30] chore: remove depth column from df and restructure node class --- pipit/graph.py | 36 ++++++++++++++---------------------- pipit/tests/trace.py | 2 +- pipit/trace.py | 33 ++++++++++++--------------------- 3 files changed, 27 insertions(+), 44 deletions(-) diff --git a/pipit/graph.py b/pipit/graph.py index f8f09997..0074c670 100644 --- a/pipit/graph.py +++ b/pipit/graph.py @@ -9,24 +9,19 @@ class Node: referenced by any calling_context_id directly under it """ - def __init__(self, name_id, name, parent, level=None) -> None: - self.calling_context_ids = [] - self.name_id = name_id - self.name = name + def __init__(self, id, parent, level=None) -> None: + self._pipit_nid = id self.children = [] self.parent = parent if level is None: - self.level = self.__calculate_level() + self.level = self._calculate_level() else: self.level = level def add_child(self, child_node): self.children.append(child_node) - def add_calling_context_id(self, calling_context_id): - self.calling_context_ids.append(calling_context_id) - def get_level(self): """This function returns the depth of the current node (a root node would return 0) @@ -35,7 +30,7 @@ def get_level(self): def get_intersection(self, node: "Node"): """Given two nodes, this function returns the interesection of them - starting from their root nodes + starting from their root nodes (least common ancestor) If the two nodes do not share the same root node, their intersection would be None, otherwise it returns the nodes that they have in @@ -76,10 +71,9 @@ def get_node_list(self, min_level): def __str__(self) -> str: return ( - self.name - + ": " - + str(self.calling_context_ids) - + " -- level: " + "ID: " + + str(self._pipit_nid) + + " -- Level: " + str(self.level) ) @@ -88,13 +82,13 @@ def _calculate_level(self): if self.parent is None: return 0 else: - return 1 + self.parent.__calculate_level() + return 1 + self.parent._calculate_level() def __eq__(self, obj) -> bool: if type(obj) != Node: return False else: - return self.calling_context_ids == obj.calling_context_ids + return self._pipit_nid == obj._pipit_nid class Graph: @@ -102,14 +96,12 @@ class Graph: def __init__(self) -> None: self.roots = [] - self.calling_context_id_map = {} - - def add_to_map(self, calling_context_id, node): - """adds association between a calling_context_id and a specific node""" - self.calling_context_id_map[calling_context_id] = node def add_root(self, node): self.roots.append(node) - def get_node(self, calling_context_id) -> "Node": - return self.calling_context_id_map.get(str(calling_context_id)) + def __str__(self) -> str: + return ( + "Roots: " + + str([str(curr_root) for curr_root in self.roots]) + ) \ No newline at end of file diff --git a/pipit/tests/trace.py b/pipit/tests/trace.py index 1cb508fd..3eedffa0 100644 --- a/pipit/tests/trace.py +++ b/pipit/tests/trace.py @@ -102,4 +102,4 @@ def test_match_caller_callee(data_dir, ping_pong_otf2_trace): df = trace.events # all events of the ping pong trace are roots with no children - assert set(df.loc[df["Event Type"] == "Enter"]["Depth"]) == set([0]) + assert len(df.loc[(df["Event Type"] == "Enter") & (df["_parent"].notnull())]) == 0 diff --git a/pipit/trace.py b/pipit/trace.py index 2326f0dd..426c2834 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -116,20 +116,17 @@ def _match_events(self): self.events = self.events.astype({"_matching_event": "Int32"}) def _match_caller_callee(self): - """Matches callers (parents) to callees (children) and adds three + """Matches callers (parents) to callees (children) and adds two columns to the dataframe: - _depth, _parent, and _children + _parent, and _children - _depth is level in the call tree starting at 0. _parent is the dataframe index of a row's parent event. _children is a list of dataframe indices of a row's children events. """ if "_children" not in self.events.columns: children = [None] * len(self.events) - depth, parent = [float("nan")] * len(self.events), [float("nan")] * len( - self.events - ) + parent = [float("nan")] * len(self.events) # only using enter and leave rows # to determine calling relationships @@ -147,8 +144,7 @@ def _match_caller_callee(self): curr_process_df["Thread"] == thread ] - # Depth is the level in the - # Call Tree starting from 0 + # Level in CCT starting from 0 curr_depth = 0 stack = [] @@ -176,30 +172,25 @@ def _match_caller_callee(self): parent[curr_df_index] = parent_df_index - depth[curr_df_index] = curr_depth curr_depth += 1 # add enter dataframe index to stack stack.append(curr_df_index) else: # pop event off stack once matching leave found - # Note: depth, parent, and children for a leave row + # Note: parent, and children for a leave row # can be found using the matching index that # corresponds to the enter row stack.pop() curr_depth -= 1 - self.events["_depth"], self.events["_parent"], self.events["_children"] = ( - depth, - parent, - children, - ) + self.events["_parent"], self.events["_children"] = parent, children - self.events = self.events.astype({"_depth": "Int32", "_parent": "Int32"}) + self.events = self.events.astype({"_parent": "Int32"}) self.events = self.events.astype( - {"_depth": "category", "_parent": "category"} + {"_parent": "category"} ) def calc_inc_time(self): @@ -313,7 +304,7 @@ def create_cct(self): else: # create new node if callpath isn't in map curr_node = Node( - node_id, function_name, parent_node, curr_depth + node_id, parent_node, curr_depth ) callpath_to_node[callpath] = curr_node node_id += 1 @@ -332,10 +323,10 @@ def create_cct(self): curr_depth += 1 else: """ - Get the corresponding node from top of stack - once you encounter the Leave event for a function + Pop node from top of stack once you + encounter the Leave event for a function """ - curr_node = nodes_stack.pop() + nodes_stack.pop() # Update functions stack and current depth functions_stack.pop() From 742cc82ebf9baeaa5616e8118627e750a737fcf6 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Fri, 10 Mar 2023 13:36:10 -0500 Subject: [PATCH 13/30] style: minor --- pipit/graph.py | 12 ++---------- pipit/trace.py | 8 ++------ 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/pipit/graph.py b/pipit/graph.py index 0074c670..87f29662 100644 --- a/pipit/graph.py +++ b/pipit/graph.py @@ -70,12 +70,7 @@ def get_node_list(self, min_level): return return_list def __str__(self) -> str: - return ( - "ID: " - + str(self._pipit_nid) - + " -- Level: " - + str(self.level) - ) + return "ID: " + str(self._pipit_nid) + " -- Level: " + str(self.level) def _calculate_level(self): """private function to get depth of node""" @@ -101,7 +96,4 @@ def add_root(self, node): self.roots.append(node) def __str__(self) -> str: - return ( - "Roots: " - + str([str(curr_root) for curr_root in self.roots]) - ) \ No newline at end of file + return "Roots: " + str([str(curr_root) for curr_root in self.roots]) diff --git a/pipit/trace.py b/pipit/trace.py index 426c2834..ad62329e 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -189,9 +189,7 @@ def _match_caller_callee(self): self.events = self.events.astype({"_parent": "Int32"}) - self.events = self.events.astype( - {"_parent": "category"} - ) + self.events = self.events.astype({"_parent": "category"}) def calc_inc_time(self): # Adds "time.inc" column @@ -303,9 +301,7 @@ def create_cct(self): curr_node = callpath_to_node[callpath] else: # create new node if callpath isn't in map - curr_node = Node( - node_id, parent_node, curr_depth - ) + curr_node = Node(node_id, parent_node, curr_depth) callpath_to_node[callpath] = curr_node node_id += 1 From 0240751708f4e12183d388f74f9625f945900375 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Fri, 10 Mar 2023 14:01:37 -0500 Subject: [PATCH 14/30] chore: return cct for all readers --- pipit/readers/nsight_reader.py | 5 ++++- pipit/readers/otf2_reader.py | 2 +- pipit/readers/projections_reader.py | 5 ++++- pipit/trace.py | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pipit/readers/nsight_reader.py b/pipit/readers/nsight_reader.py index cb72c9bf..8470d296 100644 --- a/pipit/readers/nsight_reader.py +++ b/pipit/readers/nsight_reader.py @@ -103,4 +103,7 @@ def read(self): # Applying the column list to the dataframe to rearrange self.df = self.df.loc[:, cols] - return pipit.trace.Trace(None, self.df) + trace = pipit.trace.Trace(None, self.df, None) + trace._create_cct() + + return trace \ No newline at end of file diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index a7a16e7d..330077dc 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -434,6 +434,6 @@ def read(self): self.events = self.read_events() # events trace = pipit.trace.Trace(self.definitions, self.events, None) - trace.create_cct() # create cct + trace._create_cct() # create cct return trace diff --git a/pipit/readers/projections_reader.py b/pipit/readers/projections_reader.py index ef327949..6036fe41 100644 --- a/pipit/readers/projections_reader.py +++ b/pipit/readers/projections_reader.py @@ -281,7 +281,10 @@ def read(self): # Concatinate the dataframes list into dataframe containing entire trace trace_df = pandas.concat(dataframes_list, ignore_index=True) - return pipit.trace.Trace(None, trace_df) + trace = pipit.trace.Trace(None, trace_df, None) + trace._create_cct() + + return trace def _read_log_file(self, pe_num: int) -> pandas.DataFrame: # has information needed in sts file diff --git a/pipit/trace.py b/pipit/trace.py index ad62329e..8527722d 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -229,7 +229,7 @@ def calc_exc_time(self): self.events["time.exc"] = exc_times - def create_cct(self): + def _create_cct(self): """ Generic function to iterate through the trace events and create a CCT. Uses pipit's graph data structure for this. Populates the trace's CCT From 89c9c5d8a8ec1f61c2ecdc5964ea2e4924cb879a Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sat, 11 Mar 2023 18:31:32 -0500 Subject: [PATCH 15/30] chore: revert any changes in other prs already --- pipit/readers/nsight_reader.py | 2 +- pipit/tests/trace.py | 2 +- pipit/trace.py | 28 +++++++++++++++++++--------- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/pipit/readers/nsight_reader.py b/pipit/readers/nsight_reader.py index 8470d296..d99e2e2a 100644 --- a/pipit/readers/nsight_reader.py +++ b/pipit/readers/nsight_reader.py @@ -106,4 +106,4 @@ def read(self): trace = pipit.trace.Trace(None, self.df, None) trace._create_cct() - return trace \ No newline at end of file + return trace diff --git a/pipit/tests/trace.py b/pipit/tests/trace.py index 3eedffa0..1cb508fd 100644 --- a/pipit/tests/trace.py +++ b/pipit/tests/trace.py @@ -102,4 +102,4 @@ def test_match_caller_callee(data_dir, ping_pong_otf2_trace): df = trace.events # all events of the ping pong trace are roots with no children - assert len(df.loc[(df["Event Type"] == "Enter") & (df["_parent"].notnull())]) == 0 + assert set(df.loc[df["Event Type"] == "Enter"]["Depth"]) == set([0]) diff --git a/pipit/trace.py b/pipit/trace.py index 8527722d..951e39a1 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -116,17 +116,19 @@ def _match_events(self): self.events = self.events.astype({"_matching_event": "Int32"}) def _match_caller_callee(self): - """Matches callers (parents) to callees (children) and adds two + """Matches callers (parents) to callees (children) and adds three columns to the dataframe: - _parent, and _children - + _depth, _parent, and _children + _depth is level in the call tree starting at 0. _parent is the dataframe index of a row's parent event. _children is a list of dataframe indices of a row's children events. """ if "_children" not in self.events.columns: children = [None] * len(self.events) - parent = [float("nan")] * len(self.events) + depth, parent = [float("nan")] * len(self.events), [float("nan")] * len( + self.events + ) # only using enter and leave rows # to determine calling relationships @@ -144,7 +146,8 @@ def _match_caller_callee(self): curr_process_df["Thread"] == thread ] - # Level in CCT starting from 0 + # Depth is the level in the + # Call Tree starting from 0 curr_depth = 0 stack = [] @@ -172,24 +175,31 @@ def _match_caller_callee(self): parent[curr_df_index] = parent_df_index + depth[curr_df_index] = curr_depth curr_depth += 1 # add enter dataframe index to stack stack.append(curr_df_index) else: # pop event off stack once matching leave found - # Note: parent, and children for a leave row + # Note: depth, parent, and children for a leave row # can be found using the matching index that # corresponds to the enter row stack.pop() curr_depth -= 1 - self.events["_parent"], self.events["_children"] = parent, children + self.events["_depth"], self.events["_parent"], self.events["_children"] = ( + depth, + parent, + children, + ) - self.events = self.events.astype({"_parent": "Int32"}) + self.events = self.events.astype({"_depth": "Int32", "_parent": "Int32"}) - self.events = self.events.astype({"_parent": "category"}) + self.events = self.events.astype( + {"_depth": "category", "_parent": "category"} + ) def calc_inc_time(self): # Adds "time.inc" column From 7074ac7a153a1431126783947a697350a1e53999 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sat, 11 Mar 2023 18:32:28 -0500 Subject: [PATCH 16/30] style: minor --- pipit/trace.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipit/trace.py b/pipit/trace.py index 951e39a1..b539dae6 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -119,6 +119,7 @@ def _match_caller_callee(self): """Matches callers (parents) to callees (children) and adds three columns to the dataframe: _depth, _parent, and _children + _depth is level in the call tree starting at 0. _parent is the dataframe index of a row's parent event. _children is a list of dataframe indices of a row's children events. From 5a4cae3b6763538a7fcffd60dcbb521a43639d82 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sat, 11 Mar 2023 18:37:54 -0500 Subject: [PATCH 17/30] feat: make cct compatible with projections --- pipit/trace.py | 142 ++++++++++++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 68 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index b539dae6..a8e5c098 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -264,80 +264,86 @@ def _create_cct(self): self.events["Event Type"].isin(["Enter", "Leave"]) ] - # Iterating over process & threads to - # read call stack sequentially - for process in set(enter_leave_df["Process"]): - curr_process_df = enter_leave_df.loc[ - enter_leave_df["Process"] == process - ] - - for thread in set(curr_process_df["Thread"]): - # filter by both process and thread - filtered_df = curr_process_df.loc[ - curr_process_df["Thread"] == thread + # list of processes and/or threads to iterate over + if "Thread" in self.events.columns: + exec_locations = set(zip(self.events["Process"], self.events["Thread"])) + has_thread = True + else: + exec_locations = set(self.events["Process"]) + has_thread = False + + for curr_loc in exec_locations: + # only filter by thread if the trace has a thread column + if has_thread: + curr_process, curr_thread = curr_loc + filtered_df = enter_leave_df.loc[ + (enter_leave_df["Process"] == curr_process) + & (enter_leave_df["Thread"] == curr_thread) + ] + else: + filtered_df = enter_leave_df.loc[ + (enter_leave_df["Process"] == curr_loc) ] - curr_depth, callpath = 0, "" - - """ - Iterating over lists instead of - DataFrame columns is more efficient - """ - df_indices = filtered_df.index.to_list() - function_names = filtered_df["Name"].to_list() - event_types = filtered_df["Event Type"].to_list() + curr_depth, callpath = 0, "" + + """ + Iterating over lists instead of + DataFrame columns is more efficient + """ + df_indices = filtered_df.index.to_list() + function_names = filtered_df["Name"].to_list() + event_types = filtered_df["Event Type"].to_list() + + # stacks used to iterate through the trace and add nodes to the cct + functions_stack, nodes_stack = [], [] + + # iterating over the events of the current thread's trace + for i in range(len(filtered_df)): + curr_df_index, evt_type, function_name = ( + df_indices[i], + event_types[i], + function_names[i], + ) - # stacks used to iterate through the trace and add nodes to the cct - functions_stack, nodes_stack = [], [] + # encounter a new function through its entry point. + if evt_type == "Enter": + # add the function to the stack and get the call path + functions_stack.append(function_name) + callpath = "->".join(functions_stack) - # iterating over the events of the current thread's trace - for i in range(len(filtered_df)): - curr_df_index, evt_type, function_name = ( - df_indices[i], - event_types[i], - function_names[i], - ) + # get the parent node of the function if it exists + parent_node = None if curr_depth == 0 else nodes_stack[-1] - # encounter a new function through its entry point. - if evt_type == "Enter": - # add the function to the stack and get the call path - functions_stack.append(function_name) - callpath = "->".join(functions_stack) - - # get the parent node of the function if it exists - parent_node = None if curr_depth == 0 else nodes_stack[-1] - - if callpath in callpath_to_node: - # don't create new node if callpath is in map - curr_node = callpath_to_node[callpath] - else: - # create new node if callpath isn't in map - curr_node = Node(node_id, parent_node, curr_depth) - callpath_to_node[callpath] = curr_node - node_id += 1 - - # add node as root or child of its - # parent depending on current depth - graph.add_root( - curr_node - ) if curr_depth == 0 else parent_node.add_child( - curr_node - ) - - # Update nodes stack, column, and current depth - nodes_stack.append(curr_node) - graph_nodes[curr_df_index] = curr_node - curr_depth += 1 + if callpath in callpath_to_node: + # don't create new node if callpath is in map + curr_node = callpath_to_node[callpath] else: - """ - Pop node from top of stack once you - encounter the Leave event for a function - """ - nodes_stack.pop() - - # Update functions stack and current depth - functions_stack.pop() - curr_depth -= 1 + # create new node if callpath isn't in map + curr_node = Node(node_id, parent_node, curr_depth) + callpath_to_node[callpath] = curr_node + node_id += 1 + + # add node as root or child of its + # parent depending on current depth + graph.add_root( + curr_node + ) if curr_depth == 0 else parent_node.add_child(curr_node) + + # Update nodes stack, column, and current depth + nodes_stack.append(curr_node) + graph_nodes[curr_df_index] = curr_node + curr_depth += 1 + else: + """ + Pop node from top of stack once you + encounter the Leave event for a function + """ + nodes_stack.pop() + + # Update functions stack and current depth + functions_stack.pop() + curr_depth -= 1 # Update the Trace with the generated cct self.events["Graph_Node"] = graph_nodes From 540e0de5c9aee69095e4a64bfd6385518edbe659 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 23 Apr 2023 17:56:40 -0400 Subject: [PATCH 18/30] revert graph changes --- pipit/graph.py | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/pipit/graph.py b/pipit/graph.py index 87f29662..4ae6e461 100644 --- a/pipit/graph.py +++ b/pipit/graph.py @@ -1,3 +1,6 @@ + + + # Copyright 2022-2023 Parallel Software and Systems Group, University of # Maryland. See the top-level LICENSE file for details. # @@ -9,19 +12,20 @@ class Node: referenced by any calling_context_id directly under it """ - def __init__(self, id, parent, level=None) -> None: - self._pipit_nid = id + def __init__(self, name_id, name, parent) -> None: + self.calling_context_ids = [] + self.name_id = name_id + self.name = name self.children = [] self.parent = parent - - if level is None: - self.level = self._calculate_level() - else: - self.level = level + self.level = self._calculate_level() def add_child(self, child_node): self.children.append(child_node) + def add_calling_context_id(self, calling_context_id): + self.calling_context_ids.append(calling_context_id) + def get_level(self): """This function returns the depth of the current node (a root node would return 0) @@ -30,8 +34,7 @@ def get_level(self): def get_intersection(self, node: "Node"): """Given two nodes, this function returns the interesection of them - starting from their root nodes (least common ancestor) - + starting from their root nodes If the two nodes do not share the same root node, their intersection would be None, otherwise it returns the nodes that they have in common (starting from the root) as a new Node @@ -70,7 +73,13 @@ def get_node_list(self, min_level): return return_list def __str__(self) -> str: - return "ID: " + str(self._pipit_nid) + " -- Level: " + str(self.level) + return ( + self.name + + ": " + + str(self.calling_context_ids) + + " -- level: " + + str(self.level) + ) def _calculate_level(self): """private function to get depth of node""" @@ -83,7 +92,7 @@ def __eq__(self, obj) -> bool: if type(obj) != Node: return False else: - return self._pipit_nid == obj._pipit_nid + return self.calling_context_ids == obj.calling_context_ids class Graph: @@ -91,9 +100,14 @@ class Graph: def __init__(self) -> None: self.roots = [] + self.calling_context_id_map = {} + + def add_to_map(self, calling_context_id, node): + """adds association between a calling_context_id and a specific node""" + self.calling_context_id_map[calling_context_id] = node def add_root(self, node): self.roots.append(node) - def __str__(self) -> str: - return "Roots: " + str([str(curr_root) for curr_root in self.roots]) + def get_node(self, calling_context_id) -> "Node": + return self.calling_context_id_map.get(str(calling_context_id)) \ No newline at end of file From 6fb5d5274af05ba2af9f9a07acc923d8bd3e4041 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 23 Apr 2023 18:01:27 -0400 Subject: [PATCH 19/30] minor --- pipit/graph.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pipit/graph.py b/pipit/graph.py index 4ae6e461..8f506f68 100644 --- a/pipit/graph.py +++ b/pipit/graph.py @@ -1,6 +1,3 @@ - - - # Copyright 2022-2023 Parallel Software and Systems Group, University of # Maryland. See the top-level LICENSE file for details. # From 3fd498cf9065869732f550ca19db8d486d708b56 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 23 Apr 2023 18:02:00 -0400 Subject: [PATCH 20/30] minor --- pipit/graph.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipit/graph.py b/pipit/graph.py index 8f506f68..99a0d914 100644 --- a/pipit/graph.py +++ b/pipit/graph.py @@ -32,6 +32,7 @@ def get_level(self): def get_intersection(self, node: "Node"): """Given two nodes, this function returns the interesection of them starting from their root nodes + If the two nodes do not share the same root node, their intersection would be None, otherwise it returns the nodes that they have in common (starting from the root) as a new Node From 83693c37ac1741d2402a771e540d747dde63073c Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 23 Apr 2023 18:02:37 -0400 Subject: [PATCH 21/30] minor --- pipit/graph.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipit/graph.py b/pipit/graph.py index 99a0d914..9c3387a7 100644 --- a/pipit/graph.py +++ b/pipit/graph.py @@ -32,7 +32,7 @@ def get_level(self): def get_intersection(self, node: "Node"): """Given two nodes, this function returns the interesection of them starting from their root nodes - + If the two nodes do not share the same root node, their intersection would be None, otherwise it returns the nodes that they have in common (starting from the root) as a new Node @@ -108,4 +108,4 @@ def add_root(self, node): self.roots.append(node) def get_node(self, calling_context_id) -> "Node": - return self.calling_context_id_map.get(str(calling_context_id)) \ No newline at end of file + return self.calling_context_id_map.get(str(calling_context_id)) From 04890c257875cd24301be3df481743e15e817a41 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 23 Apr 2023 18:20:00 -0400 Subject: [PATCH 22/30] move cct function into util --- pipit/trace.py | 110 -------------------------------------------- pipit/util/cct.py | 115 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 110 deletions(-) create mode 100644 pipit/util/cct.py diff --git a/pipit/trace.py b/pipit/trace.py index 73e1a886..fd3a0a0e 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: MIT import numpy as np -from pipit.graph import Graph, Node import pandas as pd @@ -352,115 +351,6 @@ def calc_exc_metrics(self, columns=None): self.events[metric_col_name] = exc_values self.exc_metrics.append(metric_col_name) - def _create_cct(self): - """ - Generic function to iterate through the trace events and create a CCT. - Uses pipit's graph data structure for this. Populates the trace's CCT - field and creates a new column in the trace's Events DataFrame that stores - a reference to each row's corresponding node in the CCT. - """ - - # only create the cct if it doesn't exist already - if self.cct is None: - # CCT and list of nodes in DataFrame - graph = Graph() - graph_nodes = [None for i in range(len(self.events))] - - # determines whether a node exists or not - callpath_to_node = dict() - - node_id = 0 # each node has a unique id - - # Filter the DataFrame to only Enter/Leave - enter_leave_df = self.events.loc[ - self.events["Event Type"].isin(["Enter", "Leave"]) - ] - - # list of processes and/or threads to iterate over - if "Thread" in self.events.columns: - exec_locations = set(zip(self.events["Process"], self.events["Thread"])) - has_thread = True - else: - exec_locations = set(self.events["Process"]) - has_thread = False - - for curr_loc in exec_locations: - # only filter by thread if the trace has a thread column - if has_thread: - curr_process, curr_thread = curr_loc - filtered_df = enter_leave_df.loc[ - (enter_leave_df["Process"] == curr_process) - & (enter_leave_df["Thread"] == curr_thread) - ] - else: - filtered_df = enter_leave_df.loc[ - (enter_leave_df["Process"] == curr_loc) - ] - - curr_depth, callpath = 0, "" - - """ - Iterating over lists instead of - DataFrame columns is more efficient - """ - df_indices = filtered_df.index.to_list() - function_names = filtered_df["Name"].to_list() - event_types = filtered_df["Event Type"].to_list() - - # stacks used to iterate through the trace and add nodes to the cct - functions_stack, nodes_stack = [], [] - - # iterating over the events of the current thread's trace - for i in range(len(filtered_df)): - curr_df_index, evt_type, function_name = ( - df_indices[i], - event_types[i], - function_names[i], - ) - - # encounter a new function through its entry point. - if evt_type == "Enter": - # add the function to the stack and get the call path - functions_stack.append(function_name) - callpath = "->".join(functions_stack) - - # get the parent node of the function if it exists - parent_node = None if curr_depth == 0 else nodes_stack[-1] - - if callpath in callpath_to_node: - # don't create new node if callpath is in map - curr_node = callpath_to_node[callpath] - else: - # create new node if callpath isn't in map - curr_node = Node(node_id, parent_node, curr_depth) - callpath_to_node[callpath] = curr_node - node_id += 1 - - # add node as root or child of its - # parent depending on current depth - graph.add_root( - curr_node - ) if curr_depth == 0 else parent_node.add_child(curr_node) - - # Update nodes stack, column, and current depth - nodes_stack.append(curr_node) - graph_nodes[curr_df_index] = curr_node - curr_depth += 1 - else: - """ - Pop node from top of stack once you - encounter the Leave event for a function - """ - nodes_stack.pop() - - # Update functions stack and current depth - functions_stack.pop() - curr_depth -= 1 - - # Update the Trace with the generated cct - self.events["Graph_Node"] = graph_nodes - self.cct = graph - def comm_matrix(self, output="size"): """ Communication Matrix for Peer-to-Peer (P2P) MPI messages diff --git a/pipit/util/cct.py b/pipit/util/cct.py new file mode 100644 index 00000000..27a81ddd --- /dev/null +++ b/pipit/util/cct.py @@ -0,0 +1,115 @@ +# Copyright 2022-2023 Parallel Software and Systems Group, University of +# Maryland. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT + +from pipit.graph import Graph, Node + + +def create_cct(events): + """ + Generic function to iterate through the events dataframe and create a CCT. + Uses pipit's graph data structure for this. Returns a CCT + and creates a new column in the Events DataFrame that stores + a reference to each row's corresponding node in the CCT. + """ + + # CCT and list of nodes in DataFrame + graph = Graph() + graph_nodes = [None for i in range(len(events))] + + # determines whether a node exists or not + callpath_to_node = dict() + + node_id = 0 # each node has a unique id + + # Filter the DataFrame to only Enter/Leave + enter_leave_df = events.loc[ + events["Event Type"].isin(["Enter", "Leave"]) + ] + + # list of processes and/or threads to iterate over + if "Thread" in events.columns: + exec_locations = set(zip(events["Process"], events["Thread"])) + has_thread = True + else: + exec_locations = set(events["Process"]) + has_thread = False + + for curr_loc in exec_locations: + # only filter by thread if the trace has a thread column + if has_thread: + curr_process, curr_thread = curr_loc + filtered_df = enter_leave_df.loc[ + (enter_leave_df["Process"] == curr_process) + & (enter_leave_df["Thread"] == curr_thread) + ] + else: + filtered_df = enter_leave_df.loc[ + (enter_leave_df["Process"] == curr_loc) + ] + + curr_depth, callpath = 0, "" + + """ + Iterating over lists instead of + DataFrame columns is more efficient + """ + df_indices = filtered_df.index.to_list() + function_names = filtered_df["Name"].to_list() + event_types = filtered_df["Event Type"].to_list() + + # stacks used to iterate through the trace and add nodes to the cct + functions_stack, nodes_stack = [], [] + + # iterating over the events of the current thread's trace + for i in range(len(filtered_df)): + curr_df_index, evt_type, function_name = ( + df_indices[i], + event_types[i], + function_names[i], + ) + + # encounter a new function through its entry point. + if evt_type == "Enter": + # add the function to the stack and get the call path + functions_stack.append(function_name) + callpath = "->".join(functions_stack) + + # get the parent node of the function if it exists + parent_node = None if curr_depth == 0 else nodes_stack[-1] + + if callpath in callpath_to_node: + # don't create new node if callpath is in map + curr_node = callpath_to_node[callpath] + else: + # create new node if callpath isn't in map + curr_node = Node(node_id, parent_node, curr_depth) + callpath_to_node[callpath] = curr_node + node_id += 1 + + # add node as root or child of its + # parent depending on current depth + graph.add_root( + curr_node + ) if curr_depth == 0 else parent_node.add_child(curr_node) + + # Update nodes stack, column, and current depth + nodes_stack.append(curr_node) + graph_nodes[curr_df_index] = curr_node + curr_depth += 1 + else: + """ + Pop node from top of stack once you + encounter the Leave event for a function + """ + nodes_stack.pop() + + # Update functions stack and current depth + functions_stack.pop() + curr_depth -= 1 + + # Update the Trace with the generated cct + events["Graph_Node"] = graph_nodes + + return graph \ No newline at end of file From 8677038bca67a5b47982efd3585d042d88d19764 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 23 Apr 2023 18:23:23 -0400 Subject: [PATCH 23/30] minor --- pipit/readers/nsight_reader.py | 6 ++++-- pipit/readers/otf2_reader.py | 6 ++++-- pipit/readers/projections_reader.py | 9 +++++---- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/pipit/readers/nsight_reader.py b/pipit/readers/nsight_reader.py index d99e2e2a..fea4d972 100644 --- a/pipit/readers/nsight_reader.py +++ b/pipit/readers/nsight_reader.py @@ -5,6 +5,7 @@ import pandas as pd import pipit.trace +from pipit.util.cct import create_cct class NsightReader: @@ -102,8 +103,9 @@ def read(self): # Applying the column list to the dataframe to rearrange self.df = self.df.loc[:, cols] + + cct = create_cct(self.df) - trace = pipit.trace.Trace(None, self.df, None) - trace._create_cct() + trace = pipit.trace.Trace(None, self.df, cct) return trace diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index aef22490..b2c1795c 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -8,6 +8,7 @@ import pandas as pd import multiprocessing as mp import pipit.trace +from util.cct import create_cct class OTF2Reader: @@ -516,7 +517,8 @@ def read(self): self.events = self.read_events() # events - trace = pipit.trace.Trace(self.definitions, self.events, None) - trace._create_cct() # create cct + cct = create_cct(self.events) + + trace = pipit.trace.Trace(self.definitions, self.events, cct) return trace diff --git a/pipit/readers/projections_reader.py b/pipit/readers/projections_reader.py index fb99bb60..12541da0 100644 --- a/pipit/readers/projections_reader.py +++ b/pipit/readers/projections_reader.py @@ -3,11 +3,11 @@ # # SPDX-License-Identifier: MIT - +import os import gzip import pandas import pipit.trace -import os +from util.cct import create_cct class ProjectionsConstants: @@ -286,8 +286,9 @@ def read(self): # Concatinate the dataframes list into dataframe containing entire trace trace_df = pandas.concat(dataframes_list, ignore_index=True) - trace = pipit.trace.Trace(None, trace_df, None) - trace._create_cct() + cct = create_cct(trace_df) + + trace = pipit.trace.Trace(None, trace_df, cct) return trace From 2440b4ee538cadc1335b30d9808e502aab1ed4f8 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 23 Apr 2023 18:23:37 -0400 Subject: [PATCH 24/30] style --- pipit/readers/nsight_reader.py | 2 +- pipit/util/cct.py | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/pipit/readers/nsight_reader.py b/pipit/readers/nsight_reader.py index fea4d972..a7d4e56f 100644 --- a/pipit/readers/nsight_reader.py +++ b/pipit/readers/nsight_reader.py @@ -103,7 +103,7 @@ def read(self): # Applying the column list to the dataframe to rearrange self.df = self.df.loc[:, cols] - + cct = create_cct(self.df) trace = pipit.trace.Trace(None, self.df, cct) diff --git a/pipit/util/cct.py b/pipit/util/cct.py index 27a81ddd..3bb491bf 100644 --- a/pipit/util/cct.py +++ b/pipit/util/cct.py @@ -24,9 +24,7 @@ def create_cct(events): node_id = 0 # each node has a unique id # Filter the DataFrame to only Enter/Leave - enter_leave_df = events.loc[ - events["Event Type"].isin(["Enter", "Leave"]) - ] + enter_leave_df = events.loc[events["Event Type"].isin(["Enter", "Leave"])] # list of processes and/or threads to iterate over if "Thread" in events.columns: @@ -45,9 +43,7 @@ def create_cct(events): & (enter_leave_df["Thread"] == curr_thread) ] else: - filtered_df = enter_leave_df.loc[ - (enter_leave_df["Process"] == curr_loc) - ] + filtered_df = enter_leave_df.loc[(enter_leave_df["Process"] == curr_loc)] curr_depth, callpath = 0, "" @@ -112,4 +108,4 @@ def create_cct(events): # Update the Trace with the generated cct events["Graph_Node"] = graph_nodes - return graph \ No newline at end of file + return graph From 2393c52851a3d7d20d4893e88d6ebf773c9abc6f Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 23 Apr 2023 18:38:32 -0400 Subject: [PATCH 25/30] minor --- pipit/readers/otf2_reader.py | 2 +- pipit/readers/projections_reader.py | 2 +- pipit/util/__init__.py | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) create mode 100644 pipit/util/__init__.py diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index b2c1795c..3f43f707 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -8,7 +8,7 @@ import pandas as pd import multiprocessing as mp import pipit.trace -from util.cct import create_cct +from pipit.util.cct import create_cct class OTF2Reader: diff --git a/pipit/readers/projections_reader.py b/pipit/readers/projections_reader.py index 12541da0..33ac5336 100644 --- a/pipit/readers/projections_reader.py +++ b/pipit/readers/projections_reader.py @@ -7,7 +7,7 @@ import gzip import pandas import pipit.trace -from util.cct import create_cct +from pipit.util.cct import create_cct class ProjectionsConstants: diff --git a/pipit/util/__init__.py b/pipit/util/__init__.py new file mode 100644 index 00000000..925b92c5 --- /dev/null +++ b/pipit/util/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2022-2023 Parallel Software and Systems Group, University of +# Maryland. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT From d4db686702b3f5adb52657fb06c63b93f0ce84b8 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Sun, 22 Oct 2023 16:35:49 -0400 Subject: [PATCH 26/30] chore: Merge --- pipit/readers/hpctoolkit_reader.py | 4 +++- pipit/readers/nsight_reader.py | 10 +++++----- pipit/readers/otf2_reader.py | 10 +++++----- pipit/readers/projections_reader.py | 11 ++++++----- pipit/trace.py | 8 +++++++- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pipit/readers/hpctoolkit_reader.py b/pipit/readers/hpctoolkit_reader.py index 060cb1f1..b7fecbfb 100644 --- a/pipit/readers/hpctoolkit_reader.py +++ b/pipit/readers/hpctoolkit_reader.py @@ -1356,5 +1356,7 @@ def read(self) -> pipit.trace.Trace: } ) + # cct is needed to create trace in hpctoolkit, + # so always return it as part of the trace self.trace_df = trace_df - return pipit.trace.Trace(None, trace_df, graph) + return pipit.trace.Trace(None, trace_df, self.cct) diff --git a/pipit/readers/nsight_reader.py b/pipit/readers/nsight_reader.py index a7d4e56f..05043f6f 100644 --- a/pipit/readers/nsight_reader.py +++ b/pipit/readers/nsight_reader.py @@ -5,15 +5,15 @@ import pandas as pd import pipit.trace -from pipit.util.cct import create_cct class NsightReader: """Reader for Nsight trace files""" - def __init__(self, file_name) -> None: + def __init__(self, file_name, create_cct=True) -> None: self.file_name = file_name self.df = None + self.create_cct = create_cct def read(self): """ @@ -104,8 +104,8 @@ def read(self): # Applying the column list to the dataframe to rearrange self.df = self.df.loc[:, cols] - cct = create_cct(self.df) - - trace = pipit.trace.Trace(None, self.df, cct) + trace = pipit.trace.Trace(None, self.df) + if self.create_cct: + trace.create_cct() return trace diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index dd298337..8d3edb8f 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -8,15 +8,15 @@ import pandas as pd import multiprocessing as mp import pipit.trace -from pipit.util.cct import create_cct class OTF2Reader: """Reader for OTF2 trace files""" - def __init__(self, dir_name, num_processes=None): + def __init__(self, dir_name, num_processes=None, create_cct=True): self.dir_name = dir_name # directory of otf2 file being read self.file_name = self.dir_name + "/traces.otf2" + self.create_cct = create_cct num_cpus = mp.cpu_count() if num_processes is None or num_processes < 1 or num_processes > num_cpus: @@ -517,8 +517,8 @@ def read(self): self.events = self.read_events() # events - cct = create_cct(self.events) - - trace = pipit.trace.Trace(self.definitions, self.events, cct) + trace = pipit.trace.Trace(self.definitions, self.events) + if self.create_cct: + trace.create_cct() return trace diff --git a/pipit/readers/projections_reader.py b/pipit/readers/projections_reader.py index 9a154a58..f5efb246 100644 --- a/pipit/readers/projections_reader.py +++ b/pipit/readers/projections_reader.py @@ -8,7 +8,6 @@ import pipit.trace import pandas as pd import multiprocessing as mp -from pipit.util.cct import create_cct class ProjectionsConstants: @@ -225,7 +224,7 @@ def read_sts_file(self): class ProjectionsReader: - def __init__(self, projections_directory: str, num_processes=None) -> None: + def __init__(self, projections_directory: str, num_processes=None, create_cct=True) -> None: if not os.path.isdir(projections_directory): raise ValueError("Not a valid directory.") @@ -270,6 +269,8 @@ def __init__(self, projections_directory: str, num_processes=None) -> None: else: self.num_processes = num_processes + self.create_cct = create_cct + # Returns an empty dict, used for reading log file into dataframe @staticmethod def _create_empty_dict() -> dict: @@ -317,9 +318,9 @@ def read(self): ["Timestamp (ns)", "Event Type", "Name", "Process", "Attributes"] ] - cct = create_cct(trace_df) - - trace = pipit.trace.Trace(None, trace_df, cct) + trace = pipit.trace.Trace(None, trace_df) + if self.create_cct: + trace.create_cct() return trace diff --git a/pipit/trace.py b/pipit/trace.py index 655827eb..685ed997 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd +from pipit.util.cct import create_cct class Trace: @@ -13,7 +14,7 @@ class Trace: includes one or more dataframes and a calling context tree. """ - def __init__(self, definitions, events, cct): + def __init__(self, definitions, events, cct=None): """Create a new Trace object.""" self.definitions = definitions self.events = events @@ -28,6 +29,11 @@ def __init__(self, definitions, events, cct): self.inc_metrics = [] self.exc_metrics = [] + def create_cct(self): + # adds a column of cct nodes to the events dataframe + # and stores the graph object in self.cct + self.cct = create_cct(self.events) + @staticmethod def from_otf2(dirname, num_processes=None): """Read an OTF2 trace into a new Trace object.""" From 32c16dc36e4b99841f3e363f0053417310b4d9b5 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Fri, 3 Nov 2023 12:19:39 -0400 Subject: [PATCH 27/30] temp --- pipit/readers/hpctoolkit_reader.py | 2 +- pipit/readers/nsight_reader.py | 2 +- pipit/readers/otf2_reader.py | 2 +- pipit/readers/projections_reader.py | 2 +- pipit/util/cct.py | 1 + 5 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pipit/readers/hpctoolkit_reader.py b/pipit/readers/hpctoolkit_reader.py index b7fecbfb..af140911 100644 --- a/pipit/readers/hpctoolkit_reader.py +++ b/pipit/readers/hpctoolkit_reader.py @@ -1359,4 +1359,4 @@ def read(self) -> pipit.trace.Trace: # cct is needed to create trace in hpctoolkit, # so always return it as part of the trace self.trace_df = trace_df - return pipit.trace.Trace(None, trace_df, self.cct) + return pipit.trace.Trace(None, trace_df, self.meta_reader.cct) diff --git a/pipit/readers/nsight_reader.py b/pipit/readers/nsight_reader.py index 05043f6f..a956fb83 100644 --- a/pipit/readers/nsight_reader.py +++ b/pipit/readers/nsight_reader.py @@ -10,7 +10,7 @@ class NsightReader: """Reader for Nsight trace files""" - def __init__(self, file_name, create_cct=True) -> None: + def __init__(self, file_name, create_cct=False) -> None: self.file_name = file_name self.df = None self.create_cct = create_cct diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index 8d3edb8f..b1fae889 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -13,7 +13,7 @@ class OTF2Reader: """Reader for OTF2 trace files""" - def __init__(self, dir_name, num_processes=None, create_cct=True): + def __init__(self, dir_name, num_processes=None, create_cct=False): self.dir_name = dir_name # directory of otf2 file being read self.file_name = self.dir_name + "/traces.otf2" self.create_cct = create_cct diff --git a/pipit/readers/projections_reader.py b/pipit/readers/projections_reader.py index f5efb246..40a59818 100644 --- a/pipit/readers/projections_reader.py +++ b/pipit/readers/projections_reader.py @@ -224,7 +224,7 @@ def read_sts_file(self): class ProjectionsReader: - def __init__(self, projections_directory: str, num_processes=None, create_cct=True) -> None: + def __init__(self, projections_directory: str, num_processes=None, create_cct=False) -> None: if not os.path.isdir(projections_directory): raise ValueError("Not a valid directory.") diff --git a/pipit/util/cct.py b/pipit/util/cct.py index 3bb491bf..a17a55a8 100644 --- a/pipit/util/cct.py +++ b/pipit/util/cct.py @@ -104,6 +104,7 @@ def create_cct(events): # Update functions stack and current depth functions_stack.pop() curr_depth -= 1 + # Update the Trace with the generated cct events["Graph_Node"] = graph_nodes From 4acf0f8345e540c17aba65f94540196e9ecae049 Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Fri, 10 Nov 2023 01:59:34 -0500 Subject: [PATCH 28/30] fixed cct function to handle unmatched events --- pipit/readers/projections_reader.py | 4 +++- pipit/util/cct.py | 24 ++++++++++++++---------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pipit/readers/projections_reader.py b/pipit/readers/projections_reader.py index 40a59818..38153e8f 100644 --- a/pipit/readers/projections_reader.py +++ b/pipit/readers/projections_reader.py @@ -224,7 +224,9 @@ def read_sts_file(self): class ProjectionsReader: - def __init__(self, projections_directory: str, num_processes=None, create_cct=False) -> None: + def __init__( + self, projections_directory: str, num_processes=None, create_cct=False + ) -> None: if not os.path.isdir(projections_directory): raise ValueError("Not a valid directory.") diff --git a/pipit/util/cct.py b/pipit/util/cct.py index a17a55a8..b973a783 100644 --- a/pipit/util/cct.py +++ b/pipit/util/cct.py @@ -95,16 +95,20 @@ def create_cct(events): graph_nodes[curr_df_index] = curr_node curr_depth += 1 else: - """ - Pop node from top of stack once you - encounter the Leave event for a function - """ - nodes_stack.pop() - - # Update functions stack and current depth - functions_stack.pop() - curr_depth -= 1 - + # we want to iterate through the stack in reverse order + # until we find the corresponding "Enter" Event + enter_name, j = None, len(functions_stack) - 1 + while enter_name != function_name and j > -1: + enter_name = functions_stack[j] + j -= 1 + + if enter_name == function_name: + # update stacks and current depth + del functions_stack[j + 1] + del nodes_stack[j + 1] + curr_depth -= 1 + else: + continue # Update the Trace with the generated cct events["Graph_Node"] = graph_nodes From 4f544c908d34bc4e617a29b331179d54f0c98ddf Mon Sep 17 00:00:00 2001 From: adityaranjan Date: Fri, 10 Nov 2023 02:09:33 -0500 Subject: [PATCH 29/30] minor --- pipit/trace.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pipit/trace.py b/pipit/trace.py index 685ed997..9ef0793f 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -35,12 +35,12 @@ def create_cct(self): self.cct = create_cct(self.events) @staticmethod - def from_otf2(dirname, num_processes=None): + def from_otf2(dirname, num_processes=None, create_cct=False): """Read an OTF2 trace into a new Trace object.""" # import this lazily to avoid circular dependencies from .readers.otf2_reader import OTF2Reader - return OTF2Reader(dirname, num_processes).read() + return OTF2Reader(dirname, num_processes, create_cct).read() @staticmethod def from_hpctoolkit(dirname): @@ -51,20 +51,20 @@ def from_hpctoolkit(dirname): return HPCToolkitReader(dirname).read() @staticmethod - def from_projections(dirname, num_processes=None): + def from_projections(dirname, num_processes=None, create_cct=False): """Read a Projections trace into a new Trace object.""" # import this lazily to avoid circular dependencies from .readers.projections_reader import ProjectionsReader - return ProjectionsReader(dirname, num_processes).read() + return ProjectionsReader(dirname, num_processes, create_cct).read() @staticmethod - def from_nsight(filename): + def from_nsight(filename, create_cct=False): """Read an Nsight trace into a new Trace object.""" # import this lazily to avoid circular dependencies from .readers.nsight_reader import NsightReader - return NsightReader(filename).read() + return NsightReader(filename, create_cct).read() @staticmethod def from_csv(filename): From f1d80c01620631e9b67eef0f09f6a4b310f641d7 Mon Sep 17 00:00:00 2001 From: Abhinav Bhatele Date: Tue, 14 Nov 2023 14:27:16 -0700 Subject: [PATCH 30/30] update year --- pipit/util/__init__.py | 4 ++-- pipit/util/cct.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pipit/util/__init__.py b/pipit/util/__init__.py index 925b92c5..f2647290 100644 --- a/pipit/util/__init__.py +++ b/pipit/util/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 Parallel Software and Systems Group, University of -# Maryland. See the top-level LICENSE file for details. +# Copyright 2023 Parallel Software and Systems Group, University of Maryland. +# See the top-level LICENSE file for details. # # SPDX-License-Identifier: MIT diff --git a/pipit/util/cct.py b/pipit/util/cct.py index b973a783..6557a588 100644 --- a/pipit/util/cct.py +++ b/pipit/util/cct.py @@ -1,5 +1,5 @@ -# Copyright 2022-2023 Parallel Software and Systems Group, University of -# Maryland. See the top-level LICENSE file for details. +# Copyright 2023 Parallel Software and Systems Group, University of Maryland. +# See the top-level LICENSE file for details. # # SPDX-License-Identifier: MIT