目录
介绍
多少数据可以大?
什么是大数据?
大数据面临两个重要挑战:
处理类型
基本概念和相关概念
No SQL
大数据工具
数据流
什么是Hadoop?
什么是Hadoop生态系统?
Hadoop如何工作?
Windows上的Tensorflow - Python - CPU
下载Anaconda Python 3.6
安装Tensorflow
什么是深度学习?
Hadoop逐步安装和实现
参考
如今我们遇到了数据量不断增长的现象。我们希望长期保存所有这些数据,并对其进行高速处理和分析,这是为了建立一个很好的解决方案。对于数据科学家来说,找到一种适合于存储任何类型结构化和非结构化数据的传统数据库的替代品是最大的挑战。
我从第一步到结果都选择了一个完整的场景,这很难在整个互联网上找到。我选择了MapReduce,它是由Hadoop和Scala在Intellij Idea中处理数据,而这一切都将在Ubuntu Linux下作为操作系统进行。
多少数据可以大?大数据是大量数据,这些数据是结构化的,非结构化的或半结构化的,并且难以使用传统数据库存储和管理。它的容量从太字节到千兆不等。诸如SQL Server之类的关系数据库不适合存储非结构化数据。如下图所示,大数据是针对所有类型的结构化或非结构化数据设置的,这些数据具有存储、管理、共享、分析等基本要求。
大数据需要将大量数据导入和流式传输到存储箱中以便处理和分析数据。
大数据面临两个重要挑战:1.收集大量数据。(例如导入,传输和加载数据)2.分析数据。(如处理,排序,计数,聚合数据)
要执行上述步骤,我们需要建立消息系统。消息系统是将数据从一个应用程序传输到另一个应用程序的解决方案。有两种类型的消息传递系统,点对点和发布订阅者。在这一点上,发送消息只能由一个消费者使用,而在pub-sub消息系统中,消费者可以使用来自发布者的多个主题。
连续处理
在连续处理中,只有一个任务要处理。这是一个与atch处理的对比。
异步处理
在异步处理中,每个任务将在一个单独的线程中处理。它可以同时执行多个任务。同步和异步之间的主要区别在于多个任务的并行过程。假设有两种吃法,一种是站在街上的烤肉串队列中(同步过程),另一种是坐在餐厅(异步过程)。
在烤肉串中,我们必须等待,直到每个人在餐厅吃自己的三明治,女服务员记录每个人的顺序,并随机根据空的烹饪场所准备食物。
批处理——离线
在批处理中,首先在磁盘上存储数据,可以包含一百万条记录,然后对所有这些大数据进行处理,分析。通过MapReduce的Hadoop是一个很好的例子。无需在批处理中实时了解实例和即时结果。它适用于离线模式下的大数据分析。
流处理——实时
在流处理中,首先将数据注入Spark或Storm等分析工具中,然后立即分析数据。此解决方案适用于处理需要实例回复的最新记录,例如身份验证或授权系统。由于在内存而不是磁盘中进行处理,因此该处理很快。由于分析记录和内存处理较少,因此延迟较高。
基本概念和相关概念什么是数据管道?
数据管道是一系列相关和连接的元素,它们的每个部分,处理数据使其可以重用于下一个元素,并且还产生具有所需形状和格式的灵活输出。为了查看实际示例,请查看:[借助管道读取CSV文件]
延迟:
从一个点到另一个点遍历数据包的持续时间。低延迟显示出高网络效率。
吞吐量:
从开始发送到过程结束完成的特定时间内所有操作的总数。
可靠性:
可靠的系统保证所有数据都能正确处理而不会出现故障,并且在崩溃的情况下会重新执行。
容错性:
容错系统保证所有数据都能正常保存而不会出现故障,这意味着当集群崩溃时,特定任务将被引导到另一个机器上。
耐用性:
完成该过程后,将向客户发送成功的消息。虽然它牺牲了性能但是值得。
可扩展性:
可伸缩性将主要在分布式系统和并行编程上定义。它可以保证流量和数据增长的速度。
性能:
即使在遇到大数据的情况下,性能 系统也能保证吞吐量。
No SQLMongoDB
Mongo是用C ++编写的。它是 面向文档的。 它适用于具有许多查询的数据处理。
Redis
Redis是具有缓存和消息代理的开源和内存数据结构存储。
大数据工具Hadoop/ HBase
它是开源的,非关系型数据库和分布式的。Apache HBase位于Hadoop之上。
Cassandra
它具有良好的可扩展性、容错性和较低的延迟,同时具有完美的缓存。
数据流Flink
它是批量处理或实时处理。Flink有用于流式传输和sql-query的API。
Storm
Storm是实时系统,具有高性能和可扩展性。它每个节点每秒可处理超过百万条记录。
Kinesis
Kinesis具有实时处理功能。
Kafka
Kafka是一种发布——订阅消息传递系统和实时处理,具有良好的性能、可靠性、可扩展性和耐用性。
什么是Hadoop?实际上,我们期望来自所有数据库的两个问题。首先,我们需要存储数据,其次我们希望以快速准确的方式正确处理存储的数据。由于任意形状和大量数据,不可能将它们存储在传统数据库中。我们需要考虑一个可以处理大数据存储或处理的新方法。
Hadoop是一个革命性的大数据数据库,它有能力保存任何形状的数据并处理它们的节点集群。它还大大降低了数据维护的成本。借助hadoop,我们可以存储任何类型的数据,例如长时间内的所有用户的点击。所以它很容易进行历史分析。Hadoop具有分布式存储以及分布式流程系统,例如Map Reduce。
正如我上面提到的,hadoop适用于存储非结构化数据库或处理它们。hadoop生态系统有抽象的定义。保存数据位于左侧,具有两种不同的存储可能性,如HDFS和HBase。HBase是HDFS的顶级,两者都是由java编写的。
HDFS:
1. Hadoop分布式文件系统,其允许在分布式平面文件中存储大数据。
2. HDFS适用于顺序访问数据。
3.没有对数据的随机实时读/写访问。它更适合离线批处理。
HBase:
1.以柱状方式存储数据I键/值对。
2. HBase有可能实时读/写。
Hadoop是用java编写的,但你也可以用R,Python,Ruby实现。
Spark是开源集群计算,它在Scala中实现,适用于支持分布式计算的作业迭代。Spark具有很高的性能。
Hive,pig,Sqoop和mahout是数据访问,其可以对数据库进行查询。Hive和pig是类似SQL的; mahout用于机器学习。
MapReduce和Yarn都在处理数据。MapReduce用于处理分布式系统中的数据。MapReduce可以处理所有任务,例如作业和任务跟踪器,监视和执行并行。Yarn,另一个资源谈判者是MapReduce 2.0,它具有更好的资源管理和调度。
Sqoop是从/向外部数据库导入和导出数据的连接器。它以并行方式轻松快速地传输数据。
Hadoop如何工作?Hadoop遵循主/从架构模型。对于HDFS,主站中的名称节点监视并跟踪所有从属群集的存储群集。
有两种不同类型的作业可以完成MapReduce处理的所有工作。映射发送查询的作业,用于处理集群中的各种节点。这项工作将细分为较小的任务。然后Reduce job收集每个节点生成的所有输出,并将它们组合成一个单独的值作为最终结果。
这种架构使Hadoop成为一种非常快速和可靠的廉价解决方案。将大工作划分为更小并将每个任务放在不同的节点中。这个故事可能会提醒您多线程过程。在多线程中,所有并发进程都是借助锁和信号量共享的,但MapReduce中的数据访问是在HDFS的控制下进行的。
实际例子
下图中有三个文本文件用于练习单词个数。正如我在本文顶部所解释的,MapReduce开始将每个文件拆分为节点集群。在映射阶段,每个节点负责计算单词。在中间分裂的每个节点只是同源单词和前一节点中每个数量的特定单词。然后在reducig阶段,每个节点将总结并收集自己的结果以产生单个值。
如果您希望使用舒适和轻松的IDE和专业编辑器,而无需安装库。你可以使用 Anaconda & Spider。
然后从star打开Anaconda Navigator并选择和启动“Spider”:
有一些要点:
1.Python是面向对象的
2.动态类型
3.Rich库
4.简单阅读
5.Python区分大小写
6.Indent对Python很重要
安装Tensorflow1.从“Start Menue”打开Anaconda Naviator - >从“左侧面板”中选择“Environments” - >转到“root” - >选择“All Channels” - >搜索“tensor”
2.选择“tensorflow”,但如果您认为需要使用R进行统计计算或使用GPU进行快速结果,请选择“r-tensorflow”和“tensorflow-gpu”。
3.然后按绿色“Apply”。
4. 在下一个窗口中,他们再次接受其余的依赖包。
实际上,深度学习是机器学习的一个分支。机器学习包括一些不同类型的算法,这些算法可以获得数千个数据,并尝试从中学习,以便预测未来的新事件。但深度学习将神经网络应用为扩展或变体形状。深度学习能够处理数百万个数据点。
深度学习的最基本的基础设施可能是; 它能够选择最好的功能。实际上,深度学习总结了数据并基于压缩数据计算结果。这是人工智能所需要的,特别是当我们拥有大量数据库并进行大量计算时。
深度学习具有源自神经网络的连续层。这些层具有非线性功能,具有特征选择的任务。每一层都有一个输出,用作下一层的输入。深度学习应用程序是计算机视觉(例如面部或对象识别),语音识别,自然语言处理(NLP)和网络威胁检测。
Hadoop逐步安装和实现1.下载并安装Java
转到[这里] 下载此版本 jdk1.8.0_144
选择驱动器(C:\ Java)作为安装路径。
2.下载并安装Hadoop
从[这里]下载hadoop 并放在驱动器(D:\)上,你应该像下面的图片一样。
首先制作新文件夹,如果没有,请将其命名为“data”。
Format
以管理员身份运行“Windows命令提示符”
D:\hadoop\bin>hadoop-data-dfs - remove all
D:\hadoop\bin>hadoop namenode -format
Start
1. D:\hadoop\sbin>start-dfs.cmd
(等一分钟)
2. D:\hadoop\sbin>yarn-dfs.cmd
你会看到四个窗口
1、yarn资源经理
2、yarn节点管理员
3、namenode
4、datanode
所以,如果你看到那4个窗户,就意味着一切都是正确的。调整环境
测试并运行简单和著名的python代码作为wordcount:
在D:\hdp中创建新文件夹,然后在其上创建并保存python和文本文件。
wordcount-mapper.py
import sys
for line in sys.stdin: # Input is read from STDIN and the output of this file is written into STDOUT
line = line.strip() # remove leading and trailing whitespace
words = line.split() # split the line into words
for word in words:
print( '%s\t%s' % (word, 1)) #Print all words (key) individually with the value 1
wordcount-reducer.py
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin: # input comes from STDIN
line = line.strip() # remove leading and trailing whitespace
word, count = line.split('\t', 1) # parse the input we got from mapper.py by a tab (space)
try:
count = int(count) # convert count from string to int
except ValueError:
continue #If the count is not a number then discard the line by doing nothing
if current_word == word: #comparing the current word with the previous word (since they are ordered by key (word))
current_count += count
else:
if current_word:
# write result to STDOUT
print( '%s\t%s' % (current_word, current_count))
current_count = count
current_word = word
if current_word == word: # do not forget to output the last word if needed!
print( '%s\t%s' % (current_word, current_count))
将文本文档创建为“mahsa.txt”
HelloHello Hello Hello Hello Good Good
让我们在hadoop上运行它
D:\hadoop\sbin>hadoop fs -mkdir -p /hdp
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\wordcount-mapper.py /hdp
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\wordcount-reducer.py /hdp
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\mahsa.txt /hdp
D:\hadoop\sbin>hadoop fs -ls /hdp
D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar -file /hdp/wordcount-mapper.py -mapper "python wordcount-mapper.py" -file /hdp/wordcount-reducer.py -reducer "python wordcount-reducer.py" -input /hdp/mahsa.txt -output /outputpython
使用tensorflow python代码测试并运行简单的“Hello”:
在D:\hdp上创建代码
# -*- coding: utf-8 -*- """ Created on Sun Apr 1 15:42:59 2018 @author: Mahsa """ import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!') sess = tf.Session() print(sess.run(hello))
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\tensortest.py /hdp
D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar -D mapreduce.job.reduce=0 -file /hdp/tensortest.py -mapper "python tensortest.py" -input /hdp/mahsa.txt -output /outputtensortest
D:\hadoop\sbin>hadoop fs -ls /outputtensortest
D:\hadoop\sbin>hadoop fs -cat /outputtensortest/part-00000
使用tensorflow python代码测试并运行简单的“数字识别”:
在D:\hdp上创建代码
# -*- coding: utf-8 -*-
"""
Created on Sun Apr 1 15:42:59 2018
@author: Mahsa
"""
from tensorflow.examples.tutorials.mnist import input_data
# Downloading MNIS dataset
mnist_train = input_data.read_data_sets("data/", one_hot=True)
import tensorflow as tf
batch = 100
learning_rate = 0.01
training_epochs = 10
# matrix
x = tf.placeholder(tf.float32, shape=[None, 784])
yt = tf.placeholder(tf.float32, shape=[None, 10])
# Weight
Weight = tf.Variable(tf.zeros([784, 10]))
bias = tf.Variable(tf.zeros([10]))
# model
y = tf.nn.softmax(tf.matmul(x,Weight) + bias)
# entropy
cross_ent = tf.reduce_mean(-tf.reduce_sum(yt * tf.log(y), reduction_indices=[1]))
# Prediction
correct_pred = tf.equal(tf.argmax(y,1), tf.argmax(yt,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
# Gradient Descent
train_optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cross_ent)
with tf.Session() as sess:
sess.run(tf.initialize_all_variables())
# Batch Processing
for epoch in range(training_epochs):
batch_num = int(mnist_train.train.num_examples / batch)
for i in range(batch_num):
batch_x, batch_y = mnist_train.train.next_batch(batch)
sess.run([train_optimizer], result={x: batch_x, yt: batch_y})
if epoch % 2 == 0:
print( "Epoch: ", epoch)
print ("Accuracy: ", accuracy.eval(result={x: mnist_train.test.images, yt: mnist_train.test.labels}))
print( "Complete")
D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar -D mapreduce.job.reduce=0 -file /hdp/tensordigit.py -mapper "python tensordigit.py" -input /hdp/mahsa.txt -output /outputtensordigitt
D:\hadoop\sbin>hadoop fs -ls /outputtensordigittt
参考
1. https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html
2. http://highscalability.com/blog/2012/9/11/how-big-is-a-petabyte-exabyte-zettabyte-or-a-yottabyte.html
3. http://bigdata-madesimple.com/a-deep-dive-into-nosql-a-complete-list-of-nosql-databases/
4. https://northconcepts.com/docs/what-is-data-pipeline/
5. http://dataaspirant.com/2017/05/03/handwritten-digits-recognition-tensorflow-python/
6. http://www.algoworks.com/blog/real-time-data-streaming-tools-and-technologies/
原文地址:https://www.codeproject.com/Articles/1239650/Big-Data-2