ZooKeeper (5): Transaction Processing, Dissecting the Data-Update Logic
In the previous articles we covered ZK's groundwork, from the overall architecture down to how requests enter the server. Now let's look at ZK's real work.
This article takes the command `setData /a data` as its starting point and follows how ZK processes a data-update request coming from a client.
First we need to be clear about what each role is responsible for, so the workflow is easier to follow. We assume the client is connected to the Leader, and unveil how it updates data. Its processor chain looks like this:
LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
Of course, LeaderRequestProcessor only forwards the request along the chain, so we will skip it. The real starting point is PrepRequestProcessor.
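To make the starting point concrete, the client side of that command looks roughly like this with the standard Java client (a minimal sketch; the connect string, session timeout and znode path are placeholders, not values from this article):
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class SetDataExample {
    public static void main(String[] args) throws Exception {
        // placeholder connect string; replace with your own ensemble
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        // version -1 means "overwrite regardless of the node's current version",
        // which is what the CLI sends when no version argument is given
        Stat stat = zk.setData("/a", "data".getBytes(), -1);
        System.out.println("new version = " + stat.getVersion());
        zk.close();
    }
}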
1. PrepRequestProcessor: preliminary processing of the request
One thing is clear: every RequestProcessor that runs as its own thread receives work from the previous RequestProcessor through a queue. PrepRequestProcessor is no exception; once LeaderRequestProcessor submits a request to it, it starts its own work.
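The pattern behind this is simple: processRequest only enqueues, and the processor's own thread drains the queue and hands results to the next stage. A stripped-down sketch of that pattern (the class and names here are made up for illustration, not the real ZooKeeper types):
import java.util.concurrent.LinkedBlockingQueue;

// Generic "queue + worker thread" processor stage; the real PrepRequestProcessor
// below follows the same shape.
class QueueStage extends Thread {
    private final LinkedBlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    private final QueueStage next; // next stage in the chain, may be null

    QueueStage(QueueStage next) {
        this.next = next;
    }

    // Called by the previous stage: just enqueue and return immediately
    void processRequest(String request) {
        submitted.add(request);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = submitted.take(); // blocks until work arrives
                // ... do this stage's work on the request ...
                if (next != null) {
                    next.processRequest(request);  // hand off to the next stage
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}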
// Submitting a task simply adds it to the queue
// org.apache.zookeeper.server.PrepRequestProcessor#processRequest
public void processRequest(Request request) {
request.prepQueueStartTime = Time.currentElapsedTime();
submittedRequests.add(request);
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
// org.apache.zookeeper.server.PrepRequestProcessor#run
@Override
public void run() {
try {
// An endless loop that keeps processing tasks
while (true) {
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
// Take from the submittedRequests blocking queue
Request request = submittedRequests.take();
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
.add(Time.currentElapsedTime() - request.prepQueueStartTime);
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
request.prepStartTime = Time.currentElapsedTime();
// The main processing logic is delegated to pRequest
pRequest(request);
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
} /**
* This method will be called inside the ProcessRequestThread, which is a
* singleton, so there will be a single thread calling this code.
*
* @param request
*/
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null); try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = new CreateTTLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
break;
case OpCode.deleteContainer:
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
// setData has type = 5 (OpCode.setData)
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
// For this write request, hand off to pRequest2Txn
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiOperationRecord multiRequest = new MultiOperationRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null; //Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest); for (Op op : multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn; /* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} else {
/* Prep the request and convert to a Txn */
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
type = request.getHdr().getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue()); if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} aborting"
+ " remaining multi ops. Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
} request.setException(e); /* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
} // TODO: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
txn.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
txns.add(new Txn(type, bb.array()));
}
} request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), request.type));
request.setTxn(new MultiTxn(txns)); break; //create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
}
break; //All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getAllChildrenNumber:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.checkWatches:
case OpCode.removeWatches:
case OpCode.getEphemerals:
case OpCode.multiRead:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
LOG.warn("unknown type {}", request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
} if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info(
"Got user-level KeeperException when processing {} Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e); StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if (bb != null) {
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
} LOG.error("Dumping request buffer: 0x{}", sb.toString());
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
// Record the latest zxid and pass the request on to the next RequestProcessor
request.zxid = zks.getZxid();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
// Here nextProcessor is ProposalRequestProcessor
nextProcessor.processRequest(request);
} /**
* This method will be called inside the ProcessRequestThread, which is a
* singleton, so there will be a single thread calling this code.
*
* @param type
* @param zxid
* @param request
* @param record
*/
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
// Check the request type again to decide how to handle it
switch (type) {
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
pRequest2TxnCreate(type, request, record, deserialize);
break;
}
case OpCode.deleteContainer: {
// ...
break;
}
case OpCode.delete:
//...
break;
// Again, we only look at the setData case
case OpCode.setData:
// The steps are:
// 1. check the session;
// 2. deserialize the request;
// 3. validate the path;
// 4. check ACL permissions (write permission);
// 5. check the version number;
// 6. add a ChangeRecord to the outstanding-changes queue for later use
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetDataRequest setDataRequest = (SetDataRequest) record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
}
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setVersion(newVersion);
addChangeRecord(nodeRecord);
break;
case OpCode.reconfig:
//...
addChangeRecord(nodeRecord);
break;
case OpCode.setACL:
// ...
addChangeRecord(nodeRecord);
break;
case OpCode.createSession:
// ...
break;
case OpCode.closeSession:
// ...
break;
case OpCode.check:
// ...
break;
default:
LOG.warn("unknown type {}", type);
break;
}
}
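Step 5 (the version check) deserves a note: setData carries an expected version, and -1 means "any version". Reconstructed from the behavior visible above (so treat this as a sketch rather than the exact source), checkAndIncVersion amounts to:
import org.apache.zookeeper.KeeperException;

// Hedged sketch of the version-check step used by the setData branch above: if the
// caller supplied an expected version and it does not match the node's current
// version, fail with BadVersionException; otherwise bump the version by one.
class VersionCheckSketch {
    static int checkAndIncVersion(int currentVersion, int expectedVersion, String path)
            throws KeeperException.BadVersionException {
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new KeeperException.BadVersionException(path);
        }
        return currentVersion + 1;
    }
}
The new version is then written into the SetDataTxn and into the duplicated ChangeRecord, so later requests in the pipeline see the pending state.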
2. ProposalRequestProcessor: handling write transactions and kicking off the vote
ProposalRequestProcessor does not run as an independent thread; it is only invoked when a write transaction is being processed. Its only entry point is the processRequest method, and its main job is to assemble and kick off the proposal (voting) logic.
// org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
public void processRequest(Request request) throws RequestProcessorException {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop"); /* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/ if (request instanceof LearnerSyncRequest) {
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
// Here nextProcessor is CommitProcessor
// Hand the request to it first, then decide whether the data also needs to be persisted locally
nextProcessor.processRequest(request);
// For any write transaction, hdr is non-null
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
// A vote decides whether the data gets updated
try {
// The voting logic is handled by the Leader class
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
// Hand off to SyncRequestProcessor to persist the data
syncProcessor.processRequest(request);
}
}
}
// Submitting to CommitProcessor: the request just lands in its task queue
// org.apache.zookeeper.server.quorum.CommitProcessor#processRequest
@Override
public void processRequest(Request request) {
if (stopped) {
return;
}
LOG.debug("Processing request:: {}", request);
request.commitProcQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
// If the request will block, add it to the queue of blocking requests
if (needCommit(request)) {
queuedWriteRequests.add(request);
numWriteQueuedRequests.incrementAndGet();
} else {
numReadQueuedRequests.incrementAndGet();
}
wakeup();
}
// A quick look at the transaction proposal (voting) process
// org.apache.zookeeper.server.quorum.Leader#propose
/**
* create a proposal and send it out to all the members
*
* @param request
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request) throws XidRolloverException {
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
// Wrap the request in a QuorumPacket for inter-node communication
byte[] data = SerializeUtils.serializeRequest(request);
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null); Proposal p = new Proposal();
p.packet = pp;
p.request = request; synchronized (this) {
// Add the current quorum verifier (the set of voting members this proposal must satisfy)
p.addQuorumVerifier(self.getQuorumVerifier());
if (request.getHdr().getType() == OpCode.reconfig) {
self.setLastSeenQuorumVerifier(request.qv, true);
} if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
} LOG.debug("Proposing:: {}", request); lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
// Here the QuorumPacket is sent to every connected peer
sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
return p;
}
// The sync processor works the same way: the request is put on a queue and its own thread polls and processes it
// org.apache.zookeeper.server.SyncRequestProcessor#processRequest
public void processRequest(final Request request) {
Objects.requireNonNull(request, "Request cannot be null"); request.syncQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}
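One aside on the rollover check inside propose: a zxid is a 64-bit value whose high 32 bits are the leader epoch and whose low 32 bits are a per-epoch counter (this is what ZxidUtils.makeZxid/getEpochFromZxid encapsulate), so "all lower 32 bits set" means the counter is exhausted and a new epoch, i.e. a re-election, is forced. A tiny illustration:
public class ZxidLayoutDemo {
    public static void main(String[] args) {
        long epoch = 5L, counter = 42L;
        long zxid = (epoch << 32) | counter;            // compose a zxid (what ZxidUtils.makeZxid does)
        System.out.println(Long.toHexString(zxid));     // 50000002a
        System.out.println(zxid >>> 32);                // epoch back: 5
        System.out.println(zxid & 0xffffffffL);         // counter back: 42
        // propose() forces a re-election once the low 32 bits are exhausted:
        System.out.println((zxid & 0xffffffffL) == 0xffffffffL); // false here
    }
}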
3. CommitProcessor: handling the commit of data, driven by ACKs
Requests are submitted to it by ProposalRequestProcessor, and it processes them independently on its own thread.
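Before reading the run() loop it helps to know how needCommit splits traffic: only operations that change state (and therefore must wait for a quorum commit) go into queuedWriteRequests, everything else counts as a read and is served locally. A hedged sketch of that decision; the exact opcode list is my assumption, based on which operations build transactions in pRequest above:
import org.apache.zookeeper.ZooDefs.OpCode;

// Sketch of the read/write split in CommitProcessor; not the real implementation.
class NeedCommitSketch {
    static boolean needCommit(int opType) {
        switch (opType) {
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.reconfig:
            case OpCode.multi:
                return true;   // must wait for a quorum commit
            default:
                return false;  // reads (getData, exists, ...) are served locally
        }
    }
}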
// org.apache.zookeeper.server.quorum.CommitProcessor#processRequest
@Override
public void processRequest(Request request) {
if (stopped) {
return;
}
LOG.debug("Processing request:: {}", request);
request.commitProcQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
// If the request will block, add it to the queue of blocking requests
if (needCommit(request)) {
queuedWriteRequests.add(request);
numWriteQueuedRequests.incrementAndGet();
} else {
numReadQueuedRequests.incrementAndGet();
}
wakeup();
} // org.apache.zookeeper.server.quorum.CommitProcessor#run
@Override
public void run() {
try {
/*
* In each iteration of the following loop we process at most
* requestsToProcess requests of queuedRequests. We have to limit
* the number of request we poll from queuedRequests, since it is
* possible to endlessly poll read requests from queuedRequests, and
* that will lead to a starvation of non-local committed requests.
*/
int requestsToProcess = 0;
boolean commitIsWaiting = false;
do {
/*
* Since requests are placed in the queue before being sent to
* the leader, if commitIsWaiting = true, the commit belongs to
* the first update operation in the queuedRequests or to a
* request from a client on another server (i.e., the order of
* the following two lines is important!).
*/
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
// Avoid sync if we have something to do
// wait/notify: block until there are requests or commits to process
if (requestsToProcess == 0 && !commitIsWaiting) {
// Waiting for requests to process
synchronized (this) {
while (!stopped && requestsToProcess == 0 && !commitIsWaiting) {
wait();
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
}
}
} ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size()); long time = Time.currentElapsedTime(); /*
* Processing up to requestsToProcess requests from the incoming
* queue (queuedRequests). If maxReadBatchSize is set then no
* commits will be processed until maxReadBatchSize number of
* reads are processed (or no more reads remain in the queue).
* After the loop a single committed request is processed if
* one is waiting (or a batch of commits if maxCommitBatchSize
* is set).
*/
Request request;
int readsProcessed = 0;
// Process one batch; its size is bounded by the requestsToProcess value read above
while (!stopped
&& requestsToProcess > 0
&& (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
&& (request = queuedRequests.poll()) != null) {
// Once popped from the queue, the request counts as consumed for this batch
requestsToProcess--;
// Update (write) operations take the first branch
if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) {
// Add request to pending
Deque<Request> requests = pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
requests.addLast(request);
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
} else {
readsProcessed++;
numReadQueuedRequests.decrementAndGet();
sendToNextProcessor(request);
}
/*
* Stop feeding the pool if there is a local pending update
* and a committed request that is ready. Once we have a
* pending request with a waiting committed request, we know
* we can process the committed one. This is because commits
* for local requests arrive in the order they appeared in
* the queue, so if we have a pending request and a
* committed request, the committed request must be for that
* pending write or for a write originating at a different
* server. We skip this if maxReadBatchSize is set.
*/
if (maxReadBatchSize < 0 && !pendingRequests.isEmpty() && !committedRequests.isEmpty()) {
/*
* We set commitIsWaiting so that we won't check
* committedRequests again.
*/
commitIsWaiting = true;
break;
}
}
ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed); if (!commitIsWaiting) {
commitIsWaiting = !committedRequests.isEmpty();
} /*
* Handle commits, if any.
*/
if (commitIsWaiting && !stopped) {
/*
* Drain outstanding reads
*/
waitForEmptyPool(); if (stopped) {
return;
} int commitsToProcess = maxCommitBatchSize; /*
* Loop through all the commits, and try to drain them.
*/
Set<Long> queuesToDrain = new HashSet<>();
long startWriteTime = Time.currentElapsedTime();
int commitsProcessed = 0;
while (commitIsWaiting && !stopped && commitsToProcess > 0) {
// Process committed head
request = committedRequests.peek(); /*
* Check if this is a local write request is pending,
* if so, update it with the committed info. If the commit matches
* the first write queued in the blockedRequestQueue, we know this is
* a commit for a local write, as commits are received in order. Else
* it must be a commit for a remote write.
*/
if (!queuedWriteRequests.isEmpty()
&& queuedWriteRequests.peek().sessionId == request.sessionId
&& queuedWriteRequests.peek().cxid == request.cxid) {
/*
* Commit matches the earliest write in our write queue.
*/
Deque<Request> sessionQueue = pendingRequests.get(request.sessionId);
ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
/*
* Can't process this write yet.
* Either there are reads pending in this session, or we
* haven't gotten to this write yet.
*/
break;
} else {
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
// If session queue != null, then it is also not empty.
Request topPending = sessionQueue.poll();
/*
* Generally, we want to send to the next processor our version of the request,
* since it contains the session information that is needed for post update processing.
* In more details, when a request is in the local queue, there is (or could be) a client
* attached to this server waiting for a response, and there is other bookkeeping of
* requests that are outstanding and have originated from this server
* (e.g., for setting the max outstanding requests) - we need to update this info when an
* outstanding request completes. Note that in the other case, the operation
* originated from a different server and there is no local bookkeeping or a local client
* session that needs to be notified.
*/
topPending.setHdr(request.getHdr());
topPending.setTxn(request.getTxn());
topPending.zxid = request.zxid;
topPending.commitRecvTime = request.commitRecvTime;
request = topPending;
// Only decrement if we take a request off the queue.
numWriteQueuedRequests.decrementAndGet();
queuedWriteRequests.poll();
queuesToDrain.add(request.sessionId);
}
}
/*
* Pull the request off the commit queue, now that we are going
* to process it.
*/
committedRequests.remove();
commitsToProcess--;
commitsProcessed++;
// Process the write inline.
// Handle the commit; hand it off to ToBeAppliedRequestProcessor
processWrite(request);
commitIsWaiting = !committedRequests.isEmpty();
}
ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR
.add(Time.currentElapsedTime() - startWriteTime);
ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed); /*
* Process following reads if any, remove session queue(s) if
* empty.
*/
readsProcessed = 0;
for (Long sessionId : queuesToDrain) {
Deque<Request> sessionQueue = pendingRequests.get(sessionId);
int readsAfterWrite = 0;
while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) {
numReadQueuedRequests.decrementAndGet();
// Build an org.apache.zookeeper.server.quorum.CommitProcessor.CommitWorkRequest and submit it to the worker thread pool
sendToNextProcessor(sessionQueue.poll());
readsAfterWrite++;
}
ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
readsProcessed += readsAfterWrite; // Remove empty queues
if (sessionQueue.isEmpty()) {
pendingRequests.remove(sessionId);
}
}
ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
} ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
endOfIteration();
} while (!stoppedMainLoop);
} catch (Throwable e) {
handleException(this.getName(), e);
}
LOG.info("CommitProcessor exited loop!");
}
// Once CommitProcessor sees that a transaction can be committed (i.e. the vote has finished), it performs the commit
// org.apache.zookeeper.server.quorum.CommitProcessor#processWrite
private void processWrite(Request request) throws RequestProcessorException {
processCommitMetrics(request, true); long timeBeforeFinalProc = Time.currentElapsedTime();
// ToBeAppliedRequestProcessor
nextProcessor.processRequest(request);
ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - timeBeforeFinalProc);
}
// org.apache.zookeeper.server.quorum.Leader.ToBeAppliedRequestProcessor#processRequest
public void processRequest(Request request) throws RequestProcessorException {
// next is FinalRequestProcessor, which applies the transaction to the ZK database
next.processRequest(request); // The only requests that should be on toBeApplied are write
// requests, for which we will have a hdr. We can't simply use
// request.zxid here because that is set on read requests to equal
// the zxid of the last write op.
if (request.getHdr() != null) {
long zxid = request.getHdr().getZxid();
Iterator<Proposal> iter = leader.toBeApplied.iterator();
if (iter.hasNext()) {
Proposal p = iter.next();
if (p.request != null && p.request.zxid == zxid) {
iter.remove();
return;
}
}
LOG.error("Committed request not found on toBeApplied: {}", request);
}
}
// org.apache.zookeeper.server.FinalRequestProcessor#processRequest
public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request); // request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
// Apply the transaction to the ZK database
ProcessTxnResult rc = zks.processTxn(request); // ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
// was not being queued — ZOOKEEPER-558) properly. This happens, for example,
// when the client closes the connection. The server should still close the session, though.
// Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
// We need to check if we can close the session id.
// Sometimes the corresponding ServerCnxnFactory could be null because
// we are just playing diffs from the leader.
if (closeSession(zks.serverCnxnFactory, request.sessionId)
|| closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
return;
}
} if (request.getHdr() != null) {
/*
* Request header is created only by the leader, so this must be
* a quorum request. Since we're comparing timestamps across hosts,
* this metric may be incorrect. However, it's still a very useful
* metric to track in the happy case. If there is clock drift,
* the latency can go negative. Note: headers use wall time, not
* CLOCK_MONOTONIC.
*/
long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
if (propagationLatency >= 0) {
ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
}
} if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn; long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); String lastOp = "NA";
// Notify ZooKeeperServer that the request has finished so that it can
// update any request accounting/throttling limits
zks.decInProcess();
zks.requestFinished(request);
Code err = Code.OK;
Record rsp = null;
String path = null;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
/*
* When local session upgrading is disabled, leader will
* reject the ephemeral node creation due to session expire.
* However, if this is the follower that issue the request,
* it will have the correct error code, so we should use that
* and report to user
*/
if (request.getException() != null) {
throw request.getException();
} else {
throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
}
} KeeperException ke = request.getException();
if (ke instanceof SessionMovedException) {
throw ke;
}
if (ke != null && request.type != OpCode.multi) {
throw ke;
} LOG.debug("{}", request); if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REPLIES.add(1);
} switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
updateStats(request, lastOp, lastZxid); cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response");
return;
}
case OpCode.createSession: {
lastOp = "SESS";
updateStats(request, lastOp, lastZxid); zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi: {
lastOp = "MULT";
rsp = new MultiResponse(); for (ProcessTxnResult subTxnResult : rc.multiResult) { OpResult subResult; switch (subTxnResult.type) {
case OpCode.check:
subResult = new CheckResult();
break;
case OpCode.create:
subResult = new CreateResult(subTxnResult.path);
break;
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
break;
case OpCode.delete:
case OpCode.deleteContainer:
subResult = new DeleteResult();
break;
case OpCode.setData:
subResult = new SetDataResult(subTxnResult.stat);
break;
case OpCode.error:
subResult = new ErrorResult(subTxnResult.err);
if (subTxnResult.err == Code.SESSIONMOVED.intValue()) {
throw new SessionMovedException();
}
break;
default:
throw new IOException("Invalid type of op");
} ((MultiResponse) rsp).add(subResult);
} break;
}
case OpCode.multiRead: {
lastOp = "MLTR";
MultiOperationRecord multiReadRecord = new MultiOperationRecord();
ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord) {
try {
Record rec;
switch (readOp.getType()) {
case OpCode.getChildren:
rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
break;
case OpCode.getData:
rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
GetDataResponse gdr = (GetDataResponse) rec;
subResult = new GetDataResult(gdr.getData(), gdr.getStat());
break;
default:
throw new IOException("Invalid type of readOp");
}
} catch (KeeperException e) {
subResult = new ErrorResult(e.code().intValue());
}
((MultiResponse) rsp).add(subResult);
}
break;
}
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
lastOp = "CREA";
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.setData: {
// write data (setData)
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.reconfig: {
lastOp = "RECO";
rsp = new GetDataResponse(
((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(),
rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.setACL: {
lastOp = "SETA";
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.closeSession: {
lastOp = "CLOS";
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
}
case OpCode.check: {
lastOp = "CHEC";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// TODO We really should NOT need this!!!!
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
.setWatches(
relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
cnxn);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path,
null); Stat stat = new Stat();
List<ACL> acl = zks.getZKDatabase().getACL(path, stat);
requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath()); try {
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.ADMIN,
request.authInfo,
path,
null);
rsp = new GetACLResponse(acl, stat);
} catch (KeeperException.NoAuthException e) {
List<ACL> acl1 = new ArrayList<ACL>(acl.size());
for (ACL a : acl) {
if ("digest".equals(a.getId().getScheme())) {
Id id = a.getId();
Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
acl1.add(new ACL(a.getPerms(), id1));
} else {
acl1.add(a);
}
}
rsp = new GetACLResponse(acl1, stat);
}
break;
}
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo,
path,
null);
int number = zks.getZKDatabase().getAllChildrenNumber(path);
rsp = new GetAllChildrenNumberResponse(number);
break;
}
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo, path,
null);
List<String> children = zks.getZKDatabase()
.getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.checkWatches: {
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
if (!containsWatcher) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
break;
}
case OpCode.removeWatches: {
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
if (!removed) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
break;
}
case OpCode.getEphemerals: {
lastOp = "GETE";
GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
List<String> ephemerals = new ArrayList<>();
if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) {
ephemerals.addAll(allEphems);
} else {
for (String p : allEphems) {
if (p.startsWith(prefixPath)) {
ephemerals.add(p);
}
}
}
rsp = new GetEphemeralsResponse(ephemerals);
break;
}
}
} catch (SessionMovedException e) {
// session moved is a connection level error, we need to tear
// down the connection otw ZOOKEEPER-710 might happen
// ie client on slow follower starts to renew session, fails
// before this completes, then tries the fast follower (leader)
// and is successful, however the initial renew is then
// successfully fwd/processed by the leader and as a result
// the client and leader disagree on where the client is most
// recently attached (and therefore invalid SESSION MOVED generated)
cnxn.sendCloseSession();
return;
} catch (KeeperException e) {
err = e.code();
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.error("Dumping request buffer: 0x{}", sb.toString());
err = Code.MARSHALLINGERROR;
}
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
// update the request statistics
updateStats(request, lastOp, lastZxid);
try {
if (request.type == OpCode.getData && path != null && rsp != null) {
// Serialized read responses could be cached by the connection object.
// Cache entries are identified by their path and last modified zxid,
// so these values are passed along with the response.
GetDataResponse getDataResponse = (GetDataResponse) rsp;
Stat stat = null;
if (getDataResponse.getStat() != null) {
stat = getDataResponse.getStat();
}
cnxn.sendResponse(hdr, rsp, "response", path, stat);
} else {
// send the response back to the client
cnxn.sendResponse(hdr, rsp, "response");
}
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG", e);
}
} // org.apache.zookeeper.server.ZooKeeperServer#processTxn
// entry point for FinalRequestProcessor.java
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn()); final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum(); // return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn()); // request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
while (!outstandingChanges.isEmpty()
&& outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
} // do not add non quorum packets to the queue.
// This queue only exists so that followers can quickly catch up by replaying committed proposals
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}
4. Leader#propose: voting on the transaction to keep the data highly available
// Majority rule: mainly assembles a QuorumPacket and sends it to every voting node
// org.apache.zookeeper.server.quorum.Leader#propose
/**
* create a proposal and send it out to all the members
*
* @param request
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request) throws XidRolloverException {
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
} byte[] data = SerializeUtils.serializeRequest(request);
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null); Proposal p = new Proposal();
p.packet = pp;
p.request = request; synchronized (this) {
p.addQuorumVerifier(self.getQuorumVerifier()); if (request.getHdr().getType() == OpCode.reconfig) {
self.setLastSeenQuorumVerifier(request.qv, true);
} if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
} LOG.debug("Proposing:: {}", request); lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
// Send the packet to every voting node
sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
} /**
* send a packet to all the followers ready to follow
*
* @param qp
* the packet to be sent
*/
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
for (LearnerHandler f : forwardingFollowers) {
// Just add to the LearnerHandler's queue and return; clearly LearnerHandler handles it asynchronously
f.queuePacket(qp);
}
}
} // org.apache.zookeeper.server.quorum.LearnerHandler#queuePacket
void queuePacket(QuorumPacket p) {
queuedPackets.add(p);
// Add a MarkerQuorumPacket at regular intervals.
// Periodically put a marker packet into the queue
if (shouldSendMarkerPacketForLogging() && packetCounter.getAndIncrement() % markerPacketInterval == 0) {
queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
}
queuedPacketsSize.addAndGet(packetSize(p));
}
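The MarkerQuorumPacket seen above is a cheap queueing-latency probe: every markerPacketInterval packets the producer enqueues a timestamped marker, and the consumer (sendPackets, shown in the next section) subtracts the timestamp when it dequeues the marker and reports the delta as a metric. A generic sketch of the idea, with invented names:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Generic sketch of the marker-packet latency measurement used by LearnerHandler.
class MarkedQueue {
    static final class Marker { final long nanos = System.nanoTime(); }

    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final AtomicLong counter = new AtomicLong();
    private final int markerInterval = 1000; // one marker every 1000 packets

    void enqueue(Object packet) {
        queue.add(packet);
        if (counter.getAndIncrement() % markerInterval == 0) {
            queue.add(new Marker()); // timestamped probe rides along with real traffic
        }
    }

    void drainOne() throws InterruptedException {
        Object p = queue.take();
        if (p instanceof Marker) {
            long queueMillis = (System.nanoTime() - ((Marker) p).nanos) / 1_000_000L;
            System.out.println("queue latency ms = " + queueMillis); // report, then skip
            return;
        }
        // ... send the real packet over the socket ...
    }
}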
5. LearnerHandler: the communication thread that drains the queued proposal packets
In the previous section we saw that the Leader only adds the proposal packet to the LearnerHandler's queue and then returns, so LearnerHandler must have its own processing logic; otherwise, how would it communicate with the other nodes?
// org.apache.zookeeper.server.quorum.LearnerHandler#sendPackets
/**
* This method will use the thread to send packets added to the
* queuedPackets list
*
* @throws InterruptedException
*/
private void sendPackets() throws InterruptedException {
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
// An endless loop that keeps draining the send queue
while (true) {
try {
QuorumPacket p;
p = queuedPackets.poll();
if (p == null) {
bufferedOutput.flush();
p = queuedPackets.take();
}
ServerMetrics.getMetrics().LEARNER_HANDLER_QP_SIZE.add(Long.toString(this.sid), queuedPackets.size());
// As seen earlier, a MarkerQuorumPacket is enqueued periodically; it is only used for metrics and is skipped here, with no other effect
if (p instanceof MarkerQuorumPacket) {
MarkerQuorumPacket m = (MarkerQuorumPacket) p;
ServerMetrics.getMetrics().LEARNER_HANDLER_QP_TIME
.add(Long.toString(this.sid), (System.nanoTime() - m.time) / 1000000L);
continue;
} queuedPacketsSize.addAndGet(-packetSize(p));
if (p == proposalOfDeath) {
// Packet of death!
break;
}
// ping packet
if (p.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
// proposal packet; this is the one we are interested in
if (p.getType() == Leader.PROPOSAL) {
syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
} // Log the zxid of the last request, if it is a valid zxid.
if (p.getZxid() > 0) {
lastZxid = p.getZxid();
}
// Write the proposal to the socket
oa.writeRecord(p, "packet");
packetsSent.incrementAndGet();
messageTracker.trackSent(p.getType());
} catch (IOException e) {
if (!sock.isClosed()) {
LOG.warn("Unexpected exception at {}", this, e);
try {
// this will cause everything to shutdown on
// this learner handler and will help notify
// the learner/observer instantaneously
sock.close();
} catch (IOException ie) {
LOG.warn("Error closing socket for handler {}", this, ie);
}
}
break;
}
}
}
// The sending thread above is created when the LearnerHandler starts it via startSendingPackets
/**
* Start thread that will forward any packet in the queue to the follower
*/
protected void startSendingPackets() {
if (!sendingThreadStarted) {
// Start sending packets
new Thread() {
public void run() {
Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
}
}.start();
sendingThreadStarted = true;
} else {
LOG.error("Attempting to start sending thread after it already started");
}
} /**
* This thread will receive packets from the peer and process them and
* also listen to new connections from new peers.
*/
@Override
public void run() {
try {
learnerMaster.addLearnerHandler(this);
tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline(); ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput); QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet"); messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString()); return;
} if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
}
byte[] learnerInfoData = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
this.sid = learnerMaster.getAndDecrementFollowerCounter();
} String followerInfo = learnerMaster.getPeerInfo(this.sid);
if (followerInfo.isEmpty()) {
LOG.info(
"Follower sid: {} not in the current config {}",
this.sid,
Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
} else {
LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
} if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
} learnerMaster.registerLearnerHandlerBean(this, sock); long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
learnerMaster.waitForEpochAck(this.getSid(), ss);
} else {
byte[] ver = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
messageTracker.trackReceived(ackEpochPacket.getType());
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
learnerMaster.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid(); // Take any necessary action if we need to send TRUNC or DIFF
// startForwarding() will be called in all cases
boolean needSnap = syncFollower(peerLastZxid, learnerMaster); // syncs between followers and the leader are exempt from throttling because it
// is importatnt to keep the state of quorum servers up-to-date. The exempted syncs
// are counted as concurrent syncs though
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
try {
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush(); LOG.info(
"Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+ "send zxid of db as 0x{}, {} concurrent snapshot sync, "
+ "snapshot sync was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
syncThrottler.getSyncInProgress(),
exemptFromThrottle ? "exempt" : "not exempt");
// Dump data to peer
learnerMaster.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
}
} else {
syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().DIFF_COUNT.add(1);
} LOG.debug("Sending NEWLEADER message to {}", sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush(); // Start thread that blast packets in the queue to learner
startSendingPackets(); /*
* Have to wait for the first ACK, wait until
* the learnerMaster is ready, and only then we can
* start processing messages.
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet"); messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
return;
} LOG.debug("Received NEWLEADER-ACK message from {}", sid); learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid()); syncLimitCheck.start();
// sync ends when NEWLEADER-ACK is received
syncThrottler.endSync();
syncThrottler = null; // now that the ack has been processed expect the syncLimit
sock.setSoTimeout(learnerMaster.syncTimeout()); /*
* Wait until learnerMaster starts up
*/
learnerMaster.waitForStartup(); // Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
//
LOG.debug("Sending UPTODATE message to {}", sid);
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType()); long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline(); packetsReceived.incrementAndGet(); ByteBuffer bb;
long sessionId;
int cxid;
int type; switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
LOG.debug("Received ACK from Observer {}", this.sid);
}
syncLimitCheck.updateAck(qp.getZxid());
// An ACK came in for a proposal; call the Leader (learnerMaster) to process it
learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
learnerMaster.touch(sess, to);
}
break;
case Leader.REVALIDATE:
ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
learnerMaster.revalidateSession(qp, this);
break;
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if (type == OpCode.sync) {
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
learnerMaster.submitLearnerRequest(si);
requestsReceived.incrementAndGet();
break;
default:
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch (IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
} catch (SyncThrottleException e) {
LOG.error("too many concurrent sync.", e);
syncThrottler = null;
} catch (Exception e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
throw e;
} finally {
if (syncThrottler != null) {
syncThrottler.endSync();
syncThrottler = null;
}
String remoteAddr = getRemoteAddress();
LOG.warn("******* GOODBYE {} ********", remoteAddr);
messageTracker.dumpToLog(remoteAddr);
shutdown();
}
} // org.apache.zookeeper.server.quorum.Leader#processAck
/**
* Keep a count of acks that are received by the leader for a particular
* proposal
*
* @param zxid, the zxid of the proposal sent out
* @param sid, the id of the server that sent the ack
* @param followerAddr
*/
@Override
public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
if (!allowedToCommit) {
return; // last op committed was a leader change - from now on
}
// the new leader should commit
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
} if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack with this method. However,
* the learner sends an ack back to the leader after it gets
* UPTODATE, so we just ignore the message.
*/
return;
} if (outstandingProposals.size() == 0) {
LOG.debug("outstanding is 0");
return;
}
if (lastCommitted >= zxid) {
LOG.debug(
"proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted),
Long.toHexString(zxid));
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
return;
} if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) {
p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
}
p.addAck(sid);
// try to commit
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// If p is a reconfiguration, multiple other operations may be ready to be committed,
// since operations wait for different sets of acks.
// Currently we only permit one outstanding reconfiguration at a time
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so its not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
// ops may already have enough acks and can be committed, which is what this code does. if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
long curZxid = zxid;
while (allowedToCommit && hasCommitted && p != null) {
curZxid++;
p = outstandingProposals.get(curZxid);
if (p != null) {
hasCommitted = tryToCommit(p, curZxid, null);
}
}
}
} // org.apache.zookeeper.server.quorum.Leader#tryToCommit
/**
* @return True if committed, otherwise false.
**/
public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
// make sure that ops are committed in order. With reconfigurations it is now possible
// that different operations wait for different sets of acks, and we still want to enforce
// that they are committed in order. Currently we only permit one outstanding reconfiguration
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so it's not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen.
if (outstandingProposals.containsKey(zxid - 1)) {
return false;
} // in order to be committed, a proposal must be accepted by a quorum.
//
// getting a quorum from all necessary configurations.
if (!p.hasAllQuorums()) {
return false;
} // commit proposals in order
if (zxid != lastCommitted + 1) {
LOG.warn(
"Commiting zxid 0x{} from {} noy first!",
Long.toHexString(zxid),
followerAddr);
LOG.warn("First is {}", (lastCommitted + 1));
} outstandingProposals.remove(zxid); if (p.request != null) {
toBeApplied.add(p);
} if (p.request == null) {
LOG.warn("Going to commit null: {}", p);
} else if (p.request.getHdr().getType() == OpCode.reconfig) {
LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size()); //if this server is voter in new config with the same quorum address,
//then it will remain the leader
//otherwise an up-to-date follower will be designated as leader. This saves
//leader election time, unless the designated leader fails
Long designatedLeader = getDesignatedLeader(p, zxid);
//LOG.warn("designated leader is: " + designatedLeader); QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier(); self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); if (designatedLeader != self.getId()) {
allowedToCommit = false;
}
// we're sending the designated leader, and if the leader is changing the followers are
// responsible for closing the connection - this way we are sure that at least a majority of them
// receive the commit message.
commitAndActivate(zxid, designatedLeader);
informAndActivate(p, designatedLeader);
//turnOffFollowers();
} else {
p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
commit(zxid);
inform(p);
}
// Commit the request; in effect this just puts it onto the CommitProcessor's commit queue
zk.commitProcessor.commit(p.request);
if (pendingSyncs.containsKey(zxid)) {
// send the pending SYNC responses to the learners waiting on this zxid
for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
return true;
}
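One thing worth spelling out about those last lines: zk.commitProcessor.commit(p.request) does not apply the transaction itself. It only hands the now-committed request over to the CommitProcessor, whose own thread roughly matches it against the locally queued request and pushes it down the rest of the processor chain. A minimal, assumption-laden sketch of that handoff (my own model, not the ZooKeeper source):
import java.util.concurrent.LinkedBlockingQueue;

class CommitProcessorSketch {
    // stand-in for CommitProcessor's internal queue of requests that reached quorum
    private final LinkedBlockingQueue<Object> committedRequests = new LinkedBlockingQueue<>();
    private volatile boolean stopped = false;

    // roughly what commit(Request) amounts to: enqueue and wake the worker thread
    public void commit(Object request) {
        if (stopped || request == null) {
            return;
        }
        committedRequests.add(request);
        synchronized (this) {
            notifyAll(); // stand-in for waking the CommitProcessor worker
        }
    }
}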
六、 SyncRequestProcessor: the data persistence processor
// org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
public void processRequest(Request request) throws RequestProcessorException {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");

/* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/
if (request instanceof LearnerSyncRequest) {
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
nextProcessor.processRequest(request);
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
// only write transactions (requests carrying a txn header) need to be persisted
syncProcessor.processRequest(request);
}
}
}

// org.apache.zookeeper.server.SyncRequestProcessor#processRequest
public void processRequest(final Request request) {
Objects.requireNonNull(request, "Request cannot be null");
request.syncQueueStartTime = Time.currentElapsedTime();
// just enqueue the request and return; the worker thread does the actual logging
queuedRequests.add(request);
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}

// org.apache.zookeeper.server.SyncRequestProcessor#run
@Override
public void run() {
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
// poll the next request from the queuedRequests queue
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
/* We timed out looking for more writes to batch, go ahead and flush immediately */
flush();
si = queuedRequests.take();
}
if (si == REQUEST_OF_DEATH) {
break;
}
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
// append the request to the transaction log
if (zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (!snapThreadMutex.tryAcquire()) {
LOG.warn("Too busy to snap, skipping");
} else {
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
snapThreadMutex.release();
}
}
}.start();
}
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
// flush the batched writes to disk, completing the sync
if (shouldFlush()) {
flush();
}
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
} catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}

/** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush
* whenever either condition is hit. If only one or the other is
* set, flush only when the relevant condition is hit.
*/
private boolean shouldFlush() {
long flushDelay = zks.getFlushDelay();
long maxBatchSize = zks.getMaxBatchSize();
if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
return true;
}
return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
}
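To make the batching rule concrete, here is a small self-contained sketch (my own illustration, not ZooKeeper code) of the same decision: flush when the time budget for the current batch is used up, or when the batch has grown large enough; the run() loop above additionally flushes whenever the queue goes idle. The threshold values below are made up; in the real server they come from the flushDelay / maxBatchSize settings.
import java.util.ArrayDeque;
import java.util.Deque;

class FlushPolicySketch {
    static final long FLUSH_DELAY_MS = 10;   // hypothetical flushDelay
    static final int MAX_BATCH_SIZE = 1000;  // hypothetical maxBatchSize

    final Deque<String> toFlush = new ArrayDeque<>();
    long lastFlushTime = System.currentTimeMillis();

    long remainingDelay() {
        long elapsed = System.currentTimeMillis() - lastFlushTime;
        return Math.max(0, FLUSH_DELAY_MS - elapsed);
    }

    boolean shouldFlush() {
        if (FLUSH_DELAY_MS > 0 && remainingDelay() == 0) {
            return true;                      // time budget for this batch is used up
        }
        return MAX_BATCH_SIZE > 0 && toFlush.size() >= MAX_BATCH_SIZE;  // batch is full
    }

    void flush() {
        // this is where the real processor commits (fsyncs) the txn log and hands
        // the batched requests to the next processor
        toFlush.clear();
        lastFlushTime = System.currentTimeMillis();
    }
}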
At this point, a single data update has made it all the way through the leader. It does look a little involved. The main steps are:
1. Decide that the request is a write transaction (PrepRequestProcessor builds the txn header and record);
2. Propose the transaction to the quorum;
3. Collect the ack replies coming back from the followers;
4. Once the proposal can be committed, hand it to the CommitProcessor;
5. Flush the transaction log to disk.
To tie this back to where the article started, a client-side view of the same operation is sketched below.
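A minimal client-side sketch of that starting point (the connection string, path and payload are made up for illustration):
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class SetDataExample {
    public static void main(String[] args) throws Exception {
        // hypothetical ensemble address and session timeout
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        try {
            // version -1 skips the optimistic version check; a concrete version
            // would be validated by PrepRequestProcessor before the txn is built
            Stat stat = zk.setData("/a", "data".getBytes(), -1);
            System.out.println("committed, new dataVersion = " + stat.getVersion());
        } finally {
            zk.close();
        }
    }
}
One setData call on the client side is all it takes to trigger the whole pipeline above.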
七、 Wait, where are the votes for each transaction actually counted?
Right: so far we have only seen the proposal packet being handed, via queues, to the two LearnerHandler threads for sending. That was just the send path; nothing in it looked like a "more than half of the votes" check. So where is it?
It happens when the acks come back: the LearnerHandler thread that reads each ACK calls Leader#processAck, and the actual quorum check is performed inside tryToCommit.
// org.apache.zookeeper.server.quorum.Leader#tryToCommit
/**
* @return True if committed, otherwise false.
**/
public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
// make sure that ops are committed in order. With reconfigurations it is now possible
// that different operations wait for different sets of acks, and we still want to enforce
// that they are committed in order. Currently we only permit one outstanding reconfiguration
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so it's not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen.
if (outstandingProposals.containsKey(zxid - 1)) {
return false;
}
// in order to be committed, a proposal must be accepted by a quorum.
//
// getting a quorum from all necessary configurations.
// Check whether enough acks have been received; if not, do not commit yet.
// Once more than half of the voters have acked, fall through to the commit path
// (tryToCommit is synchronized, so concurrent acks cannot race here).
if (!p.hasAllQuorums()) {
return false;
}
// commit proposals in order
if (zxid != lastCommitted + 1) {
LOG.warn(
"Commiting zxid 0x{} from {} noy first!",
Long.toHexString(zxid),
followerAddr);
LOG.warn("First is {}", (lastCommitted + 1));
}
outstandingProposals.remove(zxid);
if (p.request != null) {
toBeApplied.add(p);
}
if (p.request == null) {
LOG.warn("Going to commit null: {}", p);
} else if (p.request.getHdr().getType() == OpCode.reconfig) {
LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size()); //if this server is voter in new config with the same quorum address,
//then it will remain the leader
//otherwise an up-to-date follower will be designated as leader. This saves
//leader election time, unless the designated leader fails
Long designatedLeader = getDesignatedLeader(p, zxid);
//LOG.warn("designated leader is: " + designatedLeader); QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier(); self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); if (designatedLeader != self.getId()) {
allowedToCommit = false;
}
// we're sending the designated leader, and if the leader is changing the followers are
// responsible for closing the connection - this way we are sure that at least a majority of them
// receive the commit message.
commitAndActivate(zxid, designatedLeader);
informAndActivate(p, designatedLeader);
//turnOffFollowers();
} else {
p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
// commit(zxid) records lastCommitted = zxid
// and broadcasts a COMMIT packet to all quorum members
commit(zxid);
inform(p);
}
zk.commitProcessor.commit(p.request);
if (pendingSyncs.containsKey(zxid)) {
for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
return true;
}
// org.apache.zookeeper.server.quorum.SyncedLearnerTracker#hasAllQuorums
public boolean hasAllQuorums() {
for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
// check whether this proposal has been acked by a quorum of every relevant config
if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) {
return false;
}
}
// every verifier is satisfied (more than half acked), so the proposal can be committed
return true;
}
// org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum
/**
* Verifies if a set is a majority. Assumes that ackSet contains acks only
* from votingMembers
*/
public boolean containsQuorum(Set<Long> ackSet) {
return (ackSet.size() > half);
}
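To make containsQuorum concrete: with a 5-member voting configuration, QuorumMaj keeps half = 5 / 2 = 2, so a proposal needs strictly more than 2 acks, i.e. at least 3 (note that the leader's own ack, produced by its local SyncRequestProcessor/AckRequestProcessor, counts toward that total). A standalone illustration (not ZooKeeper code):
import java.util.Set;

public class MajorityCheckSketch {
    public static void main(String[] args) {
        int votingMembers = 5;              // hypothetical ensemble size
        int half = votingMembers / 2;       // QuorumMaj's half = n / 2 = 2 here

        Set<Long> ackSet = Set.of(1L, 2L);  // leader (sid 1) plus one follower have acked
        System.out.println(ackSet.size() > half);  // false -> tryToCommit returns false

        ackSet = Set.of(1L, 2L, 3L);        // a third ack arrives
        System.out.println(ackSet.size() > half);  // true  -> the proposal can be committed
    }
}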
// notify the quorum members to commit the transaction
// org.apache.zookeeper.server.quorum.Leader#commit
/**
* Create a commit packet and send it to all the members of the quorum
*
* @param zxid
*/
public void commit(long zxid) {
synchronized (this) {
lastCommitted = zxid;
}
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp);
ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
}
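On the receiving side (going from memory of the learner code, so treat the details as a sketch): each follower handles the COMMIT packet in Follower#processPacket by calling FollowerZooKeeperServer.commit(zxid), which matches the zxid against its pending transactions and hands the request to the follower's own CommitProcessor; observers learn about the commit through the INFORM packet sent by inform(p).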
As the code above shows, the commit check runs once for every ack that arrives, and a proposal is committed as soon as more than half of the voters have acked. Without a further guard, the same proposal could be committed repeatedly; processAck prevents this by comparing the zxid against lastCommitted, as shown below.

/**
* Keep a count of acks that are received by the leader for a particular
* proposal
*
* @param zxid, the zxid of the proposal sent out
* @param sid, the id of the server that sent the ack
* @param followerAddr
*/
@Override
public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
if (!allowedToCommit) {
return; // last op committed was a leader change - from now on
}
// the new leader should commit
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
}
if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack with this method. However,
* the learner sends an ack back to the leader after it gets
* UPTODATE, so we just ignore the message.
*/
return;
}
if (outstandingProposals.size() == 0) {
LOG.debug("outstanding is 0");
return;
}
// if the proposal was already committed, lastCommitted has advanced past this zxid, so this ack is dropped here
if (lastCommitted >= zxid) {
LOG.debug(
"proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted),
Long.toHexString(zxid));
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
return;
}
if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) {
p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
}
p.addAck(sid);
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// If p is a reconfiguration, multiple other operations may be ready to be committed,
// since operations wait for different sets of acks.
// Currently we only permit one outstanding reconfiguration at a time
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so its not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
// ops may already have enough acks and can be committed, which is what this code does.
if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
long curZxid = zxid;
while (allowedToCommit && hasCommitted && p != null) {
curZxid++;
p = outstandingProposals.get(curZxid);
if (p != null) {
hasCommitted = tryToCommit(p, curZxid, null);
}
}
}
}
That completes the end-to-end flow: the Leader's job is done. The Follower's path differs slightly, but it is certainly simpler than the Leader's; interested readers can walk through it on their own.
A seemingly simple update turns out to be quite involved here, but in exchange for correctness and high availability that internal complexity is necessary. As a user, all you have to do is issue a single set, and you get the critical guarantees the API provides for free: that is exactly what good infrastructure buys us.