Production Zookeeper Issue 1: Len error cause outage

问题现象

产线服务器Outage若干分钟,且重启后无法启动,检查错误日志显示如下:

2023-06-15 03:35:54.556[][ERROR][nioEventLoopGroup-16-1][ClientCnxnSocketNetty:522]Unexpected throwable
java.io.IOException: Packet len 1048603 is out of range!

通过日志可知这是由Zookeeper客户端抛出的“接受包大小”超过阈值导致,  后续分析可知当遇到这种错误时,连接会断开错误。而当Zookeeper连接被断开后,服务无法正常工作,重启又无法启动。

问题原因

根据错误提示地方,我们可以找到出错的代码如下,

/**
 * This buffer is only used to read the length of the incoming message.
 */
protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);

/**
 * After the length is read, a new incomingBuffer is allocated in
 * readLength() to receive the full message.
 */
protected ByteBuffer incomingBuffer = lenBuffer;


void readLength() throws IOException {
  int len = incomingBuffer.getInt();
  if (len < 0 || len > packetLen) {
   throw new IOException("Packet len " + len + " is out of range!");
  }
  incomingBuffer = ByteBuffer.allocate(len);
}

分析代码可知,ZK的消息格式是使用基于自定义TCP定义自己的PDU,具体而言,这种PDU采用了基于固定4字节字段存取报文长度分帧的方式。所以在接受消息时,在继续读取具体内容之前,如果发现长度已经不符合预期,就可以直接做一些特殊的处理,例如抛出异常(此处是IOException),抛出异常后,Zookeeper Client针对这种异常采取了断开连接的方式来保护自身。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    LOG.error("Unexpected throwable", cause);
    cleanup();
}

private void cleanup() {
    if (!channelClosed.compareAndSet(false, true)) {
        return;
    }
    disconnected.set(true);
    onClosing();
}

那么,什么情况下会出现读取的响应包的Size超过最大允许的值,最大的允许值又是多少?很明显,只要有响应的请求都可能会触发这种情况,另外这个最大允许的值默认是1M,可以通过配置参数来调整。

public static final int CLIENT_MAX_PACKET_LENGTH_DEFAULT = 0xfffff; /* 1 MB */
private int packetLen = ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT;

//org.apache.zookeeper.ClientCnxnSocket#initProperties
//初始化时,会根据配置("jute.maxbuffer")来修改生效:
packetLen = clientConfig.getInt(
        ZKConfig.JUTE_MAXBUFFER,
        ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT);

当一个系统使用Zookeeper的操作很多时,且又是产线时,找出这样的请求并不方便,实际上,网上搜索这种错误,大多也都会提醒是某个node大了,然后访问这个node引发的错误,实际上这个是一个巨大的误导,因为在大多情况下,客户端保护了最大size数据包的控制,Zookeeper的服务端肯定也有这样的保护,那如果一个大的node都写不进去,何来去读这样的Node的情况发生。这里我们不妨验证下服务器端对Size的保护:

org.apache.zookeeper.server.NettyServerCnxn#receiveMessage
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
    throw new IOException("Len error " + len);
}

public static final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff);

可以看出,代码类似,基本逻辑也是类似,最终也是会断掉连接,测试可以得出日志再次验证:


Closing connection to /38.99.100.2:36532
java.io.IOException: Len error 1441850

综合以上分析,可以看出Node本身大小绝对不应该重点排除的方向,除非服务器端设置的jute.maxbuffer和客户端设置的不同,让服务端接受了大数据的写,然后读又读不出来,这种情况确实存在,毕竟属于人为配置,但是作为本应用,排查所有配置可以轻松排除这种情况。

那么还有什么情况,可以引发一个大响应,其实很容易想到的是一个情况:一个节点,下面含有太多子节点,那么这个时候去getChildren获取所有子节点时,返回的所有子节点的名称列表可能会超过限制的1M. 这里可以直接模仿产线数据进行测试,例如创建一个节点,然后创建很多名称为随机UUID的子节点,子节点内容不限制。

考虑UUID为36字节,结合Json格式为“”包围字符串,我们可以按38字节进行估算,看起来虽然很小的字节数,但是只要计算下就会发现如果当Node超过2万多时,此时光返回Node名称的列表就已经超过了1M限制。

复盘当前出现问题的实际,当天子节点因业务拓展新的环境,一次性创建了上前Node,刚好超过了所能容纳的Node数量。所以引发问题。而重启后,仍然不能解决问题。实际上,Zookeeper并没有限制你去新建2万或者3万甚至更多节点,而是你可以去建,也可以去getData但是你不要去做getChildren,因为这样的返回的子节点名称列表的Size就会超过数据包的最大读取Size。

有了问题的思路,也模拟出怀疑的对象,那么最后一步是找出代码中,会执行getChildren的地方,实际上,最后发现,代码中并没有任何代码调用过这个方法,但是都是通过TreeCache来访问。这里不详细介绍TreeCache的功能,可以直接使用下面的代码使用它来复现问题:

String ZOOKEEPER_CONNECTION_STRING = "10.30.0.13:2181"; // ZooKeeper服务器地址

CuratorFramework client = CuratorFrameworkFactory.newClient(
        ZOOKEEPER_CONNECTION_STRING,
        new ExponentialBackoffRetry(1000, 3));

client.start();

TreeCache treeCache = new TreeCache(client, "/test/pathWithHugeChild");
treeCache.start();

TreeCache看起来使用很简单方便,但是实际上,它消费了很大资源来维持本地缓存和Zookeeper数据的同步,而且它在启动时,会访问缓存的path,然后遍历所有的节点,并获取所有的数据。关键的错误就发生在遍历所有的子节点:

//org.apache.curator.framework.recipes.cache.TreeCache.TreeNode#doRefreshChildren
private void doRefreshChildren() throws Exception
{
    if ( treeState.get() == TreeState.STARTED )
    {
        maybeWatch(client.getChildren()).forPath(path);
    }
}

问题解决方案

分析完问题的原因,可以讨论下如何解决这个问题,首先问题出现的根本还是在于对Zookeeper的误用:完全当数据库来使用存取了大量数据到一个节点下,且使用了TreeCache,而TreeCache存在一次请求节点的所有子节点名称(实际)的操作。这个时候就会引发问题。所以不考虑使用其他的存储方案,如果坚持使用Zookeeper, 这里可以提供两种方式来修复问题:

(1)修改“jute.maxbuffer”到更大的值,这种方案只是一个临时方案,数据继续增长后还是会遇到问题,只是时间问题,不过临时使用的话,也能凑效,注意服务端和客户端最好同时修改,以避免以后挖坑给后人。

(2)避免一个节点下存在太多子节点的设计,例如可以采用分库分表的方式进行,例如根据不同特性建立二级子节点,例如从path/node1…..nodeN改成path/subPath/node1…..nodeN。然后对于针对二级子节点,分别采用TreeCache。如果没有分布均匀的业务特性,可以直接建立1….N个folder,然后业务关键字取余的方式来分着放:

//二级Folder名称:每个Folder创建一个TreeCache.
private String getTreeCachePathByKey(String key){
    final int index = Math.abs(key.hashCode()) % shardNumber;
    return String.format("%s/%d", STORE, index);
}

这种方式会额外带来一个好处,例如新建子节点时,原来会触发父节点的path的getChildren,获取到了所有的子节点的名称列表,而修改后,只会触发二级父节点path/subPath的getChildren,返回的子节点名称列表要小很多(取决于建立多少个subPath)。不过要注意的是,每常见一个TreeCache会创建一个线程来完成本地“数据”的更新同步。所以当TreeCache被拆分成很多时,一定要去使用共享的线程,例如采用下面的代码:

private static final ExecutorService EXECUTE_SERVICE = Executors.newFixedThreadPool(5, ThreadUtils.newThreadFactory("tree-cache-for-dedicated") );
//创建要给共享的,线程数可以根据情况做调整,然后所有的拆开后的TreeCache共享。如果不指定ExecutorService来共享,每个新TreeCache会默认使用一个新线程。

TreeCache treeCache = executorService == null? TreeCache.newBuilder(client, storePath).build():
TreeCache.newBuilder(client, storePath).setExecutor(executorService).build();

以上即为这个问题的3W分析,追根溯源,还是业务初期,数据量小时不管对采用的开源组件是否了解,无论如何用,都不会出问题,但是如果数据量大时,就会暴露设计的可扩展性缺陷。

 

题外:

查看启动时间:

[root@ip-10-0-1-85 zookeeper]# echo mntr |nc localhost 2181 |grep time

zk_leader_uptime 6251602595
zk_uptime 6251602802

查看选举时间:

echo mntr |nc localhost 2181 |grep election

zk_avg_election_time 1818.0 
zk_min_election_time 20 
zk_max_election_time 4437 
zk_cnt_election_time 3
zk_sum_election_time 5454

kafka send async timeout

org.apache.kafka.clients.producer.KafkaProducer#doSend
try{
//第一段:
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
//第二段:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
}catch (ApiException e) {
log.debug(“Exception occurred during message send:”, e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
}

其中,第二段:
org.apache.kafka.clients.producer.internals.RecordAccumulator#append{
buffer = free.allocate(size, maxTimeToBlock);
}

其中关键行buffer = free.allocate(size, maxTimeToBlock);会抛出异常:

if (waitingTimeElapsed) {
throw new TimeoutException(“Failed to allocate memory within the configured max blocking time ” + maxTimeToBlockMs + ” ms.”);
}

TimeoutException属于KafkaException中的一种: