您当前的位置: 首页 >  ar

cuiyaonan2000

暂无认证

  • 2浏览

    0关注

    248博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Launch SparkSql Task With SpringBoot

cuiyaonan2000 发布时间:2022-02-16 13:57:46 ,浏览量:2

序言

之前我们都是使用Jdbc的形式连接到Hive或者Spark的服务操作数据.这次我们使用spark-submit命令来提交Jar的形式来操作数据.故此不需要启动Thrift JDBC/ODBC server服务cuiyaonan2000@163.com

参考资料:

  1. Quick Start - Spark 3.2.1 Documentation    --- 整体搭建信息
  2. Hive Tables - Spark 3.2.1 Documentation    ---Hive 相关

官方用例

官网有Java的示例,同时也是使用Maven构建的在我们下载好的Spark包中就有源代码.

Maven的依赖信息,注意红色字体,同时根据我们在搭建Spark集群的时候都是需要依赖Scala包的.所以这里也不例外.只是官方实例没有写出来而已,大家要自己摸索cuiyaonan2000@163.com

时代在发展历史在进步,我们代码里只需要引入这一个jar.通过它的引用我可以看到,相关的引用都被该Jar引入了,不需要我们在引用很多Jar就能直接使用SparkJar(cuiyaonan2000@163.com)

        

 同时也包含了scala的依赖

 

代码如下:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cui.yao.nan.config;

// $example on:spark_hive$
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// $example off:spark_hive$

public class JavaSparkHiveExample {

  // $example on:spark_hive$
  public static class Record implements Serializable {
    private int key;
    private String value;

    public int getKey() {
      return key;
    }

    public void setKey(int key) {
      this.key = key;
    }

    public String getValue() {
      return value;
    }

    public void setValue(String value) {
      this.value = value;
    }
  }
  // $example off:spark_hive$

  public static void main(String[] args) {
    // $example on:spark_hive$
    // warehouseLocation points to the default location for managed databases and tables
    String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate();

    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

    // Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show();
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show();
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+

    // The results of SQL queries are themselves DataFrames and support all normal functions.
    Dataset sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

    // The items in DataFrames are of type Row, which lets you to access each column by ordinal.
    Dataset stringsDS = sqlDF.map(
        (MapFunction) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
        Encoders.STRING());
    stringsDS.show();
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    // You can also use DataFrames to create temporary views within a SparkSession.
    List records = new ArrayList();
    for (int key = 1; key < 100; key++) {
      Record record = new Record();
      record.setKey(key);
      record.setValue("val_" + key);
      records.add(record);
    }
    Dataset recordsDF = spark.createDataFrame(records, Record.class);
    recordsDF.createOrReplaceTempView("records");

    // Queries can then join DataFrames data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // ...
    // $example off:spark_hive$

    spark.stop();
  }
}

Spark任务启动方式

现在随着Spark版本的更迭,网上会有很多种的启动方式.但是我们可以从最新的官网上看到Spark的主启动程序是由SparkSession 来启动的.但是很多网上的代码由于Spark版本原因使用的是老的对象启动的.同时博主没有更新博客所以会引起困扰,所以这里顺便说明下cuiyaonan2000@163.com

SparkContext 
  1. 它可以帮助执行Spark任务,并与资源管理器(如YARN 或Mesos)进行协调。
  2. 使用SparkContext,可以访问其他上下文,比如SQLContext和HiveContext。
  3. 使用SparkContext,我们可以为Spark作业设置配置参数。

如果还没有SparkContext,可以先创建一个SparkConf。

//set up the spark configuration
val sparkConf = new SparkConf().setAppName("hirw").setMaster("yarn")
//get SparkContext using the SparkConf
val sc = new SparkContext(sparkConf)
SQLContext 

SQLContext是通往SparkSQL的入口。下面是如何使用SparkContext创建SQLContext。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

HiveContext 

HiveContext是通往hive入口。 HiveContext集成了SQLContext的所有功能,并且进行了扩展.这意味着它支持SQLContext支持的功能以及更多(Hive特定的功能)

public class HiveContext extends SQLContext implements Logging

同理也是通过sc(sparkcontext)创建

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

SparkSession 

它使开发人员可以轻松地使用它,这样我们就不用担心不同的上下文, 并简化了对不同上下文的访问。通过访问SparkSession,我们可以自动访问SparkContext。

下面是如何创建一个SparkSession

val spark = SparkSession
.builder()
.appName("hirw-hive-test")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()

总结

通过spark-submit jar的形式提交的任务,其实也是需要启动Spark web UI的(这个直接连接spark thriftserver 其实更好,没有必要提交了cuiyaonan2000@163.com),查看日志可以看到 当任务结束的时候需要依次关掉SparkContext和thriftserver.

关注
打赏
1638267374
查看更多评论
立即登录/注册

微信扫码登录

0.0389s