把netty引入项目后,检索了各种坑或者常见可能错误用法以避免犯错,其中不少文章提到:netty在响应请求端缓慢时,有可能OOM,例如文章http://www.wtoutiao.com/a/2331320.html。
鉴于之前没有深入研究netty的代码,所以不敢冒然使用各种文章提到的解决方案:(1)没有实际测试,所以不是100%肯定会出现;(2)各种文章提及的方案各有利弊,同时涉及到一些参数的设置,不言而喻,一旦有设置的需要,就有设置合理或者不合理的情况。总之,用的好锦上添花,用的不好适得其反。
首先翻阅源码查看到底会不会产生:
一 问题产生的原因:
首先netty的写入过程:可以划分为两个基本步骤:(1)write(2)flush.
(1) write的过程是将“数据请求”添加到ChannelOutboundBuffer,这个buffer是和每个socket具体绑定的。“数据请求”采用链的方式一一相接,在添加时候并无容量控制。
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; }
(2) flush:flush是将ChannelOutboundBuffer中的一批数据请求拿出来消费,即拷入socket的sendbuffer。能写入多少数据,则移除多少“数据请求”, 如果没有写完,并不会移除ChannelOutboundBuffer种的数据。
io.netty.channel.socket.nio.NioSocketChannel.doWrite(ChannelOutboundBuffer)
io.netty.channel.ChannelOutboundBuffer.removeBytes(long) public void removeBytes(long writtenBytes) { for (;;) { Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { progress(readableBytes); writtenBytes -= readableBytes; } remove(); //remove the flush one. } else { // readableBytes > writtenBytes if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } clearNioBuffers(); }
当对端处理repsonse缓慢时,对端的received buffer会不断减小,此时TCP滑动窗口可发送size会慢慢变小,最终当size变为0时,send buffer会不断增大至full。此时如果继续flush,会出现2种情况:
(1) 如果采用同步方式,则线程阻塞挂起;
(2) 如果采用异步方式,则线程继续前行,写的函数返回0表示没有写入任何数据。
不言而喻,Netty使用的是异步方式。所以IO线程不会阻塞,而线程不阻塞则让write可以继续执行,最终数据积累越来越多以致OOM。
二 问题的重现
编写只写不读客户端,持续发送数据。观察heap变化
Socket socket = new Socket("10.224.2.116", 7676); while(true){ socket.getOutputStream().write("FORMAT DATA".getBytes()); }
三 问题的解决
解决方式的基础:
netty提供了写的高低水位线,实现很简单,当写的请求的size超过高水位线时,设置unwritable,当水位线降低至低水位线时,设置writable.这里有2个地方要注意:
(1)请求的size不是请求的数目,而是请求的内容的size:这点必须如此,因为使用请求的数目,则忽略了请求的内容大小,在请求数目小时,占用的内容随着请求内容的变化而变化。
(2)当水位线降低至高水位线时,不是可写状态,因为很明显,如果仅仅用一个水位线来控制是否可读,则可能导致可写状态不断变化,并没有真正起到控制的作用。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) { setUnwritable(invokeLater); } } private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (notifyWritability && (newWriteBufferSize == 0 || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark())) { setWritable(invokeLater); } }
serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 4*1024*1024); serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 2*1024*1024);
解决方案的选择:
利用读写水位线。可以做两件事情:
(1) 第一是监控水位线的变化
@Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception{ LOGGER.info(format(ctx, "channelWritabilityChanged() to:"+ctx.channel().isWritable())); super.channelWritabilityChanged(ctx); }
(2) 第二是控制读写。
控制读写是避免OOM的根本措施,可以采用如下几种方式:
a 直接丢弃,这种方式对于延时要求比较高的服务器比较合适,因为当积累到一定程度,响应时间必然延长以致对方timeout或者已经失去意义;
if(!item.getChannelHandlerContext().channel().isWritable()){
LOGGER.warn(“[tcp][handle][omit]response:”+item.getResponse().toString());
return;
}
b 关闭自动读,这种方式将读关闭,以让请求发送者减缓速度最终至停发。如果双方有自己的keepalive时,必然让双方连接断。这种方式本质是将难题交给了请求发送方;
channel().config().setAutoRead(false)
c 重试直至成功,这种方式等于把线程占用,如果每个连接都出现这种情况,则最终无线程可用。如果是工作线程,则后续发生取决于server自己具体的实现(基本划分2类,丢弃,让主线程即IO线程执行行最终占用IO线程);如果是IO线程,则将影响其他正常连接。
参考http://normanmaurer.me/presentations/2014-twitter-meetup-netty/slides.html#09.0
while(needsToWrite && channel.isWritable()) { channel.writeAndFlush(createMessage()); }
问题的影响
这种情况一般不多见,因为存在三种情况:
(1) 客户端接受的处理速度不会远低于发送请求的速度;
(2) 双方一般都有keepalive, 几秒没有数据写到客户端,双方就会断掉连接;
(3) 服务器不面对百万连接时,对于少数连接很难积累起来以致OOM。
所以考察服务器处理的并发连接,如果仅面临几个连接,可以将读写水位线调大点,但是如果面对N多连接,则相应调整小点。同时也要考虑读写水位线非常大也无必要,因为即使网络恢复或者对端恢复速度,最终处理到的响应是否早已失去意义。