- 一、统计标签介绍
- 1.1、统计类型标签与规则匹配类型标签区别
- 二、开发样例
- 2.1、标签模型:年龄段
- 2.1.1、新建标签
- 2.1.2、标签模型分析
- 2.1.3、标签模型开发
- 2.1.3.1、自定义UDF函数,解析属性标签数据中规则rule字段信息
- 2.1.3.2、通过DataFrame使用关联JOIN方式,给每个用户打上年龄段标签
- 2.2、标签模型:消费周期
- 2.2.1、**业务字段为【 finishtime 】:订单完成时间。**
- 2.2.2、新建标签
- 2.2.2.1、新建 业务(4级)标签 : 消费周期标签
- 2.2.2.2、新建 **属性(5级)标签** :7日、2周、1月、2月、3月、4月、5月、6月
- 2.2.3、准备数据
- 2.2.4、提取属性标签规则
- 2.2.5、标签模型开发
- 2.2.5.1、业务思路分析
- 2.2.5.2、标签模型类 `ConsumeCycleModel`
- 2.3、标签模型:支付方式
- 2.3.1、业务字段调研
- 2.3.2、新建标签
- 2.3.2.1、新建 **业务(4级)标签** : 支付方式标签
- 2.3.2.2、新建 **属性(5级)标签** :支付宝、微信支付、银联支付、货到付款等
- 2.3.2.3、属性标签数据插入【`tbl_basic_tag`】INSERT SQL语句如下:
- 2.3.3、标签模型开发
- 2.3.3.1、`Dataset/DataFrame` 中添加列函数 `withColumn` , 如果列存在就替换值,不存在创建新列 :
- 2.3.3.2、在DSL编程中窗口函数的窗口设置使用 `Window类` 封装
- 2.3.3.3、完整代码
在 人口属性(用户特征)和商业属性(消费特征) 的标签中大部分是规则匹配类型标签和统计类型标签,选取3个统计类型标签开发模型:年龄段标签、消费周期标签和支付方式标签。
统计型标签是需要使用 聚合函数计算 后得到标签,比如最近3个月的退单率,用户最常用的支付方式等等,主要开发三个统计类型标签:
- 规则匹配类型标签,按照字段关联
- 依据业务字段的值,获取对应标签值(tagName)
- 基本上不涉及计算
- 统计类型标签,需要对业务的数据需要计算,再获取对应标签值(tagName)
- 例如:count,max,min,在。。。之间,大于,小于等等
构建用户画像时,其中用户年龄标签非常的重要,无论是数据报表分析,还是精准营销与个性化推荐,往往需要依据用户的年龄进行相关营销与推荐,如下展示了某母婴社群依据年龄和性别分析报表案例:
在标签管理平台新建对应的标签(业务标签和属性标签),编写标签模型类,继承标签模型基类 AbstractModel
,实现其中标签计算的方法 doTag
。
新建 业务(4级)标签 : 年龄段标签 ,相关字段信息如下:
标签名称:年龄段
标签分类:电商-某商城-人口属性
更新周期:
业务含义:注册用户的年龄段
标签规则:
inType=hbase
zkHosts=chb1
zkPort=2181
hbaseTable=tbl_tag_users
family=detail
selectFieldNames=id,birthday
程序入口:
com.chb.tags.models.statistics.AgeRangeModel
算法名称:
STATISTICS
算法引擎:
tags-model_2.11.jar
模型参数:
--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1
新建 属性(5级)标签 :50后、60后、70后、80后、90后、00后、10后、20后,相关字段信息如下:
标签插入SQL语句:
-- 四级标签
INSERT INTO `tbl_basic_tag` VALUES ('338', '年龄段', null,'inType=hbase\nzkHosts=chb1\nzkPort=2181\nhbaseTable=tbl_tag_users\nfamily=detail\nselectFieldNames=id,birthday', null, '4', '314', '2019-12-20 17:06:48', '2019-12-20 17:06:48', null, null);
-- 五级标签
INSERT INTO `tbl_basic_tag` VALUES ('339', '50后', null, '19500101-19591231', null, '5', '338', '2019-12-20 17:11:23', '2019-12-20 17:11:23',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('340', '60后', null, '19600101-19691231', null, '5', '338', '2019-12-20 17:11:38', '2019-12-20 17:11:38',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('341', '70后', null, '19700101-19791231', null, '5', '338', '2019-12-20 17:12:54', '2019-12-20 17:12:54',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('342', '80后', null, '19800101-19891231', null, '5', '338', '2019-12-20 17:13:08', '2019-12-20 17:13:08',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('343', '90后', null, '19900101-19991231', null, '5', '338', '2019-12-20 17:13:22', '2019-12-20 17:13:22',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('344', '00后', null, '20000101-20091231', null, '5', '338', '2019-12-20 17:13:38', '2019-12-20 17:13:38',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('345', '10后', null, '20100101-20191231', null, '5', '338', '2019-12-20 17:13:54', '2019-12-20 17:13:54',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('346', '20后', null, '20200101-20291231', null, '5', '338', '2019-12-20 17:13:54', '2019-12-20 17:13:54',null, null);
-- 模型表
INSERT INTO `tbl_model` VALUES ('7', '338', 'Statistics','com.chb.tags.models.statistics.AgeRangeModel', 'hdfs://chb1:8020/apps/temp/jars/499e0416-da3d-496c-8a32-994109918c17.jar', '0,2019-12-20 08:00:00,2029-12-20 08:00:00', '2019-12-20 17:06:48', '2019-12-20 17:06:48', '4', '--driver-memory 512m --executormemory 512m --num-executors 1 --executor-cores 1');
2.1.2、标签模型分析
继承基类 AbstractModel ,实现标签计算方法 doTag 。首先分析年龄段标签计算思路如下:
/*
338 国籍
属性标签数据:
339 50后 19500101-19591231
340 60后 19600101-19691231
341 70后 19700101-19791231
342 80后 19800101-19891231
343 90后 19900101-19991231
344 00后 20000101-20091231
345 10后 20100101-20191231
346 20后 20200101-20291231
业务数据:
99 column=detail:birthday, value=1982-01-11 -> 19820111
分析思路:
比较birthday日期 在 某个年龄段之内,给予标签值(tagName)
19820111 -> 80后
实现:JOIN,UDF函数
*/
解析属性标签数据中 标签规则rule ,采用自定义UDF函数方式,测试样例代码:
package com.chb.tags.test.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
/**
* @Author: chb
* @Date: 2021/4/25 8:52
* @E-Mail:
* @DESC:
*/
object SQLUdfTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[4]")
.getOrCreate()
import spark.implicits._
// 0. 自定UDF函数,解析分解属性标签的规则rule: 19500101-19591231
val rule_to_tuple: UserDefinedFunction = udf(
(rule: String) => {
val Array(start, end) = rule.split("-").map(_.toInt)
// 返回二元组
(start, end)
}
)
val datasDF: DataFrame = Seq(
"19500101-19591231",
"19600101-19691231",
"19700101-19791231",
"19800101-19891231",
"19900101-19991231",
"20000101-20091231",
"20100101-20191231",
"20200101-20291231"
).toDF("rule")
val ruleDF: DataFrame = datasDF
.select(
rule_to_tuple($"rule").as("rules")
)
.select(
$"rules._1".as("start"),
$"rules._2".as("end")
)
/*
root
|-- start: integer (nullable = true)
|-- end: integer (nullable = true)
+--------+--------+
| start| end|
+--------+--------+
|19500101|19591231|
|19600101|19691231|
|19700101|19791231|
|19800101|19891231|
|19900101|19991231|
|20000101|20091231|
|20100101|20191231|
|20200101|20291231|
+--------+--------+
*/
ruleDF.printSchema()
ruleDF.show()
}
}
2.1.3、标签模型开发
基于业务数据字段: birthday ,给用户打标签,具体步骤如下所示:
2.1.3.1、自定义UDF函数,解析属性标签数据中规则rule字段信息 // 导入隐式转换
import businessDF.sparkSession.implicits._
// 导入函数库
import org.apache.spark.sql.functions._
// 1. 自定UDF函数,解析分解属性标签的规则rule: 19500101-19591231
val rule_to_tuple: UserDefinedFunction = udf(
(rule: String) => {
val Array(start, end) = rule.split("-").map(_.toInt)
// 返回二元组
(start, end)
}
)
// 2. 获取属性标签数据,解析规则rule
val attrTagRuleDF: DataFrame = tagDF
.filter($"level" === 5) // 5级标签
.select(
$"name", //
rule_to_tuple($"rule").as("rules") //
)
// 获取起始start和结束end
.select(
$"name", //
$"rules._1".as("start"), //
$"rules._2".as("end") //
)
//attrTagRuleDF.show(20, truncate = false)
2.1.3.2、通过DataFrame使用关联JOIN方式,给每个用户打上年龄段标签
使用DataFrame中DSL编程时,涉及到两个函数处理数据,说明如下:
// 函数:正则替换
/**
* Replace all substrings of the specified string value that match regexp
with rep.
*
* @group string_funcs
* @since 1.5.0
*/
def regexp_replace(e: Column, pattern: String, replacement: String):
Column
// 函数:在。。。范围之内
/**
* True if the current column is between the lower bound and upper bound,
inclusive.
*
* @group java_expr_ops
* @since 1.4.0
*/
def between(lowerBound: Any, upperBound: Any): Column = {
(this >= lowerBound) && (this {
val Array(start, end) = rule.split("-").map(_.toInt)
// 返回二元组
(start, end)
}
)
// 2. 获取属性标签数据,解析规则rule
val ruleDF: DataFrame = tagDF
.filter($"level" === 5) // 5级标签
.select(
$"name", //
rule_to_tuple($"rule").as("rules") //
)
// 获取起始start和结束end
.select(
$"name", //
$"rules._1".as("start"), //
$"rules._2".as("end") //
)
//ruleDF.show(20, truncate = false)
// 3. 返回标签规则
ruleDF
}
2.2.5、标签模型开发
2.2.5.1、业务思路分析
-
1)、获取每个会员最近一个订单完成时间 按照memberid分组,获取finishtime最大值,使用max函数\
-
2)、转换订单完成时间数据格式 1589817600 转换为日期格式(yyyy-MM-dd HH:mm:ss),使用from_unixtime函数
def from_unixtime(ut: Column): Column = withExpr {
FromUnixTime(ut.expr, Literal("yyyy-MM-dd HH:mm:ss"))
}
def from_unixtime(ut: Column, f: String): Column = withExpr {
FromUnixTime(ut.expr, Literal(f))
}
- 3)、计算当前日期与订单完成期相差天数 使用函数:datediff
/**
* Returns the number of days from `start` to `end`.
* @group datetime_funcs
* @since 1.5.0
*/
def datediff(end: Column, start: Column): Column = withExpr {
DateDiff(end.expr, start.expr)
}
- 4)、获取当前时间 使用函数:current_timestamp()、current_date()
/**
* Returns the current timestamp as a timestamp column.
*
* @group datetime_funcs
* @since 1.5.0
*/
def current_timestamp(): Column = withExpr { CurrentTimestamp() }
2.2.5.2、标签模型类 ConsumeCycleModel
package com.chb.tags.models.statistics
import com.chb.tags.ModelType
import com.chb.tags.models.AbstractModel
import com.chb.tags.utils.TagTools
import org.apache.spark.sql.DataFrame
/**
* @Author: chb
* @Date: 2021/4/29 8:16
* @E-Mail:
* @DESC: 标签模型开发:消费周期标签模型
*/
class ConsumeCycleModel extends AbstractModel("消费周期标签", ModelType.STATISTICS) {
/**
* 4. 构建标签:依据业务数据和属性标签数据建立标签
* 347 消费周期
* 348 近7天 0-7
* 349 近2周 8-14
* 350 近1月 15-30
* 351 近2月 31-60
* 352 近3月 61-90
* 353 近4月 91-120
* 354 近5月 121-150
* 355 近半年 151-180
*
* @param businessDF
* @param tagDF
* @return
*/
override def doTag(businessDF: DataFrame, tagDF: DataFrame): DataFrame = {
// 导入隐式转换
import businessDF.sparkSession.implicits._
import org.apache.spark.sql.functions._
// 1. 获取属性标签数据,解析规则rule
val attrTagDF: DataFrame = TagTools.convertTuple(tagDF)
//attrTagDF.show(20, truncate = false)
// 2. 订单数据按照会员ID:memberid分组,获取最近一次订单完成时间: finishtime
val daysDF: DataFrame = businessDF
// 2.1. 分组,获取最新订单时间,并转换格式
.groupBy($"memberid") //
.agg(
from_unixtime(max($"finishtime")).as("finish_time")
)
// 2.2. 计算用户最新订单距今天数
.select(
$"memberid".as("userId"), //
datediff(current_timestamp(),
$"finish_time").as("consumer_days")
)
// 3. 关联属性标签数据和消费天数数据,加上判断条件,进行打标签
val modelDF: DataFrame = daysDF
.join(attrTagDF)
.where(
daysDF("consumer_days").between(attrTagDF("start"),
attrTagDF("end"))
)
.select($"userId", $"name".as("consumercycle"))
modelDF.printSchema()
modelDF.show(20, truncate = false)
// 4. 返回标签数据
modelDF
}
}
object ConsumeCycleModel {
def main(args: Array[String]): Unit = {
val tagModel = new ConsumeCycleModel()
tagModel.executeModel(347L)
}
}
2.3、标签模型:支付方式
用户的支付方式 有多种,比如使用支付宝/微信/货到付款/银联等等,需要知道用户最常用的支付方式,以便于了解用户最常用的支付平台。
2.3.1、业务字段调研在MySQL数据库中,查看业务数据【订单表tbl_tag_orders
】中支付相关字段【paymentCode、paymentName
】,编写SQL看看有哪些值,便于标签的开发。
-- 查看MySQL数据库中订单表tags_dat.tbl_tag_orders
SELECT paymentCode, paymentName from tags_dat.tbl_tag_orders limit 10 ;
/*
+-------------+-------------+
| paymentCode | paymentName |
+-------------+-------------+
| alipay | 支付宝 |
| alipay | 支付宝 |
| alipay | 支付宝 |
| cod | 货到付款 |
| cod | 货到付款 |
| alipay | 支付宝 |
| alipay | 支付宝 |
| alipay | 支付宝 |
| alipay | 支付宝 |
| alipay | 支付宝 |
+-------------+-------------+
*/
-- 查看支付编码和支付名称
SELECT paymentCode, paymentName, COUNT(1) AS cnt from
tags_dat.tbl_tag_orders GROUP BY paymentCode, paymentName;
/*
+-------------+-------------+-------+
| paymentCode | paymentName | cnt |
+-------------+-------------+-------+
| alipay | 支付宝 | 96425 |
| chinapay | 银联支付 | 4068 |
| cod | 货到付款 | 16832 |
| wxpay | 微信支付 | 2800 |
+-------------+-------------+-------+
*/
-- SELECT paymentCode, paymentName, COUNT(1) AS cnt from
tags_dat.tbl_orders GROUP BY paymentCode, paymentName;
-- 查看支付方式
SELECT paymentCode, COUNT(1) AS cnt from tags_dat.tbl_tag_orders GROUP BY
paymentCode;
/*
+-------------+-------+
| paymentCode | cnt |
+-------------+-------+
| alipay | 96425 |
| chinapay | 4068 |
| cod | 16832 |
| wxpay | 2800 |
+-------------+-------+
*/
-- SELECT paymentCode, COUNT(1) AS cnt from tags_dat.tbl_orders GROUP BY
paymentCode;
2.3.2、新建标签
2.3.2.1、新建 业务(4级)标签 : 支付方式标签
标签名称:支付方式
标签分类:电商-某商城-商业属性
更新周期:1天
业务含义:用户订单的支付方式:支付宝、微信支付、银联支付、货到付款
标签规则:
inType=hbase
zkHosts=chb1
zkPort=2181
hbaseTable=tbl_tag_orders
family=detail
selectFieldNames=memberid,paymentcode
程序入口:
com.chb.tags.models.statistics.PayTypeModel
算法名称:
STATISTICS
算法引擎:
tags-model_2.11.jar
模型参数:
--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1
2.3.2.2、新建 属性(5级)标签 :支付宝、微信支付、银联支付、货到付款等
tbl_basic_tag
】INSERT SQL语句如下:
INSERT INTO `tbl_basic_tag` VALUES ('356', '支付方式', null,'inType=hbase\nzkHosts=chb1\nzkPort=2181\nhbaseTable=tbl_tag_orders\nfamily=detail\nselectFieldNames=memberid,paymentcode', null, '4', '315', '2019-12-20 17:26:23','2019-12-20 17:26:23', null, null);
INSERT INTO `tbl_basic_tag` VALUES ('357', '支付宝', null, 'alipay', null,'5', '356', '2019-12-20 17:26:50', '2019-12-20 17:26:50', null, null);
INSERT INTO `tbl_basic_tag` VALUES ('358', '微信支付', null, 'wxpay', null,'5', '356', '2019-12-20 17:27:04', '2019-12-20 17:27:04', null, null);
INSERT INTO `tbl_basic_tag` VALUES ('359', '银联支付', null, 'chinapay',null, '5', '356', '2019-12-20 17:27:17', '2019-12-20 17:27:17', null, null);
INSERT INTO `tbl_basic_tag` VALUES ('360', '货到付款', null, 'cod', null,'5', '356', '2019-12-20 17:27:32', '2019-12-20 17:27:32', null, null);
INSERT INTO `tbl_model` VALUES ('9', '356', 'Statistics','com.chb.tags.models.statistics.PayTypeModel', 'hdfs://chb1:8020/apps/temp/jars/cbcbe36a-2808-47e2-b476-bec0b319c6c3.jar', '1,2019-12-20 08:00:00,2029-12-20 08:00:00', '2019-12-2017:26:23', '2019-12-20 17:26:23', '4', '--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1');
2.3.3、标签模型开发
2.3.3.1、Dataset/DataFrame
中添加列函数 withColumn
, 如果列存在就替换值,不存在创建新列 :
/**
* Returns a new Dataset by adding a column or replacing the existing
column
* that has the same name.
*
* @group untypedrel
* @since 2.0.0
*/
def withColumn(colName: String, col: Column): DataFrame
2.3.3.2、在DSL编程中窗口函数的窗口设置使用 Window类
封装
val window = Window
.partitionBy($"") // 分区字段设置
.orderBy($"".asc, $"".desc) // 排序字段及规则设置
val window = Window
.distributeBy($"") // 分区字段设置
.orderBy($"".asc, $"".desc) // 排序字段及规则设置
2.3.3.3、完整代码
package com.chb.tags.models.statistics
import com.chb.tags.ModelType
import com.chb.tags.models.AbstractModel
import com.chb.tags.utils.TagTools
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
/**
* @Author: chb
* @Date: 2021/4/29 8:56
* @E-Mail:
* @DESC:
*/
class PayTypeModel extends AbstractModel("支付方式标签", ModelType.STATISTICS) {
/*
356 支付方式
357 支付宝 alipay
358 微信支付 wxpay
359 银联支付 chinapay
360 货到付款 cod
*/
override def doTag(businessDF: DataFrame, tagDF: DataFrame): DataFrame
= {
// 导入隐式转换
import businessDF.sparkSession.implicits._
import org.apache.spark.sql.functions._
// 1. 订单数据中获取每个用户最多支付方式
val paymentDF: DataFrame = businessDF
// 按照会员ID和支付编码分组,统计次数
.groupBy($"memberid", $"paymentcode")
.count()
// 获取每个会员支付方式最多,使用开窗函数ROW_NUMBER
.withColumn(
"rnk", //
row_number().over(
Window.partitionBy($"memberid").orderBy($"count".desc)
)
)
// 过滤rnk等于1数据
.where($"rnk".equalTo(1))
.select(
$"memberid".as("id"), //
$"paymentcode".as("payment") //
)
// 2. 计算标签,规则匹配
val modelDF: DataFrame = TagTools.ruleMatchTag(
paymentDF, "payment", tagDF
)
//modelDF.show(100, truncate = false)
// 3. 返回标签数据
modelDF
}
}
object PayTypeModel {
def main(args: Array[String]): Unit = {
val tagModel = new PayTypeModel()
tagModel.executeModel(356L)
}
}