Source code for prody.chromatin.cluster

import numpy as np
from prody import LOGGER, SETTINGS
from prody.dynamics import MaskedGNM
from prody.utilities import showFigure, bin2dec
from numbers import Integral

from .functions import _getEigvecs

__all__ = ['calcGNMDomains', 'Hingeplane', 'KMeans', 'Hierarchy', 'Discretize', 'showLinkage', 'GaussianMixture', 'BayesianGaussianMixture']

def Hingeplane(V, **kwargs):
    S = np.sign(np.sign(V) + 1)
    n, m = S.shape

    labels = np.zeros(n)
    for i, s in enumerate(S):
        labels[i] = bin2dec(s)

    uniq_labels = np.unique(labels)

    for i, l in enumerate(uniq_labels):
        labels[labels==l] = i
    return labels

[docs]def KMeans(V, **kwargs): """Performs k-means clustering on *V*. The function uses :func:`sklearn.cluster.KMeans`. See sklearn documents for details. :arg V: row-normalized eigenvectors for the purpose of clustering. :type V: :class:`~numpy.ndarray` :arg n_clusters: specifies the number of clusters. :type n_clusters: int """ try: from sklearn.cluster import KMeans except ImportError: raise ImportError('Use of this function (KMeans) requires the ' 'installation of sklearn.') n_clusters = kwargs.get('n_clusters', None) if n_clusters is None: raise ValueError('KMeans requires to desiginate the number of clusters.') n_init = kwargs.pop('n_init', 100) kmeans = KMeans(n_init=n_init, **kwargs).fit(V) return kmeans.labels_
[docs]def Hierarchy(V, **kwargs): """Performs hierarchical clustering on *V*. The function essentially uses two scipy functions: ``linkage`` and ``fcluster``. See :func:`scipy.cluster.hierarchy.linkage` and :func:`scipy.cluster.hierarchy.fcluster` for the explaination of the arguments. Here lists arguments that are different from those of scipy. :arg V: row-normalized eigenvectors for the purpose of clustering. :type V: :class:`~numpy.ndarray` :arg inconsistent_percentile: if the clustering *criterion* for :func:`scipy.cluster.hierarchy.fcluster` is ``inconsistent`` and threshold *t* is not given (default), then the function will use the percentile specified by this argument as the threshold. :type inconsistent_percentile: double :arg n_clusters: specifies the maximal number of clusters. If this argument is given, then the function will automatically set *criterion* to ``maxclust`` and *t* equal to *n_clusters*. :type n_clusters: int """ from scipy.cluster.hierarchy import linkage, fcluster, inconsistent method = kwargs.pop('linkage', 'single') metric = kwargs.pop('metric', 'cosine') Z = linkage(V, method=method, metric=metric) criterion = kwargs.pop('criterion', 'inconsistent') t = kwargs.get('t', None) ip = kwargs.pop('inconsistent_percentile', 99.9) if t is None and criterion == 'inconsistent': I = inconsistent(Z) i = np.percentile(I[:,3], ip) t = kwargs.pop('t', i) depth = kwargs.pop('depth', 2) R = kwargs.pop('R', None) monocrit = kwargs.pop('monocrit', None) n_clusters = kwargs.pop('n_clusters', None) if n_clusters is not None: criterion = 'maxclust' t = n_clusters labels = fcluster(Z, t, criterion=criterion, depth=depth, R=R, monocrit=monocrit) return labels.flatten()
[docs]def Discretize(V, **kwargs): """Adapted from :func:`~sklearn.cluster.spectral.discretize`. Copyright please see LICENSE.rst. """ from scipy.sparse import csc_matrix from scipy.linalg import LinAlgError copy = kwargs.pop('copy', False) max_svd_restarts = kwargs.pop('max_svd_restarts', 30) n_iter_max = kwargs.pop('n_iter_max', 20) random_state = kwargs.pop('random_state', None) info = kwargs.pop('info', None) def check_random_state(seed): """Adapted from :func:`~sklearn.utils.validation.check_random_state`.""" if seed is None or seed is np.random: return np.random.mtrand._rand if isinstance(seed, Integral): return np.random.RandomState(seed) if isinstance(seed, np.random.RandomState): return seed raise ValueError('%r cannot be used to seed a numpy.random.RandomState' ' instance' % seed) random_state = check_random_state(random_state) eps = np.finfo(float).eps vectors = np.array(V, dtype=float, copy=copy) n_samples, n_components = vectors.shape # Normalize the eigenvectors to an equal length of a vector of ones. # Reorient the eigenvectors to point in the negative direction with respect # to the first element. This may have to do with constraining the # eigenvectors to lie in a specific quadrant to make the discretization # search easier. norm_ones = np.sqrt(n_samples) for i in range(vectors.shape[1]): vectors[:, i] *= norm_ones if vectors[0, i] != 0: vectors[:, i] = -1 * vectors[:, i] * np.sign(vectors[0, i]) # Normalize the rows of the eigenvectors. Samples should lie on the unit # hypersphere centered at the origin. This transforms the samples in the # embedding space to the space of partition matrices. vectors = vectors / np.sqrt((vectors ** 2).sum(axis=1))[:, np.newaxis] svd_restarts = 0 has_converged = False # If there is an exception we try to randomize and rerun SVD again # do this max_svd_restarts times. while (svd_restarts < max_svd_restarts) and not has_converged: # Initialize first column of rotation matrix with a row of the # eigenvectors rotation = np.zeros((n_components, n_components)) rotation[:, 0] = vectors[random_state.randint(n_samples), :].T # To initialize the rest of the rotation matrix, find the rows # of the eigenvectors that are as orthogonal to each other as # possible c = np.zeros(n_samples) for j in range(1, n_components): # Accumulate c to ensure row is as orthogonal as possible to # previous picks as well as current one c += np.abs(np.dot(vectors, rotation[:, j - 1])) rotation[:, j] = vectors[c.argmin(), :].T last_objective_value = 0.0 n_iter = 0 while not has_converged: n_iter += 1 t_discrete = np.dot(vectors, rotation) labels = t_discrete.argmax(axis=1) vectors_discrete = csc_matrix( (np.ones(len(labels)), (np.arange(0, n_samples), labels)), shape=(n_samples, n_components)) t_svd = vectors_discrete.T * vectors try: U, S, Vh = np.linalg.svd(t_svd) svd_restarts += 1 except LinAlgError: print("SVD did not converge, randomizing and trying again") break ncut_value = 2.0 * (n_samples - S.sum()) if ((abs(ncut_value - last_objective_value) < eps) or (n_iter > n_iter_max)): has_converged = True else: # otherwise calculate rotation and continue last_objective_value = ncut_value rotation = np.dot(Vh.T, U.T) if info is not None: if not isinstance(info, dict): raise TypeError('info must be a dict') info['indicators'] = t_discrete info['vectors'] = vectors if not has_converged: raise LinAlgError('SVD did not converge') return labels
[docs]def GaussianMixture(V, **kwargs): """Performs clustering on *V* by using Gaussian mixture models. The function uses :func:`sklearn.micture.GaussianMixture`. See sklearn documents for details. :arg V: row-normalized eigenvectors for the purpose of clustering. :type V: :class:`~numpy.ndarray` :arg n_clusters: specifies the number of clusters. :type n_clusters: int """ try: from sklearn.mixture import GaussianMixture except ImportError: raise ImportError('Use of this function (GaussianMixture) requires the ' 'installation of sklearn.') n_components = kwargs.pop('n_components', None) if n_components == None: n_components = kwargs.pop('n_clusters',None) if n_components == None: n_components = 1 n_init = kwargs.pop('n_init', 1) mixture = GaussianMixture(n_init=n_init, n_components=n_components, **kwargs).fit(V) return mixture.fit_predict(V)
[docs]def BayesianGaussianMixture(V, **kwargs): """Performs clustering on *V* by using Gaussian mixture models with variational inference. The function uses :func:`sklearn.micture.GaussianMixture`. See sklearn documents for details. :arg V: row-normalized eigenvectors for the purpose of clustering. :type V: :class:`~numpy.ndarray` :arg n_clusters: specifies the number of clusters. :type n_clusters: int """ try: from sklearn.mixture import BayesianGaussianMixture except ImportError: raise ImportError('Use of this function (BayesianGaussianMixture) requires the ' 'installation of sklearn.') n_components = kwargs.pop('n_components', None) if n_components == None: n_components = kwargs.pop('n_clusters',None) if n_components == None: n_components = 1 n_init = kwargs.pop('n_init', 1) mixture = BayesianGaussianMixture(n_init=n_init, **kwargs).fit(V) return mixture.fit_predict(V)
[docs]def showLinkage(V, **kwargs): """Shows the dendrogram of hierarchical clustering on *V*. See :func:`scipy.cluster.hierarchy.dendrogram` for details. :arg V: row-normalized eigenvectors for the purpose of clustering. :type V: :class:`~numpy.ndarray` """ V = _getEigvecs(V, row_norm=True) try: from scipy.cluster.hierarchy import linkage, dendrogram except ImportError: raise ImportError('Use of this function (showLinkage) requires the ' 'installation of scipy.') method = kwargs.pop('method', 'single') metric = kwargs.pop('metric', 'euclidean') Z = linkage(V, method=method, metric=metric) no_labels = kwargs.pop('no_labels', True) dendrogram(Z, no_labels=no_labels, **kwargs) if SETTINGS['auto_show']: showFigure() return Z
[docs]def calcGNMDomains(modes, method=Discretize, **kwargs): """Uses spectral clustering to separate structural domains in the chromosome. :arg modes: GNM modes used for segmentation :type modes: :class:`ModeSet` :arg method: Label assignment algorithm used after Laplacian embedding of loci. :type method: func """ dummy_mode = kwargs.pop('dummy_mode', True) row_norm = kwargs.pop('row_norm', True) linear = kwargs.pop('linear', False) V = _getEigvecs(modes, row_norm=row_norm, dummy_mode=dummy_mode) labels_ = method(V, **kwargs) if linear: split_labels = lambda l: np.split(l, np.where(np.diff(l) != 0)[0]+1) labels = split_labels(labels_) for i in range(len(labels)): l = np.empty_like(labels[i]) l.fill(i) labels[i] = l labels = np.hstack(labels) else: labels = labels_ if hasattr(modes, '_model'): model = modes._model else: model = modes if isinstance(model, MaskedGNM): currlbl = labels[0] labels = model._extend(labels, -1) for i, l in enumerate(labels): if l < 0: labels[i] = currlbl elif currlbl != l: currlbl = l return labels