reduceByKey aggregation: records with the same key are not merged

宝哥大数据, published 2019-10-11 11:01:42

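Problem: The data is aggregated with reduceByKey on a dimension key, McLocEcuTypeKey (a custom key class), and then written to the database, but the insert fails because the dimension key is duplicated.
Error message:
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry '2018-12-21-12371-12387-12388-N1S-v3' for key 'sdcn_date'
McLocEcuTypeKey corresponds to the unique index of the target table, and it overrides both equals and hashCode.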

[Figure: the index columns that correspond to the fields of McLocEcuTypeKey]

class McLocEcuTypeKey extends Serializable {
    // date string, formatted as yyyy-MM-dd or yyyy-MM
    private var date: String = null

    private var area: Short = 0
    private var province: Short = 0
    private var city: Short = 0

    private var maincate: String = null
    private var ecuType: String = null

    override def toString: String = {
        date + "," + area + "," + province + "," + city + "," + maincate + "," + ecuType
    }
    // equals and hashCode are overridden
    def canEqual(other: Any): Boolean = other.isInstanceOf[McLocEcuTypeKey]

    override def hashCode(): Int = {
        val state = Seq(date, area, province, city, maincate, ecuType)
        state.map(t => if (t == null) 0 else t.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
    }

    override def equals(other: Any): Boolean = other match {
        case that: McLocEcuTypeKey =>
            (that canEqual this) &&
                date == that.date &&
                area == that.area &&
                province == that.province &&
                city == that.city &&
                maincate == that.maincate &&
                ecuType == that.ecuType
        case _ => false
    }
}
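
reduceByKey decides which records belong together through the key's hashCode (for partitioning) and equals (for merging), so a quick local check of that contract can rule the key class in or out. The following is a minimal sketch, not the code from this job: `DemoKey` is a hypothetical stand-in for McLocEcuTypeKey written as a case class (the compiler then derives equals and hashCode from all fields), `reduceLocally` imitates reduceByKey with plain Scala collections instead of Spark, the sample values are taken from the error message above, and the trailing space in the third key is an invented example of a field that was populated differently.

```scala
// Hypothetical stand-in for McLocEcuTypeKey; equals/hashCode are derived from all six fields.
final case class DemoKey(date: String, area: Short, province: Short, city: Short,
                         maincate: String, ecuType: String)

object KeyContractCheck {
  // Local imitation of reduceByKey(_ + _): groupBy relies on hashCode and equals,
  // which is the same contract Spark uses when it merges values per key.
  def reduceLocally(records: Seq[(DemoKey, Int)]): Map[DemoKey, Int] =
    records.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val a = DemoKey("2018-12-21", 12371, 12387, 12388, "N1S", "v3")
    val b = DemoKey("2018-12-21", 12371, 12387, 12388, "N1S", "v3")
    // Invented example: same key except for a trailing space in ecuType.
    val c = DemoKey("2018-12-21", 12371, 12387, 12388, "N1S", "v3 ")

    // a and b are equal and merge into one entry (1 + 2); c is a different key and stays separate.
    val reduced = reduceLocally(Seq(a -> 1, b -> 2, c -> 4))
    reduced.foreach { case (key, sum) => println(s"[$key] -> $sum") }
  }
}
```

If equal-looking keys end up in separate entries in a check like this, the key values differ somewhere before the aggregation, and reduceByKey is behaving as designed.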

Database write logic: McLocEcuTypeKey is used as the key for reduceByKey. Unexpectedly, records with the same key are not merged.
    def coreDailyChargeNum(joinRdd: RDD[(McLocEcuTypeKey, Int)]) = {
	
        val outRdd = joinRdd.reduceByKey(_ + _).repartition(5)

        outRdd.foreach(t => {println(t._1)})

        outRdd.foreachPartition(ts => {
                val mySqlUtil = new MySqlUtil()
                try {
                    val connection = mySqlUtil.initConnection()

                    //insert
                    var iSql = "INSERT INTO " + Constant.stats_daily_charge_num +
                        " (`sdcn_date`, `sdcn_area`, `sdcn_province`, `sdcn_city`, `sdcn_maincate`, "
                    iSql += "`sdcn_charge_num`, `sdcn_week`, `sdcn_ecu_type`) "
                    iSql += "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
                    val iStmt = connection.prepareStatement(iSql)

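                    for (t <- ts) {
                        // ... (the loop body through the start of the catch clause is missing from the source)
                    }
                } catch {
                    case ex: Exception => throw ex // exception type assumed; the original pattern is missing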
                } finally {
                    mySqlUtil.close()
                }
            })
    }
Solution:

This is not a reduceByKey problem. It was caused by McLocEcuTypeKey being set incorrectly in the previous step, which led to ...
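
One way to track down this kind of mismatch is to look for keys that Spark treats as distinct but that the database compares as equal. The sketch below is only an illustration under stated assumptions: it assumes the table uses a case-insensitive collation that ignores trailing spaces (common for MySQL string columns, but not confirmed for this table), it represents each key as the list of fields obtained by splitting the comma-separated toString output, and `KeyCollisionFinder`, `normalize`, and `collisions` are hypothetical names. The sample strings are invented, based on the values in the error message.

```scala
import java.util.Locale

object KeyCollisionFinder {
  // Assumed normalization: drop trailing spaces and ignore letter case,
  // mimicking a case-insensitive, space-padded collation.
  def normalize(field: String): String =
    field.replaceAll(" +$", "").toLowerCase(Locale.ROOT)

  // Groups distinct raw keys by their normalized form and keeps only the groups
  // where several different raw keys collapse to the same normalized key.
  def collisions(keys: Seq[Seq[String]]): Map[Seq[String], Seq[Seq[String]]] =
    keys.distinct
      .groupBy(_.map(normalize))
      .filter { case (_, rawKeys) => rawKeys.size > 1 }

  def main(args: Array[String]): Unit = {
    // Invented sample: the first two lines differ only in the case of the last field.
    val printed = Seq(
      "2018-12-21,12371,12387,12388,N1S,v3",
      "2018-12-21,12371,12387,12388,N1S,V3",
      "2018-12-22,12371,12387,12388,N1S,v3"
    )
    val found = collisions(printed.map(_.split(",", -1).toSeq))
    found.foreach { case (normalized, rawKeys) =>
      println(s"${normalized.mkString(",")} <- ${rawKeys.map(_.mkString(",")).mkString(" | ")}")
    }
  }
}
```

The printed keys could come from the `outRdd.foreach(t => {println(t._1)})` line in the write logic. Any group reported here points at the field that the earlier step populated inconsistently.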
