您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

textFile

宝哥大数据 发布时间:2019-04-20 09:39:22 ,浏览量:1

1.1、textFile, 实际调用hadoopFile

    /**
     * Read a text file from HDFS, a local file system (available on all nodes), or any
     * Hadoop-supported file system URI, and return it as an RDD of Strings.
     */
    def textFile(
            path: String,
            minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
        assertNotStopped()
        // 实际调用hadoopFile
        hadoopFile(path, 
        	classOf[TextInputFormat], classOf[LongWritable], classOf[Text],   // 此处是hadoop的类型
            minPartitions)
            .map(pair => pair._2.toString).setName(path)
    }
1.1.1、hadoopFile
  • 广播hadoop configuration, 通过BroadcastManager
  • 定义偏函数(jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path), 用于以后构件输入路径
  • 构件hadoopRDD
    /** Get an RDD for a Hadoop file with an arbitrary InputFormat
     *
     * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
     * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
     * operation will create many references to the same object.
     * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
     * copy them using a `map` function.
     */
    def hadoopFile[K, V](
            path: String,
            inputFormatClass: Class[_  FileInputFormat.setInputPaths(jobConf, path)
        new HadoopRDD(
            this,
            confBroadcast,
            Some(setInputPathsFunc),
            inputFormatClass,
            keyClass,
            valueClass,
            minPartitions).setName(path)
    }
1.1.2、调用map方法将hadoopRDD映射为MappeedRDD
    /**
     * Return a new RDD by applying a function to all elements of this RDD.
     */
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }
  • 调用SparkContext的clean方法, 实际调用的是ClosureCleaner的clean方法, 这里意在清除闭包中不能序列化的变量, 防止RDD在网络传输过程中反序列化失败
  • 构建MapPartitionsRDD
    • 调用父类RDD的辅助构造器, 辅助构造器首先将oneParent封装为OneToOneDependency, 是NarrowDependency的实现
    /** Construct an RDD with just a one-to-one dependency on one parent */
    def this(@transient oneParent: RDD[_]) =
        this(oneParent.context, List(new OneToOneDependency(oneParent)))
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0401s