Skip to content

Commit

Permalink
updates to have strided unique_id, take care of instant events, and t…
Browse files Browse the repository at this point in the history
…o concat trace dfs
  • Loading branch information
movsesyanae committed Oct 14, 2024
1 parent 65e4332 commit b959fee
Showing 1 changed file with 47 additions and 12 deletions.
59 changes: 47 additions & 12 deletions pipit/readers/core_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,26 @@

import pandas
import numpy
from pipit.trace import Trace


class CoreTraceReader:
"""
Helper Object to read traces from different sources and convert them into a common format
"""

def __init__(self):
def __init__(self, start: int = 0, stride: int = 1):
"""
Should be called by each process to create an empty trace per process in the reader. Creates the following
data structures to represent an empty trace:
- events: Dict[int, Dict[int, List[Dict]]]
- stacks: Dict[int, Dict[int, List[int]]]
"""
# keep stride for how much unique id should be incremented
self.stride = stride

# keep track of a unique id for each event
self.unique_id = -1
self.unique_id = start - self.stride

# events are indexed by process number, then thread number
# stores a list of events
Expand Down Expand Up @@ -64,7 +68,9 @@ def add_event(self, event: Dict) -> None:

# if the event is an enter event, add the event to the stack and update the parent-child relationships
if event["Event Type"] == "Enter":
self.__update_parent_child_relationships(event, stack, event_list)
self.__update_parent_child_relationships(event, stack, event_list, False)
elif event["Event Type"] == "Instant":
self.__update_parent_child_relationships(event, stack, event_list, True)
# if the event is a leave event, update the matching event and pop from the stack
elif event["Event Type"] == "Leave":
self.__update_match_event(event, stack, event_list)
Expand All @@ -82,22 +88,35 @@ def finalize(self):
all_events.extend(self.events[process][thread])

# create a dataframe
events_dataframe = pandas.DataFrame(all_events)
return events_dataframe

def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict]) -> None:
trace_df = pandas.DataFrame(all_events)

# categorical for memory savings
trace_df = trace_df.astype(
{
"Name": "category",
"Event Type": "category",
"Process": "category",
"_matching_event": "Int32",
"_parent": "Int32",
"_matching_timestamp": "Int32",
}
)
return trace_df

def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict],is_instant: bool) -> None:
"""
This method can be thought of the update upon an "Enter" event. It adds to the stack and CCT
"""
if len(stack) == 0:
# root event
event["parent"] = numpy.nan
event["_parent"] = numpy.nan
else:
parent_event = event_list[stack[-1]]
event["parent"] = parent_event["unique_id"]
event["_parent"] = parent_event["unique_id"]

# update stack
stack.append(len(event_list))
if not is_instant:
stack.append(len(event_list))

def __update_match_event(self, leave_event: Dict, stack: List[int], event_list: List[Dict]) -> None:
"""
Expand All @@ -108,7 +127,7 @@ def __update_match_event(self, leave_event: Dict, stack: List[int], event_list:
while len(stack) > 0:

# popping matched events from the stack
enter_event = event_list[stack.pop(-1)]
enter_event = event_list[stack.pop()]

if enter_event["Name"] == leave_event["Name"]:
# matching event found
Expand All @@ -117,8 +136,24 @@ def __update_match_event(self, leave_event: Dict, stack: List[int], event_list:
leave_event["_matching_event"] = enter_event["unique_id"]
enter_event["_matching_event"] = leave_event["unique_id"]

# update matching timestamps
leave_event["_matching_timestamp"] = enter_event["Timestamp (ns)"]
enter_event["_matching_timestamp"] = leave_event["Timestamp (ns)"]

break

def __get_unique_id(self) -> int:
self.unique_id += 1
self.unique_id += self.stride
return self.unique_id

def concat_trace_data(data_list):
"""
Concatenates the data from multiple trace readers into a single trace reader
"""
trace_data = pandas.concat(data_list, ignore_index=True)
# set index to unique_id
trace_data.set_index("unique_id", inplace=True)
trace_data.sort_values(
by="Timestamp (ns)", axis=0, ascending=True, inplace=True, ignore_index=True
)
return Trace(None, trace_data, None)

0 comments on commit b959fee

Please sign in to comment.