The call callback: JavaRDD.flatMap wraps the user's FlatMapFunction and invokes its call method once per element, as the Spark source shows:
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
  import scala.collection.JavaConverters._
  def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
  JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
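To make those semantics concrete, here is a minimal sketch in plain Java, with no Spark involved (FlatMapSketch and FlatMapFn are hypothetical stand-ins, not Spark APIs): call() is invoked once per input element, and the returned collections are concatenated into one flat result.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapSketch {
    // Hypothetical stand-in for Spark's FlatMapFunction.
    interface FlatMapFn<T, U> {
        Iterable<U> call(T t);
    }

    // flatMap = invoke call() on every element, then flatten the results.
    static <T, U> List<U> flatMap(List<T> input, FlatMapFn<T, U> f) {
        List<U> out = new ArrayList<U>();
        for (T t : input) {
            for (U u : f.call(t)) { // call() runs once per element
                out.add(u);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello spark", "hello java");
        List<String> words = flatMap(lines, new FlatMapFn<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });
        System.out.println(words); // [hello, spark, hello, java]
    }
}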
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dt.spark.java.cores;
import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        /*
        if (args.length < 1) {
            System.err.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }
        */
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // JavaRDD<String> lines = ctx.textFile(args[0], 1);
        JavaRDD<String> lines = ctx.textFile(
                "G://IMFBigDataSpark2016//Bigdata_Software//spark-1.6.0-bin-hadoop2.6//spark-1.6.0-bin-hadoop2.6//spark-1.6.0-bin-hadoop2.6//README.md");

        // Split each line into words; call() runs once per line.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });
        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Sum the counts for each word.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        // Bring the results back to the driver and print them.
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<String, Integer> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
    }
}
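For comparison, the same pipeline is much shorter with Java 8 lambdas, since each function interface has a single call method. This is a sketch under two assumptions: the class name JavaWordCountLambda and the input path are placeholders, and it targets Spark 1.6, where FlatMapFunction.call returns an Iterable (in Spark 2.x it must return an Iterator instead).

package com.dt.spark.java.cores;

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public final class JavaWordCountLambda {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCountLambda").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        JavaRDD<String> lines = ctx.textFile("README.md"); // placeholder path

        JavaPairRDD<String, Integer> counts = lines
                .flatMap(s -> Arrays.asList(SPACE.split(s)))       // line -> words
                .mapToPair(s -> new Tuple2<String, Integer>(s, 1)) // word -> (word, 1)
                .reduceByKey((i1, i2) -> i1 + i2);                 // sum per word

        for (Tuple2<String, Integer> tuple : counts.collect()) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
    }
}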