一、Preparation
You need to add elasticsearch-hadoop-<version>.jar, where <version> matches your Elasticsearch cluster version.
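Besides passing the jar with --jars at spark-submit time (shown in 2.2 below), the session itself can point at it; a minimal sketch, assuming the jar sits in the working directory:

from pyspark.sql import SparkSession

# spark.jars ships the listed jars to the driver and executors.
# The jar path here is an assumption; point it at your actual copy.
spark = SparkSession.builder.appName('jar-demo') \
    .config('spark.jars', 'elasticsearch-hadoop-7.9.3.jar') \
    .getOrCreate()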
二、RDD 2 ES
2.1、Code
# -*- coding: utf-8 -*-
import json

from pyspark.sql import SparkSession


# Build a (doc_id, json_string) pair for each record
def addId(data):
    # Note: the return value must be the tuple (id, data)
    return (data['id'], json.dumps(data))


if __name__ == '__main__':
    spark = SparkSession.builder.appName('test') \
        .enableHiveSupport().getOrCreate()

    d = {'id': '1',
         'code': '1',
         'location': {
             'lat': '32.5409',
             'lon': '123.000'
         },
         'name': '天津路',
         'cityName': '天津',
         }
    d2 = {'id': '2',
          'code': '2',
          'location': {
              'lat': '32.5409',
              'lon': '123.000'
          },
          'name': '关山路',
          'cityName': '河南',
          }

    rdd = spark.sparkContext.parallelize((d, d2)) \
        .map(addId)
    # The same pattern applies to Hive data:
    # spark.sql("select * from tableName").rdd.map(...)

    es_write_conf = {
        "es.nodes": "es.node.hosts",  # Elasticsearch hosts
        "es.port": "es.node.port",    # Elasticsearch HTTP port
        "es.resource": "test",        # target index
        "es.input.json": "yes",       # values are already serialized JSON
        "es.mapping.id": "id"         # take the document _id from the 'id' field
    }
    rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)
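Expanding the commented hint above, a Hive table can be shipped to ES the same way; a minimal sketch, where tableName and its id column are placeholders, reusing addId and es_write_conf from the script above:

# Hypothetical: index a Hive table; 'tableName' and its 'id' column
# are placeholders for your own table.
hive_rdd = spark.sql("select * from tableName").rdd \
    .map(lambda row: row.asDict(recursive=True)) \
    .map(addId)
hive_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)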
2.2、Run
spark-submit --jars elasticsearch-hadoop-7.9.3.jar rdd2es.py
三、ES 2 Hive
3.1、Reading from ES
# -*- coding: utf-8 -*-
import sys

from pyspark.sql import SparkSession

# Python 2 only: force utf-8 as the default encoding (see 3.2.1 below)
reload(sys)
sys.setdefaultencoding("utf-8")

spark = SparkSession.builder.appName("test") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", "100") \
    .enableHiveSupport().getOrCreate()

# Allow dynamic partitions for the later insert into Hive
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("SET mapreduce.job.priority = HIGH")

ES_HOST = "chb1"
df01 = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", ES_HOST) \
    .option("es.port", "9200") \
    .option("es.resource", "test") \
    .option("es.query", """{ "query": { "match_all": {} } }""") \
    .option("es.scroll.keepalive", "5m") \
    .option("es.scroll.size", "5000") \
    .option("es.net.http.auth.user", "user") \
    .option("es.net.http.auth.pass", "passwd") \
    .option("es.nodes.wan.only", "true") \
    .option("es.read.field.as.array.include", "arrfield1,arrfield2") \
    .load()
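The section title promises ES to Hive, and the write half is short once df01 is loaded; a minimal sketch, where db.es_test is a placeholder table name:

# Unpartitioned case: saveAsTable creates the table if it does not exist.
df01.write.mode("overwrite").saveAsTable("db.es_test")
# Partitioned case: insertInto an existing table, relying on the dynamic
# partition settings configured above.
# df01.write.mode("append").insertInto("db.es_test_partitioned")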
3.2、Issues
3.2.1、UnicodeEncodeError: 'ascii' codec can't encode characters in position 32-34: ordinal not in range(128)
Solution 1: add the following at the top of the script
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
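Note that reload(sys) and sys.setdefaultencoding exist only on Python 2; Python 3 handles unicode natively and neither needs nor offers this hack. A guarded variant keeps the script runnable on both, a small sketch:

import sys

# Apply the default-encoding hack only where it exists (Python 2).
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf-8")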
3.3、Reading array-type ES fields
An Elasticsearch mapping does not record whether a field holds a single value or an array, so the connector cannot infer this on its own; list the array fields explicitly, using dot notation for nested columns such as arrfield3.col1:
.option("es.read.field.as.array.include", "arrfield1,arrfield2,arrfield3.col1")
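A quick way to verify the option took effect is to inspect the inferred schema; without it, an array field typically surfaces as a scalar column and multi-valued documents fail to read:

# With the option set, the listed fields come back as ArrayType columns.
df01.printSchema()
# e.g.  |-- arrfield1: array (nullable = true)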