1.前言
QuorumPeer是一个线程对象,其中最核心的是run方法。run方法比较复杂,包含针对QuorumPeer各种状态的判断,代码也比较长。本文关注zk节点处于LOOKING状态时的操作:下面这段代码就是QuorumPeer处于LOOKING状态时执行的逻辑。它根据节点是否配置了readonlymode.enabled参数分为两个分支,两个分支最终都会执行同一段选举逻辑。当readonlymode.enabled为true时,还会额外启动一个异步线程:该线程先等待一段宽限期(至少2秒),如果节点仍处于LOOKING状态,再执行ReadOnlyZooKeeperServer的startup方法。
2.LOOKING状态下QuorumPeer的执行逻辑
下面这个是LOOKING状态下的代码逻辑
LOG.info("LOOKING");
ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
// Read-only mode is opt-in through the "readonlymode.enabled" JVM system property
// (Boolean.getBoolean reads a system property). Both branches run the same election
// steps; the read-only branch additionally starts a helper thread that brings up a
// ReadOnlyZooKeeperServer if this peer is still LOOKING after a grace period.
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
// Serve read-only only if the election has not finished during the grace period.
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
// Same election steps as the non-read-only branch below.
reconfigFlagClear();
// Re-create the election algorithm only when shuttingDownLE is set
// (per the flag's name, after it had been shut down).
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
// Run the election; the vote returned by lookForLeader() becomes this peer's current vote.
setCurrentVote(makeLEStrategy().lookForLeader());
checkSuspended();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
// Stay LOOKING after a failure (presumably so the enclosing loop re-runs the election).
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
// Shut the read-only server down whether or not the helper thread started it.
roZk.shutdown();
}
} else {
// NOTE(review): unlike the read-only branch above, this branch does not call
// checkSuspended() after lookForLeader(); confirm whether that asymmetry is intended.
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
两个分支都会执行的同一段代码逻辑如下:
// Reset the reconfig flag. Per the original author this sets the reconfigFlag field to false;
// what that flag controls is not visible in this excerpt.
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
// QuorumPeer#start already calls startLeaderElection(). Per the original author,
// shuttingDownLE defaults to false, so this call is normally skipped (confirm the default);
// when it does run, it re-creates the election algorithm.
startLeaderElection();
}
// Run the election via lookForLeader() and record the returned vote as this peer's current vote.
setCurrentVote(makeLEStrategy().lookForLeader());
checkSuspended();
3.创建选举算法
虽然startLeaderElection方法已经在QuorumPeer的start方法中被调用过,在LOOKING状态下的这段逻辑里(只有shuttingDownLE为true时才会调用)它很可能不会再次执行,但我们还是可以简单看一下startLeaderElection方法。下面看的是zookeeper-3.9.1版本的代码。
/**
 * Initializes this peer's vote (when LOOKING) and (re)creates the leader election algorithm.
 *
 * <p>If the peer is currently {@code LOOKING}, the current vote is reset to a vote for itself,
 * built from {@code myid}, the last logged zxid and the current epoch. The election algorithm is
 * then created from {@code electionType} and stored in {@code electionAlg}.
 *
 * @throws RuntimeException if building the initial vote fails with an {@link IOException}
 *     (e.g. from {@code getLastLoggedZxid()} or {@code getCurrentEpoch()}); the original
 *     IOException is attached as the cause
 */
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        // Keep the IOException as the cause (same type and message as before) instead of
        // copying only its message and stack trace, so the full failure chain is preserved.
        throw new RuntimeException(e.getMessage(), e);
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}
/**
 * Creates the leader election implementation selected by {@code electionAlgorithm}.
 *
 * <p>Older releases supported several election algorithms; now only algorithm {@code 3}
 * ({@link FastLeaderElection}) is available. Algorithms {@code 1} and {@code 2} are rejected with
 * {@link UnsupportedOperationException}; any other value fails an assertion when assertions are
 * enabled and otherwise yields {@code null}.
 *
 * <p>For algorithm 3 a fresh {@code QuorumCnxManager} replaces (and halts) any previously
 * registered one, its listener is started, and a started {@code FastLeaderElection} is returned.
 * If the new manager has no listener, an error is logged and {@code null} is returned.
 */
protected Election createElectionAlgorithm(int electionAlgorithm) {
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        break;
    default:
        assert false;
        return null;
    }

    // Only algorithm 3 reaches this point: set up the networking layer first.
    QuorumCnxManager cnxManager = createCnxnManager();
    QuorumCnxManager replaced = qcmRef.getAndSet(cnxManager);
    if (replaced != null) {
        LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
        replaced.halt();
    }

    QuorumCnxManager.Listener electionListener = cnxManager.listener;
    if (electionListener == null) {
        LOG.error("Null listener when initializing cnx manager");
        return null;
    }
    // The listener accepts incoming election connections from other peers.
    electionListener.start();

    FastLeaderElection fastElection = new FastLeaderElection(this, cnxManager);
    fastElection.start();
    return fastElection;
}
FastLeaderElection的构造方法:构造方法中主要初始化发送队列和接收队列,并用Messenger对QuorumPeer的网络通信组件(QuorumCnxManager)进行封装,供后续的网络通信使用。
/**
 * Creates a FastLeaderElection for {@code self} that communicates through {@code manager}.
 *
 * <p>The election starts in the not-stopped state; the proposal, the send/receive queues and
 * the Messenger are initialized by {@code starter}.
 */
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.manager = manager;
    this.stop = false;
    starter(self, manager);
}
/**
 * Initializes the per-election state: the owning peer, an empty proposal (leader and zxid both
 * {@code -1}), fresh outgoing/incoming notification queues, and the Messenger wrapping
 * {@code manager}.
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    // Outgoing (send) and incoming (receive) notification queues.
    sendqueue = new LinkedBlockingQueue<>();
    recvqueue = new LinkedBlockingQueue<>();
    // No leader proposed yet.
    proposedLeader = -1;
    proposedZxid = -1;
    // Kept last, as in the original ordering, so every other field is set before it is built.
    this.messenger = new Messenger(manager);
}
/** Starts this election's messaging layer by starting its Messenger. */
public void start() {
    messenger.start();
}
/** Starts the Messenger's two worker threads: the sender thread first, then the receiver thread. */
void start() {
    wsThread.start();
    wrThread.start();
}
4.lookForLeader
从前面的代码逻辑可以看出,org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader这个方法负责发起leader节点的选举,当QuorumPeer处于LOOKING状态时会调用此方法。
/**
 * Runs leader election for this peer (parts of the body are elided in this excerpt).
 *
 * <p>Visible behavior: increments the logical clock, proposes this peer's initial vote,
 * broadcasts notifications, then loops while the peer is LOOKING and the election is not
 * stopped. Each pass polls recvqueue with a timeout that doubles on every empty poll, capped at
 * maxNotificationInterval. The finally block always unregisters the leader-election JMX bean.
 *
 * @return a vote when the election concludes inside the loop (e.g. via the QuorumOracleMaj
 *     re-check shown below; any other return paths would be in the elided code), or null if the
 *     loop exits without returning
 * @throws InterruptedException e.g. if interrupted while polling the receive queue
 */
public Vote lookForLeader() throws InterruptedException {
// ... part of the code omitted ...
try {
// Votes received during this election (presumably keyed by sender sid; confirm in elided code).
Map<Long, Vote> recvset = new HashMap<>();
Map<Long, Vote> outofelection = new HashMap<>();
int notTimeout = minNotificationInterval;
synchronized (this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getMyId(),
Long.toHexString(proposedZxid));
// Broadcast this peer's initial proposal.
sendNotifications();
SyncedLearnerTracker voteSet = null;
// Exchange notifications until this peer is no longer LOOKING or the election is stopped.
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
// Wait up to notTimeout ms for the next notification from a peer.
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
// Nothing arrived within the timeout (typical right after startup, before peers are connected).
if (n == null) {
// NOTE(review): haveDelivered() is not shown here; the original comment read it as
// "some peer connection exists". Confirm its exact semantics in QuorumCnxManager.
if (manager.haveDelivered()) {
// Re-send this peer's notifications.
sendNotifications();
} else {
// Otherwise (re)try connecting to all known peers.
manager.connectAll();
}
/*
* Exponential backoff
*/
notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);
/*
* When a leader failure happens on a master, the backup will be supposed to receive the honour from
* Oracle and become a leader, but the honour is likely to be delay. We do a re-check once timeout happens
*
* The leader election algorithm does not provide the ability of electing a leader from a single instance
* which is in a configuration of 2 instances.
* */
if (self.getQuorumVerifier() instanceof QuorumOracleMaj
&& self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
LOG.info("Notification time out: {} ms", notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
// ... many lines omitted ...
} else {
// ... many lines omitted ...
}
}
return null;
} finally {
// Always unregister the leader-election JMX bean, whatever the outcome.
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}
5.集群中机器互联
zk节点刚启动时会进行leader节点的选举。选举过程中需要统计选票,而要统计选票,就需要接收zk集群中其他节点发来的数据,因此节点之间首先要相互建立连接。
/**
 * Asks {@code connectOne} to connect to every peer whose id is currently a key of
 * {@code queueSendMap}.
 */
public void connectAll() {
    final Enumeration<Long> peerIds = queueSendMap.keys();
    while (peerIds.hasMoreElements()) {
        long peerId = peerIds.nextElement();
        connectOne(peerId);
    }
}
沿着代码一步步跟进去,可以找到最终执行连接的地方:zookeeper会向线程池提交一个异步任务来完成连接操作。
/**
 * Submits an asynchronous connection attempt to peer {@code sid} at {@code electionAddr}.
 *
 * <p>If an attempt for the same sid is already in progress, the request is skipped. If handing
 * the task to the executor fails, the in-progress marker for the sid is cleared again.
 *
 * @return {@code false} only when submitting the task failed; {@code true} when it was submitted
 *     or an attempt for this sid was already in progress
 */
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {
    final boolean alreadyInProgress = !inprogressConnections.add(sid);
    if (alreadyInProgress) {
        LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);
        return true;
    }
    try {
        connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
        connectionThreadCnt.incrementAndGet();
        return true;
    } catch (Throwable submissionFailure) {
        // Clear the marker so a later request for this sid can try again.
        inprogressConnections.remove(sid);
        LOG.error("Exception while submitting quorum connection request", submissionFailure);
        return false;
    }
}
/**
 * Task that opens the election connection to a single peer, then clears that peer's
 * "connection in progress" marker whether or not the attempt succeeded.
 */
private class QuorumConnectionReqThread extends ZooKeeperThread {

    final MultipleAddresses electionAddr;
    final Long sid;

    QuorumConnectionReqThread(final MultipleAddresses peerAddresses, final Long peerSid) {
        super("QuorumConnectionReqThread-" + peerSid);
        this.electionAddr = peerAddresses;
        this.sid = peerSid;
    }

    @Override
    public void run() {
        final Long peerSid = this.sid;
        try {
            initiateConnection(this.electionAddr, peerSid);
        } finally {
            // Allow future connection requests to this peer.
            inprogressConnections.remove(peerSid);
        }
    }
}
initiateConnection方法负责创建socket并向目标节点发起连接:
/**
 * Opens the election channel to peer {@code sid} and, on success, hands the socket to
 * startConnection.
 *
 * <p>Creates an SSL socket (and performs the TLS handshake) when quorum SSL is enabled, otherwise
 * a plain socket, then connects to the peer's election address with timeout cnxTO. An
 * X509Exception, UnresolvedAddressException or IOException while connecting is logged and the
 * socket is closed. An IOException from startConnection is handled the same way. None of these
 * failures propagate to the caller.
 */
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
Socket sock = null;
try {
LOG.debug("Opening channel to server {}", sid);
// Quorum SSL decides between an SSL socket and a plain socket.
if (self.isSslQuorum()) {
sock = self.getX509Util().createSSLSocket();
} else {
sock = SOCKET_FACTORY.get();
}
// Apply socket options. Per the original comment these include TCP_NODELAY, SO_TIMEOUT and
// keep-alive; see setSockOpts for the exact set.
setSockOpts(sock);
// The actual connect. getReachableOrOne() presumably picks a reachable address (or else one
// of the configured ones); cnxTO is the connect timeout.
sock.connect(electionAddr.getReachableOrOne(), cnxTO);
if (sock instanceof SSLSocket) {
SSLSocket sslSock = (SSLSocket) sock;
sslSock.startHandshake();
LOG.info("SSL handshake complete with {} - {} - {}",
sslSock.getRemoteSocketAddress(),
sslSock.getSession().getProtocol(),
sslSock.getSession().getCipherSuite());
}
LOG.debug("Connected to server {} using election address: {}:{}",
sid, sock.getInetAddress(), sock.getPort());
} catch (X509Exception e) {
LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
closeSocket(sock);
return;
} catch (UnresolvedAddressException | IOException e) {
LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
closeSocket(sock);
return;
}
try {
// Post-connect handshake: startConnection sends this peer's id and addresses and, if the
// connection is kept, starts the send/receive worker threads.
startConnection(sock, sid);
} catch (IOException e) {
LOG.error(
"Exception while connecting, id: {}, addr: {}, closing learner connection",
sid,
sock.getRemoteSocketAddress(),
e);
closeSocket(sock);
}
}
连接建立之后的处理:socket连接成功后,会调用startConnection方法。
/**
 * Runs the initiator side of the election-channel handshake on an already connected socket.
 *
 * <p>Writes the protocol version, this peer's id and its election address(es) to the peer. It
 * authenticates the socket if {@code sid} is in the voting view. It then applies the id
 * tie-break: if the remote {@code sid} is larger than this peer's id, the socket is closed;
 * otherwise a SendWorker/RecvWorker pair is started for it.
 *
 * @param sock connected socket to the peer
 * @param sid  id of the REMOTE peer (not this peer's own id)
 * @return true if the send/receive workers were started; false if the handshake write failed or
 *         the connection was dropped by the tie-break
 * @throws IOException presumably from authLearner.authenticate; I/O errors while writing the
 *         handshake are caught, the socket is closed and false is returned
 */
private boolean startConnection(Socket sock, Long sid) throws IOException {
// Buffered data output stream over the socket, used to write the handshake below.
DataOutputStream dout = null;
// Data input stream over the socket; handed to the RecvWorker if the connection is kept.
DataInputStream din = null;
LOG.debug("startConnection (myId:{} --> sid:{})", self.getMyId(), sid);
try {
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Handshake header: protocol version (V2 when multi-address is enabled, else V1),
// followed by this peer's own id.
long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
dout.writeLong(protocolVersion);
dout.writeLong(self.getMyId());
// V2 advertises all election addresses, V1 only one. They are sent as a "|"-joined string
// prefixed by its length in bytes.
Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
? self.getElectionAddress().getAllAddresses()
: Arrays.asList(self.getElectionAddress().getOne());
String addr = addressesToSend.stream()
.map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
// NOTE(review): getBytes() uses the platform default charset; confirm the receiving side
// decodes with the same charset.
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// Authenticate only peers that are present in the voting view.
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
authLearner.authenticate(sock, qps.hostname);
}
// Tie-break so that only one channel survives between two peers. Here `sid` is the REMOTE
// peer's id (it is compared against self.getMyId()). If the remote id is larger, this peer
// drops the connection it initiated; otherwise it keeps it and starts the workers.
if (sid > self.getMyId()) {
LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getMyId(), sid);
closeSocket(sock);
} else {
LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getMyId(), sid);
// Create one sender and one receiver worker for this socket and link them.
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
// Retire any existing sender for this peer before registering the new one.
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw);
// Make sure an outgoing queue (capacity SEND_CAPACITY) exists for this peer.
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
// Start both worker threads.
sw.start();
rw.start();
return true;
}
return false;
}
有发起连接的一方,就有被连接的一方。比如sid=1的节点发起连接时,其他zk节点是如何接收它的连接请求的呢?这块代码逻辑在哪里?还记得我们在创建选举算法时会创建网络连接管理器(QuorumCnxManager)吗?其中有一个QuorumCnxManager.Listener,它会为本节点的每个选举监听地址各创建一个ListenerHandler,并用线程池运行这些ListenerHandler来监听连接请求。
/**
 * Listener thread body: starts one ListenerHandler per address to listen on and waits until all
 * of them have finished.
 *
 * <p>It listens on wildcard addresses when quorumListenOnAllIPs is set, and otherwise on this
 * peer's configured election addresses. Once the handlers exit, and if the manager is not
 * shutting down, it logs that this peer can no longer take part in leader election. If a
 * SocketException was recorded, it also runs socketBindErrorHandler.
 */
public void run() {
if (!shutdown) {
LOG.debug("Listener thread started, myId: {}", self.getMyId());
// Addresses to accept election connections on.
Set<InetSocketAddress> addresses;
if (self.getQuorumListenOnAllIPs()) {
addresses = self.getElectionAddress().getWildcardAddresses();
} else {
addresses = self.getElectionAddress().getAllAddresses();
}
// One count per handler; each ListenerHandler counts the latch down when its run() ends.
CountDownLatch latch = new CountDownLatch(addresses.size());
listenerHandlers = addresses.stream().map(address ->
new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
.collect(Collectors.toList());
// One pool thread per handler. shutdown() only stops new submissions; the already
// submitted handlers keep running.
// NOTE(review): newFixedThreadPool throws IllegalArgumentException if addresses is empty.
final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
try {
listenerHandlers.forEach(executor::submit);
} finally {
executor.shutdown();
}
try {
// Block until every ListenerHandler has exited (stopped accepting), NOT until peers connect.
latch.await();
} catch (InterruptedException ie) {
// NOTE(review): the interrupt is logged and swallowed without restoring the interrupt status.
LOG.error("Interrupted while sleeping. Ignoring exception", ie);
} finally {
// Close every handler (and its server socket) once waiting ends.
for (ListenerHandler handler : listenerHandlers) {
try {
handler.close();
} catch (IOException ie) {
LOG.debug("Error closing server socket", ie);
}
}
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error(
"As I'm leaving the listener thread, I won't be able to participate in leader election any longer: {}",
self.getElectionAddress().getAllAddresses().stream()
.map(NetUtils::formatInetAddr)
.collect(Collectors.joining("|")));
if (socketException.get()) {
// After leaving listener thread, the host cannot join the quorum anymore,
// this is a severe error that we cannot recover from, so we need to exit
socketBindErrorHandler.run();
}
}
}
ListenerHandler的主要逻辑:创建ServerSocket,然后调用serverSocket.accept()接受其他socket的连接。接收到连接之后,也会封装输入流和输出流,然后启动读写线程,用来接收后续的消息。但这里有一个不同的地方:接收完连接之后会调用handleConnection方法,在这个方法中会读取连接方发送过来的sid,当对方的sid小于当前节点的sid时,会关闭这个连接,然后由自己再主动向对方发起一个连接请求。
/**
 * Entry point of a listener handler. It names the current thread after the handler's address,
 * runs the accept loop until it ends, then closes the handler. It always counts down the shared
 * latch so the owning Listener can stop waiting.
 */
public void run() {
    try {
        final String handlerThreadName = "ListenerHandler-" + address;
        Thread.currentThread().setName(handlerThreadName);
        acceptConnections();
        try {
            close();
        } catch (IOException closeFailure) {
            LOG.warn("Exception when shutting down listener: ", closeFailure);
        }
    } catch (Exception unexpected) {
        // Output of unexpected exception, should never happen
        LOG.error("Unexpected error ", unexpected);
    } finally {
        // Release the Listener waiting on the latch, even if this handler failed.
        latch.countDown();
    }
}
/**
 * Accept loop for this handler's election address.
 *
 * <p>Binds a server socket and hands each accepted peer socket to {@code receiveConnection}, or
 * to {@code receiveConnectionAsync} when quorum SASL authentication is enabled. On an
 * IOException the server socket is closed and, after a 1s pause, a new one is created. This
 * repeats until shutdown, or until {@code portBindMaxRetry} consecutive failures ({@code 0}
 * means retry forever). A successfully handled connection resets the failure count. If the
 * loop gives up while not shutting down, an error is logged.
 */
private void acceptConnections() {
    int numRetries = 0;
    // Socket accepted in the current iteration that has NOT yet been handed off; null otherwise.
    Socket client = null;
    // Keep (re)binding until shutdown or until the retry budget is exhausted.
    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            serverSocket = createNewServerSocket();
            LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
            while (!shutdown) {
                try {
                    client = serverSocket.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
                    if (quorumSaslAuthEnabled) {
                        // Handle asynchronously: SASL authentication can be slow and would
                        // otherwise delay accepting other peers.
                        receiveConnectionAsync(client);
                    } else {
                        receiveConnection(client);
                    }
                    // The socket now belongs to the receive path. Forget it, so that a later
                    // accept() failure does not close an already handed-off (possibly live)
                    // connection in the IOException handler below.
                    client = null;
                    numRetries = 0;
                } catch (SocketTimeoutException e) {
                    LOG.warn("The socket is listening for the election accepted "
                        + "and it timed out unexpectedly, but will retry."
                        + "see ZOOKEEPER-2836");
                }
            }
        } catch (IOException e) {
            if (shutdown) {
                break;
            }
            LOG.error("Exception while listening to address {}", address, e);
            if (e instanceof SocketException) {
                socketException.set(true);
            }
            numRetries++;
            try {
                close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                // Deliberately not re-interrupting: a restored interrupt flag would make every
                // later sleep() throw at once, turning the retry pause into a busy loop.
                LOG.error("Interrupted while sleeping. Ignoring exception", ie);
            }
            // Only a socket that was accepted but not handed off (e.g. setSockOpts failed) is
            // closed here; closeSocket already tolerates null (client starts out null).
            closeSocket(client);
            client = null;
        }
    }
    if (!shutdown) {
        LOG.error( "Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.",
            formatInetAddr(address),
            numRetries,
            ELECTION_PORT_BIND_RETRY);
    }
}