Datastax Cassandra Driver Analyst (8)-execute cql and prepared/batch/bound statement

通过Cassandra Driver来执行一段CQL有很多方式,每种方式的适用场景和性能不尽相同,所以一定要明确各种方式区别并合理选择才能最优化性能,通过代码阅读,可以归纳出以下几种方式:

(一) 按照是否等待返回结果: 同步方式和异步方式:

例如:


com.datastax.driver.core.AbstractSession.execute(String)

com.datastax.driver.core.AbstractSession.executeAsync(String)

区别在于:是否等待返回结果。


executeAsync(statement).getUninterruptibly();//getUninterruptibly is waiting response

对于不需要结果、异步处理的数据操作完全可以用异步方式来执行,很明显能否提高效率。

(二)按照是否prepare:prepared statement和非prepared的statement

2.1 如何使用

在使用上,必须先将要执行的CQL进行prepare,然后根据prepare的结果PreparedStatement创建BoundStatement然后执行,换言之:无法不进行prepare而直接使用BoundStatement(因为其仅有一个以PreparedStatement 为参数的构造器)


PreparedStatement statement=connect.prepare("insert into site(siteid,activeid) values (?,?)");

connect.execute(new BoundStatement(statement2).bind(12111l,12111l)) //style one
connect.execute(statement.bind(12111l,12111l))  //style two

2.2 Cassandra server对prepare和非prepare statement处理差异:

对于任何一个cql的执行,cassandra要分成两大基本步骤:prepare(代码如下所示)和执行,其中prepare会做解析语句、基本的语法检查等他准备工作,所以对于仅仅是参数不同的相同语法的CQL而言,prepare是重复多余的,所以有了prepare语句的概念,即对于经常执行的相同语法、不同参数的CQL可以预先prepare一次,以后不用在prepare。 通过下面的代码调用图示,比较普通Query(QueryMessage)和已Prepare的Query(即Bound statement: 消息类型为ExecuteMessage)可知区别在于是否含有示例代码: Java - cassandra-2.1.2srcorgapachecassandratransportmessagesExecuteMessage.java - Eclipse

public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
Tracing.trace("Parsing {}", queryStr);
ParsedStatement statement = parseStatement(queryStr);

// Set keyspace for statement that require login
if (statement instanceof CFStatement)
((CFStatement)statement).prepareKeyspace(clientState);

Tracing.trace("Preparing statement");
return statement.prepare();
}

从实现上,大体是先将prepare statement请求以statementid为Key存储一份Prepared 的Map数据,以后接受到bound statement的语句时,直接根据statementId来获取。如果找不到存储的数据,抛出提示“事先没有Prepare”的。


ParsedStatement.Prepared prepared = handler.getPrepared(statementId);

if (prepared == null)

throw new PreparedQueryNotFoundException(statementId);

2.3  Cassandra client对prepare和非prepare statement处理差异

对于客户端,首先发出prepare请求时,会根据load balance策略发出一个请求给其中的一个节点,如果成功,则会将prepare请求发送给其他剩余的符合load balance策略的其他所有节点,这样等于说一次prepare会将所有以后可能处理请求的所有结点都prepare下。 同时定义的request type区分了不同的statement:


QUERY    (7, Requests.Query //common,can execute batch operations

PREPARE  (9, Requests.Prepare,  //prepare statement

EXECUTE  (10, Requests.Execute,  //Bound statement

BATCH   (13, Requests.Execute,  //Batch statement

通过以下三点的理解,在使用时,我们难免产生一些困惑:

 

困惑1: prepare的使用要求我们必须先prepare一次,那么假设当时某种因素导致prepare失败,是不是以后重复执行的bound语句都会报错:因为PreparedQueryNotFoundException 解惑:  实际使用中,如果遇到没有prepare的返回,会重试:这里的重试包括2个部分:在返回没有prepare的结点上重新prepare,然后在这个结点上将bound statement请求重新做一篇:

case UNPREPARED:
connection.write(prepareAndRetry(toPrepare.getQueryString()));

 

困惑2: 在一次性prepare之后新加入cassandra node,这个新结点上会做prepare么? 解惑: 只要prepare之后,client都会将它存进一个ConcurrentMap<MD5Digest, PreparedStatement>,然后有新的Node加入时,将曾经prepare过的statement重新做一次(com.datastax.driver.core.Cluster.Manager.onAdd(Host))

 

困惑3: 如果多prepare了几次,会有什么影响: 解惑: Server端本身会判断是否已在cache中,如果在则不会重新prepare,所以对于server本身影响不会太恶劣,但是对于客户端来说,影响很大,因为每次prepare都会将prepare statement发送给所有符合loadbalance策略的结点。

小结:

综上,使用prepare很明显对于重复执行、语法相同、参数不同的CQL具有很高的效益,能避免反复执行的prepare操作: 实测比较prepare和非prepare的trace可以知道,节约了这2步:

Parsing insert into site(siteid) values (1212) on /10.224.57.207[SharedPool-Worker-1] at Wed May 27 16:42:32 CST 2015
Preparing statement on /10.224.57.207[SharedPool-Worker-1] at Wed May 27 16:42:32 CST 2015

(三)按照是否批处理:batch statement和非batch的statement

batch这种方式很好理解,可以将可以批处理的请求融合到一起,很好的节约带宽,例如需要每笔业务需要写入3个号码(宅电,手机,工作电话等)的场景下就很适合,当然也可以将不同数据操作放在一次请求中做。

同时batch分为三种类型:


public enum Type {
/**
* A logged batch: Cassandra will first write the batch to its distributed batch log
* to ensure the atomicity of the batch.
*/
LOGGED,

/**
* A batch that doesn't use Cassandra's distributed batch log. Such batch are not
* guaranteed to be atomic.
*/
UNLOGGED,

/**
* A counter batch. Note that such batch is the only type that can contain counter
* operations and it can only contain these.
*/
COUNTER
};

按客户端处理的请求类型划分:Query and Batch,分别对应于com.datastax.driver.core.querybuilder.Batch和com.datastax.driver.core.BatchStatement:

前者对应代码组成Query类型:

 builder.append(isCounterOp()
                       ? "BEGIN COUNTER BATCH"
                       : (logged ? "BEGIN BATCH" : "BEGIN UNLOGGED BATCH"));

        if (!usings.usings.isEmpty()) {
            builder.append(" USING ");
            Utils.joinAndAppend(builder, " AND ", usings.usings, variables);
        }
        builder.append(' ');

        for (int i = 0; i < statements.size(); i++) {
            RegularStatement stmt = statements.get(i);
            if (stmt instanceof BuiltStatement) {
                BuiltStatement bst = (BuiltStatement)stmt;
                builder.append(maybeAddSemicolon(bst.buildQueryString(variables)));

            } else {
                String str = stmt.getQueryString();
                builder.append(str);
                if (!str.trim().endsWith(";"))
                    builder.append(';');

                // Note that we force hasBindMarkers if there is any non-BuiltStatement, so we know
                // that we can only get there with variables == null
                assert variables == null;
            }
        }
        builder.append("APPLY BATCH;");
        return builder;

服务器端接受到2种类型的batch操作,最终复用了batch statement的代码:

Java - cassandra-2.1.2srcorgapachecassandratransportMessage.java - Eclipse

同时batch操作必须符合两大基本要求:

(1)不能混合counter操作和非counter操作;

(2)仅能支持delete/update/insert三种修改操作;

(四)按照使用风格划分

使用风格更多的的是提供语法糖或操作的便捷性:例如对于第一种可以直接书写的CQL可以使用第二种:


public ResultSet execute(String query);

public ResultSet execute(String query, Object... values);

//session.execute( "INSERT INTO images (image_id, title, bytes) VALUES (?, ?, ?)", imageId, imageTitle, imageBytes );

public ResultSet execute(Statement statement);

这种方式比较简单,所以不做过多解释。

总结:

1  根据不同的划分方式可以找出不同的statement,在实际中,可以将多种方式结合起来,例如下面的代码示例: prepare/batch/bound三者结合:其中ps.bind(uid, mid1, title1, body1)返回的是bound statement

PreparedStatement ps = session.prepare("INSERT INTO messages (user_id, msg_id, title, body) VALUES (?, ?, ?, ?)");
BatchStatement batch = new BatchStatement();
batch.add(ps.bind(uid, mid1, title1, body1));
batch.add(ps.bind(uid, mid2, title2, body2));
batch.add(ps.bind(uid, mid3, title3, body3));
session.execute(batch);

2  摒弃不加思考的将某种方式贯彻到底,根据不同应用场合选择不同的方式(例如使用batch/prepare)可以提高效率。

Datastax Cassandra Driver Analyst (2)-Configuration-MetricsOptions

Datastax Cassandra Driver本身是启用了metrics且启用了jmx report功能,即提供了丰富的性能监控功能。

可以通过com.datastax.driver.core.Cluster.Builder看出来:


private boolean metricsEnabled = true;

private boolean jmxEnabled = true;

metricsEnabled ? new MetricsOptions(jmxEnabled) : null

我们可以查阅代码看都report了哪些信息:

private final Timer requests = registry.timer("requests");

private final Gauge<Integer> knownHosts = registry.register("known-hosts", new Gauge<Integer>()

private final Gauge<Integer> connectedTo = registry.register("connected-to", new Gauge<Integer>()

private final Gauge<Integer> openConnections = registry.register("open-connections", new Gauge<Integer>()

当然也可以使用visualvm/jconsole等工具直接查看mbean。

ps:  如果远程查看的话,记得加上 -Dcom.sun.management.jmxremote=true  -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.ssl=false   -Dcom.sun.management.jmxremote.authenticate=false ,其中端口号要大于1024:

 

requests

 

基本上字面意思解释的很清楚,比较难理解的是以下几点:

(1)StdDev:标准差,标明数据样本与平均数之间的差异的情况,越大代表样本分散越不均匀

(2)DurationUnit是处理Event的时间消费的单位,即50thpercentile(ile即<=)等的单位

(3)999thpercentile=99.9thpercentile

(4)最近1/5/15分钟的速率不是真实的存这么长时间然后算平均数,而是按照一定的算法(linux中的top)来计算的。

(5)median: 中数,所有数据样本中,最中间的数,即50%的概念以此为界限。

Metric本身很有用,例如可以通过它可以查看建立了多少连接,接受到多少请求,请求失败率,请求的处理的平均时间/最大时间/最小时间等,完成一定的性能监控。

而性能指标中“处理请求消费的时间”是什么时间段,可以查阅代码:

从:


public RequestHandler(SessionManager manager, Callback callback, Statement statement) {
......
this.timerContext = metricsEnabled()

? metrics().getRequestsTimer().time()
: null;
this.startTime = System.nanoTime();
}

 

到:

com.datastax.driver.core.RequestHandler.setFinalResult(Connection, Response)

or

com.datastax.driver.core.RequestHandler.setFinalException(Connection, Exception)

基本上可以认为就是一个请求从建立到接受之后的时间消费,这样我们可以定期去用visual vm或者jconsole等查看下。

写到这里我们可知,对于cassandra driver本身,实际上是有性能监控的方法(不过metrics官网提及对于高吞吐量、低延时的项目使用metric效果并不好,同时也提及jmx这种方式不适合产品级监控)。

而如果我们不知道它的存在,也没有利用上会有什么坏处? 不言而喻,会浪费driver代码的处理时间和CPU占用时间。

通过之前的一些测试,Driver本身处理的TPS能力对CPU的配置非常敏感,以所做的项目为例,2核时可以达到600TPS,但是在8核下可以达到3000TPS.所以我们可以通过Jprofiler来看看默认的Metric性能监控对CPU的影响:

 

jmx Jian Fu (jiafu) - Microsoft Outlook

 

可以看出,Metrics的性能监控占用了14.2%的CPU时间。

所以从另外一个角度说,如果去掉Metrics监控可以节约CPU资源从而提高TPS.


Cluster.builder().withoutMetrics()

如果不仔细阅读代码,可能会使用Cluster.builder().withoutJMXReporting().

一定要区分:前者是关闭Metrix,也同时不会启动JMX来report;而后者仅仅是关闭了JMX report但是仍然会监控性能。

 

总结:通过Metrics Options配置,我们可以使用它来监控Driver的性能,如果我们不想用或者不方便用或压根不知道这个功能,那么阅读本文之后,就直接禁用吧,这样会节约一些CPU资源,提升一定的性能。

这里需要提及的是,对于Metrics本身,除了JMX展示,还可以使用console/http/csv/SLF4J 等来展示。

当然datastax的driver使用的是metric+jmx这种方式。

driver中JMX reporter是可以替换成其他类型的,例如console, slf4j, cvs等定时输出的方式,置换方式只要几行代码,例如换成每1分钟输出监控数据一次:

cluster = Cluster.builder()
 .addContactPoints(nodes.split(",")).withoutJMXReporting()  //close jmx report
 .build();
 MetricRegistry registry = cluster.getMetrics().getRegistry();
 Slf4jReporter.forRegistry(registry).build().start(1, TimeUnit.MINUTES); //swich to log4j

03-19 2015 02:14:18:361 [metrics-logger-reporter-thread-1] INFO metrics – type=GAUGE, name=connected-to, value=3
03-19 2015 02:14:18:362 [metrics-logger-reporter-thread-1] INFO metrics – type=GAUGE, name=known-hosts, value=11
03-19 2015 02:14:18:362 [metrics-logger-reporter-thread-1] INFO metrics – type=GAUGE, name=open-connections, value=4
03-19 2015 02:14:18:362 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=connection-errors, count=0
03-19 2015 02:14:18:362 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=ignores, count=0
03-19 2015 02:14:18:362 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=ignores-on-read-timeout, count=0
03-19 2015 02:14:18:362 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=ignores-on-unavailable, count=0
03-19 2015 02:14:18:363 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=ignores-on-write-timeout, count=0
03-19 2015 02:14:18:363 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=other-errors, count=0
03-19 2015 02:14:18:363 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=read-timeouts, count=0
03-19 2015 02:14:18:363 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=retries, count=0
03-19 2015 02:14:18:363 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=retries-on-read-timeout, count=0
03-19 2015 02:14:18:363 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=retries-on-unavailable, count=0
03-19 2015 02:14:18:364 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=retries-on-write-timeout, count=0
03-19 2015 02:14:18:364 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=unavailables, count=0
03-19 2015 02:14:18:364 [metrics-logger-reporter-thread-1] INFO metrics – type=COUNTER, name=write-timeouts, count=0
03-19 2015 02:14:18:365 [metrics-logger-reporter-thread-1] INFO metrics – type=TIMER, name=requests, count=30, min=2.6754119999999997, max=8.28019, mean=3.9076371, stddev=1.0645403774100675, median=3.8236345, p75=4.342703, p95=6.798477349999997, p98=8.28019, p99=8.28019, p999=8.28019, mean_rate=0.12499429465364886, m1=0.10222425153981142, m5=0.06690157349035725, m15=0.02901789392361707, rate_unit=events/second, duration_unit=milliseconds

学习要点:Metrics