From b959feed321a35e736cbfc06902dca8c4625cd10 Mon Sep 17 00:00:00 2001
From: Alexander Movsesyan
Date: Sun, 13 Oct 2024 21:46:50 -0400
Subject: [PATCH] updates to have strided unique_id, take care of instant events, and to concat trace dfs

---
 pipit/readers/core_reader.py | 59 ++++++++++++++++++++++++++++--------
 1 file changed, 47 insertions(+), 12 deletions(-)

diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py
index 9c08fcb..8fd8aa7 100644
--- a/pipit/readers/core_reader.py
+++ b/pipit/readers/core_reader.py
@@ -3,6 +3,7 @@
 
 import pandas
 import numpy
+from pipit.trace import Trace
 
 
 class CoreTraceReader:
@@ -10,15 +11,18 @@ class CoreTraceReader:
     Helper Object to read traces from different sources and convert them into a common format
     """
 
-    def __init__(self):
+    def __init__(self, start: int = 0, stride: int = 1):
         """
         Should be called by each process to create an empty trace per process in the reader.
         Creates the following data structures to represent an empty trace:
         - events: Dict[int, Dict[int, List[Dict]]]
         - stacks: Dict[int, Dict[int, List[int]]]
         """
+        # keep stride for how much unique id should be incremented
+        self.stride = stride
+
         # keep track of a unique id for each event
-        self.unique_id = -1
+        self.unique_id = start - self.stride
 
         # events are indexed by process number, then thread number
         # stores a list of events
@@ -64,7 +68,9 @@ def add_event(self, event: Dict) -> None:
 
         # if the event is an enter event, add the event to the stack and update the parent-child relationships
         if event["Event Type"] == "Enter":
-            self.__update_parent_child_relationships(event, stack, event_list)
+            self.__update_parent_child_relationships(event, stack, event_list, False)
+        elif event["Event Type"] == "Instant":
+            self.__update_parent_child_relationships(event, stack, event_list, True)
         # if the event is a leave event, update the matching event and pop from the stack
         elif event["Event Type"] == "Leave":
             self.__update_match_event(event, stack, event_list)
@@ -82,22 +88,35 @@ def finalize(self):
                 all_events.extend(self.events[process][thread])
         # create a dataframe
-        events_dataframe = pandas.DataFrame(all_events)
-        return events_dataframe
-
-    def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict]) -> None:
+        trace_df = pandas.DataFrame(all_events)
+
+        # categorical for memory savings
+        trace_df = trace_df.astype(
+            {
+                "Name": "category",
+                "Event Type": "category",
+                "Process": "category",
+                "_matching_event": "Int32",
+                "_parent": "Int32",
+                "_matching_timestamp": "Int32",
+            }
+        )
+        return trace_df
+
+    def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict], is_instant: bool) -> None:
         """
         This method can be thought of the update upon an "Enter" event.
         It adds to the stack and CCT
         """
         if len(stack) == 0:
             # root event
-            event["parent"] = numpy.nan
+            event["_parent"] = numpy.nan
         else:
             parent_event = event_list[stack[-1]]
-            event["parent"] = parent_event["unique_id"]
+            event["_parent"] = parent_event["unique_id"]
 
         # update stack
-        stack.append(len(event_list))
+        if not is_instant:
+            stack.append(len(event_list))
 
     def __update_match_event(self, leave_event: Dict, stack: List[int], event_list: List[Dict]) -> None:
         """
@@ -108,7 +127,7 @@ def __update_match_event(self, leave_event: Dict, stack: List[int], event_list:
 
         while len(stack) > 0:
             # popping matched events from the stack
-            enter_event = event_list[stack.pop(-1)]
+            enter_event = event_list[stack.pop()]
 
             if enter_event["Name"] == leave_event["Name"]:
                 # matching event found
@@ -117,8 +136,24 @@ def __update_match_event(self, leave_event: Dict, stack: List[int], event_list:
                 leave_event["_matching_event"] = enter_event["unique_id"]
                 enter_event["_matching_event"] = leave_event["unique_id"]
 
+                # update matching timestamps
+                leave_event["_matching_timestamp"] = enter_event["Timestamp (ns)"]
+                enter_event["_matching_timestamp"] = leave_event["Timestamp (ns)"]
+
                 break
 
     def __get_unique_id(self) -> int:
-        self.unique_id += 1
+        self.unique_id += self.stride
         return self.unique_id
+
+def concat_trace_data(data_list):
+    """
+    Concatenates the data from multiple trace readers into a single trace reader
+    """
+    trace_data = pandas.concat(data_list, ignore_index=True)
+    # set index to unique_id
+    trace_data.set_index("unique_id", inplace=True)
+    trace_data.sort_values(
+        by="Timestamp (ns)", axis=0, ascending=True, inplace=True, ignore_index=True
+    )
+    return Trace(None, trace_data, None)
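
Note for reviewers: below is a minimal usage sketch of how the strided unique ids and concat_trace_data() are meant to compose when several per-rank readers are finalized independently. Only CoreTraceReader(start, stride), add_event(), finalize(), and concat_trace_data() come from this patch; the event field "Thread" and the literal process numbers and timestamps are illustrative assumptions, not something this patch defines.

# Usage sketch (illustrative, not part of this patch): two per-rank readers
# with interleaved unique ids, merged into one Trace after finalization.
from pipit.readers.core_reader import CoreTraceReader, concat_trace_data

# rank 0 assigns ids 0, 2, 4, ...; rank 1 assigns ids 1, 3, 5, ...
reader0 = CoreTraceReader(start=0, stride=2)
reader1 = CoreTraceReader(start=1, stride=2)

# "Name", "Event Type", "Timestamp (ns)", and "Process" appear in the patch
# above; the "Thread" key is an assumption about the event dict layout.
reader0.add_event({"Name": "main", "Event Type": "Enter",
                   "Timestamp (ns)": 10, "Process": 0, "Thread": 0})
reader0.add_event({"Name": "send", "Event Type": "Instant",
                   "Timestamp (ns)": 15, "Process": 0, "Thread": 0})
reader0.add_event({"Name": "main", "Event Type": "Leave",
                   "Timestamp (ns)": 40, "Process": 0, "Thread": 0})

reader1.add_event({"Name": "main", "Event Type": "Enter",
                   "Timestamp (ns)": 12, "Process": 1, "Thread": 0})
reader1.add_event({"Name": "main", "Event Type": "Leave",
                   "Timestamp (ns)": 38, "Process": 1, "Thread": 0})

# finalize() returns one DataFrame per reader; concat_trace_data() stitches
# them into a single pipit Trace sorted by "Timestamp (ns)".
trace = concat_trace_data([reader0.finalize(), reader1.finalize()])

Because the per-rank id sequences are disjoint, the _parent and _matching_event references computed inside each reader remain unambiguous after the per-process frames are concatenated.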