您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

标签开发:统计型标签

宝哥大数据 发布时间:2021-04-25 09:03:55 ,浏览量:0

文章目录
  • 一、统计标签介绍
    • 1.1、统计类型标签与规则匹配类型标签区别
  • 二、开发样例
    • 2.1、标签模型:年龄段
      • 2.1.1、新建标签
      • 2.1.2、标签模型分析
      • 2.1.3、标签模型开发
        • 2.1.3.1、自定义UDF函数,解析属性标签数据中规则rule字段信息
        • 2.1.3.2、通过DataFrame使用关联JOIN方式,给每个用户打上年龄段标签
    • 2.2、标签模型:消费周期
      • 2.2.1、**业务字段为【 finishtime 】:订单完成时间。**
      • 2.2.2、新建标签
        • 2.2.2.1、新建 业务(4级)标签 : 消费周期标签
        • 2.2.2.2、新建 **属性(5级)标签** :7日、2周、1月、2月、3月、4月、5月、6月
      • 2.2.3、准备数据
      • 2.2.4、提取属性标签规则
      • 2.2.5、标签模型开发
        • 2.2.5.1、业务思路分析
        • 2.2.5.2、标签模型类 `ConsumeCycleModel`
    • 2.3、标签模型:支付方式
      • 2.3.1、业务字段调研
      • 2.3.2、新建标签
        • 2.3.2.1、新建 **业务(4级)标签** : 支付方式标签
        • 2.3.2.2、新建 **属性(5级)标签** :支付宝、微信支付、银联支付、货到付款等
        • 2.3.2.3、属性标签数据插入【`tbl_basic_tag`】INSERT SQL语句如下:
      • 2.3.3、标签模型开发
        • 2.3.3.1、`Dataset/DataFrame` 中添加列函数 `withColumn` , 如果列存在就替换值,不存在创建新列 :
        • 2.3.3.2、在DSL编程中窗口函数的窗口设置使用 `Window类` 封装
        • 2.3.3.3、完整代码

一、统计标签介绍

  在 人口属性(用户特征)和商业属性(消费特征) 的标签中大部分是规则匹配类型标签和统计类型标签,选取3个统计类型标签开发模型:年龄段标签、消费周期标签和支付方式标签。

在这里插入图片描述

统计型标签是需要使用 聚合函数计算 后得到标签,比如最近3个月的退单率,用户最常用的支付方式等等,主要开发三个统计类型标签:

在这里插入图片描述

1.1、统计类型标签与规则匹配类型标签区别
  • 规则匹配类型标签,按照字段关联
    • 依据业务字段的值,获取对应标签值(tagName)
    • 基本上不涉及计算
  • 统计类型标签,需要对业务的数据需要计算,再获取对应标签值(tagName)
    • 例如:count,max,min,在。。。之间,大于,小于等等
二、开发样例 2.1、标签模型:年龄段

  构建用户画像时,其中用户年龄标签非常的重要,无论是数据报表分析,还是精准营销与个性化推荐,往往需要依据用户的年龄进行相关营销与推荐,如下展示了某母婴社群依据年龄和性别分析报表案例:

在这里插入图片描述

在标签管理平台新建对应的标签(业务标签和属性标签),编写标签模型类,继承标签模型基类 AbstractModel,实现其中标签计算的方法 doTag

2.1.1、新建标签

新建 业务(4级)标签 : 年龄段标签 ,相关字段信息如下:

标签名称:年龄段
标签分类:电商-某商城-人口属性
更新周期:
业务含义:注册用户的年龄段
标签规则:
inType=hbase
zkHosts=chb1
zkPort=2181
hbaseTable=tbl_tag_users
family=detail
selectFieldNames=id,birthday
程序入口:
	com.chb.tags.models.statistics.AgeRangeModel
算法名称:
	STATISTICS
算法引擎:
	tags-model_2.11.jar
模型参数:
	--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1

新建 属性(5级)标签 :50后、60后、70后、80后、90后、00后、10后、20后,相关字段信息如下:

在这里插入图片描述

标签插入SQL语句:

-- 四级标签
INSERT INTO `tbl_basic_tag` VALUES ('338', '年龄段', null,'inType=hbase\nzkHosts=chb1\nzkPort=2181\nhbaseTable=tbl_tag_users\nfamily=detail\nselectFieldNames=id,birthday', null, '4', '314', '2019-12-20 17:06:48', '2019-12-20 17:06:48', null, null);

-- 五级标签
INSERT INTO `tbl_basic_tag` VALUES ('339', '50后', null, '19500101-19591231', null, '5', '338', '2019-12-20 17:11:23', '2019-12-20 17:11:23',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('340', '60后', null, '19600101-19691231', null, '5', '338', '2019-12-20 17:11:38', '2019-12-20 17:11:38',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('341', '70后', null, '19700101-19791231', null, '5', '338', '2019-12-20 17:12:54', '2019-12-20 17:12:54',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('342', '80后', null, '19800101-19891231', null, '5', '338', '2019-12-20 17:13:08', '2019-12-20 17:13:08',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('343', '90后', null, '19900101-19991231', null, '5', '338', '2019-12-20 17:13:22', '2019-12-20 17:13:22',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('344', '00后', null, '20000101-20091231', null, '5', '338', '2019-12-20 17:13:38', '2019-12-20 17:13:38',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('345', '10后', null, '20100101-20191231', null, '5', '338', '2019-12-20 17:13:54', '2019-12-20 17:13:54',null, null);
INSERT INTO `tbl_basic_tag` VALUES ('346', '20后', null, '20200101-20291231', null, '5', '338', '2019-12-20 17:13:54', '2019-12-20 17:13:54',null, null);

-- 模型表
INSERT INTO `tbl_model` VALUES ('7', '338', 'Statistics','com.chb.tags.models.statistics.AgeRangeModel', 'hdfs://chb1:8020/apps/temp/jars/499e0416-da3d-496c-8a32-994109918c17.jar', '0,2019-12-20 08:00:00,2029-12-20 08:00:00', '2019-12-20 17:06:48', '2019-12-20 17:06:48', '4', '--driver-memory 512m --executormemory 512m --num-executors 1 --executor-cores 1');

2.1.2、标签模型分析

继承基类 AbstractModel ,实现标签计算方法 doTag 。首先分析年龄段标签计算思路如下:

/*
338 国籍
属性标签数据:
339 50后 19500101-19591231
340 60后 19600101-19691231
341 70后 19700101-19791231
342 80后 19800101-19891231
343 90后 19900101-19991231
344 00后 20000101-20091231
345 10后 20100101-20191231
346 20后 20200101-20291231
业务数据:
	99 column=detail:birthday, value=1982-01-11 -> 19820111
分析思路:
比较birthday日期 在 某个年龄段之内,给予标签值(tagName)
	19820111 -> 80后
实现:JOIN,UDF函数
*/

解析属性标签数据中 标签规则rule ,采用自定义UDF函数方式,测试样例代码:

package com.chb.tags.test.spark

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

/**
 * @Author: chb
 * @Date: 2021/4/25 8:52
 * @E-Mail:
 * @DESC:
 */
object SQLUdfTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._
    // 0. 自定UDF函数,解析分解属性标签的规则rule: 19500101-19591231
    val rule_to_tuple: UserDefinedFunction = udf(
      (rule: String) => {
        val Array(start, end) = rule.split("-").map(_.toInt)
        // 返回二元组
        (start, end)
      }
    )

    val datasDF: DataFrame = Seq(
      "19500101-19591231",
      "19600101-19691231",
      "19700101-19791231",
      "19800101-19891231",
      "19900101-19991231",
      "20000101-20091231",
      "20100101-20191231",
      "20200101-20291231"
    ).toDF("rule")
    val ruleDF: DataFrame = datasDF
      .select(
        rule_to_tuple($"rule").as("rules")
      )
      .select(
        $"rules._1".as("start"),
        $"rules._2".as("end")
      )
    /*
    root
    |-- start: integer (nullable = true)
    |-- end: integer (nullable = true)
    +--------+--------+
    | start| end|
    +--------+--------+
    |19500101|19591231|
    |19600101|19691231|
    |19700101|19791231|
    |19800101|19891231|
    |19900101|19991231|
    |20000101|20091231|
    |20100101|20191231|
    |20200101|20291231|
    +--------+--------+
    */
    ruleDF.printSchema()
    ruleDF.show()
  }
}

2.1.3、标签模型开发

基于业务数据字段: birthday ,给用户打标签,具体步骤如下所示:

2.1.3.1、自定义UDF函数,解析属性标签数据中规则rule字段信息
    // 导入隐式转换
    import businessDF.sparkSession.implicits._
    // 导入函数库
    import org.apache.spark.sql.functions._

    // 1. 自定UDF函数,解析分解属性标签的规则rule: 19500101-19591231
    val rule_to_tuple: UserDefinedFunction = udf(
      (rule: String) => {
        val Array(start, end) = rule.split("-").map(_.toInt)
        // 返回二元组
        (start, end)
      }
    )

    // 2. 获取属性标签数据,解析规则rule
    val attrTagRuleDF: DataFrame = tagDF
      .filter($"level" === 5) // 5级标签
      .select(
        $"name", //
        rule_to_tuple($"rule").as("rules") //
      )
      // 获取起始start和结束end
      .select(
        $"name", //
        $"rules._1".as("start"), //
        $"rules._2".as("end") //
      )
    //attrTagRuleDF.show(20, truncate = false)

2.1.3.2、通过DataFrame使用关联JOIN方式,给每个用户打上年龄段标签

在这里插入图片描述

使用DataFrame中DSL编程时,涉及到两个函数处理数据,说明如下:

// 函数:正则替换
/**
* Replace all substrings of the specified string value that match regexp
with rep.
*
* @group string_funcs
* @since 1.5.0
*/
def regexp_replace(e: Column, pattern: String, replacement: String):
Column
// 函数:在。。。范围之内
/**
* True if the current column is between the lower bound and upper bound,
inclusive.
*
* @group java_expr_ops
* @since 1.4.0
*/
def between(lowerBound: Any, upperBound: Any): Column = {
(this >= lowerBound) && (this  {
val Array(start, end) = rule.split("-").map(_.toInt)
// 返回二元组
(start, end)
}
)
// 2. 获取属性标签数据,解析规则rule
val ruleDF: DataFrame = tagDF
.filter($"level" === 5) // 5级标签
.select(
$"name", //
rule_to_tuple($"rule").as("rules") //
)
// 获取起始start和结束end
.select(
$"name", //
$"rules._1".as("start"), //
$"rules._2".as("end") //
)
//ruleDF.show(20, truncate = false)
// 3. 返回标签规则
ruleDF
}
2.2.5、标签模型开发 2.2.5.1、业务思路分析
  • 1)、获取每个会员最近一个订单完成时间 按照memberid分组,获取finishtime最大值,使用max函数\

  • 2)、转换订单完成时间数据格式 1589817600 转换为日期格式(yyyy-MM-dd HH:mm:ss),使用from_unixtime函数

def from_unixtime(ut: Column): Column = withExpr {
	FromUnixTime(ut.expr, Literal("yyyy-MM-dd HH:mm:ss"))
}
def from_unixtime(ut: Column, f: String): Column = withExpr {
	FromUnixTime(ut.expr, Literal(f))
}
  • 3)、计算当前日期与订单完成期相差天数 使用函数:datediff
/**
* Returns the number of days from `start` to `end`.
* @group datetime_funcs
* @since 1.5.0
*/
def datediff(end: Column, start: Column): Column = withExpr {
	DateDiff(end.expr, start.expr)
}
  • 4)、获取当前时间 使用函数:current_timestamp()、current_date()
/**
* Returns the current timestamp as a timestamp column.
*
* @group datetime_funcs
* @since 1.5.0
*/
def current_timestamp(): Column = withExpr { CurrentTimestamp() }
2.2.5.2、标签模型类 ConsumeCycleModel
package com.chb.tags.models.statistics

import com.chb.tags.ModelType
import com.chb.tags.models.AbstractModel
import com.chb.tags.utils.TagTools
import org.apache.spark.sql.DataFrame

/**
 * @Author: chb
 * @Date: 2021/4/29 8:16
 * @E-Mail:
 * @DESC: 标签模型开发:消费周期标签模型
 */
class ConsumeCycleModel extends AbstractModel("消费周期标签", ModelType.STATISTICS) {

  /**
   * 4. 构建标签:依据业务数据和属性标签数据建立标签
   * 347 消费周期
   * 348 近7天 0-7
   * 349 近2周 8-14
   * 350 近1月 15-30
   * 351 近2月 31-60
   * 352 近3月 61-90
   * 353 近4月 91-120
   * 354 近5月 121-150
   * 355 近半年 151-180
   *
   * @param businessDF
   * @param tagDF
   * @return
   */
  override def doTag(businessDF: DataFrame, tagDF: DataFrame): DataFrame = {
    // 导入隐式转换
    import businessDF.sparkSession.implicits._
    import org.apache.spark.sql.functions._

    // 1. 获取属性标签数据,解析规则rule
    val attrTagDF: DataFrame = TagTools.convertTuple(tagDF)
    //attrTagDF.show(20, truncate = false)


    // 2. 订单数据按照会员ID:memberid分组,获取最近一次订单完成时间: finishtime
    val daysDF: DataFrame = businessDF
      // 2.1. 分组,获取最新订单时间,并转换格式
      .groupBy($"memberid") //
      .agg(
        from_unixtime(max($"finishtime")).as("finish_time")
      )
      // 2.2. 计算用户最新订单距今天数
      .select(
        $"memberid".as("userId"), //
        datediff(current_timestamp(),
          $"finish_time").as("consumer_days")
      )


    // 3. 关联属性标签数据和消费天数数据,加上判断条件,进行打标签
    val modelDF: DataFrame = daysDF
      .join(attrTagDF)
      .where(
        daysDF("consumer_days").between(attrTagDF("start"),
          attrTagDF("end"))
      )
      .select($"userId", $"name".as("consumercycle"))
    modelDF.printSchema()
    modelDF.show(20, truncate = false)


    // 4. 返回标签数据
    modelDF


  }
}


object ConsumeCycleModel {
  def main(args: Array[String]): Unit = {
    val tagModel = new ConsumeCycleModel()
    tagModel.executeModel(347L)
  }
}
2.3、标签模型:支付方式

用户的支付方式 有多种,比如使用支付宝/微信/货到付款/银联等等,需要知道用户最常用的支付方式,以便于了解用户最常用的支付平台。

2.3.1、业务字段调研

在MySQL数据库中,查看业务数据【订单表tbl_tag_orders】中支付相关字段【paymentCode、paymentName】,编写SQL看看有哪些值,便于标签的开发。

-- 查看MySQL数据库中订单表tags_dat.tbl_tag_orders
SELECT paymentCode, paymentName from tags_dat.tbl_tag_orders limit 10 ;
/*
+-------------+-------------+
| paymentCode | paymentName |
+-------------+-------------+
| alipay | 支付宝 |
| alipay | 支付宝 |
| alipay | 支付宝 |
| cod | 货到付款 |
| cod | 货到付款 |
| alipay | 支付宝 |
| alipay | 支付宝 |
| alipay | 支付宝 |
| alipay | 支付宝 |
| alipay | 支付宝 |
+-------------+-------------+
*/
-- 查看支付编码和支付名称
SELECT paymentCode, paymentName, COUNT(1) AS cnt from
tags_dat.tbl_tag_orders GROUP BY paymentCode, paymentName;
/*
+-------------+-------------+-------+
| paymentCode | paymentName | cnt |
+-------------+-------------+-------+
| alipay | 支付宝 | 96425 |
| chinapay | 银联支付 | 4068 |
| cod | 货到付款 | 16832 |
| wxpay | 微信支付 | 2800 |
+-------------+-------------+-------+
*/
-- SELECT paymentCode, paymentName, COUNT(1) AS cnt from
tags_dat.tbl_orders GROUP BY paymentCode, paymentName;

-- 查看支付方式
SELECT paymentCode, COUNT(1) AS cnt from tags_dat.tbl_tag_orders GROUP BY
paymentCode;
/*
+-------------+-------+
| paymentCode | cnt |
+-------------+-------+
| alipay | 96425 |
| chinapay | 4068 |
| cod | 16832 |
| wxpay | 2800 |
+-------------+-------+
*/
-- SELECT paymentCode, COUNT(1) AS cnt from tags_dat.tbl_orders GROUP BY
paymentCode;
2.3.2、新建标签 2.3.2.1、新建 业务(4级)标签 : 支付方式标签
标签名称:支付方式
标签分类:电商-某商城-商业属性
更新周期:1天
业务含义:用户订单的支付方式:支付宝、微信支付、银联支付、货到付款
标签规则:
inType=hbase
zkHosts=chb1
zkPort=2181
hbaseTable=tbl_tag_orders
family=detail
selectFieldNames=memberid,paymentcode
程序入口:
	com.chb.tags.models.statistics.PayTypeModel
算法名称:
	STATISTICS
算法引擎:	
	tags-model_2.11.jar
模型参数:	
	--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1
2.3.2.2、新建 属性(5级)标签 :支付宝、微信支付、银联支付、货到付款等

在这里插入图片描述

2.3.2.3、属性标签数据插入【tbl_basic_tag】INSERT SQL语句如下:
INSERT INTO `tbl_basic_tag` VALUES ('356', '支付方式', null,'inType=hbase\nzkHosts=chb1\nzkPort=2181\nhbaseTable=tbl_tag_orders\nfamily=detail\nselectFieldNames=memberid,paymentcode', null, '4', '315', '2019-12-20 17:26:23','2019-12-20 17:26:23', null, null);

INSERT INTO `tbl_basic_tag` VALUES ('357', '支付宝', null, 'alipay', null,'5', '356', '2019-12-20 17:26:50', '2019-12-20 17:26:50', null, null);
INSERT INTO `tbl_basic_tag` VALUES ('358', '微信支付', null, 'wxpay', null,'5', '356', '2019-12-20 17:27:04', '2019-12-20 17:27:04', null, null);
INSERT INTO `tbl_basic_tag` VALUES ('359', '银联支付', null, 'chinapay',null, '5', '356', '2019-12-20 17:27:17', '2019-12-20 17:27:17', null, null);
INSERT INTO `tbl_basic_tag` VALUES ('360', '货到付款', null, 'cod', null,'5', '356', '2019-12-20 17:27:32', '2019-12-20 17:27:32', null, null);

INSERT INTO `tbl_model` VALUES ('9', '356', 'Statistics','com.chb.tags.models.statistics.PayTypeModel', 'hdfs://chb1:8020/apps/temp/jars/cbcbe36a-2808-47e2-b476-bec0b319c6c3.jar', '1,2019-12-20 08:00:00,2029-12-20 08:00:00', '2019-12-2017:26:23', '2019-12-20 17:26:23', '4', '--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1');

2.3.3、标签模型开发 2.3.3.1、Dataset/DataFrame 中添加列函数 withColumn , 如果列存在就替换值,不存在创建新列 :
/**
* Returns a new Dataset by adding a column or replacing the existing
column
* that has the same name.
*
* @group untypedrel
* @since 2.0.0
*/
def withColumn(colName: String, col: Column): DataFrame
2.3.3.2、在DSL编程中窗口函数的窗口设置使用 Window类 封装
val window = Window
	.partitionBy($"") // 分区字段设置
	.orderBy($"".asc, $"".desc) // 排序字段及规则设置
val window = Window
	.distributeBy($"") // 分区字段设置
	.orderBy($"".asc, $"".desc) // 排序字段及规则设置
2.3.3.3、完整代码
package com.chb.tags.models.statistics

import com.chb.tags.ModelType
import com.chb.tags.models.AbstractModel
import com.chb.tags.utils.TagTools
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window

/**
 * @Author: chb
 * @Date: 2021/4/29 8:56
 * @E-Mail:
 * @DESC:
 */
class PayTypeModel extends AbstractModel("支付方式标签", ModelType.STATISTICS) {
  /*
  356 支付方式
  357 支付宝 alipay
  358 微信支付 wxpay
  359 银联支付 chinapay
  360 货到付款 cod
  */
  override def doTag(businessDF: DataFrame, tagDF: DataFrame): DataFrame
  = {
    // 导入隐式转换
    import businessDF.sparkSession.implicits._
    import org.apache.spark.sql.functions._

    // 1. 订单数据中获取每个用户最多支付方式
    val paymentDF: DataFrame = businessDF
      // 按照会员ID和支付编码分组,统计次数
      .groupBy($"memberid", $"paymentcode")
      .count()
      // 获取每个会员支付方式最多,使用开窗函数ROW_NUMBER
      .withColumn(
        "rnk", //
        row_number().over(
          Window.partitionBy($"memberid").orderBy($"count".desc)
        )
      )
      // 过滤rnk等于1数据
      .where($"rnk".equalTo(1))
      .select(
        $"memberid".as("id"), //
        $"paymentcode".as("payment") //
      )
    // 2. 计算标签,规则匹配
    val modelDF: DataFrame = TagTools.ruleMatchTag(
      paymentDF, "payment", tagDF
    )
    //modelDF.show(100, truncate = false)
    // 3. 返回标签数据
    modelDF
  }
}


object PayTypeModel {
  def main(args: Array[String]): Unit = {
    val tagModel = new PayTypeModel()
    tagModel.executeModel(356L)
  }
}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.5401s