###一、安装 flume,kafka, storm 的安装在下面三篇文章: flume:1.6.0 kafka:注意这里最好下载scala2.10版本的kafka,因为scala2.10版本的兼容性比较好和2.11版本差别太大
flume安装部署 kafka安装部署 storm安装部署
###二、各个部分调试 ####2.1、flume flume Source之SpoolDir flume 自定义Sink之kafkaSink
# 监听ftp日志
#agent的名字a1
a1.sources = src
a1.channels = chl
a1.sinks = sk
#定义source, source使用spooldir, 监控ftp日志
a1.sources.src.type=spooldir
#监控目录
a1.sources.src.spoolDir=/hadoop/ftp/idc_bakupload/20180307/23/idc/
#忽略的文件
a1.sources.src.ignorePattern = ^(.)*\\.AVL\\.(.)*$
#处理后的文件,添加后缀
a1.sources.src.fileSuffix = .bak
#定义channel, 使用memory 作为channel
a1.channels.chl.type = memory
a1.channels.chl.capacity = 100000
a1.channels.chl.transactionCapacity = 10000
#定义sink, 输出到控制台
a1.sinks.sk.type = com.chb.test.sink.MyKafkaSink
#定义sink, source 与channel的关系
#注意sink后面是channel, 而不是s
a1.sinks.sk.channel = chl
a1.sources.src.channels = chl
####2.2、kafka 2.2.1、kafka自身测试, 起一个生产者,一个消费者 2.2.2、启动消费这去消费flumesink的数据
##三、 Storm获取数据流程 ##3.1、首先来了解Strom-kafka Strom-kafka的官网介绍项目 注意:可能使用浏览器的问题, 导致在IE上只能看到部分,换成其他浏览器就好了。
介绍Storm核心Spout 和Trident spout的实现,用户消费从 Apache Kafka 0.8.x获取的数据。 ##3.1.1、Spouts We support both Trident and core Storm spouts.为了两种Spout实现,Strom使用一个BrokerHost interface 跟踪Kafka broker主机到分区映射和kafkaConfig控制一些Kafka相关的参数。 ##3.1.2 BrokerHosts 为了初始化您的Kafka spout/emitter,您需要创建一个标记BrokerHosts接口的实例。 目前,支持以下两种实现: ###ZkHosts 如果你想动态跟踪Kafka broker到分区映射(partition mapping), 你应该使用ZkHosts。 这个类使用Kafka的ZooKeeper entries 来跟踪brokerHost - >分区映射。 您可以通过调用下面方法实例化对象:
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
其中:
- brokerZkStr只为ip:post(eg. localhost:2181),
- brokerZkPath: the root directory under which all the topics and partition information is stored, 默认为 /brokers 。
- 默认情况下,代理分区映射(borker-partition mapping)每60秒从ZooKeeper刷新。 如果要更改它,您应该将
host.refreshFrezqSecs
设置为您选择的值。 实现如:
ZkHosts zkHosts = new ZkHosts("192.168.57.4:2181,192.168.57.5:2181,192.168.57.6:2181");
###StaticHosts 这是一个可选的实现,其中broker - >分区信息是静态的。 为了构造这个类的实例,您需要首先构造一个GlobalPartitionInformation的实例。
Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
##KafkaConfig 为创建KafkaSpout所需的第二个对象是KafkaConfig 创建KafkaConfig
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
BrokerHosts可以是如上所述的BrokerHosts接口的任何实现。 topic是Kafka topic的名称。 ClientId是可选的, 用作ZooKeeper路径的一部分,其中存储了spout的当前消耗偏移量。
###目前有2个KafkaConfig的扩展。 ###Spoutconfig Spoutconfig是KafkaConfig的扩展,它支持使用ZooKeeper连接信息的其他字段,并用于控制特定于KafkaSpout的行为。 Zkroot将用作root来存储消费的偏移量。 ID应该唯一标识您的spout。
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
实现:
SpoutConfig spoutConfig
= new SpoutConfig(
zkHosts,
topic,
"/test", // 偏移量offset的根目录
"test");// ID应该唯一标识您的spout
除了这些参数,SpoutConfig包含以下字段控制KafkaSpout的行为:
spoutConfig.forceFromStart = false; // 不从头开始消费,保证spout出现故障, 重启之后,能够从kafka的原来位置处理, 而不是从开始位置处理,kafka的偏移量,周期性的写入zookeeper中,
// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
// Retry strategy for failed messages
public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
// Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
// Initial delay between successive retries
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
// Maximum delay between successive retries
public long retryDelayMaxMs = 60 * 1000;
// Failed message will be retried infinitely if retryLimit is less than zero.
public int retryLimit = -1;
Core KafkaSpout only accepts an instance of SpoutConfig.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
###TridentKafkaConfig TridentKafkaConfig is another extension of KafkaConfig. TridentKafkaEmitter only accepts TridentKafkaConfig. ###KafkaConfig类还有一堆公共变量,用于控制应用程序的行为。 这里是默认值:
public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
Most of them are self explanatory except MultiScheme. ##MultiScheme MultiScheme是一个接口,指示如何将从Kafka中消耗的ByteBuffer转换为成Storm中的tuple。 它还控制输出字段的命名。
public Iterable deserialize(ByteBuffer ser);
public Fields getOutputFields();
默认的RawMultiScheme只接受ByteBuffer,并返回一个带有ByteBuffer的tuple,ByteBuffer转换为byte []
。 outputField的名称为“bytes”。 还有一些可选实现,如SchemeAsMultiScheme和KeyValueSchemeAsMultiScheme,它们可以将ByteBuffer转换为String。 //从Kafka中取出的byte[],该如何反序列化 如在整合项目中实现:使用SchemeAsMultiScheme
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 定义输出为String
还有SchemeAsMultiScheme,MessageMetadataSchemeAsMultiScheme的扩展,它具有一个附加的反序列化方法,除了与消息关联的分区和偏移之外,还接受消息ByteBuffer。
public Iterable deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)
这对于从Kafka topic上的任意处 auditing/replaying 消息非常有用,可以保存离散流的每个消息的分区和偏移量,而不是保留整个消息。
###四、KafakSpout的具体实现
TopologyBuilder builder = new TopologyBuilder();
// config kafka spout
String topic = "testflume";
//第一步创建Zkhosts
ZkHosts zkHosts = new ZkHosts("192.168.57.4:2181,192.168.57.5:2181,192.168.57.6:2181");
//第二步创建SpoutConfig, 为了设置各种参数
SpoutConfig spoutConfig = new SpoutConfig(zkHosts,
topic, //kafka的topic名称
"/test", // 偏移量offset的根目录
"test"); // kafka的唯一表示。
//设置zkserver的信息, 可选的, 应为在上面的ZkHosts中已经设置了zookeeper的主机和端口号。
List zkServers = new ArrayList();
System.out.println(zkHosts.brokerZkStr);
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = 2181;
//设置kafka的消费模式, 是否从头开始。
spoutConfig.forceFromStart = false; // 不从头开始消费
spoutConfig.socketTimeoutMs = 60 * 1000; //与Kafka broker的连接的socket超时时间
//从Kafka中取出的byte[],该如何反序列化
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 定义输出为String
//KafkaSpout之接收一个参数SpoutConfig.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// set kafka spout
builder.setSpout("kafka_spout", kafkaSpout, 3);