消费者 - 流转批消费
FolkMQ 默认情况下,消息是流式不断派发过来。如果需要批量处理消息,要借助“流转批”工具:PackagingLoop(打包循环器)。原理:
- 消息收到本地队列
- 有个线程不断取队列的消息,满足条件后转给 PackagingWorkHandler 处理
1、消费定制示例:
public class MqConsumeHandlerImpl implements MqConsumeHandler, PackagingWorkHandler<MqMessageReceived> {
private final PackagingLoop<MqMessageReceived> packagingLoop = new PackagingLoopImpl<>();
public MqConsumeHandlerImpl() {
//设置处理
packagingLoop.setWorkHandler(this);
//空闲间隔(是指没有消息时休息间隔)
packagingLoop.setIdleInterval(100);
//打包大小(收集多少条后再批量消费)
packagingLoop.setPacketSize(100);
}
@Override
public void consume(MqMessageReceived message) throws Exception {
packagingLoop.add(message);
}
@Override
public void doWork(List<MqMessageReceived> list) throws Exception {
//开始做批量处理
DemoService.insertList(list);
}
}
2、应用参考(只是示例参考):
let client = FolkMQ.createClient("folkmq://127.0.0.1:" + getPort())
.autoAcknowledge(true) //自动 ack
.connect();
//订阅
client.subscribe("demo.topic", new MqConsumeHandlerImpl());
//发布
for (int i = 0; i < count; i++) {
//用绝对顺序消费;用 qos0 策略,丢了不管,且只最多只消费一次
client.publishAsync("demo.topic", new MqMessage("demo-" + i).sequence(true).qos(0));
}