import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
class kMeans(object):
    """Plain K-Means clustering with k-means++ centroid initialisation.

    Parameters
    ----------
    n_clusters : int
        Number of clusters K.
    tolerance : float
        Relative convergence tolerance; scaled by the mean per-feature
        variance of the data inside ``fit``.
    max_iter : int
        Maximum number of assign/update iterations.
    """

    def __init__(self, n_clusters=2, tolerance=0.01, max_iter=300):
        self.__K = n_clusters
        self.__tolerance = tolerance
        self.__max_iter = max_iter
        self.__centroids = None

    def fit(self, data):
        """Cluster *data* (an N x D numpy array) into K groups.

        Alternates assignment and centroid-update steps until the squared
        total movement of the centroids drops below the scaled tolerance
        or ``max_iter`` is reached.
        """
        N, D = data.shape
        # Wrap the samples in a DataFrame so the assignment step can use apply().
        __data = pd.DataFrame(
            data=data,
            index=np.arange(N),
            columns=[f'x{i:03d}' for i in range(D)]
        )
        # Initialise every sample in cluster 0.
        __data['cluster'] = 0
        # Keep the scaled tolerance in a LOCAL variable: the original code
        # overwrote self.__tolerance, so calling fit() a second time
        # compounded the scaling and silently changed the stopping criterion.
        tolerance = kMeans.__scaled_tolerance(data, self.__tolerance)
        # Seed the centroids with k-means++.
        self.__centroids = self.__get_init_centroid_kmeanspp(data)
        for i in range(self.__max_iter):
            # Expectation: assign every sample to its nearest centroid
            # (x[:-1] drops the trailing 'cluster' column from the row).
            __data.cluster = __data.apply(
                lambda x: kMeans.__assign(x[:-1].values, self.__centroids),
                axis=1
            )
            # Maximisation: each centroid becomes the mean of its members.
            # Start from the current centroids so a cluster that lost all of
            # its members keeps its previous position instead of producing a
            # shape mismatch (the original used .values directly, which
            # crashes when a cluster is empty).
            means = __data.groupby(['cluster']).mean()
            new_centroid = self.__centroids.astype(float)
            for k, row in zip(means.index, means.values):
                new_centroid[k] = row
            # Squared L2 norm of the total centroid movement this iteration.
            diff = (new_centroid - self.__centroids).ravel()
            squared_diff = np.dot(diff, diff)
            self.__centroids = new_centroid
            if squared_diff <= tolerance:
                print(
                    f'[KMeans - Fit]: early stopping with squared centroids diff {squared_diff:.2f} at iteration {i:03d}')
                break

    def predict(self, data):
        """Return the nearest-centroid index for each row of *data*."""
        N, _ = data.shape
        result = np.asarray(
            [kMeans.__assign(data[i], self.__centroids) for i in range(N)]
        )
        return result

    def get_centroids(self):
        """Return a copy of the fitted K x D centroid array."""
        return np.copy(self.__centroids)

    def __get_init_centroid_random(self, data):
        """Pick K distinct samples uniformly at random as initial centroids."""
        N, _ = data.shape
        idx_centroids = np.random.choice(np.arange(N), size=self.__K, replace=False)
        centroids = data[idx_centroids]
        return centroids

    def __get_init_centroid_kmeanspp(self, data):
        """k-means++ seeding.

        The first centroid is drawn uniformly; each later centroid is drawn
        with probability proportional to its squared distance from the
        nearest centroid chosen so far.
        """
        N, _ = data.shape
        # First centroid: one sample chosen uniformly at random.
        centroids = data[np.random.choice(np.arange(N), size=1, replace=False)]
        for _ in range(1, self.__K):
            # Squared distance of every sample to its nearest chosen centroid.
            distances = np.asarray(
                [
                    np.min(np.linalg.norm(d - centroids, axis=1)) ** 2
                    for d in data
                ]
            )
            total = np.sum(distances)
            if total == 0:
                # Degenerate case: every sample coincides with a centroid;
                # fall back to a uniform draw instead of dividing by zero.
                idx = np.random.choice(N)
            else:
                # Sample an index from the cumulative distribution.
                cum_probs = np.cumsum(distances / total)
                # Clamp: float rounding can leave cum_probs[-1] marginally
                # below 1.0, in which case searchsorted could return N and
                # index out of range.
                idx = min(np.searchsorted(cum_probs, random.random()), N - 1)
            centroids = np.vstack((centroids, data[idx]))
        return centroids

    @staticmethod
    def __assign(data, centroids):
        """Index of the centroid nearest to *data* under the L2 norm."""
        return np.argmin(np.linalg.norm(centroids - data, axis=1))

    @staticmethod
    def __scaled_tolerance(data, tol):
        """Scale the relative tolerance by the mean per-feature variance."""
        variances = np.var(data, axis=0)
        return np.mean(variances) * tol
if __name__ == '__main__':
    # Small 2-D smoke test: two visually separable groups of points.
    NUM_CLUSTERS = 2
    X = np.array([
        [1, 2],
        [1.5, 1.8],
        [5, 8],
        [8, 8],
        [1, 0.6],
        [9, 11],
    ])

    # Fit the model and get a cluster index for every sample.
    model = kMeans(n_clusters=NUM_CLUSTERS)
    model.fit(X)
    assignments = model.predict(X)

    # Draw each cluster in its own colour, then overlay the centroids.
    palette = ['red', 'blue', 'cyan', 'magenta']
    for k in range(NUM_CLUSTERS):
        members = X[assignments == k]
        plt.scatter(members[:, 0], members[:, 1], c=palette[k], label=f'Cluster{k:02d}')

    centers = model.get_centroids()
    plt.scatter(centers[:, 0], centers[:, 1], s=300, c='grey', marker='P', label='Centroids')

    plt.xlabel('X')
    plt.ylabel('Y')
    plt.legend()
    plt.title('KMeans Testcase')
    plt.show()