From a3d4fcabf32a75ea296acf71e8738ef3d7389c6e Mon Sep 17 00:00:00 2001 From: ZhongFuze Date: Fri, 8 Nov 2024 00:21:14 +0800 Subject: [PATCH] [#] `solana`, `sns.id` with cache --- src/resolver/ethereum.py | 10 +- src/resolver/fetch.py | 17 +- src/resolver/sns.py | 334 +++++++++++++++++++++++++++--- src/resolver/solana.py | 375 +++++++++++++++++++++++++++++++++- src/scalar/identity_record.py | 2 +- 5 files changed, 693 insertions(+), 45 deletions(-) diff --git a/src/resolver/ethereum.py b/src/resolver/ethereum.py index f5ca35f..5a27801 100644 --- a/src/resolver/ethereum.py +++ b/src/resolver/ethereum.py @@ -4,7 +4,7 @@ Author: Zella Zhong Date: 2024-10-06 18:41:34 LastEditors: Zella Zhong -LastEditTime: 2024-10-30 15:10:26 +LastEditTime: 2024-11-07 23:24:00 FilePath: /data_service/src/resolver/ethereum.py Description: ''' @@ -654,17 +654,21 @@ async def query_and_update_missing_query_ids(query_ids): return identity_records def filter_ethereum_query_ids(identities): + cnt = 0 final_query_ids = set() for identity in identities: + cnt += 1 is_evm = is_ethereum_address(identity) if is_evm: final_query_ids.add(f"{Platform.ethereum.value},{identity}") + if cnt > QUERY_MAX_LIMIT: + break return list(final_query_ids) async def query_ethereum_profile_by_ids_cache(info, identities, require_cache=False): - if len(identities) > QUERY_MAX_LIMIT: - return ExceedRangeInput(QUERY_MAX_LIMIT) + # if len(identities) > QUERY_MAX_LIMIT: + # return ExceedRangeInput(QUERY_MAX_LIMIT) filter_query_ids = filter_ethereum_query_ids(identities) if len(filter_query_ids) == 0: diff --git a/src/resolver/fetch.py b/src/resolver/fetch.py index 7337996..63f6a38 100644 --- a/src/resolver/fetch.py +++ b/src/resolver/fetch.py @@ -4,7 +4,7 @@ Author: Zella Zhong Date: 2024-10-06 19:05:41 LastEditors: Zella Zhong -LastEditTime: 2024-11-07 21:05:51 +LastEditTime: 2024-11-08 00:15:19 FilePath: /data_service/src/resolver/fetch.py Description: ''' @@ -51,6 +51,7 @@ from resolver.unstoppabledomains import query_unstoppabledomains_profile_by_ids_cache from resolver.basenames import query_basenames_profile_by_ids_cache from resolver.sns import query_sns_profile_by_ids_cache +from resolver.solana import query_solana_profile_by_ids_cache async def single_fetch(info, platform, identity): identity_records = [] @@ -68,10 +69,9 @@ async def single_fetch(info, platform, identity): # return await query_profile_by_single_lens_handle(info, identity) identity_records = await query_lens_profile_by_ids_cache(info, [identity], require_cache=True) elif platform == Platform.solana: - record = await query_profile_by_single_solana(info, identity) - identity_records.append(record) + identity_records = await query_solana_profile_by_ids_cache(info, [identity], require_cache=True) elif platform == Platform.sns: - identity_records = await query_sns_profile_by_ids_cache(info, [identity], require_cache=False) + identity_records = await query_sns_profile_by_ids_cache(info, [identity], require_cache=True) elif platform == Platform.clusters: record = await query_profile_by_single_clusters(info, identity) identity_records.append(record) @@ -255,24 +255,21 @@ async def batch_fetch_all(info, vertices_map): return PlatformNotSupport(platform) if platform_enum == Platform.ethereum: - # tasks.append(query_profile_by_addresses(info, identities)) tasks.append(query_ethereum_profile_by_ids_cache(info, identities, require_cache=True)) elif platform_enum == Platform.ens: - # tasks.append(query_profile_by_ensnames(info, identities)) 
tasks.append(query_ensname_profile_by_ids_cache(info, identities, require_cache=True)) elif platform_enum == Platform.farcaster: - # tasks.append(query_profile_by_fnames(info, identities)) tasks.append(query_farcaster_profile_by_ids_cache(info, identities, require_cache=True)) elif platform_enum == Platform.lens: - # tasks.append(query_profile_by_lens_handle(info, identities)) tasks.append(query_lens_profile_by_ids_cache(info, identities, require_cache=True)) elif platform_enum == Platform.clusters: tasks.append(query_profile_by_batch_clusters(info, identities)) elif platform_enum == Platform.basenames: - # tasks.append(query_profile_by_basenames(info, identities)) tasks.append(query_basenames_profile_by_ids_cache(info, identities, require_cache=True)) + elif platform_enum == Platform.sns: + tasks.append(query_sns_profile_by_ids_cache(info, identities, require_cache=True)) elif platform_enum == Platform.solana: - tasks.append(query_profile_by_solana_addresses(info, identities)) + tasks.append(query_solana_profile_by_ids_cache(info, identities, require_cache=True)) elif platform_enum == Platform.unstoppabledomains: tasks.append(query_unstoppabledomains_profile_by_ids_cache(info, identities, require_cache=True)) elif platform_enum == Platform.bitcoin: diff --git a/src/resolver/sns.py b/src/resolver/sns.py index 71c455e..b4e16d0 100644 --- a/src/resolver/sns.py +++ b/src/resolver/sns.py @@ -4,7 +4,7 @@ Author: Zella Zhong Date: 2024-11-07 19:55:46 LastEditors: Zella Zhong -LastEditTime: 2024-11-07 21:06:17 +LastEditTime: 2024-11-07 23:13:07 FilePath: /data_service/src/resolver/sns.py Description: ''' @@ -45,6 +45,273 @@ QUERY_MAX_LIMIT = 200 + +def convert_cache_to_identity_record(cache_value): + try: + if not cache_value: + return None + primary_id = cache_value.get('id', None) + if primary_id is None: + return None + + # Convert resolved_address list of dictionaries to list of Address instances + resolved_address = [Address(**address) for address in cache_value.get("resolved_address", [])] + + # Convert owner_address list of dictionaries to list of Address instances + owner_address = [Address(**address) for address in cache_value.get("owner_address", [])] + + platform_str = cache_value.get("platform", None) + platform = Platform(platform_str) if platform_str else None + + network_str = cache_value.get("network", None) + network = Network(network_str) if network_str else None + + # Convert profile dictionary to Profile instance + profile_data = cache_value.get("profile", None) + profile = None + if profile_data: + addresses = [Address(**addr) for addr in profile_data.get("addresses", [])] + profile_platform_str = profile_data.get("platform", None) + profile_platform = Platform(profile_platform_str) if profile_platform_str else None + + profile_network_str = profile_data.get("network", None) + profile_network = Network(profile_network_str) if profile_network_str else None + social_dict = profile_data.get("social", None) + social = None + if social_dict is not None: + social_updated_at_str = social_dict.get("updated_at", None) + social_updated_at = None + if social_updated_at_str is not None: + social_updated_at = datetime.strptime(social_updated_at_str, "%Y-%m-%d %H:%M:%S") if social_updated_at_str else None + social = SocialProfile( + uid=social_dict.get("uid", None), + following=social_dict.get("following", 0), + follower=social_dict.get("follower", 0), + updated_at=social_updated_at, + ) + profile = Profile( + uid=profile_data.get("uid"), + identity=profile_data.get("identity"), + 
platform=profile_platform, + network=profile_network, + address=profile_data.get("address"), + display_name=profile_data.get("display_name"), + avatar=profile_data.get("avatar"), + description=profile_data.get("description"), + contenthash=profile_data.get("contenthash"), + texts=profile_data.get("texts", {}), + addresses=addresses, + social=social, + ) + + expired_at_str = cache_value.get("expired_at") + updated_at_str = cache_value.get("updated_at") + + expired_at = datetime.strptime(expired_at_str, "%Y-%m-%d %H:%M:%S") if expired_at_str else None + updated_at = datetime.strptime(updated_at_str, "%Y-%m-%d %H:%M:%S") if updated_at_str else None + + # Return the IdentityRecord instance + return IdentityRecord( + id=cache_value.get("id"), + aliases=cache_value.get("aliases"), + identity=cache_value.get("identity"), + platform=platform, + network=network, + primary_name=cache_value.get("primary_name"), + is_primary=cache_value.get("is_primary"), + resolved_address=resolved_address, + owner_address=owner_address, + expired_at=expired_at, + updated_at=updated_at, + profile=profile, + ) + + except Exception as ex: + logging.exception(ex) + return None + +async def batch_get_sns_profile_from_cache(query_ids, expire_window): + ''' + description: + return { + cache_identity_records: List[IdentityRecordSimplified], + require_update_ids: List[str], # which exist in cache but expired (return old data first to speed up response) + missing_query_ids: List[str], # which not exist in cache, must query_from_db + } + ''' + try: + cache_identity_records = [] + require_update_ids = [] + missing_query_ids = [] + redis_client = await RedisClient.get_instance() + + aliases_keys = [] + for query_id in query_ids: + aliases_keys.append(f"aliases:{query_id}") + aliases_keys = list(set(aliases_keys)) + + aliases_values = await redis_client.mget(*aliases_keys) + aliases_cache_item = dict(zip(aliases_keys, aliases_values)) + + profile_map_aliases_key = {} + for alias_cache_key_bytes, profile_cache_key_bytes in aliases_cache_item.items(): + alias_cache_key = alias_cache_key_bytes.decode("utf-8") if isinstance(alias_cache_key_bytes, bytes) else alias_cache_key_bytes + profile_cache_key = profile_cache_key_bytes.decode("utf-8") if profile_cache_key_bytes is not None else None + + if profile_cache_key is None: + missing_query_ids.append(alias_cache_key.removeprefix("aliases:")) + else: + if profile_cache_key not in profile_map_aliases_key: + profile_map_aliases_key[profile_cache_key] = [] + profile_map_aliases_key[profile_cache_key].append(alias_cache_key.removeprefix("aliases:")) + + batch_profile_cache_keys = list(profile_map_aliases_key.keys()) + if batch_profile_cache_keys: + profile_json_values = await redis_client.mget(*batch_profile_cache_keys) + profile_cache_json_values = dict(zip(batch_profile_cache_keys, profile_json_values)) + for profile_cache_key_bytes, profile_json_value_bytes in profile_cache_json_values.items(): + profile_cache_key = profile_cache_key_bytes.decode("utf-8") if isinstance(profile_cache_key_bytes, bytes) else profile_cache_key_bytes + profile_json_value = profile_json_value_bytes.decode("utf-8") if profile_json_value_bytes is not None else None + + if profile_json_value is None: + # add aliases:platform,alias_value to missing_query_ids + missing_query_ids.append(profile_cache_key.removeprefix("profile:")) + missing_aliases_ids = profile_map_aliases_key.get(profile_cache_key, []) + missing_query_ids.extend(missing_aliases_ids) + else: + profile_value_dict = json.loads(profile_json_value) + 
updated_at = profile_value_dict.get("updated_at", None) + if not updated_at: + logging.warning(f"Cache key {profile_cache_key} is missing 'updated_at'. Marking for update.") + missing_query_ids.append(profile_cache_key.removeprefix("profile:")) + else: + updated_at_datetime = parse_time_string(updated_at) + now = datetime.now() + # Compare now and updated_at, if value is expired in window + if now - updated_at_datetime > timedelta(seconds=expire_window): + if len(profile_value_dict) == 1: + # only have one field(updated_at) is also not exist + # logging.debug(f"Cache key {profile_cache_key} is empty. Returning old data, but marking for update.") + require_update_ids.append(profile_cache_key.removeprefix("profile:")) + else: + # Old data is returned, but it needs to be updated + # logging.debug(f"Cache key {profile_cache_key} is expired. Returning old data, but marking for update.") + require_update_ids.append(profile_cache_key.removeprefix("profile:")) + identity_record = convert_cache_to_identity_record(profile_value_dict) + if identity_record: + cache_identity_records.append(identity_record) + else: + if len(profile_value_dict) == 1: + # only have one field(updated_at) is also not exist + # logging.debug(f"Cache key {profile_cache_key} is empty but has been caching.") + continue + else: + # logging.debug(f"Cache key {profile_cache_key} has been caching.") + identity_record = convert_cache_to_identity_record(profile_value_dict) + if identity_record: + cache_identity_records.append(identity_record) + + return cache_identity_records, require_update_ids, missing_query_ids + except Exception as ex: + logging.exception(ex) + # if cache logic is failed, just return query_from_db immediately + return [], [], query_ids + +async def batch_set_sns_profile_to_cache( + cache_identity_records: typing.List[IdentityRecordSimplified], expire_window: int +): + random_offset = random.randint(0, 30 * 60) # Adding up to 30 minutes of randomness + # random_offset = 0 + final_expire_window = expire_window + random_offset + + profile_data = {} + aliases_data = {} + keys_with_expiration = [] + + for record in cache_identity_records: + primary_id = record.id + profile_cache_key = f"profile:{primary_id}" + # Set the current time as 'updated_at' + # in "yyyy-MM-dd HH:MM:SS" format later in strawberry_type_to_jsonstr + record.updated_at = datetime.now() + profile_value_json = strawberry_type_to_jsonstr(record) + profile_data[profile_cache_key] = profile_value_json + keys_with_expiration.append(profile_cache_key) + + if record.aliases: + for alias in record.aliases: + alias_cache_key = f"aliases:{alias}" + aliases_data[alias_cache_key] = profile_cache_key + keys_with_expiration.append(alias_cache_key) + + redis_client = await RedisClient.get_instance() + + # Use MSET to set all profile data + if profile_data: + await redis_client.mset(profile_data) + if aliases_data: + await redis_client.mset(aliases_data) + + # Use a Lua script to set expiration for all keys at once + if keys_with_expiration: + # Lua script to set expiration for multiple keys + lua_script = """ + for i, key in ipairs(KEYS) do + redis.call("EXPIRE", key, tonumber(ARGV[1])) + end + """ + await redis_client.eval(lua_script, len(keys_with_expiration), *keys_with_expiration, final_expire_window) + + logging.info("sns_profile batch set profiles and aliases successfully, with expirations [%s]", list(profile_data.keys())) + +async def set_sns_empty_profile_to_cache(query_id, empty_record, expire_window): + random_offset = random.randint(0, 30 * 60) # Adding 
up to 30 minutes of randomness + # random_offset = 0 + final_expire_window = expire_window + random_offset + + profile_cache_key = f"profile:{query_id}" # e.g. profile:ud,#notexist_profile_id which is not exist + profile_lock_key = f"{query_id}.lock" + + profile_unique_value = "{}:{}".format(profile_lock_key, get_unix_microseconds()) + try: + # Try acquiring the lock (with a timeout of 30 seconds) + if await RedisClient.acquire_lock(profile_lock_key, profile_unique_value, lock_timeout=30): + # logging.debug(f"Lock acquired for key: {profile_lock_key}") + # Set the current time as 'updated_at' in "yyyy-mm-dd HH:MM:SS" format + empty_record["updated_at"] = get_current_time_string() + profile_value_json = json.dumps(empty_record) + + # Set the cache in Redis with the specified expiration time (in seconds) + redis_client = await RedisClient.get_instance() + await redis_client.set(profile_cache_key, profile_value_json, ex=final_expire_window) + # logging.debug(f"Cache updated for key: {profile_cache_key}") + else: + logging.warning(f"Could not acquire lock for key: {profile_lock_key}") + + finally: + # Always release the lock after the critical section is done + await RedisClient.release_lock(profile_lock_key, profile_unique_value) + # logging.debug(f"Lock released for key: {profile_lock_key}") + + aliases_lock_key = f"aliases:{query_id}.lock" + aliases_unique_value = "{}:{}".format(aliases_lock_key, get_unix_microseconds()) + try: + # Try acquiring the lock (with a timeout of 30 seconds) + if await RedisClient.acquire_lock(aliases_lock_key, aliases_unique_value, lock_timeout=30): + # logging.debug(f"Lock acquired for key: {aliases_lock_key}") + redis_client = await RedisClient.get_instance() + # Save the empty query_id to [profile_key], and profile_key only have updated_at + alias_cache_key = f"aliases:{query_id}" + await redis_client.set(alias_cache_key, profile_cache_key, ex=final_expire_window) + # logging.debug(f"Cache updated aliases[{aliases_lock_key}] map to key[{profile_cache_key}]") + else: + logging.warning(f"Could not acquire lock for key: {aliases_lock_key}") + + finally: + # Always release the lock after the critical section is done + await RedisClient.release_lock(aliases_lock_key, aliases_unique_value) + # logging.debug(f"Lock released for key: {aliases_lock_key}") + def get_sns_profile_fields(): ''' description: retrieve all fields @@ -64,7 +331,7 @@ async def batch_query_sns_profile_by_id_from_db(query_ids) -> typing.List[Ident address_list.add(identity) else: name_list.add(identity) - + checked_addresses = list(address_list) checked_names = list(name_list) @@ -78,7 +345,7 @@ async def batch_query_sns_profile_by_id_from_db(query_ids) -> typing.List[Ident db_records = result.scalars().all() for row in db_records: profile_dict[row.namenode] = row - + if checked_addresses: sql = select(SnsModel).options(load_only(*profile_fields)) \ .filter(SnsModel.reverse_address.in_(checked_addresses)) @@ -121,12 +388,12 @@ async def batch_query_sns_profile_by_id_from_db(query_ids) -> typing.List[Ident reverse_address = profile_record.reverse_address if is_primary and reverse_address: aliases.append(f"{Platform.sns.value},{reverse_address}") - + display_name = name avatar = None description = None texts = profile_record.texts - + if texts: # Filter out empty strings and decode non-empty texts process_texts = {key: unquote(text, 'utf-8') for key, text in texts.items() if text != ""} @@ -137,7 +404,7 @@ async def batch_query_sns_profile_by_id_from_db(query_ids) -> typing.List[Ident if not texts: 
texts = None - + twitter_handle = profile_record.twitter_handle if twitter_handle: if texts is not None: @@ -175,6 +442,25 @@ async def batch_query_sns_profile_by_id_from_db(query_ids) -> typing.List[Ident )) return result +async def query_and_update_missing_query_ids(query_ids): + # logging.debug("query_and_update_missing_query_ids input %s", query_ids) + identity_records = await batch_query_sns_profile_by_id_from_db(query_ids) + # cache query_ids that are missing from storage to avoid hitting the db repeatedly + + exists_query_ids = [] + if identity_records: + asyncio.create_task(batch_set_sns_profile_to_cache(identity_records, expire_window=24*3600)) + + for record in identity_records: + exists_query_ids.extend(record.aliases) + # asyncio.create_task(set_sns_profile_to_cache(record, expire_window=24*3600)) + + empty_query_ids = list(set(query_ids) - set(exists_query_ids)) + for empty_query_id in empty_query_ids: + asyncio.create_task(set_sns_empty_profile_to_cache(empty_query_id, {}, expire_window=24*3600)) + + return identity_records + def filter_sns_query_ids(identities): final_query_ids = set() cnt = 0 @@ -210,25 +496,25 @@ async def query_sns_profile_by_ids_cache(info, identities, require_cache=False): identity_records = await batch_query_sns_profile_by_id_from_db(filter_query_ids) return identity_records - # # require_cache is True: - # cache_identity_records, \ - # require_update_ids, \ - # missing_query_ids = await batch_get_sns_profile_from_cache(filter_query_ids, expire_window=12*3600) + # require_cache is True: + cache_identity_records, \ + require_update_ids, \ + missing_query_ids = await batch_get_sns_profile_from_cache(filter_query_ids, expire_window=12*3600) - # logging.info("sns input filter_query_ids: {}".format(filter_query_ids)) - # # logging.debug("sns missing_query_ids: {}".format(missing_query_ids)) - # # logging.debug("sns require_update_ids: {}".format(require_update_ids)) - # # logging.debug("sns cache_identity_records: {}".format(len(cache_identity_records))) + logging.info("sns input filter_query_ids: {}".format(filter_query_ids)) + # logging.debug("sns missing_query_ids: {}".format(missing_query_ids)) + # logging.debug("sns require_update_ids: {}".format(require_update_ids)) + # logging.debug("sns cache_identity_records: {}".format(len(cache_identity_records))) - # final_identity_records = cache_identity_records.copy() - # if missing_query_ids: - # logging.info("sns missing data {}".format(missing_query_ids)) - # missing_identity_records = await query_and_update_missing_query_ids(missing_query_ids) - # final_identity_records.extend(missing_identity_records) + final_identity_records = cache_identity_records.copy() + if missing_query_ids: + logging.info("sns missing data {}".format(missing_query_ids)) + missing_identity_records = await query_and_update_missing_query_ids(missing_query_ids) + final_identity_records.extend(missing_identity_records) - # if require_update_ids: - # logging.info("sns has olddata and return immediately {}".format(require_update_ids)) - # # Update background - # asyncio.create_task(query_and_update_missing_query_ids(require_update_ids)) + if require_update_ids: + logging.info("sns has stale data, returning it immediately {}".format(require_update_ids)) + # Refresh in the background + asyncio.create_task(query_and_update_missing_query_ids(require_update_ids)) - # return final_identity_records \ No newline at end of file + return final_identity_records diff --git a/src/resolver/solana.py b/src/resolver/solana.py index 89196a1..8de4cad 100644 ---
a/src/resolver/solana.py +++ b/src/resolver/solana.py @@ -4,28 +4,37 @@ Author: Zella Zhong Date: 2024-10-07 23:11:29 LastEditors: Zella Zhong -LastEditTime: 2024-10-16 16:30:20 +LastEditTime: 2024-11-08 00:04:04 FilePath: /data_service/src/resolver/solana.py Description: ''' +import asyncio +import copy +import json +import random import logging -from datetime import datetime +from datetime import datetime, timedelta from sqlalchemy.inspection import inspect from sqlalchemy import select, update, and_, or_ from sqlalchemy.orm import load_only from urllib.parse import unquote +from pydantic import typing from session import get_session -from model import EnsnameModel +from model.snsname import SnsModel +from cache.redis import RedisClient -from utils import check_evm_address, convert_camel_case +from utils import convert_camel_case +from utils.address import is_base58_solana_address +from utils.timeutils import get_unix_microseconds, parse_time_string, get_current_time_string from scalar.platform import Platform -from scalar.network import Network +from scalar.network import Network, Address, CoinTypeMap from scalar.identity_graph import IdentityRecordSimplified from scalar.identity_record import IdentityRecord -from scalar.profile import Profile -from scalar.error import EmptyInput, EvmAddressInvalid, ExceedRangeInput +from scalar.profile import Profile, SocialProfile +from scalar.error import EmptyInput, ExceedRangeInput +from scalar.type_convert import strawberry_type_to_jsonstr QUERY_MAX_LIMIT = 200 @@ -61,3 +70,355 @@ async def query_profile_by_solana_addresses(info, addresses): )) return result + +def convert_cache_to_identity_record(cache_value): + try: + if not cache_value: + return None + primary_id = cache_value.get('id', None) + if primary_id is None: + return None + + # Convert resolved_address list of dictionaries to list of Address instances + resolved_address = [Address(**address) for address in cache_value.get("resolved_address", [])] + + # Convert owner_address list of dictionaries to list of Address instances + owner_address = [Address(**address) for address in cache_value.get("owner_address", [])] + + platform_str = cache_value.get("platform", None) + platform = Platform(platform_str) if platform_str else None + + network_str = cache_value.get("network", None) + network = Network(network_str) if network_str else None + + # Convert profile dictionary to Profile instance + profile_data = cache_value.get("profile", None) + profile = None + if profile_data: + addresses = [Address(**addr) for addr in profile_data.get("addresses", [])] + profile_platform_str = profile_data.get("platform", None) + profile_platform = Platform(profile_platform_str) if profile_platform_str else None + + profile_network_str = profile_data.get("network", None) + profile_network = Network(profile_network_str) if profile_network_str else None + social_dict = profile_data.get("social", None) + social = None + if social_dict is not None: + social_updated_at_str = social_dict.get("updated_at", None) + social_updated_at = None + if social_updated_at_str is not None: + social_updated_at = datetime.strptime(social_updated_at_str, "%Y-%m-%d %H:%M:%S") if social_updated_at_str else None + social = SocialProfile( + uid=social_dict.get("uid", None), + following=social_dict.get("following", 0), + follower=social_dict.get("follower", 0), + updated_at=social_updated_at, + ) + profile = Profile( + uid=profile_data.get("uid"), + identity=profile_data.get("identity"), + platform=profile_platform, + 
network=profile_network, + address=profile_data.get("address"), + display_name=profile_data.get("display_name"), + avatar=profile_data.get("avatar"), + description=profile_data.get("description"), + contenthash=profile_data.get("contenthash"), + texts=profile_data.get("texts", {}), + addresses=addresses, + social=social, + ) + + expired_at_str = cache_value.get("expired_at") + updated_at_str = cache_value.get("updated_at") + + expired_at = datetime.strptime(expired_at_str, "%Y-%m-%d %H:%M:%S") if expired_at_str else None + updated_at = datetime.strptime(updated_at_str, "%Y-%m-%d %H:%M:%S") if updated_at_str else None + + # Return the IdentityRecord instance + return IdentityRecord( + id=cache_value.get("id"), + aliases=cache_value.get("aliases"), + identity=cache_value.get("identity"), + platform=platform, + network=network, + primary_name=cache_value.get("primary_name"), + is_primary=cache_value.get("is_primary"), + resolved_address=resolved_address, + owner_address=owner_address, + expired_at=expired_at, + updated_at=updated_at, + profile=profile, + ) + + except Exception as ex: + logging.exception(ex) + return None + +async def get_solana_profile_from_cache(query_ids, expire_window): + ''' + description: + return { + cache_identity_records: List[IdentityRecordSimplified], + require_update_ids: List[str], # which exist in cache but expired (return old data first to speed up response) + missing_query_ids: List[str], # which not exist in cache, must query_from_db + } + ''' + try: + require_update_ids = [] + missing_query_ids = [] + cache_identity_records = [] + redis_client = await RedisClient.get_instance() + + profile_cache_keys = [] + for query_id in query_ids: + profile_cache_keys.append(f"profile:{query_id}") + + profile_json_values = await redis_client.mget(*profile_cache_keys) + profile_cache_json_values = dict(zip(profile_cache_keys, profile_json_values)) + for profile_cache_key_bytes, profile_json_value_bytes in profile_cache_json_values.items(): + profile_cache_key = profile_cache_key_bytes.decode("utf-8") if isinstance(profile_cache_key_bytes, bytes) else profile_cache_key_bytes + profile_json_value = profile_json_value_bytes.decode("utf-8") if profile_json_value_bytes is not None else None + + if profile_json_value is None: + missing_query_ids.append(profile_cache_key.removeprefix("profile:")) + else: + profile_value_dict = json.loads(profile_json_value) + updated_at = profile_value_dict.get("updated_at", None) + if not updated_at: + logging.warning(f"Cache key {profile_cache_key} is missing 'updated_at'. Marking for update.") + missing_query_ids.append(profile_cache_key.removeprefix("profile:")) + else: + updated_at_datetime = parse_time_string(updated_at) + now = datetime.now() + # Compare now and updated_at, if value is expired in window + if now - updated_at_datetime > timedelta(seconds=expire_window): + if len(profile_value_dict) == 1: + # only have one field(updated_at) is also not exist + # logging.debug(f"Cache key {profile_cache_key} is empty. Returning old data, but marking for update.") + require_update_ids.append(profile_cache_key.removeprefix("profile:")) + else: + # Old data is returned, but it needs to be updated + # logging.debug(f"Cache key {profile_cache_key} is expired. 
Returning old data, but marking for update.") + require_update_ids.append(profile_cache_key.removeprefix("profile:")) + identity_record = convert_cache_to_identity_record(profile_value_dict) + if identity_record: + cache_identity_records.append(identity_record) + else: + if len(profile_value_dict) == 1: + # only have one field(updated_at) is also not exist + # logging.debug(f"Cache key {profile_cache_key} is empty but has been caching.") + continue + else: + # logging.debug(f"Cache key {profile_cache_key} has been caching.") + identity_record = convert_cache_to_identity_record(profile_value_dict) + if identity_record: + cache_identity_records.append(identity_record) + return cache_identity_records, require_update_ids, missing_query_ids + except Exception as ex: + logging.exception(ex) + # if cache logic is failed, just return query_from_db immediately + return [], [], query_ids + +async def set_sns_profile_to_cache(cache_identity_record: IdentityRecordSimplified, expire_window): + random_offset = random.randint(0, 30 * 60) # Adding up to 30 minutes of randomness + # random_offset = 0 + final_expire_window = expire_window + random_offset + + primary_id = cache_identity_record.id + profile_cache_key = f"profile:{primary_id}" # e.g. profile:lens,zella.lens + profile_lock_key = f"{primary_id}.lock" + + profile_unique_value = "{}:{}".format(profile_lock_key, get_unix_microseconds()) + try: + # Try acquiring the lock (with a timeout of 30 seconds) + if await RedisClient.acquire_lock(profile_lock_key, profile_unique_value, lock_timeout=30): + # logging.debug(f"Lock acquired for key: {profile_lock_key}") + # Set the current time as 'updated_at' in "yyyy-mm-dd HH:MM:SS" format + cache_identity_record.updated_at = datetime.now() + profile_value_json = strawberry_type_to_jsonstr(cache_identity_record) + + # Set the cache in Redis with the specified expiration time (in seconds) + redis_client = await RedisClient.get_instance() + await redis_client.set(profile_cache_key, profile_value_json, ex=final_expire_window) + # logging.debug(f"Cache updated for key: {profile_cache_key}") + else: + logging.warning(f"Could not acquire lock for key: {profile_lock_key}") + + finally: + # Always release the lock after the critical section is done + await RedisClient.release_lock(profile_lock_key, profile_unique_value) + # logging.debug(f"Lock released for key: {profile_lock_key}") + + +def get_sns_profile_fields(): + ''' + description: retrieve all fields + return {*} + ''' + # Get all fields for each model using reflection + profile_fields = [getattr(SnsModel, c.key) for c in inspect(SnsModel).mapper.column_attrs] + return profile_fields + +async def batch_query_profile_by_address_db(query_ids) -> typing.List[IdentityRecordSimplified]: + checked_addresses = [] + for _id in query_ids: + identity = _id.split(",")[1] + is_solana = is_base58_solana_address(identity) + if is_solana: + checked_addresses.append(identity) + + # logging.debug("checked_addresses = %s", checked_addresses) + + # No need to select fields anymore, just query all fields + profile_fields = get_sns_profile_fields() + profile_dict = {} + async with get_session() as s: + sql = select(SnsModel).options(load_only(*profile_fields)) \ + .filter(SnsModel.reverse_address.in_(checked_addresses)) + result = await s.execute(sql) + db_records = result.scalars().all() + for row in db_records: + profile_dict[row.reverse_address] = row + + result = [] + for addr in checked_addresses: + profile_record: SnsModel = profile_dict.get(addr, None) + if profile_record is 
None: + address_primary_id = f"{Platform.solana.value},{addr}" + aliases = [address_primary_id] + result.append(IdentityRecordSimplified( + id=address_primary_id, + aliases=aliases, + identity=addr, + platform=Platform.solana, + network=Network.solana, + primary_name=None, + is_primary=False, + profile=None + )) + else: + # Ensure 'texts' exists dynamically using hasattr() + name = profile_record.name + if not name: + continue + + display_name = name + avatar = None + description = None + texts = profile_record.texts + if texts: + # Filter out empty strings and decode non-empty texts + process_texts = {key: unquote(text, 'utf-8') for key, text in texts.items() if text != ""} + avatar = process_texts.get("avatar", None) + description = process_texts.get("description", None) + display_name = process_texts.get("name", name) + texts = process_texts + + if not texts: + texts = None + + twitter_handle = profile_record.twitter_handle + if twitter_handle: + if texts is not None: + texts["twitter"] = twitter_handle + else: + texts = {"twitter": twitter_handle} + + network = None + address = profile_record.reverse_address + if address is not None: + network = Network.solana + + profile = Profile( + uid=None, + identity=name, + platform=Platform.sns, + network=network, + address=address, + display_name=display_name, + avatar=avatar, + description=description, + contenthash=profile_record.contenthash, + texts=texts, + addresses=[], + social=None + ) + + address_primary_id = f"{Platform.solana.value},{addr}" + aliases = [address_primary_id] + result.append(IdentityRecordSimplified( + id=address_primary_id, + aliases=aliases, + identity=addr, + platform=Platform.solana, + network=Network.solana, + primary_name=name, + is_primary=profile_record.is_primary, + expired_at=None, + profile=profile + )) + return result + +async def query_and_update_missing_query_ids(query_ids): + # logging.debug("query_and_update_missing_query_ids input %s", query_ids) + identity_records = await batch_query_profile_by_address_db(query_ids) + # need cache where query_id is not in storage to avoid frequency access db + + # exists_query_ids = [] + for record in identity_records: + # exists_query_ids.extend(record.aliases) + asyncio.create_task(set_sns_profile_to_cache(record, expire_window=24*3600)) + + return identity_records + +def filter_solana_query_ids(identities): + cnt = 0 + final_query_ids = set() + for identity in identities: + cnt += 1 + is_solana = is_base58_solana_address(identity) + if is_solana: + final_query_ids.add(f"{Platform.solana.value},{identity}") + if cnt > QUERY_MAX_LIMIT: + break + + return list(final_query_ids) + +async def query_solana_profile_by_ids_cache(info, identities, require_cache=False): + if len(identities) > QUERY_MAX_LIMIT: + return ExceedRangeInput(QUERY_MAX_LIMIT) + + filter_query_ids = filter_solana_query_ids(identities) + if len(filter_query_ids) == 0: + return [] + + identity_records = [] + if require_cache is False: + # query data from db and return immediately + logging.info("solana input %s", filter_query_ids) + identity_records = await batch_query_profile_by_address_db(filter_query_ids) + return identity_records + + # require_cache is True: + cache_identity_records, \ + require_update_ids, \ + missing_query_ids = await get_solana_profile_from_cache(filter_query_ids, expire_window=12*3600) + + logging.info("solana input filter_query_ids: {}".format(filter_query_ids)) + # logging.debug("solana missing_query_ids: {}".format(missing_query_ids)) + # logging.debug("solana 
require_update_ids: {}".format(require_update_ids)) + # logging.debug("solana cache_identity_records: {}".format(len(cache_identity_records))) + + final_identity_records = cache_identity_records.copy() + if missing_query_ids: + logging.info("solana missing data {}".format(missing_query_ids)) + missing_identity_records = await query_and_update_missing_query_ids(missing_query_ids) + final_identity_records.extend(missing_identity_records) + + if require_update_ids: + logging.info("solana has olddata and return immediately {}".format(require_update_ids)) + # Update background + asyncio.create_task(query_and_update_missing_query_ids(require_update_ids)) + + return final_identity_records \ No newline at end of file diff --git a/src/scalar/identity_record.py b/src/scalar/identity_record.py index 833ab74..0bbbf78 100644 --- a/src/scalar/identity_record.py +++ b/src/scalar/identity_record.py @@ -4,7 +4,7 @@ Author: Zella Zhong Date: 2024-10-06 17:59:57 LastEditors: Zella Zhong -LastEditTime: 2024-11-01 10:09:53 +LastEditTime: 2024-11-08 00:15:59 FilePath: /data_service/src/scalar/identity_record.py Description: '''