FolkMQ v1.7.8

生产者 - 发布

发布支持同步与异步。异步发布时,需要通过回调函数获得发布确认结果。

1、发布普通消息

发布接口有两个参数,依次为:主题,消息。

//同步
client.publish("demo.topic", new MqMessage("hello"));
//异步
client.publishAsync("demo.topic", new MqMessage("hello"))
      .whenComplete((isOk, err) -> {

      });

发布二进制消息

//同步
client.publish("demo.topic", new MqMessage("hello".getBytes()));
//异步
client.publishAsync("demo.topic", new MqMessage("hello".getBytes()))
      .whenComplete((isOk, err) -> {

      });

发布带属性的消息

//同步
client.publish("demo.topic", new MqMessage("hello").attr("from", "noear"));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").attr("from", "noear"));
      .whenComplete((isOk, err) -> {

      });

2、发布定时消息

比如,指定5秒后派发。因为是纯内存运行,要控制好定时消息的数量与内存。如果量大,可先入数据库再中转。

Date scheduled = new Date(System.currentTimeMillis() + 5000);

//同步
client.publish("demo.topic", new MqMessage("hello").scheduled(scheduled));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").scheduled(scheduled))
      .whenComplete((isOk, err) -> {

      });

3、发布时效消息(可以过期的,过期就会弃用)

比如,指定5秒后过期。5秒内如果没有被正常消费,此消息便弃用(永远没了)

Date expiration = new Date(System.currentTimeMillis() + 5000);

//同步
client.publish("demo.topic", new MqMessage("hello").expiration(expiration));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").expiration(expiration))
      .whenComplete((isOk, err) -> {

      });

4、发布顺序消息

顺序消息,只对第一次发送时有效。如果消费失败自动延时重试就无效了。

//同步
client.publish("demo.topic", new MqMessage("hello").sequence(true)));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").sequence(true)))
      .whenComplete((isOk, err) -> {

      });

顺序消息加分片负载均衡:

//同步
client.publish("demo.topic", new MqMessage("hello").sequence(true, "sharding-1")));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").sequence(true, "sharding-1")))
      .whenComplete((isOk, err) -> {

      });

5、发布事务消息

事务消息,通过二段式提交,实现一批消息要么全成功要么全失败的原子效果。也可与其它事务结合使用。

//用于服务端发起的反向确认
client.transactionCheckback(m->{
  //客户端未完成事务确认。由服务端发起补尝确认
  if("1".equals(m.getAttr("orderId"))) {
      //一般这里,需要查询数据库之类的
      m.acknowledge(true);
  }
});

//发送事务消息    
MqTransaction tran = client.newTransaction();

try {
    client.publish("demo.topic", new MqMessage("demo1").transaction(tran));
    client.publish("demo.topic", new MqMessage("demo2").transaction(tran));
    client.publish("demo.topic", new MqMessage("demo3").transaction(tran));
    client.publish("demo.topic", new MqMessage("demo4").transaction(tran));

    tran.commit();
} catch (Throwable e) {
    tran.rollback();
}

6、发布广播消息

广播消息,是指生产者发布消息后,所有订阅此主题的消费者全部者能收到消息

  • 生产者发布消息,服务端会给确认(可靠)
  • 服务端只派发一次消息,不要求消费者给回执(这部是不可靠的)
//同步
client.publish("demo.topic", new MqMessage("hello").broadcast(true));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").broadcast(true))
      .whenComplete((isOk, err) -> {

      });

7、发布 Qos0 消息(默认是 Qos1 的)

Qos0 是指发完后,服务端不需要确认。消费者端有没有收到,也不管了。

//同步
client.publish("demo.topic", new MqMessage("hello").qos(0));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").qos(0))
      .whenComplete((isOk, err) -> {

      });