FolkMQ v1.7.8

事务消息

1、介绍

“事务消息”,是在 “普通消息” 的基础上,要求按发布的一批消息,要么全成功,要么全失败。

  • 生产者发布消息,会要求服务端给确认(confirm),否则异常提醒
  • 服务端收到消息,并转到事务中转队列后,会答复一个接收确认
  • 生产者提交事务确认(提交或回滚),并要求服务端给确认(confirm)
    • 如果生产者没有提交确认,则服务端会有1分钟后发起反向检查。如果一直没成功,则在1小时后放弃
  • 服务收到事务确认后,把事务消息转到派发队列,并答复一个接收确认
    • 如果是回滚,则删除事务消息
  • 服务端派发消息,会要求消费者给回执(ack)
    • 如果派发失败,会不断延时重试,直到派发成功为止
  • 消费者消费后,会答复一个消费回执

2、代码演示

//准备(1.取名字;2.添加响应实现)
MqClient client = FolkMQ.createClient("folkmq://127.0.0.1:18602")
    .nameAs("demoapp") //一般用当前应用名
    .connect();

//用于服务端发起的反向确认
client.transactionCheckback(m->{
  //客户端未完成事务确认。由服务端发起补尝确认
  if("1".equals(m.getAttr("orderId"))) {
      //一般这里,需要查询数据库之类的
      m.acknowledge(true);
  }
});
    
//发送事务消息    
MqTransaction tran = client.newTransaction();

try {
    //同步
    client.publish("demo.topic", new MqMessage("demo1").attr("orderId","1").transaction(tran));
    client.publish("demo.topic", new MqMessage("demo2").attr("orderId","1").transaction(tran));
    //异步,也行!
    client.publishAsync("demo.topic", new MqMessage("demo3").attr("orderId","1").transaction(tran));
    client.publishAsync("demo.topic", new MqMessage("demo4").attr("orderId","1").transaction(tran));

    tran.commit();
} catch (Throwable e) {
    tran.rollback();
}