目录
1. Transaction的作用
- 1. Transaction的作用
- 2. Transaction的各种概念
- 3. 开启Transaction
- 4. 使用事务
事务API的使用场景:
- 实现从一个topic消费数据,发送消息到另一个topic的原子性操作
- 实现一条消息发送到多个topic的原子性操作
- 实现从多个topic消费数据进行ack确认的原子性操作
允许事件流应用将消费、处理、生产消息整个过程定义为一个原子操作。一个事务涉及的所有操作要么全部成功,要么全部失败
2. Transaction的各种概念- 事务调度器:事务提交后,事务调度器与相关联的topic所在的broker交互完成事务。 所有事务元数据都持久化在pulsar的一个topic中
- 事务ID:pulsar中用于标记一条事务。长度为128位字节, 前16位表示事务协调器的ID, 其余位用于代表事务协调器中的一个个事务, 是递增的
- 事务缓存:事务中产生的消息存储在事务缓冲区中。在事务提交之前,事务缓存中的消息对消费者是不可见的。当事务中止时,事务缓冲区中的消息将被丢弃
- 待确认状态:挂起确认状态在事务完成之前维护事务中的消息确认。 如果消息处于挂起确认状态,则在该消息从挂起确认状态中移除之前,其他事务无法确认该消息。挂起的确认状态被保留到挂起的确认日志中(cursor ledger)。 新启动的broker可以从挂起的确认日志中恢复状态,以确保状态确认不会丢失
- 修改Pulsar集群所有服务器的conf/broker.conf文件,修改内容如下:
transactionCoordinatorEnabled=true
acknowledgmentAtBatchIndexLevelEnabled=true
- 在Pulsar集群的一台服务器上,初始化事务协调器的元数据
[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar initialize-transaction-coordinator-metadata -cs bigdata002:2181 -c pulsar-cluster
......省略部分......
2022-04-11T18:03:11,386+0800 [main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x1007d52d99e691d
Transaction coordinator metadata setup success
[root@bigdata001 apache-pulsar-2.9.1]#
然后重启Pulsar集群的所有Bookie和Broker
4. 使用事务import org.apache.pulsar.client.api.{Consumer, Message, Producer, PulsarClient, Schema}
import org.apache.pulsar.client.api.transaction.Transaction
import java.util.concurrent.TimeUnit
object pulsar_transaction_test {
def main(args: Array[String]): Unit = {
// 构建一个支持事务的客户端
val pulsarClient:PulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://192.168.8.111:6650,192.168.8.113:6650")
.enableTransaction(true)
.build()
// 开启事务支持
val txn:Transaction = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build().get()
try {
// =======接收消息======
val consumer:Consumer[String] = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://public/default/txn1")
.subscriptionName("txn_sub")
// 开启批量消息确认
.enableBatchIndexAcknowledgment(true)
.subscribe()
// ======获取消息的数据======
val message:Message[String] = consumer.receive()
println("topic是: " + message.getTopicName() + ", 消息是: " + new String(message.getData()))
// ======将接收到的消息, 处理后, 发送到另一个Topic中======
val producer:Producer[String] = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://public/default/txn2")
// .sendTimeout(0, TimeUnit.MILLISECONDS)
.create()
producer.newMessage(txn).value(message.getValue()).send()
// ======抛出异常======
1 / 0
// ======确认消费的消息======
consumer.acknowledge(message)
// ======提交事务======
txn.commit()
} catch {
case e: Exception => {
// ======回滚事务======
txn.abort()
e.printStackTrace()
}
}
}
}
执行程序,然后向topic txn1发送消息"Hello Transaction Pulsar"。同时启动consume消费topic txn2的数据
此时程序的结果如下:
topic是: persistent://public/default/txn1, 消息是: Hello Transaction Pulsar
java.lang.ArithmeticException: / by zero
at flink_test$.main(flink_test.scala:49)
at flink_test.main(flink_test.scala)
将程序终止,将程序1 / 0
这行注释,再次运行程序,结果如下
topic是: persistent://public/default/txn1, 消息是: Hello Transaction Pulsar
此时consume txn2可以得到数据了