FolkMQ v1.7.8

流量控制与内存控制的定制

v1.6.0 后支持。当我们使用“暴力”异步发送时。可以基于 TrafficLimiter 接口,实现帧的流量控制与内存控制。

默认帧流收发各为 1w/s。一般不需要再定制了。

1、为什么要限流?

因为 java sdk 的底层是异步IO处理,当暴力发送时,IO又跟不上,数据就会累积在“事件循环器”(就是个队列)。造成内存越来越大。限流,主要是为控制客户端的内存。

比如 netty 的适配,有时候以为发送很快,其实数据都在“事件循环器”里(异步IO都有类似情况)。

2、要控制流量与控制内存

比如,我们只想开放 3000/s 的帧流量窗口,且不想涨太多内存。可以使用默认的 TrafficLimiterDefault。此方案会使用 sleep 避免客户端内存暴涨。

let client = FolkMQ.createClient("folkmq://127.0.0.1:18602")
    .config(c -> c.trafficLimiter(new TrafficLimiterDefault(3_000)))
    .connect();
    
while (true) {
    client.publishAsync("/jlwu/receive/gateway", new MqMessage("test"));
}

3、只要控制流量,内存无所谓(仅供参考)

这个方案对 FolkMQ 服务端是友好的,可以减轻压力。我们需要定制一个流量制限器,把 sleep 替换掉。

  • 如果一直暴力异步发送,有可能会把自己发暴了。
  • 如果偶尔暴力异步发送,可以起到很好的缓冲作用,后面流量下来内存可以自然降下来。
public class TrafficLimiterCached implements TrafficLimiter {
    private int sendRate;
    private int receRate;
    private final long interval = 1000L;
    private final ScheduledExecutorService executorService;

    private int sendCount;
    private int receCount;
    private long sendLatestLimitTime = Long.MIN_VALUE; // 发送数据限流重置时间 //必须设为最小值
    private long receLatestLimitTime = Long.MIN_VALUE; // 接收数据限流重置时间

    private long receLatestTime = Long.MIN_VALUE; // 最后接收时间
    private long sendLatestTime = Long.MIN_VALUE; // 最后发送时间

    public TrafficLimiterCached(int sendAndReceRate) {
        this(sendAndReceRate, sendAndReceRate);
    }

    public TrafficLimiterCached(int sendRate, int receRate) {
        this.sendRate = sendRate;
        this.receRate = receRate;
        this.executorService = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * 发送帧(在写锁范围,才有效)
     *
     * @param frameIoHandler   帧输入输出处理
     * @param channel          通道
     * @param frame            帧
     * @param channelAssistant 通道助理
     * @param target           发送目标
     */
    @Override
    public <S> void sendFrame(FrameIoHandler frameIoHandler, ChannelInternal channel, Frame frame, ChannelAssistant<S> channelAssistant, S target, IoCompletionHandler completionHandler) {
        if (sendRate < 1) {
            //没有限制
            frameIoHandler.sendFrameHandle(channel, frame, channelAssistant, target, completionHandler);
            return;
        }

        if (sendLatestTime >= sendLatestLimitTime) {
            //超过间隔重置时间
            sendCount = 0;
            sendLatestLimitTime = RunUtils.milliSecondFromNano() + interval; // 更新下次重置时间
        }

        if (sendCount < sendRate) {
            sendCount++;
            frameIoHandler.sendFrameHandle(channel, frame, channelAssistant, target, completionHandler);
        } else {
            sendLatestTime = RunUtils.milliSecondFromNano(); // 到达限制了 记录最后时间
            if (sendLatestTime < sendLatestLimitTime) {
                try {
                    // 如果太快,则延时再试
                    executorService.schedule(() -> {
                        sendFrame(frameIoHandler, channel, frame, channelAssistant, target, completionHandler);
                    }, sendLatestLimitTime - sendLatestTime, TimeUnit.MICROSECONDS);
                } catch (Throwable e) {
                }
                return;
            }

            sendFrame(frameIoHandler, channel, frame, channelAssistant, target, completionHandler);
        }
    }

    /**
     * 接收帧(在读线程里,才有效)
     *
     * @param frameIoHandler 帧输入输出处理
     * @param channel        通道
     * @param frame          帧
     */
    @Override
    public void reveFrame(FrameIoHandler frameIoHandler, ChannelInternal channel, Frame frame) {
        if (receRate < 1) {
            //没有限制
            frameIoHandler.reveFrameHandle(channel, frame);
            return;
        }

        if (receLatestTime >= receLatestLimitTime) {
            //超过间隔重置时间
            receCount = 0;
            receLatestLimitTime = RunUtils.milliSecondFromNano() + interval; // 更新下次重置时间
        }

        if (receCount < receRate) {
            receCount++;
            frameIoHandler.reveFrameHandle(channel, frame);
        } else {
            receLatestTime = RunUtils.milliSecondFromNano(); // 到达限制了 记录最后时间
            if (receLatestTime < receLatestLimitTime) {
                try {
                    // 如果太快,则延时再试
                    executorService.schedule(() -> {
                        reveFrame(frameIoHandler, channel, frame);
                    }, sendLatestLimitTime - sendLatestTime, TimeUnit.MICROSECONDS);
                } catch (Throwable e) {
                }
                return;
            }

            reveFrame(frameIoHandler, channel, frame);
        }
    }
}

使用定制的流量限制器

let client = FolkMQ.createClient("folkmq://127.0.0.1:18602")
    .config(c -> c.trafficLimiter(new TrafficLimiterCached(3_000)))
    .connect();
    
while (true) {
    client.publishAsync("/jlwu/receive/gateway", new MqMessage("test"));
}