-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathconsensus_cluster.py
More file actions
116 lines (102 loc) · 4.17 KB
/
consensus_cluster.py
File metadata and controls
116 lines (102 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
###############################################
# Copyright Žiga Sajovic, XLAB 2019 #
# Distributed under the MIT License #
# #
# github.com/ZigaSajovic/Consensus_Clustering #
# #
###############################################
from __future__ import annotations
import bisect
from itertools import combinations
import numpy as np
from scipy.stats import zscore
from sklearn.cluster import AgglomerativeClustering
from . import BaseClusterEstimator
class ConsensusCluster(BaseClusterEstimator):
    """
    Consensus clustering estimator.

    The data is repeatedly subsampled, every subsample is clustered, and the
    fraction of times two samples land in the same cluster (out of the times
    they were drawn together) forms a consensus matrix. ``fit_predict`` then
    clusters on ``1 - consensus`` as a precomputed distance.

    This follows the paper
    https://link.springer.com/content/pdf/10.1023%2FA%3A1023949509487.pdf
    and is adapted from
    https://github.com/ZigaSajovic/Consensus_Clustering/blob/master/consensusClustering.py

    Args:
        * n_clusters -> number of clusters returned by `fit_predict`
        * K -> number of clusters used when clustering each resample;
          falls back to `n_clusters` when None (or any other falsy value)
        * H -> number of resamplings
        * resample_proportion -> fraction of the samples drawn (without
          replacement) for each resampling
        * linkage -> linkage passed to `cluster` and to the final
          AgglomerativeClustering
        * z_score -> if True, z-score every column and clip before fitting
        * z_cap -> clipping bound for the z-scores; ignored if z_score is False
        * cluster -> clustering class
          * NOTE: the class is instantiated with the keyword arguments
            `n_clusters` and `linkage`, and must possess a `fit_predict`
            method, which is invoked on the resampled data.

    Attributes set by `fit`:
        * Mk -> (examples, examples) symmetric consensus matrix with values
          in [0, 1] and ones on the diagonal
    """

    def __init__(
        self,
        n_clusters,
        K=None,
        H=100,
        resample_proportion=0.9,
        linkage="average",
        z_score=False,
        z_cap=3,  # ignored if z_score is False
        cluster=AgglomerativeClustering,
    ):
        super().__init__()
        assert 0 <= resample_proportion <= 1, "proportion has to be between 0 and 1"
        self.n_clusters = n_clusters
        self.K = K if K else n_clusters
        self.H = H
        self.resample_proportion = resample_proportion
        self.cluster = cluster
        self.linkage = linkage
        self.z_score = z_score
        assert z_cap > 0, f"z_cap should be stricly positive, but got {z_cap}"
        self.z_cap = z_cap

    def _internal_resample(self, data, proportion):
        """Draws a random subset of the rows of `data` without replacement.

        Args:
            * data -> (examples,attributes) format
            * proportion -> fraction of the examples to draw.

        Returns:
            * (indices, rows): positions of the drawn examples in `data`
              (in random order) and the corresponding rows.
        """
        n_samples = data.shape[0]
        resampled_indices = np.random.choice(n_samples, size=int(n_samples * proportion), replace=False)
        return resampled_indices, data[resampled_indices, :]

    def fit(self, data):
        """
        Fits the consensus matrix and stores it in `self.Mk`.

        Args:
            * data -> (examples,attributes) format

        Returns:
            * self
        """
        # zscore and clip
        if self.z_score:
            data = self._z_score(data)

        n_samples = data.shape[0]
        # co_clustered[a, b]: number of resamples in which a and b shared a cluster
        # co_sampled[a, b]:   number of resamples in which a and b were both drawn
        co_clustered = np.zeros((n_samples, n_samples))
        co_sampled = np.zeros((n_samples, n_samples))

        for _ in range(self.H):
            resampled_indices, resample_data = self._internal_resample(data, self.resample_proportion)
            labels = np.asarray(
                self.cluster(n_clusters=self.K, linkage=self.linkage).fit_predict(resample_data)
            )
            # Update both counters symmetrically on the sub-matrix of the drawn
            # samples. The drawn indices are unique but unsorted, so writing
            # only "(first, second)" pairs would scatter the counts across both
            # triangles and inflate the consensus once the matrix is mirrored.
            drawn = np.ix_(resampled_indices, resampled_indices)
            co_clustered[drawn] += labels[:, None] == labels[None, :]
            co_sampled[drawn] += 1

        # Pairs that were never drawn together keep a consensus of 0.
        Mk = np.divide(
            co_clustered,
            co_sampled,
            out=np.zeros_like(co_clustered),
            where=co_sampled > 0,
        )
        # A sample is always in the same cluster as itself.
        np.fill_diagonal(Mk, 1)

        self.Mk = Mk
        self._is_fitted = True
        return self

    def fit_predict(self, data):
        """Builds consensus matrix via fit(), then clusters on it.

        Args:
            * data -> (examples,attributes) format

        Returns:
            * cluster labels from AgglomerativeClustering run with
              `n_clusters` clusters on the distance matrix `1 - Mk`.
        """
        self.fit(data)
        # High consensus means the two samples are close.
        distance_matrix = 1 - self.Mk
        return AgglomerativeClustering(
            n_clusters=self.n_clusters, metric="precomputed", linkage=self.linkage
        ).fit_predict(distance_matrix)

    def _z_score(self, data):
        """Z-scores every column of `data` and clips to [-z_cap, z_cap]."""
        data = zscore(data, axis=0)
        data = np.clip(data, a_min=-self.z_cap, a_max=self.z_cap)
        return data