Spark WordCount

宝哥大数据 · Published 2017-04-29 08:29:47

1. Scala
package com.chb.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Word count over a text file in HDFS, sorted by word frequency.
 */
object WordCount {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("wordcount").setMaster("local")
      val sc = new SparkContext(conf)

      // Read the input file as a single partition
      val lines = sc.textFile("hdfs://192.168.1.224:9000/user/root/abc.log", 1)

      // Split each line into words
      val words = lines.flatMap { line => line.split(" ") }

      // Pair each word with an initial count of 1
      val pairs = words.map { word => (word, 1) }

      // Sum the counts per word
      val wcs = pairs.reduceByKey(_ + _)

      // Swap to (count, word), sort by count, then swap back to (word, count)
      val sortWCs = wcs.map(x => (x._2, x._1)).sortByKey().map(x => (x._2, x._1))

      sortWCs.foreach(x => println(x._1 + " appears " + x._2 + " times"))
    }
}
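The swap–sort–swap in the last transformation can also be written more directly with RDD.sortBy, which is available in the Spark 1.x API used here. A minimal equivalent sketch on the same wcs (the variable name sortWCs2 is only for illustration):

      // Same ordering as above, expressed with sortBy instead of the double swap
      val sortWCs2 = wcs.sortBy(_._2)
      sortWCs2.foreach { case (word, count) => println(word + " appears " + count + " times") }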
2. Error: Scala version conflict
The version of scala library found in the build path of TestSpark (2.10.5) is prior to the one provided by scala IDE (2.11.7). Setting a Scala Installation Choice to match.    TestSpark       Unknown Scala Version Problem
Solution:

Create a new Scala project, then:

1. Remove the bundled Scala library from the project's build path.
2. Add the Spark library spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar.
3. Change the project's Scala compiler version.
4. Right-click the project -> Scala -> Set the Scala Installation.

Alternatively:

Right-click the project -> Properties -> Scala Compiler -> Use Project Settings, and select the Scala version that matches Spark; here that is the Latest 2.10 bundle.
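For a project built with sbt rather than configured through the Scala IDE dialogs, the same fix amounts to pinning the project's Scala version to the 2.10 line that the Spark assembly was compiled against. A minimal sketch, assuming spark-core is pulled from Maven Central instead of the CDH assembly jar (the exact version numbers are assumptions):

// build.sbt
scalaVersion := "2.10.5"  // match the Scala line used by Spark 1.1.0
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"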

3. Java

The Java program follows essentially the same flow as the Scala version, ported to the Java API.
package com.chb.java;

// Java
import java.util.Arrays;

// Spark
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

// Scala
import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
        // Java context
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile("hdfs://192.168.1.224:9000/user/root/abc.log");

        // Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                // Arrays.asList() turns the split result into a List
                return Arrays.asList(line.split(" "));
            }
        });

        // Use JavaPairRDD to build (word, 1) pairs
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // reduceByKey: sum the counts per word
        JavaPairRDD<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        });

        // Swap key and value so the pairs can be sorted by word count
        JavaPairRDD<Integer, String> tmpWCs = wcs.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<Integer, String>(tuple._2, tuple._1);
            }
        });

        // Sort by count
        JavaPairRDD<Integer, String> sortedWCs = tmpWCs.sortByKey();

        // Swap key and value back so the result reads as (word, count)
        JavaPairRDD<String, Integer> resultWCs = sortedWCs.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return new Tuple2<String, Integer>(tuple._2, tuple._1);
            }
        });

        resultWCs.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> wc) throws Exception {
                System.out.println(wc._1 + "---->" + wc._2);
            }
        });

        jsc.close();
    }
}
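One caveat that applies to both versions: foreach is an action that runs on the executors, so in local mode the println output appears in the console, but on a real cluster it ends up in the executors' stdout rather than the driver's. Collecting the result or saving it back to HDFS is the usual way to see the output; a minimal Scala sketch against sortWCs from the first listing (the output path is an assumption):

      // Bring the result to the driver (fine for small result sets)
      sortWCs.collect().foreach { case (word, count) => println(word + " appears " + count + " times") }

      // ...or write it back to HDFS (assumed output path)
      sortWCs.saveAsTextFile("hdfs://192.168.1.224:9000/user/root/wordcount_result")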