流量控制与内存控制的定制
v1.6.0 后支持。当我们使用“暴力”异步发送时。可以基于 TrafficLimiter 接口,实现帧的流量控制与内存控制。
默认帧流收发各为 1w/s。一般不需要再定制了。
1、为什么要限流?
因为 java sdk 的底层是异步IO处理,当暴力发送时,IO又跟不上,数据就会累积在“事件循环器”(就是个队列)。造成内存越来越大。限流,主要是为控制客户端的内存。
比如 netty 的适配,有时候以为发送很快,其实数据都在“事件循环器”里(异步IO都有类似情况)。
2、要控制流量与控制内存
比如,我们只想开放 3000/s 的帧流量窗口,且不想涨太多内存。可以使用默认的 TrafficLimiterDefault。此方案会使用 sleep 避免客户端内存暴涨。
let client = FolkMQ.createClient("folkmq://127.0.0.1:18602")
.config(c -> c.trafficLimiter(new TrafficLimiterDefault(3_000)))
.connect();
while (true) {
client.publishAsync("/jlwu/receive/gateway", new MqMessage("test"));
}
3、只要控制流量,内存无所谓(仅供参考)
这个方案对 FolkMQ 服务端是友好的,可以减轻压力。我们需要定制一个流量制限器,把 sleep 替换掉。
- 如果一直暴力异步发送,有可能会把自己发暴了。
- 如果偶尔暴力异步发送,可以起到很好的缓冲作用,后面流量下来内存可以自然降下来。
public class TrafficLimiterCached implements TrafficLimiter {
private int sendRate;
private int receRate;
private final long interval = 1000L;
private final ScheduledExecutorService executorService;
private int sendCount;
private int receCount;
private long sendLatestLimitTime = Long.MIN_VALUE; // 发送数据限流重置时间 //必须设为最小值
private long receLatestLimitTime = Long.MIN_VALUE; // 接收数据限流重置时间
private long receLatestTime = Long.MIN_VALUE; // 最后接收时间
private long sendLatestTime = Long.MIN_VALUE; // 最后发送时间
public TrafficLimiterCached(int sendAndReceRate) {
this(sendAndReceRate, sendAndReceRate);
}
public TrafficLimiterCached(int sendRate, int receRate) {
this.sendRate = sendRate;
this.receRate = receRate;
this.executorService = Executors.newSingleThreadScheduledExecutor();
}
/**
* 发送帧(在写锁范围,才有效)
*
* @param frameIoHandler 帧输入输出处理
* @param channel 通道
* @param frame 帧
* @param channelAssistant 通道助理
* @param target 发送目标
*/
@Override
public <S> void sendFrame(FrameIoHandler frameIoHandler, ChannelInternal channel, Frame frame, ChannelAssistant<S> channelAssistant, S target, IoCompletionHandler completionHandler) {
if (sendRate < 1) {
//没有限制
frameIoHandler.sendFrameHandle(channel, frame, channelAssistant, target, completionHandler);
return;
}
if (sendLatestTime >= sendLatestLimitTime) {
//超过间隔重置时间
sendCount = 0;
sendLatestLimitTime = RunUtils.milliSecondFromNano() + interval; // 更新下次重置时间
}
if (sendCount < sendRate) {
sendCount++;
frameIoHandler.sendFrameHandle(channel, frame, channelAssistant, target, completionHandler);
} else {
sendLatestTime = RunUtils.milliSecondFromNano(); // 到达限制了 记录最后时间
if (sendLatestTime < sendLatestLimitTime) {
try {
// 如果太快,则延时再试
executorService.schedule(() -> {
sendFrame(frameIoHandler, channel, frame, channelAssistant, target, completionHandler);
}, sendLatestLimitTime - sendLatestTime, TimeUnit.MICROSECONDS);
} catch (Throwable e) {
}
return;
}
sendFrame(frameIoHandler, channel, frame, channelAssistant, target, completionHandler);
}
}
/**
* 接收帧(在读线程里,才有效)
*
* @param frameIoHandler 帧输入输出处理
* @param channel 通道
* @param frame 帧
*/
@Override
public void reveFrame(FrameIoHandler frameIoHandler, ChannelInternal channel, Frame frame) {
if (receRate < 1) {
//没有限制
frameIoHandler.reveFrameHandle(channel, frame);
return;
}
if (receLatestTime >= receLatestLimitTime) {
//超过间隔重置时间
receCount = 0;
receLatestLimitTime = RunUtils.milliSecondFromNano() + interval; // 更新下次重置时间
}
if (receCount < receRate) {
receCount++;
frameIoHandler.reveFrameHandle(channel, frame);
} else {
receLatestTime = RunUtils.milliSecondFromNano(); // 到达限制了 记录最后时间
if (receLatestTime < receLatestLimitTime) {
try {
// 如果太快,则延时再试
executorService.schedule(() -> {
reveFrame(frameIoHandler, channel, frame);
}, sendLatestLimitTime - sendLatestTime, TimeUnit.MICROSECONDS);
} catch (Throwable e) {
}
return;
}
reveFrame(frameIoHandler, channel, frame);
}
}
}
使用定制的流量限制器
let client = FolkMQ.createClient("folkmq://127.0.0.1:18602")
.config(c -> c.trafficLimiter(new TrafficLimiterCached(3_000)))
.connect();
while (true) {
client.publishAsync("/jlwu/receive/gateway", new MqMessage("test"));
}