Skip to content

Commit

Permalink
[#] Primary Lens (Default Lens)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhongFuze committed Oct 29, 2024
1 parent 37183c7 commit 3e37460
Showing 1 changed file with 97 additions and 3 deletions.
100 changes: 97 additions & 3 deletions src/jobs/lens_process_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Author: Zella Zhong
Date: 2024-09-13 17:53:04
LastEditors: Zella Zhong
LastEditTime: 2024-09-29 19:08:03
LastEditTime: 2024-10-29 23:10:58
FilePath: /data_process/src/jobs/lens_process_job.py
Description:
'''
Expand Down Expand Up @@ -45,6 +45,14 @@
from utils.timeutils import iso8601_string_to_datetime


QUERY_DEFAULT_PROFILE = """
query ProfileQueryDefault {
defaultProfile(request: {for: %s}) {
id
}
}
"""

QUERY_PROFILES_BY_IDS = """
query ProfileQuerrry {
profiles(request: {where: { profileIds: [%s]}}) {
Expand Down Expand Up @@ -367,6 +375,31 @@ def save_lens_social(self, rows):
cursor.close()
write_conn.close()

def update_lens_default_profile(self, update_data):
update_sql = """
UPDATE lensv2_profile
SET
is_primary = %(is_primary)s,
update_time = %(update_time)s
WHERE
profile_id = %(profile_id)s;
"""
if update_data:
write_conn = psycopg2.connect(setting.PG_DSN["write"])
write_conn.autocommit = True
cursor = write_conn.cursor()
try:
execute_batch(cursor, update_sql, update_data)
logging.info("Batch update completed for {} records.".format(len(update_data)))
except Exception as ex:
logging.exception(ex)
raise ex
finally:
cursor.close()
write_conn.close()
else:
logging.debug("No valid update_data to process.")

def update_lens_metadata(self, update_data):
update_sql = """
UPDATE lensv2_profile
Expand Down Expand Up @@ -398,6 +431,65 @@ def update_lens_metadata(self, update_data):
else:
logging.debug("No valid update_data to process.")

def process_lens_default_profile(self):
read_conn = psycopg2.connect(setting.PG_DSN["read"])
cursor = read_conn.cursor()
address_list = []
try:
select_sql = "select address from lensv2_profile where address is not null and is_primary=true group by address having count(*) > 1"
cursor.execute(select_sql)
rows = cursor.fetchall()
for row in rows:
address_list.append(row[0])
except Exception as ex:
logging.exception(ex)
raise ex
finally:
cursor.close()
read_conn.close()

session = Session()
adapter = LimiterAdapter(per_minute=60, per_second=1)
session.mount(setting.LENS["api"], adapter)
headers = {
"accept": "application/json",
"user-agent": "spectaql", # Add user agent header to avoid firewall
}
if len(address_list) == 0:
logging.info("No lens address need to fetch DefaultProfile")
return

logging.info("%d address need to fetch DefaultProfile", len(address_list))
update_profile = []
for addr in address_list:
try:
query_vars = "\"" + addr + "\""
payload = QUERY_DEFAULT_PROFILE % query_vars
response = session.post(url=setting.LENS["api"], json={"query": payload}, headers=headers, timeout=30)
if response.status_code != 200:
logging.warn("lens api response failed, defaultProfile(for: {}) {} {}".format(addr, response.status_code, response.reason))
continue
data = json.loads(response.text)
if "data" in data:
if "defaultProfile" in data["data"]:
default_profile_id_hex = data["data"]["defaultProfile"].get("id", "0x0")
default_profile_id = int(default_profile_id_hex, 16)
logging.info("lens address %s defaultProfile[%s]", addr, default_profile_id_hex)
update_profile.append({
"profile_id": default_profile_id,
"is_primary": True,
"update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
})
except psycopg2.DatabaseError as db_err:
logging.exception(db_err)
raise db_err
except Exception as ex:
logging.exception(ex)
logging.error("lens address=%s fetch defaultProfile failed.", addr)
continue

self.update_lens_default_profile(update_profile)

def process_lens_extras(self):
read_conn = psycopg2.connect(setting.PG_DSN["read"])
cursor = read_conn.cursor()
Expand Down Expand Up @@ -502,6 +594,7 @@ def process_lens_extras(self):
def process_extras(self):
try:
self.update_extras_job_status("start")
self.process_lens_default_profile()
self.process_lens_extras()
self.update_extras_job_status("end")
except Exception as ex:
Expand Down Expand Up @@ -530,5 +623,6 @@ def process_pipeline(self):
logger.InitLogger(config)

LensProcess().process_lens_profile()
LensProcess().save_lens_profile()
# LensProcess().process_lens_extras()
# LensProcess().save_lens_profile()
# LensProcess().process_lens_extras()
# LensProcess().process_lens_default_profile()

0 comments on commit 3e37460

Please sign in to comment.