
Flink: Kafka source & sink

cuiyaonan2000 · published 2021-03-09 14:06:30

Preface

This post builds a demo that uses Kafka as the data source (and sink) for Flink.

References:

  • Apache Flink 1.12 Documentation: Apache Kafka Connector
  • Apache Flink 1.10 Documentation: Configuring Dependencies, Connectors, Libraries
  • Generating Watermarks | Apache Flink
Choosing a Kafka connector version

Connector JAR

What is flink-connector-kafka for?

It combines Kafka's partition mechanism with Flink's parallelism, and it lets Kafka act as both a Flink source and a Flink sink. Because the consumer's offsets are tracked in Flink's checkpoints, a failed job can be recovered by resetting the Kafka offsets and replaying from the last checkpoint.



The Maven dependency:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.11</artifactId>
	<version>1.12.0</version>
</dependency>
Custom serialization

If you need a custom (de)serialization schema, note the following (the flink-avro dependency after this list may also be required):

  1. Flink's built-in SimpleStringSchema only deserializes the message value; the topic metadata is not exposed.
  2. JSONKeyValueDeserializationSchema does expose the topic metadata, but it requires the message body to be JSON.
  3. When neither fits, Flink provides the KeyedDeserializationSchema and KeyedSerializationSchema interfaces (superseded by KafkaDeserializationSchema/KafkaSerializationSchema in recent versions) for custom implementations. For consuming from Kafka, implementing the deserialization side is usually enough; see the sketch after the dependency below.

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-avro</artifactId>
	<version>1.12.0</version>
</dependency>
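As a minimal sketch of point 3 above (the class name TopicAwareStringSchema is hypothetical; it uses the KafkaDeserializationSchema interface that supersedes KeyedDeserializationSchema, and it emits (topic, value) pairs so downstream operators can see which topic each record came from):

package cui.yao.nan.flink.string;

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical example: a schema that keeps the topic name alongside the value.
public class TopicAwareStringSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

	@Override
	public boolean isEndOfStream(Tuple2<String, String> nextElement) {
		return false; // the Kafka stream is unbounded
	}

	@Override
	public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
		String value = record.value() == null
				? null
				: new String(record.value(), StandardCharsets.UTF_8);
		return Tuple2.of(record.topic(), value);
	}

	@Override
	public TypeInformation<Tuple2<String, String>> getProducedType() {
		return Types.TUPLE(Types.STRING, Types.STRING);
	}
}

Pass an instance of it to the FlinkKafkaConsumer constructor that accepts a KafkaDeserializationSchema instead of a plain DeserializationSchema.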

Producer: writing to Kafka (the Flink sink side)
package cui.yao.nan.flink.string;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.boot.CommandLineRunner;

import com.google.gson.Gson;

import cui.yao.nan.pojo.Person;

public class KafkaProducer implements CommandLineRunner {

	public static Gson gson = new Gson();

	public static void main(String[] args) {
		new KafkaProducer().run();
	}

	@Override
	public void run(String... args) {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig)
		FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
				"topic-name-cui", new SimpleStringSchema(), getProperties());

		// Attach each record's timestamp to the Kafka message (supported since Kafka 0.10).
		kafkaProducer.setWriteTimestampToKafka(true);

		DataStream<String> dataStream = env.fromElements(
				gson.toJson(new Person("cui", 1)),
				gson.toJson(new Person("yao", 2)),
				gson.toJson(new Person("nan", 3)));

		// The stream could be transformed here before being written to Kafka.
		dataStream.addSink(kafkaProducer);

		try {
			env.execute("start kafkaproducer");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private Properties getProperties() {
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "10.1.80.190:9092");
		// zookeeper.connect and group.id are consumer-side (and legacy) settings;
		// the Kafka producer client ignores them:
		// properties.setProperty("zookeeper.connect", "10.1.80.190:2181");
		// properties.setProperty("group.id", "15");
		return properties;
	}

}
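The simple constructor above provides at-least-once delivery by default. As a hedged variant (not part of the original post), the 1.12 connector also has a constructor that takes a KafkaSerializationSchema plus a Semantic, enabling end-to-end exactly-once writes; the transaction timeout value below is an assumption, and checkpointing must be enabled for EXACTLY_ONCE to take effect:

// Extra imports: java.nio.charset.StandardCharsets,
// org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema,
// org.apache.kafka.clients.producer.ProducerRecord

Properties props = getProperties();
// Must not exceed the broker's transaction.max.timeout.ms (value is an assumption).
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<String> exactlyOnceProducer = new FlinkKafkaProducer<>(
		"topic-name-cui",
		(KafkaSerializationSchema<String>) (element, timestamp) ->
				new ProducerRecord<>("topic-name-cui", element.getBytes(StandardCharsets.UTF_8)),
		props,
		FlinkKafkaProducer.Semantic.EXACTLY_ONCE);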
Controlling the start position in the source

Handling consumption failures

The corresponding settings are sketched below.
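A minimal sketch (the class name KafkaStartPositionDemo is hypothetical; the topic and properties follow the other examples in this post). FlinkKafkaConsumer exposes setStartFrom* methods to choose where consumption begins, and once checkpointing is enabled the consumed offsets are stored in Flink checkpoints, so a failed job resumes from the last successful checkpoint instead of losing or duplicating data:

package cui.yao.nan.flink.string;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaStartPositionDemo {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Store offsets in checkpoints every 5 seconds; recovery restarts from
		// the offsets of the last completed checkpoint.
		env.enableCheckpointing(5000);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "10.1.80.190:9092");
		props.setProperty("group.id", "422");

		FlinkKafkaConsumer<String> consumer =
				new FlinkKafkaConsumer<>("topic-name-cui", new SimpleStringSchema(), props);

		// Pick exactly one start position (setStartFromGroupOffsets is the default):
		consumer.setStartFromGroupOffsets();               // resume from committed group offsets
		// consumer.setStartFromEarliest();                // read the topic from the beginning
		// consumer.setStartFromLatest();                  // read only newly arriving records
		// consumer.setStartFromTimestamp(1615270000000L); // start from an epoch-millis timestamp

		env.addSource(consumer).print();
		env.execute("start position demo");
	}
}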

Setting up the watermark strategy

This must be configured: event-time windows only fire when the watermark passes the end of the window, so without a watermark strategy the windows below would never trigger.

There are several ways to write this; see Generating Watermarks | Apache Flink.

Reference code:

package demo;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: Chenghui Bai
 * Date: 2021/3/4 17:48
 * ProjectName: frauddetection
 * PackageName: demo
 * ClassName: WatermarkWCDemo
 * Description: event-time word count with bounded-out-of-orderness watermarks.
 */
public class WatermarkWCDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each input line has the form "epochSeconds,word".
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);

        // Extract the event time (in milliseconds) from the first field.
        SerializableTimestampAssigner<String> timestampAssigner = new SerializableTimestampAssigner<String>() {
            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                String[] fields = element.split(",");
                return Long.parseLong(fields[0]) * 1000L;
            }
        };

        // Bounded out-of-orderness: tolerate events arriving up to 10 seconds late.
        SingleOutputStreamOperator<String> watermarkStream = inputStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner(timestampAssigner));

        watermarkStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = value.split(",");
                out.collect(new Tuple2<>(fields[1], 1));
            }
        })
        .keyBy(data -> data.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .sum(1)
        .print();

        env.execute("run watermark wc");
    }
}
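To try the demo, start a socket server with nc -lk 7777 and type lines of the form epochSeconds,word (for example 1614847680,hello). Counts for a 10-second tumbling window are printed only after the watermark, which trails the largest timestamp seen by 10 seconds, passes the window's end.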
Consumer: reading from Kafka (the Flink source side)
package cui.yao.nan.flink.string;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.springframework.boot.CommandLineRunner;

public class KafkaConsumer implements CommandLineRunner {

	public static void main(String[] args) {
		new KafkaConsumer().run();
	}

	@Override
	public void run(String... args) {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
				"topic-name-cui", new SimpleStringSchema(), getProperties());

		DataStream<String> dataStream = env.addSource(kafkaConsumer);

		// print() is itself a sink: it logs each element (via its toString()) to the
		// task manager's log, or to the IDE console when running locally.
		dataStream.print();

		try {
			env.execute("start kafkaconsumer");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private Properties getProperties() {
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "10.1.80.190:9092");
		// group.id identifies the consumer group whose offsets are committed.
		properties.setProperty("group.id", "422");
		// zookeeper.connect is a legacy setting ignored by the modern Kafka client:
		// properties.setProperty("zookeeper.connect", "10.1.80.190:2181");
		return properties;
	}

}

The consumed output looks like the following; the `2>` prefix indicates which sub-task (i.e. thread) each line came from.
