一、聚类的思想及原理
聚类是一种无监督学习,它与分类的不同,聚类所要求划分的类是未知的。聚类算法的思想就是物以类聚的思想,相同性质的点在空间中表现的较为紧密和接近,主要用于数据探索与异常检测。 聚类分析是一种探索性的分析,在分类的过程中,人们不必事先给出一个分类的标准,它能够从样本数据出发,自动进行分类。聚类分析也有很多方法,使用不同方法往往会得到不同的结论。从实际应用的角度看,聚类分析是数据挖掘的主要任务之一。而且聚类能够作为一个独立的工具获得数据的分布状况,观察每一簇数据的特征,集中对特定的聚簇集合作进一步地分析。聚类分析还可以作为其他算法(如分类和推荐等算法)的预处理步骤
二、构建聚类模型 2.1、加载数据 val rawdata = spark.read
.format("csv")
.option("header", true)
.load("C:\\Users\\12285\\Desktop\\customers_sale.csv")
val df = rawdata.select(
rawdata("Channel").cast("Double"),
rawdata("Region").cast("Double"),
rawdata("Fresh").cast("Double"),
rawdata("Milk").cast("Double"),
rawdata("Grocery").cast("Double"),
rawdata("Frozen").cast("Double"),
rawdata("Detergents_Paper").cast("Double"),
rawdata("Delicassen").cast("Double"))
2.2、分析特征间的相关性
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
df = pd.read_csv('C:\\Users\\12285\\Desktop\\customers_sale.csv', header=0)
cols = ["Channel", "Region", "Fresh", "Milk",
"Grocery", "Frozen", "Detergents_Paper", "Delicassen"]
cm = np.corrcoef(df[cols].values.T)
sns.set(font_scale=0.8)
hm = sns.heatmap(cm, cbar=True, annot=True, square=True, fmt='.2f',
annot_kws={'size': 10.0}, yticklabels=cols, xticklabels=cols)
plt.show()
从上图可以看出, 零食上的消费(Grocery)和在洗涤用品和纸上的消费(Detergents_paper)的相关性最大0.92, 零食上的消费(Grocery)和奶制品的消费(Milk)的关联也比较大0.73;另外渠道(Channel)和零食上的消费、洗涤用品、纸上的消费有一定关联。
2.3、数据的预处理 从统计信息可以看出Channel, Region为类别型, 其余6个字段为连续型。 在训练模型前,需要对类别型特征先转换为二元向量, 然后,对各个特征进行规范化。最后得到一个新的特征向量。
// 统计信息
df.describe().show()
+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+
|summary| Channel| Region| Fresh| Milk| Grocery| Frozen| Detergents_Paper| Delicassen|
+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+
| count| 440| 440| 440| 440| 440| 440| 440| 440|
| mean|1.3227272727272728| 2.543181818181818|12000.297727272728| 5796.265909090909|7951.277272727273|3071.931818181818|2881.4931818181817|1524.8704545454545|
| stddev|0.4680515694791137|0.7742724492301002|12647.328865076885|7380.3771745708445|9503.162828994346|4854.673332592367| 4767.854447904201|2820.1059373693965|
| min| 1.0| 1.0| 3.0| 55.0| 3.0| 25.0| 3.0| 3.0|
| max| 2.0| 3.0| 112151.0| 73498.0| 92780.0| 60869.0| 40827.0| 47943.0|
+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+
2.3.1、将类别特征转换为二元编码
// 对类别特征转换为二元向量
val hotEstimator = new OneHotEncoderEstimator()
.setInputCols(Array("Channel", "Region"))
.setOutputCols(Array("ChannelVc", "RegionVc"))
.setDropLast(false)
val df2 = hotEstimator.fit(df).transform(df)
df2.show(10)
// df2.printSchema()
// 把新生成的两个特征及原来的6个特征组成一个特征向量
val featuresArray = Array("ChannelVc", "RegionVc", "Fresh", "Milk",
"Grocery", "Frozen", "Detergents_Paper", "Delicassen")
// 把源数据组合成特征向量features
val vcAssembler = new VectorAssembler()
.setInputCols(featuresArray)
.setOutputCol("features")
2.3.3、对特征进行规范化
// 对特征进行规范化
val scaledDF = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
三、组装
3.1、组装训练模型
//把转换二元向量、特征规范化转换等组装到流水线上,因pipeline中无聚类的评估函数,故,这里流水线中不纳入kmeans。具体实现如下:
val pipeline1 = new Pipeline().setStages(Array(hotEstimator, vcAssembler, scaledDF))
val data2 = pipeline1.fit(df).transform(df)
// kmeans模型
val kmeans = new KMeans().setFeaturesCol("scaledFeatures").setK(4).setSeed(123)
//训练模型
val model = kmeans.fit(data2)
val results = model.transform(data2)
3.2、模型评估
但是发现两种方法的评估相差很大。
//评估模型
val WSSSE = model.computeCost(results)
println(s"Within Set Sum of Squared Errors = $WSSSE")
// Within Set Sum of Squared Errors = 2595.7328287620885
// 推荐使用ClusteringEvaluator
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(results)
println(s"Silhouette with squared euclidean distance = $silhouette")
// Silhouette with squared euclidean distance = 0.017629072465542267
3.3、显示聚类结果
//显示聚类结果。
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
results.collect().foreach(row => {
println(row(10) + " is predicted as cluster " + row(11))
})
// 当k = 4 时
results.select("scaledFeatures", "prediction").groupBy("prediction").count.show()
+----------+-----+
|prediction|count|
+----------+-----+
| 1| 10|
| 3| 136|
| 2| 64|
| 0| 230|
+----------+-----+
四、模型优化
聚类模型中最重要的是参数k的选择,下面我们通过循环来获取哪个k值的性能最好。
// 模型的优化
val KSSE = (2 to 20 by 1).toList.map { k =>
val kmeans = new KMeans().setFeaturesCol("scaledFeatures").setK(k).setSeed(123)
val model = kmeans.fit(data2)
// 评估性能.
val WSSSE = model.computeCost(data2)
// K,实际迭代次数,SSE,聚类类别编号,每类的记录数,类中心点
(k, model.getMaxIter, WSSSE, model.summary.cluster, model.summary.clusterSizes, model.clusterCenters)
}
//显示k、WSSSE评估指标,并按指标排序
KSSE.map(x => (x._1, x._3)).sortBy(x => x._2).foreach(println)
// 将结果存到文件中
import spark.implicits._
KSSE.map(x=>(x._1,x._3)).sortBy(x=>x._2).toDF.write.save("C:\\Users\\12285\\Desktop\\ksse")
//显示结果
(20,635.6231456631109)
(19,674.1240263779249)
(18,696.2925462727684)
(17,747.697734807987)
(15,848.393503421027)
(16,878.8045714559038)
(14,932.4137349866897)
(13,988.2458378719449)
(12,1026.9426528633646)
(11,1165.7468060138433)
(10,1201.1295734061587)
(9,1242.388169008257)
(8,1399.0770764839624)
(7,1523.4613624094593)
(6,1965.6551642041663)
(5,2405.5349119889274)
(4,2595.7328287620885)
(3,3123.9948271417393)
(2,3480.224930619828)
以上数据可视化, k12后,逐渐变缓。所以K越大不一定越好,恰当才是重要的。