Skip to content

Commit

Permalink
[!] fix drop_duplicates to keep the last element
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhongFuze committed Sep 28, 2024
1 parent f85b14c commit 2d9f007
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions src/jobs/ens_graphdb_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Author: Zella Zhong
Date: 2024-09-26 16:48:23
LastEditors: Zella Zhong
LastEditTime: 2024-09-29 02:13:11
LastEditTime: 2024-09-29 02:50:12
FilePath: /data_process/src/jobs/ens_graphdb_job.py
Description:
'''
Expand Down Expand Up @@ -155,13 +155,13 @@ def process_ensname_temp_reverse(self):
ethereum_df.to_csv(etheruem_path, sep='\t', index=False)

identities_graph_df = temp_reverse_df[['graph_id', 'updated_nanosecond']].copy()
identities_graph_df = identities_graph_df.drop_duplicates(subset=['graph_id'])
identities_graph_df = identities_graph_df.drop_duplicates(subset=['graph_id'], keep='last')
identities_graph_df.to_csv(identities_graph_path, sep='\t', index=False)

partof_ethereum = temp_reverse_df[['reverse_address', 'graph_id']].copy()
partof_ethereum['ethereum_unique_id'] = partof_ethereum['reverse_address'].apply(lambda x: f"ethereum,{x}")
partof_ethereum = partof_ethereum[['ethereum_unique_id', 'graph_id']]
partof_ethereum = partof_ethereum.drop_duplicates(subset=['ethereum_unique_id', 'graph_id'])
partof_ethereum = partof_ethereum.drop_duplicates(subset=['ethereum_unique_id', 'graph_id'], keep='last')
# rename the columns
partof_ethereum = partof_ethereum.rename(columns={
'ethereum_unique_id': 'from',
Expand All @@ -171,7 +171,7 @@ def process_ensname_temp_reverse(self):
partof_ensname = temp_reverse_df[['name', 'graph_id']].copy()
partof_ensname['ens_unique_id'] = partof_ensname['name'].apply(lambda x: f"ens,{x}")
partof_ensname = partof_ensname[['ens_unique_id', 'graph_id']]
partof_ensname = partof_ensname.drop_duplicates(subset=['ens_unique_id', 'graph_id'])
partof_ensname = partof_ensname.drop_duplicates(subset=['ens_unique_id', 'graph_id'], keep='last')
# rename the columns
partof_ensname = partof_ensname.rename(columns={
'ens_unique_id': 'from',
Expand Down Expand Up @@ -244,7 +244,7 @@ def process_ensname_identity_graph(self):
ethereum_df['platform'] = 'ethereum'
ethereum_df['update_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
ethereum_df = ethereum_df[['primary_id', 'platform', 'identity', 'update_time']]
ethereum_df = ethereum_df.drop_duplicates(subset=['identity'])
ethereum_df = ethereum_df.drop_duplicates(subset=['identity'], keep='last')

name_df = ensnames_df[['name']].copy()
name_df['primary_id'] = name_df['name'].apply(lambda x: f"ens,{x}")
Expand Down Expand Up @@ -346,7 +346,7 @@ def process_ensname_identity_graph(self):

identities_graph_df = final_df[['ethereum_graph_id', 'ethereum_updated_nanosecond']].copy()
identities_graph_df = identities_graph_df[['ethereum_graph_id', 'ethereum_updated_nanosecond']]
identities_graph_df = identities_graph_df.drop_duplicates(subset=['ethereum_graph_id'])
identities_graph_df = identities_graph_df.drop_duplicates(subset=['ethereum_graph_id'], keep='last')
# rename the columns
identities_graph_df = identities_graph_df.rename(columns={
'ethereum_graph_id': 'primary_id',
Expand All @@ -357,7 +357,7 @@ def process_ensname_identity_graph(self):

partof_ethereum = final_df[['ethereum_unique_id', 'ethereum_graph_id']].copy()
partof_ethereum = partof_ethereum[['ethereum_unique_id', 'ethereum_graph_id']]
partof_ethereum = partof_ethereum.drop_duplicates(subset=['ethereum_unique_id', 'ethereum_graph_id'])
partof_ethereum = partof_ethereum.drop_duplicates(subset=['ethereum_unique_id', 'ethereum_graph_id'], keep='last')
# rename the columns
partof_ethereum = partof_ethereum.rename(columns={
'ethereum_unique_id': 'from',
Expand All @@ -366,7 +366,7 @@ def process_ensname_identity_graph(self):

partof_ensname = final_df[['ens_unique_id', 'ens_graph_id']].copy()
partof_ensname = partof_ensname[['ens_unique_id', 'ens_graph_id']]
partof_ensname = partof_ensname.drop_duplicates(subset=['ens_unique_id', 'ens_graph_id'])
partof_ensname = partof_ensname.drop_duplicates(subset=['ens_unique_id', 'ens_graph_id'], keep='last')
# rename the columns
partof_ensname = partof_ensname.rename(columns={
'ens_unique_id': 'from',
Expand Down Expand Up @@ -400,7 +400,7 @@ def process_ensname_identity_graph(self):

final_graph_id_df = pd.concat([ethereum_part, ens_part], ignore_index=True)
final_graph_id_df = final_graph_id_df[['unique_id', 'graph_id', 'platform', 'identity', 'updated_nanosecond']]
final_graph_id_df = final_graph_id_df.drop_duplicates(subset=['unique_id', 'graph_id'])
final_graph_id_df = final_graph_id_df.drop_duplicates(subset=['unique_id'], keep='last')
final_graph_id_df.to_csv(allocation_path, index=False, quoting=csv.QUOTE_ALL)
logging.debug("Successfully save %s row_count: %d", allocation_path, final_graph_id_df.shape[0])

Expand All @@ -427,6 +427,11 @@ def save_graph_id(self, dump_batch_size=10000):
if not os.path.exists(allocation_path):
raise FileNotFoundError(f"No data path {allocation_path}")

# df = pd.read_csv(allocation_path)
# print(df.shape[0])
# df_deduped = df.drop_duplicates(subset=['unique_id'], keep='last')
# print(df_deduped.shape[0])

start = time.time()
logging.info("saving graph_id allocation start at: %s", \
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start)))
Expand Down Expand Up @@ -677,5 +682,5 @@ def dumps_to_graphdb(self):
logger.InitLogger(config)

EnsGraphDB().process_ensname_identity_graph()
EnsGraphDB().save_graph_id()
# EnsGraphDB().save_graph_id()
# EnsGraphDB().run_loading_job()

0 comments on commit 2d9f007

Please sign in to comment.