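Dimension Data

宝哥大数据 Published: 2021-03-07 11:28:05

Full load of dimension data

To widen orders, order details and other data later in real-time ETL, some dimension data must first be loaded into a high-performance store. Redis is used here to store the goods, goods category, shop, and operating organization dimensions. All dimension data in MySQL is first loaded into Redis in one full pass; afterwards, whenever dimension data in MySQL changes, Flink updates the corresponding dimension data in Redis immediately.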

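Create the case classes
  • In DimEntity under the com.chb.shop.realtime.bean package, create the following case classes

  • DimGoodsDBEntity: goods dimension case class

    Column        Description
    goodsName     Goods name
    shopId        Shop id
    goodsCatId    Goods category id
    shopPrice     Goods price
    goodsId       Goods id

  • DimGoodsCatDBEntity: goods category dimension case class

    Column        Description
    catId         Goods category id
    parentId      Parent goods category id
    catName       Goods category name
    cat_level     Goods category level

  • DimShopsDBEntity: shop dimension case class

    Column        Description
    shopId        Shop id
    areaId        Area id
    shopName      Shop name
    shopCompany   Shop company name

  • DimOrgDBEntity: organization dimension case class

    Column        Description
    orgId         Organization id
    parentId      Parent organization id
    orgName       Organization name
    orgLevel      Organization level

  • DimShopCatDBEntity: shop goods category dimension case class

    Column        Description
    catId         Shop goods category id
    parentId      Parent shop goods category id
    catName       Shop goods category name
    catSort       Shop goods category level

Reference code: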

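import scala.beans.BeanProperty
import com.alibaba.fastjson.{JSON, JSONObject}

/**
 * Case classes for the dimension tables.
 *
 * @BeanProperty generates JavaBean-style getter methods. The parameters are
 * immutable, so no setters are generated.
 */
// Goods dimension case class
case class DimGoodsDBEntity(@BeanProperty goodsId:Long = 0,       // goods id
                            @BeanProperty goodsName:String = "",  // goods name
                            @BeanProperty shopId:Long = 0,        // shop id
                            @BeanProperty goodsCatId:Int = 0,     // goods category id
                            @BeanProperty shopPrice:Double = 0)   // goods price
/**
 * Companion object of the goods case class
 */
object DimGoodsDBEntity{
  def apply(json:String): DimGoodsDBEntity = {
    // Normally the goods id of an order detail exists in the goods table. If it does not,
    // the JSON is null and parsing it would throw, so fall back to a default entity.
    if(json != null){
      val jsonObject: JSONObject = JSON.parseObject(json)
      new DimGoodsDBEntity(
        jsonObject.getLong("goodsId"),
        jsonObject.getString("goodsName"),
        jsonObject.getLong("shopId"),
        jsonObject.getInteger("goodsCatId"),
        jsonObject.getDouble("shopPrice"))
    }else{
      new DimGoodsDBEntity
    }
  }
}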


// Goods category dimension case class
case class DimGoodsCatDBEntity(@BeanProperty catId:String = "",      // goods category id
                               @BeanProperty parentId:String = "",   // parent goods category id
                               @BeanProperty catName:String = "",    // goods category name
                               @BeanProperty cat_level:String = "")  // goods category level

object DimGoodsCatDBEntity {
  def apply(json:String): DimGoodsCatDBEntity = {
    if(json != null) {
      val jsonObj = JSON.parseObject(json)

      val catId = jsonObj.getString("catId")
      val catName = jsonObj.getString("catName")
      val cat_level = jsonObj.getString("cat_level")
      val parentId = jsonObj.getString("parentId")
      DimGoodsCatDBEntity(catId, parentId, catName, cat_level)
    }else{
      new DimGoodsCatDBEntity
    }
  }
}

// Shop dimension case class
case class DimShopsDBEntity(@BeanProperty shopId:Int  = 0,           // shop id
                            @BeanProperty areaId:Int  = 0,           // id of the area the shop belongs to
                            @BeanProperty shopName:String  = "",     // shop name
                            @BeanProperty shopCompany:String  = "")  // company name

object DimShopsDBEntity {
  def apply(json:String): DimShopsDBEntity = {
    if(json != null) {
      val jsonObject = JSON.parseObject(json)
      val areaId = jsonObject.getString("areaId")
      val shopCompany = jsonObject.getString("shopCompany")
      val shopId = jsonObject.getString("shopId")
      val shopName = jsonObject.getString("shopName")

      DimShopsDBEntity(shopId.toInt, areaId.toInt, shopName, shopCompany)
    }else{
      new DimShopsDBEntity
    }
  }
}

// Organization dimension case class
case class DimOrgDBEntity(@BeanProperty orgId:Int = 0,        // organization id
                          @BeanProperty parentId:Int = 0,     // parent organization id
                          @BeanProperty orgName:String = "",  // organization name
                          @BeanProperty orgLevel:Int = 0)     // organization level

object DimOrgDBEntity {
  def apply(json:String): DimOrgDBEntity = {
    if(json != null) {
      val jsonObject = JSON.parseObject(json)
      val orgId = jsonObject.getString("orgId")
      val orgLevel = jsonObject.getString("orgLevel")
      val orgName = jsonObject.getString("orgName")
      val parentId = jsonObject.getString("parentId")

      DimOrgDBEntity(orgId.toInt, parentId.toInt, orgName, orgLevel.toInt)
    }else{
      new DimOrgDBEntity()
    }
  }
}

// Shop goods category dimension case class
case class DimShopCatDBEntity(@BeanProperty catId:String = "",     // shop goods category id
                              @BeanProperty parentId:String = "",  // parent shop goods category id
                              @BeanProperty catName:String = "",   // shop goods category name
                              @BeanProperty catSort:String = "")   // shop goods category level

object DimShopCatDBEntity {
  def apply(json:String): DimShopCatDBEntity = {
    if(json != null) {
      val jsonObj = JSON.parseObject(json)
      val catId = jsonObj.getString("catId")
      val catName = jsonObj.getString("catName")
      val catSort = jsonObj.getString("catSort")
      val parentId = jsonObj.getString("parentId")
      DimShopCatDBEntity(catId, parentId, catName, catSort)
    }else{
      new DimShopCatDBEntity()
    }
  }
}
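
To make the role of @BeanProperty concrete, here is a minimal sketch that uses only the Scala standard library. DemoDim and BeanPropertyDemo are hypothetical names, not part of the project. The sketch inspects the class through Java reflection, which is roughly how a Java library looks for JavaBean accessors: getters exist, setters do not, because case class parameters are immutable. The no-argument constructor call produces the same kind of default entity that the companion apply methods above fall back to.

```scala
import scala.beans.BeanProperty

// DemoDim is a hypothetical stand-in for the Dim*DBEntity case classes.
case class DemoDim(@BeanProperty id: Long = 0,
                   @BeanProperty name: String = "")

object BeanPropertyDemo {
  // Names of the public getXxx methods visible through Java reflection.
  def beanGetters: List[String] =
    classOf[DemoDim].getMethods.toList
      .map(_.getName)
      .filter(n => n.startsWith("get") && n != "getClass")
      .distinct
      .sorted

  // True if a JavaBean-style setter exists for the given property name.
  def hasSetter(property: String): Boolean =
    classOf[DemoDim].getMethods.exists(_.getName == "set" + property.capitalize)

  def main(args: Array[String]): Unit = {
    println(beanGetters)
    println(hasSetter("id"))
    // Every parameter has a default, so an "empty" entity can be built directly.
    println(DemoDim())
  }
}
```
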
Add the Redis and MySQL settings to the configuration file
# Redis settings
redis.server.ip="node2"
redis.server.port=6379

# MySQL settings
mysql.server.ip="node1"
mysql.server.port=3306
mysql.server.database="chb_shop"
mysql.server.username="root"
mysql.server.password="123456"
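Write the configuration utility class
import com.typesafe.config.{Config, ConfigFactory}

object GlobalConfigUtil {
  // Assumption: the settings above live in application.conf and are read with Typesafe Config
  private val config: Config = ConfigFactory.load()

  val `redis.server.ip` = config.getString("redis.server.ip")
  val `redis.server.port` = config.getString("redis.server.port")
  val `mysql.server.ip` = config.getString("mysql.server.ip")
  val `mysql.server.port` = config.getString("mysql.server.port")
  val `mysql.server.database` = config.getString("mysql.server.database")
  val `mysql.server.username` = config.getString("mysql.server.username")
  val `mysql.server.password` = config.getString("mysql.server.password")
}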
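Write the Redis utility class

Add a RedisUtil object to the utils package; it uses a Redis connection pool to access Redis

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
  val config = new JedisPoolConfig()

  // Whether to use LIFO (last in, first out) ordering; default true
  config.setLifo(true)
  // Maximum number of idle connections; default 8
  config.setMaxIdle(8)
  // Maximum number of connections; default 8, raised to 1000 here
  config.setMaxTotal(1000)
  // Maximum wait in milliseconds when borrowing a connection (applies when BlockWhenExhausted is set); an exception is thrown on timeout; a negative value blocks indefinitely; default -1
  config.setMaxWaitMillis(-1)
  // Minimum idle time before a connection may be evicted; default 1800000 ms (30 minutes)
  config.setMinEvictableIdleTimeMillis(1800000)
  // Minimum number of idle connections; default 0
  config.setMinIdle(0)
  // Maximum number of connections examined per eviction run; a negative n means 1/abs(n) of the idle connections; default 3
  config.setNumTestsPerEvictionRun(3)
  // Soft idle time before eviction: a connection idle longer than this is evicted as long as more than minIdle idle connections remain (default eviction policy)
  config.setSoftMinEvictableIdleTimeMillis(1800000)
  // Validate connections when they are borrowed; default false
  config.setTestOnBorrow(false)
  // Validate connections while they are idle; default false
  config.setTestWhileIdle(false)

  val jedisPool: JedisPool = new JedisPool(config, GlobalConfigUtil.`redis.server.ip`, GlobalConfigUtil.`redis.server.port`.toInt)

  /**
    * Borrow a Redis connection from the pool
    * @return a Jedis connection; call close() on it to return it to the pool
    */
  def getResouce(): Jedis = {
    jedisPool.getResource
  }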
}
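
Connections borrowed from a pool should be handed back even when the work in between fails. A minimal sketch of one way to guarantee that is the loan pattern below. It uses only the standard library: withResource, LoanPatternSketch and FakeConnection are hypothetical names introduced for illustration, and FakeConnection merely stands in for a pooled connection. Assuming the Jedis version in use is closeable, a connection from RedisUtil could be passed to withResource in the same way.

```scala
object LoanPatternSketch {
  // Acquire a resource, run `use` on it, and always close it afterwards.
  def withResource[R <: AutoCloseable, A](acquire: => R)(use: R => A): A = {
    val resource = acquire
    try use(resource)
    finally resource.close()
  }

  // Hypothetical stand-in for a pooled connection; this is not a Jedis class.
  final class FakeConnection extends AutoCloseable {
    var closed = false
    def ping(): String = "PONG"
    override def close(): Unit = closed = true
  }

  def main(args: Array[String]): Unit = {
    val conn = new FakeConnection
    val reply = withResource(conn)(_.ping())
    println(s"reply=$reply, closed=${conn.closed}")
  }
}
```
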
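Load MySQL goods dimension data into Redis

Create a DimensionDataLoader singleton object in the cn.chb.shop.realtime.etl.dataloader package to load the goods dimension data

Steps:

1. Load the data from the chb_goods table in MySQL

2. Save the data into the HASH stored under the key chb_shop:dim_goods

3. Close the resources

Reference code: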

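  import java.sql.{Connection, DriverManager, ResultSet, Statement}
  import com.alibaba.fastjson.JSON
  import com.alibaba.fastjson.serializer.SerializerFeature
  import redis.clients.jedis.Jedis

  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")

    val connection = DriverManager.getConnection("jdbc:mysql://node1:3306/chb_shop", "root", "123456")
    val jedis = RedisUtil.getResouce()
    // Use database 1, the same database the incremental sync and the final check select
    jedis.select(1)

    loadDimGoods(connection, jedis)
    loadDimShops(connection, jedis)
    loadDimGoodsCats(connection, jedis)
    loadDimOrg(connection, jedis)
    LoadDimShopCats(connection, jedis)

    // Release the Redis connection and the MySQL connection
    jedis.close()
    connection.close()
  }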

  // Load goods dimension data into Redis
  def loadDimGoods(connection: Connection, jedis: Jedis): Unit = {
    val querySql: String =
      """
        |SELECT
        |	t1.`goodsId`,
        |	t1.`goodsName`,
        |	t1.`goodsCatId`,
        | t1.`shopPrice`,
        |	t1.`shopId`
        |FROM
        |	chb_goods t1
      """.stripMargin
    val statement: Statement = connection.createStatement
    val resultSet: ResultSet = statement.executeQuery(querySql)

    while (resultSet.next) {
      val goodsId: Long = resultSet.getLong("goodsId")
      val goodsName: String = resultSet.getString("goodsName")
      val goodsCatId: Long = resultSet.getLong("goodsCatId")
      val shopPrice:Double = resultSet.getDouble("shopPrice")
      val shopId: Long = resultSet.getLong("shopId")

      val entity = DimGoodsDBEntity(goodsId, goodsName, shopId, goodsCatId.toInt, shopPrice)
      println(entity)

      jedis.hset("chb_shop:dim_goods", goodsId + "", JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
    }
    resultSet.close()
    statement.close()
  }
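
The loader stores one Redis hash per dimension: the hash key names the dimension, the field is the row id, and the value is the row serialized as JSON. The sketch below models that layout with an in-memory map so it runs without Redis or MySQL. HashLayoutSketch, GoodsRow, hset, hget and toJson are hypothetical helpers written for this illustration (the real loader uses Jedis and fastjson), and the sample row reuses the record shown at the end of this post.

```scala
import scala.collection.mutable

object HashLayoutSketch {
  // In-memory stand-in for Redis hashes: key -> (field -> value). Not a Redis client.
  val store: mutable.Map[String, mutable.Map[String, String]] = mutable.Map.empty

  def hset(key: String, field: String, value: String): Unit =
    store.getOrElseUpdate(key, mutable.Map.empty[String, String]).update(field, value)

  def hget(key: String, field: String): Option[String] =
    store.get(key).flatMap(_.get(field))

  // Hypothetical row type mirroring the columns selected from chb_goods.
  final case class GoodsRow(goodsId: Long, goodsName: String, shopId: Long,
                            goodsCatId: Int, shopPrice: Double)

  // Hand-written JSON for illustration only; it does not escape special characters.
  def toJson(r: GoodsRow): String =
    s"""{"goodsCatId":${r.goodsCatId},"goodsId":${r.goodsId},"goodsName":"${r.goodsName}","shopId":${r.shopId},"shopPrice":${r.shopPrice}}"""

  // Full load: one hash field per row, keyed by the goods id.
  def fullLoad(rows: Seq[GoodsRow]): Unit =
    rows.foreach(r => hset("chb_shop:dim_goods", r.goodsId.toString, toJson(r)))

  def main(args: Array[String]): Unit = {
    fullLoad(Seq(GoodsRow(115912L, "机械革命深海泰坦X9Ti-R", 100364L, 10361, 13999.0)))
    println(hget("chb_shop:dim_goods", "115912"))
  }
}
```

Keeping the id as the hash field means a later lookup during widening needs only the dimension key and the id taken from the fact record.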

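Load MySQL shop dimension data into Redis

Steps:

1. Load the data from the chb_shops table in MySQL

2. Save the data into the HASH stored under the key chb_shop:dim_shops

3. Close the resources

Reference code:

  // Load shop dimension data into Redis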
  def loadDimShops(connection: Connection, jedis: Jedis) = {
    val sql =
      """
        |SELECT
        |	t1.`shopId`,
        |	t1.`areaId`,
        |	t1.`shopName`,
        |	t1.`shopCompany`
        |FROM
        |	chb_shops t1
      """.stripMargin

    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(sql)

    while (resultSet.next()) {
      val shopId = resultSet.getInt("shopId")
      val areaId = resultSet.getInt("areaId")
      val shopName = resultSet.getString("shopName")
      val shopCompany = resultSet.getString("shopCompany")

      val dimShop = DimShopsDBEntity(shopId, areaId, shopName, shopCompany)
      println(dimShop)
      jedis.hset("chb_shop:dim_shops", shopId + "", JSON.toJSONString(dimShop, SerializerFeature.DisableCircularReferenceDetect))
    }

    resultSet.close()
    statement.close()
  }
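Load MySQL goods category dimension data into Redis

Steps:

1. Load the data from the chb_goods_cats table in MySQL

2. Save the data into the HASH stored under the key chb_shop:dim_goods_cats

3. Close the resources

Reference code: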

  def loadDimGoodsCats(connection: Connection, jedis: Jedis) = {
    val sql = """
                |SELECT
                |	t1.`catId`,
                |	t1.`parentId`,
                |	t1.`catName`,
                |	t1.`cat_level`
                |FROM
                |	chb_goods_cats t1
              """.stripMargin

    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(sql)

    while(resultSet.next()) {
      val catId = resultSet.getString("catId")
      val parentId = resultSet.getString("parentId")
      val catName = resultSet.getString("catName")
      val cat_level = resultSet.getString("cat_level")

      val entity = DimGoodsCatDBEntity(catId, parentId, catName, cat_level)
      println(entity)

      jedis.hset("chb_shop:dim_goods_cats", catId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
    }

    resultSet.close()
    statement.close()
  }
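Load MySQL organization data into Redis

Steps:

1. Load the data from the chb_org table in MySQL

2. Save the data into the HASH stored under the key chb_shop:dim_org

3. Close the resources

Reference code:

  // Load organization dimension data into Redis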
  def loadDimOrg(connection: Connection, jedis: Jedis) = {
    val sql = """
                |SELECT
                |	orgid,
                |	parentid,
                |	orgName,
                |	orgLevel
                |FROM
                |	chb_org
              """.stripMargin

    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(sql)

    while(resultSet.next()) {
      val orgId = resultSet.getInt("orgId")
      val parentId = resultSet.getInt("parentId")
      val orgName = resultSet.getString("orgName")
      val orgLevel = resultSet.getInt("orgLevel")

      val entity = DimOrgDBEntity(orgId, parentId, orgName, orgLevel)
      println(entity)
      jedis.hset("chb_shop:dim_org", orgId + "", JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
    }

    resultSet.close()
    statement.close()
  }
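Load MySQL shop goods category dimension data into Redis

Steps:

1. Load the data from the chb_shop_cats table in MySQL

2. Save the data into the HASH stored under the key chb_shop:dim_shop_cats

3. Close the resources

Reference code:

  // Load shop goods category dimension data into Redis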
  def LoadDimShopCats(connection: Connection, jedis: Jedis): Unit ={
    val sql = """
                |SELECT
                |	t1.`catId`,
                |	t1.`parentId`,
                |	t1.`catName`,
                |	t1.`catSort`
                |FROM
                |	chb_shop_cats t1
              """.stripMargin

    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(sql)

    while(resultSet.next()) {
      val catId = resultSet.getString("catId")
      val parentId = resultSet.getString("parentId")
      val catName = resultSet.getString("catName")
      val catSort = resultSet.getString("catSort")

      val entity = DimShopCatDBEntity(catId, parentId, catName, catSort)
      println(entity)

      jedis.hset("chb_shop:dim_shop_cats", catId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
    }

    resultSet.close()
    statement.close()
  }
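Incremental update of dimension data

Create the ETL class that syncs dimension data in Redis

Create a SyncDimDataETL class in the etl package that extends MySqlBaseETL and implements the process method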

/**
  * Redis dimension data sync job
  */
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
  /**
    * Business processing entry point
    */
  override def process(): Unit = {
    
  }
}
Filter dimension-table messages

Keep only the records that come from dimension tables

// Filter dimension-table messages
val dimRowDataDS = canalRowDataDS.filter {
    rowData =>
    rowData.getTableName match {
        case "chb_goods" => true
        case "chb_shops" => true
        case "chb_goods_cats" => true
        case "chb_org" => true
        case _ => false
    }
}
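
The same predicate can be written as a set-membership test, which keeps the list of dimension tables in one place. The sketch below is a simplified, standard-library illustration: DimTableFilterSketch and Row are hypothetical names, Row stands in for the RowData messages, and chb_orders is a made-up non-dimension table used only as sample input.

```scala
object DimTableFilterSketch {
  // Table names taken from the filter above.
  val dimTables: Set[String] = Set("chb_goods", "chb_shops", "chb_goods_cats", "chb_org")

  // Hypothetical, simplified stand-in for the RowData messages in the stream.
  final case class Row(tableName: String, eventType: String)

  // True only for records that belong to one of the dimension tables.
  def isDimTable(row: Row): Boolean = dimTables.contains(row.tableName)

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Row("chb_goods", "insert"),
      Row("chb_orders", "insert"), // made-up non-dimension table
      Row("chb_org", "update")
    )
    println(rows.filter(isDimTable).map(_.tableName))
  }
}
```
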
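Handle insert/update dimension messages

Redis handles the incremental sync. Insert and update messages are processed the same way: the Hash in Redis is overwritten directly.

/**
  * Redis dimension data sync job
  */
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
  override def process(): Unit = {
    ........

    // Update Redis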
    dimRowDataDS.addSink(new RichSinkFunction[RowData] {
      var jedis: Jedis = _

      override def open(parameters: Configuration): Unit = {
        jedis = RedisUtil.getResouce()
        jedis.select(1)
      }

      override def close(): Unit = {
        if (jedis != null) {
          jedis.close()
        }
      }

      override def invoke(rowData: RowData, context: SinkFunction.Context[_]): Unit = {
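        rowData.getEventType match {
          case eventType if(eventType == "insert" || eventType == "update") => updateDimData(rowData)
          case _ => // other event types are not handled here; without this case they would raise a MatchError
        }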
      }

      private def updateDimData(rowData:RowData) = {
        rowData.getTableName match {
          case "chb_goods" => {
            val goodsId = rowData.getColumns.get("goodsId")
            val goodsName = rowData.getColumns.get("goodsName")
            val goodsCatId = rowData.getColumns.get("goodsCatId")
            val shopId = rowData.getColumns.get("shopId")
            val shopPrice = rowData.getColumns.get("shopPrice")

            val entity = DimGoodsDBEntity(goodsId.toLong, goodsName, shopId.toLong, goodsCatId.toInt, shopPrice.toDouble)
            println("Incremental update: " + entity)

            jedis.hset("chb_shop:dim_goods", goodsId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
          }
          case "chb_shops" => {
            val shopId = rowData.getColumns.get("shopId")
            val areaId = rowData.getColumns.get("areaId")
            val shopName = rowData.getColumns.get("shopName")
            val shopCompany = rowData.getColumns.get("shopCompany")

            val dimShop = DimShopsDBEntity(shopId.toInt, areaId.toInt, shopName, shopCompany)
            println("Incremental update: " + dimShop)
            jedis.hset("chb_shop:dim_shops", shopId + "", JSON.toJSONString(dimShop, SerializerFeature.DisableCircularReferenceDetect))
          }
          case "chb_goods_cats" => {
            val catId = rowData.getColumns.get("catId")
            val parentId = rowData.getColumns.get("parentId")
            val catName = rowData.getColumns.get("catName")
            val cat_level = rowData.getColumns.get("cat_level")

            val entity = DimGoodsCatDBEntity(catId, parentId, catName, cat_level)
            println("Incremental update: " + entity)

            jedis.hset("chb_shop:dim_goods_cats", catId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
          }
          case "chb_org" => {
            val orgId = rowData.getColumns.get("orgId")
            val parentId = rowData.getColumns.get("parentId")
            val orgName = rowData.getColumns.get("orgName")
            val orgLevel = rowData.getColumns.get("orgLevel")

            val entity = DimOrgDBEntity(orgId.toInt, parentId.toInt, orgName, orgLevel.toInt)
            println("Incremental update: " + entity)
            jedis.hset("chb_shop:dim_org", orgId + "", JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))

          }
        }
      }
    })
  }
}
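Handle delete dimension messages

When the binlog message received from Kafka is a delete, the corresponding dimension data must be removed from Redis

/**
  * Redis dimension data sync job
  */
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
  /**
    * Business processing entry point
    */
  override def process(): Unit = {
    ....

    // Update Redis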
    dimRowDataDS.addSink(new RichSinkFunction[RowData] {
      .......

      override def invoke(rowData: RowData, context: SinkFunction.Context[_]): Unit = {
        rowData.getEventType match {
          case eventType if(eventType == "insert" || eventType == "update") => updateDimData(rowData)
          case "delete" => deleteDimData(rowData)
          case _ => // ignore any other event type instead of failing with a MatchError
        }
      }

      private def deleteDimData(rowData: RowData) = {
        rowData.getTableName match {
          case "chb_goods" => {
            jedis.hdel("chb_shop:dim_goods", rowData.getColumns.get("goodsId"))
          }
          case "chb_shops" => {
            jedis.hdel("chb_shop:dim_shops", rowData.getColumns.get("shopId"))
          }
          case "chb_goods_cats" => {
            jedis.hdel("chb_shop:dim_goods_cats", rowData.getColumns.get("catId"))
          }
          case "chb_org" => {
            jedis.hdel("chb_shop:dim_org", rowData.getColumns.get("orgId"))
          }
        }
      }
    })
  }
}
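
Taken together, the sink maps each change event to one hash operation: insert and update overwrite the field for the row id, delete removes it, and anything else is ignored. The sketch below replays that dispatch against an in-memory map so the behaviour can be checked without Flink, Kafka, or Redis. DimSyncSketch, Change, Target and applyChange are hypothetical names, and the stored value is a plain key=value string rather than the JSON the real sink writes.

```scala
import scala.collection.mutable

object DimSyncSketch {
  // Hypothetical, simplified change event: table name, event type, column values as strings.
  final case class Change(table: String, eventType: String, columns: Map[String, String])

  // Redis hash key and id column for each dimension table, as used in the sink above.
  final case class Target(hashKey: String, idColumn: String)

  val targets: Map[String, Target] = Map(
    "chb_goods"      -> Target("chb_shop:dim_goods", "goodsId"),
    "chb_shops"      -> Target("chb_shop:dim_shops", "shopId"),
    "chb_goods_cats" -> Target("chb_shop:dim_goods_cats", "catId"),
    "chb_org"        -> Target("chb_shop:dim_org", "orgId")
  )

  // In-memory stand-in for the Redis hashes: key -> (field -> value).
  val store: mutable.Map[String, mutable.Map[String, String]] = mutable.Map.empty

  // Events for unknown tables or without an id column are skipped.
  def applyChange(c: Change): Unit =
    for {
      target <- targets.get(c.table)
      id     <- c.columns.get(target.idColumn)
    } {
      val hash = store.getOrElseUpdate(target.hashKey, mutable.Map.empty[String, String])
      c.eventType match {
        case "insert" | "update" =>
          // Placeholder value; the real sink stores the entity as JSON.
          hash.update(id, c.columns.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString(","))
        case "delete" => hash.remove(id)
        case _        => () // ignore other event types
      }
    }

  def main(args: Array[String]): Unit = {
    applyChange(Change("chb_goods", "insert", Map("goodsId" -> "1", "goodsName" -> "a")))
    applyChange(Change("chb_goods", "update", Map("goodsId" -> "1", "goodsName" -> "b")))
    println(store)
    applyChange(Change("chb_goods", "delete", Map("goodsId" -> "1")))
    println(store)
  }
}
```
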
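Call the sync ETL from the App

1. In the App, create the real-time sync ETL and call its process method

    val syncRedisDimDataETL = new SyncDimDataETL(env)
    syncRedisDimDataETL.process()

2. Start the Flink program

3. Test

  • Insert a row into one of the MySQL tables and check whether the data in Redis is updated accordingly

  • Update a row in one of the MySQL tables and check whether the data in Redis is updated accordingly

  • Delete a row from one of the MySQL tables and check whether the data in Redis is updated accordingly

Check the data in Redis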

192.168.88.121:0>select 1
"OK"
192.168.88.121:1>hget chb_shop:dim_goods 115912
"{"goodsCatId":10361,"goodsId":115912,"goodsName":"机械革命深海泰坦X9Ti-R","shopId":100364,"shopPrice":13999.0}"