您当前的位置: 首页 >  kafka

衣舞晨风

暂无认证

  • 1浏览

    0关注

    1156博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

storm kafka插件使用案例

衣舞晨风 发布时间:2017-04-29 11:52:10 ,浏览量:1

一、pom引用


    4.0.0

    jiankunking
    kafkastorm
    1.0-SNAPSHOT
    
        UTF-8
    
    http://blog.csdn.net/jiankunking
    
        
            junit
            junit
            3.8.1
            
        
        
            org.apache.storm
            storm-core
            1.1.0
            
            
        
        
            org.apache.kafka
            kafka_2.11
            0.10.1.1
            
                
                    org.apache.zookeeper
                    zookeeper
                
                
                    log4j
                    log4j
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        
        
            org.apache.storm
            storm-kafka
            1.1.0
        
        
            org.apache.httpcomponents
            httpclient
            4.3.3
        

    

    
        
            
                maven-assembly-plugin
                
                    
                        jar-with-dependencies
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    1.6
                    1.6
                
            
        
    

二、自定义bolt
package com.jiankunking.stormkafka.bolts;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Created by jiankunking on 2017/4/29 11:15.
 */
public class CustomBolt extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        System.out.println(sentence);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("declareOutputFields");
    }
}
三、自定义Scheme
package com.jiankunking.stormkafka.schemes;


import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;

/**
 * Created by jiankunking on 2017/4/22 10:52.
 */
public class MessageScheme implements Scheme {

    private static final Logger LOGGER;

    static {
        LOGGER = LoggerFactory.getLogger(MessageScheme.class);
    }

    public List deserialize(ByteBuffer byteBuffer) {
        String msg = this.getString(byteBuffer);
        return new Values(msg);
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }

    private String getString(ByteBuffer buffer) {
        Charset charset = null;
        CharsetDecoder decoder = null;
        CharBuffer charBuffer = null;
        try {
            charset = Charset.forName("UTF-8");
            decoder = charset.newDecoder();
            //用这个的话,只能输出来一次结果,第二次显示为空
            // charBuffer = decoder.decode(buffer);
            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        } catch (Exception ex) {
            LOGGER.error("Cannot parse the provided message!" + ex.toString());
            return "error";
        }
    }
}
四、自定义拓扑图入口类
package com.jiankunking.stormkafka.topologies;


import com.jiankunking.stormkafka.bolts.CustomBolt;
import com.jiankunking.stormkafka.schemes.MessageScheme;
import com.jiankunking.stormkafka.util.PropertiesUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Arrays;
import java.util.Map;

/**
 * Created by jiankunking on 2017/4/19 16:27.
 */
public class CustomCounterTopology {

    /**
     * 入口类,即提交任务的类
     *
     * @throws InterruptedException
     * @throws AlreadyAliveException
     * @throws InvalidTopologyException
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        System.out.println("11111");
        PropertiesUtil propertiesUtil = new PropertiesUtil("/application.properties", false);
        Map propsMap = propertiesUtil.getAllProperty();
        String zks = propsMap.get("zk_hosts").toString();
        String topic = propsMap.get("kafka.topic").toString();
        String zkRoot = propsMap.get("zk_root").toString();
        String zkPort = propsMap.get("zk_port").toString();
        String zkId = propsMap.get("zk_id").toString();
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, zkId);
        spoutConfig.zkServers = Arrays.asList(zks.split(","));
        if (zkPort != null && zkPort.length() > 0) {
            spoutConfig.zkPort = Integer.parseInt(zkPort);
        } else {
            spoutConfig.zkPort = 2181;
        }
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("customCounterBolt", new CustomBolt(), 1).shuffleGrouping("kafkaSpout");
        //Configuration
        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            //提交到集群运行
            try {
                StormSubmitter.submitTopologyWithProgressBar("customCounterTopology", conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            conf.setMaxTaskParallelism(3);
            //本地模式运行
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("CustomCounterTopology", conf, builder.createTopology());
        }
    }
}
五、配置文件application.properties
kafka.topic=test_one
# zookeeper
zk_hosts=10.10.10.10
zk_root=/kafka
zk_port=2181
# kafka消费组
zk_id="kafkaspout"

demo下载地址:http://download.csdn.net/detail/xunzaosiyecao/9829058

https://github.com/JianKunKing/storm-kafka-plugin-demo

作者:jiankunking 出处:http://blog.csdn.net/jiankunking

关注
打赏
1647422595
查看更多评论
立即登录/注册

微信扫码登录

0.0438s