Table of Contents
- 1. Install Hadoop
- 1.1 Download and extract
- 1.2 Configure environment variables
- 1.3 Modify configuration files
- 1.3.1 Modify hadoop-3.3.1\etc\hadoop\hadoop-env.cmd
- 1.3.2 Modify hadoop-3.3.1\etc\hadoop\core-site.xml
- 1.3.3 Modify hadoop-3.3.1\etc\hadoop\mapred-site.xml
- 1.3.4 Modify hadoop-3.3.1\etc\hadoop\hdfs-site.xml
- 1.3.5 Modify hadoop-3.3.1\etc\hadoop\yarn-site.xml
- 1.4 Format HDFS
- 1.5 Start the daemons
- 1.6 View the web UIs
- 2. Install Spark
- 2.1 Download and extract
- 2.2 Configure environment variables
- 2.3 Modify spark-3.1.2-bin-hadoop3.2\bin\spark-submit2.cmd
- 2.4 Start spark-shell
- 3. Install the PySpark client
- 3.1 Test Spark code in IDEA
- 3.2 Connect to MySQL from PySpark
- 3.3 Submit PySpark code with spark-submit
1. Install Hadoop
Installation requirement: Java 8.
1.1 Download and extract
Download Hadoop from the Hadoop download page and extract it. Extracting the tar.gz directly on Windows fails, so upload it to a Linux machine, extract it there, repackage it as a zip, and then extract the zip on Windows.
Download the hadoop winutils files from the hadoop winutils repository and extract them. There is currently no 3.3.1 build, but the 3.2.1 build also works.
Copy all files from the hadoop-3.2.1\bin directory into the hadoop-3.3.1\bin directory.
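An optional way to verify that the winutils files landed in the right place (this check is my addition, not part of the original steps; the fallback path is the install location used in this article):
import os

# HADOOP_HOME is the variable configured in section 1.2; fall back to the install path used here
hadoop_bin = os.path.join(os.environ.get("HADOOP_HOME", r"D:\install_software\hadoop3.3.1\hadoop-3.3.1"), "bin")
for f in ("winutils.exe", "hadoop.dll"):
    print(f, "found" if os.path.exists(os.path.join(hadoop_bin, f)) else "missing")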
1.2 Configure environment variables
Add the following system variable:
HADOOP_HOME=D:\install_software\hadoop3.3.1\hadoop-3.3.1
Add the following entries to Path:
%HADOOP_HOME%\bin
%HADOOP_HOME%\sbin
1.3 Modify configuration files
1.3.1 Modify hadoop-3.3.1\etc\hadoop\hadoop-env.cmd
Change the following line:
set JAVA_HOME=D:\install_software\java8
1.3.2 Modify hadoop-3.3.1\etc\hadoop\core-site.xml
Create a new hadoop-3.3.1\tmp directory.
Add the following properties inside the <configuration> element:
<property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>/D:/install_software/hadoop3.3.1/hadoop-3.3.1/tmp</value>
</property>
1.3.3 Modify hadoop-3.3.1\etc\hadoop\mapred-site.xml
Add the following property inside the <configuration> element:
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>
1.3.4 Modify hadoop-3.3.1\etc\hadoop\hdfs-site.xml
Create new hadoop-3.3.1\namenode, hadoop-3.3.1\datanode, hadoop-3.3.1\checkpoint, and hadoop-3.3.1\checkpoint-edits directories.
Add the following properties inside the <configuration> element:
<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>
<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>
<property>
    <name>dfs.namenode.name.dir</name>
    <value>/D:/install_software/hadoop3.3.1/hadoop-3.3.1/namenode</value>
</property>
<property>
    <name>dfs.datanode.data.dir</name>
    <value>/D:/install_software/hadoop3.3.1/hadoop-3.3.1/datanode</value>
</property>
<property>
    <name>fs.checkpoint.dir</name>
    <value>/D:/install_software/hadoop3.3.1/hadoop-3.3.1/checkpoint</value>
</property>
<property>
    <name>fs.checkpoint.edits.dir</name>
    <value>/D:/install_software/hadoop3.3.1/hadoop-3.3.1/checkpoint-edits</value>
</property>
1.3.5 Modify hadoop-3.3.1\etc\hadoop\yarn-site.xml
Add the following properties inside the <configuration> element:
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
    <name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
1.4 Format HDFS
D:\install_software\hadoop3.3.1\hadoop-3.3.1\bin>
D:\install_software\hadoop3.3.1\hadoop-3.3.1\bin>.\hdfs namenode -format
D:\install_software\hadoop3.3.1\hadoop-3.3.1\bin>
1.5 Start the daemons
D:\install_software\hadoop3.3.1\hadoop-3.3.1\sbin>
D:\install_software\hadoop3.3.1\hadoop-3.3.1\sbin>.\start-dfs.cmd
D:\install_software\hadoop3.3.1\hadoop-3.3.1\sbin>
D:\install_software\hadoop3.3.1\hadoop-3.3.1\sbin>.\start-yarn.cmd
D:\install_software\hadoop3.3.1\hadoop-3.3.1\sbin>
1.6 View the web UIs
Open http://localhost:9870 to view the HDFS web UI and http://localhost:8088 to view the YARN web UI; there you can see that both the master and the slave have started successfully.
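If you would rather check from a script than a browser, here is a small probe (my addition, assuming the default ports shown above):
import urllib.request

# Probe the HDFS NameNode (9870) and YARN ResourceManager (8088) web UIs
for name, url in [("HDFS", "http://localhost:9870"), ("YARN", "http://localhost:8088")]:
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            print(name, "web UI reachable, HTTP", resp.status)
    except Exception as e:
        print(name, "web UI not reachable:", e)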
2. Install Spark
Installation requirement: Scala 2.12.10.
2.1 Download and extract
Download Spark from the Spark download page and extract it.
2.2 Configure environment variables
Add the following system variables:
SPARK_HOME=D:\install_software\spark3.1.2\spark-3.1.2-bin-hadoop3.2
PYSPARK_PYTHON=D:\install_software\python\python
Add the following entry to Path:
%SPARK_HOME%\bin
2.3 Modify spark-3.1.2-bin-hadoop3.2\bin\spark-submit2.cmd
Add the following line before the line "%~dp0spark-class2.cmd" %CLASS% %*:
set JAVA_HOME=D:\install_software\java8
2.4 Start spark-shell
Run spark-shell; if the following output appears, the installation succeeded:
C:\Users\dell>
C:\Users\dell>spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://DESKTOP-FBDPRSA:4040
Spark context available as 'sc' (master = local[*], app id = local-1635396107473).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
3. Install the PySpark client
[root@bigdata001 ~]#
[root@bigdata001 ~]# pip3 install pyspark==3.1.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
[root@bigdata001 ~]#
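A quick way to confirm the client installed correctly (my addition, not part of the original steps):
import pyspark

# Should print 3.1.2, matching the Spark distribution installed in section 2
print(pyspark.__version__)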
3.1 Test Spark code in IDEA
from pyspark.sql import SparkSession, Row
from pyspark import SparkConf
from datetime import datetime, date

if __name__ == '__main__':
    conf = SparkConf() \
        .setMaster("local") \
        .setAppName("pyspark_test")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    df = spark.createDataFrame([
        Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
    ])
    df.show()

    spark.stop()
The execution result is as follows:
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
| 4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+
3.2 Connect to MySQL from PySpark
Option 1:
df = spark.read.jdbc(url="jdbc:mysql://192.168.8.115:3306/test_db",
                     table="test_tb",
                     column="id",
                     lowerBound=1,
                     upperBound=1000,
                     numPartitions=15,
                     properties={"user": "root", "password": "Root_123"}
                     )
df.show()
- The mysql-connector-java-8.0.25.jar must be placed in the JAVA_HOME\jre\lib\ext directory (an alternative that avoids this is sketched after these notes).
- This approach cannot filter the table data; it partitions and reads the entire table.
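As an alternative to copying the connector jar into JAVA_HOME\jre\lib\ext, the jar can be handed to Spark directly through spark.jars together with the option-style JDBC reader. The sketch below is my addition; the jar path and app name are assumptions for illustration only:
from pyspark.sql import SparkSession

# The jar path and app name here are illustrative assumptions
spark = SparkSession.builder \
    .master("local") \
    .appName("jdbc_read") \
    .config("spark.jars", "D:/install_software/mysql-connector-java-8.0.25.jar") \
    .getOrCreate()

df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://192.168.8.115:3306/test_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "test_tb") \
    .option("user", "root") \
    .option("password", "Root_123") \
    .option("partitionColumn", "id") \
    .option("lowerBound", 1) \
    .option("upperBound", 1000) \
    .option("numPartitions", 15) \
    .load()
df.show()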
Option 2:
df = spark.read.jdbc(url="jdbc:mysql://192.168.8.115:3306/test_db",
                     table="test_tb",
                     predicates=["update_time between '2021-10-27 00:00:00' and '2021-10-27 23:59:59'",
                                 "update_time between '2021-10-28 00:00:00' and '2021-10-28 23:59:59'"],
                     properties={"user": "root", "password": "Root_123"}
                     )
df.show()
- This approach filters the table data by the given predicates and reads each predicate as its own partition (see the quick check below).
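A quick check (my addition) showing that each predicate becomes its own partition:
# One JDBC partition is created per predicate, so this prints 2 for the example above
print(df.rdd.getNumPartitions())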
3.3 Submit PySpark code with spark-submit
[root@bigdata001 opt]#
[root@bigdata001 opt]# pushd /opt/work/data-quality && zip -rq ../data-quality.zip . && popd && spark-submit --master spark://192.168.8.111:7077,192.168.8.112:7077,192.168.8.113:7077 --driver-memory 2g --executor-memory 2g --total-executor-cores 3 --executor-cores 1 --py-files /opt/work/data-quality.zip /opt/work/data-quality/ruleDeal/ruleDeal.py
[root@bigdata001 opt]#
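For reference, a minimal sketch of what the submitted entry script might look like; the app name and the rule-handling placeholder are assumptions rather than the article's actual ruleDeal.py, and the master URL and resources come from the spark-submit command itself:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # master, memory and cores are supplied by spark-submit, so they are not set here
    spark = SparkSession.builder \
        .appName("data-quality-rule-deal") \
        .getOrCreate()

    # ... rule-handling logic using modules from data-quality.zip would go here ...

    spark.stop()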