您当前的位置: 首页 >  ar

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Spark操作Kudu

宝哥大数据 发布时间:2021-01-21 09:24:48 ,浏览量:0

  到目前为止,我们已经听说过几个上下文,例如 SparkContext,SQLContext,HiveContext, SparkSession,现在,我们将使用 Kudu 引入一个KuduContext。这是可在Spark应用程序中广播的主要可序列化对象。此类代表在 Spark 执行程序中与 Kudu Java 客户端进行交互。 KuduContext 提供执行DDL 操作所需的方法,与本机 Kudu RDD的接口,对数据执行更新/插入/删除,将数据类型从Kudu转换为Spark 等。

1.1、使用Spark操作Kudu需要的依赖
kudu-client
	kudu客户端的SDK,用于操作kudu
kudu-spark2_2.11
	Kudu为和Spark整合提供的整合包,由于kudu-spark没有在中央仓库,所以需要添加cloudera的仓库
	
        
            cloudera
            https://repository.cloudera.com/artifactory/cloudera-repos/
        
    
    
scala-libray
	Scala的基础库
spark-core_2.11
spark-sql_2.11
spark-hive_2.11

1.2、Spark操作Kudu 1.2.1、初始化,创建KuduContext
  var kuduContext: KuduContext = null

  @Before
  def init(): Unit = {
    // 1、创建KuduContext 和Spark'Session
    val spark = SparkSession.builder().master("local[*]").appName("sparkOpKudu").getOrCreate()

    val KUDU_MASTER = "s202:7051,s203:7051,s204:7051"
    kuduContext = new KuduContext(KUDU_MASTER, spark.sparkContext)
  }

1.2.2、创建表
定义 kudu 的表需要分成 5 个步骤:
1:提供表名
2:提供 schema, 注意Spark中的Schema是StuctType
3:提供主键
4:定义重要选项;例如:定义分区的 schema

  @Test
  def createTable(): Unit = {
    // 判断表是否存在
    val tableName = "sparkopKudu"
    if (kuduContext.tableExists(tableName)) kuduContext.deleteTable(tableName)


    // 创建表
    val schema = StructType(
      StructField("id", LongType, nullable = false) ::
        StructField("name", StringType, nullable = false) ::
        StructField("age", IntegerType, nullable = false) :: Nil
    )
    val keys = Seq("id")

    import scala.collection.JavaConverters._
    val options = new CreateTableOptions()
      .setRangePartitionColumns(List("id").asJava)
      .setNumReplicas(1)

    kuduContext.createTable(tableName,
      schema,
      keys,
      options
    )
1.2.3、增删改
  @Test
  def curd(): Unit = {
    // 1、创建KuduContext 和SparkSession
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sparkOpKudu")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // 2、创建kuduContext
    val KUDU_MASTER = "s202:7051,s203:7051,s204:7051"
    val kuduContext = new KuduContext(KUDU_MASTER, spark.sparkContext)

    import spark.implicits._
    val df = Seq(Person(1, "ling", 18), Person(2, "chb", 28)).toDF()
    val tableName = "sparkopKudu"

    // 删: 只需要key
    kuduContext.deleteRows(df.select("id"), tableName)

    // 增
    kuduContext.insertRows(df, tableName)

    // 改
    kuduContext.updateRows(df, tableName)
  }
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0387s