前言:
毒丸对象,在日常开发中倒不是经常使用到。笔者在阅读datax的源码时,发现到这个骚操作,赶紧去查了下相关概念和使用场景。本文就当做一个简单的记录,以后在类似的开发场景中能够及时想到这么一种方案。
1.毒丸对象毒丸,是指放在一个队列上的对象。一般在FIFO的队列中,生产者生产消息,消费者消费消息,当生产者生产完所有的消息后,一般会最终发送一个毒丸对象(告诉消费者这是最后一个对象);而当消费者按顺序消费对象时,若消费到毒丸对象,则可以判定这是生产者生产的最后一个对象了,后续就可以关闭消费者。
所以,针对毒丸对象的使用场景一般就是如此,消费者可以通过毒丸对象来了解是否已经消费完所有的消息,若全部完成,可顺利结束当前消费线程。
2.datax的使用示例笔者不多介绍datax的一些知识,读者可以从其官网获取。
在笔者的示例中,从一个MysqlReader读取数据,同步到一个MysqlWriter。从源码看来这分别是通过两个线程来执行的任务。MysqlReader产生的数据则交给Channel来暂存,MysqlWriter则从Channel中拉取数据。具体代码如下:
2.1 生产数据代码:public class RecordExchanger extends TransformerExchanger implements RecordSender, RecordReceiver {
public void sendToWriter(Record record) {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
record = doTransformer(record);
if (record == null) {
return;
}
// 托管给channel
this.channel.push(record);
doStat();
}
}
2.1.1 毒丸对象的放置
当Channel关闭时或Exchange关闭时,则放置毒丸对象,告诉消费者数据已全部发送完毕
public class MemoryChannel extends Channel {
public void close() {
super.close();
try {
// 放置毒丸对象
this.queue.put(TerminateRecord.get());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
2.2 拉取数据代码:
public class RecordExchanger extends TransformerExchanger implements RecordSender, RecordReceiver {
@Override
public Record getFromReader() {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
// 通过channel拉取数据
Record record = this.channel.pull();
// TerminateRecord则是毒丸数据
return (record instanceof TerminateRecord ? null : record);
}
}
2.2.1 消费者获取到毒丸对象之后的操作
public class CommonRdbmsWriter {
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
...
try {
Record record;
// 获取到毒丸对象时返回null,此时则跳出while循环
while((record = recordReceiver.getFromReader()) != null) {
...
writeBuffer.add(record);
bufferBytes += record.getMemorySize();
if (writeBuffer.size() >= this.batchSize || bufferBytes >= this.batchByteSize) {
this.doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
}
}
}
2.3 Channel(默认使用MemoryChannel)
public class MemoryChannel extends Channel {
private int bufferSize = 0;
// 使用ArrayBlockingQueue来暂存数据
private ArrayBlockingQueue queue = null;
// 存放数据
protected void doPush(Record r) {
try {
long startTime = System.nanoTime();
this.queue.put(r);
waitWriterTime += System.nanoTime() - startTime;
memoryBytes.addAndGet(r.getMemorySize());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// 拉取数据
protected Record doPull() {
try {
long startTime = System.nanoTime();
Record r = this.queue.take();
waitReaderTime += System.nanoTime() - startTime;
memoryBytes.addAndGet(-r.getMemorySize());
return r;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
}
总结:
多学习些源码还是有好处的,有很多意想不到的知识点运用。与君共勉。