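Problem: I aggregate data with reduceByKey on a dimension key, McLocEcuTypeKey (a custom key class), and then write the result to the database. The insert fails because the dimension key is duplicated.
Error message: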
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry '2018-12-21-12371-12387-12388-N1S-v3' for key 'sdcn_date'
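McLocEcuTypeKey corresponds to the unique index of the target table, which guarantees uniqueness. McLocEcuTypeKey also overrides the equals and hashCode methods.
The index corresponds to the fields of McLocEcuTypeKey shown below.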
class McLocEcuTypeKey extends Serializable {
  // date format: yyyy-MM-dd or yyyy-MM
  private var date: String = null
  private var area: Short = 0
  private var province: Short = 0
  private var city: Short = 0
  private var maincate: String = null
  private var ecuType: String = null

  override def toString: String = {
    date + "," + area + "," + province + "," + city + "," + maincate + "," + ecuType
  }

  // equals and hashCode are overridden
  def canEqual(other: Any): Boolean = other.isInstanceOf[McLocEcuTypeKey]

  override def hashCode(): Int = {
    val state = Seq(date, area, province, city, maincate, ecuType)
    state.map(t => if (t == null) 0 else t.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
  }

  override def equals(other: Any): Boolean = other match {
    case that: McLocEcuTypeKey =>
      (that canEqual this) &&
        date == that.date &&
        area == that.area &&
        province == that.province &&
        city == that.city &&
        maincate == that.maincate &&
        ecuType == that.ecuType
    case _ => false
  }
}
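For comparison, here is a minimal sketch of the same key written as an immutable case class, which derives equals, hashCode and toString from its fields. The names ChargeKey and sumByKey are hypothetical and not part of the original code, and sumByKey is only a local stand-in for reduceByKey(_ + _) on plain Scala collections, so it can run without a Spark cluster.

```scala
// Hypothetical immutable counterpart of McLocEcuTypeKey (not from the original post).
// A case class derives equals, hashCode and toString from its fields.
final case class ChargeKey(
    date: String,
    area: Short,
    province: Short,
    city: Short,
    maincate: String,
    ecuType: String)

object ChargeKeyDemo {
  // Local stand-in for reduceByKey(_ + _): group by key, then sum the counts.
  def sumByKey(records: Seq[(ChargeKey, Int)]): Map[ChargeKey, Int] =
    records.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val a = ChargeKey("2018-12-21", 12371, 12387, 12388, "N1S", "v3")
    val b = ChargeKey("2018-12-21", 12371, 12387, 12388, "N1S", "v3")
    val c = ChargeKey("2018-12-21", 12371, 12387, 12388, "N1S", "v4")

    val merged = sumByKey(Seq(a -> 1, b -> 2, c -> 5))
    println(merged.size) // 2: a and b collapse into one key
    println(merged(a))   // 3: the counts of a and b are summed
  }
}
```

Because the fields cannot be reassigned after construction, a key built this way cannot change between the moment it is emitted and the moment it is grouped.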
Insert logic: McLocEcuTypeKey is used as the key for reduceByKey, but, unexpectedly, records with the same key were not merged.
def coreDailyChargeNum(joinRdd: RDD[(McLocEcuTypeKey, Int)]) = {
  val outRdd = joinRdd.reduceByKey(_ + _).repartition(5)
  outRdd.foreach(t => {println(t._1)})
  outRdd.foreachPartition(ts => {
    val mySqlUtil = new MySqlUtil()
    try {
      val connection = mySqlUtil.initConnection()
      // insert
      var iSql = "INSERT INTO " + Constant.stats_daily_charge_num +
        " (`sdcn_date`, `sdcn_area`, `sdcn_province`, `sdcn_city`, `sdcn_maincate`, "
      iSql += "`sdcn_charge_num`, `sdcn_week`, `sdcn_ecu_type`) "
      iSql += "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
      val iStmt = connection.prepareStatement(iSql)
      for (t <- ts) {
        // …
      }
    } catch {
      case ex: Exception => throw ex
    } finally {
      mySqlUtil.close()
    }
  })
}
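reduceByKey only merges keys that are equal under equals and hashCode, so a duplicate-entry error after it suggests that two distinct key objects end up as the same value in the unique index. One way to check this before writing is to group the reduced keys by the value the index would see and look for groups with more than one key. The sketch below is only an illustration on plain Scala collections: Key, indexValue and collisions are hypothetical names, the key is reduced to three fields, and the normalization (trim plus lowercase) is an assumption about how the database column compares values, not something stated in the original post.

```scala
// Hypothetical simplified key: only the fields needed for the illustration.
final case class Key(date: String, maincate: String, ecuType: String)

object DuplicateIndexCheck {
  // Assumption: the database compares index values trimmed and case-insensitively.
  def indexValue(k: Key): String =
    Seq(k.date, k.maincate, k.ecuType).map(_.trim.toLowerCase).mkString("-")

  // Returns the index values that more than one distinct key maps to.
  def collisions(keys: Seq[Key]): Map[String, Seq[Key]] =
    keys.distinct.groupBy(indexValue).filter { case (_, ks) => ks.size > 1 }

  def main(args: Array[String]): Unit = {
    val reducedKeys = Seq(
      Key("2018-12-21", "N1S", "v3"),
      Key("2018-12-21", "N1S ", "v3"), // trailing space: a different key for equals
      Key("2018-12-21", "N1S", "v4"))

    // Print every index value that would be inserted more than once.
    collisions(reducedKeys).foreach { case (idx, ks) =>
      println(s"$idx <- ${ks.mkString(", ")}")
    }
  }
}
```

If a check like this reports a collision, the fix belongs in the step that builds the key rather than in the aggregation itself.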
Solution:
This is not a problem with reduceByKey. McLocEcuTypeKey was set incorrectly in the previous step, causing