Commit

fixes issue where events with the same timestamp could lose their relative ordering
adityaranjan committed Apr 10, 2024
1 parent ef6bd05 commit dcb1402
Showing 1 changed file with 38 additions and 7 deletions.
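
The fix hinges on passing kind="stable" to DataFrame.sort_values so that events sharing a timestamp keep the relative order they already had. A minimal sketch of that behavior on a toy DataFrame (illustrative only, not code from this commit):

import pandas as pd

# Two events share timestamp 5 and were deliberately ordered Enter-then-Exit.
events = pd.DataFrame(
    {"Timestamp (ns)": [3, 5, 5], "Event Type": ["Instant", "Enter", "Exit"]}
)

# The default sort kind gives no ordering guarantee for tied keys; a stable
# sort (mergesort) preserves the original relative order of rows with equal
# timestamps, so the Enter still precedes its matching Exit.
events.sort_values(by="Timestamp (ns)", ignore_index=True, inplace=True, kind="stable")
print(events)
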
45 changes: 38 additions & 7 deletions pipit/readers/pytorch_reader.py
@@ -6,7 +6,7 @@
import multiprocessing as mp


# TODO: also, all the columns need to be made numeric, categorical, etc
# TODO: need to handle duration events - B and E (look at chrome trace format documentation)
# TODO: compare with HTA tool reader
# TODO: add comments
# TODO: unit tests
@@ -60,7 +60,20 @@ def events_reader(self, rank_size):

del df["dur"]
df["ph"].replace({"X": "Enter", "i": "Instant"}, inplace=True)
df = pd.concat([df, temp_df], ignore_index=True)

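# offset each row's index by the number of complete ("X") events that come
# before it, opening a one-slot gap immediately after every complete event;
# the rows in temp_df (one per complete event) then get those gap indices so
# that sorting by index places each one directly after its originating event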
complete_events_indices = complete_events_df.index.values

new_df_index = np.full(len(df.index.values), 0)
new_df_index[complete_events_indices] = 1
new_df_index = np.roll(new_df_index.cumsum(), 1)
new_df_index[0] = 0

df.index = df.index.values + new_df_index

temp_df.index = df.index.values[complete_events_indices] + 1

df = pd.concat([df, temp_df])
df.sort_index(inplace=True)

df["Rank"] = np.full(len(df), data["distributedInfo"]["rank"])

@@ -69,9 +82,6 @@ def events_reader(self, rank_size):
df = pd.concat(dfs)

df["ts"] *= 1000
# or do we want to leave these unchanged
# since this is technically changing the trace?
df["ts"] -= min(df["ts"])

df.rename(
columns={
@@ -84,7 +94,7 @@
inplace=True,
)

# is there a more performant way of doing this?
# TODO: is there a more performant way of doing this?
attribute_cols = set(df.columns) - set(
["Event Type", "Name", "Rank", "Process", "Thread", "Timestamp (ns)"]
)
@@ -120,21 +130,42 @@ def read(self):
events_df = pd.concat(events_dfs)
del events_dfs

events_df.sort_values(by="Timestamp (ns)", ignore_index=True, inplace=True)
# stable sorting so that order of events with same timestamps isn't "corrupted"
# TODO: this change needs to be made in all readers
events_df.sort_values(by="Timestamp (ns)", ignore_index=True, inplace=True, kind="stable")

definitions_df = events_df.loc[events_df["Event Type"] == "M"]
definitions_df.reset_index(inplace=True)

events_df = events_df.loc[events_df["Event Type"] != "M"]
events_df.reset_index(inplace=True, drop=True)

events_df = events_df.astype(
{
"Event Type": "category",
"Name": "category",
"Rank": "category",
"Process": "category",
"Thread": "category",
}
)

definitions_df.rename(
columns={"Name": "Definition Type", "args": "Attributes"}, inplace=True
)
definitions_df = definitions_df[
["Definition Type", "Rank", "Process", "Thread", "Attributes"]
]

definitions_df = definitions_df.astype(
{
"Definition Type": "category",
"Rank": "category",
"Process": "category",
"Thread": "category",
}
)

trace = pipit.trace.Trace(definitions_df, events_df)
if self.create_cct:
trace.create_cct()
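For reference, a standalone sketch of the index-offset interleaving used in events_reader above, run on toy data; the frames, values, and the offsets name are illustrative stand-ins rather than code taken from the reader:

import numpy as np
import pandas as pd

# Toy stand-ins: df has complete ("X") events at positions 1 and 3, and
# temp_df holds one generated row per complete event, in the same order.
df = pd.DataFrame({"ph": ["i", "Enter", "i", "Enter"], "ts": [0, 1, 2, 3]})
temp_df = pd.DataFrame({"ph": ["Exit", "Exit"], "ts": [4, 6]})
complete_events_indices = np.array([1, 3])

# Count, for each row, how many complete events come before it and use that
# count as an index offset, opening a one-slot gap after every complete event.
offsets = np.full(len(df.index.values), 0)
offsets[complete_events_indices] = 1
offsets = np.roll(offsets.cumsum(), 1)
offsets[0] = 0
df.index = df.index.values + offsets

# Drop each temp_df row into the gap right after its complete event.
temp_df.index = df.index.values[complete_events_indices] + 1

combined = pd.concat([df, temp_df]).sort_index()
print(combined["ph"].tolist())  # ['i', 'Enter', 'Exit', 'i', 'Enter', 'Exit']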
