生产者 - 发布
发布支持同步与异步。异步发布时,需要通过回调函数获得发布确认结果。
1、发布普通消息
发布接口有两个参数,依次为:主题,消息。
//同步
client.publish("demo.topic", new MqMessage("hello"));
//异步
client.publishAsync("demo.topic", new MqMessage("hello"))
.whenComplete((isOk, err) -> {
});
发布二进制消息
//同步
client.publish("demo.topic", new MqMessage("hello".getBytes()));
//异步
client.publishAsync("demo.topic", new MqMessage("hello".getBytes()))
.whenComplete((isOk, err) -> {
});
发布带属性的消息
//同步
client.publish("demo.topic", new MqMessage("hello").attr("from", "noear"));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").attr("from", "noear"));
.whenComplete((isOk, err) -> {
});
2、发布定时消息
比如,指定5秒后派发。因为是纯内存运行,要控制好定时消息的数量与内存。如果量大,可先入数据库再中转。
Date scheduled = new Date(System.currentTimeMillis() + 5000);
//同步
client.publish("demo.topic", new MqMessage("hello").scheduled(scheduled));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").scheduled(scheduled))
.whenComplete((isOk, err) -> {
});
3、发布时效消息(可以过期的,过期就会弃用)
比如,指定5秒后过期。5秒内如果没有被正常消费,此消息便弃用(永远没了)
Date expiration = new Date(System.currentTimeMillis() + 5000);
//同步
client.publish("demo.topic", new MqMessage("hello").expiration(expiration));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").expiration(expiration))
.whenComplete((isOk, err) -> {
});
4、发布顺序消息
顺序消息,只对第一次发送时有效。如果消费失败自动延时重试就无效了。
//同步
client.publish("demo.topic", new MqMessage("hello").sequence(true)));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").sequence(true)))
.whenComplete((isOk, err) -> {
});
顺序消息加分片负载均衡:
//同步
client.publish("demo.topic", new MqMessage("hello").sequence(true, "sharding-1")));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").sequence(true, "sharding-1")))
.whenComplete((isOk, err) -> {
});
5、发布事务消息
事务消息,通过二段式提交,实现一批消息要么全成功要么全失败的原子效果。也可与其它事务结合使用。
//用于服务端发起的反向确认
client.transactionCheckback(m->{
//客户端未完成事务确认。由服务端发起补尝确认
if("1".equals(m.getAttr("orderId"))) {
//一般这里,需要查询数据库之类的
m.acknowledge(true);
}
});
//发送事务消息
MqTransaction tran = client.newTransaction();
try {
client.publish("demo.topic", new MqMessage("demo1").transaction(tran));
client.publish("demo.topic", new MqMessage("demo2").transaction(tran));
client.publish("demo.topic", new MqMessage("demo3").transaction(tran));
client.publish("demo.topic", new MqMessage("demo4").transaction(tran));
tran.commit();
} catch (Throwable e) {
tran.rollback();
}
6、发布广播消息
广播消息,是指生产者发布消息后,所有订阅此主题的消费者全部者能收到消息
- 生产者发布消息,服务端会给确认(可靠)
- 服务端只派发一次消息,不要求消费者给回执(这部是不可靠的)
//同步
client.publish("demo.topic", new MqMessage("hello").broadcast(true));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").broadcast(true))
.whenComplete((isOk, err) -> {
});
7、发布 Qos0 消息(默认是 Qos1 的)
Qos0 是指发完后,服务端不需要确认。消费者端有没有收到,也不管了。
//同步
client.publish("demo.topic", new MqMessage("hello").qos(0));
//异步
client.publishAsync("demo.topic", new MqMessage("hello").qos(0))
.whenComplete((isOk, err) -> {
});