From dcb1402df11f6514eb9ba5e63e34e601b1cf119f Mon Sep 17 00:00:00 2001
From: adityaranjan
Date: Wed, 10 Apr 2024 18:38:46 -0400
Subject: [PATCH] fixes issue with same timestamps messing up the ordering of
 events

---
 pipit/readers/pytorch_reader.py | 45 ++++++++++++++++++++++++++++-----
 1 file changed, 38 insertions(+), 7 deletions(-)

diff --git a/pipit/readers/pytorch_reader.py b/pipit/readers/pytorch_reader.py
index 19c951a6..58b5138d 100644
--- a/pipit/readers/pytorch_reader.py
+++ b/pipit/readers/pytorch_reader.py
@@ -6,7 +6,7 @@
 import multiprocessing as mp
 
-# TODO: also, all the columns need to be made numeric, categorical, etc
+# TODO: need to handle duration events - B and E (look at chrome trace format documentation)
 # TODO: compare with HTA tool reader
 # TODO: add comments
 # TODO: unit tests
 
@@ -60,7 +60,20 @@ def events_reader(self, rank_size):
             del df["dur"]
 
             df["ph"].replace({"X": "Enter", "i": "Instant"}, inplace=True)
-            df = pd.concat([df, temp_df], ignore_index=True)
+
+            complete_events_indices = complete_events_df.index.values
+
+            new_df_index = np.full(len(df.index.values), 0)
+            new_df_index[complete_events_indices] = 1
+            new_df_index = np.roll(new_df_index.cumsum(), 1)
+            new_df_index[0] = 0
+
+            df.index = df.index.values + new_df_index
+
+            temp_df.index = df.index.values[complete_events_indices] + 1
+
+            df = pd.concat([df, temp_df])
+            df.sort_index(inplace=True)
 
             df["Rank"] = np.full(len(df), data["distributedInfo"]["rank"])
 
@@ -69,9 +82,6 @@ def events_reader(self, rank_size):
         df = pd.concat(dfs)
 
         df["ts"] *= 1000
-        # or do we want to leave these unchanged
-        # since this is technically changing the trace?
-        df["ts"] -= min(df["ts"])
 
         df.rename(
             columns={
@@ -84,7 +94,7 @@ def events_reader(self, rank_size):
             inplace=True,
         )
 
-        # is there a more performant way of doing this?
+        # TODO: is there a more performant way of doing this?
         attribute_cols = set(df.columns) - set(
             ["Event Type", "Name", "Rank", "Process", "Thread", "Timestamp (ns)"]
         )
@@ -120,7 +130,9 @@ def read(self):
         events_df = pd.concat(events_dfs)
         del events_dfs
 
-        events_df.sort_values(by="Timestamp (ns)", ignore_index=True, inplace=True)
+        # stable sorting so that order of events with same timestamps isn't "corrupted"
+        # TODO: this change needs to be made in all readers
+        events_df.sort_values(by="Timestamp (ns)", ignore_index=True, inplace=True, kind="stable")
 
         definitions_df = events_df.loc[events_df["Event Type"] == "M"]
         definitions_df.reset_index(inplace=True)
@@ -128,6 +140,16 @@ def read(self):
         events_df = events_df.loc[events_df["Event Type"] != "M"]
         events_df.reset_index(inplace=True, drop=True)
 
+        events_df = events_df.astype(
+            {
+                "Event Type": "category",
+                "Name": "category",
+                "Rank": "category",
+                "Process": "category",
+                "Thread": "category",
+            }
+        )
+
         definitions_df.rename(
             columns={"Name": "Definition Type", "args": "Attributes"}, inplace=True
         )
@@ -135,6 +157,15 @@ def read(self):
         definitions_df = definitions_df[
             ["Definition Type", "Rank", "Process", "Thread", "Attributes"]
         ]
+        definitions_df = definitions_df.astype(
+            {
+                "Definition Type": "category",
+                "Rank": "category",
+                "Process": "category",
+                "Thread": "category",
+            }
+        )
+
         trace = pipit.trace.Trace(definitions_df, events_df)
         if self.create_cct:
             trace.create_cct()
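
A note on the new TODO at the top of the file: in the Chrome trace format,
duration events arrive as separate "B" (begin) and "E" (end) records that
nest per (pid, tid), so each "E" closes the most recently opened "B" on that
thread. A minimal sketch of the pairing logic, not part of this patch (the
function name and event-dict layout are assumptions):

# Sketch only: pair Chrome-trace "B"/"E" duration events per (pid, tid).
# Assumes `events` is an iterable of dicts already sorted by "ts".
def pair_duration_events(events):
    open_events = {}  # (pid, tid) -> stack of unmatched "B" events
    for ev in events:
        stack = open_events.setdefault((ev.get("pid"), ev.get("tid")), [])
        if ev["ph"] == "B":
            stack.append(ev)
        elif ev["ph"] == "E" and stack:
            yield stack.pop(), ev  # innermost open "B" plus its matching "E"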
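
The index arithmetic added in events_reader is easier to follow on toy data.
This sketch (toy frames, not the reader's real schema) shows how the
cumsum/roll shift opens a one-row gap after each complete event so that its
matching exit row from temp_df lands directly behind it:

import numpy as np
import pandas as pd

# Four events in file order; positions 1 and 2 were complete ("X") events
# whose exit halves have been split off into temp_df.
df = pd.DataFrame({"Event Type": ["Instant", "Enter", "Enter", "Instant"]})
temp_df = pd.DataFrame({"Event Type": ["Leave", "Leave"]})
complete_events_indices = np.array([1, 2])

# Shift each row down by the number of complete events strictly before it,
# leaving a one-row gap right after every "Enter".
new_df_index = np.full(len(df.index.values), 0)
new_df_index[complete_events_indices] = 1         # [0, 1, 1, 0]
new_df_index = np.roll(new_df_index.cumsum(), 1)  # [2, 0, 1, 2]
new_df_index[0] = 0                               # [0, 0, 1, 2]
df.index = df.index.values + new_df_index         # [0, 1, 3, 5]

# Slot each "Leave" into the gap after its "Enter", then merge.
temp_df.index = df.index.values[complete_events_indices] + 1  # [2, 4]
merged = pd.concat([df, temp_df]).sort_index()
# merged order: Instant, Enter, Leave, Enter, Leave, Instant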
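
On the kind="stable" change in read: pandas sort_values defaults to an
unstable quicksort, which makes no guarantee about the relative order of
rows with equal keys, so a plain sort by timestamp could scramble the
Enter/Leave adjacency constructed above. A small illustration with
hypothetical timestamps:

import pandas as pd

events_df = pd.DataFrame(
    {
        "Timestamp (ns)": [5, 5, 5, 5, 1],
        "Event Type": ["Enter", "Leave", "Enter", "Leave", "Instant"],
    }
)

# kind="stable" keeps the four tied rows in their current
# Enter/Leave/Enter/Leave order; the default quicksort may not.
events_df.sort_values(
    by="Timestamp (ns)", ignore_index=True, inplace=True, kind="stable"
)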
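
Finally, the two astype blocks store the low-cardinality string columns as
categoricals, which keeps one small integer code per row instead of a full
Python string. A quick way to see the effect (illustrative numbers only):

import pandas as pd

names = pd.Series(["aten::mm"] * 1_000_000)
print(names.memory_usage(deep=True))                     # tens of MB as object
print(names.astype("category").memory_usage(deep=True))  # ~1 MB as category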