您当前的位置: 首页 >  sql

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

DWS 层-关键词主题表(FlinkSQL)

宝哥大数据 发布时间:2021-03-23 20:52:06 ,浏览量:1

文章目录
    • 1.1、需求分析与思路
    • 1.2、关于分词
    • 1.3、 搜索关键词功能实现
      • 1.3.1、IK 分词器的使用
      • 1.3.2、自定义函数
        • 1.3.2.1、自定义函数分类
        • 1.3.2.2、封装 KeywordUDTF 函数 [【参考】](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html)
    • 1.4、创建Table环境
    • 1.5、声明动态表和自定义函数
      • 1.5.1、数据格式
      • 1.5.2、创建table
    • 1.6、从动态表中查询数据
    • 1.7、利用自定义函数 对搜索关键词进行拆分 [参考 joins--Join 表函数](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/queries.html#joins)
    • 1.8、分组、开窗、聚合
    • 1.9、转换流并写入ClickHouse
      • 1.9.1、创建表
  • 二、整体测试

1.1、需求分析与思路

在这里插入图片描述

  关键词主题这个主要是为了大屏展示中的字符云的展示效果,用于感性的让大屏观看者,感知目前的用户都更关心的那些商品和关键词。   关键词的展示也是一种维度聚合的结果,根据聚合的大小来决定关键词的大小。   关键词的第一重要来源的就是用户在搜索栏的搜索,另外就是从以商品为主题的统计中获取关键词。

1.2、关于分词

  需要根据把长文本分割成一个一个的词,这种分词技术,在搜索引擎中可能会用到。对于中文分词,现在的搜索引擎基本上都是使用的第三方分词器,咱们在计算数据中也可以,使用和搜索引擎中一致的分词器,IK。

1.3、 搜索关键词功能实现 1.3.1、IK 分词器的使用
  1. 添加依赖

 com.janeluo
 ikanalyzer
 2012_u6

  1. 封装分词工具类com.chb.realtime.utils.KeywordUtil
1.3.2、自定义函数

  有了分词器,那么另外一个要考虑的问题就是如何把分词器的使用揉进 FlinkSQL 中。因为 SQL 的语法和相关的函数都是 Flink 内定的,想要使用外部工具,就必须结合自定义函数。

1.3.2.1、自定义函数分类

➢ Scalar Function(相当于 Spark 的 UDF), ➢ Table Function(相当于 Spark 的 UDTF), ➢ Aggregation Functions (相当于 Spark 的 UDAF)   考虑到一个词条包括多个词语所以分词是指上是一种一对多的拆分,一拆多的情况,我们应该选择 Table Function。

1.3.2.2、封装 KeywordUDTF 函数 【参考】

@FunctionHint 主要是为了标识输出数据的类型 row.setField(0,keyword)中的 0 表示返回值下标为 0 的值

@FunctionHint(output = @DataTypeHint("ROW"))
public class KeywordUDTF extends TableFunction {
    public void eval(String value) {
        //使用工具类对字符串进行分词
        List keywordList = KeywordUtil.analyze(value);
        for (String keyword : keywordList) {
            //collect(Row.of(keyword));
            Row row = new Row(1);
            row.setField(0,keyword);
            collect(row);
        }
    }
}

1.4、创建Table环境
        //1.4 创建Table环境
        EnvironmentSettings setting = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, setting);
1.5、声明动态表和自定义函数 1.5.1、数据格式
{
	"common": {
		"ar": "310000",
		"uid": "34",
		"os": "Android 11.0",
		"ch": "wandoujia",
		"is_new": "1",
		"md": "vivo iqoo3",
		"mid": "mid_5",
		"vc": "v2.1.134",
		"ba": "vivo"
	},
	"page": {
		"page_id": "good_list",			# 在 商品详情页才会有关键词
		"item": "电视",				# 关键词
		"during_time": 3654,
		"item_type": "keyword",
		"last_page_id": "home"
	},
	"ts": 1616571967000		# 时间
}
1.5.2、创建table

注意 json 格式的要定义为 Map 对象 FROM_UNIXTIME将距离1970-01-01 00:00:00的秒, 转为指定格式的字符串 TO_TIMESTAMP将字符串日期转为timestamp WATERMARK指定水位标记

  • WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
  • WATERMARK字段的日期必须是timestamp
  • INTERVAL '2' SECOND指定乱序时间

WITH 指定 kafka connector的属性配置

        //TODO 2.注册自定义函数
        tableEnv.createTemporarySystemFunction("ik_analyze", KeywordUDTF.class);

        //TODO 3.创建动态表
        //3.1 声明主题以及消费者组
        String pageViewSourceTopic = "dwd_page_log";
        String groupId = "keywordstats_app_group";
        //3.2建表
       // 注意 json 格式的要定义为 Map 对象
        tableEnv.executeSql(
            "CREATE TABLE page_view (" +
                " common MAP," +
                " page MAP," +
                " ts BIGINT," +
                " rowtime as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                " WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND) " +
                " WITH (" + MyKafkaUtil.getKafkaDDL(pageViewSourceTopic, groupId) + ")"
        );
1.6、从动态表中查询数据
        //TODO 4.从动态表中查询数据  --->大数据数仓-> [大, 数据, 数, 仓]
        Table fullwordTable = tableEnv.sqlQuery(
            "select page['item'] fullword,rowtime " +
                " from page_view " +
                " where page['page_id']='good_list' and page['item'] IS NOT NULL"
        );
1.7、利用自定义函数 对搜索关键词进行拆分 参考 joins–Join 表函数

在这里插入图片描述

        //TODO 5.利用自定义函数  对搜索关键词进行拆分
        Table keywordTable = tableEnv.sqlQuery(
            "SELECT keyword, rowtime " +
                "FROM  " + fullwordTable + "," +
                "LATERAL TABLE(ik_analyze(fullword)) AS t(keyword)"
        );
1.8、分组、开窗、聚合

聚合属于查询 参考【分组窗口】 SQL 查询的分组窗口是通过 GROUP BY 子句定义的

        //TODO 6.分组、开窗、聚合
        Table reduceTable = tableEnv.sqlQuery(
            "select keyword,count(*) ct,  '" + ChbConstant.KEYWORD_SEARCH + "' source," +
                "DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') stt," +
                "DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') edt ," +
                "UNIX_TIMESTAMP()*1000 ts from " + keywordTable +
                " group by TUMBLE(rowtime, INTERVAL '10' SECOND),keyword"
        );
1.9、转换流并写入ClickHouse
        //TODO 7.转换为流
        DataStream keywordStatsDS = tableEnv.toAppendStream(reduceTable, KeywordStats.class);

        keywordStatsDS.print(">>>>");

        //TODO 8.写入到ClickHouse
        keywordStatsDS.addSink(
            ClickHouseUtil.getJdbcSink("insert into keyword_stats(keyword,ct,source,stt,edt,ts) values(?,?,?,?,?,?)")
        );

1.9.1、创建表
create table keyword_stats (
 stt DateTime,
 edt DateTime,
 keyword String ,
 source String ,
 ct UInt64 ,
 ts UInt64
)engine =ReplacingMergeTree( ts)
 partition by toYYYYMMDD(stt)
 order by ( stt,edt,keyword,source );
二、整体测试

➢ 启动 ZK、Kafka、logger.sh、ClickHouse ➢ 运行 BaseLogApp ➢ 运行 KeywordStatsApp ➢ 运行 rt_applog 目录下的 jar 包 ➢ 查看控制台输出 ➢ 查看 ClickHouse 中 keyword_stats 表数据

在这里插入图片描述

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0473s