FolkMQ v1.7.8

消费者 - 流转批消费

FolkMQ 默认情况下,消息是流式不断派发过来。如果需要批量处理消息,要借助“流转批”工具:PackagingLoop(打包循环器)。原理:

  • 消息收到本地队列
  • 有个线程不断取队列的消息,满足条件后转给 PackagingWorkHandler 处理

1、消费定制示例:

public class MqConsumeHandlerImpl implements MqConsumeHandler, PackagingWorkHandler<MqMessageReceived> {

    private final PackagingLoop<MqMessageReceived> packagingLoop = new PackagingLoopImpl<>();

    public MqConsumeHandlerImpl() {
        //设置处理
        packagingLoop.setWorkHandler(this);
        //空闲间隔(是指没有消息时休息间隔)
        packagingLoop.setIdleInterval(100);
        //打包大小(收集多少条后再批量消费)
        packagingLoop.setPacketSize(100);
    }

    @Override
    public void consume(MqMessageReceived message) throws Exception {
        packagingLoop.add(message);
    }

    @Override
    public void doWork(List<MqMessageReceived> list) throws Exception {
        //开始做批量处理
        DemoService.insertList(list);
    }
}

2、应用参考(只是示例参考):

let client = FolkMQ.createClient("folkmq://127.0.0.1:" + getPort())
                .autoAcknowledge(true) //自动 ack
                .connect();

//订阅
client.subscribe("demo.topic", new MqConsumeHandlerImpl());

//发布
for (int i = 0; i < count; i++) {
    //用绝对顺序消费;用 qos0 策略,丢了不管,且只最多只消费一次
    client.publishAsync("demo.topic", new MqMessage("demo-" + i).sequence(true).qos(0)); 
}