TimestampGenerator主要是标示请求的先后顺序,分本地和远程产生两种方式。
功能
功能是为了resolve conflicts,也为了识别数据新旧等。API DOC如下:
* Generates client-side, microsecond-precision query timestamps.
* Given that Cassandra uses those timestamps to resolve conflicts, implementations should generate
* incrementing timestamps for successive implementations.
查询timestamp可以使用下列CQL语法:writetime()
select id,writetime(id),ttl(id) FROM table;
分类
Monotonic的意思可以参考网上找的一篇文章:
clock-realtime
clock-monotonic
private AtomicLong lastRef = new AtomicLong(0); @Override public long next() { while (true) { long last = lastRef.get(); long next = computeNext(last); if (lastRef.compareAndSet(last, next)) return next; } }
private final ThreadLocal<Long> lastRef = new ThreadLocal<Long>(); @Override public long next() { Long last = this.lastRef.get(); if (last == null) last = 0L; long next = computeNext(last); this.lastRef.set(next); return next; }
其中后两者复用如下代码:即在1ms之内能区分1000个请求,如果超过1000就区分不了。
protected long computeNext(long last) { long millis = last / 1000; long counter = last % 1000; long now = clock.currentTime(); // System.currentTimeMillis can go backwards on an NTP resync, hence the ">" below if (millis >= now) { if (counter == 999) logger.warn("Sub-millisecond counter overflowed, some query timestamps will not be distinct"); else counter += 1; } else { millis = now; counter = 0; } return millis * 1000 + counter; }
使用
long defaultTimestamp = Long.MIN_VALUE; if (cluster.manager.protocolVersion().compareTo(ProtocolVersion.V3) >= 0) { defaultTimestamp = statement.getDefaultTimestamp(); if (defaultTimestamp == Long.MIN_VALUE) defaultTimestamp = cluster.getConfiguration().getPolicies().getTimestampGenerator().next(); }
public static TimestampGenerator defaultTimestampGenerator() { return ServerSideTimestampGenerator.INSTANCE; }
(2)编码请求:
第一步:初始化请求
if (defaultTimestamp != Long.MIN_VALUE) flags.add(QueryFlag.DEFAULT_TIMESTAMP);
第二步:编码PDU
case V3: if (version == ProtocolVersion.V3 && flags.contains(QueryFlag.DEFAULT_TIMESTAMP)) dest.writeLong(defaultTimestamp);
com.datastax.driver.core.Statement.setDefaultTimestamp(long)