Commit

thread
zprobot committed Nov 19, 2023
1 parent 48ba0fc commit 6299551
Showing 2 changed files with 41 additions and 25 deletions.
51 changes: 26 additions & 25 deletions python/quantmsio/quantms_io/core/diann_convert.py
@@ -15,6 +15,7 @@
from quantms_io.core.feature_in_memory import FeatureInMemory
from quantms_io.core.psm import PSMHandler
from collections import Counter
from quantms_io.utils.thread import MyThread
from typing import Any, List, Tuple, Dict, Set

MODIFICATION_PATTERN = re.compile(r"\((.*?)\)")
@@ -504,16 +505,6 @@ def extract_from_psm_to_pep_msg(self, report_path: str, qvalue_threshold: float,
:return: A pandas DataFrame containing the extracted information.
"""

def __find_info(folder, n):
    files = list(Path(folder).glob(f"*{n}_mzml_info.tsv"))
    # Check that it matches one and only one file
    if not files:
        raise ValueError(f"Could not find {n} info file in {dir}")
    if len(files) > 1:
        raise ValueError(f"Found multiple {n} info files in {dir}: {files}")

    return files[0]

psm_unique_keys = []
spectra_count_dict = Counter()

@@ -522,25 +513,35 @@ def __find_info(folder, n):

report = report.merge(index_ref[["ms_run", "Run", "study_variable"]], on="Run", validate="many_to_one")

out_mztab_psh = pd.DataFrame()

for n, group in report.groupby(["Run"]):
    if isinstance(n, tuple) and len(n) == 1:
        # This is here only to support versions of pandas where the groupby
        # key is a tuple.
        # related: https://github.com/pandas-dev/pandas/pull/51817
        n = n[0]

    file = __find_info(folder, n)
    target = pd.read_csv(file, sep="\t", usecols=["Retention_Time", "SpectrumID", "Exp_Mass_To_Charge"])
    group.sort_values(by="RT.Start", inplace=True)
    target.rename(columns={"Retention_Time": "RT.Start", "SpectrumID": "opt_global_spectrum_reference",
                           "Exp_Mass_To_Charge": "exp_mass_to_charge"}, inplace=True)
    # TODO seconds returned from precursor.getRT()
    target["RT.Start"] = target["RT.Start"] / 60
    out_mztab_psh = pd.concat(
        [out_mztab_psh, pd.merge_asof(group, target, on="RT.Start", direction="nearest")])

def intergrate_msg(folder, n, group):
    files = list(Path(folder).glob(f"*{n}_mzml_info.tsv"))
    if not files:
        raise ValueError(f"Could not find {n} info file in {folder}")
    target = pd.read_csv(files[0], sep="\t", usecols=["Retention_Time", "SpectrumID", "Exp_Mass_To_Charge"])
    group.sort_values(by="RT.Start", inplace=True)
    target.rename(columns={"Retention_Time": "RT.Start", "SpectrumID": "opt_global_spectrum_reference",
                           "Exp_Mass_To_Charge": "exp_mass_to_charge"}, inplace=True)
    # Retention_Time is reported in seconds; convert to minutes to match RT.Start before merge_asof
    target["RT.Start"] = target["RT.Start"] / 60
    # target.loc[:, "RT.Start"] = target.apply(lambda x: x["RT.Start"] / 60, axis=1)
    res = pd.merge_asof(group, target, on="RT.Start", direction="nearest")
    return res

out_mztab_psh = pd.DataFrame()
ThreadPool = []
references = report["Run"].unique().tolist()
# split the runs into batches so that at most 16 threads are alive at a time
references_threadpools = [references[i:i + 16] for i in range(0, len(references), 16)]

for refs in references_threadpools:
    for ref in refs:
        ThreadPool.append(MyThread(target=intergrate_msg, args=(folder, ref, report[report["Run"] == ref].copy())))
    for p in ThreadPool:
        p.start()
    for p in ThreadPool:
        p.join()
    # each MyThread stores the return value of intergrate_msg in .result
    out_mztab_psh = pd.concat([out_mztab_psh] + [p.result for p in ThreadPool])
    ThreadPool = []



## Score at PSM level: Q.Value
out_mztab_psh = out_mztab_psh[
15 changes: 15 additions & 0 deletions python/quantmsio/quantms_io/utils/thread.py
@@ -0,0 +1,15 @@
from threading import Thread

class MyThread(Thread):
    """Thread subclass that stores the return value of its target in self.result."""

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None) -> None:
        super().__init__(group=group, target=target,
                         name=name, args=args,
                         kwargs=kwargs, daemon=daemon)
        self.result = None

    def run(self) -> None:
        try:
            if self._target:
                self.result = self._target(*self._args, **self._kwargs)
        finally:
            # mirror threading.Thread.run(): drop references to avoid reference cycles
            del self._target, self._args, self._kwargs

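For reference, a minimal sketch (not part of this commit) of how MyThread is meant to be consumed: start it, join it, then read the target's return value from .result. The load_table helper and the file names below are hypothetical stand-ins for intergrate_msg and the per-run mzML info files.

from quantms_io.utils.thread import MyThread

def load_table(path):
    # hypothetical stand-in for intergrate_msg: any callable whose return value we want back
    return f"parsed {path}"

threads = [MyThread(target=load_table, args=(p,)) for p in ["run1_mzml_info.tsv", "run2_mzml_info.tsv"]]
for t in threads:
    t.start()
for t in threads:
    t.join()
results = [t.result for t in threads]  # each .result holds load_table's return value

Unlike a plain threading.Thread, whose run() discards the target's return value, this subclass keeps it, which is what lets the batched loop in diann_convert.py collect the per-run DataFrames after join().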