
[ES in Action] ES-Hadoop: Spark ES Support

顧棟 · Published 2022-07-15 20:11:28

Writing to ES from Spark

This example uses Spark 2.3.3.

Table of contents
  • Writing to ES from Spark
    • Data types for writing
      • Native RDD support
        • Map
        • case class
        • JSON
        • dynamic/multi-resources
        • handling document metadata
        • **Complete code (runnable)**
      • Spark Streaming support
        • Map
        • case class
        • JSON
        • dynamic/multi-resources
        • handling document metadata
        • **Complete code (runnable)**
      • Spark SQL Support
        • Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
        • Data sources
        • Push-Down operations
        • Data Sources as tables
        • Reading `DataFrame`s (Spark SQL 1.3) from Elasticsearch
        • Spark SQL Type conversion
        • **Complete code (runnable)**
      • Spark Structured Streaming support
        • Supported Spark Structured Streaming versions
        • Writing Streaming `Datasets` (Spark SQL 2.0+) to Elasticsearch
        • Writing existing JSON to Elasticsearch
        • Spark Structured Streaming Type conversion
        • Sink commit log in Spark Structured Streaming
        • **Complete code (runnable)**
    • ES-Hadoop configuration options
Other software versions and the project POM file:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>localsparkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.major.minor.version>2.11</scala.major.minor.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.3</spark.version>
        <fastjson.version>1.2.76</fastjson.version>
        <httpclient.version>4.5.13</httpclient.version>
        <lombok.version>1.18.6</lombok.version>
    </properties>

    <dependencies>
        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.major.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1.1-jre</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.7</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- HttpClient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>
        <!-- ES-Hadoop Spark connector -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Elasticsearch provides an official support artifact; this article targets ES 5.4.2:

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>5.4.2</version>
        </dependency>

The Spark connector framework is the most sensitive to version incompatibilities. For convenience, a version compatibility matrix is provided below:

| Spark Version | Scala Version | ES-Hadoop Artifact ID       |
|---------------|---------------|-----------------------------|
| 1.0 - 1.2     | 2.10          | (unsupported)               |
| 1.0 - 1.2     | 2.11          | (unsupported)               |
| 1.3 - 1.6     | 2.10          | elasticsearch-spark-13_2.10 |
| 1.3 - 1.6     | 2.11          | elasticsearch-spark-13_2.11 |
| 2.0+          | 2.10          | elasticsearch-spark-20_2.10 |
| 2.0+          | 2.11          | elasticsearch-spark-20_2.11 |

The version element holds the ES version number.

Data types for writing

Native RDD support

Map
    // 1.data of type map
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(mapRdd, "gudong20220715001/doc")
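Besides the EsSpark object, the connector also adds an implicit saveToEs method to any RDD once org.elasticsearch.spark._ is imported; a minimal sketch of that style, reusing the mapRdd above:

    // sketch: equivalent write through the implicit RDD extension provided by the connector
    import org.elasticsearch.spark._
    mapRdd.saveToEs("gudong20220715001/doc")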
case class
	  // define a case class called Trip
	  case class Trip(departure: String, arrival: String)

	...

    // 2. data as a case class
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val caseClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(caseClassRdd, "gudong20220715001/doc")
JSON
	// 3.JSON type data
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""
    val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
    EsSpark.saveJsonToEs(jsonRdd, "gudong20220715001/doc")
dynamic/multi-resources

Dynamic/multi-resource writes require action.auto_create_index to be set to true:

PUT _cluster/settings
{
    "persistent": {
        "action.auto_create_index": "true" 
    }
}
    // 4. dynamic/multi-resources: requires [action.auto_create_index] to be set to [true]
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
    val rdd = spark.sparkContext.makeRDD(Seq(game, book, cd))
    // an index is created from the value of the media_type field in each Map, and the document is written to it
    EsSpark.saveToEs(rdd, "gudong20220715001-{media_type}/doc")
handling document metadata

The supported metadata types are listed in the enum org.elasticsearch.spark.rdd.Metadata:

public enum Metadata {
    ID,
    PARENT,
    ROUTING,
    TTL,
    TIMESTAMP,
    VERSION,
    VERSION_TYPE;

    private Metadata() {
    }
}
 // 5.handling document metadata
    import org.elasticsearch.spark.rdd.Metadata._
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2)
    val sfoMeta = Map(ID -> 3)
    val metadataRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715001/doc")
    EsSpark.saveToEsWithMeta(metadataRdd, conf_extend)

    // metadata carrying only the ID; alternatively, set Map("es.mapping.id" -> "iata") to upsert the data
    val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
    val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
    val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")

    val onlyIdRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp2), (mucMeta, muc2), (sfoMeta, sfo2)))
    EsSpark.saveToEsWithMeta(onlyIdRdd, conf_extend)

Complete code (runnable)
package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

object Main {

  // define a case class called Trip
  case class Trip(departure: String, arrival: String)

  def main(args: Array[String]): Unit = {
    // configuration for ES support
    // es.index.auto.create: automatically create the index if it does not exist
    // es.nodes: IPs of the ES nodes (coordinating or data nodes are both fine); es.port: the cluster's HTTP port
    val conf = new SparkConf().setAppName("EsWriteTest").setMaster("local")
    conf.set("es.nodes", "192.168.1.1,192.168.1.2")
      .set("es.port", "9200")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // write data to ES
    // 1.data of type map
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(mapRdd, "gudong20220715001/doc")

    // 2. data as a case class
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val caseClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(caseClassRdd, "gudong20220715001/doc")

    // 3.JSON type data
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""
    val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
    EsSpark.saveJsonToEs(jsonRdd, "gudong20220715001/doc")

    // 4. dynamic/multi-resources: requires [action.auto_create_index] to be set to [true]
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
    val rdd = spark.sparkContext.makeRDD(Seq(game, book, cd))
    // an index is created from the value of the media_type field in each Map, and the document is written to it
    EsSpark.saveToEs(rdd, "gudong20220715001-{media_type}/doc")

    // 5.handling document metadata
    import org.elasticsearch.spark.rdd.Metadata._
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2)
    val sfoMeta = Map(ID -> 3)
    val metadataRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715001/doc")
    EsSpark.saveToEsWithMeta(metadataRdd, conf_extend)

    // metadata carrying only the ID; alternatively, set Map("es.mapping.id" -> "iata") to upsert the data
    val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
    val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
    val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")

    val onlyIdRdd = spark.sparkContext.makeRDD(Seq((otpMeta, otp2), (mucMeta, muc2), (sfoMeta, sfo2)))
    EsSpark.saveToEsWithMeta(onlyIdRdd, conf_extend)

    spark.stop()
  }
}
Spark Streaming support

In Spark Streaming, unlike with RDDs, you cannot read data out of Elasticsearch through a DStream, because of the continuous nature of DStreams.
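Reading, by contrast, stays on the RDD side; a minimal sketch of reading back the index written earlier (the query string is optional):

    // sketch: reading from Elasticsearch goes through an RDD, not a DStream
    import org.elasticsearch.spark._
    val readBack = spark.sparkContext.esRDD("gudong20220715001/doc", "?q=*")
    readBack.take(10).foreach(println)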

Map
	// 1.data of type map
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
    val mapMicroBatches = mutable.Queue(mapRdd)
    // alternative style: ssc.queueStream(microbatches).saveToEs("gudong20220715002/doc")
    val mapDStream = ssc.queueStream(mapMicroBatches)
    EsSparkStreaming.saveToEs(mapDStream, "gudong20220718001/doc")
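The comment above refers to the implicit DStream style; a minimal sketch of it, assuming import org.elasticsearch.spark.streaming._ is in scope:

    // sketch: equivalent write through the implicit DStream extension
    import org.elasticsearch.spark.streaming._
    ssc.queueStream(mutable.Queue(mapRdd)).saveToEs("gudong20220718001/doc")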
case class
    // 2. data as a case class
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val caseClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    val caseClassMicroBatches = mutable.Queue(caseClassRdd)
    val caseClassDStream = ssc.queueStream(caseClassMicroBatches)
    EsSparkStreaming.saveToEs(caseClassDStream, "gudong20220718001/doc")
JSON
    // 3.JSON type data
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""

    val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
    val jsonMicroBatch = mutable.Queue(jsonRdd)
    val jsonDStream = ssc.queueStream(jsonMicroBatch)
    EsSparkStreaming.saveJsonToEs(jsonDStream, "gudong20220718001/doc")
dynamic/multi-resources
    // 4. Writing to dynamic/multi-resources: requires [action.auto_create_index] to be set to [true]
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    val batch = spark.sparkContext.makeRDD(Seq(game, book, cd))
    val microbatches = mutable.Queue(batch)
    val dstream3 = ssc.queueStream(microbatches)
    EsSparkStreaming.saveToEs(dstream3, "gudong20220715002-{media_type}/doc")
handling document metadata
    // 5.Handling document metadata
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    val airportsRDD = spark.sparkContext.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    val microbatches2 = mutable.Queue(airportsRDD)
    val dstream4 = ssc.queueStream(microbatches2)
    val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715002/doc")
    EsSparkStreaming.saveToEsWithMeta(dstream4, conf_extend)

    val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
    val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
    val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
    val airportsRDD2 = spark.sparkContext.makeRDD(Seq((1, otp2), (2, muc2), (3, sfo2)))

    val microbatches3 = mutable.Queue(airportsRDD2)
    val dstream5 = ssc.queueStream(microbatches3)
    EsSparkStreaming.saveToEsWithMeta(dstream5, conf_extend)

Complete code (runnable)
package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming.EsSparkStreaming

import scala.collection.mutable

object Main {

  // define a case class called Trip
  case class Trip(departure: String, arrival: String)

  def main(args: Array[String]): Unit = {
    // configuration for ES support
    // es.index.auto.create: automatically create the index if it does not exist
    // es.nodes: IPs of the ES nodes (coordinating or data nodes are both fine); es.port: the cluster's HTTP port
    val conf = new SparkConf().setAppName("EsWriteTest").setMaster("local[*]")
    conf.set("es.nodes", "192.168.1.1,192.168.1.2").set("es.port", "9200")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Writing DStream to Elasticsearch
    // Though, unlike RDDs, you are unable to read data out of Elasticsearch using a DStream due to the continuous nature of it.

    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    // 1.data of type map
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    val mapRdd = spark.sparkContext.makeRDD(Seq(numbers, airports))
    val mapMicroBatches = mutable.Queue(mapRdd)
    // alternative style: ssc.queueStream(microbatches).saveToEs("gudong20220715002/doc")
    val mapDStream = ssc.queueStream(mapMicroBatches)
    EsSparkStreaming.saveToEs(mapDStream, "gudong20220718001/doc")

    // 2. data as a case class
    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val caseClassRdd = spark.sparkContext.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    val caseClassMicroBatches = mutable.Queue(caseClassRdd)
    val caseClassDStream = ssc.queueStream(caseClassMicroBatches)
    EsSparkStreaming.saveToEs(caseClassDStream, "gudong20220718001/doc")

    // 3.JSON type data
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""

    val jsonRdd = spark.sparkContext.makeRDD(Seq(json1, json2))
    val jsonMicroBatch = mutable.Queue(jsonRdd)
    val jsonDStream = ssc.queueStream(jsonMicroBatch)
    EsSparkStreaming.saveJsonToEs(jsonDStream, "gudong20220718001/doc")

    // 4. Writing to dynamic/multi-resources: requires [action.auto_create_index] to be set to [true]
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    val batch = spark.sparkContext.makeRDD(Seq(game, book, cd))
    val microbatches = mutable.Queue(batch)
    val dstream3 = ssc.queueStream(microbatches)
    EsSparkStreaming.saveToEs(dstream3, "gudong20220715002-{media_type}/doc")

    // 5. Handling document metadata
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    val airportsRDD = spark.sparkContext.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    val microbatches2 = mutable.Queue(airportsRDD)
    val dstream4 = ssc.queueStream(microbatches2)
    val conf_extend = Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert", "es.resource" -> "gudong20220715002/doc")
    EsSparkStreaming.saveToEsWithMeta(dstream4, conf_extend)

    val otp2 = Map("iata" -> "OTP", "name" -> "Otopeni2")
    val muc2 = Map("iata" -> "MUC", "name" -> "Munich2")
    val sfo2 = Map("iata" -> "SFO", "name" -> "San Fran2")
    val airportsRDD2 = spark.sparkContext.makeRDD(Seq((1, otp2), (2, muc2), (3, sfo2)))

    val microbatches3 = mutable.Queue(airportsRDD2)
    val dstream5 = ssc.queueStream(microbatches3)
    EsSparkStreaming.saveToEsWithMeta(dstream5, conf_extend)

    // stream processing would normally run continuously; the timeout here is only so the local test shuts down on its own
    ssc.start()
    ssc.awaitTerminationOrTimeout(6000)

  }
}
Spark SQL Support

From Spark SQL 1.3 onwards, writing data to ES can be done in the following way.

Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
// Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
// Person is a simple case class, e.g. case class Person(name: String, surname: String, age: Int)
import spark.implicits._
val people = spark.sparkContext.textFile("F:\\people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), null, p(1).trim.toInt))
  .toDF()
EsSparkSQL.saveToEs(people, "gudong20220718001/doc")
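The same DataFrame can also be written through the standard DataFrame writer, with the connector acting as the data source; a minimal sketch, assuming the people DataFrame above:

// sketch: writing through the Spark SQL data source API instead of EsSparkSQL
people.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "192.168.1.1,192.168.1.2")
  .option("es.port", "9200")
  .mode("append")
  .save("gudong20220718001/doc")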
Data sources

When using Spark SQL, elasticsearch-hadoop allows Elasticsearch to be accessed through the SQLContext load methods. In other words, a DataFrame/Dataset backed by Elasticsearch can be created declaratively:

val sql = new SQLContext...
// Spark 1.3 style
val df = sql.load( 
  "spark/index",   
  "org.elasticsearch.spark.sql") 
  • the SQLContext load method for data sources
  • the path or resource to load - in this case, spark/index in Elasticsearch
  • the data source provider - org.elasticsearch.spark.sql

In Spark 1.4, one would use the following similar API calls:

// Spark 1.4 style
val df = sql.read      
  .format("org.elasticsearch.spark.sql") 
  .load("spark/index") 
  • the SQLContext read method for data sources
  • the data source provider - org.elasticsearch.spark.sql
  • the path or resource to load - in this case, spark/index in Elasticsearch

In Spark 1.5, this can be further simplified to:

// Spark 1.5 style
val df = sql.read.format("es")
  .load("spark/index")
  • es is used as an alias instead of the full package name of the DataSource provider

Whichever API is used, once created, the DataFrame can be accessed freely to manipulate the data.
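A minimal sketch of such manipulation (the airport column is assumed to exist in the loaded index):

// sketch: the loaded DataFrame behaves like any other DataFrame
df.printSchema()
df.filter(df("airport").equalTo("OTP")).show()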

The source declaration also allows specific options to be passed in, namely:

| Name                                   | Default value | Description                                                                         |
|----------------------------------------|---------------|-------------------------------------------------------------------------------------|
| path                                   | required      | Elasticsearch index/type                                                            |
| pushdown                               | true          | Whether to translate (push down) Spark SQL into the Elasticsearch Query DSL         |
| strict                                 | false         | Whether to use exact (not analyzed) matching or not (analyzed)                      |
| double.filtering (Spark 1.6 or higher) | true          | Whether to tell Spark to apply its own filtering on top of the pushed-down filters  |

The next section explains these push-down options. To specify options (including the generic elasticsearch-hadoop ones), simply pass a Map to the methods above:

For example:

val sql = new SQLContext...
// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index",
                    "pushdown" -> "true",     
                    "es.nodes" -> "someNode", 
                     "es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) 

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true",     
                  "es.nodes" -> "someNode", 
                   "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
                        .options(options) 
                        .load("spark/index")
sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', nodes 'someNode')" ) 

Do note that, due to the SQL parser, the . character (among other common delimiters) is not allowed; the connector tries to work around this by automatically appending the es. prefix, but that only works for configuration options containing a single . (such as es.nodes above). Because of this, if properties with multiple dots are needed, one should use the SQLContext.load or SQLContext.read methods above and pass the properties as a Map.
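For instance, a property with several dots such as es.read.metadata can only be passed through the options Map; a sketch reusing the Spark 1.4 style read above:

// sketch: multi-dot properties go through the options Map rather than the SQL OPTIONS clause
val multiDotOptions = Map("pushdown" -> "true",
                          "es.nodes" -> "someNode",
                          "es.port" -> "9200",
                          "es.read.metadata" -> "true")
val dfWithMeta = sql.read.format("org.elasticsearch.spark.sql")
                         .options(multiDotOptions)
                         .load("spark/index")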

Push-Down operations

An important hidden feature of using elasticsearch-hadoop as a Spark source is that the connector understands the operations performed in the DataFrame/SQL and, by default, translates them into the appropriate Query DSL. In other words, the connector pushes the operations down to the source, where the data is filtered efficiently so that only the required data streams back to Spark. This significantly improves query performance and minimizes CPU, memory and I/O on both the Spark and Elasticsearch clusters, since only the needed data is returned (as opposed to returning the data in bulk, only for Spark to process and discard it). Note that push-down operations apply even when a query is specified - the connector enhances the query according to the specified SQL.

As a side note, elasticsearch-hadoop supports all the Filters available in Spark (1.3.0 and higher) while retaining backwards binary compatibility with Spark 1.3.0, pushing the SQL operations down to Elasticsearch to their full extent without any user interference.

The operators optimized as push-down filters are:

| SQL syntax       | ES 1.x/2.x syntax | ES 5.x syntax   |
|------------------|-------------------|-----------------|
| = null , is_null | missing           | must_not.exists |
| = (strict)       | term              | term            |
| = (not strict)   | match             | match           |
| > , < , >= , <=  | range             | range           |

For example, for a filter equivalent to arrival = 'OTP' AND days > 3, the connector translates the query into:

{
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}

      },
      "filter" : {
        "and" : [{
            "query" : {
              "match" : {
                "arrival" : "OTP"
              }
            }
          }, {
            "days" : {
              "gt" : 3
            }
          }
        ]
      }
    }
  }
}
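A DataFrame filter along these lines would trigger that translation; a sketch, using a hypothetical spark/trips resource:

// sketch: a filter that the connector would push down as the Query DSL above
val trips = sql.read.format("org.elasticsearch.spark.sql").load("spark/trips")
val filtered = trips.filter(trips("arrival").equalTo("OTP").and(trips("days").gt(3)))
filtered.show()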

Additionally, the push-down filters can work with terms as analyzed (the default), or can be configured to be strict and provide exact matches (applicable only to not-analyzed fields). Unless the mapping is specified manually, it is strongly recommended to leave the defaults as they are. This and other topics are discussed at length in the Elasticsearch reference documentation.

Note that double.filtering, available since elasticsearch-hadoop 2.2 for Spark 1.6 or higher, controls whether the filters that have already been pushed down to Elasticsearch are also processed/evaluated by Spark (the default) or not. Turning this feature off speeds things up, especially when dealing with large data sets. However, pay attention to the semantics: turning it off may return different results, depending on how the data was indexed (analyzed vs not_analyzed). In general, when strict is turned on, double.filtering can also be disabled.
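All three switches are ordinary data source options; a minimal sketch of setting them explicitly:

// sketch: controlling push-down behaviour through data source options
val strictDf = sql.read.format("org.elasticsearch.spark.sql")
                       .option("pushdown", "true")
                       .option("strict", "true")
                       .option("double.filtering", "false")
                       .load("spark/index")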

Data Sources as tables

Available since Spark SQL 1.2, a data source can also be accessed by declaring it as a Spark temporary table (backed by elasticsearch-hadoop):

sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', " + 
            "scroll_size '20')" ) 
  • myIndex is Spark's temporary table name

  • the USING clause identifies the data source provider, in this case org.elasticsearch.spark.sql

  • elasticsearch-hadoop configuration options, of which resource is mandatory. For convenience, the es prefix can be used or skipped.

  • Since using . causes a syntax exception, it should be replaced with the _ style. Thus, in this example, es.scroll.size becomes scroll_size (the leading es can be dropped). Note that this only works in Spark 1.3, as Spark 1.4 has a stricter parser. See the chapter above for more information.

Once defined, the schema is picked up automatically. So one can issue queries right away:

val all = sqlContext.sql("SELECT * FROM myIndex WHERE id >= 1 AND id <= 10")