Flume custom Sink: KafkaSink

宝哥大数据 · Published: 2018-03-08 00:05:48

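Flume 1.5.2 does not ship a Kafka sink, so a custom KafkaSink has to be written. Flume 1.6.0 provides a KafkaSink out of the box.

A KafkaSink takes the events that come out of the Channel and writes them to Kafka, so the sink plays the role of a Kafka producer.

1.1 The official Developer Guide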

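Problem 1: Cannot Append to Appender! Appender either closed or not setup correctly!

This message is raised by Flume's Log4jAppender when its RPC client was never set up or has already been closed.

1.2 Custom KafkaSink

Complete code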

package com.chb.test.flume.sink;

import java.util.Properties;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Custom KafkaSink: integrates Flume with Kafka.
 * @author 12285
 */
public class MyKafkaSink extends AbstractSink implements Configurable {
    private KafkaProducer<String, String> producer;

    @Override
    public void configure(Context context) {
        // The broker list and serializers are hard-coded here for simplicity.
        Properties originalProps = new Properties();
        originalProps.put("bootstrap.servers", "idc007123:9092,idc007124:9092,idc007128:9092");
        originalProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        originalProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(originalProps);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            if (event == null) {
                // The channel is empty: nothing to send, so ask Flume to back off.
                status = Status.BACKOFF;
            } else {
                // Producer side: forward the event body to the Kafka topic.
                // send() is asynchronous, so the transaction below commits
                // without waiting for the broker to acknowledge the record.
                byte[] byteMessage = event.getBody();
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>("topic2", new String(byteMessage));
                producer.send(record);
                status = Status.READY;
            }
            txn.commit();
        } catch (Throwable t) {
            // Return the event to the channel so it can be retried later.
            txn.rollback();
            status = Status.BACKOFF;
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }
}
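
The sink above hard-codes the broker list and the topic name. A minimal sketch of one way to move them into settings follows. It uses a plain `Map` so that it runs without Flume on the classpath; in a real sink the values would more likely be read with `context.getString(key, defaultValue)` inside `configure`. The class name `SinkSettings` and the keys `brokerList` and `topic` are assumptions made for this illustration and do not come from the original post.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: turns sink settings into Kafka producer properties.
class SinkSettings {
    // Falls back to the topic name used in the sink above.
    static final String DEFAULT_TOPIC = "topic2";

    // Build producer properties; "brokerList" is an assumed setting key.
    static Properties producerProps(Map<String, String> settings) {
        String brokers = settings.get("brokerList");
        if (brokers == null || brokers.trim().isEmpty()) {
            throw new IllegalArgumentException("brokerList must be set");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers.trim());
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Resolve the target topic; "topic" is an assumed setting key.
    static String topic(Map<String, String> settings) {
        String t = settings.get("topic");
        return (t == null || t.trim().isEmpty()) ? DEFAULT_TOPIC : t.trim();
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<String, String>();
        settings.put("brokerList", "idc007123:9092,idc007124:9092,idc007128:9092");
        System.out.println(producerProps(settings).getProperty("bootstrap.servers"));
        System.out.println(topic(settings));
    }
}
```

Failing fast on a missing broker list surfaces a configuration mistake when the agent starts, instead of when the first event is sent.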

Dependency configuration file: pom.xml


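<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.chb.shbaobiao</groupId>
    <artifactId>TestDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>TestDemo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.7</source>
                    <target>1.7</target>
                    <!-- The element name was lost in the original; showWarnings is assumed. -->
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>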
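
To try the sink, the jar built by the assembly plugin has to be visible to the Flume agent (for example by copying it into Flume's `lib` directory), and the agent configuration refers to the sink by its fully qualified class name. A minimal sketch of such a configuration follows. The agent name `a1`, the netcat source on port 44444, and the memory channel sizes are assumptions chosen for illustration; none of them appear in the original post.

```properties
# Hypothetical agent "a1": one source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Test source: reads lines sent to localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# In-memory channel between the source and the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Custom sink, referenced by its fully qualified class name
a1.sinks.k1.type = com.chb.test.flume.sink.MyKafkaSink
a1.sinks.k1.channel = c1
```

Assuming the file is saved as `conf/kafka-sink.conf` (a hypothetical name), the agent could then be started with `bin/flume-ng agent --conf conf --conf-file conf/kafka-sink.conf --name a1 -Dflume.root.logger=INFO,console`.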