Source code for cytoflow.utility.consensus

# from https://github.com/burtonrj/consensusclustering/

from __future__ import annotations

from itertools import combinations
from typing import Type
from warnings import warn

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from joblib import Parallel, delayed
from .kneed import KneeLocator
from tqdm.auto import tqdm

[docs] def resample(x: np.ndarray, frac: float) -> tuple[np.ndarray, np.ndarray]: """ Resample a matrix x by a fraction frac Parameters ---------- x: np.ndarray Matrix to resample frac: float Fraction of rows to resample Returns ------- tuple[np.ndarray, np.ndarray] Indices of resampled rows and resampled matrix """ resampled_indices = np.random.choice( range(x.shape[0]), size=int(x.shape[0] * frac), replace=False ) return resampled_indices, x[resampled_indices, :]
[docs] def compute_connectivity_matrix(labels: np.ndarray) -> np.ndarray: """ Compute a connectivity matrix from a vector of labels - the connectivity matrix is a symmetric matrix where the (i, j)th entry is 1 if the ith and jth elements of the labels vector are the same and 0 otherwise. Computation of connectivity matrices handles out-of-bag samples by setting the (i, j)th entry to 0 if either the ith or jth element of the labels vector is -1, where a -1 indicates the item is not included in the sample. IF YOU ARE USING A CLUSTERING ALGORITHM THAT DESIGNATES LABELS AS -1, YOU MUST CHANGE THE LABELS TO A DIFFERENT VALUE BEFORE CALLING THIS FUNCTION. Parameters ---------- labels: np.ndarray Vector of labels Returns ------- np.ndarray Connectivity matrix """ out_of_bag_idx = np.where(labels == -1)[0] connectivity_matrix = np.equal.outer(labels, labels).astype("int8") rows, cols = zip(*list(combinations(out_of_bag_idx, 2))) connectivity_matrix[rows, cols] = 0 connectivity_matrix[cols, rows] = 0 connectivity_matrix[out_of_bag_idx, out_of_bag_idx] = 0 return connectivity_matrix
[docs] def compute_identity_matrix(x: np.ndarray, resampled_indices: np.ndarray) -> np.ndarray: """ Compute an identity matrix from a matrix x and a vector of resampled indices - the identity matrix is a symmetric matrix where the (i, j)th entry is 1 if the ith and jth elements are both in the resampled indices and 0 otherwise. Parameters ---------- x: np.ndarray Matrix that was resampled resampled_indices: np.ndarray Indices of resampled rows Returns ------- np.ndarray Identity matrix """ n = x.shape[0] identity_matrix = np.zeros((n, n), dtype="int8") rows, cols = zip(*list(combinations(resampled_indices, 2))) identity_matrix[rows, cols] = 1 identity_matrix[cols, rows] = 1 identity_matrix[resampled_indices, resampled_indices] = 1 return identity_matrix
[docs] def compute_consensus_matrix( connectivity_matrices: list[np.ndarray], identity_matrices: list[np.ndarray] ) -> np.ndarray: """ Compute a consensus matrix from a list of connectivity matrices and a list of identity matrices - the consensus matrix is a symmetric matrix where the (i, j)th entry is the proportion of connectivity matrices where the ith and jth elements are connected normalized by the proportion of identity matrices where the ith and jth elements are sampled. Parameters ---------- connectivity_matrices: list[np.ndarray] List of connectivity matrices identity_matrices: list[np.ndarray] List of identity matrices Returns ------- np.ndarray Consensus matrix """ return np.nan_to_num( np.sum(connectivity_matrices, axis=0) / np.sum(identity_matrices, axis=0), nan=0 )
[docs] def valid_clustering_obj(clustering_obj, k_param: str) -> bool: """ Check if a clustering object has the necessary methods to be used in consensus clustering (i.e. Scikit-learn signatures) Parameters ---------- clustering_obj: object Clustering object to check k_param: str Name of the parameter that sets the number of clusters Returns ------- bool True if clustering_obj has fit_predict and set_params methods, False otherwise. """ if not hasattr(clustering_obj, "fit_predict"): return False if not hasattr(clustering_obj, "set_params"): return False if not hasattr(clustering_obj, k_param): return False return True
[docs] def cluster( x: np.ndarray, resample_frac: float, k: int, clustering_obj, k_param: str = "n_clusters", ) -> tuple[Type, np.ndarray, np.ndarray]: """ Sample a matrix x and cluster the resampled matrix, returning the clustering object, the indices of the resampled rows, and labels for the items in x with a value of -1 for items not included in the resampled matrix. Parameters ---------- x: np.ndarray Matrix to resample and cluster resample_frac: float Fraction of rows to resample k: int Number of clusters clustering_obj Clustering object to use; must have fit_predict and set_params methods k_param: str Name of the parameter that sets the number of clusters Returns ------- tuple[Type, np.ndarray, np.ndarray] Clustering object, indices of resampled rows, and labels for the items in x """ if not valid_clustering_obj(clustering_obj, k_param): raise ValueError("clustering_obj must have fit_predict and set_params methods") clustering_obj.set_params(**{k_param: k}) resampled_indices, resampled_x = resample(x, resample_frac) resampled_labels = clustering_obj.fit_predict(resampled_x) if -1 in resampled_labels: raise ValueError( "Clustering object returned -1 as a label. -1 is reserved for items not " "included in the resampled matrix. Please modify the clustering object " "to return a label other than -1." ) labels = np.full(x.shape[0], -1) labels[resampled_indices] = resampled_labels return clustering_obj, resampled_indices, labels
[docs] class ConsensusClustering: """ Consensus clustering for measuring the stability of clusters and selecting the optimal number of clusters. Consensus clustering is originally described in https://link.springer.com/article/10.1023/A:1023949509487 and implemented in R in the ConsensusClusterPlus package (https://academic.oup.com/bioinformatics/article/26/12/1572/281699). This class is a Python implementation of the same algorithm. Clustering stability is measured by resampling rows of a matrix, clustering the resampled matrix, and computing a consensus matrix from the resampled clusters. The consensus distribution of each pair of items is used to measure cluster stability and the optimal number of clusters chosen by maximizing the change in the area under the cumulative distribution function (CDF) of the consensus matrix. """ def __init__( self, clustering_obj, min_clusters: int, max_clusters: int, n_resamples: int, resample_frac: float = 0.5, k_param: str = "n_clusters", ): """ Initialize a ConsensusClustering object. Parameters ---------- clustering_obj Clustering object to use; must have fit_predict and set_params methods. min_clusters: int Minimum number of clusters to consider. Must be greater than or equal to 2. max_clusters: int Maximum number of clusters to consider. n_resamples: int Number of resamples to perform. resample_frac: float Fraction of rows to resample. k_param: str Name of the parameter that sets the number of clusters. """ self.clustering_obj = clustering_obj self.min_clusters = min_clusters self.max_clusters = max_clusters self.n_resamples = n_resamples self.resample_frac = resample_frac self.consensus_matrices_: list[np.ndarray] = [] self.k_param = k_param if self.min_clusters < 2: raise ValueError("min_clusters must be >= 2") if self.max_clusters < self.min_clusters: raise ValueError("max_clusters must be >= min_clusters") @property def cluster_range_(self) -> list[int]: return list(range(self.min_clusters, self.max_clusters + 1))
[docs] def consensus_k(self, k: int) -> np.ndarray: """ Get the consensus matrix for a given number of clusters. Parameters ---------- k: int Number of clusters Returns ------- np.ndarray Consensus matrix for k """ if len(self.consensus_matrices_) == 0: raise ValueError("Consensus matrices not computed yet.") return self.consensus_matrices_[k - self.min_clusters]
def _fit_multiprocess( self, x: np.ndarray, progress_bar: bool = False, n_jobs: int = -1 ): """ Compute consensus matrices for all values of k in parallel Parameters ---------- x: np.ndarray Matrix to cluster progress_bar: bool (default: False) Whether to display a progress bar n_jobs: int (default: -1) Number of jobs to run in parallel Returns ------- None Consensus matrices are stored in self.consensus_matrices_ """ self.consensus_matrices_ = [] with Parallel(n_jobs) as parallel: self.consensus_matrices_ = list( parallel( delayed(self._fit_single_k)(x, k) for k in tqdm( self.cluster_range_, disable=not progress_bar, desc="Computing consensus matrices", total=self.max_clusters - self.min_clusters + 1, ) ) ) def _fit_single_k(self, x: np.ndarray, k: int) -> np.ndarray: """ Compute the consensus matrix for a single value of k. Parameters ---------- x: np.ndarray Matrix to cluster k: int Number of clusters Returns ------- np.ndarray Consensus matrix for k """ connectivity_matrices = [] identity_matrices = [] for _ in range(self.n_resamples): _, resampled_indices, labels = cluster( x, self.resample_frac, k, self.clustering_obj, self.k_param ) connectivity_matrices.append(compute_connectivity_matrix(labels)) identity_matrices.append(compute_identity_matrix(x, resampled_indices)) return compute_consensus_matrix(connectivity_matrices, identity_matrices)
[docs] def fit(self, x: np.ndarray, progress_bar: bool = False, n_jobs: int = 0) -> None: """ Compute consensus matrices for all values of k. Parameters ---------- x: np.ndarray Matrix to cluster progress_bar: bool (default: False) Whether to display a progress bar n_jobs: int (default: 0) Number of jobs to run in parallel. If 0, run in serial. Returns ------- None Consensus matrices are stored in ``self.consensus_matrices_`` """ self.consensus_matrices_ = [] if n_jobs == 0: for k in tqdm( self.cluster_range_, disable=not progress_bar, desc="Computing consensus matrices", total=self.max_clusters - self.min_clusters + 1, ): self.consensus_matrices_.append(self._fit_single_k(x, k)) else: self._fit_multiprocess(x, progress_bar, n_jobs)
[docs] def hist(self, k: int) -> tuple[np.ndarray, np.ndarray]: """ Compute the histogram of the consensus matrix for a given number of clusters. Parameters ---------- k: int Number of clusters Returns ------- tuple[np.ndarray, np.ndarray] Histogram and bins """ hist, bins = np.histogram(self.consensus_k(k).ravel(), density=True) return hist, bins
[docs] def cdf(self, k: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute the cumulative distribution function (CDF) of the consensus matrix for a given number of clusters. Parameters ---------- k: int Number of clusters Returns ------- tuple[np.ndarray, np.ndarray, np.ndarray] CDF, histogram, and bins """ hist, bins = self.hist(k) ecdf = np.cumsum(hist) bin_widths = np.diff(bins) ecdf = ecdf * bin_widths return ecdf, hist, bins
[docs] def area_under_cdf(self, k: int) -> float: """ Compute the area under the cumulative distribution function (CDF) of the consensus matrix for a given number of clusters. Parameters ---------- k: int Number of clusters Returns ------- float Area under the CDF """ ecdf, _, bins = self.cdf(k) return np.sum(ecdf * (bins[1:] - bins[:-1]))
[docs] def change_in_area_under_cdf(self) -> np.ndarray: """ Compute the proportional change in the area under the cumulative distribution function (CDF) of the consensus matrix for each number of clusters. Returns ------- np.ndarray Proportional change in area under the CDF as a function of the number of clusters """ auc = [self.area_under_cdf(k) for k in self.cluster_range_] delta_k = [ (b - a) / b if k > 2 else a for a, b, k in zip(auc[:-1], auc[1:], self.cluster_range_) ] return np.array(delta_k)
[docs] def best_k(self, method: str = "knee") -> int: """ Compute the optimal number of clusters by maximizing the change in the area under the cumulative distribution function (CDF) of the consensus matrix. Returns ------- int Optimal number of clusters """ if method == "change_in_auc": change = self.change_in_area_under_cdf() largest_change_in_auc = np.argmax(change) return list(range(self.min_clusters, self.max_clusters))[ largest_change_in_auc ] elif method == "knee": kneedle = KneeLocator( self.cluster_range_, [self.area_under_cdf(k) for k in self.cluster_range_], curve="concave", direction="increasing", ) if kneedle.knee is None: warn( "Kneedle algorithm failed to find a knee. " "Returning maximum number of clusters, however, it is likely that " "the clustering is unstable. Plot the CDFs and consensus matrices " "to check." ) return self.max_clusters return kneedle.knee else: raise ValueError("method must be one of 'change_in_auc' or 'knee'")
[docs] def plot_auc_cdf(self, include_knee: bool = True, ax: plt.Axes | None = None): """ Plot the area under the cumulative distribution function (CDF) of the consensus matrix as a function of the number of clusters. Parameters ---------- include_knee: bool (default: True) Whether to include a vertical line at the knee ax: plt.Axes (default: None) Axis on which to plot Returns ------- None """ ax = plt.subplots(figsize=(5, 5))[1] if ax is None else ax auc = [self.area_under_cdf(k) for k in self.cluster_range_] ax.plot( self.cluster_range_, auc, "--", marker="o", markerfacecolor="white", markeredgecolor="black", ) if include_knee: knee = self.best_k(method="knee") ax.axvline( knee, color="k", linestyle="--", label="Knee", ) ax.set_xlabel("K") ax.set_ylabel("Area under CDF") return ax
[docs] def plot_clustermap(self, k: int, **kwargs): """ Plot a clustermap of the consensus matrix for a given number of clusters. Parameters ---------- k: int Number of clusters kwargs: Keyword arguments to pass to seaborn.clustermap Returns ------- seaborn.ClusterGrid """ return sns.clustermap(self.consensus_k(k), **kwargs)
[docs] def plot_hist(self, k: int, ax: plt.Axes | None = None) -> plt.Axes: """ Plot a histogram of the consensus matrix for a given number of clusters. Parameters ---------- k: int Number of clusters ax: plt.Axes | None Axis to plot on, or None to create a new figure Returns ------- matplotlib.pyplot.Axes """ ax = ax if ax is not None else plt.subplots(figsize=(6.5, 6.5))[1] hist, bins = self.hist(k) ax.bar(bins[:-1], hist, width=np.diff(bins)) ax.set_xlabel("Consensus index value") ax.set_ylabel("Density") ax.set_xlim(0, 1) return ax
[docs] def plot_cdf(self, ax: plt.Axes | None = None) -> plt.Axes: """ Plot the cumulative distribution function (CDF) of the consensus matrix for each number of clusters. Parameters ---------- ax: plt.Axes | None Axis to plot on, or None to create a new figure Returns ------- matplotlib.pyplot.Axes """ ax = ax if ax is not None else plt.subplots(figsize=(5, 5))[1] for k in self.cluster_range_: ecdf, _, bins = self.cdf(k) ax.step(bins[:-1], ecdf, label=f"{k} clusters") ax.set_xlabel("Consensus index value") ax.set_ylabel("CDF") ax.set_xlim(0, 1) ax.legend() return ax
[docs] def plot_change_area_under_cdf(self, ax: plt.Axes | None = None) -> plt.Axes: """ Plot the change in the area under the cumulative distribution function (CDF) of the consensus matrix for each number of clusters. Parameters ---------- ax: plt.Axes | None Axis to plot on, or None to create a new figure Returns ------- matplotlib.pyplot.Axes """ ax = ax if ax is not None else plt.subplots(figsize=(5, 5))[1] change = self.change_in_area_under_cdf() ax.plot( self.cluster_range_[:-1], change, "--", marker="o", markerfacecolor="white", markeredgecolor="black", ) ax.set_xlabel("K") ax.set_ylabel("Change in area under CDF") return ax