Fork me on GitHub

Zookeeper

Zookeeper概述

背景

随着互联网技术的高速发展,企业对计算机系统的计算、储存能力要求越来越高,最简单的证明就是出现了一些诸如:高并发,海量储存这样的词汇。在这样的背景下,单纯依靠少量高性能主机来完成计算任务已经不能满足企业的需求,企业的IT架构逐渐从集中式向分布式过渡,所谓的分布式是指:把一个计算任务分解成若干个计算单元,并且分派到若干不同的计算机中去执行,然后汇总计算结果的过程!

分布式系统要解决的核心任务是如何把众多的计算机协同起来完成计算任务

Zookeeper是什么?

Zookeeper是源代码开放的分布式协调服务,解决分布式数据一致性问题。它是一个分布式的服务协调组件

Zookeeper的基本概念

集群角色

Leader、Follower、Observer

  • Leader服务器是整个Zookeeper集群工作机制中的核心
  • Follower服务器是Zookeeper集群状态的跟随者
  • Observer服务器充当一个观察者的角色
  • 客户端是请求发起方

心跳检测

Zk中我们让所有的机器都注册一个临时节点,我们判断一个机器是否可用,我们只需要判断这个节点在zk中是否存在就可以了。

会话

会话是指客户端和Zookeeper服务器的连接,Zookeeper中的会话叫Session,客户端靠与服务器建立一个TCP的长连接来维持一个Session,客户端在启动的时候首先会与服务器建立一个TCP连接,通过这个连接,客户端能够通过心跳检测与服务器保持有效的会话,也能向ZK服务器发送请求并获得响应

Zookeeper安装

  • 下载后解压安装Zookeeper包,官方下载链接点击这里
  • 根据Zookeeper集群节点情况,创建如下格式的Zookeeper配置文件zoo.cfg:
    1
    2
    3
    4
    5
    6
    7
    8
    tickTime=2000 //基本时间度量单位,一次心跳时间
    dataDir=/var/zookeeper/config
    clientPort=2181
    initLimit=5 //初始化连接时, follower和leader之间的最长心跳时间5 * 2000ms = 10s
    syncLimit=2 //请求和应答的最大时间长度 2 * 2000ms
    server.1=zoo1:2888:3888 //server.1=192.168.111.101:2888:3888
    server.2=zoo2:2888:3888 //server.2=192.168.111.102:2888:3888
    server.3=zoo3:2888:3888 //server.3=192.168.111.103:2888:3888

其中,dataDir指定Zookeeper的数据文件目录;其中server.id=host:port:port,id是为每个Zookeeper节点的编号,保存在dataDir目录下的myid文件中,zoo1~zoo3表示各个Zookeeper节点的hostname,第一个port是用于连接leader的端口(仲裁通信),第二个port是用于leader选举的端口。

  • 在dataDir目录下创建myid文件,文件中只包含一行,且内容为该节点对应的server.id中的id编号。
  • 启动Zookeeper服务:
    通过bin/zkServer.sh脚本启动Zookeeper服务。 ./zkServer.sh start
    查看该节点是leader还是follow:zkServer.sh status

    zookeeper结构

    可以简单的说,zookeeper就是一个文件系统加一个通知机制

    文件系统

    下面我们看zookeeper是如何实现协同操作的,首先,zookeeper维护了一个类似文件系统的数据结构。可以看到有根目录,根目录下有若干个子目录。每一个子目录都被称为Znode。和文件系统一样我们能够自由地增加删除znode。Znode是可以存储数据的。Zookeeper中znode可以分为4种类型:持久节点、临时节点、持久有序节点、临时有序节点
    持久节点:
    即使znode的创建者不再属于系统,数据也可以保存下来而不丢失。例如:
    分配任务的主节点崩溃,但是从节点的任务分配情况也会保存下来。
    临时节点:
    主节点崩溃,该znode(master)和主节点一起消失–>系统检测到–>选举
    有序:
    唯一名称,创建顺序,task1,task2…

文件系统可以提供很多信息。针对一个znode,如果该znode是主节点的znode没有信息,那么说明还没有选出主节点。
其他znode节点:
workers作为父节点:其下znode节点为可用从节点信息
tasks作为父节点:其下znode节点为等待执行任务
assign作为父节点:其下znode节点为分配到某从节点任务

通知机制

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变,被删除,子目录节点增加删除)时,zookeeper会通知客户端。

leader节点负责写服务和数据的同步,follow节点负责读服务。Zookeeper不允许局部写入或读取znode节点的数据。

zookeeper shell客户端命令简介

zookeeper目录结构

客户端连接

1
2
//当前目录为zookeeper目录;且连接的是当前的zookeeper;连接其他服务器的例如:./bin/zkCli.sh –server 10.77.20.23:2181
$ ./bin/zkCli.sh

连接成功

ls 查看

1
[zk: localhost:2181(CONNECTED) 0] ls /


这个命令只能看到指定节点下第一级的所有子节点
第一次部署的zookeeper集群,默认在根节点”/“下面有一个叫做/zookeeper的保留节点。

create 创建节点并存储数据

1
[zk: localhost:2181(CONNECTED) 1] create /zhangsan "zhangsan"


create有以下属性: [-s] [-e]
-s 或 -e 分别指定节点特性:顺序 或 临时节点。默认情况下,不添加-s 或 -e 参数的,创建的是不带序号的持久节点。

get 获取内容

1
[zk: localhost:2181(CONNECTED) 10] get -s /zhangsan


之前版本的zookeeper不加-s能直接看到属性信息,该版本的不加-s只能看到数据内容
数据及各属性信息含义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//数据内容
"zhangsan"
//创建该节点的事务ID
cZxid = 0x43
ctime = Wed Sep 26 00:12:28 CST 2018
//最后一次更新该节点的事务ID
mZxid = 0x43
//最后一次更新该节点的时间
mtime = Wed Sep 26 00:12:28 CST 2018
pZxid = 0x43
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0

其中numChildren为该节点下子节点的个数

set 重新设置

1
[zk: localhost:2181(CONNECTED) 11] set /zhangsan "zhangsan1"
1
[zk: localhost:2181(CONNECTED) 12] get -s /zhangsan

再次get并查看数据

1
2
3
4
5
6
7
8
9
10
11
12
"zhangsan1"
cZxid = 0x43
ctime = Wed Sep 26 00:12:28 CST 2018
mZxid = 0x46
mtime = Wed Sep 26 00:32:57 CST 2018
pZxid = 0x43
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0

可以看到 数据内容、mZxid、mtime、dataVersion、dataLength属性发生了变化。

delete 删除节点

1
[zk: localhost:2181(CONNECTED) 13] delete /zhangsan

使用delete命令可以删除Zookeeper上的指定节点。
注意:想要删除某一个指定节点,该节点必须没有子节点存在。无法删除一个包含子节点的节点。

可以使用deleteall命令进行删除包含子节点的节点。

其他命令

1
[zk: localhost:2181(CONNECTED) 20] help

Java客户端API使用

目前没用过,用到再介绍;主要命令和客户端的一致

Zookeeper选举机制

Leader选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举。

  • 服务器初始化启动。
  • 服务器运行期间无法和Leader保持连接。

服务器启动时期的Leader选举

  1. 每个Server发出一个投票。由于是初始情况,Server1和Server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid和ZXID,使用(myid, ZXID)来表示,此时Server1的投票为(1, 0),Server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器。

  2. 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自LOOKING状态的服务器。

  3. 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下

    • 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
    • 如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。

      对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的ZXID,均为0,再比较myid,此时Server2的myid最大,于是更新自己的投票为(2, 0),然后重新投票,对于Server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。

  4. 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于Server1、Server2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了Leader。

  5. 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING。

简单例子
服务器1启动,无其他节点,得不到响应,选举状态为LOOKING状态
服务器2启动,没有历史数据,都选自己,2的id大,所以1同意2为leader,但是未过半数,服务器1/2状态依然为LOOKING
服务器3启动,1和2有历史数据,都选2,服务器3选自己,2的票数为2,2/3超过半数,2为leader
服务器4启动,发起投票,1/2/3根据历史数据选2,leader为2。

服务器运行时期的Leader选举

在Zookeeper运行期间,Leader与非Leader服务器各司其职,即便当有非Leader服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。假设正在运行的有Server1、Server2、Server3三台服务器,当前Leader是Server2,若某一时刻Leader挂了,此时便开始Leader选举。选举过程如下

  1. 变更状态。Leader挂后,余下的非Observer服务器都会讲自己的服务器状态变更为LOOKING,然后开始进入Leader选举过程。

  2. 每个Server会发出一个投票。在运行期间,每个服务器上的ZXID可能不同,此时假定Server1的ZXID为123,Server3的ZXID为122;在第一轮投票中,Server1和Server3都会投自己,产生投票(1, 123),(3, 122),然后各自将投票发送给集群中所有机器。

  3. 接收来自各个服务器的投票。与启动时过程相同。

  4. 处理投票。与启动时过程相同,此时,Server1将会成为Leader。

  5. 统计投票。与启动时过程相同。

  6. 改变服务器的状态。与启动时过程相同。

Leader选举算法分析

在3.4.0后的Zookeeper的版本只保留了TCP版本的FastLeaderElection选举算法。

当一台机器进入Leader选举时,当前集群可能会处于以下两种状态

  • 集群中已经存在Leader。

  • 集群中不存在Leader。

对于集群中已经存在Leader而言,此种情况一般都是某台机器启动得较晚,在其启动之前,集群已经在正常工作,对这种情况,该机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器而言,仅仅需要和Leader机器建立起连接,并进行状态同步即可。

而在集群中不存在Leader情况下则会相对复杂,其步骤如下:

  1. 第一次投票。无论哪种导致进行Leader选举,集群的所有机器都处于试图选举出一个Leader的状态,即LOOKING状态,LOOKING机器会向所有其他机器发送消息,该消息称为投票。投票中包含了SID(服务器的唯一标识)和ZXID(事务ID),(SID, ZXID)形式来标识一次投票信息。假定Zookeeper由5台机器组成,SID分别为1、2、3、4、5,ZXID分别为9、9、9、8、8,并且此时SID为2的机器是Leader机器,某一时刻,1、2所在机器出现故障,因此集群开始进行Leader选举。在第一次投票时,每台机器都会将自己作为投票对象,于是SID为3、4、5的机器投票情况分别为(3, 9),(4, 8), (5, 8)。

  2. 变更投票。每台机器发出投票后,也会收到其他机器的投票,每台机器会根据一定规则来处理收到的其他机器的投票,并以此来决定是否需要变更自己的投票,这个规则也是整个Leader选举算法的核心所在,其中术语描述如下

    • vote_sid:接收到的投票中所推举Leader服务器的SID。

    • vote_zxid:接收到的投票中所推举Leader服务器的ZXID。

    • self_sid:当前服务器自己的SID。

    • self_zxid:当前服务器自己的ZXID。

      每次对收到的投票的处理,都是对(vote_sid, vote_zxid)和(self_sid, self_zxid)对比的过程。
      规则一:如果vote_zxid大于self_zxid,就认可当前收到的投票,并再次将该投票发送出去。
      规则二:如果vote_zxid小于self_zxid,那么坚持自己的投票,不做任何变更。
      规则三:如果vote_zxid等于self_zxid,那么就对比两者的SID,如果vote_sid大于self_sid,那么就认可当前收到的投票,并再次将该投票发送出去。
      规则四:如果vote_zxid等于self_zxid,并且vote_sid小于self_sid,那么坚持自己的投票,不做任何变更。

      结合上面规则,给出下面的集群变更过程:

  3. 确定Leader。经过第二轮投票后,集群中的每台机器都会再次接收到其他机器的投票,然后开始统计投票,如果一台机器收到了超过半数的相同投票,那么这个投票对应的SID机器即为Leader。此时Server3将成为Leader。

由上面规则可知,通常那台服务器上的数据越新(ZXID会越大),其成为Leader的可能性越大,也就越能够保证数据的恢复。如果ZXID相同,则SID越大机会越大。

Leader选举实现细节

服务器状态

服务器具有四种状态,分别是LOOKING、FOLLOWING、LEADING、OBSERVING。

  • LOOKING:寻找Leader状态。当服务器处于该状态时,它会认为当前集群中没有Leader,因此需要进入Leader选举状态。

  • FOLLOWING:跟随者状态。表明当前服务器角色是Follower。

  • LEADING:领导者状态。表明当前服务器角色是Leader。

  • OBSERVING:观察者状态。表明当前服务器角色是Observer。

投票数据结构

每个投票中包含了两个最基本的信息,所推举服务器的SID和ZXID。
投票(Vote)在Zookeeper中包含字段如下:

  • id:被推举的Leader的SID。

  • zxid:被推举的Leader事务ID。

  • electionEpoch:逻辑时钟,用来判断多个投票是否在同一轮选举周期中,该值在服务端是一个自增序列,每次进入新一轮的投票后,都会对该值进行加1操作。

  • peerEpoch:被推举的Leader的epoch。每次leader选举完成之后,都会选举出一个新的peerEpoch,用来标记事务请求所属的轮次。

  • state:当前服务器的状态。

QuorumCnxManager:网络I/O

每台服务器在启动的过程中,会启动一个QuorumPeerManager,负责各台服务器之间的底层Leader选举过程中的网络通信。

  1. 消息队列。QuorumCnxManager内部维护了一系列的队列,用来保存接收到的、待发送的消息以及消息的发送器,除接收队列以外,其他队列都按照SID分组形成队列集合,如一个集群中除了自身还有3台机器,那么就会为这3台机器分别创建一个发送队列,互不干扰。

    • recvQueue:消息接收队列,用于存放那些从其他服务器接收到的消息。

    • queueSendMap:消息发送队列,用于保存那些待发送的消息,按照SID进行分组。

    • senderWorkerMap:发送器集合,每个SenderWorker消息发送器,都对应一台远程Zookeeper服务器,负责消息的发送,也按照SID进行分组。

    • lastMessageSent:最近发送过的消息,为每个SID保留最近发送过的一个消息。

  2. 建立连接。为了能够相互投票,Zookeeper集群中的所有机器都需要两两建立起网络连接。QuorumCnxManager在启动时会创建一个ServerSocket来监听Leader选举的通信端口(默认为3888)。开启监听后,Zookeeper能够不断地接收到来自其他服务器的创建连接请求,在接收到其他服务器的TCP连接请求时,会进行处理。为了避免两台机器之间重复地创建TCP连接,Zookeeper只允许SID大的服务器主动和其他机器建立连接,否则断开连接。在接收到创建连接请求后,服务器通过对比自己和远程服务器的SID值来判断是否接收连接请求,如果当前服务器发现自己的SID更大,那么会断开当前连接,然后自己主动和远程服务器建立连接。一旦连接建立,就会根据远程服务器的SID来创建相应的消息发送器SendWorker和消息接收器RecvWorker,并启动。

  3. 消息接收与发送。消息接收:由消息接收器RecvWorker负责,由于Zookeeper为每个远程服务器都分配一个单独的RecvWorker,因此,每个RecvWorker只需要不断地从这个TCP连接中读取消息,并将其保存到recvQueue队列中。消息发送:由于Zookeeper为每个远程服务器都分配一个单独的SendWorker,因此,每个SendWorker只需要不断地从对应的消息发送队列中获取出一个消息发送即可,同时将这个消息放入lastMessageSent中。在SendWorker中,一旦Zookeeper发现针对当前服务器的消息发送队列为空,那么此时需要从lastMessageSent中取出一个最近发送过的消息来进行再次发送,这是为了解决接收方在消息接收前或者接收到消息后服务器挂了,导致消息尚未被正确处理。同时,Zookeeper能够保证接收方在处理消息时,会对重复消息进行正确的处理。

FastLeaderElection:选举算法核心

  • 外部投票:特指其他服务器发来的投票。

  • 内部投票:服务器自身当前的投票。

  • 选举轮次:Zookeeper服务器Leader选举的轮次,即logicalclock。

  • PK:对内部投票和外部投票进行对比来确定是否需要变更内部投票。

选票管理
  • sendqueue:选票发送队列,用于保存待发送的选票。

  • recvqueue:选票接收队列,用于保存接收到的外部投票。

  • WorkerReceiver:选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。

  • WorkerSender:选票发送器,不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中。

算法核心

  上图展示了FastLeaderElection模块是如何与底层网络I/O进行交互的。Leader选举的基本流程如下

  1. 自增选举轮次。Zookeeper规定所有有效的投票都必须在同一轮次中,在开始新一轮投票时,会首先对logicalclock进行自增操作。

  2. 初始化选票。在开始进行新一轮投票之前,每个服务器都会初始化自身的选票,并且在初始化阶段,每台服务器都会将自己推举为Leader。

  3. 发送初始化选票。完成选票的初始化后,服务器就会发起第一次投票。Zookeeper会将刚刚初始化好的选票放入sendqueue中,由发送器WorkerSender负责发送出去。

  4. 接收外部投票。每台服务器会不断地从recvqueue队列中获取外部选票。如果服务器发现无法获取到任何外部投票,那么就会立即确认自己是否和集群中其他服务器保持着有效的连接,如果没有连接,则马上建立连接,如果已经建立了连接,则再次发送自己当前的内部投票。

  5. 判断选举轮次。在发送完初始化选票之后,接着开始处理外部投票。在处理外部投票时,会根据选举轮次来进行不同的处理。

    • 外部投票的选举轮次大于内部投票。若服务器自身的选举轮次落后于该外部投票对应服务器的选举轮次,那么就会立即更新自己的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使用初始化的投票来进行PK以确定是否变更内部投票。最终再将内部投票发送出去。

    • 外部投票的选举轮次小于内部投票。若服务器接收的外选票的选举轮次落后于自身的选举轮次,那么Zookeeper就会直接忽略该外部投票,不做任何处理,并返回步骤4。

    • 外部投票的选举轮次等于内部投票。此时可以开始进行选票PK。

  6. 选票PK。在进行选票PK时,符合任意一个条件就需要变更投票。

    • 若外部投票中推举的Leader服务器的选举轮次大于内部投票,那么需要变更投票。

    • 若选举轮次一致,那么就对比两者的ZXID,若外部投票的ZXID大,那么需要变更投票。

    • 若两者的ZXID一致,那么就对比两者的SID,若外部投票的SID大,那么就需要变更投票。

  7. 变更投票。经过PK后,若确定了外部投票优于内部投票,那么就变更投票,即使用外部投票的选票信息来覆盖内部投票,变更完成后,再次将这个变更后的内部投票发送出去。

  8. 选票归档。无论是否变更了投票,都会将刚刚收到的那份外部投票放入选票集合recvset中进行归档。recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票(按照服务队的SID区别,如{(1, vote1), (2, vote2)…})。

  9. 统计投票。完成选票归档后,就可以开始统计投票,统计投票是为了统计集群中是否已经有过半的服务器认可了当前的内部投票,如果确定已经有过半服务器认可了该投票,则终止投票。否则返回步骤4。

  10. 更新服务器状态。若已经确定可以终止投票,那么就开始更新服务器状态,服务器首选判断当前被过半服务器认可的投票所对应的Leader服务器是否是自己,若是自己,则将自己的服务器状态更新为LEADING,若不是,则根据具体情况来确定自己是FOLLOWING或是OBSERVING。

以上10个步骤就是FastLeaderElection的核心,其中步骤4-9会经过几轮循环,直到有Leader选举产生。

可能出现的问题:
选举轮次,也就是逻辑时钟,即logicalclock。这个值,不会频繁变化,一次选举,自增一次。一次选举过程中,可能包括多次投票,投票不涉及逻辑时钟的自增。
举例,初始情况下5台机器,sid分别为1、2、3、4、5,逻辑时钟都是0。依次启动后,开始选举,所有的机器逻辑时钟自增为1。经过多次投票,假设第三台机器为leader,其他4台机器为follower,此时5台机器的逻辑时钟都为1。
一般情况下,逻辑时钟应该都是相同的。但是,由于一些机器崩溃的问题,是可能出现逻辑时钟不一致的情况的。例如,上例中,sid=3的机器为leader。之后某一刻,sid为1、3的机器崩溃,zookeeper仍然可以正常对外提供服务。但需要重新选主,剩下的2、4、5重新投票选主,假设sid=5成为新的leader,逻辑时钟自增,由1变成2。之后某一刻,sid为5的机器奔溃,sid为1的机器复活,仍然有3台机器运行,zookeeper可以对外提供服务,但需要重新选主。重新选主,逻辑时钟自增,这时sid为2、4的机器的逻辑时钟是由2自增为3,而sid为1的机器的逻辑时钟是由1自增为2。这种情况下,就出现了逻辑时钟不一致的情况。这时,需要清除sid为1的机器内部的投票数据,因为这些投票数据都是过时的数据。

FastLeaderElection源码分析

类的继承关系
1
public class FastLeaderElection implements Election {}

说明:FastLeaderElection实现了Election接口,其需要实现接口中定义的lookForLeader方法和shutdown方法,其是标准的Fast Paxos算法的实现,各服务器之间基于TCP协议进行选举。

FastLeaderElection类的内部类

FastLeaderElection有三个较为重要的内部类,分别为Notification、ToSend、Messenger。

Notification类 

说明:Notification表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的id、zxid、选举周期等信息,其buildMsg方法将选举信息封装至ByteBuffer中再进行发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
static public class Notification {
/*
* Format version, introduced in 3.4.6
*/
public final static int CURRENTVERSION = 0x1;
int version;
/*
* Proposed leader
*/
// 被推选的leader的id
long leader;
/*
* zxid of the proposed leader
*/
// 被推选的leader的事务id
long zxid;
/*
* Epoch
*/
// 推选者的选举周期
long electionEpoch;
/*
* current state of sender
*/
// 推选者的状态
QuorumPeer.ServerState state;
/*
* Address of sender
*/
// 推选者的id
long sid;
/*
* epoch of the proposed leader
*/
// 被推选者的选举周期
long peerEpoch;
@Override
public String toString() {
return new String(Long.toHexString(version) + " (message format version), "
+ leader + " (n.leader), 0x"
+ Long.toHexString(zxid) + " (n.zxid), 0x"
+ Long.toHexString(electionEpoch) + " (n.round), " + state
+ " (n.state), " + sid + " (n.sid), 0x"
+ Long.toHexString(peerEpoch) + " (n.peerEpoch) ");
}
}
static ByteBuffer buildMsg(int state,
long leader,
long zxid,
long electionEpoch,
long epoch) {
byte requestBytes[] = new byte[40];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
/*
* Building notification packet to send
*/
requestBuffer.clear();
requestBuffer.putInt(state);
requestBuffer.putLong(leader);
requestBuffer.putLong(zxid);
requestBuffer.putLong(electionEpoch);
requestBuffer.putLong(epoch);
requestBuffer.putInt(Notification.CURRENTVERSION);
return requestBuffer;
}

ToSend类 

说明:ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的id、zxid、选举周期等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static public class ToSend {
static enum mType {crequest, challenge, notification, ack}
ToSend(mType type,
long leader,
long zxid,
long electionEpoch,
ServerState state,
long sid,
long peerEpoch) {
this.leader = leader;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
this.state = state;
this.sid = sid;
this.peerEpoch = peerEpoch;
}
/*
* Proposed leader in the case of notification
*/
//被推举的leader的id
long leader;
/*
* id contains the tag for acks, and zxid for notifications
*/
// 被推举的leader的最大事务id
long zxid;
/*
* Epoch
*/
// 推举者的选举周期
long electionEpoch;
/*
* Current state;
*/
// 推举者的状态
QuorumPeer.ServerState state;
/*
* Address of recipient
*/
// 推举者的id
long sid;
/*
* Leader epoch
*/
// 被推举的leader的选举周期
long peerEpoch;
}

Messenger类

Messenger包含了WorkerReceiver和WorkerSender两个内部类

WorkerReceiver
说明:WorkerReceiver实现了Runnable接口,是选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。其是将QuorumCnxManager的Message转化为FastLeaderElection的Notification。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class WorkerReceiver implements Runnable {
// 是否终止
volatile boolean stop;
// 服务器之间的连接
QuorumCnxManager manager;
WorkerReceiver(QuorumCnxManager manager) {
this.stop = false;
this.manager = manager;
}
public void run() {
// 响应
Message response;
while (!stop) { // 不终止
// Sleeps on receive
try{
// 从recvQueue中取出一个选举投票消息(从其他服务器发送过来)
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
// 无投票,跳过
if(response == null) continue;
/*
* If it is from an observer, respond right away.
* Note that the following predicate assumes that
* if a server is not a follower, then it must be
* an observer. If we ever have any other type of
* learner in the future, we'll have to change the
* way we check for observers.
*/
if(!self.getVotingView().containsKey(response.sid)){ // 当前的投票者集合不包含服务器
// 获取自己的投票
Vote current = self.getCurrentVote();
// 构造ToSend消息
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());
// 放入sendqueue队列,等待发送
sendqueue.offer(notmsg);
} else { // 包含服务器,表示接收到该服务器的选票消息
// Receive new message
if (LOG.isDebugEnabled()) {
LOG.debug("Receive new notification message. My id = "
+ self.getId());
}
/*
* We check for 28 bytes for backward compatibility
*/
// 检查向后兼容性
if (response.buffer.capacity() < 28) {
LOG.error("Got a short response: "
+ response.buffer.capacity());
continue;
}
// 若容量为28,则表示可向后兼容
boolean backCompatibility = (response.buffer.capacity() == 28);
// 设置buffer中的position、limit等属性
response.buffer.clear();
// Instantiate Notification and set its attributes
// 创建接收通知
Notification n = new Notification();
// State of peer that sent this message
// 推选者的状态
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) { // 读取状态
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
// 获取leader的id
n.leader = response.buffer.getLong();
// 获取zxid
n.zxid = response.buffer.getLong();
// 获取选举周期
n.electionEpoch = response.buffer.getLong();
n.state = ackstate;
// 设置服务器的id
n.sid = response.sid;
if(!backCompatibility){ // 不向后兼容
n.peerEpoch = response.buffer.getLong();
} else { // 向后兼容
if(LOG.isInfoEnabled()){
LOG.info("Backward compatibility mode, server id=" + n.sid);
}
// 获取选举周期
n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
}
/*
* Version added in 3.4.6
*/
// 确定版本号
n.version = (response.buffer.remaining() >= 4) ?
response.buffer.getInt() : 0x0;
/*
* Print notification info
*/
if(LOG.isInfoEnabled()){
printNotification(n);
}
/*
* If this server is looking, then send proposed leader
*/
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服务器为LOOKING状态
// 将消息放入recvqueue中
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING) // 推选者服务器为LOOKING状态
&& (n.electionEpoch < logicalclock)){ // 选举周期小于逻辑时钟
// 创建新的投票
Vote v = getVote();
// 构造新的发送消息(本服务器自己的投票)
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
v.getPeerEpoch());
// 将发送消息放置于队列,等待发送
sendqueue.offer(notmsg);
}
} else { // 推选服务器状态不为LOOKING
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
// 获取当前投票
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){ // 为LOOKING状态
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id = " +
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
ToSend notmsg;
if(n.version > 0x0) { // 版本号大于0
// 构造ToSend消息
notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
} else { // 版本号不大于0
// 构造ToSend消息
Vote bcVote = self.getBCVote();
notmsg = new ToSend(
ToSend.mType.notification,
bcVote.getId(),
bcVote.getZxid(),
bcVote.getElectionEpoch(),
self.getPeerState(),
response.sid,
bcVote.getPeerEpoch());
}
// 将发送消息放置于队列,等待发送
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
System.out.println("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
}

其中,WorkerReceiver的主要逻辑在run方法中,其首先会从QuorumCnxManager中的recvQueue队列中取出其他服务器发来的选举消息,消息封装在Message数据结构中。然后判断消息中的服务器id是否包含在可以投票的服务器集合中,若不是,则会将本服务器的内部投票发送给该服务器,其流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if(!self.getVotingView().containsKey(response.sid)){ // 当前的投票者集合不包含服务器
// 获取自己的投票
Vote current = self.getCurrentVote();
// 构造ToSend消息
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());
// 放入sendqueue队列,等待发送
sendqueue.offer(notmsg);
}

若包含该服务器,则根据消息(Message)解析出投票服务器的投票信息并将其封装为Notification,然后判断当前服务器是否为LOOKING,若为LOOKING,则直接将Notification放入FastLeaderElection的recvqueue(区别于recvQueue)中。然后判断投票服务器是否为LOOKING状态,并且其选举周期小于当前服务器的逻辑时钟,则将本(当前)服务器的内部投票发送给该服务器,否则,直接忽略掉该投票。其流程如下 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服务器为LOOKING状态
// 将消息放入recvqueue中
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING) // 推选者服务器为LOOKING状态
&& (n.electionEpoch < logicalclock)){ // 选举周期小于逻辑时钟
// 创建新的投票
Vote v = getVote();
// 构造新的发送消息(本服务器自己的投票)
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
v.getPeerEpoch());
// 将发送消息放置于队列,等待发送
sendqueue.offer(notmsg);
}
}

若本服务器的状态不为LOOKING,则会根据投票服务器中解析的version信息来构造ToSend消息,放入sendqueue,等待发送,起流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
else { // 本服务器状态不为LOOKING
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
// 获取当前投票
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){ // 为LOOKING状态
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id = " +
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
ToSend notmsg;
if(n.version > 0x0) { // 版本号大于0
// 构造ToSend消息
notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
} else { // 版本号不大于0
// 构造ToSend消息
Vote bcVote = self.getBCVote();
notmsg = new ToSend(
ToSend.mType.notification,
bcVote.getId(),
bcVote.getZxid(),
bcVote.getElectionEpoch(),
self.getPeerState(),
response.sid,
bcVote.getPeerEpoch());
}
// 将发送消息放置于队列,等待发送
sendqueue.offer(notmsg);
}
}

WorkerSender
说明:WorkerSender也实现了Runnable接口,为选票发送器,其会不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中,其过程是将FastLeaderElection的ToSend转化为QuorumCnxManager的Message。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class WorkerSender implements Runnable {
// 是否终止
volatile boolean stop;
// 服务器之间的连接
QuorumCnxManager manager;
// 构造器
WorkerSender(QuorumCnxManager manager){
// 初始化属性
this.stop = false;
this.manager = manager;
}
public void run() {
while (!stop) { // 不终止
try {
// 从sendqueue中取出ToSend消息
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
// 若为空,则跳过
if(m == null) continue;
// 不为空,则进行处理
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
/**
* Called by run() once there is a new message to send.
*
* @param m message to send
*/
void process(ToSend m) {
// 构建消息
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch);
// 发送消息
manager.toSend(m.sid, requestBuffer);
}
}

Messenger类的属性
说明:Messenger中维护了一个WorkerSender和WorkerReceiver,分别表示选票发送器和选票接收器。

1
2
3
4
5
6
protected class Messenger {
// 选票发送器
WorkerSender ws;
// 选票接收器
WorkerReceiver wr;
}

Messenger类的构造函数 
说明:会启动WorkerSender和WorkerReceiver,并设置为守护线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Messenger(QuorumCnxManager manager) {
// 创建WorkerSender
this.ws = new WorkerSender(manager);
// 新创建线程
Thread t = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
// 设置为守护线程
t.setDaemon(true);
// 启动
t.start();
// 创建WorkerReceiver
this.wr = new WorkerReceiver(manager);
// 创建线程
t = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
// 设置为守护线程
t.setDaemon(true);
// 启动
t.start();
}

FastLeaderElection类的属性

说明:其维护了服务器之间的连接(用于发送消息)、发送消息队列、接收消息队列、推选者的一些信息(zxid、id)、是否停止选举流程标识等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class FastLeaderElection implements Election {
// 日志
private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);
/**
* Determine how much time a process has to wait
* once it believes that it has reached the end of
* leader election.
*/
// 完成Leader选举之后需要等待时长
final static int finalizeWait = 200;
/**
* Upper bound on the amount of time between two consecutive
* notification checks. This impacts the amount of time to get
* the system up again after long partitions. Currently 60 seconds.
*/
// 两个连续通知检查之间的最大时长
final static int maxNotificationInterval = 60000;
/**
* Connection manager. Fast leader election uses TCP for
* communication between peers, and QuorumCnxManager manages
* such connections.
*/
// 管理服务器之间的连接
QuorumCnxManager manager;
// 选票发送队列,用于保存待发送的选票
LinkedBlockingQueue<ToSend> sendqueue;
// 选票接收队列,用于保存接收到的外部投票
LinkedBlockingQueue<Notification> recvqueue;
// 投票者
QuorumPeer self;
Messenger messenger;
// 逻辑时钟
volatile long logicalclock; /* Election instance */
// 推选的leader的id
long proposedLeader;
// 推选的leader的zxid
long proposedZxid;
// 推选的leader的选举周期
long proposedEpoch;
// 是否停止选举
volatile boolean stop;
}

FastLeaderElection类的构造函数
1
2
3
4
5
6
7
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
// 字段赋值
this.stop = false;
this.manager = manager;
// 初始化其他信息
starter(self, manager);
}

说明:构造函数中初始化了stop字段和manager字段,并且调用了starter函数,其源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
private void starter(QuorumPeer self, QuorumCnxManager manager) {
// 赋值,对Leader和投票者的ID进行初始化操作
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
// 初始化发送队列
sendqueue = new LinkedBlockingQueue<ToSend>();
// 初始化接收队列
recvqueue = new LinkedBlockingQueue<Notification>();
// 创建Messenger,会启动接收器和发送器线程
this.messenger = new Messenger(manager);
}

说明:其完成在构造函数中未完成的部分,如会初始化FastLeaderElection的sendqueue和recvqueue,并且启动接收器和发送器线程。

FastLeaderElection类的核心函数分析
sendNotifications函数 

说明:其会遍历所有的参与者投票集合,然后将自己的选票信息发送至上述所有的投票者集合,其并非同步发送,而是将ToSend消息放置于sendqueue中,之后由WorkerSender进行发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void sendNotifications() {
for (QuorumServer server : self.getVotingView().values()) { // 遍历投票参与者集合
long sid = server.id;
// 构造发送消息
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock,
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
// 将发送消息放置于队列
sendqueue.offer(notmsg);
}
}

totalOrderPredicate函数

说明:该函数将接收的投票与自身投票进行PK,查看是否消息中包含的服务器id是否更优,其按照epoch、zxid、id的优先级进行PK。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){ // 使用计票器判断当前服务器的权重是否为0
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
// 1. 判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader
// 2. 如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader
// 3. 如果前面两个都相等那就比较服务器id,如果大,则其就是leader
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

termPredicate函数

说明:该函数用于判断Leader选举是否结束,即是否有一半以上的服务器选出了相同的Leader,其过程是将收到的选票与当前选票进行对比,选票相同的放入同一个集合,之后判断选票相同的集合是否超过了半数。 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
/*
* First make the views consistent. Sometimes peers will have
* different zxids for a server depending on timing.
*/
for (Map.Entry<Long,Vote> entry : votes.entrySet()) { // 遍历已经接收的投票集合
if (vote.equals(entry.getValue())){ // 将等于当前投票的项放入set
set.add(entry.getKey());
}
}
//统计set,查看投某个id的票数是否超过一半
return self.getQuorumVerifier().containsQuorum(set);
}

checkLeader函数

说明:该函数检查是否已经完成了Leader的选举,此时Leader的状态应该是LEADING状态。 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected boolean checkLeader(
HashMap<Long, Vote> votes,
long leader,
long electionEpoch){
boolean predicate = true;
/*
* If everyone else thinks I'm the leader, I must be the leader.
* The other two checks are just for the case in which I'm not the
* leader. If I'm not the leader and I haven't received a message
* from leader stating that it is leading, then predicate is false.
*/
if(leader != self.getId()){ // 自己不为leader
if(votes.get(leader) == null) predicate = false; // 还未选出leader
else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false; // 选出的leader还未给出ack信号,其他服务器还不知道leader
} else if(logicalclock != electionEpoch) { // 逻辑时钟不等于选举周期
predicate = false;
}
return predicate;
}

lookForLeader函数  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = System.currentTimeMillis();
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
// 更新逻辑时钟,每进行一轮选举,都需要更新逻辑时钟
logicalclock++;
// 更新选票
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// 想其他服务器发送自己的选票
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){ // 本服务器状态为LOOKING并且还未选出leader
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
// 从recvqueue接收队列中取出投票
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){ // 如果没有收到足够多的选票,则发送选票
if(manager.haveDelivered()){ // manager已经发送了所有选票消息
// 向所有其他服务器发送消息
sendNotifications();
} else { // 还未发送所有消息
// 连接其他每个服务器
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(self.getVotingView().containsKey(n.sid)) { // 投票者集合中包含接收到消息中的服务器id
/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
switch (n.state) { // 确定接收消息中的服务器状态
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock) { // 其选举周期大于逻辑时钟
// 重新赋值逻辑时钟
logicalclock = n.electionEpoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { // 选出较优的服务器
// 更新选票
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else { // 无法选出较优的服务器
// 更新选票
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// 发送消息
sendNotifications();
} else if (n.electionEpoch < logicalclock) { // 选举周期小于逻辑时钟,不做处理
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) { // 等于,并且能选出较优的服务器
// 更新选票
updateProposal(n.leader, n.zxid, n.peerEpoch);
// 发送消息
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) { // 若能选出leader
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){ // 遍历已经接收的投票集合
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){ // 能够选出较优的服务器
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock,
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING: // 处于LEADING状态
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock){ // 与逻辑时钟相等
// 将该服务器和选票信息放入recvset中
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) { // 判断是否完成了leader选举
// 设置本服务器的状态
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
// 创建投票信息
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
}
}

说明:该函数用于开始新一轮的Leader选举,其首先会将逻辑时钟自增,然后更新本服务器的选票信息(初始化选票),之后将选票信息放入sendqueue等待发送给其他服务器,其流程如下 

1
2
3
4
5
6
7
8
9
10
11
synchronized(this){
// 更新逻辑时钟,每进行一轮新的leader选举,都需要更新逻辑时钟
logicalclock++;
// 更新选票(初始化选票)
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// 向其他服务器发送自己的选票(已更新的选票)
sendNotifications();

之后每台服务器会不断地从recvqueue队列中获取外部选票。如果服务器发现无法获取到任何外部投票,就立即确认自己是否和集群中其他服务器保持着有效的连接,如果没有连接,则马上建立连接,如果已经建立了连接,则再次发送自己当前的内部投票,其流程如下  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 从recvqueue接收队列中取出投票
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){ // 无法获取选票
if(manager.haveDelivered()){ // manager已经发送了所有选票消息(表示有连接)
// 向所有其他服务器发送消息
sendNotifications();
} else { // 还未发送所有消息(表示无连接)
// 连接其他每个服务器
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}

在发送完初始化选票之后,接着开始处理外部投票。在处理外部投票时,会根据选举轮次来进行不同的处理。  

  • 外部投票的选举轮次大于内部投票。若服务器自身的选举轮次落后于该外部投票对应服务器的选举轮次,那么就会立即更新自己的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使用初始化的投票来进行PK以确定是否变更内部投票。最终再将内部投票发送出去。

  • 外部投票的选举轮次小于内部投票。若服务器接收的外选票的选举轮次落后于自身的选举轮次,那么Zookeeper就会直接忽略该外部投票,不做任何处理。

  • 外部投票的选举轮次等于内部投票。此时可以开始进行选票PK,如果消息中的选票更优,则需要更新本服务器内部选票,再发送给其他服务器。

之后再对选票进行归档操作,无论是否变更了投票,都会将刚刚收到的那份外部投票放入选票集合recvset中进行归档,其中recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票,然后开始统计投票,统计投票是为了统计集群中是否已经有过半的服务器认可了当前的内部投票,如果确定已经有过半服务器认可了该投票,然后再进行最后一次确认,判断是否又有更优的选票产生,若无,则终止投票,然后最终的选票,其流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
if (n.electionEpoch > logicalclock) { // 其选举周期大于逻辑时钟
// 重新赋值逻辑时钟
logicalclock = n.electionEpoch;
// 清空所有接收到的所有选票
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { // 进行PK,选出较优的服务器
// 更新选票
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else { // 无法选出较优的服务器
// 更新选票
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// 发送本服务器的内部选票消息
sendNotifications();
} else if (n.electionEpoch < logicalclock) { // 选举周期小于逻辑时钟,不做处理,直接忽略
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) { // PK,选出较优的服务器
// 更新选票
updateProposal(n.leader, n.zxid, n.peerEpoch);
// 发送消息
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) { // 若能选出leader
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){ // 遍历已经接收的投票集合
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){ // 选票有变更,比之前提议的Leader有更好的选票加入
// 将更优的选票放在recvset中
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) { // 表示之前提议的Leader已经是最优的
// 设置服务器状态
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
// 最终的选票
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock,
proposedEpoch);
// 清空recvqueue队列的选票
leaveInstance(endVote);
// 返回选票
return endVote;
}
}

若选票中的服务器状态为FOLLOWING或者LEADING时,其大致步骤会判断选举周期是否等于逻辑时钟,归档选票,是否已经完成了Leader选举,设置服务器状态,修改逻辑时钟等于选举周期,返回最终选票,其流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
if(n.electionEpoch == logicalclock){ // 与逻辑时钟相等
// 将该服务器和选票信息放入recvset中
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) { // 已经完成了leader选举
// 设置本服务器的状态
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
// 最终的选票
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
// 清空recvqueue队列的选票
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) { // 已经完成了leader选举
synchronized(this){
// 设置逻辑时钟
logicalclock = n.electionEpoch;
// 设置状态
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
// 最终选票
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
// 清空recvqueue队列的选票
leaveInstance(endVote);
// 返回选票
return endVote;
}

Zookeeper的典型应用场景:

统一命名服务

在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,远程对象等等——这些我们都可以统称他们为名字(Name)

配置管理

配置的管理在分布式应用环境中很常见,例如同一个应用系统需要多台 PC Server 运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的 PC Server,这样非常麻烦而且容易出错。

像这样的配置信息完全可以交给 Zookeeper 来管理,将配置信息保存在 Zookeeper 的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中

集群管理

Zookeeper 能够很容易的实现集群管理的功能,如有多台 Server 组成一个服务集群,那么必须要一个“总管”知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而做出调整重新分配服务策略。同样当增加集群的服务能力时,就会增加一台或多台 Server,同样也必须让“总管”知道。

共享锁

共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

负载均衡

在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就须要在这些对等的服务器中选择一个来执行相关的业务逻辑。

分布式通知/协调

ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理

实际应用

Zookeeper实现Dubbo注册中心

Dubbo的Provider,Consumer在启动时都会创建一个注册中心,注册中心可以选择Zookeeper,Redis。常用的是Zookeeper。
Dubbo在Zookeeper上注册的节点目录(假设接口名称是:com.bob.dubbo.service.CityDubboService):

Dubbo启动时,Consumer和Provider都会把自身的URL格式化为字符串,然后注册到zookeeper相应节点下,作为一个临时节点,当连断开时,节点被删除。
Consumer在启动时,不仅仅会注册自身到 …/consumers/目录下,同时还会订阅…/providers目录,实时获取其上Provider的URL字符串信息。

  • Provider和Consumer向Zookeeper注册临时节点,当连接断开时删除相应的注册节点。
  • Consumer订阅Providers节点的子节点,实时感知Provider的变化情况,实时同步自身的Invoker对象,保证RPC的可用性。

相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
public class ZookeeperRegistry extends FailbackRegistry {
......
/**
* 默认端口
*/
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
/**
* 默认 Zookeeper 根节点
*/
private final static String DEFAULT_ROOT = "dubbo";
/**
* Zookeeper 根节点
*/
private final String root;
/**
* Service 接口全名集合
*/
private final Set<String> anyServices = new ConcurrentHashSet<String>();
/**
* 监听器集合
*/
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
= new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
/**
* Zookeeper 客户端
*/
private final ZookeeperClient zkClient;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url); // 调用父类FailbackRegistry的构造函数
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获得 Zookeeper 根节点, 未指定 "group" 参数时为 dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group` 参数值
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group; // root = "/dubbo"
// 创建 Zookeeper Client
zkClient = zookeeperTransporter.connect(url);
// 添加 StateListener 对象。该监听器,在重连时,调用恢复方法。
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
}
public abstract class FailbackRegistry extends AbstractRegistry {
......
/**
* 失败发起注册失败的 URL 集合
*/
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
/**
* 失败取消注册失败的 URL 集合
*/
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
/**
* 失败发起订阅失败的监听器集合
*/
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
* 失败取消订阅失败的监听器集合
*/
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
* 失败通知通知的 URL 集合
*/
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
public FailbackRegistry(URL url) {
super(url);
// 重试频率,单位:毫秒 ,默认 5*1000
int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 创建失败重试定时器
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
/**
* 重试
*/
// Retry the failed actions
protected void retry() {
// 重试执行注册
if (!failedRegistered.isEmpty()) {
......
for (URL url : failed) {
try {
// 执行注册
doRegister(url);
// 移除出 `failedRegistered`
failedRegistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
// 重试执行取消注册
if (!failedUnregistered.isEmpty()) {
......
for (URL url : failed) {
try {
// 执行取消注册
doUnregister(url);
// 移除出 `failedUnregistered`
failedUnregistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
// 重试执行订阅
if (!failedSubscribed.isEmpty()) {
......
for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
URL url = entry.getKey();
Set<NotifyListener> listeners = entry.getValue();
for (NotifyListener listener : listeners) {
try {
// 执行订阅
doSubscribe(url, listener);
// 移除监听器
listeners.remove(listener);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
}
// 重试执行取消订阅
if (!failedUnsubscribed.isEmpty()) {
......
for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
URL url = entry.getKey();
Set<NotifyListener> listeners = entry.getValue();
for (NotifyListener listener : listeners) {
try {
// 执行取消订阅
doUnsubscribe(url, listener);
// 移除监听器
listeners.remove(listener);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
}
}
}

ZookeeperRegistry 在实例化时,调用父类构造函数。在父类构造函数中,会创建一个定时任务,每隔5S执行retry( ) 方法。

在retry( ) 方法中,重试那些失败的动作。重试的动作包括:

  • Provider向zookeeper注册自身的url,生成一个临时的znode
  • Provider从Dubbo容器中退出,停止提供RPC调用。也就是移除zookeeper内自身url对应的znode
  • Consumer订阅 ” /dubbo/..Service/providers” 目录的子节点,生成ChildListener
  • Consumer从Dubbo容器中退出,移除之前创建的ChildListener

为什么如此设置? 主要是和zookeeper的通信机制有关的。当zookeeper的Client和Server连接断开,或者心跳超时,那么Server会将相应Client注册的临时节点删除,当然注册的Listener也相应删除。

而Provider和Consumer注册的URL就属于临时节点,当连接断开时,Dubbo注册了zookeeper的StateListener,也就是状态监听器,当Dubbo里的zookeeper Client和Server重新连接上时,将之前注册的的URL添加入这几个失败集合中,然后重新注册和订阅。

看ZookeeperRegistry 的构造函数,其添加了一个StateListener:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ZookeeperRegistry extends FailbackRegistry {
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
......
// 添加 StateListener 对象。该监听器,在重连时,调用恢复方法。
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
}
public abstract class FailbackRegistry extends AbstractRegistry {
protected void recover() throws Exception {
// register 恢复注册,添加到 `failedRegistered` ,定时重试
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
failedRegistered.add(url);
}
}
// subscribe 恢复订阅,添加到 `failedSubscribed` ,定时重试
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
addFailedSubscribed(url, listener);
}
}
}
}
}

ZookeeperRegistry 构造函数中为zookeeper的操作客户端添加了一个状态监听器 StateListener,当重新连接时( 重新连接意味着之前连接断开了 ),将已经注册和订阅的URL添加到失败集合中,定时重试,也就是重新注册和订阅。

zookeeper Client与Server断开连接后,会定时的不断尝试重新连接,当连接成功后就会触发一个Event,Dubbo注册了CONNECTED状态的监听器,当连接成功后重新注册和订阅。

zookeeper Server宕机了,Dubbo里的Client并没有对此事件做什么响应,当然其内部的zkClient会不停地尝试连接Server。当Zookeeper Server宕机了不影响Dubbo里已注册的组件的RPC调用,因为已经通过URL生成了Invoker对象,这些对象还在Dubbo容器内。当然因为注册中心宕机了,肯定不能感知到新的Provider。同时因为在之前订阅获得的Provider信息已经持久化到本地文件,当Dubbo应用重启时,如果zookeeper注册中心不可用,会加载缓存在文件内的Provider信息,还是能保证服务的高可用。

Consumer会一直维持着对Provider的ChildListener,监听Provider的实时数据信息。当Providers节点的子节点发生变化时,实时通知Dubbo,更新URL,同时更新Dubbo容器内的Consumer Invoker对象,只要是订阅成功均会实时同步Provider,更新Invoker对象,无论是第一次订阅还是断线重连后的订阅:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ZookeeperRegistry extends FailbackRegistry {
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 处理所有 Service 层的发起订阅,例如监控中心的订阅
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
......
// 处理指定 Service 层的发起订阅,例如服务消费者的订阅
} else {
// 子节点数据数组
List<URL> urls = new ArrayList<URL>();
// 循环分类数组 , router, configurator, provider
for (String path : toCategoriesPath(url)) {
// 获得 url 对应的监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) { // 不存在,进行创建
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// 获得 ChildListener 对象
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) { // 不存在子目录的监听器,进行创建 ChildListener 对象
// 订阅父级目录, 当有子节点发生变化时,触发此回调函数
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 变更时,调用 `#notify(...)` 方法,回调 NotifyListener
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
// 创建 Type 节点。该节点为持久节点。
zkClient.create(path, false);
// 向 Zookeeper ,PATH 节点,发起订阅,返回此节点下的所有子元素 path : /根节点/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers
List<String> children = zkClient.addChildListener(path, zkListener);
// 添加到 `urls` 中
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener, 在这一步从连接Provider,实例化Invoker
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}

订阅获取Providers的最新URL字符串,调用notify(…)方法,通知监听器,最终会执行如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private volatile List<Configurator> configurators;
private volatile Map<String, Invoker<T>> urlInvokerMap;
private volatile Map<String, List<Invoker<T>>> methodInvokerMap;
private volatile Set<URL> cachedInvokerUrls;
private void refreshInvoker(List<URL> invokerUrls) {
// 从zookeeper获取到的url已经没有合适的了,在订阅返回为空时,会手动生成一个 EMPTY_PROTOCOL 的 url
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException(
"urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
}

更新Dubbo内的Invoker相关数据,保证Consumer能实时感知到Provider的信息,保证PRC调用不会出错。

以上就是Dubbo内Zookeeper注册中心的实现过程。

Zookeeper的优势

1.源代码开放
2.已经被证实是高性能,易用稳定的工业级产品
3.有着广泛的应用:Hadoop、HBase、kafaka、Storm、Solr

「真诚赞赏,手留余香」