您当前的位置: 首页 >  ar

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

spark 批量读写 ES

宝哥大数据 发布时间:2021-07-09 08:40:08 ,浏览量:1

一、准备工作

需要添加elasticsearch-hadoop-version.jar, version对应集群的版本,

二、Hive 2 ES 2.1、写入ES代码

# encoding=UTF-8
# -*-coding:UTF-8-*-
import json
from pyspark.sql import SparkSession

# 设置doc_id
def addId(data):
	# 注意此处元组(id, data)
    return (data['id'], json.dumps(data))


if __name__ == '__main__':
    spark = SparkSession.builder.appName('test') \
        .enableHiveSupport().getOrCreate()


    d = {'id': '1',
         'code': '1',
         'location': {
             'lat': '32.5409',
             'lon': '123.000'
         },
         'name': '天津路',
         'cityName': '天津',

         }
    d2 = {'id': '2',
          'code': '2',
          'location': {
              'lat': '32.5409',
              'lon': '123.000'
          },
          'name': '关山路',
          'cityName': '河南',
          }

    rdd = spark.sparkContext.parallelize((d, d2))\
        .map(addId)

    #  spark.sql("select * from tableName").map()


    es_write_conf = {
        "es.nodes": "es.node.hosts",
        "es.port": "es.node.port",
        "es.resource": 'test',
        "es.input.json": "yes",
        "es.mapping.id": "id"
    }




    rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)

2.2.、运行
spark-submit --jars elasticsearch-hadoop-7.9.3.jar rdd2es.py 
三、ES 2 Hive 3.1、读取es
    reload(sys)
    sys.setdefaultencoding("utf-8" )
    spark = SparkSession.builder.appName("test") \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.sql.autoBroadcastJoinThreshold", "100") \
        .enableHiveSupport().getOrCreate()

    spark.sql("SET hive.exec.dynamic.partition = true")
    spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
    spark.sql("SET mapreduce.job.priority = HIGH")

    
    ES_HOST = "chb1"

    df01 = spark.read \
          .format("org.elasticsearch.spark.sql") \
          .option("es.nodes", ES_HOST) \
          .option("es.port", "9200") \
          .option("es.resource", "test") \
          .option("es.query", """{ "query": { "match_all": {} } }""") \
          .option("es.scroll.keepalive", "5m") \
          .option("es.scroll.size", "5000") \
          .option("es.net.http.auth.user", "user") \
          .option("es.net.http.auth.pass", "passwd") \
          .option('es.nodes.wan.only','true') \
          .option("es.read.field.as.array.include", "arrfield1, arrfield2") \
          .load()
3.2、问题 3.2.1、UnicodeEncodeError: 'ascii' codec can't encode characters in position 32-34: ordinal not in range(128)

解决方法1:

在开头加上
import sys
reload(sys)
sys.setdefaultencoding( "utf-8" )
3.2、读取array类型的es数据
.option("es.read.field.as.array.include", "arrfield1,arrfield2,arrfield3.col1") 
关注我的个人公众号【宝哥大数据】,更多干货

在这里插入图片描述

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0457s