Transform 操作
transform操作允许将任意RDD到RDD函数应用于DStream。 它可用于应用任何未在DStream API中公开的RDD操作。 例如,将数据流中的每个批处理与其他数据集相结合的功能不会直接暴露在DStream API中。 但是,您可以轻松地使用transform来执行此操作。 这使得非常强大的可能性。 例如,可以通过将输入数据流与预先计算的垃圾信息(也可以用Spark一起生成)进行实时数据清理,然后根据它进行过滤。
一、案例:过滤刷广告的用户, 1.1、模拟一个黑名单 1.1.1、模拟用户在网站上点击广告, 但是存在刷广告的现象, 所以对这类用户的点击流量进行滤除,所以将此类用户加入黑名单,//黑名单列表 (user, boolean), true表示该用户在黑名单中, 在后续的计算中,不记录该用户的点击效果。
final List blockList = new ArrayList();
//ture表示在黑名单上
blockList.add(new Tuple2("lisi", true));
1.1.1、将黑名单列表转为一个RDD,
//黑名单RDD (user, boolean)
JavaPairRDD blackRDD = jssc.sparkContext().parallelizePairs(blockList);
1.2、//从指定端口获取模拟点击日志:”date user”
//从指定端口获取模拟点击日志:"date user"
JavaReceiverInputDStream adsClickLogDStream = jssc.socketTextStream("192.168.1.224", 9999);
1.3、将数据流中的数据进行格式转换
日志格式为date user,为了在后续工作中, 和黑名单RDD进行join操作方便, 将日志格式改为(user, log);
log: date user
改为
(user, log)
//为了后面对数据流中的RDD和黑名单中RDD进行join操作, 将RDD中的数据进行格式化(user, log)
JavaPairDStream userAdsClickLogDStream = adsClickLogDStream.mapToPair(
new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(String log) throws Exception {
//对日志格式进行转换,"date user" 变为(user, log)
return new Tuple2(log.split(" ")[1], log);
}
});
1.4、过滤黑名单中的用户日志, 此处使用transform操作
//实时进行黑名单过滤, 执行transform操作, 将每个batch的RDD,与黑名单中的RDD进行join操作
JavaDStream validAdsClickLogDStream = userAdsClickLogDStream.transform(
new Function() {
private static final long serialVersionUID = 1L;
@Override
public JavaRDD call(JavaPairRDD userAdsClickLogRDD)
throws Exception {
//将黑名单RDD和每个batch的RDD进行join操作
// 这里为什么是左外连接,因为并不是每个用户都在黑名单中,所以直接用join,那么没有在黑名单中的数据,无法join到就会丢弃
// string是用户,string是日志,是否在黑名单里是Optional
//(user, (log, boolean))
JavaPairRDD joindRDD =
userAdsClickLogRDD.leftOuterJoin(blackRDD);
//过滤
JavaPairRDD filteredRDD =
joindRDD.filter(new Function() {
/*
* public interface Function extends Serializable {
R call(T1 v1) throws Exception;
}
*/
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2 tuple)
throws Exception {//(user, (log, boolean))
//这里tuple就是每个用户对应的访问日志和在黑名单中状态
if (tuple._2._2.isPresent() && tuple._2._2.get()) {
return false;
}else {
return true;
}
}
});
// 到此为止,filteredRDD中就只剩下没有被过滤的正常用户了,用map函数转换成我们要的格式,我们只要点击日志
JavaRDD validAdsCiickLogRDD = filteredRDD.map(new Function() {
private static final long serialVersionUID = 1L;
@Override
public String call( Tuple2 tuple)
throws Exception {
return tuple._2._1;
}
});
//放回过滤的结果
return validAdsCiickLogRDD;
}
});
1.4.1、在transfrom操作中, 对每个batch中的RDD进行join操作
//将黑名单RDD和每个batch的RDD进行join操作
// 这里为什么是左外连接,因为并不是每个用户都在黑名单中,所以直接用join,那么没有在黑名单中的数据,无法join到就会丢弃
// string是用户,string是日志,是否在黑名单里是Optional
//得到的结果:(user, (log, boolean))
JavaPairRDD joindRDD =
userAdsClickLogRDD.leftOuterJoin(blackRDD);
1.4.2、黑名单和batch中的RDDjoin之后,对结果进行过滤
//过滤
JavaPairRDD filteredRDD =
joindRDD.filter(new Function() {
/*
* public interface Function extends Serializable {
R call(T1 v1) throws Exception;
}
*/
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2 tuple)
throws Exception {//(user, (log, boolean))
//这里tuple就是每个用户对应的访问日志和在黑名单中状态
if (tuple._2._2.isPresent() && tuple._2._2.get()) {
return false;
}else {
return true;
}
}
});
1.4.3、就只剩下没有被过滤的正常用户了,用map函数转换成我们要的格式,我们只要点击日志
// 到此为止,filteredRDD中就只剩下没有被过滤的正常用户了,用map函数转换成我们要的格式,我们只要点击日志
JavaRDD validAdsCiickLogRDD = filteredRDD.map(new Function() {
private static final long serialVersionUID = 1L;
@Override
public String call( Tuple2 tuple)
throws Exception {
return tuple._2._1;
}
});
//放回过滤的结果
return validAdsCiickLogRDD;
1.5、启动
// 这后面就可以写入Kafka中间件消息队列,作为广告计费服务的有效广告点击数据
validAdsClickLogDStream.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
scala
package com.chb.scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkContext
object BlackListFilter {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("BlackListFilter")
.setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//黑名单
val blackList = Array(("jack", true), ("rose", true))
//设置并行度
val blackListRDD = ssc.sparkContext.parallelize(blackList, 3)
//使用socketTextStream 监听端口
var st = ssc.socketTextStream("192.168.179.5", 8888)
//user, boolean==>
val users = st.map {
line => (line.split(" ")(1), line)
}
val validRddDS = users.transform(ld => {
//通过leftOuterJoin 将(k, v) join (k,w) ==> (k, (v, some(W)))
val ljoinRdd = ld.leftOuterJoin(blackListRDD)
//过滤掉黑名单
val fRdd = ljoinRdd.filter(tuple => {
println(tuple)
if(tuple._2._2.getOrElse(false)) {
false
} else {
true
}
})
//获取白名单
val validRdd = fRdd.map(tuple => tuple._2._1)
validRdd
})
validRddDS.print()
ssc.start()
ssc.awaitTermination()
}
}