Datastax Cassandra Driver Analyst (9)-understand contact points

本节主要解决以下几个问题:

(1)contact point是如何工作的?

(2)如何配置contact point?

例如假设在DCAwareRoundRobinPolicy开启“可使用远程DC”,是否需要配置远程DC的结点作为contact point.

1 contact point 如何工作?

contact的作用是获取cassandra的cluster的结点、表等所有信息,相当于“通信兵”的角色。通过代码阅读可知,cluster在初始化中使用contact point做了以下几件事情:

a   acquire contact points. Then try one-by-one until success.(before success one, the failed one will retry forever, after success, it won’t retry others)

b   use contact point to Registering for events:


 List<ProtocolEvent.Type> evs = Arrays.asList(

 ProtocolEvent.Type. TOPOLOGY_CHANGE,

 ProtocolEvent.Type. STATUS_CHANGE,

 ProtocolEvent.Type. SCHEMA_CHANGE

 );

 

c  use contact point to Refreshing schema and Refreshing node list and token map

d  use contact point to get all hosts.

因此其作用相当于开启cassandra大门的”钥匙“,考虑以下几种情况:

(1)仅配置1个结点,则启动时,假设这个结点不能正常工作,则启动不了:


 com.datastax.driver.core.Cluster.Manager.init()

 catch (NoHostAvailableException e) { //no control point connected
 close();
 throw e;
 }

 Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.224.57.163:9042 (com.datastax.driver.core.TransportException: [/10.224.57.163:9042] Cannot connect))

 at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)

 at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)

 at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1230)

 at com.datastax.driver.core.Cluster.init(Cluster.java:157)

 at com.datastax.driver.core.Cluster.connect(Cluster.java:245)

 at com.datastax.driver.core.Cluster.connect(Cluster.java:278)

 

(2)假设配置2+以上结点,当第一个可以工作时,会继续连接后面配置的么?

答案是不会,代码如下:


 com.datastax.driver.core.ControlConnection.reconnectInternal(Iterator<Host>, boolean)

  while (iter.hasNext()) {

 host = iter.next();

  try {

  return tryConnect(host, isInitialConnection);   //try one be one

 }  catch (ConnectionException e) {

 errors = logError(host, e, errors, iter);

  if (isInitialConnection) {

 // Mark the host down right away so that we don't try it again during the initialization process.

 // We don't call cluster.triggerOnDown because it does a bunch of other things we don't want to do here (notify LBP, etc.)

 host.setDown();

 cluster.startPeriodicReconnectionAttempt(host,  true); //重试

 }

 }

 

(3)假设配置了2+结点,其中第二个可以工作,第一个不能正常工作,第一个会重试么?

答案是会的。参考(2)代码,当连不上时,会重试,重试策略是根据配置的,默认如下:


  private static final ReconnectionPolicy  DEFAULT_RECONNECTION_POLICY  =  new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000);  //从1S开始重试,然后2,4,8等等,最终重试到10分钟,然后保持10分钟重试一次的节奏。

 

重试后,假设第一个恢复了,会启用它么?不会:


protected void onReconnection(Connection connection) {
// We don't use that first connection so close it.
// TODO: this is a bit wasteful, we should consider passing it to onAdd/onUp so
// we use it for the first HostConnectionPool created
connection.closeAsync();
// Make sure we have up-to-date infos on that host before adding it (so we typically
// catch that an upgraded node uses a new cassandra version).
if (controlConnection.refreshNodeInfo(host)) {
logger.debug("Successful reconnection to {}, setting host UP", host);
try {
if (isHostAddition)
onAdd(host);
else
onUp(host);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Unexpected error while setting node up", e);
}
} else {
logger.debug("Not enough info for {}, ignoring host", host);
}
}

所以通过以上三个问题的分析,可知通信结点,不管配置多少,最终只会有一个通信连接Control connection。那么引出问题4,:

(4)假设通信连接挂了,会如何?

按照常理,如果通信结点挂了,那么会使用配置的其他的通信结点,实际上从代码中可以看出,完全不是如此,而是使用了query plan中的结点。query plan是什么?是和loadbalance绑定的一批结点。例如假设使用DCAwareRoundRobinPolicy,且指定DC1是本地DC,那么query plan中的结点是DC1的所有节点,而如果这个DCAwareRoundRobinPolicy开启了远程DC允许,且允许数目是1,则query plan还包括其他所有DC中的:每个DC中连接1个。之所以称为query plan是其含有多个结点以便于重试。

所以当通信结点挂了时,通信结点的任务会转交给load balance策略中的结点,即负责处理请求的”协调者“。


com.datastax.driver.core.Connection.Dispatcher.channelClosed(ChannelHandlerContext, ChannelStateEvent) //trigger it when connection down.

 com.datastax.driver.core.ControlConnection.backgroundReconnect(long)

 protected Connection tryReconnect() throws ConnectionException {
 try {
 return reconnectInternal(queryPlan(), false);  //will use nodes in query plan
 } catch (NoHostAvailableException e) {
 throw new ConnectionException(null, e.getMessage());
 } catch (UnsupportedProtocolVersionException e) {
 // reconnectInternal only propagate those if we've not decided on the protocol version yet,
 // which should only happen on the initial connection and thus in connect() but never here.
 throw new AssertionError();
 }
 }

 

(5)通信连接会复用处理请求的连接么?

不会,例如下图:10.224.57.168存在2个连接,其中1个是通信连接,一个是数据连接。

12344

(6)配置的一批通信结点中,最前面的会先尝试么?

不一定,这个问题比较让人无语,但不管是从代码还是实际测试都是如此。

因为在具体连接时,使用的通信结点的信息是转化过后的concurrent hash map的values(),而不是一个有序的list.。反过来看,cassandra  driver真的没有对contact point进行优选的过程。

 

结论:

通过以上概要分析,可以得出以下几点关键信息:

(1)不管contact point配置多少个,最终只有一个control connection;

(2)DCAwareRoundRobinPolicy中开启远程DC读(allowRemoteDCsForLocalConsistencyLevel),不需要让contact point增加一个远程DC的结点配置:因为假设通信结点所在的DC全部down,则query plan会适应这种变化(将以后的所有的请求转往其他DC),而通信结点正好使用的也是query plan来重建连接,所以自然OK.

(需要说明的是:如果需要在本DC结点全部都挂掉的情况下,仍然可以启动起来,那么真的需要配置一个远程DC的结点作为通信结点,否则压根启动不了,但是这种情况下确定需要启动起来么?如果启动起来或许掩盖了后续问题?)

可知理论上contact point配置的越多越好,但是从另外一个角度说太多(真的太多)也无意义,因为即使配置的少点,只要有一个能工作,让cassandra启动起来,则以后的各种情景都不会存在问题,因为随着各种down/up的切换,最终cassandra可以完全不使用配置的任何contact point.

综上:从代码阅读和一些测试结果来看,contact point配不配远程DC node从现在的driver实现来看已成悖论:配置的话,有可能让其成为通信连接(因为不会因为配置在后面,就最后一个尝试),node change,schema change可能稍慢处理(好在这些数据对适时性要求不是特别高)。不配,在本地DC全部node down时,应用启动不起来。

所以结论是:如果希望本DC结点全部都不work时,还能启动起来,那么需要至少配置一个远程DC Node作为通信结点。否则全部配置本DC结点吧,越多越好。

 

PS:  一直觉得不优选contact point是一个可以优化的地方,翻阅了github上的代码历史,在以后的版本中,不仅没有优化这个,而且会故意完全打乱contact point,从driver设计者的角度来看是为了避免多个connect()启动时都会用某一个从而产生hotspot.

参考:https://github.com/datastax/java-driver/commit/e6b28bcca1882b198064594d82696fc247ffac1f

 

 

 

 

 

 

 

Datastax Cassandra Driver Analyst (8)-execute cql and prepared/batch/bound statement

通过Cassandra Driver来执行一段CQL有很多方式,每种方式的适用场景和性能不尽相同,所以一定要明确各种方式区别并合理选择才能最优化性能,通过代码阅读,可以归纳出以下几种方式:

(一) 按照是否等待返回结果: 同步方式和异步方式:

例如:


com.datastax.driver.core.AbstractSession.execute(String)

com.datastax.driver.core.AbstractSession.executeAsync(String)

区别在于:是否等待返回结果。


executeAsync(statement).getUninterruptibly();//getUninterruptibly is waiting response

对于不需要结果、异步处理的数据操作完全可以用异步方式来执行,很明显能否提高效率。

(二)按照是否prepare:prepared statement和非prepared的statement

2.1 如何使用

在使用上,必须先将要执行的CQL进行prepare,然后根据prepare的结果PreparedStatement创建BoundStatement然后执行,换言之:无法不进行prepare而直接使用BoundStatement(因为其仅有一个以PreparedStatement 为参数的构造器)


PreparedStatement statement=connect.prepare("insert into site(siteid,activeid) values (?,?)");

connect.execute(new BoundStatement(statement2).bind(12111l,12111l)) //style one
connect.execute(statement.bind(12111l,12111l))  //style two

2.2 Cassandra server对prepare和非prepare statement处理差异:

对于任何一个cql的执行,cassandra要分成两大基本步骤:prepare(代码如下所示)和执行,其中prepare会做解析语句、基本的语法检查等他准备工作,所以对于仅仅是参数不同的相同语法的CQL而言,prepare是重复多余的,所以有了prepare语句的概念,即对于经常执行的相同语法、不同参数的CQL可以预先prepare一次,以后不用在prepare。 通过下面的代码调用图示,比较普通Query(QueryMessage)和已Prepare的Query(即Bound statement: 消息类型为ExecuteMessage)可知区别在于是否含有示例代码: Java - cassandra-2.1.2srcorgapachecassandratransportmessagesExecuteMessage.java - Eclipse

public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
Tracing.trace("Parsing {}", queryStr);
ParsedStatement statement = parseStatement(queryStr);

// Set keyspace for statement that require login
if (statement instanceof CFStatement)
((CFStatement)statement).prepareKeyspace(clientState);

Tracing.trace("Preparing statement");
return statement.prepare();
}

从实现上,大体是先将prepare statement请求以statementid为Key存储一份Prepared 的Map数据,以后接受到bound statement的语句时,直接根据statementId来获取。如果找不到存储的数据,抛出提示“事先没有Prepare”的。


ParsedStatement.Prepared prepared = handler.getPrepared(statementId);

if (prepared == null)

throw new PreparedQueryNotFoundException(statementId);

2.3  Cassandra client对prepare和非prepare statement处理差异

对于客户端,首先发出prepare请求时,会根据load balance策略发出一个请求给其中的一个节点,如果成功,则会将prepare请求发送给其他剩余的符合load balance策略的其他所有节点,这样等于说一次prepare会将所有以后可能处理请求的所有结点都prepare下。 同时定义的request type区分了不同的statement:


QUERY    (7, Requests.Query //common,can execute batch operations

PREPARE  (9, Requests.Prepare,  //prepare statement

EXECUTE  (10, Requests.Execute,  //Bound statement

BATCH   (13, Requests.Execute,  //Batch statement

通过以下三点的理解,在使用时,我们难免产生一些困惑:

 

困惑1: prepare的使用要求我们必须先prepare一次,那么假设当时某种因素导致prepare失败,是不是以后重复执行的bound语句都会报错:因为PreparedQueryNotFoundException 解惑:  实际使用中,如果遇到没有prepare的返回,会重试:这里的重试包括2个部分:在返回没有prepare的结点上重新prepare,然后在这个结点上将bound statement请求重新做一篇:

case UNPREPARED:
connection.write(prepareAndRetry(toPrepare.getQueryString()));

 

困惑2: 在一次性prepare之后新加入cassandra node,这个新结点上会做prepare么? 解惑: 只要prepare之后,client都会将它存进一个ConcurrentMap<MD5Digest, PreparedStatement>,然后有新的Node加入时,将曾经prepare过的statement重新做一次(com.datastax.driver.core.Cluster.Manager.onAdd(Host))

 

困惑3: 如果多prepare了几次,会有什么影响: 解惑: Server端本身会判断是否已在cache中,如果在则不会重新prepare,所以对于server本身影响不会太恶劣,但是对于客户端来说,影响很大,因为每次prepare都会将prepare statement发送给所有符合loadbalance策略的结点。

小结:

综上,使用prepare很明显对于重复执行、语法相同、参数不同的CQL具有很高的效益,能避免反复执行的prepare操作: 实测比较prepare和非prepare的trace可以知道,节约了这2步:

Parsing insert into site(siteid) values (1212) on /10.224.57.207[SharedPool-Worker-1] at Wed May 27 16:42:32 CST 2015
Preparing statement on /10.224.57.207[SharedPool-Worker-1] at Wed May 27 16:42:32 CST 2015

(三)按照是否批处理:batch statement和非batch的statement

batch这种方式很好理解,可以将可以批处理的请求融合到一起,很好的节约带宽,例如需要每笔业务需要写入3个号码(宅电,手机,工作电话等)的场景下就很适合,当然也可以将不同数据操作放在一次请求中做。

同时batch分为三种类型:


public enum Type {
/**
* A logged batch: Cassandra will first write the batch to its distributed batch log
* to ensure the atomicity of the batch.
*/
LOGGED,

/**
* A batch that doesn't use Cassandra's distributed batch log. Such batch are not
* guaranteed to be atomic.
*/
UNLOGGED,

/**
* A counter batch. Note that such batch is the only type that can contain counter
* operations and it can only contain these.
*/
COUNTER
};

按客户端处理的请求类型划分:Query and Batch,分别对应于com.datastax.driver.core.querybuilder.Batch和com.datastax.driver.core.BatchStatement:

前者对应代码组成Query类型:

 builder.append(isCounterOp()
                       ? "BEGIN COUNTER BATCH"
                       : (logged ? "BEGIN BATCH" : "BEGIN UNLOGGED BATCH"));

        if (!usings.usings.isEmpty()) {
            builder.append(" USING ");
            Utils.joinAndAppend(builder, " AND ", usings.usings, variables);
        }
        builder.append(' ');

        for (int i = 0; i < statements.size(); i++) {
            RegularStatement stmt = statements.get(i);
            if (stmt instanceof BuiltStatement) {
                BuiltStatement bst = (BuiltStatement)stmt;
                builder.append(maybeAddSemicolon(bst.buildQueryString(variables)));

            } else {
                String str = stmt.getQueryString();
                builder.append(str);
                if (!str.trim().endsWith(";"))
                    builder.append(';');

                // Note that we force hasBindMarkers if there is any non-BuiltStatement, so we know
                // that we can only get there with variables == null
                assert variables == null;
            }
        }
        builder.append("APPLY BATCH;");
        return builder;

服务器端接受到2种类型的batch操作,最终复用了batch statement的代码:

Java - cassandra-2.1.2srcorgapachecassandratransportMessage.java - Eclipse

同时batch操作必须符合两大基本要求:

(1)不能混合counter操作和非counter操作;

(2)仅能支持delete/update/insert三种修改操作;

(四)按照使用风格划分

使用风格更多的的是提供语法糖或操作的便捷性:例如对于第一种可以直接书写的CQL可以使用第二种:


public ResultSet execute(String query);

public ResultSet execute(String query, Object... values);

//session.execute( "INSERT INTO images (image_id, title, bytes) VALUES (?, ?, ?)", imageId, imageTitle, imageBytes );

public ResultSet execute(Statement statement);

这种方式比较简单,所以不做过多解释。

总结:

1  根据不同的划分方式可以找出不同的statement,在实际中,可以将多种方式结合起来,例如下面的代码示例: prepare/batch/bound三者结合:其中ps.bind(uid, mid1, title1, body1)返回的是bound statement

PreparedStatement ps = session.prepare("INSERT INTO messages (user_id, msg_id, title, body) VALUES (?, ?, ?, ?)");
BatchStatement batch = new BatchStatement();
batch.add(ps.bind(uid, mid1, title1, body1));
batch.add(ps.bind(uid, mid2, title2, body2));
batch.add(ps.bind(uid, mid3, title3, body3));
session.execute(batch);

2  摒弃不加思考的将某种方式贯彻到底,根据不同应用场合选择不同的方式(例如使用batch/prepare)可以提高效率。