- 1.1、需求分析与思路
- 1.2、关于分词
- 1.3、 搜索关键词功能实现
- 1.3.1、IK 分词器的使用
- 1.3.2、自定义函数
- 1.3.2.1、自定义函数分类
- 1.3.2.2、封装 KeywordUDTF 函数 [【参考】](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html)
- 1.4、创建Table环境
- 1.5、声明动态表和自定义函数
- 1.5.1、数据格式
- 1.5.2、创建table
- 1.6、从动态表中查询数据
- 1.7、利用自定义函数 对搜索关键词进行拆分 [参考 joins--Join 表函数](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/queries.html#joins)
- 1.8、分组、开窗、聚合
- 1.9、转换流并写入ClickHouse
- 1.9.1、创建表
- 二、整体测试
关键词主题这个主要是为了大屏展示中的字符云的展示效果,用于感性的让大屏观看者,感知目前的用户都更关心的那些商品和关键词。 关键词的展示也是一种维度聚合的结果,根据聚合的大小来决定关键词的大小。 关键词的第一重要来源的就是用户在搜索栏的搜索,另外就是从以商品为主题的统计中获取关键词。
1.2、关于分词需要根据把长文本分割成一个一个的词,这种分词技术,在搜索引擎中可能会用到。对于中文分词,现在的搜索引擎基本上都是使用的第三方分词器,咱们在计算数据中也可以,使用和搜索引擎中一致的分词器,IK。
1.3、 搜索关键词功能实现 1.3.1、IK 分词器的使用- 添加依赖
com.janeluo
ikanalyzer
2012_u6
- 封装分词工具类
com.chb.realtime.utils.KeywordUtil
有了分词器,那么另外一个要考虑的问题就是如何把分词器的使用揉进 FlinkSQL 中。因为 SQL 的语法和相关的函数都是 Flink 内定的,想要使用外部工具,就必须结合自定义函数。
1.3.2.1、自定义函数分类➢ Scalar Function(相当于 Spark 的 UDF), ➢ Table Function(相当于 Spark 的 UDTF), ➢ Aggregation Functions (相当于 Spark 的 UDAF) 考虑到一个词条包括多个词语所以分词是指上是一种一对多的拆分,一拆多的情况,我们应该选择 Table Function。
1.3.2.2、封装 KeywordUDTF 函数 【参考】@FunctionHint 主要是为了标识输出数据的类型 row.setField(0,keyword)
中的 0 表示返回值下标为 0 的值
@FunctionHint(output = @DataTypeHint("ROW"))
public class KeywordUDTF extends TableFunction {
public void eval(String value) {
//使用工具类对字符串进行分词
List keywordList = KeywordUtil.analyze(value);
for (String keyword : keywordList) {
//collect(Row.of(keyword));
Row row = new Row(1);
row.setField(0,keyword);
collect(row);
}
}
}
1.4、创建Table环境
//1.4 创建Table环境
EnvironmentSettings setting = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, setting);
1.5、声明动态表和自定义函数
1.5.1、数据格式
{
"common": {
"ar": "310000",
"uid": "34",
"os": "Android 11.0",
"ch": "wandoujia",
"is_new": "1",
"md": "vivo iqoo3",
"mid": "mid_5",
"vc": "v2.1.134",
"ba": "vivo"
},
"page": {
"page_id": "good_list", # 在 商品详情页才会有关键词
"item": "电视", # 关键词
"during_time": 3654,
"item_type": "keyword",
"last_page_id": "home"
},
"ts": 1616571967000 # 时间
}
1.5.2、创建table
注意 json 格式的要定义为 Map 对象 FROM_UNIXTIME
将距离1970-01-01 00:00:00的秒, 转为指定格式的字符串 TO_TIMESTAMP
将字符串日期转为timestamp
WATERMARK
指定水位标记
WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
WATERMARK
字段的日期必须是timestampINTERVAL '2' SECOND
指定乱序时间
WITH
指定 kafka connector的属性配置
//TODO 2.注册自定义函数
tableEnv.createTemporarySystemFunction("ik_analyze", KeywordUDTF.class);
//TODO 3.创建动态表
//3.1 声明主题以及消费者组
String pageViewSourceTopic = "dwd_page_log";
String groupId = "keywordstats_app_group";
//3.2建表
// 注意 json 格式的要定义为 Map 对象
tableEnv.executeSql(
"CREATE TABLE page_view (" +
" common MAP," +
" page MAP," +
" ts BIGINT," +
" rowtime as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
" WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND) " +
" WITH (" + MyKafkaUtil.getKafkaDDL(pageViewSourceTopic, groupId) + ")"
);
1.6、从动态表中查询数据
//TODO 4.从动态表中查询数据 --->大数据数仓-> [大, 数据, 数, 仓]
Table fullwordTable = tableEnv.sqlQuery(
"select page['item'] fullword,rowtime " +
" from page_view " +
" where page['page_id']='good_list' and page['item'] IS NOT NULL"
);
1.7、利用自定义函数 对搜索关键词进行拆分 参考 joins–Join 表函数
//TODO 5.利用自定义函数 对搜索关键词进行拆分
Table keywordTable = tableEnv.sqlQuery(
"SELECT keyword, rowtime " +
"FROM " + fullwordTable + "," +
"LATERAL TABLE(ik_analyze(fullword)) AS t(keyword)"
);
1.8、分组、开窗、聚合
聚合属于查询 参考【分组窗口】 SQL 查询的分组窗口是通过 GROUP BY 子句定义的
//TODO 6.分组、开窗、聚合
Table reduceTable = tableEnv.sqlQuery(
"select keyword,count(*) ct, '" + ChbConstant.KEYWORD_SEARCH + "' source," +
"DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') stt," +
"DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') edt ," +
"UNIX_TIMESTAMP()*1000 ts from " + keywordTable +
" group by TUMBLE(rowtime, INTERVAL '10' SECOND),keyword"
);
1.9、转换流并写入ClickHouse
//TODO 7.转换为流
DataStream keywordStatsDS = tableEnv.toAppendStream(reduceTable, KeywordStats.class);
keywordStatsDS.print(">>>>");
//TODO 8.写入到ClickHouse
keywordStatsDS.addSink(
ClickHouseUtil.getJdbcSink("insert into keyword_stats(keyword,ct,source,stt,edt,ts) values(?,?,?,?,?,?)")
);
1.9.1、创建表
create table keyword_stats (
stt DateTime,
edt DateTime,
keyword String ,
source String ,
ct UInt64 ,
ts UInt64
)engine =ReplacingMergeTree( ts)
partition by toYYYYMMDD(stt)
order by ( stt,edt,keyword,source );
二、整体测试
➢ 启动 ZK、Kafka、logger.sh、ClickHouse ➢ 运行 BaseLogApp ➢ 运行 KeywordStatsApp ➢ 运行 rt_applog 目录下的 jar 包 ➢ 查看控制台输出 ➢ 查看 ClickHouse 中 keyword_stats 表数据