The DCAwareRoundRobinPolicy hides an interesting dc level failover capability. It is disabled by default, presumably for two reasons:
(1) If remote dc level failover is supported, then when an entire dc goes down all requests are sent to a remote dc. Handling those requests will certainly be much slower; can that still satisfy the application's requirements?
(2) When a local consistency level such as LOCAL_QUORUM is used, remote dc level failover temporarily breaks the consistency-level guarantee.
Suppose we do not care about these two issues and want to maximize availability; then we need to enable dc level failover support.
(1) How to enable it?
To enable it, do not use the default constructor of DCAwareRoundRobinPolicy; use the following constructor instead:
public DCAwareRoundRobinPolicy(String localDc, int usedHostsPerRemoteDc, boolean allowRemoteDCsForLocalConsistencyLevel)
a. allowRemoteDCsForLocalConsistencyLevel allows the consistency constraint to be broken; for non-local consistency levels it makes no difference.
b. Note that usedHostsPerRemoteDc is the number of hosts that may be used per remote DC, not in total. A usage sketch follows.
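For illustration, a minimal sketch of wiring this constructor into a Cluster; the contact point, the DC name "DC1", and the parameter values are assumptions for the example, not taken from the original setup:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

    public class DcFailoverCluster {
        public static Cluster build() {
            // localDc = "DC1", usedHostsPerRemoteDc = 1, allowRemoteDCsForLocalConsistencyLevel = true
            DCAwareRoundRobinPolicy policy = new DCAwareRoundRobinPolicy("DC1", 1, true);
            return Cluster.builder()
                          .addContactPoint("10.0.0.1")        // hypothetical contact point
                          .withLoadBalancingPolicy(policy)
                          .build();
        }
    }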
In the implementation, a batch of data connections is established when the cluster is initialized. With these parameters configured, e.g. usedHostsPerRemoteDc = 1 and allowRemoteDCsForLocalConsistencyLevel = true, one connection is established to each remote DC.
The code below shows that with these parameters enabled, all nodes are divided into three categories: local, remote and ignored; without them, there are only two categories, local and ignored.
@Override
public HostDistance distance(Host host) {
    String dc = dc(host);
    if (dc == UNSET || dc.equals(localDc))
        return HostDistance.LOCAL;

    CopyOnWriteArrayList<Host> dcHosts = perDcLiveHosts.get(dc);
    if (dcHosts == null || usedHostsPerRemoteDc == 0)
        return HostDistance.IGNORED;

    // We need to clone, otherwise our subList call is not thread safe
    dcHosts = cloneList(dcHosts);
    return dcHosts.subList(0, Math.min(dcHosts.size(), usedHostsPerRemoteDc)).contains(host)
         ? HostDistance.REMOTE
         : HostDistance.IGNORED;
}
Then, when the cluster connections are initialized, a connection is established to every host except the ignored ones. So with these parameters the remote DCs also get connections, usedHostsPerRemoteDc of them per DC:
// Returns whether there was problem creating the pool
ListenableFuture<Boolean> forceRenewPool(final Host host, ListeningExecutorService executor) {
    final HostDistance distance = cluster.manager.loadBalancingPolicy().distance(host);
    if (distance == HostDistance.IGNORED)   // only IGNORED hosts are skipped, so pools are created for both local and remote nodes
        return Futures.immediateFuture(true);
(2) How does the driver support it?
The implementation is simple: once the two parameters above are set, the query plan also takes some remote-DC nodes into consideration. If all nodes of the local DC have failed, the remote-DC nodes start being used.
com.datastax.driver.core.policies.DCAwareRoundRobinPolicy.newQueryPlan(String, Statement)

if (remainingLocal > 0) {
    remainingLocal--;
    int c = idx++ % hosts.size();
    if (c < 0) {
        c += hosts.size();
    }
    return hosts.get(c);
}

if (localSuspected == null) {
    List<Host> l = perDcSuspectedHosts.get(localDc);
    localSuspected = l == null ? Collections.<Host>emptySet().iterator() : l.iterator();
}

while (localSuspected.hasNext()) {
    Host h = localSuspected.next();
    waitOnReconnection(h);
    if (h.isUp())
        return h;
}

ConsistencyLevel cl = statement.getConsistencyLevel() == null
                    ? configuration.getQueryOptions().getConsistencyLevel()
                    : statement.getConsistencyLevel();

if (dontHopForLocalCL && cl.isDCLocal())   // when remote failover is not enabled, the plan ends here and never tries a remote DC
    return endOfData();

if (remoteDcs == null) {
    Set<String> copy = new HashSet<String>(perDcLiveHosts.keySet());
    copy.remove(localDc);
    remoteDcs = copy.iterator();
}

while (true) {
    if (currentDcHosts != null && currentDcRemaining > 0) {
        currentDcRemaining--;
        int c = idx++ % currentDcHosts.size();
        if (c < 0) {
            c += currentDcHosts.size();
        }
        return currentDcHosts.get(c);
    }

    if (currentDcSuspected != null) {
        while (currentDcSuspected.hasNext()) {
            Host h = currentDcSuspected.next();
            waitOnReconnection(h);
            if (h.isUp())
                return h;
        }
    }

    if (!remoteDcs.hasNext())
        break;

    String nextRemoteDc = remoteDcs.next();
    CopyOnWriteArrayList<Host> nextDcHosts = perDcLiveHosts.get(nextRemoteDc);
    if (nextDcHosts != null) {
        // Clone for thread safety
        List<Host> dcHosts = cloneList(nextDcHosts);
        currentDcHosts = dcHosts.subList(0, Math.min(dcHosts.size(), usedHostsPerRemoteDc));
        currentDcRemaining = currentDcHosts.size();
    }
    List<Host> suspectedList = perDcSuspectedHosts.get(nextRemoteDc);
    currentDcSuspected = suspectedList == null ? null : suspectedList.iterator();
}
return endOfData();
(3) What changes does it bring?
Once enabled, it really does support failover at the dc level, and powerfully so: if DC1 goes down, data is sent to DC2; if DC2 goes down as well, requests are sent to DC3. Of course it also brings some additional changes (the discussion below assumes usedHostsPerRemoteDc = 1):
a. More data connections are established up front:
The Cassandra driver does not manage data connections on demand or lazily; it establishes every connection it may ever need right at startup (the node behind such a connection only acts as a coordinator and is not necessarily a node that stores the data: even if a keyspace is replicated only to DC2 and DC3, connections are still established to DC1). So after enabling this feature there are noticeably more long-lived TCP connections. The exact increase is (number of DCs - 1) * usedHostsPerRemoteDc, regardless of the keyspace's replication strategy.
For example, without the feature the connection count is 3 (the number of local DC nodes) + 1, i.e. 3 data connections plus 1 control connection.
With the feature enabled, the count becomes 3 + 1 + (usedHostsPerRemoteDc extra connections for each remote DC); see the worked calculation below.
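A minimal sketch of that arithmetic, assuming a hypothetical topology of 3 DCs with 3 nodes in the local DC and usedHostsPerRemoteDc = 1:

    public class ConnectionCountEstimate {
        public static void main(String[] args) {
            int localNodes = 3;             // data connections to the local DC (one per node)
            int controlConnections = 1;     // the driver's single control connection
            int dcCount = 3;                // total number of DCs in the cluster
            int usedHostsPerRemoteDc = 1;   // as passed to the constructor

            int withoutFailover = localNodes + controlConnections;
            int withFailover = withoutFailover + (dcCount - 1) * usedHostsPerRemoteDc;

            System.out.println("without failover: " + withoutFailover); // 4
            System.out.println("with failover:    " + withFailover);    // 6
        }
    }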
Maintaining this many data connections naturally raises the following questions:
(1) Suppose the connected node in a remote DC goes down. Will the driver switch to another node?
Yes. When it goes down (or cannot be connected to in the first place), onDown is triggered, the pool for that connection is removed, and the created pools are updated; at that point a connection to another node of the same remote DC takes its place.
com.datastax.driver.core.SessionManager.onDown(Host)

void onDown(Host host) throws InterruptedException, ExecutionException {
    removePool(host).force().get();                         // remove the pool of the down host
    updateCreatedPools(MoreExecutors.sameThreadExecutor()); // update pools and add a new connection for the remote DC
}
The key log lines:
10:19:50,143 DEBUG Connection:274 - Defuncting connection to /10.194.250.245:9042
com.datastax.driver.core.TransportException: [/10.194.250.245:9042] Channel has been closed   // when the connection goes down, channelClosed is triggered
    at com.datastax.driver.core.Connection$Dispatcher.channelClosed(Connection.java:750)
10:19:50,143 DEBUG Cluster:1526 - Host /10.194.250.245:9042 is Suspected
10:19:50,159 DEBUG Connection:449 - Connection[/10.194.250.245:9042-1, inFlight=0, closed=true] closing connection
10:19:51,720 DEBUG Connection:103 - Connection[/10.194.250.245:9042-2, inFlight=0, closed=false] Error connecting to /10.194.250.245:9042 (Connection refused: no further information: /10.194.250.245:9042)
09:25:52,166 DEBUG Cluster:1592 - Host /10.194.250.245:9042 is DOWN
09:25:53,249 DEBUG SingleConnectionPool:84 - Created connection pool to host /10.194.250.156:9042   // switched to the new node
09:25:55,965 DEBUG Cluster:1691 - Failed reconnection to /10.194.250.245:9042 ([/10.194.250.245:9042] Cannot connect), scheduling retry in 2000 milliseconds   // retrying the down node
Effect: the connection switched from 10.194.250.245:9042 to 10.194.250.156:9042.
# netstat -nap | grep 9042
tcp   0   0 10.224.89.38:17748   10.224.57.166:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:60873   10.224.57.168:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:35391   10.89.108.187:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:27454   10.194.250.245:9042   ESTABLISHED 3104/java   // the down node in dc x
tcp   0   0 10.224.89.38:39537   10.224.57.207:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:60875   10.224.57.168:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:19961   10.224.57.167:9042    ESTABLISHED 3104/java

[root@hf3dsa002 ~]# netstat -nap | grep 9042
tcp   0   0 10.224.89.38:49385   10.194.250.156:9042   ESTABLISHED 3104/java   // other live node in dc x
tcp   0   0 10.224.89.38:17748   10.224.57.166:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:60873   10.224.57.168:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:35391   10.89.108.187:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:39537   10.224.57.207:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:60875   10.224.57.168:9042    ESTABLISHED 3104/java
tcp   0   0 10.224.89.38:19961   10.224.57.167:9042    ESTABLISHED 3104/java
(2) Suppose the connected node in a remote DC goes down. Will that node itself be retried?
Yes, and it is retried until it succeeds; see the log above.
09:25:53,249 DEBUG SingleConnectionPool:84 - Created connection pool to host /10.194.250.156:9042   // switched to the new node
09:25:55,965 DEBUG Cluster:1691 - Failed reconnection to /10.194.250.245:9042 ([/10.194.250.245:9042] Cannot connect), scheduling retry in 2000 milliseconds   // retrying the down node
(3) Suppose that after the remote-DC node went down the driver has already connected to a new node. Does the retry of the down node continue?
Yes, and this introduces a new problem. If the node is not taken down for routine maintenance (shut down, then started again) but actually removed, is there still a retry, and if so does it run forever? From the code, a remove does not trigger a retry; comparing the handling of down and remove, down only adds the scheduling of reconnection attempts. But if a node is simply shut down and never started again, the retry goes on forever.
com.datastax.driver.core.Cluster.Manager.onDown(Host, boolean, boolean)

logger.debug("{} is down, scheduling connection retries", host);
startPeriodicReconnectionAttempt(host, isHostAddition);
(4) Suppose the connected remote-DC node goes down and later recovers. Will the driver reconnect to it?
No, and there is no need to. From the code, no new connection is created after onUp() is called.
loadBalancingPolicy().onUp(host);   // adds the node to the end of its DC's host list
controlConnection.onUp(host);

logger.trace("Adding/renewing host pools for newly UP host {}", host);
List<ListenableFuture<Boolean>> futures = new ArrayList<ListenableFuture<Boolean>>(sessions.size());
for (SessionManager s : sessions)
    futures.add(s.forceRenewPool(host, poolCreationExecutor));
    // This looks like it reconnects, but in fact distance() now reports the node as IGNORED
    // rather than REMOTE: the freshly UP host sits at the end of dcHosts, and with
    // usedHostsPerRemoteDc = 1 it falls outside the sublist, so it is ignored.

@Override
public HostDistance distance(Host host) {
    String dc = dc(host);
    if (dc == UNSET || dc.equals(localDc))
        return HostDistance.LOCAL;

    CopyOnWriteArrayList<Host> dcHosts = perDcLiveHosts.get(dc);
    if (dcHosts == null || usedHostsPerRemoteDc == 0)
        return HostDistance.IGNORED;

    // We need to clone, otherwise our subList call is not thread safe
    dcHosts = cloneList(dcHosts);
    return dcHosts.subList(0, Math.min(dcHosts.size(), usedHostsPerRemoteDc)).contains(host)
         ? HostDistance.REMOTE
         : HostDistance.IGNORED;
}
The key question, however, is this: no connection is created afterwards, but wasn't a connection already created when the retry finally succeeded? Normally a node goes down, the retry eventually succeeds, and even if that connection is never used again it should still exist, since it came from the retry. So where did it go? The code shows that this connection is closed immediately after the retry succeeds; if it were not, the usedHostsPerRemoteDc limit would be violated:
protected Connection tryReconnect() throws ConnectionException, InterruptedException,
        UnsupportedProtocolVersionException, ClusterNameMismatchException {
    return connectionFactory.open(host);
}

protected void onReconnection(Connection connection) {
    // We don't use that first connection so close it.
    // TODO: this is a bit wasteful, we should consider passing it to onAdd/onUp so
    // we use it for the first HostConnectionPool created
    connection.closeAsync();
Summary:
From the above we can see:
(1) The price of this powerful feature is that more long-lived TCP connections are established in advance; the exact increase is (number of DCs - 1) * usedHostsPerRemoteDc, so it scales with the size of the Cassandra cluster.
(2) The retry-until-success approach cannot handle a node that goes down without a standard remove and never comes back up. In that case, unless the application is restarted, the driver ends up retrying forever at a 10-minute interval (see the reconnection-policy sketch after this list).
(3) Even with dc level failover enabled, can the performance requirements still be met? If not, how is that different from a system that is simply down?
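On point (2): the 10-minute figure appears to come from the driver's default reconnection policy (in the 2.x driver, an ExponentialReconnectionPolicy with a 1-second base delay capped at 10 minutes, which is consistent with the "scheduling retry in 2000 milliseconds" line in the log above). If the endless retries are a concern, the cadence can at least be made explicit; a sketch, with the delay values as assumptions:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;

    public class ReconnectionTuning {
        public static Cluster.Builder withExplicitReconnection(Cluster.Builder builder) {
            // Base delay 1s, doubling up to a 10-minute cap: the same behavior as the 2.x default,
            // but stated explicitly so the retry cadence is visible in application code.
            return builder.withReconnectionPolicy(new ExponentialReconnectionPolicy(1000L, 10 * 60 * 1000L));
        }
    }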
So it is best to put this feature behind a flag that can switch it off, so that the three problems above stay under control. A minimal sketch of such a flag follows.
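The flag name dcFailoverEnabled and the parameter values are hypothetical; the sketch only chooses between the default DCAwareRoundRobinPolicy constructor and the failover-enabled one shown earlier:

    import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
    import com.datastax.driver.core.policies.LoadBalancingPolicy;

    public class LoadBalancingPolicyFactory {
        // dcFailoverEnabled could come from a config file or a system property.
        public static LoadBalancingPolicy create(String localDc, boolean dcFailoverEnabled) {
            if (dcFailoverEnabled) {
                // 1 host per remote DC, and allow remote DCs even for LOCAL_* consistency levels.
                return new DCAwareRoundRobinPolicy(localDc, 1, true);
            }
            // Default behavior: remote DCs are IGNORED, no dc level failover.
            return new DCAwareRoundRobinPolicy(localDc);
        }
    }

Usage would then be, for example, Cluster.builder().addContactPoint("10.0.0.1").withLoadBalancingPolicy(LoadBalancingPolicyFactory.create("DC1", false)).build(), so the failover behavior can be turned off without touching the rest of the application.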