一、scala
package com.chb.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
/**
 * Word-count example for Spark.
 *
 * Reads a text file from HDFS, splits every line on single spaces, counts how
 * often each token occurs and prints the tokens ordered by ascending count.
 */
object WordCount {

  /**
   * Runs the word-count job against a local Spark master.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("hdfs://192.168.1.224:9000/user/root/abc.log", 1)
      val words = lines.flatMap(line => line.split(" "))
      val pairs = words.map(word => (word, 1))
      val wcs = pairs.reduceByKey(_ + _)
      // sortByKey sorts on the key only, so swap to (count, word), sort,
      // then swap back to (word, count) for display.
      val sortWCs = wcs.map(_.swap).sortByKey().map(_.swap)
      sortWCs.foreach { case (word, count) =>
        println(s"$word appears $count times")
      }
    } finally {
      // Always release the context, even when the job fails part-way.
      sc.stop()
    }
  }
}
二、错误:出现scala版本冲突问题
The version of scala library found in the build path of TestSpark (2.10.5) is prior to the one provided by scala IDE (2.11.7). Setting a Scala Installation Choice to match. TestSpark Unknown Scala Version Problem
解决方法:
新建scala工程: 1.在工程中移除自带的scala版本库 2.添加spark库 spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar 3.修改工程中的scala编译版本 4.右击工程 –> Scala –> Set the Scala Installation
也可以
右击工程 –> Properties –> Scala Compiler –> 勾选 Use Project Settings,然后选择spark对应的scala版本,此处选择 Latest 2.10 bundle
三、java
java的程序基本是按照scala的流程改写过来的。
package com.chb.java;
//java包
import java.util.Arrays;
//spark包
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
//scala包
import scala.Tuple2;
/**
 * Word-count example for Spark's Java API.
 *
 * Reads a text file from HDFS, splits every line on single spaces, counts how
 * often each token occurs and prints the tokens ordered by ascending count.
 */
public class WordCount {

    /**
     * Runs the word-count job against a local Spark master.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
        // Java-flavoured wrapper around the Spark context.
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = jsc.textFile("hdfs://192.168.1.224:9000/user/root/abc.log");

            // NOTE(review): returning Iterable matches the Spark 1.x FlatMapFunction
            // signature this file was written against; confirm the Spark version in use.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterable<String> call(String line) throws Exception {
                    // Arrays.asList turns the split result into a List.
                    return Arrays.asList(line.split(" "));
                }
            });

            // Use a JavaPairRDD to turn every word into a (word, 1) pair.
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

            // Sum the counts of identical words.
            JavaPairRDD<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(Integer arg0, Integer arg1) throws Exception {
                    return arg0 + arg1;
                }
            });

            // Swap key and value so the RDD can be sorted by occurrence count.
            JavaPairRDD<Integer, String> tmpWCs = wcs.mapToPair(
                    new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)
                        throws Exception {
                    return new Tuple2<Integer, String>(tuple._2, tuple._1);
                }
            });

            // Sort by count.
            JavaPairRDD<Integer, String> sortedWCs = tmpWCs.sortByKey();

            // Swap back so the result is displayed as (word, count).
            JavaPairRDD<String, Integer> resultWCs = sortedWCs.mapToPair(
                    new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)
                        throws Exception {
                    return new Tuple2<String, Integer>(tuple._2, tuple._1);
                }
            });

            resultWCs.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<String, Integer> arg0) throws Exception {
                    System.out.println(arg0._1 + "---->" + arg0._2);
                }
            });
        } finally {
            // Always release the context, even when the job fails part-way.
            jsc.close();
        }
    }
}