jstorm kafka plugin usage example

衣舞晨风, published 2017-04-29 12:06:10

This article uses jstorm 2.2.1.

I. pom dependencies



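<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>jiankunking</groupId>
    <artifactId>kafkajstorm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <url>http://blog.csdn.net/jiankunking</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-core</artifactId>
            <version>2.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>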

II. Custom bolt

package jiankunking.kafkajstorm.bolts;


import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImplExt;
import jiankunking.kafkajstorm.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;




/**
 * Created by jiankunking on 2017/4/19 16:47.
 */
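public class CustomBolt extends BaseBasicBolt {

    protected final Logger logger = LoggerFactory.getLogger(CustomBolt.class);

    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            // Read the raw message payload from the tuple's "bytes" field and decode it to a String.
            String message = ByteUtil.getStringFromByteArray((byte[]) ((TupleImplExt) input).get("bytes"));
            System.out.println(message);
            logger.info(message);
        } catch (UnsupportedEncodingException e) {
            // Log through slf4j so the failure ends up in the topology log, not only on stderr.
            logger.error("Failed to decode message", e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing downstream, so no output fields are declared.
        System.out.println("declareOutputFields");
    }
}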
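
The bolt above depends on `jiankunking.kafkajstorm.util.ByteUtil`, which is not listed in this post. As a rough idea of what such a helper might look like, here is a minimal sketch. The class name `ByteUtilSketch`, the UTF-8 charset, and the null handling are all assumptions made for illustration; the real helper in the demo project may differ.

```java
import java.io.UnsupportedEncodingException;

/**
 * Hypothetical stand-in for the demo's ByteUtil (not the original source).
 * Assumes the kafka messages are UTF-8 encoded text.
 */
final class ByteUtilSketch {

    private ByteUtilSketch() {
        // Utility class: no instances.
    }

    /**
     * Decodes a byte array as UTF-8 text. Declares UnsupportedEncodingException
     * so that it fits the catch block used in CustomBolt.
     */
    static String getStringFromByteArray(byte[] bytes) throws UnsupportedEncodingException {
        if (bytes == null) {
            // Assumption: a missing payload maps to null rather than an exception.
            return null;
        }
        return new String(bytes, "UTF-8");
    }
}
```

With a helper of this shape, the `execute` method only has to cast the tuple value to `byte[]` and pass it in; the charset choice stays in one place.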

III. Topology entry class

package jiankunking.kafkajstorm.topologies;


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import com.alibaba.jstorm.client.ConfigExtension;
import jiankunking.kafkajstorm.bolts.CustomBolt;
import jiankunking.kafkajstorm.kafka.KafkaSpout;
import jiankunking.kafkajstorm.kafka.KafkaSpoutConfig;
import jiankunking.kafkajstorm.util.PropertiesUtil;

import java.util.Map;

/**
 * Created by jiankunking on 2017/4/19 16:27.
 * Topology entry class
 */
public class CustomCounterTopology {

    /**
     * Entry point: builds the topology and submits it.
     *
     * @throws AlreadyAliveException    if a topology with the same name is already running
     * @throws InvalidTopologyException if the topology definition is invalid
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        System.out.println("11111");
        PropertiesUtil propertiesUtil = new PropertiesUtil("/application.properties", false);
        Map propsMap = propertiesUtil.getAllProperty();
        KafkaSpoutConfig spoutConfig = new KafkaSpoutConfig(propertiesUtil.getProps());
        spoutConfig.configure(propsMap);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("customBolt", new CustomBolt(), 1).shuffleGrouping("kafkaSpout");
        // Configuration
        Config conf = new Config();
        conf.setDebug(false);
        // Use a user-defined logback.xml.
        // The logback.xml file must be placed in the jstorm conf directory.
        ConfigExtension.setUserDefinedLogbackConf(conf, "%JSTORM_HOME%/conf/logback.xml");
        if (args != null && args.length > 0) {
            // Any command-line argument: submit to the cluster.
            StormSubmitter.submitTopologyWithProgressBar("customCounterTopology", conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            // No arguments: run in local mode.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("CustomCounterTopology", conf, builder.createTopology());
        }
    }
}

IV. Configuration file application.properties


# kafka
# kafka consumer group
kafka.client.id=kafkaspoutid
kafka.broker.partitions=4
kafka.fetch.from.beginning=false
kafka.topic=test_one
kafka.broker.hosts=10.10.10.10:9092
kafka.zookeeper.hosts=10.10.10.10:2181

storm.zookeeper.root=/kafka
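
The entry class reads this file through `PropertiesUtil`, which is also not listed in this post. The following is a minimal sketch of how such a loader could work, assuming it simply wraps `java.util.Properties`. The class name `PropertiesUtilSketch` and the extra `InputStream` constructor are hypothetical, and the second boolean constructor argument used in the entry class is omitted because its meaning is not documented here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical stand-in for the demo's PropertiesUtil (not the original source).
 */
final class PropertiesUtilSketch {

    private final Properties props = new Properties();

    /** Loads a properties file from the classpath, e.g. "/application.properties". */
    PropertiesUtilSketch(String classpathResource) throws IOException {
        InputStream in = PropertiesUtilSketch.class.getResourceAsStream(classpathResource);
        if (in == null) {
            throw new IOException("Resource not found on classpath: " + classpathResource);
        }
        try {
            props.load(in);
        } finally {
            in.close();
        }
    }

    /** Loads properties from an already opened stream (handy for tests). */
    PropertiesUtilSketch(InputStream in) throws IOException {
        props.load(in);
    }

    /** Returns the underlying Properties object. */
    Properties getProps() {
        return props;
    }

    /** Returns a copy of all entries as a String-to-String map. */
    Map<String, String> getAllProperty() {
        Map<String, String> all = new HashMap<String, String>();
        for (String name : props.stringPropertyNames()) {
            all.put(name, props.getProperty(name));
        }
        return all;
    }
}
```

Under these assumptions, `new PropertiesUtilSketch("/application.properties").getAllProperty()` would yield a map containing keys such as `kafka.topic` and `kafka.broker.hosts`, which is the kind of map the entry class hands to `spoutConfig.configure(...)`.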

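Notes:

1. Integrating the jstorm kafka plugin source

Go to the jstorm releases page on GitHub, https://github.com/alibaba/jstorm/releases, find the release version you need, download its source, and copy the plugin source from it into your own project.

[Figure: location of the plugin source in the jstorm source tree]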

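2. Using logback

Since jstorm 2.1.1, jstorm uses logback as its default logging framework. In normal use logback is compatible with log4j, meaning log4j calls can be bridged directly to logback, as follows: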

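a. Add the slf4j-api, log4j-over-slf4j and logback dependencies (strictly speaking, once logback-classic is added, slf4j-api no longer needs to be declared, since it is pulled in transitively):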


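<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.10</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.0.13</version>
</dependency>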

b. Exclude every slf4j-log4j12 dependency from the pom, because slf4j-log4j12 conflicts with log4j-over-slf4j:


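<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    <scope>provided</scope>
</dependency>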

The version here is usually 1.7.5, but check which version Maven's dependency mediation actually resolves in your application's pom.

In theory, this is enough to bridge log4j to slf4j.

Demo download:

http://download.csdn.net/detail/xunzaosiyecao/9829079

https://github.com/JianKunKing/jstorm-kafka-plugin-demo

Author: jiankunking. Source: http://blog.csdn.net/jiankunking
