Commit: isolate ethereum vertex
ZhongFuze committed Sep 28, 2024
1 parent 2d9f007 commit 90e03f6
Showing 2 changed files with 145 additions and 15 deletions.
src/data_process.py (4 changes: 2 additions & 2 deletions)
@@ -4,7 +4,7 @@
Author: Zella Zhong
Date: 2024-09-12 19:05:02
LastEditors: Zella Zhong
LastEditTime: 2024-09-29 00:11:01
LastEditTime: 2024-09-29 04:40:50
FilePath: /data_process/src/data_process.py
Description:
'''
@@ -84,7 +84,7 @@ def ensname_graphdb_job():
id='farcaster_process_job'
)
farcaster_extras_job_trigger = CronTrigger(
year="*", month="*", day="*", hour="18", minute="30", second="0"
year="*", month="*", day="*", hour="15", minute="30", second="0"
)
scheduler.add_job(
farcaster_extras_job,
src/jobs/ens_graphdb_job.py (156 changes: 143 additions & 13 deletions)
@@ -4,7 +4,7 @@
Author: Zella Zhong
Date: 2024-09-26 16:48:23
LastEditors: Zella Zhong
LastEditTime: 2024-09-29 02:50:12
LastEditTime: 2024-09-29 04:34:21
FilePath: /data_process/src/jobs/ens_graphdb_job.py
Description:
'''
@@ -40,6 +40,18 @@
import setting
from utils.timeutils import get_unix_milliconds

def isolate_logic(row):
# row fields: ['owner', 'ethereum_unique_id', 'unique_id', 'graph_id', 'updated_nanosecond']
ethereum_graph_id = row['graph_id']
ethereum_updated_ns = row['updated_nanosecond']
if pd.notna(ethereum_graph_id):
# Case 1: ethereum_unique_id already has a graph_id
return ethereum_graph_id, int(ethereum_updated_ns), "exist"
else:
# Case 2: ethereum_unique_id does not exist; allocate a fresh graph_id
new_graph_id = str(uuid.uuid4())
current_time_ns = int(get_unix_milliconds())
return new_graph_id, current_time_ns, "missing"
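isolate_logic is consumed below via DataFrame.apply with result_type="expand", which fans the returned 3-tuple out into three columns. A minimal sketch with invented data (assuming the pandas/uuid imports at the top of this file):

demo = pd.DataFrame({
    'owner': ['0xaaa', '0xbbb'],
    'ethereum_unique_id': ['ethereum,0xaaa', 'ethereum,0xbbb'],
    'unique_id': ['ethereum,0xaaa', None],   # None simulates a failed allocation lookup
    'graph_id': ['existing-graph-id', None],
    'updated_nanosecond': [1727550000000000, None],
})
demo[['ethereum_graph_id', 'ethereum_updated_nanosecond', 'combine_type']] = \
    demo.apply(isolate_logic, axis=1, result_type="expand")
# row 0 keeps 'existing-graph-id' with combine_type "exist";
# row 1 gets a fresh uuid4 and combine_type "missing"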

def combine_logic(row):
ens_graph_id = row['graph_id_ens']
@@ -201,7 +213,7 @@ def process_ensname_temp_reverse(self):
read_conn.close()

def process_ensname_identity_graph(self):
graphdb_process_dirs = os.path.join(setting.Settings["datapath"], "tigergraph/import_graphs/ensname")
graphdb_process_dirs = os.path.join(setting.Settings["datapath"], "tigergraph/import_graphs/ensname_test")
if not os.path.exists(graphdb_process_dirs):
os.makedirs(graphdb_process_dirs)

@@ -226,12 +238,15 @@ def process_ensname_identity_graph(self):
try:
ensname = "ensname"
columns = ['name', 'is_wrapped', 'wrapped_owner', 'owner', 'resolved_address', 'reverse_address']
select_sql = "SELECT %s FROM %s WHERE name is not null" % (",".join(columns), ensname)
select_sql = "SELECT %s FROM %s WHERE name is not null order by id limit 3000" % (",".join(columns), ensname)
cursor.execute(select_sql)
rows = cursor.fetchall()
ensnames_df = pd.DataFrame(rows, columns=columns)
logging.debug("Successfully load table ensname row_count: %d" % ensnames_df.shape[0])

# Filter rows where 'name' length is less than 1024 characters
ensnames_df = ensnames_df[ensnames_df['name'].str.len() < 1024]
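# (names are later prefixed with "ens," to form unique_id, so this cap also keeps those ids under the 1249-character guard in save_graph_id)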

# Check if 'is_wrapped' is True, then replace 'owner' with 'wrapped_owner'
ensnames_df.loc[ensnames_df['is_wrapped'] == True, 'owner'] = ensnames_df['wrapped_owner']
ensnames_df = ensnames_df[['name', 'owner', 'resolved_address', 'reverse_address']]
@@ -260,6 +275,7 @@ def process_ensname_identity_graph(self):
# Hold.csv
hold_df = ensnames_df[ensnames_df['owner'].notna()]
hold_grouped = hold_df.groupby(['name', 'owner'], as_index=False).first()
hold_grouped = hold_grouped[hold_grouped['owner'] != '0x0000000000000000000000000000000000000000']
hold_grouped['from'] = hold_grouped.apply(lambda x: f"ethereum,{x['owner']}", axis=1)
hold_grouped['to'] = hold_grouped.apply(lambda x: f"ens,{x['name']}", axis=1)
hold_grouped['source'] = "ens"
@@ -271,6 +287,7 @@ def process_ensname_identity_graph(self):
# Resolve.csv
resolve_df = ensnames_df[ensnames_df['resolved_address'].notna()]
resolve_grouped = resolve_df.groupby(['name', 'resolved_address'], as_index=False).first()
resolve_grouped = resolve_grouped[resolve_grouped['resolved_address'] != '0x0000000000000000000000000000000000000000']
resolve_grouped['from'] = resolve_grouped.apply(lambda x: f"ens,{x['name']}", axis=1)
resolve_grouped['to'] = resolve_grouped.apply(lambda x: f"ethereum,{x['resolved_address']}", axis=1)
resolve_grouped['source'] = "ens"
@@ -282,6 +299,7 @@ def process_ensname_identity_graph(self):
# Reverse.csv
reverse_resolve_df = ensnames_df[ensnames_df['reverse_address'].notna()]
reverse_grouped = reverse_resolve_df.groupby(['name', 'reverse_address'], as_index=False).first()
reverse_grouped = reverse_grouped[reverse_grouped['reverse_address'] != '0x0000000000000000000000000000000000000000']
reverse_grouped['from'] = reverse_grouped.apply(lambda x: f"ethereum,{x['reverse_address']}", axis=1)
reverse_grouped['to'] = reverse_grouped.apply(lambda x: f"ens,{x['name']}", axis=1)
reverse_grouped['source'] = "ens"
@@ -299,16 +317,85 @@ def process_ensname_identity_graph(self):
allocation_df = pd.DataFrame(rows, columns=columns)
logging.debug("Successfully load table graph_id row_count: %d", allocation_df.shape[0])

# isolate vertex
# owner is not null, resolved_address is null, reverse_address is null:
# only the owner is allocated a graph_id
owner_filter_df = ensnames_df[
(ensnames_df['owner'].notna()) &
(ensnames_df['resolved_address'].isna()) &
(ensnames_df['reverse_address'].isna()) &
(ensnames_df['owner'] != "0x0000000000000000000000000000000000000000")]
owner_filter_df = owner_filter_df[['owner']].copy()
owner_isolate_df = owner_filter_df.drop_duplicates(subset=['owner'], keep='first')
owner_isolate_df['ethereum_unique_id'] = owner_isolate_df.apply(lambda x: f"ethereum,{x['owner']}", axis=1)
# Merge result columns: ['owner', 'ethereum_unique_id', 'unique_id', 'graph_id', 'updated_nanosecond']
owner_isolate_df = pd.merge(owner_isolate_df, allocation_df[['unique_id', 'graph_id', 'updated_nanosecond']],
left_on='ethereum_unique_id', right_on='unique_id', how='left', suffixes=('', '_ethereum'))
# After apply(isolate_logic) columns:
# ['owner', 'ethereum_unique_id', 'unique_id', 'graph_id',
# 'updated_nanosecond', 'ethereum_graph_id',
# 'ethereum_updated_nanosecond', 'combine_type']
owner_isolate_df[[
'ethereum_graph_id',
'ethereum_updated_nanosecond',
'combine_type'
]] = owner_isolate_df.apply(isolate_logic, axis=1, result_type="expand")
owner_isolate_df = owner_isolate_df[['owner', 'ethereum_unique_id', 'ethereum_graph_id', 'ethereum_updated_nanosecond', 'combine_type']]
owner_isolate_df = owner_isolate_df[owner_isolate_df['combine_type'] != "exist"]
print(owner_isolate_df.columns)
print(owner_isolate_df.head(10))
logging.debug("Successfully filter owner_isolate_df row_count: %d", owner_isolate_df.shape[0])

# owner is not null, resolved_address is not null, reverse_address is null,
# and owner != resolved_address:
# only the resolved_address is allocated a graph_id
resolved_filter_df = ensnames_df[
(ensnames_df['owner'].notna()) &
(ensnames_df['resolved_address'].notna()) &
(ensnames_df['reverse_address'].isna()) &
(ensnames_df['owner'] != ensnames_df['resolved_address']) &
(ensnames_df['resolved_address'] != "0x0000000000000000000000000000000000000000")]
resolved_filter_df = resolved_filter_df[['resolved_address']].copy()
resolved_isolate_df = resolved_filter_df.drop_duplicates(subset=['resolved_address'], keep='first')
resolved_isolate_df['ethereum_unique_id'] = resolved_isolate_df.apply(lambda x: f"ethereum,{x['resolved_address']}", axis=1)
resolved_isolate_df = pd.merge(resolved_isolate_df, allocation_df[['unique_id', 'graph_id', 'updated_nanosecond']],
left_on='ethereum_unique_id', right_on='unique_id', how='left', suffixes=('', '_ethereum'))
resolved_isolate_df[[
'ethereum_graph_id',
'ethereum_updated_nanosecond',
'combine_type'
]] = resolved_isolate_df.apply(isolate_logic, axis=1, result_type="expand")
resolved_isolate_df = resolved_isolate_df[['resolved_address', 'ethereum_unique_id', 'ethereum_graph_id', 'ethereum_updated_nanosecond', 'combine_type']]
resolved_isolate_df = resolved_isolate_df[resolved_isolate_df['combine_type'] != "exist"]
print(resolved_isolate_df.columns)
print(resolved_isolate_df.head(10))
logging.debug("Successfully filter resolved_isolate_df row_count: %d", resolved_isolate_df.shape[0])

# resolved_address is not null, reverse_address is not null,
# and resolved_address == reverse_address:
# the resolved_address is added to the identity_graph
additional_df = ensnames_df[
(ensnames_df['resolved_address'].notna()) &
(ensnames_df['reverse_address'].notna()) &
(ensnames_df['resolved_address'] == ensnames_df['reverse_address']) &
(ensnames_df['resolved_address'] != '0x0000000000000000000000000000000000000000')]
additional_df = additional_df[['name', 'owner', 'resolved_address']]
print(additional_df.columns)
print(additional_df.head(10))
logging.debug("Successfully filter additional_df(resolved_address == reverse_address) row_count: %d", additional_df.shape[0])

# only when resolved_address == owner is the pair added to the identity_graph; otherwise the ens name is merely `Hold`
filter_df = ensnames_df[(ensnames_df['owner'].notna()) &
(ensnames_df['resolved_address'].notna()) &
(ensnames_df['owner'] == ensnames_df['resolved_address'])]
filter_df = filter_df[['name', 'owner', 'resolved_address']]
filter_df = filter_df[filter_df['resolved_address'] != '0x0000000000000000000000000000000000000000']

# concat unique_id
filter_df['ens_unique_id'] = "ens," + filter_df['name']
filter_df['ethereum_unique_id'] = "ethereum," + filter_df['resolved_address']
final_df = filter_df[['ens_unique_id', 'ethereum_unique_id', 'name', 'resolved_address']]
final_df = pd.concat([filter_df, additional_df])
final_df['ens_unique_id'] = "ens," + final_df['name']
final_df['ethereum_unique_id'] = "ethereum," + final_df['resolved_address']
final_df = final_df[['ens_unique_id', 'ethereum_unique_id', 'name', 'resolved_address']]
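Taken together, the four filters above split every ens row by which of owner / resolved_address / reverse_address are populated; a hypothetical walk-through with invented addresses:

owner=0xA, resolved=NULL, reverse=NULL -> 0xA enters owner_isolate_df (isolated ethereum vertex)
owner=0xA, resolved=0xB, reverse=NULL -> 0xB enters resolved_isolate_df (isolated ethereum vertex)
owner=0xE, resolved=0xC, reverse=0xC -> the row enters additional_df and is concatenated into final_df
owner=0xD, resolved=0xD, reverse=NULL -> the row enters filter_df and is concatenated into final_df

In every branch the zero address is filtered out first, so it never receives a graph_id.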

logging.debug("Start merge final_df row_count: %d and allocation_df row_count: %d", final_df.shape[0], allocation_df.shape[0])
# merge final_df with allocation_df for both `ens_unique_id` and `ethereum_unique_id`
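The merge itself is collapsed in this view. A sketch of its likely shape, inferred from the *_ens / *_ethereum suffixed columns that combine_logic and the code below consume (an assumption, not the verbatim diff):

alloc_cols = allocation_df[['unique_id', 'graph_id', 'updated_nanosecond']]
final_df = pd.merge(final_df, alloc_cols, left_on='ens_unique_id',
                    right_on='unique_id', how='left')
final_df = pd.merge(final_df, alloc_cols, left_on='ethereum_unique_id',
                    right_on='unique_id', how='left', suffixes=('_ens', '_ethereum'))
# the second merge's suffixes turn the colliding graph_id/updated_nanosecond
# columns into graph_id_ens/graph_id_ethereum etc., which combine_logic
# (applied in the collapsed lines) reconciles into the final graph ids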
@@ -346,18 +433,30 @@ def process_ensname_identity_graph(self):

identities_graph_df = final_df[['ethereum_graph_id', 'ethereum_updated_nanosecond']].copy()
identities_graph_df = identities_graph_df[['ethereum_graph_id', 'ethereum_updated_nanosecond']]
identities_graph_df = identities_graph_df.drop_duplicates(subset=['ethereum_graph_id'], keep='last')
# rename the columns
identities_graph_df = identities_graph_df.rename(columns={
'ethereum_graph_id': 'primary_id',
'ethereum_updated_nanosecond': 'updated_nanosecond'
})
identities_graph_df.to_csv(identities_graph_path, sep='\t', index=False)
logging.debug("Successfully save %s row_count: %d", identities_graph_path, identities_graph_df.shape[0])

owner_ig_df = owner_isolate_df[['ethereum_graph_id', 'ethereum_updated_nanosecond']].copy()
owner_ig_df = owner_ig_df.rename(columns={
'ethereum_graph_id': 'primary_id',
'ethereum_updated_nanosecond': 'updated_nanosecond'
})
resolved_ig_df = resolved_isolate_df[['ethereum_graph_id', 'ethereum_updated_nanosecond']].copy()
resolved_ig_df = resolved_ig_df.rename(columns={
'ethereum_graph_id': 'primary_id',
'ethereum_updated_nanosecond': 'updated_nanosecond'
})

final_identities_graph_df = pd.concat([identities_graph_df, owner_ig_df, resolved_ig_df])
final_identities_graph_df = final_identities_graph_df.drop_duplicates(subset=['primary_id'], keep='last')
final_identities_graph_df.to_csv(identities_graph_path, sep='\t', index=False)
logging.debug("Successfully save %s row_count: %d", identities_graph_path, final_identities_graph_df.shape[0])

partof_ethereum = final_df[['ethereum_unique_id', 'ethereum_graph_id']].copy()
partof_ethereum = partof_ethereum[['ethereum_unique_id', 'ethereum_graph_id']]
partof_ethereum = partof_ethereum.drop_duplicates(subset=['ethereum_unique_id', 'ethereum_graph_id'], keep='last')
# rename the columns
partof_ethereum = partof_ethereum.rename(columns={
'ethereum_unique_id': 'from',
Expand All @@ -366,14 +465,25 @@ def process_ensname_identity_graph(self):

partof_ensname = final_df[['ens_unique_id', 'ens_graph_id']].copy()
partof_ensname = partof_ensname[['ens_unique_id', 'ens_graph_id']]
partof_ensname = partof_ensname.drop_duplicates(subset=['ens_unique_id', 'ens_graph_id'], keep='last')
# rename the columns
partof_ensname = partof_ensname.rename(columns={
'ens_unique_id': 'from',
'ens_graph_id': 'to'
})

part_of_identities_graph_df = pd.concat([partof_ensname, partof_ethereum])
owner_partof_ig_df = owner_isolate_df[['ethereum_unique_id', 'ethereum_graph_id']].copy()
owner_partof_ig_df = owner_partof_ig_df.rename(columns={
'ethereum_unique_id': 'from',
'ethereum_graph_id': 'to'
})
resolved_partof_ig_df = resolved_isolate_df[['ethereum_unique_id', 'ethereum_graph_id']].copy()
resolved_partof_ig_df = resolved_partof_ig_df.rename(columns={
'ethereum_unique_id': 'from',
'ethereum_graph_id': 'to'
})

part_of_identities_graph_df = pd.concat([partof_ensname, partof_ethereum, owner_partof_ig_df, resolved_partof_ig_df])
part_of_identities_graph_df = part_of_identities_graph_df.drop_duplicates(subset=['from', 'to'], keep='last')
part_of_identities_graph_df.to_csv(part_of_identities_graph_path, sep='\t', index=False)
logging.debug("Successfully save %s row_count: %d", part_of_identities_graph_path, part_of_identities_graph_df.shape[0])

@@ -398,7 +508,25 @@ def process_ensname_identity_graph(self):
'ens_updated_nanosecond': 'updated_nanosecond'
})

final_graph_id_df = pd.concat([ethereum_part, ens_part], ignore_index=True)
owner_isolate_part = owner_isolate_df[['ethereum_unique_id', 'ethereum_graph_id', 'owner', 'ethereum_updated_nanosecond']].copy()
owner_isolate_part['platform'] = 'ethereum'
owner_isolate_part = owner_isolate_part.rename(columns={
'ethereum_unique_id': 'unique_id',
'ethereum_graph_id': 'graph_id',
'owner': 'identity',
'ethereum_updated_nanosecond': 'updated_nanosecond'
})

resolved_isolate_part = resolved_isolate_df[['ethereum_unique_id', 'ethereum_graph_id', 'resolved_address', 'ethereum_updated_nanosecond']].copy()
resolved_isolate_part['platform'] = 'ethereum'
resolved_isolate_part = resolved_isolate_part.rename(columns={
'ethereum_unique_id': 'unique_id',
'ethereum_graph_id': 'graph_id',
'resolved_address': 'identity',
'ethereum_updated_nanosecond': 'updated_nanosecond'
})

final_graph_id_df = pd.concat([ethereum_part, owner_isolate_part, resolved_isolate_part, ens_part], ignore_index=True)
final_graph_id_df = final_graph_id_df[['unique_id', 'graph_id', 'platform', 'identity', 'updated_nanosecond']]
final_graph_id_df = final_graph_id_df.drop_duplicates(subset=['unique_id'], keep='last')
final_graph_id_df.to_csv(allocation_path, index=False, quoting=csv.QUOTE_ALL)
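# note: unique_id values embed commas ("ens,<name>", "ethereum,<address>"); QUOTE_ALL quotes every field uniformly instead of only the ones that need it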
@@ -469,6 +597,8 @@ def save_graph_id(self, dump_batch_size=10000):
for row in csv_reader:
cnt += 1
if len(row[0]) > 1249:
continue
batch.append(row)
if len(batch) >= dump_batch_size:
# bulk insert
batch_times += 1
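The rest of the loop is collapsed here. For orientation, a sketch of the overall batching pattern under stated assumptions — insert_graph_id_batch is a hypothetical stand-in for the actual bulk-insert call, and the tail flush is assumed to follow the loop:

batch = []
batch_times = 0
for row in csv_reader:
    cnt += 1
    if len(row[0]) > 1249:                    # skip rows with an over-long unique_id
        continue
    batch.append(row)
    if len(batch) >= dump_batch_size:
        batch_times += 1
        insert_graph_id_batch(cursor, batch)  # hypothetical bulk-insert helper
        batch = []
if batch:                                     # flush the final partial batch
    batch_times += 1
    insert_graph_id_batch(cursor, batch)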
