Skip to content

Commit

Permalink
update: diskcache
Browse files Browse the repository at this point in the history
  • Loading branch information
zprobot committed Dec 16, 2024
1 parent ae65a4d commit 83c7ed8
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 31 deletions.
67 changes: 67 additions & 0 deletions quantmsio/core/diskcache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import shutil
import uuid

import diskcache as diskcache


class DiskCache:
"""
Disk cache class for quantms.io. This class is used to store dictionaries in disks when the size of the dictionary
is too large to be stored in memory.
"""

def __init__(self, name_prefix: str):
# Create a cache name using a hash and uuid
if name_prefix is None:
name_prefix = "generic"
self._cache_name = str("cache_name_{}_{}".format(name_prefix, uuid.uuid4().hex))
self.cache = diskcache.Cache(self._cache_name, statistics=True)
self.cache.create_tag_index()

def get_item(self, key):
"""
Get an item from the cache
"""
return self.cache[key]

def get_first_subkey(self, subkey):
"""
Get an item from the cache
"""
for key in self.cache.iterkeys():
if subkey in key:
return self.cache[key]
return None

def add_item(self, key, value):
self.cache[key] = value

def delete_itm(self, key):
del self.cache[key]

def length(self):
return len(self.cache)

def get_all_keys(self):
return self.cache.iterkeys()

def contains(self, key):
return self.cache.__contains__(key)

def close(self):
"""
Close the cache and delete the cache files. This method should be called when the cache is not needed anymore.
"""
self.cache.clear()
self.cache.close()
shutil.rmtree(self._cache_name) # remove the disk cache files

def get_stats(self):
hits, misses = self.cache.stats()
count = self.cache._sql("select count(*) from Cache").fetchone()
return {
"hits": hits,
"misses": misses,
"count": count[0],
"size": self.cache.volume(),
}
53 changes: 28 additions & 25 deletions quantmsio/core/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from quantmsio.core.sdrf import SDRFHandler
from quantmsio.core.msstats_in import MsstatsIN
from quantmsio.core.common import FEATURE_SCHEMA

from quantmsio.core.diskcache import DiskCache

class Feature(MzTab):
def __init__(self, mzTab_path, sdrf_path, msstats_in_path):
Expand All @@ -23,33 +23,35 @@ def __init__(self, mzTab_path, sdrf_path, msstats_in_path):
self._automaton = get_ahocorasick(self._mods_map)

def extract_psm_msg(self, chunksize=2000000, protein_str=None):
self._map_dict = DiskCache("psm_map")
P = Psm(self.mztab_path)
pep_dict = P.extract_from_pep(chunksize=chunksize)
map_dict = {}
P.extract_from_pep(chunksize=chunksize)
self._pep_dict = P._pep_dict
for psm in P.iter_psm_table(chunksize, protein_str):
for key, df in psm.groupby(["reference_file_name", "peptidoform", "precursor_charge"]):
df.reset_index(drop=True, inplace=True)
temp_df = df.iloc[df["posterior_error_probability"].idxmin()]
if key not in map_dict:
map_dict[key] = [None for _ in range(7)]
if not self._map_dict.contains(key):
self._map_dict.add_item(key, [None for _ in range(7)])
pep_value = temp_df["posterior_error_probability"]
if map_dict[key][0] is None or float(map_dict[key][0]) > float(pep_value):
map_dict[key][0] = pep_value
map_dict[key][1] = temp_df["calculated_mz"]
map_dict[key][2] = temp_df["observed_mz"]
map_dict[key][3] = temp_df["mp_accessions"]
map_dict[key][4] = temp_df["is_decoy"]
map_dict[key][5] = temp_df["additional_scores"]
map_dict[key][6] = temp_df["cv_params"]
return map_dict, pep_dict
psm_msg = self._map_dict.get_item(key)
if psm_msg[0] is None or float(psm_msg[0]) > float(pep_value):
psm_msg[0] = pep_value
psm_msg[1] = temp_df["calculated_mz"]
psm_msg[2] = temp_df["observed_mz"]
psm_msg[3] = temp_df["mp_accessions"]
psm_msg[4] = temp_df["is_decoy"]
psm_msg[5] = temp_df["additional_scores"]
psm_msg[6] = temp_df["cv_params"]
self._map_dict.add_item(key, psm_msg)

def transform_msstats_in(self, file_num=10, protein_str=None, duckdb_max_memory="16GB", duckdb_threads=4):
Msstats = MsstatsIN(self._msstats_in, self._sdrf_path, duckdb_max_memory, duckdb_threads)
for msstats in Msstats.generate_msstats_in(file_num, protein_str):
yield msstats
Msstats.destroy_duckdb_database()

def merge_msstats_and_psm(self, msstats, map_dict):
def merge_msstats_and_psm(self, msstats):
map_features = [
"posterior_error_probability",
"calculated_mz",
Expand All @@ -62,8 +64,8 @@ def merge_msstats_and_psm(self, msstats, map_dict):

def merge_psm(rows, index):
key = (rows["reference_file_name"], rows["peptidoform"], rows["precursor_charge"])
if key in map_dict:
return map_dict[key][index]
if self._map_dict.contains(key):
return self._map_dict.get_item(key)[index]
else:
return None

Expand All @@ -79,10 +81,10 @@ def generate_feature(self, file_num=10, protein_str=None, duckdb_max_memory="16G
yield feature

def generate_feature_report(self, file_num=10, protein_str=None, duckdb_max_memory="16GB", duckdb_threads=4):
map_dict, pep_dict = self.extract_psm_msg(2000000, protein_str)
self.extract_psm_msg(2000000, protein_str)
for msstats in self.transform_msstats_in(file_num, protein_str, duckdb_max_memory, duckdb_threads):
self.merge_msstats_and_psm(msstats, map_dict)
self.add_additional_msg(msstats, pep_dict)
self.merge_msstats_and_psm(msstats)
self.add_additional_msg(msstats)
self.convert_to_parquet_format(msstats)
yield msstats

Expand Down Expand Up @@ -142,18 +144,19 @@ def write_features_to_file(
pqwriters = save_slice_file(feature, pqwriters, output_folder, key, filename)
close_file(pqwriters)

def generate_best_scan(self, rows, pep_dict):
def generate_best_scan(self, rows):
key = (rows["peptidoform"], rows["precursor_charge"])
if key in pep_dict:
return [pep_dict[key][1], pep_dict[key][2]]
if self._pep_dict.contains(key):
pep = self._pep_dict.get_item(key)
return [pep[1], pep[2]]
else:
return [None, None]

def add_additional_msg(self, msstats, pep_dict):
def add_additional_msg(self, msstats):
select_mods = list(self._mods_map.keys())
msstats.loc[:, "pg_global_qvalue"] = msstats["mp_accessions"].map(self._protein_global_qvalue_map)
msstats[["scan_reference_file_name", "scan"]] = msstats[["peptidoform", "precursor_charge"]].apply(
lambda rows: self.generate_best_scan(rows, pep_dict), axis=1, result_type="expand"
lambda rows: self.generate_best_scan(rows), axis=1, result_type="expand"
)
msstats[["peptidoform", "modifications"]] = msstats[["peptidoform"]].apply(
lambda row: self.generate_modifications_details(
Expand Down
14 changes: 8 additions & 6 deletions quantmsio/core/psm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from quantmsio.operate.tools import get_ahocorasick, get_protein_accession
from quantmsio.core.common import PSM_USECOLS, PSM_MAP, PSM_SCHEMA, PEP
from quantmsio.core.mztab import MzTab
from quantmsio.core.diskcache import DiskCache
import pandas as pd


Expand Down Expand Up @@ -55,6 +56,7 @@ def _extract_pep_columns(self):
self._pep_columns = line.split("\n")[0].split("\t")

def extract_from_pep(self, chunksize=2000000):
self._pep_dict = DiskCache("pep_map")
self._extract_pep_columns()
pep_usecols = [
"opt_global_cv_MS:1000889_peptidoform_sequence",
Expand All @@ -72,7 +74,6 @@ def extract_from_pep(self, chunksize=2000000):
raise Exception("The peptide table don't have opt_global_cv_MS:1000889_peptidoform_sequence columns")
if "charge" in not_cols or "best_search_engine_score[1]" in not_cols:
raise Exception("The peptide table don't have best_search_engine_score[1] or charge columns")
pep_map = {}
indexs = [self._pep_columns.index(col) for col in live_cols]
for pep in self.skip_and_load_csv("PEH", usecols=indexs, chunksize=chunksize):
pep.reset_index(drop=True, inplace=True)
Expand Down Expand Up @@ -107,11 +108,12 @@ def extract_from_pep(self, chunksize=2000000):
)
map_dict = pep_msg.to_dict()["pep_msg"]
for key, value in map_dict.items():
if key not in pep_map:
pep_map[key] = value
elif value[0] < pep_map[key][0]:
pep_map[key] = value
return pep_map
if self._pep_dict.contains(key):
pep = self._pep_dict.get_item(key)
if value[0] < pep[0]:
self._pep_dict.add_item(key, value)
else:
self._pep_dict.add_item(key, value)

@staticmethod
def slice(df, partitions):
Expand Down

0 comments on commit 83c7ed8

Please sign in to comment.