事务消息
1、介绍
“事务消息”,是在 “普通消息” 的基础上,要求按发布的一批消息,要么全成功,要么全失败。
- 生产者发布消息,会要求服务端给确认(confirm),否则异常提醒
- 服务端收到消息,并转到事务中转队列后,会答复一个接收确认
- 生产者提交事务确认(提交或回滚),并要求服务端给确认(confirm)
- 如果生产者没有提交确认,则服务端会有1分钟后发起反向检查。如果一直没成功,则在1小时后放弃
- 服务收到事务确认后,把事务消息转到派发队列,并答复一个接收确认
- 如果是回滚,则删除事务消息
- 服务端派发消息,会要求消费者给回执(ack)
- 如果派发失败,会不断延时重试,直到派发成功为止
- 消费者消费后,会答复一个消费回执
2、代码演示
//准备(1.取名字;2.添加响应实现)
MqClient client = FolkMQ.createClient("folkmq://127.0.0.1:18602")
.nameAs("demoapp") //一般用当前应用名
.connect();
//用于服务端发起的反向确认
client.transactionCheckback(m->{
//客户端未完成事务确认。由服务端发起补尝确认
if("1".equals(m.getAttr("orderId"))) {
//一般这里,需要查询数据库之类的
m.acknowledge(true);
}
});
//发送事务消息
MqTransaction tran = client.newTransaction();
try {
//同步
client.publish("demo.topic", new MqMessage("demo1").attr("orderId","1").transaction(tran));
client.publish("demo.topic", new MqMessage("demo2").attr("orderId","1").transaction(tran));
//异步,也行!
client.publishAsync("demo.topic", new MqMessage("demo3").attr("orderId","1").transaction(tran));
client.publishAsync("demo.topic", new MqMessage("demo4").attr("orderId","1").transaction(tran));
tran.commit();
} catch (Throwable e) {
tran.rollback();
}