为了后续对订单、订单明细等数据进行实时ETL拉宽,需要提前将一些维度数据加载到一个高性能存储中。此处选择Redis作为商品维度、商品分类维度、门店维度、运营组织机构维度的存储。先一次性将MySQL中的所有维度数据全量装载到Redis中,之后只要MySQL中的维度数据发生变化,就立即使用Flink更新Redis中对应的维度数据。
创建样例类
在 com.chb.shop.realtime.bean 的 DimEntity 类中创建以下样例类
- DimGoodsDBEntity 商品维度样例类
- DimGoodsCatDBEntity 商品分类维度样例类
- DimShopsDBEntity 店铺维度样例类
- DimOrgDBEntity 机构维度样例类
- DimShopCatDBEntity 门店商品分类维度样例类
参考代码:
/**
* 定义维度表的样例类
*
* @BeanProperty:为字段生成 JavaBean 风格的 getXxx 方法(样例类参数默认是 val,因此只生成 get 方法,不生成 set 方法)
*/
// Goods dimension entity: one row of chb_goods
case class DimGoodsDBEntity(@BeanProperty goodsId: Long = 0,      // goods id
                            @BeanProperty goodsName: String = "", // goods name
                            @BeanProperty shopId: Long = 0,       // shop id
                            @BeanProperty goodsCatId: Int = 0,    // goods category id
                            @BeanProperty shopPrice: Double = 0)  // goods price

/**
 * Companion of DimGoodsDBEntity: rebuilds an entity from its JSON text.
 */
object DimGoodsDBEntity {
  /**
   * Parses a goods entity from JSON.
   *
   * @param json JSON object text with goodsId/goodsName/shopId/goodsCatId/shopPrice keys; may be null
   * @return the parsed entity, or an entity holding only the default values when json is null
   */
  def apply(json: String): DimGoodsDBEntity = {
    if (json == null) {
      new DimGoodsDBEntity
    } else {
      val fields: JSONObject = JSON.parseObject(json)
      // A missing numeric key comes back as null and throws when unboxed to Long/Int/Double
      new DimGoodsDBEntity(
        goodsId = fields.getLong("goodsId"),
        goodsName = fields.getString("goodsName"),
        shopId = fields.getLong("shopId"),
        goodsCatId = fields.getInteger("goodsCatId"),
        shopPrice = fields.getDouble("shopPrice"))
    }
  }
}
// Goods category dimension entity: one row of chb_goods_cats
case class DimGoodsCatDBEntity(@BeanProperty catId: String = "",     // category id
                               @BeanProperty parentId: String = "",  // parent category id
                               @BeanProperty catName: String = "",   // category name
                               @BeanProperty cat_level: String = "") // category level

/**
 * Companion of DimGoodsCatDBEntity: rebuilds an entity from its JSON text.
 */
object DimGoodsCatDBEntity {
  /**
   * Parses a goods-category entity from JSON.
   *
   * @param json JSON object text with catId/parentId/catName/cat_level keys; may be null
   * @return the parsed entity, or an all-default entity when json is null
   */
  def apply(json: String): DimGoodsCatDBEntity =
    Option(json)
      .map { text =>
        val fields = JSON.parseObject(text)
        DimGoodsCatDBEntity(
          fields.getString("catId"),
          fields.getString("parentId"),
          fields.getString("catName"),
          fields.getString("cat_level"))
      }
      .getOrElse(new DimGoodsCatDBEntity)
}
// Shop dimension entity: one row of chb_shops
case class DimShopsDBEntity(@BeanProperty shopId: Int = 0,          // shop id
                            @BeanProperty areaId: Int = 0,          // id of the area the shop belongs to
                            @BeanProperty shopName: String = "",    // shop name
                            @BeanProperty shopCompany: String = "") // company name

/**
 * Companion of DimShopsDBEntity: rebuilds an entity from its JSON text.
 */
object DimShopsDBEntity {
  /**
   * Parses a shop entity from JSON.
   *
   * @param json JSON object text with shopId/areaId/shopName/shopCompany keys; may be null
   * @return the parsed entity, or an all-default entity when json is null
   */
  def apply(json: String): DimShopsDBEntity =
    if (json == null) new DimShopsDBEntity
    else {
      val fields = JSON.parseObject(json)
      // Numeric ids are read as text and converted; a missing id makes toInt fail
      val id = fields.getString("shopId").toInt
      val area = fields.getString("areaId").toInt
      DimShopsDBEntity(id, area, fields.getString("shopName"), fields.getString("shopCompany"))
    }
}
// Organization dimension entity: one row of chb_org
case class DimOrgDBEntity(@BeanProperty orgId: Int = 0,       // organization id
                          @BeanProperty parentId: Int = 0,    // parent organization id
                          @BeanProperty orgName: String = "", // organization name
                          @BeanProperty orgLevel: Int = 0)    // organization level

/**
 * Companion of DimOrgDBEntity: rebuilds an entity from its JSON text.
 */
object DimOrgDBEntity {
  /**
   * Parses an organization entity from JSON.
   *
   * @param json JSON object text with orgId/parentId/orgName/orgLevel keys; may be null
   * @return the parsed entity, or an all-default entity when json is null
   */
  def apply(json: String): DimOrgDBEntity =
    if (json == null) new DimOrgDBEntity()
    else {
      val fields = JSON.parseObject(json)
      // Numeric fields are read as text and converted; a missing one makes toInt fail
      val id = fields.getString("orgId").toInt
      val parent = fields.getString("parentId").toInt
      val level = fields.getString("orgLevel").toInt
      DimOrgDBEntity(id, parent, fields.getString("orgName"), level)
    }
}
// Shop goods-category dimension entity: one row of chb_shop_cats
case class DimShopCatDBEntity(@BeanProperty catId: String = "",    // category id
                              @BeanProperty parentId: String = "", // parent category id
                              @BeanProperty catName: String = "",  // category name
                              @BeanProperty catSort: String = "")  // category sort value

/**
 * Companion of DimShopCatDBEntity: rebuilds an entity from its JSON text.
 */
object DimShopCatDBEntity {
  /**
   * Parses a shop goods-category entity from JSON.
   *
   * @param json JSON object text with catId/parentId/catName/catSort keys; may be null
   * @return the parsed entity, or an all-default entity when json is null
   */
  def apply(json: String): DimShopCatDBEntity =
    Option(json)
      .map { text =>
        val fields = JSON.parseObject(text)
        DimShopCatDBEntity(
          fields.getString("catId"),
          fields.getString("parentId"),
          fields.getString("catName"),
          fields.getString("catSort"))
      }
      .getOrElse(new DimShopCatDBEntity())
}
在配置文件中添加Redis配置、MySQL配置
# Redis配置
redis.server.ip="node2"
redis.server.port=6379
# MySQL配置
mysql.server.ip="node1"
mysql.server.port=3306
mysql.server.database="chb_shop"
mysql.server.username="root"
mysql.server.password="123456"
编写配置工具类
// Redis connection settings
val `redis.server.ip`: String = config.getString("redis.server.ip")
val `redis.server.port`: String = config.getString("redis.server.port")
// MySQL connection settings
val `mysql.server.ip`: String = config.getString("mysql.server.ip")
val `mysql.server.port`: String = config.getString("mysql.server.port")
val `mysql.server.database`: String = config.getString("mysql.server.database")
val `mysql.server.username`: String = config.getString("mysql.server.username")
val `mysql.server.password`: String = config.getString("mysql.server.password")
编写Redis操作工具类
在 utils 包中添加 RedisUtil 单例对象,使用 Redis 连接池(JedisPool)操作 Redis
/**
 * Shared Jedis connection pool for the Redis server configured in GlobalConfigUtil.
 * Each borrowed Jedis should be close()d by the caller so it goes back to the pool.
 */
object RedisUtil {
  val config = new JedisPoolConfig()
  // Hand out idle connections in LIFO order (commons-pool default: true)
  config.setLifo(true)
  // Maximum number of idle connections kept in the pool (commons-pool default: 8)
  config.setMaxIdle(8)
  // Maximum number of connections in total (commons-pool default: 8)
  config.setMaxTotal(1000)
  // Max time to wait for a connection when the pool is exhausted (with BlockWhenExhausted);
  // a negative value blocks indefinitely (commons-pool default: -1)
  config.setMaxWaitMillis(-1)
  // Minimum idle time before a connection becomes eligible for eviction
  // (commons-pool default: 1800000 ms = 30 minutes)
  config.setMinEvictableIdleTimeMillis(1800000)
  // Minimum number of idle connections to keep (commons-pool default: 0)
  config.setMinIdle(0)
  // Number of idle connections examined per eviction run; a negative n means 1/|n| of them
  // (commons-pool default: 3)
  config.setNumTestsPerEvictionRun(3)
  // "Soft" eviction: an idle connection is evicted once its idle time exceeds this value,
  // provided more than minIdle idle connections remain in the pool
  config.setSoftMinEvictableIdleTimeMillis(1800000)
  // Do not validate connections on borrow (commons-pool default: false)
  config.setTestOnBorrow(false)
  // Do not validate idle connections during eviction runs (commons-pool default: false)
  config.setTestWhileIdle(false)
  var jedisPool: JedisPool = new JedisPool(config, GlobalConfigUtil.`redis.server.ip`, GlobalConfigUtil.`redis.server.port`.toInt)

  /**
   * Borrows a Redis connection from the pool.
   * The misspelled name is kept for existing callers.
   *
   * @return a pooled Jedis connection; close() it when done
   */
  def getResouce() = {
    jedisPool.getResource
  }

  /**
   * Alias of getResouce() under its conventional name.
   *
   * @return a pooled Jedis connection; close() it when done
   */
  def getJedis() = getResouce()
}
读取MySQL商品维度数据到Redis
在 cn.chb.shop.realtime.etl.dataloader 包中创建 DimensionDataLoader 单例对象,实现装载商品维度数据
实现步骤:
1、先从MySQL的 chb_goods 表中加载数据
2、将数据保存到键为 chb_shop:dim_goods 的 HASH 结构中
3、关闭资源
参考代码:
/**
 * One-off full load of all dimension tables from MySQL into Redis.
 * Data is written to Redis database 1, the same database the incremental
 * sync sink (SyncDimDataETL) selects, so both paths update the same hashes.
 * Both the MySQL connection and the Redis connection are released on exit.
 */
def main(args: Array[String]): Unit = {
  Class.forName("com.mysql.jdbc.Driver")
  val connection = DriverManager.getConnection("jdbc:mysql://node1:3306/chb_shop", "root", "123456")
  try {
    // Borrow a pooled connection; close() returns it to the pool
    val jedis = RedisUtil.getResouce()
    try {
      jedis.select(1)
      loadDimGoods(connection, jedis)
      loadDimShops(connection, jedis)
      loadDimGoodsCats(connection, jedis)
      loadDimOrg(connection, jedis)
      LoadDimShopCats(connection, jedis)
    } finally {
      jedis.close()
    }
  } finally {
    connection.close()
  }
}
/**
 * Loads the goods dimension (table chb_goods) into the Redis hash "chb_shop:dim_goods".
 * Each hash field is the goodsId and each value is the JSON-serialized DimGoodsDBEntity.
 * JDBC resources are closed even if loading a row fails.
 *
 * @param connection open MySQL connection; left open for the caller
 * @param jedis      Redis connection to write to; left open for the caller
 */
def loadDimGoods(connection: Connection, jedis: Jedis): Unit = {
  val querySql: String =
    """
      |SELECT
      |  t1.`goodsId`,
      |  t1.`goodsName`,
      |  t1.`goodsCatId`,
      |  t1.`shopPrice`,
      |  t1.`shopId`
      |FROM
      |  chb_goods t1
    """.stripMargin
  val statement: Statement = connection.createStatement
  try {
    val resultSet: ResultSet = statement.executeQuery(querySql)
    try {
      while (resultSet.next) {
        val goodsId: Long = resultSet.getLong("goodsId")
        val goodsName: String = resultSet.getString("goodsName")
        val goodsCatId: Long = resultSet.getLong("goodsCatId")
        val shopPrice: Double = resultSet.getDouble("shopPrice")
        val shopId: Long = resultSet.getLong("shopId")
        val entity = DimGoodsDBEntity(goodsId, goodsName, shopId, goodsCatId.toInt, shopPrice)
        println(entity)
        jedis.hset("chb_shop:dim_goods", goodsId.toString, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
      }
    } finally {
      resultSet.close()
    }
  } finally {
    statement.close()
  }
}
读取MySQL店铺维度数据到Redis
实现步骤:
1、先从MySQL的 chb_shops 表中加载数据
2、将数据保存到键为 chb_shop:dim_shops 的 HASH 结构中
3、关闭资源
参考代码:
// 加载商铺维度数据到Redis
/**
 * Loads the shop dimension (table chb_shops) into the Redis hash "chb_shop:dim_shops".
 * Each hash field is the shopId and each value is the JSON-serialized DimShopsDBEntity.
 * JDBC resources are closed even if loading a row fails.
 *
 * @param connection open MySQL connection; left open for the caller
 * @param jedis      Redis connection to write to; left open for the caller
 */
def loadDimShops(connection: Connection, jedis: Jedis): Unit = {
  val sql =
    """
      |SELECT
      |  t1.`shopId`,
      |  t1.`areaId`,
      |  t1.`shopName`,
      |  t1.`shopCompany`
      |FROM
      |  chb_shops t1
    """.stripMargin
  val statement = connection.createStatement()
  try {
    val resultSet = statement.executeQuery(sql)
    try {
      while (resultSet.next()) {
        val shopId = resultSet.getInt("shopId")
        val areaId = resultSet.getInt("areaId")
        val shopName = resultSet.getString("shopName")
        val shopCompany = resultSet.getString("shopCompany")
        val dimShop = DimShopsDBEntity(shopId, areaId, shopName, shopCompany)
        println(dimShop)
        jedis.hset("chb_shop:dim_shops", shopId.toString, JSON.toJSONString(dimShop, SerializerFeature.DisableCircularReferenceDetect))
      }
    } finally {
      resultSet.close()
    }
  } finally {
    statement.close()
  }
}
读取MySQL商品分类维度数据到Redis
实现步骤:
1、先从MySQL的 chb_goods_cats 表中加载数据
2、将数据保存到键为 chb_shop:dim_goods_cats 的 HASH 结构中
3、关闭资源
参考代码:
/**
 * Loads the goods-category dimension (table chb_goods_cats) into the Redis hash
 * "chb_shop:dim_goods_cats". Each hash field is the catId and each value is the
 * JSON-serialized DimGoodsCatDBEntity. JDBC resources are closed even if loading a row fails.
 *
 * @param connection open MySQL connection; left open for the caller
 * @param jedis      Redis connection to write to; left open for the caller
 */
def loadDimGoodsCats(connection: Connection, jedis: Jedis): Unit = {
  val sql =
    """
      |SELECT
      |  t1.`catId`,
      |  t1.`parentId`,
      |  t1.`catName`,
      |  t1.`cat_level`
      |FROM
      |  chb_goods_cats t1
    """.stripMargin
  val statement = connection.createStatement()
  try {
    val resultSet = statement.executeQuery(sql)
    try {
      while (resultSet.next()) {
        val catId = resultSet.getString("catId")
        val parentId = resultSet.getString("parentId")
        val catName = resultSet.getString("catName")
        val cat_level = resultSet.getString("cat_level")
        val entity = DimGoodsCatDBEntity(catId, parentId, catName, cat_level)
        println(entity)
        jedis.hset("chb_shop:dim_goods_cats", catId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
      }
    } finally {
      resultSet.close()
    }
  } finally {
    statement.close()
  }
}
读取MySQL组织结构数据到Redis
实现步骤:
1、先从MySQL的 chb_org 表中加载数据
2、将数据保存到键为 chb_shop:dim_org 的 HASH 结构中
3、关闭资源
参考代码:
/**
 * Loads the organization dimension (table chb_org) into the Redis hash "chb_shop:dim_org".
 * Each hash field is the orgId and each value is the JSON-serialized DimOrgDBEntity.
 * The ResultSet and Statement are now always closed; previously they were never released.
 *
 * @param connection open MySQL connection; left open for the caller
 * @param jedis      Redis connection to write to; left open for the caller
 */
def loadDimOrg(connection: Connection, jedis: Jedis): Unit = {
  val sql =
    """
      |SELECT
      |  orgid,
      |  parentid,
      |  orgName,
      |  orgLevel
      |FROM
      |  chb_org
    """.stripMargin
  val statement = connection.createStatement()
  try {
    val resultSet = statement.executeQuery(sql)
    try {
      while (resultSet.next()) {
        val orgId = resultSet.getInt("orgId")
        val parentId = resultSet.getInt("parentId")
        val orgName = resultSet.getString("orgName")
        val orgLevel = resultSet.getInt("orgLevel")
        val entity = DimOrgDBEntity(orgId, parentId, orgName, orgLevel)
        println(entity)
        jedis.hset("chb_shop:dim_org", orgId.toString, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
      }
    } finally {
      resultSet.close()
    }
  } finally {
    statement.close()
  }
}
读取MySQL门店商品分类维度数据到Redis
实现步骤:
1、先从MySQL的 chb_shop_cats表中加载数据
2、将数据保存到键为 chb_shop:dim_shop_cats 的 HASH 结构中
3、关闭资源
参考代码:
/**
 * Loads the shop goods-category dimension (table chb_shop_cats) into the Redis hash
 * "chb_shop:dim_shop_cats". Each hash field is the catId and each value is the
 * JSON-serialized DimShopCatDBEntity. JDBC resources are closed even if loading a row fails.
 *
 * @param connection open MySQL connection; left open for the caller
 * @param jedis      Redis connection to write to; left open for the caller
 */
def LoadDimShopCats(connection: Connection, jedis: Jedis): Unit = {
  val sql =
    """
      |SELECT
      |  t1.`catId`,
      |  t1.`parentId`,
      |  t1.`catName`,
      |  t1.`catSort`
      |FROM
      |  chb_shop_cats t1
    """.stripMargin
  val statement = connection.createStatement()
  try {
    val resultSet = statement.executeQuery(sql)
    try {
      while (resultSet.next()) {
        val catId = resultSet.getString("catId")
        val parentId = resultSet.getString("parentId")
        val catName = resultSet.getString("catName")
        val catSort = resultSet.getString("catSort")
        val entity = DimShopCatDBEntity(catId, parentId, catName, catSort)
        println(entity)
        jedis.hset("chb_shop:dim_shop_cats", catId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
      }
    } finally {
      resultSet.close()
    }
  } finally {
    statement.close()
  }
}
维度数据增量更新
创建同步Redis中维度数据ETL处理类
在etl包下创建SyncDimDataETL类,继承MySqlBaseETL(env)基类,实现process方法
/**
 * Syncs the Redis dimension hashes with MySQL binlog changes (skeleton version).
 */
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
  /** Business-processing entry point; empty in this skeleton. */
  override def process(): Unit = ()
}
过滤维度表消息
只保留维度表(chb_goods、chb_shops、chb_goods_cats、chb_org)的 binlog 数据
// Keep only binlog rows that belong to a dimension table mirrored in Redis.
// NOTE: chb_shop_cats is loaded in full by the loader but is not listed here,
// so its incremental changes are not synced.
val dimRowDataDS = canalRowDataDS.filter { rowData =>
  rowData.getTableName match {
    case "chb_goods" | "chb_shops" | "chb_goods_cats" | "chb_org" => true
    case _ => false
  }
}
处理新增/更新维度消息
使用Redis处理增量同步,insert和update消息统一处理,直接将Redis中的Hash结构更新
/**
 * Redis dimension-data sync job: upserts insert/update binlog rows of the
 * dimension tables into their Redis hashes.
 */
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
........
  // Update Redis
  dimRowDataDS.addSink(new RichSinkFunction[RowData] {
    // One Redis connection per parallel sink instance: borrowed in open(), released in close()
    var jedis: Jedis = _

    override def open(parameters: Configuration): Unit = {
      jedis = RedisUtil.getResouce()
      // The dimension hashes are kept in Redis database 1
      jedis.select(1)
    }

    override def close(): Unit = {
      if (jedis != null) {
        jedis.close()
      }
    }

    /**
     * Routes each binlog row by event type. Only insert/update are handled in this version.
     * Any other event type (e.g. delete) is skipped, instead of throwing a MatchError
     * that would fail the whole job.
     */
    override def invoke(rowData: RowData, context: SinkFunction.Context[_]): Unit = {
      rowData.getEventType match {
        case "insert" | "update" => updateDimData(rowData)
        case _ => // not handled in this version
      }
    }

    /**
     * Writes the row into its table's Redis hash, keyed by the row's id column,
     * with the JSON-serialized dimension entity as value.
     */
    private def updateDimData(rowData: RowData): Unit = {
      val columns = rowData.getColumns
      rowData.getTableName match {
        case "chb_goods" =>
          val goodsId = columns.get("goodsId")
          val goodsName = columns.get("goodsName")
          val goodsCatId = columns.get("goodsCatId")
          val shopId = columns.get("shopId")
          val shopPrice = columns.get("shopPrice")
          val entity = DimGoodsDBEntity(goodsId.toLong, goodsName, shopId.toLong, goodsCatId.toInt, shopPrice.toDouble)
          println("增量更新:" + entity)
          jedis.hset("chb_shop:dim_goods", goodsId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
        case "chb_shops" =>
          val shopId = columns.get("shopId")
          val areaId = columns.get("areaId")
          val shopName = columns.get("shopName")
          val shopCompany = columns.get("shopCompany")
          val dimShop = DimShopsDBEntity(shopId.toInt, areaId.toInt, shopName, shopCompany)
          println("增量更新:" + dimShop)
          jedis.hset("chb_shop:dim_shops", shopId + "", JSON.toJSONString(dimShop, SerializerFeature.DisableCircularReferenceDetect))
        case "chb_goods_cats" =>
          val catId = columns.get("catId")
          val parentId = columns.get("parentId")
          val catName = columns.get("catName")
          val cat_level = columns.get("cat_level")
          val entity = DimGoodsCatDBEntity(catId, parentId, catName, cat_level)
          println("增量更新:" + entity)
          jedis.hset("chb_shop:dim_goods_cats", catId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
        case "chb_org" =>
          val orgId = columns.get("orgId")
          val parentId = columns.get("parentId")
          val orgName = columns.get("orgName")
          val orgLevel = columns.get("orgLevel")
          val entity = DimOrgDBEntity(orgId.toInt, parentId.toInt, orgName, orgLevel.toInt)
          println("增量更新:" + entity)
          jedis.hset("chb_shop:dim_org", orgId + "", JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect))
        case _ => // tables without a dimension hash are ignored
      }
    }
  })
}
}
处理删除维度消息
当接收到Kafka中的binlog消息是delete时,需要将Redis中的维度数据删除
/**
 * Redis dimension-data sync job: insert/update rows are upserted into the
 * dimension hashes and delete rows are removed from them.
 */
class SyncDimDataETL(env: StreamExecutionEnvironment) extends MySqlBaseETL(env) {
  /**
   * Business-processing entry point.
   */
  override def process(): Unit = {
....
    // Update Redis
    dimRowDataDS.addSink(new RichSinkFunction[RowData] {
.......
      /**
       * Routes each binlog row. insert/update upsert the dimension and delete removes it.
       * Any other event type is ignored, instead of throwing a MatchError that would fail the job.
       */
      override def invoke(rowData: RowData, context: SinkFunction.Context[_]): Unit = {
        rowData.getEventType match {
          case "insert" | "update" => updateDimData(rowData)
          case "delete" => deleteDimData(rowData)
          case _ => // other event types do not affect the dimension cache
        }
      }

      /**
       * Removes the deleted row's field, identified by its id column, from its table's Redis hash.
       */
      private def deleteDimData(rowData: RowData): Unit = {
        val columns = rowData.getColumns
        rowData.getTableName match {
          case "chb_goods"      => jedis.hdel("chb_shop:dim_goods", columns.get("goodsId"))
          case "chb_shops"      => jedis.hdel("chb_shop:dim_shops", columns.get("shopId"))
          case "chb_goods_cats" => jedis.hdel("chb_shop:dim_goods_cats", columns.get("catId"))
          case "chb_org"        => jedis.hdel("chb_shop:dim_org", columns.get("orgId"))
          case _                => // tables without a dimension hash are ignored
        }
      }
    })
  }
}
在App中调用同步ETL
1、在App中创建实时同步ETL,并调用处理方法
// Create the Redis dimension-sync ETL (the class defined above is SyncDimDataETL) and build its pipeline
val syncRedisDimDataETL = new SyncDimDataETL(env)
syncRedisDimDataETL.process()
2、启动Flink程序
3、测试
- 新增MySQL中的一条表数据,测试Redis中的数据是否同步更新
- 修改MySQL中的一条表数据,测试Redis中的数据是否同步更新
- 删除MySQL中的一条表数据,测试Redis中的数据是否同步删除
查看redis中的数据
192.168.88.121:0>select 1
"OK"
192.168.88.121:1>hget chb_shop:dim_goods 115912
"{"goodsCatId":10361,"goodsId":115912,"goodsName":"机械革命深海泰坦X9Ti-R","shopId":100364,"shopPrice":13999.0}"