ZK集群的Leader选举源码阅读
- 2019 年 10 月 4 日
- 筆記
前言
ZooKeeper对Zab协议的实现有自己的主备模型,即Leader和learner(Observer + Follower),有如下几种情况需要进行领导者的选举工作
- 情形1: 集群在启动的过程中,需要选举Leader
- 情形2: 集群正常启动后,leader因故障挂掉了,需要选举Leader
- 情形3: 集群中的Follower数量不足以通过半数检验,Leader会挂掉自己,选举新leader
- 情景4: 集群正常运行,新增加1个Follower
本篇博文,从这四个方面进行源码的追踪阅读
程序入口
QuorumPeer.java
相当于集群中的每一个节点server,在它的start()
方法中,完成当前节点的启动工作,源码如下:
// todo 进入了 QuorumPeer(意为仲裁人数)类中,可以把这个类理解成集群中的某一个点 @Override public synchronized void start() { // todo 从磁盘中加载数据到内存中 loadDataBase(); // todo 启动上下文的这个工厂,他是个线程类, 接受客户端的请求 cnxnFactory.start(); // todo 开启leader的选举工作 startLeaderElection(); // todo 确定服务器的角色, 启动的就是当前类的run方法在900行 super.start(); }
第一个loadDataBase();
目的是将数据从集群中恢复到内存中
第二个cnxnFactory.start();
是当前的节点可以接受来自客户端(java代码,或者控制台)发送过来的连接请求
第三个startLeaderElection();
开启leader的选举工作, 但是其实他是初始化了一系列的辅助类,用来辅助leader的选举,并非真正在选举
当前类,quorumPeer
继承了ZKThread,它本身就是一个线程类,super.start();
就是启动它的run方法,在他的Run方法中有一个while循环,一开始在程序启动的阶段,所有的节点的默认值都是Looking
,于是会进入这个分支中,在这个分之中会进行真正的leader选举工作
小结
从程序的入口介绍中,可以看出本篇文章在会着重看下startLeaderElection();
做了哪些工作? 以及在looking
分支中如何选举leader
情形1:集群在启动的过程中,选举新Leader
进入startLeaderElection();
方法,源码如下, 他主要做了两件事
- 对本类
QuorumPeer.java
维护的变量(volatile private Vote currentVote;
)初始化 -
createElectionAlgorithm()
创建一个leader选举的方法其实到现在,就剩下一个算法没过期了,就是
fastLeaderElection
// TODO 开启投票选举Leader的工作 synchronized public void startLeaderElection() { try { // todo 创建了一个封装了投票结果对象 包含myid 最大的zxid 第几轮Leader // todo 先投票给自己 // todo 跟进它的构造函数 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; break; } } if (myQuorumAddr == null) { throw new RuntimeException("My id " + myid + " not in the peer list"); } if (electionType == 0) { try { udpSocket = new DatagramSocket(myQuorumAddr.getPort()); responder = new ResponderThread(); responder.start(); } catch (SocketException e) { throw new RuntimeException(e); } } // todo 创建一个领导者选举的算法,这个算法还剩下一个唯一的实现 快速选举 this.electionAlg = createElectionAlgorithm(electionType); }
继续跟进createElectionAlgorithm(electionType)
, 在这个方法中做了如下三件大事
- 创建了
QuorumCnxnManager
- 创建
Listenner
- 创建
FastLeaderElection
protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: // todo 创建CnxnManager 上下文的管理器 qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ // todo 在这里将listener 开启 listener.start(); // todo 实例化领导者选举的算法 le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break;
准备选举环境
QuorumManager
上图是QuorumCnxManager的类图,看一下,它有6个内部类, 其中的除了Message
外其他都是可以单独运行的线程类
这个类有着举足轻重的作用,它是集群中全体节点共享辅助类, 那到底有什么作用呢? 我不卖关子直接说,因为leader的选举是通过投票决议出来的,既然要相互投票,那集群中的各个点就得两两之间建立连接,这个QuorumCnxManager
就负责维护集群中的各个点的通信
它维护了两种队列,源码在下面,第一个队列被存入了ConcurrentHashMap
中 key就是节点的myid(或者说是serverId),值可以理解成存储它往其他服务器发送投票的队列
第二个队列是收到的其他服务器发送过来的msg
// todo key=serverId(myid) value = 保存着当前服务器向其他服务器发送消息的队列 final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; // todo 接收到的所有数据都在这个队列中 public final ArrayBlockingQueue<Message> recvQueue;
如上图是手绘的QuorumCnxManager.java
的体系图,最直观的可以看到它内部的三条线程类,那三条线程类的run()方法又分别做了什么呢?
SendWorker的run(), 可以看到它根据sid取出了当前节点对应的队列,然后将队列中的数据往外发送
public void run() { threadCnt.incrementAndGet(); try { ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if (bq == null || isSendQueueEmpty(bq)) { ByteBuffer b = lastMessageSent.get(sid); if (b != null) { LOG.debug("Attempting to send lastMessage to sid=" + sid); send(b); } } } catch (IOException e) { LOG.error("Failed to send last message. Shutting down thread.", e); this.finish(); } try { while (running && !shutdown && sock != null) { ByteBuffer b = null; try { // todo 取出任务所在的队列 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if (bq != null) { // todo 将bq,添加进sendQueue b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); } else { LOG.error("No queue of incoming messages for " + "server " + sid); break; } if(b != null){ lastMessageSent.put(sid, b); // todo send(b); } } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for message on queue", e); } }
RecvWorker的run方法,接受到了msg,然后将msg存入了recvQueue
队列中
public void run() { threadCnt.incrementAndGet(); try { while (running && !shutdown && sock != null) { /** * Reads the first int to determine the length of the * message */ int length = din.readInt(); if (length <= 0 || length > PACKETMAXSIZE) { throw new IOException( "Received packet with invalid packet: " + length); } /** * Allocates a new ByteBuffer to receive the message */ // todo 从数组中把数据读取到数组中 byte[] msgArray = new byte[length]; din.readFully(msgArray, 0, length); // todo 将数组包装成ByteBuf ByteBuffer message = ByteBuffer.wrap(msgArray); // todo 添加到RecvQueue中 addToRecvQueue(new Message(message.duplicate(), sid)); }
]
Listenner的run(),它会使用我们在配置文件中配置的集群键通信使用的端口(如上图的3888)建立彼此之间的连接
还能发现,集群中各个点之间的通信使用的传统socket通信
InetSocketAddress addr; while((!shutdown) && (numRetries < 3)){ try { // todo 创建serversocket ss = new ServerSocket(); ss.setReuseAddress(true); if (listenOnAllIPs) { int port = view.get(QuorumCnxManager.this.mySid) .electionAddr.getPort(); //todo 它取出来的地址就是address就是我们在配置文件中配置集群时添加进去的 port 3888... addr = new InetSocketAddress(port); } else { addr = view.get(QuorumCnxManager.this.mySid) .electionAddr; } LOG.info("My election bind port: " + addr.toString()); setName(view.get(QuorumCnxManager.this.mySid) .electionAddr.toString()); // todo 绑定端口 ss.bind(addr); while (!shutdown) { // todo 阻塞接受其他的服务器发起连接 Socket client = ss.accept(); setSockOpts(client); LOG.info("Received connection request " + client.getRemoteSocketAddress()); // todo 如果启用了仲裁SASL身份验证,则异步接收和处理连接请求 // todo 这是必需的,因为sasl服务器身份验证过程可能需要几秒钟才能完成,这可能会延迟下一个对等连接请求。 if (quorumSaslAuthEnabled) { // todo 异步接受一个连接 receiveConnectionAsync(client); } else { // todo 跟进这个方法 receiveConnection(client); } numRetries = 0; }
继续跟进源码,回到QuorumPeer.java
的createElectionAlgorithm()
方法中,重新截取源码如下,完成了QuorumCnxManager
的创建,后进行Listener的启动, Listenner的启动标记着集群中的各个节点之间有了两两之间建立通信能力, 同时Listenner是个线程类,它的Run()方法就在上面的代码中
FastLeaderElection
启动Listenner之后, 开始实例化领导者选举的算法对象new FastLeaderElection(this, qcm)
... break; case 3: // todo 创建CnxnManager 上下文的管理器 qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ // todo 在这里将listener 开启 listener.start(); // todo 实例化领导者选举的算法 le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); }
如下图是FasterElection
的类图
直观的看到它三个直接内部类
- Messager(它又有两个内部线程类)
- WorkerRecriver
- 负责将
- WorkerSender
- WorkerRecriver
- Notification
- 一般是当新节点启动时状态为looking,然后发起投票决议,其他节点收到后会用
Notification
告诉它自己信任的leader
- 一般是当新节点启动时状态为looking,然后发起投票决议,其他节点收到后会用
- ToSend
- 给对方发送,或者来自其他节点的消息。这些消息既可以是通知,也可以是接收通知的ack
对应着QuorumCnxManager
维护的两种队列,FasterElection
同样维护下面两个队列与之照应,一个是sendqueue
另一个是recvqueue
LinkedBlockingQueue<ToSend> sendqueue; LinkedBlockingQueue<Notification> recvqueue;
具体怎么玩呢? 如下图
就是当节点启动过程中对外的投票会存入FasterElection
的sendqueue
,然后经过QuorumCnxManager
的sendWorker
通过NIO发送出去, 与之相反的过程,收到的其他节点的投票会被QuorumCnxManager
的recvWorker
收到,然后存入QuorumCnxManager
的recvQueue中
,这个队列中的msg会继续被FasterElection
的内部线程类workerRecviver
取出存放到FasterElection
的recvqueue中
通过追踪代码,可以发现,Message的两个内部线程都被作为守护线程的方式开启
Messenger(QuorumCnxManager manager) { // todo WorderSender 作为一条新的线程 this.ws = new WorkerSender(manager); Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); //todo------------------------------------ // todo WorkerReceiver 作为一条新的线程 this.wr = new WorkerReceiver(manager); t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); }
小结
代码看到这里,其实选举leader的准备工作已经完成了,也就是说quorumPeer.java
的start()
方法中的startLeaderElection();
已经准备领导选举的环境,就是上图
真正开始选举
下面就去看一下quorumPeer.java
的这个线程类的启动,部分run()
方法的截取,我们关心它的lookForLeader()
方法
while (running) { switch (getPeerState()) { /** * todo 四种可能的状态, 经过了leader选举之后, 不同的服务器就有不同的角色 * todo 也就是说,不同的服务器会会走动下面不同的分支中 * LOOKING 正在进行领导者选举 * Observing * Following * Leading */ case LOOKING: // todo 当为Looking状态时,会进入领导者选举的阶段 LOG.info("LOOKING"); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); // Create read-only server but don't start it immediately // todo 创建了一个 只读的server但是不着急立即启动它 final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer( logFactory, this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb); // Instead of starting roZk immediately, wait some grace(优雅) period(期间) before we decide we're partitioned. // todo 为了立即启动roZK ,在我们决定分区之前先等一会 // Thread is used here because otherwise it would require changes in each of election strategy classes which is // unnecessary code coupling. //todo 这里新开启一条线程,避免每一个选举策略类上有不同的改变 而造成的代码的耦合 Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { // todo 启动上面那个只读的Server roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); setBCVote(null); // todo 上面的代码都不关系,直接看它的 lookForLeader()方法 // todo 直接点进去,进入的是接口,我们看它的实现类 setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception",e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. roZkMgr.interrupt(); roZk.shutdown(); }
下面是lookForLeader()
的源码解读
说实话这个方法还真的是挺长的,但是吧这个方法真的很重要,因为我们可以从这个方法中找到网络上大家针对Leader的选举总结的点点滴滴
第一点: 每次的投票都会先投自己一票,说白了new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
将自己的myid,最大的zxid,以及第几届封装起来,但是还有一个细节,就是在投自己的同时,还是会将存有自己信息的这一票通过socket发送给其他的节点
接受别人的投票是通过QuorumManager
的recvWorker
线程类将投票添加进recvQueue
队列中,投票给自己时,就不走这条路线了,而是选择直接将票添加进recvQueue
队列中
在下面代码中存在一行HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
这个map可以理解成一个小信箱,每一个节点都会维护一个信箱,这里面可能存放着自己投给自己的票,或者别人投给自己的票,或者别人投给别人的票,或者自己投给别人的票,通过统计这个信箱中的票数可以决定某一个节点是否可以成为leader,源码如下, 使用信箱中的信息,
// todo 根据别人的投票,以及自己的投票判断,本轮得到投票的集群能不能成为leader if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // todo 到这里说明接收到投票的机器已经是准leader了 // Verify if there is any change in the proposed leader // todo 校验一下, leader有没有变动 while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) { if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { recvqueue.put(n); break; } } if (n == null) { // todo 判断自己是不是leader, 如果是,更改自己的状态未leading , 否则根据配置文件确定状态是 Observer 还是Follower // todo leader选举出来后, QuorumPeer中的run方法中的while再循环,不同角色的服务器就会进入到 不同的分支 self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING : learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } }
在termPredicate()
函数中有如下的逻辑,self.getQuorumVerifier().containsQuorum(set);
它的实现如下,实际上就是在进行过半机制的检验,结论就是当某个节点拥有了集群中一半以上的节点的投票时,它就会把自己的状态修改成leading, 其他的节点根据自己的需求将状态该变成following或者observing
public boolean containsQuorum(Set<Long> set){ return (set.size() > half); }
维护着一个时钟,标记这是第几次投票了logicalclock
他是AutomicLong类型的变量,他有什么用呢? 通过下面的代码可以看到如下的逻辑,就是当自己的时钟比当前接收到投票的时钟小时,说明自己可能因为其他原因错过了某次投票,所以更新自己的时钟,重新判断投自己还是投别人, 同理,如果接收到的投票的时钟小于自己当前的时钟,说明这个票是没有意义的,直接丢弃不理会
if (n.electionEpoch > logicalclock.get()) { // todo 将自己的时钟调整为更新的时间 logicalclock.set(n.electionEpoch); // todo 清空自己的投票箱 recvset.clear();
那么根据什么判断是投给自己还是投给别人呢? 通过解析出票的封装类中封装的节点的信息,什么信息呢?zxid,myid,epoch 通常情况是epoch大的优先成为leader,一般来说epoch都会相同,所以zxid大的优先成为leader,如果zxid再相同,则myid大的优先成为leader
检查到别的节点比自己更适合当leader,会重新投票,选举更适合的节点
完整的源码
// todo 当前进入的是FastLeaderElection.java的实现类 public Vote lookForLeader() throws InterruptedException { try { // todo 创建用来选举Leader的Bean self.jmxLeaderElectionBean = new LeaderElectionBean(); MBeanRegistry.getInstance().register( self.jmxLeaderElectionBean, self.jmxLocalPeerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); self.jmxLeaderElectionBean = null; } if (self.start_fle == 0) { self.start_fle = Time.currentElapsedTime(); } try { // todo 每台服务器独有的投票箱 , 存放其他服务器投过来的票的map // todo long类型的key (sid)标记谁给当前的server投的票 Vote类型的value 投的票 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; synchronized (this) { //todo Automic 类型的时钟 logicalclock.incrementAndGet(); //todo 一开始启动时,入参位置的值都取自己的,相当于投票给自己 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); // todo 发送出去,投票自己 sendNotifications(); /* * Loop in which we exchange notifications until we find a leader */ // todo 如果自己一直处于LOOKING的状态,一直循环 while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) { /* * Remove next notification from queue, times out after 2 times * the termination time */ //todo 尝试获取其他服务器的投票的信息 // todo 从接受消息的队列中取出一个msg(这个队列中的数据就是它投票给自己的票) // todo 在QuorumCxnManager.java中 发送的投票的逻辑中,如果是发送给自己的,就直接加到recvQueue,而不经过socket // todo 所以它在这里是取出了自己的投票 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ // todo 第一轮投票这里不为空 if (n == null) { // todo 第二轮就没有投票了,为null, 进入这个分支 // todo 进行判断 ,如果集群中有三台服务器,现在仅仅启动一台服务器,还剩下两台服务器没启动 // todo 那就会有3票, 其中1票直接放到 recvQueue , 另外两票需要发送给其他两台机器的逻辑就在这里判断 // todo 验证是通不过的,因为queueSendMap中的两条队列都不为空 if (manager.haveDelivered()) { sendNotifications(); } else { // todo 进入这个逻辑 manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout * 2; notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } else if (validVoter(n.sid) && validVoter(n.leader)) { // todo 收到了其他服务器的投票信息后,来到下面的分支中处理 /* * Only proceed if the vote comes from a replica in the * voting view for a replica in the voting view. * todo 仅当投票来自投票视图中的副本时,才能继续进行投票。 */ switch (n.state) { case LOOKING: // todo 表示获取到投票的服务器的状态也是looking // If notification > current, replace and send messages out // todo 对比接收到的头片的 epoch和当前时钟先后 // todo 接收到的投票 > 当前服务器的时钟 // todo 表示当前server在投票过程中可能以为故障比其他机器少投了几次,需要重新投票 if (n.electionEpoch > logicalclock.get()) { // todo 将自己的时钟调整为更新的时间 logicalclock.set(n.electionEpoch); // todo 清空自己的投票箱 recvset.clear(); // todo 用别人的信息和自己的信息对比,选出一个更适合当leader的,如果还是自己适合,不作为, 对方适合,修改投票,投 对方 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); // todo 接收到的投票 < 当前服务器的时钟 // todo 说明这个投票已经不能再用了 } else if (n.electionEpoch < logicalclock.get()) { if (LOG.isDebugEnabled()) { LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock.get())); } break; // todo 别人的投票时钟和我的时钟是相同的 // todo 满足 totalOrderPredicate 后,会更改当前的投票,重新投票 /** * 在 totalOrderPredicate 比较两者之间谁更满足条件 * ((newEpoch > curEpoch) || * ((newEpoch == curEpoch) && * ((newZxid > curZxid) || * ((newZxid == curZxid) && * (newId > curId))))); */ // todo 返回true说明 对方更适合当leader } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } if (LOG.isDebugEnabled()) { LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } // todo 将自己的投票存放到投票箱子中 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // todo 根据别人的投票,以及自己的投票判断,本轮得到投票的集群能不能成为leader if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // todo 到这里说明接收到投票的机器已经是准leader了 // Verify if there is any change in the proposed leader // todo 校验一下, leader有没有变动 while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) { if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { // todo 判断自己是不是leader, 如果是,更改自己的状态未leading , 否则根据配置文件确定状态是 Observer 还是Follower // todo leader选举出来后, QuorumPeer中的run方法中的while再循环,不同角色的服务器就会进入到 不同的分支 self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING : learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } } break; case OBSERVING: // todo 禁止Observer参加投票 LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: /* * Consider all notifications from the same epoch * together. */ if (n.electionEpoch == logicalclock.get()) { recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (ooePredicate(recvset, outofelection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } /* * Before joining an established ensemble, verify * a majority is following the same leader. */ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if (ooePredicate(outofelection, outofelection, n)) { synchronized (this) { logicalclock.set(n.electionEpoch); self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; } } else { if (!validVoter(n.leader)) { LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid); } if (!validVoter(n.sid)) { LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid); } } } return null;
经过如上的判断各个节点的就可以选举出不同的角色,再次回到QuorumPeer.java
的run()
中进行循环时,不再会进入case LOOKING:
代码块了,而是按照自己不同的角色各司其职,完成不同的初始化启动
情形2: 集群正常启动后,leader因故障挂掉了,选举新Leader
第二种选举leader的情况,集群正常启动后,leader因故障挂掉了,选举新Leader
这部分的逻辑是怎样的呢?
leader虽然挂了,但是角色为Follower的server依然会去执行QuorumPeer.java
的run()
方法中的无限while循环,当它执行follower.followLeader();
方法时找不到leader,就会出异常,最终执行finally
代码块中的逻辑,可以看到它修改了自己的状态为looking,进而重新选举leader
break; case FOLLOWING: // todo server 当选follow角色 try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break;
情形3: 集群中的Follower数量不足以通过半数检验,Leader会挂掉自己,然后选举新leader
情形3: 假设集群中2台Follower,1台leader,那么当挂掉一台Follower时,剩下1台Follower无法满足过半检查机制因此会重新选举leader
回到源码:leader每次都进入case LEADING:
去执行leader.lead();
case LEADING: // todo 服务器成功当选成leader LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); // todo 跟进lead leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break;
但是在leader.lead();
中每次执行都会进行如下的判断,很明显,当不满足半数检验时,leader直接挂掉自己,最终将集群中所有节点的状态改成LOOKING
,重新选举
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) { //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) { // Lost quorum, shutdown shutdown("Not sufficient followers synced, only synced with sids: [ " + getSidSetString(syncedSet) + " ]"); // make sure the order is the same! // the leader goes to looking return; }
情景4: 集群正常运行,新增加1个Follower
新增加的进来的Follower在启动时它的状态是looking, 同样她也会去尝试选举leader,同样会把第一票投给自己,但是对于一个稳定的集群来说
集群中的各个橘色已经确定下来了,在这种情况下,会进入FastLeaderElection.java
的lookForLeader()
方法的如下分支,使当前新添加进来的节点
直接认Leader
case OBSERVING: // todo 禁止Observer参加投票 LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: /* * Consider all notifications from the same epoch * together. */ if (n.electionEpoch == logicalclock.get()) { recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (ooePredicate(recvset, outofelection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } }
如果有错误欢迎指出,如果对您有帮助,欢迎点支持