Spark Streaming: reading data from Kafka
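The code below uses the Kafka 0.8 direct integration (spark-streaming-kafka module, Spark 1.x). A build dependency roughly like the following sbt line is assumed; the version shown is only an illustrative assumption and should be matched to your Spark and Scala versions (Maven users need the same groupId/artifactId):

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.3"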
Complete Java code
package com.chb.spark.streaming;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaDirectWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("kafka-wordCount").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(3));
        // First build the Kafka parameter map
        Map<String, String> kafkaParams = new HashMap<String, String>();
        // The direct API talks to the brokers directly, so we configure broker.list here
        kafkaParams.put("metadata.broker.list",
                "192.168.1.224:9092,192.168.1.225:9092,192.168.1.226:9092");
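        // Optional setting (an assumption, not in the original code): with no stored
        // offsets the direct stream starts from the latest offsets by default; for
        // testing you may prefer to start from the earliest available offsets:
        // kafkaParams.put("auto.offset.reset", "smallest");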
        // Several topics can be read in parallel; add each topic name to this set
        Set<String> topics = new HashSet<String>();
        topics.add("wordcount");
        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jssc,
                String.class,        // key type
                String.class,        // value type
                StringDecoder.class, // key decoder
                StringDecoder.class, // value decoder
                kafkaParams,
                topics
        );
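        // With the direct approach there is no receiver: each batch reads a range of
        // offsets straight from the Kafka brokers, with one RDD partition per Kafka
        // partition, and the consumed offsets are tracked by Spark (via checkpoints)
        // rather than by ZooKeeper.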
        /**
         * Each element of lines is a Tuple2<key, value>; the key is not used here
         * (it is typically null) and the value is the message line. flatMap splits
         * every line into words.
         * Scala equivalent: val words = lines.flatMap(_._2.split(" "))
         */
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(Tuple2<String, String> t) throws Exception {
                return Arrays.asList(t._2().split(" "));
            }
        });
        /**
         * Map each word to a pair: word => (word, 1)
         * Scala equivalent: val pairs = words.map(x => (x, 1))
         */
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t, 1);
            }
        });
        /**
         * Sum the counts for each word
         * Scala equivalent: val wcs = pairs.reduceByKey(_ + _)
         * public interface Function2<T1, T2, R> extends Serializable {
         *     R call(T1 v1, T2 v2) throws Exception;
         * }
         */
        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Output (action) operation: triggers a job per batch and prints the first 10 results of each batch
        wcs.print();
        jssc.start();
        // Block until the streaming application is stopped or fails
        jssc.awaitTermination();
        jssc.stop();
        jssc.close();
}
}
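If you compile with Java 8, the three anonymous inner classes above can be collapsed into lambdas. A minimal sketch of the same pipeline (assuming Spark 1.x, where FlatMapFunction, PairFunction and Function2 are single-method interfaces), reusing the lines stream created above:

        JavaDStream<String> words = lines.flatMap(t -> Arrays.asList(t._2().split(" ")));
        JavaPairDStream<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));
        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey((a, b) -> a + b);
        wcs.print();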
Scala code
package com.spark.stream.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Durations
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
object WordCountKafka {
def main(args: Array[String]): Unit = {
    // At least 2 local threads: one to bring data in and one to run the batch computation
    val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val sc = new StreamingContext(sparkConf, Durations.seconds(10))
    // First build the Kafka parameter map.
    // The direct approach does not go through ZooKeeper, so only broker.list is needed here.
val kafkaParams = Map[String, String](
"metadata.broker.list" -> "node1:9092,node2:9092,node3:9092"
)
    // Then create a set with the topics to read; the direct API can read several topics in parallel
    val topics = Set[String]("wordcount20160423")
    // Kafka returns key/value pairs; we only need to split the value afterwards
val linerdd = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
sc, kafkaParams, topics)
val wordrdd = linerdd.flatMap { _._2.split(" ") }
val resultrdd = wordrdd.map { x => (x, 1) }.reduceByKey { _ + _ }
// resultrdd.map(x => println(x._1+"--"+x._2))
resultrdd.print()
sc.start()
sc.awaitTermination()
sc.stop()
}
}
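To see exactly which Kafka offsets each batch consumed (for example, to store them yourself), the RDDs produced by createDirectStream expose their offset ranges. A minimal sketch, assuming the linerdd stream from the code above and the spark-streaming-kafka 0.8 integration (add the import next to the others):

import org.apache.spark.streaming.kafka.HasOffsetRanges

linerdd.foreachRDD { rdd =>
  // Only the RDDs coming straight out of createDirectStream carry this information,
  // so read the offset ranges before applying any other transformation.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition}: offsets ${o.fromOffset} to ${o.untilOffset}")
  }
}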