到目前为止,我们已经听说过几个上下文,例如 SparkContext,SQLContext,HiveContext, SparkSession,现在,我们将使用 Kudu 引入一个KuduContext。这是可在Spark应用程序中广播的主要可序列化对象。此类代表在 Spark 执行程序中与 Kudu Java 客户端进行交互。 KuduContext 提供执行DDL 操作所需的方法,与本机 Kudu RDD的接口,对数据执行更新/插入/删除,将数据类型从Kudu转换为Spark 等。
1.1、使用Spark操作Kudu需要的依赖kudu-client
kudu客户端的SDK,用于操作kudu
kudu-spark2_2.11
Kudu为和Spark整合提供的整合包,由于kudu-spark没有在中央仓库,所以需要添加cloudera的仓库
cloudera
https://repository.cloudera.com/artifactory/cloudera-repos/
scala-libray
Scala的基础库
spark-core_2.11
spark-sql_2.11
spark-hive_2.11
1.2、Spark操作Kudu
1.2.1、初始化,创建KuduContext
var kuduContext: KuduContext = null
@Before
def init(): Unit = {
// 1、创建KuduContext 和Spark'Session
val spark = SparkSession.builder().master("local[*]").appName("sparkOpKudu").getOrCreate()
val KUDU_MASTER = "s202:7051,s203:7051,s204:7051"
kuduContext = new KuduContext(KUDU_MASTER, spark.sparkContext)
}
1.2.2、创建表
定义 kudu 的表需要分成 5 个步骤:
1:提供表名
2:提供 schema, 注意Spark中的Schema是StuctType
3:提供主键
4:定义重要选项;例如:定义分区的 schema
@Test
def createTable(): Unit = {
// 判断表是否存在
val tableName = "sparkopKudu"
if (kuduContext.tableExists(tableName)) kuduContext.deleteTable(tableName)
// 创建表
val schema = StructType(
StructField("id", LongType, nullable = false) ::
StructField("name", StringType, nullable = false) ::
StructField("age", IntegerType, nullable = false) :: Nil
)
val keys = Seq("id")
import scala.collection.JavaConverters._
val options = new CreateTableOptions()
.setRangePartitionColumns(List("id").asJava)
.setNumReplicas(1)
kuduContext.createTable(tableName,
schema,
keys,
options
)
1.2.3、增删改
@Test
def curd(): Unit = {
// 1、创建KuduContext 和SparkSession
val spark = SparkSession.builder()
.master("local[*]")
.appName("sparkOpKudu")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 2、创建kuduContext
val KUDU_MASTER = "s202:7051,s203:7051,s204:7051"
val kuduContext = new KuduContext(KUDU_MASTER, spark.sparkContext)
import spark.implicits._
val df = Seq(Person(1, "ling", 18), Person(2, "chb", 28)).toDF()
val tableName = "sparkopKudu"
// 删: 只需要key
kuduContext.deleteRows(df.select("id"), tableName)
// 增
kuduContext.insertRows(df, tableName)
// 改
kuduContext.updateRows(df, tableName)
}