生产者 - 发布
1、发布普通消息
发布接口有两个参数,依次为:主题,消息。
client.publish("demo", FolkMQ.newMessage("hello"));
发布带属性的消息
client.publish("demo", FolkMQ.newMessage("hello").attr("from", "noear"));
2、发布定时消息
比如,指定5秒后派发。因为是纯内存运行,要控制好定时消息的数量与内存。如果量大,可先入数据库再中转。
let scheduled = new Date(new Date().getTime() + 5000);
client.publish("demo", FolkMQ.newMessage("hello").scheduled(scheduled));
3、发布时效消息(可以过期的,过期就会弃用)
比如,指定5秒后过期。5秒内如果没有被正常消费,此消息便弃用(永远没了)
let expiration = new Date(new Date().getTime() + 5000);
client.publish("demo", FolkMQ.newMessage("hello").expiration(expiration));
4、发布顺序消息
顺序消息,只对第一次发送时有效。如果消费失败自动延时重试就无效了。
client.publish("demo", FolkMQ.newMessage("hello").sequence(true));
顺序消息加分片负载均衡:
client.publish("demo", FolkMQ.newMessage("hello").sequence(true, "sharding-1")));
5、发布事务消息
事务消息,通过二段式提交,实现一批消息要么全成功要么全失败的原子效果。也可与其它事务结合使用。
//用于服务端发起的反向确认
client.transactionCheckback(m => {
//客户端未完成事务确认。由服务端发起补尝确认
if("1".equals(m.getAttr("orderId"))) {
//一般这里,需要查询数据库之类的
m.acknowledge(true);
}
});
//发送事务消息
const tran = client.newTransaction();
try {
client.publish("demo", FolkMQ.newMessage("demo1").transaction(tran));
client.publish("demo", FolkMQ.newMessage("demo2").transaction(tran));
client.publish("demo", FolkMQ.newMessage("demo3").transaction(tran));
client.publish("demo", FolkMQ.newMessage("demo4").transaction(tran));
tran.commit();
} catch (err) {
tran.rollback();
}
6、发布 Qos0 消息(默认是 Qos1 的)
Qos0 是指发完后,服务端不需要确认。消费者端有没有收到,也不管了。
client.publish("demo", FolkMQ.newMessage("hello").qos(0));