
Kafka-Connect in Practice

宝哥大数据 · Published 2021-08-10 14:17:22

1. Introduction to Kafka Connect

  Kafka is an increasingly widely used messaging system, especially in big data development (real-time data processing and analytics). To integrate with other systems and decouple applications:

  • Traditionally, a Producer is used to send messages to the Broker, and a Consumer is used to consume messages from the Broker.
  • Kafka Connect, introduced in version 0.9, greatly simplifies the integration between Kafka and other systems. It lets users quickly define and implement all kinds of connectors (File, JDBC, HDFS, etc.), which makes importing and exporting data in bulk to and from Kafka very convenient.
1.1 Kafka Connect features include:
  • A common framework for Kafka connectors, with a unified integration API
  • Support for both distributed and standalone modes
  • A REST interface for viewing and managing Kafka connectors (a usage sketch follows this list)
  • Automatic offset management, so connector developers do not need to worry about this error-prone part themselves
  • Distributed and scalable
  • Streaming/batch integration
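
The REST interface is served by any running Connect worker (by default on port 8083, in both standalone and distributed mode). A minimal sketch of how it can be used, assuming a worker on localhost:8083 and the local-file-source connector defined later in section 2.1.1:

# List the connectors running on this worker
curl http://localhost:8083/connectors
# Show the status of one connector and its tasks
curl http://localhost:8083/connectors/local-file-source/status
# Remove a connector
curl -X DELETE http://localhost:8083/connectors/local-file-source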
2. Kafka Connect in Practice

2.1 File Connector test

This example uses two connectors:

  • FileStreamSource: reads from test.txt and publishes the lines to the Broker
  • FileStreamSink: reads the data back from the Broker and writes it to the file test.sink.txt
2.1.1 Configure the Source

The Source uses the configuration file ${KAFKA_HOME}/config/connect-file-source.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
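
Because file=test.txt is a relative path, it is resolved against the working directory of the Connect worker process. A simple way to prepare the input file, assuming the worker is started from ${KAFKA_HOME} as in section 2.1.4:

cd ${KAFKA_HOME}
touch test.txt   # create the (initially empty) file the source connector will read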
2.1.2 Configure the Sink

The Sink uses the configuration file ${KAFKA_HOME}/config/connect-file-sink.properties:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
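
Note that the sink uses topics (plural) rather than topic: it accepts a comma-separated list, so a single sink connector could drain several topics into the same file. A sketch with a hypothetical second topic:

topics=connect-test,another-topic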
2.1.3 Configure the standalone Connect worker

The standalone Connect worker uses the configuration file ${KAFKA_HOME}/config/connect-standalone.properties (the broker itself is still configured by server.properties; the worker only points at it via bootstrap.servers):

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
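
Because key.converter.schemas.enable and value.converter.schemas.enable are both true, every record is wrapped in a {"schema": ..., "payload": ...} envelope, which is exactly the format the console consumer prints in section 2.1.6. If you only want the raw value, one option (not part of the original walkthrough) is to turn the envelope off for values:

value.converter.schemas.enable=false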
2.1.4 Start the Source and Sink connectors
cd ${KAFKA_HOME}
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 
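
For comparison, the same source connector can also be run in the distributed mode mentioned in section 1.1; this is only a sketch, not part of the original walkthrough. Start the worker with connect-distributed.sh, then submit the connector configuration as JSON through the REST interface (default port 8083):

./bin/connect-distributed.sh config/connect-distributed.properties
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data '{"name":"local-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","file":"test.txt","topic":"connect-test"}}'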
2.1.5 Open a console consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
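
The --zookeeper flag works here because this example runs Kafka 0.11; on newer Kafka versions, where the old ZooKeeper-based consumer has been removed, the equivalent command talks to the broker directly:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic connect-test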
2.1.6 Write to test.txt and watch the console consumer output
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt

The console consumer window prints the following:

{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}

Check test.sink.txt:

[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt 
firest line
second line
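
To watch the sink file update in real time as more lines are appended to test.txt, you can also tail it (a small convenience, not in the original post):

tail -f test.sink.txt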

Reference: https://www.cnblogs.com/videring/articles/6371081.html
