本文用的是jstorm 2.2.1 一、pom引用
```xml
<modelVersion>4.0.0</modelVersion>
<groupId>jiankunking</groupId>
<artifactId>kafkajstorm</artifactId>
<version>1.0-SNAPSHOT</version>
<url>http://blog.csdn.net/jiankunking</url>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.jstorm</groupId>
        <artifactId>jstorm-core</artifactId>
        <version>2.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.5</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.5.0</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.10</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.0.13</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.6</source>
                <target>1.6</target>
            </configuration>
        </plugin>
    </plugins>
</build>
```
二、自定义bolt
package jiankunking.kafkajstorm.bolts;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImplExt;
import jiankunking.kafkajstorm.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
/**
* Created by jiankunking on 2017/4/19 16:47.
*/
public class CustomBolt extends BaseBasicBolt {
protected final Logger logger = LoggerFactory.getLogger(CustomBolt.class);
public void execute(Tuple input, BasicOutputCollector collector) {
try {
String ss=ByteUtil.getStringFromByteArray((byte[]) ((TupleImplExt) input).get("bytes"));
System.out.println(ss);
logger.info(ss);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System.out.println("declareOutputFields");
}
}
三、自定义拓扑图入口类
package jiankunking.kafkajstorm.topologies;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import com.alibaba.jstorm.client.ConfigExtension;
import jiankunking.kafkajstorm.bolts.CustomBolt;
import jiankunking.kafkajstorm.kafka.KafkaSpout;
import jiankunking.kafkajstorm.kafka.KafkaSpoutConfig;
import jiankunking.kafkajstorm.util.PropertiesUtil;
import java.util.Map;
/**
* Created by jiankunking on 2017/4/19 16:27.
* 拓扑图 入口类
*/
public class CustomCounterTopology {
    /**
     * Entry point: builds a topology wiring a KafkaSpout into a CustomBolt and
     * submits it. With command-line arguments it submits to the cluster under
     * the name "customCounterTopology"; with no arguments it runs in local mode.
     *
     * @throws AlreadyAliveException    if a topology with the same name is already running
     * @throws InvalidTopologyException if the topology definition is rejected
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        System.out.println("11111");
        // Load Kafka/ZooKeeper settings from application.properties on the classpath.
        PropertiesUtil propertiesUtil = new PropertiesUtil("/application.properties", false);
        Map propsMap = propertiesUtil.getAllProperty();
        KafkaSpoutConfig spoutConfig = new KafkaSpoutConfig(propertiesUtil.getProps());
        spoutConfig.configure(propsMap);
        // Wire the topology: kafkaSpout -> customBolt (shuffle-grouped, parallelism 1).
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("customBolt", new CustomBolt(), 1).shuffleGrouping("kafkaSpout");
        //Configuration
        Config conf = new Config();
        conf.setDebug(false);
        // Use a user-supplied logback.xml; the file must be placed in
        // jstorm's conf directory on the worker hosts.
        ConfigExtension.setUserDefinedLogbackConf(conf, "%JSTORM_HOME%/conf/logback.xml");
        if (args != null && args.length > 0) {
            // Submit to the cluster.
            StormSubmitter.submitTopologyWithProgressBar("customCounterTopology", conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            // Run in an in-process local cluster (development mode).
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("CustomCounterTopology", conf, builder.createTopology());
        }
    }
}
四、配置文件application.properties
# kafka
# kafka 消费组
kafka.client.id=kafkaspoutid
kafka.broker.partitions=4
kafka.fetch.from.beginning=false
kafka.topic=test_one
kafka.broker.hosts=10.10.10.10:9092
kafka.zookeeper.hosts=10.10.10.10:2181
storm.zookeeper.root=/kafka
小注: 1、jstorm kafka插件源码集成 需要到jstorm的github官网:https://github.com/alibaba/jstorm/releases中找到你需要使用的release版本,下载源码,将其中的插件源码集成到你自己的项目中,插件源码位置如下图:
2、logback的使用 jstorm 2.1.1之后,jstorm默认使用了logback作为日志框架,logback在一般使用时是兼容log4j的,也就是说log4j可以直接桥接到logback,具体为:
a. 添加slf4j-api, log4j-over-slf4j和logback依赖(其实加了logback依赖之后就不需要加slf4j-api依赖了),具体:
```xml
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.10</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.0.13</version>
</dependency>
```
b. 排除pom中所有的slf4j-log4j12的依赖,因为slf4j-log4j12跟log4j-over-slf4j是冲突的:
```xml
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    <scope>provided</scope>
</dependency>
```
这里版本一般是1.7.5,但是还要具体看你的应用pom仲裁出的版本。
理论上,这样就能够把log4j桥接到slf4j。
demo下载地址:
http://download.csdn.net/detail/xunzaosiyecao/9829079
https://github.com/JianKunKing/jstorm-kafka-plugin-demo
作者:jiankunking 出处:http://blog.csdn.net/jiankunking