Source code for cytoflow.utility.consensus_cluster

###############################################
# Copyright Žiga Sajovic, XLAB 2019           #
# Distributed under the MIT License           #
#                                             #
# github.com/ZigaSajovic/Consensus_Clustering #
#                                             #
###############################################

import numpy as np
from itertools import combinations
import bisect


[docs] class ConsensusCluster: """ Implementation of Consensus clustering, following the paper https://link.springer.com/content/pdf/10.1023%2FA%3A1023949509487.pdf Args: * cluster -> clustering class * NOTE: the class is to be instantiated with parameter ``n_clusters``, and possess a ``fit_predict`` method, which is invoked on data. * L -> smallest number of clusters to try * K -> biggest number of clusters to try * H -> number of resamplings for each cluster number * resample_proportion -> percentage to sample * Mk -> consensus matrices for each k (shape =(K,data.shape[0],data.shape[0])) (NOTE: every consensus matrix is retained, like specified in the paper) * Ak -> area under CDF for each number of clusters (see paper: section 3.3.1. Consensus distribution.) * deltaK -> changes in areas under CDF (see paper: section 3.3.1. Consensus distribution.) * self.bestK -> number of clusters that was found to be best """ def __init__(self, cluster, L, K, H, resample_proportion=0.5): assert 0 <= resample_proportion <= 1, "proportion has to be between 0 and 1" self.cluster_ = cluster self.resample_proportion_ = resample_proportion self.L_ = L self.K_ = K self.H_ = H self.Mk = None self.Ak = None self.deltaK = None self.bestK = None def _internal_resample(self, data, proportion): """ Args: * data -> (examples,attributes) format * proportion -> percentage to sample """ resampled_indices = np.sort(np.random.choice( range(data.shape[0]), size=int(data.shape[0]*proportion), replace=False)) return resampled_indices, data[resampled_indices, :]
[docs] def fit(self, data, verbose=False): """ Fits a consensus matrix for each number of clusters Args: * data -> (examples,attributes) format * verbose -> should print or not """ Mk = np.zeros((self.K_-self.L_, data.shape[0], data.shape[0])) Is = np.zeros((data.shape[0],)*2) for k in range(self.L_, self.K_): # for each number of clusters i_ = k-self.L_ if verbose: print("At k = %d, aka. iteration = %d" % (k, i_)) for h in range(self.H_): # resample H times if verbose: print("\tAt resampling h = %d, (k = %d)" % (h, k)) resampled_indices, resample_data = self._internal_resample( data, self.resample_proportion_) Mh = self.cluster_(n_clusters=k).fit_predict(resample_data) # find indexes of elements from same clusters with bisection # on sorted array => this is more efficient than brute force search index_mapping = np.array((Mh, resampled_indices)).T index_mapping = index_mapping[index_mapping[:, 0].argsort()] sorted_ = index_mapping[:, 0] id_clusts = index_mapping[:, 1] for i in range(k): # for each cluster ia = bisect.bisect_left(sorted_, i) ib = bisect.bisect_right(sorted_, i) is_ = np.sort(id_clusts[ia:ib]) ids_ = np.array(list(combinations(is_, 2))).T # sometimes only one element is in a cluster (no combinations) if ids_.size != 0: Mk[i_, ids_[0], ids_[1]] += 1 # increment counts ids_2 = np.array(list(combinations(resampled_indices, 2))).T Is[ids_2[0], ids_2[1]] += 1 Mk[i_] /= Is+1e-8 # consensus matrix # Mk[i_] is upper triangular (with zeros on diagonal), we now make it symmetric Mk[i_] += Mk[i_].T Mk[i_, range(data.shape[0]), range( data.shape[0])] = 1 # always with self Is.fill(0) # reset counter self.Mk = Mk # fits areas under the CDFs self.Ak = np.zeros(self.K_-self.L_) for i, m in enumerate(Mk): hist, bins = np.histogram(m.ravel(), density=True) self.Ak[i] = np.sum(h*(b-a) for b, a, h in zip(bins[1:], bins[:-1], np.cumsum(hist))) # fits differences between areas under CDFs self.deltaK = np.array([(Ab-Aa)/Aa if i > 2 else Aa for Ab, Aa, i in zip(self.Ak[1:], self.Ak[:-1], range(self.L_, self.K_-1))]) self.bestK = np.argmax(self.deltaK) + \ self.L_ if self.deltaK.size > 0 else self.L_
[docs] def predict(self): """ Predicts on the consensus matrix, for best found cluster number """ assert self.Mk is not None, "First run fit" return self.cluster_(n_clusters=self.bestK).fit_predict( 1-self.Mk[self.bestK-self.L_])
[docs] def predict_data(self, data): """ Predicts on the data, for best found cluster number Args: * data -> (examples,attributes) format """ assert self.Mk is not None, "First run fit" return self.cluster_(n_clusters=self.bestK).fit_predict( data)