在flume1.5.2中没有kafkasink,需要自定义KafkaSink
在flume-1.6.0中提供了KafkaSink
KafkaSink就是将Channel中的数据通过sink写到kafka,所以KafkaSink相当于一个kafka生产者
1.1、官网的开发者文档Developer Guide
完整代码
package com.chb.test.flume.sink;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Custom Flume sink that forwards each channel event to Kafka as a string message.
 *
 * <p>Optional sink properties (read in {@link #configure(Context)}):
 * <ul>
 *   <li>{@code bootstrap.servers} - Kafka broker list; defaults to the original hard-coded list.</li>
 *   <li>{@code topic} - destination topic; defaults to {@code topic2}.</li>
 * </ul>
 *
 * @author 12285
 */
public class MyKafkaSink extends AbstractSink implements Configurable {

    private static final String DEFAULT_BOOTSTRAP_SERVERS =
            "idc007123:9092,idc007124:9092,idc007128:9092";
    private static final String DEFAULT_TOPIC = "topic2";

    private KafkaProducer<String, String> producer;
    private String topic = DEFAULT_TOPIC;

    /**
     * Builds the Kafka producer from the sink's configuration.
     *
     * @param context sink configuration; missing keys fall back to the defaults above
     */
    @Override
    public void configure(Context context) {
        topic = context.getString("topic", DEFAULT_TOPIC);

        Properties props = new Properties();
        props.put("bootstrap.servers",
                context.getString("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Release a producer left over from an earlier configure() call.
        if (producer != null) {
            producer.close();
        }
        producer = new KafkaProducer<>(props);
    }

    /**
     * Closes the Kafka producer, then performs the normal sink shutdown.
     */
    @Override
    public synchronized void stop() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
        super.stop();
    }

    /**
     * Takes one event from the channel and publishes it to Kafka inside a channel transaction.
     *
     * @return {@code READY} when an event was delivered, {@code BACKOFF} when the channel was empty
     * @throws EventDeliveryException if the event could not be published; the transaction is
     *         rolled back first so the event stays in the channel
     */
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event == null) {
                // Empty channel: finish the (empty) transaction cleanly and back off.
                txn.commit();
                return Status.BACKOFF;
            }

            String message = new String(event.getBody(), StandardCharsets.UTF_8);
            // Producer side: wait for the broker acknowledgement before committing, otherwise a
            // failed asynchronous send would lose an event already removed from the channel.
            producer.send(new ProducerRecord<>(topic, message)).get();

            txn.commit();
            return Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            if (t instanceof InterruptedException) {
                // Preserve the interrupt for the sink runner thread.
                Thread.currentThread().interrupt();
            }
            if (t instanceof Error) {
                throw (Error) t;
            }
            throw new EventDeliveryException(
                    "Failed to publish event to Kafka topic " + topic, t);
        } finally {
            txn.close();
        }
    }
}
依赖配置文件pom.xml
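<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.chb.shbaobiao</groupId>
  <artifactId>TestDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>TestDemo</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.10.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.5.2</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <encoding>UTF-8</encoding>
          <source>1.7</source>
          <target>1.7</target>
          <showWarnings>true</showWarnings>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>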