In the previous articles we covered ZooKeeper's groundwork: the overall architecture and how data enters the server. Now let's look at how ZooKeeper does its real work.

  This article takes the command setData /a data as its starting point and follows how ZooKeeper handles a data-update operation coming from a client.
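  Before diving into the server side, it may help to see what that command corresponds to in the standard Java client API. The sketch below is only an illustration (the connection string is a placeholder); it reads the znode first so the version it passes to setData lines up with the server-side version check we will meet in PrepRequestProcessor.

// Minimal client-side sketch (standard ZooKeeper Java API; host/port is a placeholder)
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class SetDataDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {});
        Stat stat = new Stat();
        zk.getData("/a", false, stat);                           // read the current version into stat
        zk.setData("/a", "data".getBytes(), stat.getVersion());  // expected version enables optimistic locking
        zk.close();
    }
}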

  First we need to be clear about what each role is responsible for; only then can we follow the workflow. We assume the client is connected to the Leader and walk through how it updates data. Its chain of responsibility looks like this:

  LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor

  LeaderRequestProcessor merely forwards the request along the chain, so we can ignore it; our starting point is PrepRequestProcessor.
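  To make the chain concrete, here is a minimal, self-contained chain-of-responsibility sketch in the same shape as the leader's pipeline. It is only an illustration: SimpleProcessor and the synchronous hand-off are made up for the example; the real processors are separate ZooKeeper classes, several of which run on their own threads with queues in between, as we will see next.

// Illustrative sketch only: a chain-of-responsibility shaped like the leader's pipeline.
// The real classes live in org.apache.zookeeper.server(.quorum); SimpleProcessor is invented here.
interface Processor {
    void processRequest(String req);
}

class SimpleProcessor implements Processor {
    private final String name;
    private final Processor next; // null for the last processor in the chain

    SimpleProcessor(String name, Processor next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void processRequest(String req) {
        System.out.println(name + " handling " + req);
        if (next != null) {
            next.processRequest(req); // hand the request to the next processor
        }
    }
}

public class LeaderChainSketch {
    public static void main(String[] args) {
        // The chain is built back-to-front, as the leader's setupRequestProcessors() also does
        Processor finalP = new SimpleProcessor("FinalRequestProcessor", null);
        Processor toBeApplied = new SimpleProcessor("ToBeAppliedRequestProcessor", finalP);
        Processor commit = new SimpleProcessor("CommitProcessor", toBeApplied);
        Processor proposal = new SimpleProcessor("ProposalRequestProcessor", commit);
        Processor prep = new SimpleProcessor("PrepRequestProcessor", proposal);
        Processor first = new SimpleProcessor("LeaderRequestProcessor", prep);
        first.processRequest("setData /a data");
    }
}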

一、 PrepRequestProcessor: initial processing of the request

  One thing is clear: each RequestProcessor that runs on its own thread communicates with the next RequestProcessor through a queue. PrepRequestProcessor is no exception; once LeaderRequestProcessor submits a request to it, it starts its own work.

    // Submitting a request just means adding it to the queue
// org.apache.zookeeper.server.PrepRequestProcessor#processRequest
public void processRequest(Request request) {
request.prepQueueStartTime = Time.currentElapsedTime();
submittedRequests.add(request);
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
// org.apache.zookeeper.server.PrepRequestProcessor#run
@Override
public void run() {
try {
// Loop forever, processing requests
while (true) {
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
// Take from the submittedRequests blocking queue
Request request = submittedRequests.take();
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
.add(Time.currentElapsedTime() - request.prepQueueStartTime);
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
request.prepStartTime = Time.currentElapsedTime();
// The main processing logic is delegated to pRequest
pRequest(request);
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}

/**
* This method will be called inside the ProcessRequestThread, which is a
* singleton, so there will be a single thread calling this code.
*
* @param request
*/
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null);
try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = new CreateTTLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
break;
case OpCode.deleteContainer:
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
// setData has type = 5
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
// For this write request, hand off to pRequest2Txn
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiOperationRecord multiRequest = new MultiOperationRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null; //Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest); for (Op op : multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn; /* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} else {
/* Prep the request and convert to a Txn */
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
type = request.getHdr().getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue()); if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} aborting"
+ " remaining multi ops. Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
} request.setException(e); /* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
} // TODO: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
txn.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
txns.add(new Txn(type, bb.array()));
}
} request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), request.type));
request.setTxn(new MultiTxn(txns)); break; //create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
}
break; //All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getAllChildrenNumber:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.checkWatches:
case OpCode.removeWatches:
case OpCode.getEphemerals:
case OpCode.multiRead:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
LOG.warn("unknown type {}", request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
} if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info(
"Got user-level KeeperException when processing {} Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e); StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if (bb != null) {
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
} LOG.error("Dumping request buffer: 0x{}", sb.toString());
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
// Record the latest transaction id and pass the request on to the next RequestProcessor
request.zxid = zks.getZxid();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
// Here nextProcessor is ProposalRequestProcessor
nextProcessor.processRequest(request);
}

/**
* This method will be called inside the ProcessRequestThread, which is a
* singleton, so there will be a single thread calling this code.
*
* @param type
* @param zxid
* @param request
* @param record
*/
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
// Switch on the type again to decide how to handle it
switch (type) {
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
pRequest2TxnCreate(type, request, record, deserialize);
break;
}
case OpCode.deleteContainer: {
// ...
break;
}
case OpCode.delete:
//...
break;
// Again we only look at the setData case
case OpCode.setData:
// It proceeds in several steps:
// 1. check the session;
// 2. deserialize the request;
// 3. validate the path;
// 4. check ACL permissions;
// 5. check and bump the version;
// 6. add a ChangeRecord to the outstanding-changes queue for later use
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetDataRequest setDataRequest = (SetDataRequest) record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
}
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setVersion(newVersion);
addChangeRecord(nodeRecord);
break;
case OpCode.reconfig:
//...
addChangeRecord(nodeRecord);
break;
case OpCode.setACL:
// ...
addChangeRecord(nodeRecord);
break;
case OpCode.createSession:
// ...
break;
case OpCode.closeSession:
// ...
break;
case OpCode.check:
// ...
break;
default:
LOG.warn("unknown type {}", type);
break;
}
}
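  Step 5 above is the optimistic-concurrency check. The sketch below paraphrases the intended semantics of checkAndIncVersion (it is not the verbatim ZooKeeper method): a client-supplied version of -1 skips the check, otherwise the expected version must match the node's current version. If the check fails, the KeeperException is caught back in pRequest, the header type is switched to OpCode.error and an ErrorTxn is attached, exactly as in the catch block shown earlier.

import org.apache.zookeeper.KeeperException;

// Paraphrase of the version check behind setData (see PrepRequestProcessor#checkAndIncVersion
// in the ZooKeeper source for the real code); expectedVersion == -1 means "don't check".
final class VersionCheck {
    static int checkAndIncVersion(int currentVersion, int expectedVersion, String path)
            throws KeeperException.BadVersionException {
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new KeeperException.BadVersionException(path);
        }
        return currentVersion + 1; // becomes the new stat version carried in the SetDataTxn
    }
}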

二、 ProposalRequestProcessor: handling write transactions and initiating the proposal (vote)

  ProposalRequestProcessor does not run as its own thread; it is simply invoked when a request is processed. Its only entry point is processRequest, and its main job is to kick off the proposal (voting) logic and pass the request down the chain.

    // org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
public void processRequest(Request request) throws RequestProcessorException {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");

/* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/
if (request instanceof LearnerSyncRequest) {
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
// Here nextProcessor is CommitProcessor
// Let it process the request first, then decide whether the data needs to be persisted locally
nextProcessor.processRequest(request);
// For write transactions, hdr is never null
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
// A quorum vote decides whether the data gets updated
try {
// The proposal (voting) logic is handled by the Leader class
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
// Hand off to SyncRequestProcessor to persist the transaction
syncProcessor.processRequest(request);
}
}
}

// Submitting to CommitProcessor just adds the request to its queue
// org.apache.zookeeper.server.quorum.CommitProcessor#processRequest
@Override
public void processRequest(Request request) {
if (stopped) {
return;
}
LOG.debug("Processing request:: {}", request);
request.commitProcQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
// If the request will block, add it to the queue of blocking requests
if (needCommit(request)) {
queuedWriteRequests.add(request);
numWriteQueuedRequests.incrementAndGet();
} else {
numReadQueuedRequests.incrementAndGet();
}
wakeup();
}

// A quick look at the proposal (voting) process
// org.apache.zookeeper.server.quorum.Leader#propose
/**
* create a proposal and send it out to all the members
*
* @param request
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request) throws XidRolloverException {
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
// Wrap the request in a QuorumPacket for inter-node communication
byte[] data = SerializeUtils.serializeRequest(request);
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
// Count this server's own quorum verifier into the proposal
p.addQuorumVerifier(self.getQuorumVerifier());
if (request.getHdr().getType() == OpCode.reconfig) {
self.setLastSeenQuorumVerifier(request.qv, true);
}
if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
LOG.debug("Proposing:: {}", request);
lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
// Send the QuorumPacket to every learner connection
sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
}

// SyncRequestProcessor's submission also just enqueues the request; its own thread polls the queue
// org.apache.zookeeper.server.SyncRequestProcessor#processRequest
public void processRequest(final Request request) {
Objects.requireNonNull(request, "Request cannot be null"); request.syncQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}

三、 CommitProcessor: handling the commit phase, driven by acks

  Requests are handed to it by ProposalRequestProcessor (its predecessor in the chain), and it processes them on its own thread.

    // org.apache.zookeeper.server.quorum.CommitProcessor#processRequest
@Override
public void processRequest(Request request) {
if (stopped) {
return;
}
LOG.debug("Processing request:: {}", request);
request.commitProcQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
// If the request will block, add it to the queue of blocking requests
if (needCommit(request)) {
queuedWriteRequests.add(request);
numWriteQueuedRequests.incrementAndGet();
} else {
numReadQueuedRequests.incrementAndGet();
}
wakeup();
} // org.apache.zookeeper.server.quorum.CommitProcessor#run
@Override
public void run() {
try {
/*
* In each iteration of the following loop we process at most
* requestsToProcess requests of queuedRequests. We have to limit
* the number of request we poll from queuedRequests, since it is
* possible to endlessly poll read requests from queuedRequests, and
* that will lead to a starvation of non-local committed requests.
*/
int requestsToProcess = 0;
boolean commitIsWaiting = false;
do {
/*
* Since requests are placed in the queue before being sent to
* the leader, if commitIsWaiting = true, the commit belongs to
* the first update operation in the queuedRequests or to a
* request from a client on another server (i.e., the order of
* the following two lines is important!).
*/
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
// Avoid sync if we have something to do
// wait/notify: block until there is something to process
if (requestsToProcess == 0 && !commitIsWaiting) {
// Waiting for requests to process
synchronized (this) {
while (!stopped && requestsToProcess == 0 && !commitIsWaiting) {
wait();
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
}
}
} ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size()); long time = Time.currentElapsedTime(); /*
* Processing up to requestsToProcess requests from the incoming
* queue (queuedRequests). If maxReadBatchSize is set then no
* commits will be processed until maxReadBatchSize number of
* reads are processed (or no more reads remain in the queue).
* After the loop a single committed request is processed if
* one is waiting (or a batch of commits if maxCommitBatchSize
* is set).
*/
Request request;
int readsProcessed = 0;
// Process one batch, bounded by the requestsToProcess value read above
while (!stopped
&& requestsToProcess > 0
&& (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
&& (request = queuedRequests.poll()) != null) {
// Once polled from the queue, the request counts as consumed
requestsToProcess--;
// Write (update) operations take the first branch
if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) {
// Add request to pending
Deque<Request> requests = pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
requests.addLast(request);
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
} else {
readsProcessed++;
numReadQueuedRequests.decrementAndGet();
sendToNextProcessor(request);
}
/*
* Stop feeding the pool if there is a local pending update
* and a committed request that is ready. Once we have a
* pending request with a waiting committed request, we know
* we can process the committed one. This is because commits
* for local requests arrive in the order they appeared in
* the queue, so if we have a pending request and a
* committed request, the committed request must be for that
* pending write or for a write originating at a different
* server. We skip this if maxReadBatchSize is set.
*/
if (maxReadBatchSize < 0 && !pendingRequests.isEmpty() && !committedRequests.isEmpty()) {
/*
* We set commitIsWaiting so that we won't check
* committedRequests again.
*/
commitIsWaiting = true;
break;
}
}
ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed); if (!commitIsWaiting) {
commitIsWaiting = !committedRequests.isEmpty();
} /*
* Handle commits, if any.
*/
if (commitIsWaiting && !stopped) {
/*
* Drain outstanding reads
*/
waitForEmptyPool(); if (stopped) {
return;
} int commitsToProcess = maxCommitBatchSize; /*
* Loop through all the commits, and try to drain them.
*/
Set<Long> queuesToDrain = new HashSet<>();
long startWriteTime = Time.currentElapsedTime();
int commitsProcessed = 0;
while (commitIsWaiting && !stopped && commitsToProcess > 0) {
// Process committed head
request = committedRequests.peek(); /*
* Check if this is a local write request is pending,
* if so, update it with the committed info. If the commit matches
* the first write queued in the blockedRequestQueue, we know this is
* a commit for a local write, as commits are received in order. Else
* it must be a commit for a remote write.
*/
if (!queuedWriteRequests.isEmpty()
&& queuedWriteRequests.peek().sessionId == request.sessionId
&& queuedWriteRequests.peek().cxid == request.cxid) {
/*
* Commit matches the earliest write in our write queue.
*/
Deque<Request> sessionQueue = pendingRequests.get(request.sessionId);
ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
/*
* Can't process this write yet.
* Either there are reads pending in this session, or we
* haven't gotten to this write yet.
*/
break;
} else {
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
// If session queue != null, then it is also not empty.
Request topPending = sessionQueue.poll();
/*
* Generally, we want to send to the next processor our version of the request,
* since it contains the session information that is needed for post update processing.
* In more details, when a request is in the local queue, there is (or could be) a client
* attached to this server waiting for a response, and there is other bookkeeping of
* requests that are outstanding and have originated from this server
* (e.g., for setting the max outstanding requests) - we need to update this info when an
* outstanding request completes. Note that in the other case, the operation
* originated from a different server and there is no local bookkeeping or a local client
* session that needs to be notified.
*/
topPending.setHdr(request.getHdr());
topPending.setTxn(request.getTxn());
topPending.zxid = request.zxid;
topPending.commitRecvTime = request.commitRecvTime;
request = topPending;
// Only decrement if we take a request off the queue.
numWriteQueuedRequests.decrementAndGet();
queuedWriteRequests.poll();
queuesToDrain.add(request.sessionId);
}
}
/*
* Pull the request off the commit queue, now that we are going
* to process it.
*/
committedRequests.remove();
commitsToProcess--;
commitsProcessed++;
// Process the write inline.
// Handle the commit; from here it flows on to ToBeAppliedRequestProcessor
processWrite(request);
commitIsWaiting = !committedRequests.isEmpty();
}
ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR
.add(Time.currentElapsedTime() - startWriteTime);
ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed); /*
* Process following reads if any, remove session queue(s) if
* empty.
*/
readsProcessed = 0;
for (Long sessionId : queuesToDrain) {
Deque<Request> sessionQueue = pendingRequests.get(sessionId);
int readsAfterWrite = 0;
while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) {
numReadQueuedRequests.decrementAndGet();
// Build a CommitProcessor.CommitWorkRequest and submit it to the worker pool
sendToNextProcessor(sessionQueue.poll());
readsAfterWrite++;
}
ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
readsProcessed += readsAfterWrite; // Remove empty queues
if (sessionQueue.isEmpty()) {
pendingRequests.remove(sessionId);
}
}
ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
} ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
endOfIteration();
} while (!stoppedMainLoop);
} catch (Throwable e) {
handleException(this.getName(), e);
}
LOG.info("CommitProcessor exited loop!");
}

// When CommitProcessor detects that a transaction can be committed, it performs the commit
// (this only starts once the quorum vote has finished)
// org.apache.zookeeper.server.quorum.CommitProcessor#processWrite
private void processWrite(Request request) throws RequestProcessorException {
processCommitMetrics(request, true); long timeBeforeFinalProc = Time.currentElapsedTime();
// ToBeAppliedRequestProcessor
nextProcessor.processRequest(request);
ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - timeBeforeFinalProc);
}
// org.apache.zookeeper.server.quorum.Leader.ToBeAppliedRequestProcessor#processRequest
public void processRequest(Request request) throws RequestProcessorException {
// next is FinalRequestProcessor, which applies the transaction to the ZK database
next.processRequest(request);
// The only requests that should be on toBeApplied are write
// requests, for which we will have a hdr. We can't simply use
// request.zxid here because that is set on read requests to equal
// the zxid of the last write op.
if (request.getHdr() != null) {
long zxid = request.getHdr().getZxid();
Iterator<Proposal> iter = leader.toBeApplied.iterator();
if (iter.hasNext()) {
Proposal p = iter.next();
if (p.request != null && p.request.zxid == zxid) {
iter.remove();
return;
}
}
LOG.error("Committed request not found on toBeApplied: {}", request);
}
}
// org.apache.zookeeper.server.FinalRequestProcessor#processRequest
public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request); // request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}

// Apply the transaction to the database
ProcessTxnResult rc = zks.processTxn(request);

// ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
// was not being queued — ZOOKEEPER-558) properly. This happens, for example,
// when the client closes the connection. The server should still close the session, though.
// Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
// We need to check if we can close the session id.
// Sometimes the corresponding ServerCnxnFactory could be null because
// we are just playing diffs from the leader.
if (closeSession(zks.serverCnxnFactory, request.sessionId)
|| closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
return;
}
} if (request.getHdr() != null) {
/*
* Request header is created only by the leader, so this must be
* a quorum request. Since we're comparing timestamps across hosts,
* this metric may be incorrect. However, it's still a very useful
* metric to track in the happy case. If there is clock drift,
* the latency can go negative. Note: headers use wall time, not
* CLOCK_MONOTONIC.
*/
long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
if (propagationLatency >= 0) {
ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
}
} if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn; long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); String lastOp = "NA";
// Notify ZooKeeperServer that the request has finished so that it can
// update any request accounting/throttling limits
zks.decInProcess();
zks.requestFinished(request);
Code err = Code.OK;
Record rsp = null;
String path = null;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
/*
* When local session upgrading is disabled, leader will
* reject the ephemeral node creation due to session expire.
* However, if this is the follower that issue the request,
* it will have the correct error code, so we should use that
* and report to user
*/
if (request.getException() != null) {
throw request.getException();
} else {
throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
}
} KeeperException ke = request.getException();
if (ke instanceof SessionMovedException) {
throw ke;
}
if (ke != null && request.type != OpCode.multi) {
throw ke;
} LOG.debug("{}", request); if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REPLIES.add(1);
} switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
updateStats(request, lastOp, lastZxid); cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response");
return;
}
case OpCode.createSession: {
lastOp = "SESS";
updateStats(request, lastOp, lastZxid); zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi: {
lastOp = "MULT";
rsp = new MultiResponse(); for (ProcessTxnResult subTxnResult : rc.multiResult) { OpResult subResult; switch (subTxnResult.type) {
case OpCode.check:
subResult = new CheckResult();
break;
case OpCode.create:
subResult = new CreateResult(subTxnResult.path);
break;
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
break;
case OpCode.delete:
case OpCode.deleteContainer:
subResult = new DeleteResult();
break;
case OpCode.setData:
subResult = new SetDataResult(subTxnResult.stat);
break;
case OpCode.error:
subResult = new ErrorResult(subTxnResult.err);
if (subTxnResult.err == Code.SESSIONMOVED.intValue()) {
throw new SessionMovedException();
}
break;
default:
throw new IOException("Invalid type of op");
} ((MultiResponse) rsp).add(subResult);
} break;
}
case OpCode.multiRead: {
lastOp = "MLTR";
MultiOperationRecord multiReadRecord = new MultiOperationRecord();
ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord) {
try {
Record rec;
switch (readOp.getType()) {
case OpCode.getChildren:
rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
break;
case OpCode.getData:
rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
GetDataResponse gdr = (GetDataResponse) rec;
subResult = new GetDataResult(gdr.getData(), gdr.getStat());
break;
default:
throw new IOException("Invalid type of readOp");
}
} catch (KeeperException e) {
subResult = new ErrorResult(e.code().intValue());
}
((MultiResponse) rsp).add(subResult);
}
break;
}
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
lastOp = "CREA";
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.setData: {
// setData (write) response
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.reconfig: {
lastOp = "RECO";
rsp = new GetDataResponse(
((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(),
rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.setACL: {
lastOp = "SETA";
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.closeSession: {
lastOp = "CLOS";
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
}
case OpCode.check: {
lastOp = "CHEC";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// TODO We really should NOT need this!!!!
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
.setWatches(
relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
cnxn);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path,
null); Stat stat = new Stat();
List<ACL> acl = zks.getZKDatabase().getACL(path, stat);
requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath()); try {
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.ADMIN,
request.authInfo,
path,
null);
rsp = new GetACLResponse(acl, stat);
} catch (KeeperException.NoAuthException e) {
List<ACL> acl1 = new ArrayList<ACL>(acl.size());
for (ACL a : acl) {
if ("digest".equals(a.getId().getScheme())) {
Id id = a.getId();
Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
acl1.add(new ACL(a.getPerms(), id1));
} else {
acl1.add(a);
}
}
rsp = new GetACLResponse(acl1, stat);
}
break;
}
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo,
path,
null);
int number = zks.getZKDatabase().getAllChildrenNumber(path);
rsp = new GetAllChildrenNumberResponse(number);
break;
}
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo, path,
null);
List<String> children = zks.getZKDatabase()
.getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.checkWatches: {
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
if (!containsWatcher) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
break;
}
case OpCode.removeWatches: {
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
if (!removed) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
break;
}
case OpCode.getEphemerals: {
lastOp = "GETE";
GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
List<String> ephemerals = new ArrayList<>();
if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) {
ephemerals.addAll(allEphems);
} else {
for (String p : allEphems) {
if (p.startsWith(prefixPath)) {
ephemerals.add(p);
}
}
}
rsp = new GetEphemeralsResponse(ephemerals);
break;
}
}
} catch (SessionMovedException e) {
// session moved is a connection level error, we need to tear
// down the connection otw ZOOKEEPER-710 might happen
// ie client on slow follower starts to renew session, fails
// before this completes, then tries the fast follower (leader)
// and is successful, however the initial renew is then
// successfully fwd/processed by the leader and as a result
// the client and leader disagree on where the client is most
// recently attached (and therefore invalid SESSION MOVED generated)
cnxn.sendCloseSession();
return;
} catch (KeeperException e) {
err = e.code();
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.error("Dumping request buffer: 0x{}", sb.toString());
err = Code.MARSHALLINGERROR;
}

ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
// Update request statistics
updateStats(request, lastOp, lastZxid);
try {
if (request.type == OpCode.getData && path != null && rsp != null) {
// Serialized read responses could be cached by the connection object.
// Cache entries are identified by their path and last modified zxid,
// so these values are passed along with the response.
GetDataResponse getDataResponse = (GetDataResponse) rsp;
Stat stat = null;
if (getDataResponse.getStat() != null) {
stat = getDataResponse.getStat();
}
cnxn.sendResponse(hdr, rsp, "response", path, stat);
} else {
// Send the response back to the client
cnxn.sendResponse(hdr, rsp, "response");
}
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG", e);
}
}

// org.apache.zookeeper.server.ZooKeeperServer#processTxn
// entry point for FinalRequestProcessor.java
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn()); final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum(); // return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn()); // request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
while (!outstandingChanges.isEmpty()
&& outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
}

// do not add non quorum packets to the queue.
// This queue exists so that followers/observers can replicate recently committed proposals quickly
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}

四、 Leader#propose: putting the transaction to a quorum vote to keep the data highly available

    // Majority-quorum principle: mainly assembles a QuorumPacket and sends it to each voting node
// org.apache.zookeeper.server.quorum.Leader#propose
/**
* create a proposal and send it out to all the members
*
* @param request
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request) throws XidRolloverException {
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
byte[] data = SerializeUtils.serializeRequest(request);
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
p.addQuorumVerifier(self.getQuorumVerifier());
if (request.getHdr().getType() == OpCode.reconfig) {
self.setLastSeenQuorumVerifier(request.qv, true);
}
if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
LOG.debug("Proposing:: {}", request);
lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
// Send the packet to each voting node
sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
}

/**
* send a packet to all the followers ready to follow
*
* @param qp
* the packet to be sent
*/
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
for (LearnerHandler f : forwardingFollowers) {
// Just add the packet to the LearnerHandler's queue and return; LearnerHandler clearly sends it asynchronously
f.queuePacket(qp);
}
}
} // org.apache.zookeeper.server.quorum.LearnerHandler#queuePacket
void queuePacket(QuorumPacket p) {
queuedPackets.add(p);
// Add a MarkerQuorumPacket at regular intervals.
// Periodically put a marker packet into the queue
if (shouldSendMarkerPacketForLogging() && packetCounter.getAndIncrement() % markerPacketInterval == 0) {
queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
}
queuedPacketsSize.addAndGet(packetSize(p));
}
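  The rollover guard at the top of propose makes more sense once you recall how a zxid is laid out: the high 32 bits are the leader epoch and the low 32 bits are a per-epoch counter. A small sketch of that packing (same semantics as ZxidUtils; written here for illustration):

// Sketch of the zxid layout relied on by the rollover check above
// (same semantics as org.apache.zookeeper.server.util.ZxidUtils)
final class ZxidLayout {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL); // high 32 bits: epoch, low 32 bits: counter
    }
    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }
    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }
}

  When the counter reaches 0xffffffff, the leader shuts itself down to force a re-election, so the epoch is bumped instead of letting the counter wrap around (ZOOKEEPER-1277).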

五、 LearnerHandler: the quorum-communication thread that drains the proposal queue

  In the previous section we saw that the Leader just adds the proposal packet to the LearnerHandler's queue and returns, so LearnerHandler must have its own processing logic; otherwise how would it talk to the other nodes?

    // org.apache.zookeeper.server.quorum.LearnerHandler#sendPackets
/**
* This method will use the thread to send packets added to the
* queuedPackets list
*
* @throws InterruptedException
*/
private void sendPackets() throws InterruptedException {
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
// Loop forever draining the send queue
while (true) {
try {
QuorumPacket p;
p = queuedPackets.poll();
if (p == null) {
bufferedOutput.flush();
p = queuedPackets.take();
}
ServerMetrics.getMetrics().LEARNER_HANDLER_QP_SIZE.add(Long.toString(this.sid), queuedPackets.size());
// As seen above, a MarkerQuorumPacket is queued periodically; it is only used for metrics and is skipped here
if (p instanceof MarkerQuorumPacket) {
MarkerQuorumPacket m = (MarkerQuorumPacket) p;
ServerMetrics.getMetrics().LEARNER_HANDLER_QP_TIME
.add(Long.toString(this.sid), (System.nanoTime() - m.time) / 1000000L);
continue;
} queuedPacketsSize.addAndGet(-packetSize(p));
if (p == proposalOfDeath) {
// Packet of death!
break;
}
// ping packet
if (p.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
// proposal packet: the one we care about here
if (p.getType() == Leader.PROPOSAL) {
syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
} // Log the zxid of the last request, if it is a valid zxid.
if (p.getZxid() > 0) {
lastZxid = p.getZxid();
}
// Write the proposal onto the socket
oa.writeRecord(p, "packet");
packetsSent.incrementAndGet();
messageTracker.trackSent(p.getType());
} catch (IOException e) {
if (!sock.isClosed()) {
LOG.warn("Unexpected exception at {}", this, e);
try {
// this will cause everything to shutdown on
// this learner handler and will help notify
// the learner/observer instantaneously
sock.close();
} catch (IOException ie) {
LOG.warn("Error closing socket for handler {}", this, ie);
}
}
break;
}
}
}
// The sender thread above is created when the LearnerHandler starts up
/**
* Start thread that will forward any packet in the queue to the follower
*/
protected void startSendingPackets() {
if (!sendingThreadStarted) {
// Start sending packets
new Thread() {
public void run() {
Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
}
}.start();
sendingThreadStarted = true;
} else {
LOG.error("Attempting to start sending thread after it already started");
}
} /**
* This thread will receive packets from the peer and process them and
* also listen to new connections from new peers.
*/
@Override
public void run() {
try {
learnerMaster.addLearnerHandler(this);
tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline(); ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput); QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet"); messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString()); return;
} if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
}
byte[] learnerInfoData = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
this.sid = learnerMaster.getAndDecrementFollowerCounter();
} String followerInfo = learnerMaster.getPeerInfo(this.sid);
if (followerInfo.isEmpty()) {
LOG.info(
"Follower sid: {} not in the current config {}",
this.sid,
Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
} else {
LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
} if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
} learnerMaster.registerLearnerHandlerBean(this, sock); long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
learnerMaster.waitForEpochAck(this.getSid(), ss);
} else {
byte[] ver = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
messageTracker.trackReceived(ackEpochPacket.getType());
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
learnerMaster.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid(); // Take any necessary action if we need to send TRUNC or DIFF
// startForwarding() will be called in all cases
boolean needSnap = syncFollower(peerLastZxid, learnerMaster); // syncs between followers and the leader are exempt from throttling because it
// is importatnt to keep the state of quorum servers up-to-date. The exempted syncs
// are counted as concurrent syncs though
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
try {
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush(); LOG.info(
"Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+ "send zxid of db as 0x{}, {} concurrent snapshot sync, "
+ "snapshot sync was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
syncThrottler.getSyncInProgress(),
exemptFromThrottle ? "exempt" : "not exempt");
// Dump data to peer
learnerMaster.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
}
} else {
syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().DIFF_COUNT.add(1);
} LOG.debug("Sending NEWLEADER message to {}", sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush(); // Start thread that blast packets in the queue to learner
startSendingPackets(); /*
* Have to wait for the first ACK, wait until
* the learnerMaster is ready, and only then we can
* start processing messages.
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet"); messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
return;
} LOG.debug("Received NEWLEADER-ACK message from {}", sid); learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid()); syncLimitCheck.start();
// sync ends when NEWLEADER-ACK is received
syncThrottler.endSync();
syncThrottler = null; // now that the ack has been processed expect the syncLimit
sock.setSoTimeout(learnerMaster.syncTimeout()); /*
* Wait until learnerMaster starts up
*/
learnerMaster.waitForStartup(); // Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
//
LOG.debug("Sending UPTODATE message to {}", sid);
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType()); long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline(); packetsReceived.incrementAndGet(); ByteBuffer bb;
long sessionId;
int cxid;
int type; switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
LOG.debug("Received ACK from Observer {}", this.sid);
}
syncLimitCheck.updateAck(qp.getZxid());
// An ACK arrived for a proposal; let the Leader (learnerMaster) process it
learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
learnerMaster.touch(sess, to);
}
break;
case Leader.REVALIDATE:
ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
learnerMaster.revalidateSession(qp, this);
break;
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if (type == OpCode.sync) {
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
learnerMaster.submitLearnerRequest(si);
requestsReceived.incrementAndGet();
break;
default:
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch (IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
} catch (SyncThrottleException e) {
LOG.error("too many concurrent sync.", e);
syncThrottler = null;
} catch (Exception e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
throw e;
} finally {
if (syncThrottler != null) {
syncThrottler.endSync();
syncThrottler = null;
}
String remoteAddr = getRemoteAddress();
LOG.warn("******* GOODBYE {} ********", remoteAddr);
messageTracker.dumpToLog(remoteAddr);
shutdown();
}
} // org.apache.zookeeper.server.quorum.Leader#processAck
/**
* Keep a count of acks that are received by the leader for a particular
* proposal
*
* @param zxid, the zxid of the proposal sent out
* @param sid, the id of the server that sent the ack
* @param followerAddr
*/
@Override
public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
if (!allowedToCommit) {
return; // last op committed was a leader change - from now on
}
// the new leader should commit
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
} if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack with this method. However,
* the learner sends an ack back to the leader after it gets
* UPTODATE, so we just ignore the message.
*/
return;
} if (outstandingProposals.size() == 0) {
LOG.debug("outstanding is 0");
return;
}
if (lastCommitted >= zxid) {
LOG.debug(
"proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted),
Long.toHexString(zxid));
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
return;
} if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) {
p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
}
p.addAck(sid);
// Try to commit
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// If p is a reconfiguration, multiple other operations may be ready to be committed,
// since operations wait for different sets of acks.
// Currently we only permit one outstanding reconfiguration at a time
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so its not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
// ops may already have enough acks and can be committed, which is what this code does. if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
long curZxid = zxid;
while (allowedToCommit && hasCommitted && p != null) {
curZxid++;
p = outstandingProposals.get(curZxid);
if (p != null) {
hasCommitted = tryToCommit(p, curZxid, null);
}
}
}
} // org.apache.zookeeper.server.quorum.Leader#tryToCommit
/**
* @return True if committed, otherwise false.
**/
public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
// make sure that ops are committed in order. With reconfigurations it is now possible
// that different operations wait for different sets of acks, and we still want to enforce
// that they are committed in order. Currently we only permit one outstanding reconfiguration
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so it's not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen.
if (outstandingProposals.containsKey(zxid - 1)) {
return false;
} // in order to be committed, a proposal must be accepted by a quorum.
//
// getting a quorum from all necessary configurations.
if (!p.hasAllQuorums()) {
return false;
} // commit proposals in order
if (zxid != lastCommitted + 1) {
LOG.warn(
"Commiting zxid 0x{} from {} noy first!",
Long.toHexString(zxid),
followerAddr);
LOG.warn("First is {}", (lastCommitted + 1));
} outstandingProposals.remove(zxid); if (p.request != null) {
toBeApplied.add(p);
} if (p.request == null) {
LOG.warn("Going to commit null: {}", p);
} else if (p.request.getHdr().getType() == OpCode.reconfig) {
LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size()); //if this server is voter in new config with the same quorum address,
//then it will remain the leader
//otherwise an up-to-date follower will be designated as leader. This saves
//leader election time, unless the designated leader fails
Long designatedLeader = getDesignatedLeader(p, zxid);
//LOG.warn("designated leader is: " + designatedLeader);
QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();
self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
if (designatedLeader != self.getId()) {
allowedToCommit = false;
}
// we're sending the designated leader, and if the leader is changing the followers are
// responsible for closing the connection - this way we are sure that at least a majority of them
// receive the commit message.
commitAndActivate(zxid, designatedLeader);
informAndActivate(p, designatedLeader);
//turnOffFollowers();
} else {
p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
commit(zxid);
inform(p);
}
// hand the committed request to CommitProcessor, i.e. enqueue it on its commit queue
zk.commitProcessor.commit(p.request);
if (pendingSyncs.containsKey(zxid)) {
// answer any pending SYNC requests from learners that were waiting on this zxid
for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
return true;
}

六、 SyncRequestProcessor: the data-persistence processor

    // org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
public void processRequest(Request request) throws RequestProcessorException {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");
/* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/
if (request instanceof LearnerSyncRequest) {
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
nextProcessor.processRequest(request);
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
// only write transactions need to be persisted to the transaction log
syncProcessor.processRequest(request);
}
}
}

// org.apache.zookeeper.server.SyncRequestProcessor#processRequest
public void processRequest(final Request request) {
Objects.requireNonNull(request, "Request cannot be null");
request.syncQueueStartTime = Time.currentElapsedTime();
// enqueue the request and return immediately; the worker thread drains it in run()
queuedRequests.add(request);
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}

// org.apache.zookeeper.server.SyncRequestProcessor#run
@Override
public void run() {
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
// poll the next request from queuedRequests, waiting at most pollTime
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
/* We timed out looking for more writes to batch, go ahead and flush immediately */
flush();
si = queuedRequests.take();
}
if (si == REQUEST_OF_DEATH) {
break;
}
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
// append the request to the transaction log
if (zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (!snapThreadMutex.tryAcquire()) {
LOG.warn("Too busy to snap, skipping");
} else {
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
snapThreadMutex.release();
}
}
}.start();
}
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
// flush the batched requests to disk, completing the sync step
if (shouldFlush()) {
flush();
}
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
} catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}

/** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush
* whenever either condition is hit. If only one or the other is
* set, flush only when the relevant condition is hit.
*/
private boolean shouldFlush() {
long flushDelay = zks.getFlushDelay();
long maxBatchSize = zks.getMaxBatchSize();
if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
return true;
}
return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
}
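  The code above batches appended requests in toFlush and syncs them either when the batch grows large enough or when the configured delay expires, a classic group-commit trade-off between latency and throughput. The following minimal sketch illustrates just that decision under those assumptions; GroupCommitSketch and its fields are hypothetical names, not ZooKeeper classes.

import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Minimal sketch of the group-commit decision, assuming two illustrative knobs:
 * a max batch size and a max flush delay. Only the batching idea mirrors
 * SyncRequestProcessor#shouldFlush; nothing here is ZooKeeper's real API.
 */
public class GroupCommitSketch {

    private final int maxBatchSize;        // flush once this many requests are pending
    private final long flushDelayMillis;   // or once the batch has been waiting this long
    private final Queue<String> toFlush = new ArrayDeque<>();
    private long lastFlushTime = System.currentTimeMillis();

    public GroupCommitSketch(int maxBatchSize, long flushDelayMillis) {
        this.maxBatchSize = maxBatchSize;
        this.flushDelayMillis = flushDelayMillis;
    }

    /** Append a request to the pending batch and flush if either threshold is hit. */
    public void append(String request) {
        toFlush.add(request);
        if (shouldFlush()) {
            flush();
        }
    }

    private boolean shouldFlush() {
        boolean delayHit = flushDelayMillis > 0
                && System.currentTimeMillis() - lastFlushTime >= flushDelayMillis;
        boolean sizeHit = maxBatchSize > 0 && toFlush.size() >= maxBatchSize;
        return delayHit || sizeHit;
    }

    private void flush() {
        // In ZooKeeper this is where the transaction log is fsync'ed and the
        // batched requests are handed to the next processor.
        System.out.println("flushing " + toFlush.size() + " requests");
        toFlush.clear();
        lastFlushTime = System.currentTimeMillis();
    }
}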

  At this point, a single data-update operation is complete. It is admittedly a fairly involved process.

  The main steps are (a minimal pipeline sketch follows the list):

    1. Decide whether the request is a write transaction;
    2. Broadcast the proposal to the quorum and start collecting votes;
    3. Each participant sends back an ack;
    4. Once the transaction may be committed, hand it to CommitProcessor;
    5. Flush the transaction log to disk.
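  Every RequestProcessor in the chain follows the same structural pattern: an inbound blocking queue plus a worker thread that drains it and forwards the request to the next stage. The sketch below shows that pattern in isolation; PipelineSketch, QueueingProcessor and SimpleRequest are illustrative names and not part of ZooKeeper.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** A minimal sketch of the processor-chain pattern, assuming a simplified request type. */
public class PipelineSketch {

    record SimpleRequest(long zxid, String payload) {}

    interface Processor {
        void processRequest(SimpleRequest r);
    }

    /** Each stage owns a queue and a worker thread, then hands off to the next stage. */
    static class QueueingProcessor implements Processor, Runnable {
        private final String name;
        private final Processor next;   // null for the last stage
        private final BlockingQueue<SimpleRequest> queue = new LinkedBlockingQueue<>();

        QueueingProcessor(String name, Processor next) {
            this.name = name;
            this.next = next;
            Thread t = new Thread(this, name);
            t.setDaemon(true);
            t.start();
        }

        @Override
        public void processRequest(SimpleRequest r) {
            queue.add(r);   // submitting a request is just enqueueing it
        }

        @Override
        public void run() {
            try {
                while (true) {
                    SimpleRequest r = queue.take();   // blocking take, like submittedRequests.take()
                    System.out.println(name + " handling zxid 0x" + Long.toHexString(r.zxid()));
                    if (next != null) {
                        next.processRequest(r);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Processor finalStage = new QueueingProcessor("FinalStage", null);
        Processor commitStage = new QueueingProcessor("CommitStage", finalStage);
        Processor prepStage = new QueueingProcessor("PrepStage", commitStage);

        prepStage.processRequest(new SimpleRequest(1, "setData /a data"));
        Thread.sleep(200);   // give the daemon threads time to drain the queues
    }
}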

七、 Wait, how is the vote on each transaction actually counted?

  So far we have only seen the request packet being handed, via queues, to the two threads of each LearnerHandler. That covers sending the proposal, but nothing that looks like a "more than half of the votes" check. Where does it happen?

  It happens inside tryToCommit, which is evaluated every time an ack arrives.

    // org.apache.zookeeper.server.quorum.Leader#tryToCommit
/**
* @return True if committed, otherwise false.
**/
public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
// make sure that ops are committed in order. With reconfigurations it is now possible
// that different operations wait for different sets of acks, and we still want to enforce
// that they are committed in order. Currently we only permit one outstanding reconfiguration
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so it's not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen.
if (outstandingProposals.containsKey(zxid - 1)) {
return false;
}
// in order to be committed, a proposal must be accepted by a quorum.
//
// getting a quorum from all necessary configurations.
// check whether enough acks have been collected; if not, the proposal cannot be committed yet.
// Once more than half of the voters have acked, we fall through to the commit path below (under synchronization)
if (!p.hasAllQuorums()) {
return false;
}
// commit proposals in order
if (zxid != lastCommitted + 1) {
LOG.warn(
"Commiting zxid 0x{} from {} noy first!",
Long.toHexString(zxid),
followerAddr);
LOG.warn("First is {}", (lastCommitted + 1));
}
outstandingProposals.remove(zxid);
if (p.request != null) {
toBeApplied.add(p);
}
if (p.request == null) {
LOG.warn("Going to commit null: {}", p);
} else if (p.request.getHdr().getType() == OpCode.reconfig) {
LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size()); //if this server is voter in new config with the same quorum address,
//then it will remain the leader
//otherwise an up-to-date follower will be designated as leader. This saves
//leader election time, unless the designated leader fails
Long designatedLeader = getDesignatedLeader(p, zxid);
//LOG.warn("designated leader is: " + designatedLeader);
QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();
self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
if (designatedLeader != self.getId()) {
allowedToCommit = false;
}
// we're sending the designated leader, and if the leader is changing the followers are
// responsible for closing the connection - this way we are sure that at least a majority of them
// receive the commit message.
commitAndActivate(zxid, designatedLeader);
informAndActivate(p, designatedLeader);
//turnOffFollowers();
} else {
p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
// commit() advances lastCommitted to this zxid
// and broadcasts a COMMIT packet to every quorum member
commit(zxid);
inform(p);
}
zk.commitProcessor.commit(p.request);
if (pendingSyncs.containsKey(zxid)) {
for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
return true;
}
// org.apache.zookeeper.server.quorum.SyncedLearnerTracker#hasAllQuorums
public boolean hasAllQuorums() {
for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
// check whether more than half of the voters in this configuration have acked
if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) {
return false;
}
}
// every required configuration has a quorum of acks, so the proposal can be committed
return true;
}
// org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum
/**
* Verifies if a set is a majority. Assumes that ackSet contains acks only
* from votingMembers
*/
public boolean containsQuorum(Set<Long> ackSet) {
return (ackSet.size() > half);
}
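  In other words, half is votingMembers / 2 with integer division, so in a 5-node ensemble half is 2 and a proposal commits once 3 voters (including the leader itself) have acked. A small sketch of that arithmetic follows; MajoritySketch is an illustrative name and not ZooKeeper code.

import java.util.Set;

/** Tiny illustration of the strict-majority check used by containsQuorum. */
public class MajoritySketch {

    public static boolean containsQuorum(Set<Long> ackSet, int votingMembers) {
        int half = votingMembers / 2;   // integer division, e.g. 5 / 2 == 2
        return ackSet.size() > half;    // strictly more than half must have acked
    }

    public static void main(String[] args) {
        // 5 voting members: 2 acks are not enough, 3 acks are.
        System.out.println(containsQuorum(Set.of(1L, 2L), 5));       // false
        System.out.println(containsQuorum(Set.of(1L, 2L, 3L), 5));   // true
    }
}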
// tell the quorum to commit the transaction
// org.apache.zookeeper.server.quorum.Leader#commit
/**
* Create a commit packet and send it to all the members of the quorum
*
* @param zxid
*/
public void commit(long zxid) {
synchronized (this) {
lastCommitted = zxid;
}
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp);
ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
}
// As the code above shows, the commit check runs once for every ack received from a node,
// and the commit fires as soon as more than half of the voters have acked. Without a further
// guard this would produce duplicate commits; processAck prevents that by comparing the
// ack's zxid against lastCommitted.
/**
* Keep a count of acks that are received by the leader for a particular
* proposal
*
* @param zxid, the zxid of the proposal sent out
* @param sid, the id of the server that sent the ack
* @param followerAddr
*/
@Override
public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
if (!allowedToCommit) {
return; // last op committed was a leader change - from now on
}
// the new leader should commit
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
}
if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack with this method. However,
* the learner sends an ack back to the leader after it gets
* UPTODATE, so we just ignore the message.
*/
return;
}
if (outstandingProposals.size() == 0) {
LOG.debug("outstanding is 0");
return;
}
// if the proposal was already committed, lastCommitted has advanced past this zxid, so this duplicate ack simply returns
if (lastCommitted >= zxid) {
LOG.debug(
"proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted),
Long.toHexString(zxid));
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
return;
}
if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) {
p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
}
p.addAck(sid);
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// If p is a reconfiguration, multiple other operations may be ready to be committed,
// since operations wait for different sets of acks.
// Currently we only permit one outstanding reconfiguration at a time
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so its not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
// ops may already have enough acks and can be committed, which is what this code does.
if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
long curZxid = zxid;
while (allowedToCommit && hasCommitted && p != null) {
curZxid++;
p = outstandingProposals.get(curZxid);
if (p != null) {
hasCommitted = tryToCommit(p, curZxid, null);
}
}
}
}
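  Putting the pieces together, the pattern is: record the ack against the proposal's zxid, commit as soon as a majority has acked, and rely on lastCommitted to silently drop any acks that arrive after the commit. The condensed sketch below illustrates the idea; AckTrackerSketch and its fields are hypothetical and greatly simplified compared to Leader#processAck and tryToCommit.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Condensed sketch of the "commit once on majority, ignore late acks" pattern. */
public class AckTrackerSketch {

    private final int votingMembers;
    private long lastCommitted = 0;
    private final Map<Long, Set<Long>> acksByZxid = new HashMap<>();

    public AckTrackerSketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    /** Returns true only for the ack that actually triggers the commit. */
    public synchronized boolean processAck(long sid, long zxid) {
        if (lastCommitted >= zxid) {
            return false;   // already committed, duplicate or late acks are ignored
        }
        Set<Long> acks = acksByZxid.computeIfAbsent(zxid, z -> new HashSet<>());
        acks.add(sid);
        if (acks.size() > votingMembers / 2 && zxid == lastCommitted + 1) {
            lastCommitted = zxid;   // advancing lastCommitted is what suppresses later acks
            acksByZxid.remove(zxid);
            return true;            // the caller would now broadcast COMMIT and inform learners
        }
        return false;
    }

    public static void main(String[] args) {
        AckTrackerSketch tracker = new AckTrackerSketch(3);   // leader plus two followers
        System.out.println(tracker.processAck(1, 1));         // false: only one ack so far
        System.out.println(tracker.processAck(2, 1));         // true: 2 of 3 acked, commit fires once
        System.out.println(tracker.processAck(3, 1));         // false: late ack, lastCommitted already >= 1
    }
}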

  With that, the whole flow is complete. The Leader's work is done; the Follower's flow differs slightly, but it is certainly less involved than the Leader's, and interested readers can explore it on their own.

  Although a seemingly simple update turns out to be this complex internally, the complexity is the price of correctness and high availability. As a user you only have to issue a single set call, yet you get every guarantee the API provides; that is exactly what good infrastructure buys you. A minimal client-side example follows.
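  For completeness, here is what that single set looks like from the client side, using the standard ZooKeeper Java client. The connection string, the session timeout, and the assumption that the node /a already exists are placeholders for your own environment.

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class SetDataExample {
    public static void main(String[] args) throws Exception {
        // connection string and session timeout are placeholders; adjust for your cluster
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        try {
            // version -1 means "update regardless of the node's current data version";
            // the node /a is assumed to exist already
            Stat stat = zk.setData("/a", "data".getBytes(), -1);
            System.out.println("committed at zxid 0x" + Long.toHexString(stat.getMzxid()));
        } finally {
            zk.close();
        }
    }
}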
