This example uses Spark 2.3.3.
- Spark support for writing to ES
- Data types that can be written
- Native RDD support
  - Map
  - case class
  - JSON
  - dynamic/multi-resources
  - handling document metadata
  - **Complete code (runnable)**
- Spark Streaming support
  - Map
  - case class
  - JSON
  - dynamic/multi-resources
  - handling document metadata
  - **Complete code (runnable)**
- Spark SQL Support
  - Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
  - Data sources
  - Push-Down operations
  - Data Sources as tables
  - Reading `DataFrame`s (Spark SQL 1.3) from Elasticsearch
  - Spark SQL Type conversion
  - **Complete code (runnable)**
- Spark Structured Streaming support
  - Supported Spark Structured Streaming versions
  - Writing Streaming `Datasets` (Spark SQL 2.0+) to Elasticsearch
  - Writing existing JSON to Elasticsearch
  - Spark Structured Streaming Type conversion
  - Sink commit log in Spark Structured Streaming
  - **Complete code (runnable)**
- ES-Hadoop configuration options explained
The pom.xml of the sample project is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>localsparkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.major.minor.version>2.11</scala.major.minor.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.3</spark.version>
        <fastjson.version>1.2.76</fastjson.version>
        <httpclient.version>4.5.13</httpclient.version>
        <lombok.version>1.18.6</lombok.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1.1-jre</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Elasticsearch provides an official support package; this article uses ES 5.4.2 as the example:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>5.4.2</version>
</dependency>
The Spark connector framework is the most sensitive to version incompatibilities. For convenience, a version compatibility matrix is provided below:

| Spark Version | Scala Version | ES-Hadoop Artifact ID |
| --- | --- | --- |
| 1.0 - 1.2 | 2.10 | `<unsupported>` |
| 1.0 - 1.2 | 2.11 | `<unsupported>` |
| 1.3 - 1.6 | 2.10 | elasticsearch-spark-13_2.10 |
| 1.3 - 1.6 | 2.11 | elasticsearch-spark-13_2.11 |
| 2.0+ | 2.10 | elasticsearch-spark-20_2.10 |
| 2.0+ | 2.11 | elasticsearch-spark-20_2.11 |
The version element holds the ES version number.
Data types that can be written

Native RDD support

Map

// 1. data of type Map
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
EsSpark.saveToEs(mapRdd, "gudong20220715001/doc")
case class
// define a case class called Trip
case class Trip(departure: String, arrival: String)
...
// 2. data of type case class
val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")
val caseClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
EsSpark.saveToEs(caseClassRdd, "gudong20220715001/doc")
JSON
// 3.JSON type data
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""
val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
EsSpark.saveJsonToEs(jsonRdd, "gudong20220715001/doc")
dynamic/multi-resources
Dynamic resource support requires setting action.auto_create_index to true:
PUT _cluster/settings
{
"persistent": {
"action.auto_create_index": "true"
}
}
// 4. dynamic/multi-resources (requires action.auto_create_index to be set to true)
val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
val rdd = spark.sparkContext.makeRDD(Seq(game, book, cd))
// an index is created from each Map's media_type value and the document is written to it
EsSpark.saveToEs(rdd, "gudong20220715001-{media_type}/doc")
handling document metadata

The supported metadata fields are defined in the enum org.elasticsearch.spark.rdd.Metadata:
public enum Metadata {
ID,
PARENT,
ROUTING,
TTL,
TIMESTAMP,
VERSION,
VERSION_TYPE;
private Metadata() {
}
}
// 5.handling document metadata
import org.elasticsearch.spark.rdd.Metadata._
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
val otpMeta = Map(ID -> 1, TTL -> "3h")
val mucMeta = Map(ID -> 2)
val sfoMeta = Map(ID -> 3)
val metadataRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715001/doc")
EsSpark.saveToEsWithMeta(metadataRdd, conf_extend)
// metadata carrying only the document ID; alternatively, set Map("es.mapping.id" -> "iata") to upsert the data
val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
val onlyIdRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp2), (mucMeta, muc2), (sfoMeta, sfo2)))
EsSpark.saveToEsWithMeta(onlyIdRdd, conf_extend)
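
As the comment above suggests, the same documents can also be upserted without explicit metadata by mapping the document id from a field. A minimal sketch reusing the index above (the field names are taken from the example data):

// upsert by extracting the document id from the "iata" field - no metadata tuples needed
val airportsById = spark.sparkContext.makeRDD(Seq(
  Map("iata" -> "OTP", "name" -> "Otopeni"),
  Map("iata" -> "MUC", "name" -> "Munich")))
EsSpark.saveToEs(airportsById, "gudong20220715001/doc",
  Map("es.mapping.id" -> "iata", "es.write.operation" -> "upsert"))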
Complete code (runnable)
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
object Main {
// define a case class called Trip
case class Trip(departure: String, arrival: String)
def main(args: Array[String]): Unit = {
// configuration for ES support
// es.index.auto.create: automatically create the index if it does not exist
// es.nodes: IPs of the ES nodes (coordinating or data nodes can be used); es.port: the cluster's HTTP port
val conf = new SparkConf().setAppName("测试ES").setMaster("local")
conf.set("es.nodes", "192.168.1.1,192.168.1.2")
.set("es.port", "9200")
val spark = SparkSession.builder().config(conf).getOrCreate()
// write data to es
// 1.data of type map
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
EsSpark.saveToEs(mapRdd, "gudong20220715001/doc")
// 2. data of type case class
val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")
val caseClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
EsSpark.saveToEs(caseClassRdd, "gudong20220715001/doc")
// 3.JSON type data
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""
val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
EsSpark.saveJsonToEs(jsonRdd, "gudong20220715001/doc")
// 4. dynamic/multi-resources (requires action.auto_create_index to be set to true)
val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
val rdd = spark.sparkContext.makeRDD(Seq(game, book, cd))
// an index is created from each Map's media_type value and the document is written to it
EsSpark.saveToEs(rdd, "gudong20220715001-{media_type}/doc")
// 5.handling document metadata
import org.elasticsearch.spark.rdd.Metadata._
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
val otpMeta = Map(ID -> 1, TTL -> "3h")
val mucMeta = Map(ID -> 2)
val sfoMeta = Map(ID -> 3)
val metadataRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715001/doc")
EsSpark.saveToEsWithMeta(metadataRdd, conf_extend)
// metadata carrying only the document ID; alternatively, set Map("es.mapping.id" -> "iata") to upsert the data
val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
val onlyIdRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp2), (mucMeta, muc2), (sfoMeta, sfo2)))
EsSpark.saveToEsWithMeta(onlyIdRdd, conf_extend)
spark.stop()
}
}
Spark Streaming support
In Spark Streaming, unlike with RDDs, you cannot read data out of Elasticsearch through a DStream, due to its continuous nature.

Map

// 1. data of type Map
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
val mapMicroBatches = mutable.Queue(mapRdd)
// alternative style: ssc.queueStream(microbatches).saveToEs("gudong20220715002/doc")
val mapDStream = ssc.queueStream(mapMicroBatches)
EsSparkStreaming.saveToEs(mapDStream, "gudong20220718001/doc")
case class
// 2. data of type case class
val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")
val caseClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
val caseClassMicroBatches = mutable.Queue(caseClassRdd)
val caseClassDStream = ssc.queueStream(caseClassMicroBatches)
EsSparkStreaming.saveToEs(caseClassDStream, "gudong20220718001/doc")
JSON
// 3.JSON type data
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""
val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
val jsonMicroBatch = mutable.Queue(jsonRdd)
val jsonDStream = ssc.queueStream(jsonMicroBatch)
EsSparkStreaming.saveJsonToEs(jsonDStream, "gudong20220718001/doc")
dynamic/multi-resources
// 4. writing to dynamic/multi-resources (requires action.auto_create_index to be set to true)
val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
val batch = spark.sparkContext.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)
val dstream3 = ssc.queueStream(microbatches)
EsSparkStreaming.saveToEs(dstream3, "gudong20220715002-{media_type}/doc")
handling document metadata
// 5.Handling document metadata
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
val airportsRDD = spark.sparkContext.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
val microbatches2 = mutable.Queue(airportsRDD)
val dstream4 = ssc.queueStream(microbatches2)
val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715002/doc")
EsSparkStreaming.saveToEsWithMeta(dstream4, conf_extend)
val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
val airportsRDD2 = spark.sparkContext.makeRDD(Seq((1, otp2), (2, muc2), (3, sfo2)))
val microbatches3 = mutable.Queue(airportsRDD2)
val dstream5 = ssc.queueStream(microbatches3)
EsSparkStreaming.saveToEsWithMeta(dstream5, conf_extend)
Complete code (runnable)
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming.EsSparkStreaming
import scala.collection.mutable
object Main {
// define a case class called Trip
case class Trip(departure: String, arrival: String)
def main(args: Array[String]): Unit = {
// configuration for ES support
// es.index.auto.create: automatically create the index if it does not exist
// es.nodes: IPs of the ES nodes (coordinating or data nodes can be used); es.port: the HTTP port
val conf = new SparkConf().setAppName("测试ES").setMaster("local[*]")
conf.set("es.nodes", "192.168.1.1,192.168.1.2").set("es.port", "9200")
val spark = SparkSession.builder().config(conf).getOrCreate()
// Writing DStream to Elasticsearch
// Though, unlike RDDs, you are unable to read data out of Elasticsearch using a DStream due to the continuous nature of it.
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
// 1.data of type map
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
val mapMicroBatches = mutable.Queue(mapRdd)
// alternative style: ssc.queueStream(microbatches).saveToEs("gudong20220715002/doc")
val mapDStream = ssc.queueStream(mapMicroBatches)
EsSparkStreaming.saveToEs(mapDStream, "gudong20220718001/doc")
// 2. data of type case class
val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")
val caseClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
val caseClassMicroBatches = mutable.Queue(caseClassRdd)
val caseClassDStream = ssc.queueStream(caseClassMicroBatches)
EsSparkStreaming.saveToEs(caseClassDStream, "gudong20220718001/doc")
// 3.JSON type data
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""
val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
val jsonMicroBatch = mutable.Queue(jsonRdd)
val jsonDStream = ssc.queueStream(jsonMicroBatch)
EsSparkStreaming.saveJsonToEs(jsonDStream, "gudong20220718001/doc")
// 4. writing to dynamic/multi-resources (requires action.auto_create_index to be set to true)
val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
val batch = spark.sparkContext.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)
val dstream3 = ssc.queueStream(microbatches)
EsSparkStreaming.saveToEs(dstream3, "gudong20220715002-{media_type}/doc")
// 5. handling document metadata
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
val airportsRDD = spark.sparkContext.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
val microbatches2 = mutable.Queue(airportsRDD)
val dstream4 = ssc.queueStream(microbatches2)
val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715002/doc")
EsSparkStreaming.saveToEsWithMeta(dstream4, conf_extend)
val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
val airportsRDD2 = spark.sparkContext.makeRDD(Seq((1, otp2), (2, muc2), (3, sfo2)))
val microbatches3 = mutable.Queue(airportsRDD2)
val dstream5 = ssc.queueStream(microbatches3)
EsSparkStreaming.saveToEsWithMeta(dstream5, conf_extend)
// stream processing normally runs continuously; a termination timeout is set here only so this local test shuts down on its own
ssc.start()
ssc.awaitTerminationOrTimeout(6000)
}
}
Spark SQL Support
With Spark SQL 1.3+ and later, data can be written to ES in the following way:
// Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
val people = spark.sparkContext.textFile("F:\\people.txt")
.map(_.split(","))
.map(p => Person(p(0), null, p(1).trim.toInt))
.toDF()
EsSparkSQL.saveToEs(people, "gudong20220718001/doc")
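
The snippet above assumes a Person case class and the Spark implicits are in scope. A self-contained sketch of the same write; the Person field layout is an assumption for illustration (the snippet only shows a name and an age being parsed):

package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

object SqlWriteDemo {
  // assumed shape: each line of people.txt is "name,age"
  case class Person(name: String, surname: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EsSqlWriteDemo").setMaster("local[*]")
      .set("es.nodes", "192.168.1.1,192.168.1.2")
      .set("es.port", "9200")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val people = spark.sparkContext.textFile("F:\\people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), null, p(1).trim.toInt))
      .toDF()

    EsSparkSQL.saveToEs(people, "gudong20220718001/doc")
    spark.stop()
  }
}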
Data sources

When using Spark SQL, elasticsearch-hadoop allows Elasticsearch to be accessed through the SQLContext load methods. In other words, a DataFrame/Dataset backed by Elasticsearch can be created declaratively:
val sql = new SQLContext...
// Spark 1.3 style
val df = sql.load(
"spark/index",
"org.elasticsearch.spark.sql")
- the `load` method of the `SQLContext` for data sources
- the path or resource to load - in this case `spark/index` in Elasticsearch
- the data source provider - `org.elasticsearch.spark.sql`
In Spark 1.4, one would use the following similar API calls:
// Spark 1.4 style
val df = sql.read
.format("org.elasticsearch.spark.sql")
.load("spark/index")
- the `read` method of the `SQLContext` for data sources
- the data source provider - `org.elasticsearch.spark.sql`
- the path or resource to load - in this case `spark/index` in Elasticsearch
In Spark 1.5, this can be further simplified to:
// Spark 1.5 style
val df = sql.read.format("es")
.load("spark/index")
- uses `es` as an alias instead of the full package name of the `DataSource` provider
Regardless of the API used, once created, the DataFrame can be freely accessed to manipulate the data.
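
For instance, a brief sketch, assuming the gudong20220715001 index written in the earlier examples (which contains an arrival field):

// schema is inferred from the Elasticsearch mapping
val df = sql.read.format("org.elasticsearch.spark.sql").load("gudong20220715001/doc")
df.printSchema()
// the filter below is pushed down to Elasticsearch as Query DSL (see the following sections)
df.filter(df("arrival").equalTo("OTP")).show()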
The source declaration also allows specific options to be passed in, namely:
| Name | Default value | Description |
| --- | --- | --- |
| path | required | Elasticsearch index/type |
| pushdown | true | Whether to translate (push down) Spark SQL into the Elasticsearch Query DSL |
| strict | false | Whether to use exact (not-analyzed) matching or not (analyzed) |
| double.filtering | true | Whether to tell Spark to apply its own filtering on top of the filters pushed down (usable in Spark 1.6 or higher) |
Both options are explained in the next section. To specify the options (including the generic elasticsearch-hadoop ones), simply pass a Map to the methods above. For example:
val sql = new SQLContext...
// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index",
"pushdown" -> "true",
"es.nodes" -> "someNode",
"es.port" -> "9200")
// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13)
// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true",
"es.nodes" -> "someNode",
"es.port" -> "9200")
// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
.options(options)
.load("spark/index")
sqlContext.sql(
"CREATE TEMPORARY TABLE myIndex " +
"USING org.elasticsearch.spark.sql " +
"OPTIONS (resource 'spark/index', nodes 'someNode')" )
Do note that, due to the SQL parser, the `.` (and other common delimiting characters) is not allowed; the connector tries to work around this by automatically appending the `es.` prefix, but that only works for configuration options with a single `.` (such as `es.nodes` above). Because of this, if properties with multiple `.` are needed, one should use the `SQLContext.load` or `SQLContext.read` methods above and pass the properties as a `Map`.
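
For instance, a sketch of passing a key that contains more than one dot (es.read.metadata is a standard elasticsearch-hadoop option; the node address is illustrative):

// multi-dot keys such as es.read.metadata cannot go through the SQL OPTIONS clause,
// but are fine when passed as a Map to the reader
val readOptions = Map(
  "es.nodes" -> "someNode",
  "es.port" -> "9200",
  "es.read.metadata" -> "true")
val dfWithMeta = sql.read
  .format("org.elasticsearch.spark.sql")
  .options(readOptions)
  .load("spark/index")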
Push-Down operations

An important hidden feature of using elasticsearch-hadoop as a Spark source is that the connector understands the operations performed in the DataFrame/SQL and, by default, translates them into the appropriate Query DSL. In other words, the connector pushes the operations down directly to the source, where the data is efficiently filtered so that only the required data is streamed back to Spark. This significantly improves query performance and minimizes CPU, memory and I/O on both the Spark and Elasticsearch clusters, as only the needed data is returned (as opposed to returning the data in bulk only for it to be processed and discarded by Spark). Note that push-down operations apply even when a query is specified - the connector will enhance it according to the specified SQL.

As a side note, elasticsearch-hadoop supports all the Filters available in Spark (1.3.0 and higher) while retaining backwards binary compatibility with Spark 1.3.0, pushing SQL operations fully down to Elasticsearch without any user interference.
Operators optimized as push-down filters:

| SQL syntax | ES 1.x/2.x syntax | ES 5.x syntax |
| --- | --- | --- |
| = null , is_null | missing | must_not.exists |
| = (strict) | term | term |
| = (not strict) | match | match |
| > , < , >= , <= | range | range |

For example, for a filter where arrival equals "OTP" and days is greater than 3, the connector translates the query into:
{
"query" : {
"filtered" : {
"query" : {
"match_all" : {}
},
"filter" : {
"and" : [{
"query" : {
"match" : {
"arrival" : "OTP"
}
}
}, {
"days" : {
"gt" : 3
}
}
]
}
}
}
}
Additionally, the push-down filters can work with analyzed terms (the default) or can be configured to be strict and provide exact matches (which works only on not-analyzed fields). Unless the mapping is specified manually, it is strongly recommended to leave the defaults as they are. This and other topics are discussed at length in the Elasticsearch Reference Documentation.
Also note that double.filtering, available since elasticsearch-hadoop 2.2 for Spark 1.6 or higher, controls whether filters that are already pushed down to Elasticsearch are also processed/evaluated by Spark (the default) or not. Turning this feature off, especially when dealing with large data sets, will speed things up; however, pay attention to the semantics, as turning it off may return different results (depending on how the data is indexed, analyzed vs not_analyzed). In general, when strict is turned on, double.filtering can be disabled as well.
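
A sketch of combining these options on a read (the index name is illustrative):

// exact (not-analyzed) matching, with no re-filtering of pushed-down predicates in Spark
val strictDf = sql.read
  .format("org.elasticsearch.spark.sql")
  .option("pushdown", "true")
  .option("strict", "true")
  .option("double.filtering", "false")
  .load("spark/index")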
Data Sources as tables

Available since Spark SQL 1.2, another way to access the data source is by declaring it as a Spark temporary table (backed by elasticsearch-hadoop):
sqlContext.sql(
"CREATE TEMPORARY TABLE myIndex " +
"USING org.elasticsearch.spark.sql " +
"OPTIONS (resource 'spark/index', " +
"scroll_size '20')" )
- `myIndex` is Spark's temporary table name
- the `USING` clause identifies the data source provider, in this case `org.elasticsearch.spark.sql`
- elasticsearch-hadoop configuration options, the mandatory one being `resource`. For convenience, the `es.` prefix can be used or skipped.
- since using `.` would cause a syntax exception, it should be replaced with the `_` style; thus, in this example, `es.scroll.size` becomes `scroll_size` (the leading `es` can be dropped). Note that this only works in Spark 1.3, as Spark 1.4 has a stricter parser. See the section above for more information.
Once defined, the schema is picked up automatically. So one can issue queries right away:
val all = sqlContext.sql("SELECT * FROM myIndex WHERE id >= 1 AND id <= 10")
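
The result is a regular DataFrame, so the usual operations apply, e.g.:

all.printSchema()
all.show()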