Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor encore conformational_distance_matrix #1145

Merged
merged 15 commits into from
Jan 15, 2017
Merged
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ before_install:
- conda install --yes pylint
install:
# Minimal installation!
- conda create --yes -q -n pyenv python=$PYTHON_VERSION numpy mmtf-python nose=1.3.7 mock sphinx=1.3 six biopython networkx cython
- conda create --yes -q -n pyenv python=$PYTHON_VERSION numpy mmtf-python nose=1.3.7 mock sphinx=1.3 six biopython networkx cython joblib
- source activate pyenv
# Install griddataformats from PIP so that scipy is only installed in the full build (#1147)
- pip install griddataformats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@
try:
import sklearn.cluster
except ImportError:
sklearn = None
msg = "sklearn.cluster could not be imported: some functionality will " \
"not be available in encore.fit_clusters()"
warnings.warn(msg, category=ImportWarning)
logging.warn(msg)
del msg
sklearn = None
msg = "sklearn.cluster could not be imported: some functionality will " \
"not be available in encore.fit_clusters()"
warnings.warn(msg, category=ImportWarning)
logging.warn(msg)
del msg


def encode_centroid_info(clusters, cluster_centers_indices):
Expand Down
213 changes: 54 additions & 159 deletions package/MDAnalysis/analysis/encore/confdistmatrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class to compute an RMSD matrix in such a way is also available.
"""

import numpy as np
from multiprocessing import Process, Array, RawValue
from ctypes import c_float
from getpass import getuser
from socket import gethostname
from datetime import datetime
Expand All @@ -49,61 +47,51 @@ class to compute an RMSD matrix in such a way is also available.
from ..align import rotation_matrix

from .cutils import PureRMSD
from .utils import TriangularMatrix, trm_indeces, \
AnimatedProgressBar
from .utils import TriangularMatrix, trm_indices

try:
from joblib import Parallel, delayed
except ImportError:
import warnings
warnings.warn( "Couldn't import joblib. Can't use conformational_distance_matrix", category=ImportWarning)


def conformational_distance_matrix(ensemble,
conf_dist_function, selection="",
superimposition_selection="", ncores=1, pairwise_align=True,
mass_weighted=True, metadata=True, *args, **kwargs):
conf_dist_function, selection="",
superimposition_selection="", n_jobs=1, pairwise_align=True,
mass_weighted=True, metadata=True, verbose=False):
"""
Run the conformational distance matrix calculation.
args and kwargs are passed to conf_dist_function.

Parameters
----------

ensemble : Universe object
Universe object for which the conformational distance matrix will
be computed.

conf_dist_function : function object
Function that fills the matrix with conformational distance
values. See set_rmsd_matrix_elements for an example.

pairwise_align : bool
Whether to perform pairwise alignment between conformations.
Default is True (do the superimposition)

mass_weighted : bool
Whether to perform mass-weighted superimposition and metric
calculation. Default is True.

metadata : bool
Whether to build a metadata dataset for the calculated matrix.
Default is True.

ncores : int
n_jobs : int
Number of cores to be used for parallel calculation
Default is 1.
Default is 1. -1 uses all available cores

Returns
-------

conf_dist_matrix : encore.utils.TriangularMatrix object
Conformational distance matrix in triangular representation.

"""

# Decide how many cores have to be used. Since the main process is
# stopped while the workers do their job, ncores workers will be
# spawned.

if ncores < 1:
ncores = 1

# framesn: number of frames
framesn = len(ensemble.trajectory.timeseries(
ensemble.select_atoms(selection), format='fac'))
Expand Down Expand Up @@ -160,73 +148,30 @@ def conformational_distance_matrix(ensemble,
else:
subset_masses = None

# matsize: number of elements of the triangular matrix, diagonal
# elements included.
matsize = framesn * (framesn + 1) / 2

# Calculate the number of matrix elements that each core has to
# calculate as equally as possible.
if ncores > matsize:
ncores = matsize
runs_per_worker = [matsize / int(ncores) for x in range(ncores)]
unfair_work = matsize % ncores
for i in range(unfair_work):
runs_per_worker[i] += 1

# Splice the matrix in ncores segments. Calculate the first and the
# last (i,j) matrix elements of the slices that will be assigned to
# each worker. Each of them will proceed in a column-then-row order
# (e.g. 0,0 1,0 1,1 2,0 2,1 2,2 ... )
i = 0
a = [0, 0]
b = [0, 0]
tasks_per_worker = []
for n,r in enumerate(runs_per_worker):
while i * (i - 1) / 2 < np.sum(runs_per_worker[:n + 1]):
i += 1
b = [i - 2,
np.sum(runs_per_worker[0:n + 1]) - (i - 2) * (i - 1) / 2 - 1]
tasks_per_worker.append((tuple(a), tuple(b)))
if b[0] == b[1]:
a[0] = b[0] + 1
a[1] = 0
else:
a[0] = b[0]
a[1] = b[1] + 1

# Allocate for output matrix
distmat = Array(c_float, matsize)
matsize = framesn * (framesn + 1) / 2
distmat = np.empty(matsize, np.float64)

# Prepare progress bar stuff and run it
pbar = AnimatedProgressBar(end=matsize, width=80)
partial_counters = [RawValue('i', 0) for i in range(ncores)]

# Initialize workers. Simple worker doesn't perform fitting,
# fitter worker does.

workers = [Process(target=conf_dist_function, args=(
tasks_per_worker[i],
indices = trm_indices((0, 0), (framesn - 1, framesn - 1))
Parallel(n_jobs=n_jobs, verbose=verbose)(delayed(conf_dist_function)(
element,
rmsd_coordinates,
distmat,
masses,
fitting_coordinates,
subset_masses,
partial_counters[i],
args,
kwargs)) for i in range(ncores)]
masses) for element in indices)

# Start & join the workers
for w in workers:
w.start()
for w in workers:
w.join()

# When the workers have finished, return a TriangularMatrix object
return TriangularMatrix(distmat, metadata=metadata)


def set_rmsd_matrix_elements(tasks, coords, rmsdmat, masses, fit_coords=None,
fit_masses=None, pbar_counter=None, *args, **kwargs):
fit_masses=None, pbar_counter=None, *args, **kwargs):

'''
RMSD Matrix calculator
Expand All @@ -237,7 +182,7 @@ def set_rmsd_matrix_elements(tasks, coords, rmsdmat, masses, fit_coords=None,
tasks : iterator of int of length 2
Given a triangular matrix, this function will calculate RMSD
values from element tasks[0] to tasks[1]. Since the matrix
is triangular, the trm_indeces matrix automatically
is triangular, the trm_indices matrix automatically
calculates the corrisponding i,j matrix indices.
The matrix is written as an array in a row-major
order (see the TriangularMatrix class for details).
Expand All @@ -264,82 +209,38 @@ def set_rmsd_matrix_elements(tasks, coords, rmsdmat, masses, fit_coords=None,
fit_masses : numpy.array
Array of atomic masses, having the same order as the
fit_coords array

pbar_counter : multiprocessing.RawValue
Thread-safe shared value. This counter is updated at
every cycle and used to evaluate the progress of
each worker in a parallel calculation.
'''

i, j = tasks

if fit_coords is None and fit_masses is None:
for i, j in trm_indeces(tasks[0], tasks[1]):
summasses = np.sum(masses)
rmsdmat[(i + 1) * i / 2 + j] = PureRMSD(coords[i].astype(np.float64),
coords[j].astype(np.float64),
coords[j].shape[0],
masses,
summasses)
summasses = np.sum(masses)
rmsdmat[(i + 1) * i / 2 + j] = PureRMSD(coords[i],
coords[j],
coords[j].shape[0],
masses,
summasses)

elif fit_coords is not None and fit_coords is not None:
for i, j in trm_indeces(tasks[0], tasks[1]):
summasses = np.sum(masses)
subset_weights = np.asarray(fit_masses) / np.mean(fit_masses)
com_i = np.average(fit_coords[i], axis=0,
weights=fit_masses)
translated_i = coords[i] - com_i
subset1_coords = fit_coords[i] - com_i
com_j = np.average(fit_coords[j], axis=0,
weights=fit_masses)
translated_j = coords[j] - com_j
subset2_coords = fit_coords[j] - com_j
rotamat = rotation_matrix(subset1_coords, subset2_coords,
subset_weights)[0]
rotated_i = np.transpose(np.dot(rotamat, np.transpose(translated_i)))
rmsdmat[(i + 1) * i / 2 + j] = PureRMSD(
rotated_i.astype(np.float64), translated_j.astype(np.float64),
coords[j].shape[0], masses, summasses)

summasses = np.sum(masses)
subset_weights = np.asarray(fit_masses) / np.mean(fit_masses)
com_i = np.average(fit_coords[i], axis=0,
weights=fit_masses)
translated_i = coords[i] - com_i
subset1_coords = fit_coords[i] - com_i
com_j = np.average(fit_coords[j], axis=0,
weights=fit_masses)
translated_j = coords[j] - com_j
subset2_coords = fit_coords[j] - com_j
rotamat = rotation_matrix(subset1_coords, subset2_coords,
subset_weights)[0]
rotated_i = np.transpose(np.dot(rotamat, np.transpose(translated_i)))
rmsdmat[(i + 1) * i / 2 + j] = PureRMSD(
rotated_i.astype(np.float64), translated_j.astype(np.float64),
coords[j].shape[0], masses, summasses)
else:
raise TypeError("Both fit_coords and fit_masses must be specified \
if one of them is given")

if pbar_counter is not None:
pbar_counter.value += 1

def pbar_updater(pbar, pbar_counters, max_val, update_interval=0.2):
'''Method that updates and prints the progress bar, upon polling
progress status from workers.

Parameters
----------

pbar : encore.utils.AnimatedProgressBar object
Progress bar object

pbar_counters : list of multiprocessing.RawValue
List of counters. Each worker is given a counter, which is updated
at every cycle. In this way the _pbar_updater process can
asynchronously fetch progress reports.

max_val : int
Total number of matrix elements to be calculated

update_interval : float
Number of seconds between progress bar updates

'''

val = 0
while val < max_val:
val = 0
for c in pbar_counters:
val += c.value
pbar.update(val)
pbar.show_progress()
sleep(update_interval)



def get_distance_matrix(ensemble,
selection="name CA",
Expand All @@ -348,7 +249,8 @@ def get_distance_matrix(ensemble,
superimpose=True,
superimposition_subset="name CA",
mass_weighted=True,
ncores=1,
n_jobs=1,
verbose=False,
*conf_dist_args,
**conf_dist_kwargs):
"""
Expand All @@ -369,34 +271,28 @@ def get_distance_matrix(ensemble,

Parameters
----------

ensemble : Universe

selection : str
Atom selection string in the MDAnalysis format. Default is "name CA"

load_matrix : str, optional
Load similarity/dissimilarity matrix from numpy binary file instead
of calculating it (default is None). A filename is required.

save_matrix : bool, optional
Save calculated matrix as numpy binary file (default is None). A
filename is required.

superimpose : bool, optional
Whether to superimpose structures before calculating distance
(default is True).

superimposition_subset : str, optional
Group for superimposition using MDAnalysis selection syntax
(default is CA atoms: "name CA")

mass_weighted : bool, optional
calculate a mass-weighted RMSD (default is True). If set to False
the superimposition will also not be mass-weighted.

ncores : int, optional
Maximum number of cores to be used (default is 1)
n_jobs : int, optional
Maximum number of cores to be used (default is 1). If -1 use all cores.
verbose : bool, optional
print progress

Returns
-------
Expand Down Expand Up @@ -446,13 +342,12 @@ def get_distance_matrix(ensemble,
# Use superimposition subset, if necessary. If the pairwise alignment
# is not required, it will not be performed anyway.
confdistmatrix = conformational_distance_matrix(ensemble,
conf_dist_function=set_rmsd_matrix_elements,
selection=selection,
pairwise_align=superimpose,
mass_weighted=mass_weighted,
ncores=ncores,
*conf_dist_args,
kwargs=conf_dist_kwargs)
conf_dist_function=set_rmsd_matrix_elements,
selection=selection,
pairwise_align=superimpose,
mass_weighted=mass_weighted,
n_jobs=n_jobs,
verbose=verbose)

logging.info(" Done!")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@
try:
import sklearn.decomposition
except ImportError:
sklearn = None
msg = "sklearn.decomposition could not be imported: some functionality will"\
"not be available in encore.dimensionality_reduction()"
warnings.warn(msg, category=ImportWarning)
logging.warn(msg)
del msg
sklearn = None
import warnings
warnings.warn("sklearn.decomposition could not be imported: some "
"functionality will not be available in "
"encore.dimensionality_reduction()", category=ImportWarning)


class DimensionalityReductionMethod (object):
Expand Down
Loading