Zookeeper是一个经典的分布式数据一致性的解决方案致力于为分布式系统提供一个高性能、高可用且拥有严格顺序访问控制能力的分布式协调存储服务
优点
维护配置信息
随着分布式系统的兴起,由于许多服务都需要使用到配置文件,因此必须保证该配置服务的高可用性和各台服务器上配置数据的一致性。通常会将配置文件部署在一个集群上,然而一个集群动辄上千台服务器,此时如果再一台台服务器逐个修改配置文件那将是非常繁琐且危险的的操作,因此就需要—种服务,能够高效快速且可靠地完成配置项的更改等操作,并能够保证各配置项在每台服务器上的数据—致性。zookeeper就可以提供这样一种服务,其使用Zab这种一致性协议来保证一致性
分部式锁服务
一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性,多台服务器上运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作进行加锁,如果当前机器挂掉后,释放锁并fail over到其他的机器继续执行该服务
集群管理
一个集群有时会因为各种软硬件故障或者网络故障,出现某些服务器挂掉而被移除集群,而某些服务器加入到集群中的情况,zookeeper会将这些服务器加入/移出的情况通知给集群中的其他正常工作的服务器,以及时调整存储和计算等任务的分配和执行等,此外zookeeper还会对故障的服务器做出诊断并尝试修复
生成分布式唯一ID
在单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。但是分库分表后,就无法在依靠数据库的auto_incrernent属性来唯一标识—条记录了,此时可以用zookeeper在分布式环境下生成全局唯一ID
数据模型
zookeeper的数据节点可以视为树状结构,书中的各节点被称为znode,一个znode可以有多个子节点。使用路径path来定位某个znode。znode兼具文件
和目录
两种功能,即像文件一样维护者数据、元信息、ACL、时间戳的数据结构,又可以向目录一样作为路径的标识一部分
节点数据 zNode就像key,value的关系
节点的子节点 children
节点的状态 用来描述节点的创建、修改记录、cZxid、ctime等
节点状态属性
# 获取该节点的节点状态
stat path
cZxid 数据节点创建时的事务ID
对节点的所有写操作都会让zookeeper开启一个事务,并未每一个事务维护一个事务ID
ctime 数据节点创建的时间
mZxid 数据节点最后一次更新的事务ID
mtime 数据节点最后一次更新的时间
pZxid 数据节点的子节点组后一次被修改的事务ID
cversion 子节点的更改次数
dataVersion 节点数据的更改次数
aclVersion 节点的ACL更改次数
当前节点的权限列表被修改的次数
ephemeralOwner 是否是临时节点
如果节点是临时节点,则创建该节点会话的SessionID,如果是持久形节点则为0
连接zookeeper会建立一个会话,当会话结束临时节点将会自动删除,并且临时节点不允许拥有子节点
dataLength 数据内容的长度(字节)
numChildren 数据节点当前的子节点个数
安装
zookeeper的使用需要java环境,这里使用的是docker所以不需要考虑这个问题
不同版本的zookeeper配置文件目录不同,请参照dockerhub
非docker需要安装后启动/bin/zkServer.sh start启动(status查看状态,stop停止)
zookeeper无法直接通过-v挂载配置文件,否则会导致
/docker-entrypoint.sh: line 15: /conf/zoo.cfg: Permission denied
docker run -d --privileged=true \
--name zookeeper \
-p 2181:2181 \
--restart=always \
zookeeper:3.4.10
docker exec -it [容器id] bash
# 执行zkCli.sh
/zookeeper-3.4.10/bin/zkCli.sh
# 查看服务状态
/zookeeper-3.4.10/bin/zkServer.sh status
修改zoo.cfg
# 对外暴露的服务端口号
clientPort=2181
# zookeeper数据的内存快照、即事务日志文件
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/log
节点命令
新增节点
path为键(路径) data为值
键必须由 / 开始
# -s为有序节点 -e为临时节点
create [-s] [-e] path data
# 通过键获取值
get path
不使用-e为持久化节点,即使退出服务数据依然存在
使用-e为临时节点,服务结束数据失效
-s会为节点名增加序列号,之后需要通过新的节点名获取值
持久化有序节点可以为分布式环境创建唯一ID
更新节点
path为键 data为修改后的值
set path data
# 校验修改
set path data [dataVersion]
基于dataVersion修改,若和当前节点的数据不符时,则修改不会生效(乐观锁)
dataVersion可以通过stat命令查看
高版本需要改为set path -v [dataVersion] data
删除节点
delete path [dataVersion]
同样若通过dataVersion删除,若不符合则删除不会生效
并且节点必须为空才能删除
意思是若存在/hadoop/node,则delete /haddop无法使用
强制删除
删除节点和所有子节点
rmr path
节点列表
# 查看该节点所有子节点
ls path
# 同时还可以查看该节点的状态
ls2 path
监听器命令
一个监听器的使用只能使用一次,也就是说下列命令只有第一次会生效
get path和stat path
# 为当前客户端的该节点注册监听事件
get path watch
stat path watch
当其它客户端修改了该节点后,会显示以下 可以通过该特性捕获最新的配置信息
WatchedEvent state:SyncConnected type:NodeDataChanged path: [节点名]
ls和ls2 path
# 监听当前节点的子节点
ls patch watch
ls2 patch watch
当其它客户端修改创建了该节点的子节点,会显示以下
WatchedEvent state:SyncConnected type:NodeChildrenChanged path: [节点名]
ACL权限控制
Access Control List访问控制列表 一个节点可以同时使用多种授权模式 zookeeper通过授权策略scheme,授权对象id,授权策略permission来进行权限控制
授权模式
world 登陆zookeeper的所有用户(默认)
ip 使用ip进行认证
auth 对已添加的用户进行认证
digest 使用用户密码进行认证
授权权限
默认cdrwa
world授权模式
# 查看当前节点的权限
getAcl path
# 设置权限,授权模式为world
setAcl path world:[授权对象]:[授权权限]
实例
# 设置/node这个节点,world模式anyone用户drwa权限
setAcl /node world:anyone:drwa
# 这个时候无法创建子节点
create /node/nodeChild "test"
$ Authentication is not valid : /node/nodeChild
IP授权模式
通过./zkCli.sh -server [ip] 连接远程zookeeper
# 设置权限,授权模式为ip
setAcl path ip:[ip地址]:[授权权限]
实例
# 为节点授权IP地址,多个用逗号连接
setAcl /node ip:106.14.x.xxx:cdrwa
# 获取值
get /node
$ test
# 其它IP,无权获取
get /node
$ Authentication is not valid : /node
Auth授权模式
# 创建一个用户
addauth digest [账号]:[密码]
# auth
setAcl path auth:[账号]:[授权权限]
# 查看digest
getAcl path
实例
addauth digest mengnan:123456
setAcl /node auth:mengnan:cdrwa
getAcl /node
$ 'digest,'mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=
: cdrwa
# 读取成功
get /node
$ test
# 如果退出客户端后则无法再获取
get /node
$ Authentication is not valid : /node
digest授权模式
通过手动创建的密文加密
echo -n mengnan:123456 | openssl dgst -binary -sha1 | openssl base64
$ D99QUIAbXdkPPr9XjWrAKYDTp1c=
为一个节点使用digest授权模式
setAcl path digest:[用户名]:[上述密文]:[授权权限]
实例
setAcl path digest:mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=:cdrwa
超级管理员
设定一个超级管理员,可以任意访问所有节点
手动加密
echo -n super:admin | openssl dgst -binary -sha1 | openssl base64
$ xQJmxLMiHGwaqBvst5y6rkB6HQs=
修改zkServer.sh
# 通过vi的/nohup找到,增加Dzookeeper.DigestAuthenticationProvider.superDigest
nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}"
"-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"
"-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="\
为指定用户添加权限,之后该次连接对所有节点有任意访问权限
addauth digest super:admin
JavaAPI
构造方法
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
connectString 连接的ip:端口号(集群用逗号隔开)
sessionTimeout 超时毫秒时间
watcher 监视器对象,通过监视器对象返回连接状态
新增节点
create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
path 需要创建的zNode路径
data 需要存储的数据
acl 节点的访问控制列表
createMode 节点的类型(临时/持久化等)
create(final String path, byte data[], List<ACL> acl,
CreateMode createMode, StringCallback cb, Object ctx)
StringCallback 异步回调
ctx 回调的参数传递的上下文参数
更新节点
setData(final String path, byte data[], int version)
path zNode路径
data data路径
version 验证版本号,如果为-1则跳过验证
setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
cb(rc, path, ctx, stat) 回调函数
rc 为0代表成功
path 修改的路径
ctx 上下文参数路径
ctx 传递上下文参数
删除节点
delete(final String path, int version)
path zNode路径
version 验证版本号,如果为-1则跳过验证
delete(final String path, int version, VoidCallback cb, Object ctx)
cb(rc, path, ctx) 回调函数
rc 为0代表成功
path 删除的路径
ctx 上下文参数对象
ctx 传递上下文参数
查看节点
getData可以捕获到的事件 NodeDeleted节点删除,NodeDataChanged节点内容发生变化
getData(String path, boolean watch, Stat stat)
path zNode路径
watch 是否使用连接对象中注册的监视器
stat 返回zNode的元数据
getData(String path, boolean watch, DataCallback cb, Object ctx)
cb(rc, path, ctx, data, stat) 回调函数
rx 为0代表读取成功
path节点的路径
ctx 上下文参数对象
data 元数据
stat 节点属性
ctx 传递上下文参数
getData(final String path, Watcher watcher, Stat stat)
watcher 自定义监视器对象
stat 返回zNode的元数据
查看子节点
getChildren可以捕捉到的事件 NodeChildrenChanged子节点发生变化 NodeDeleted节点删除
getChildren(String path, boolean watch)
path 路径
watch 是否使用连接对象中注册的监视器
getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
cb(rc, path, ctx, children) 回调函数
rc 为0代表读取成功
path 节点路径
ctx 上下文参数
children 子节点中的节点
ctx 传递上下文参数
getChildren(String path, Watcher watch)
watch 自定义监视器对象
查看节点是否存在
exists可以捕获到的事件 NodeCreated节点创建,NodeDeleted节点删除,NodeDataChanged 节点内容发送变化
exists(String path, boolean watch)
path zNode路径
watch 是否使用连接对象中注册的监视器
exists(String path, boolean watch, StatCallback cb, Object ctx)
cb(rc, path, ctx, stat) 回调函数
rc 为0代表读取成功
path 节点路径
ctx 上下文参数
stat 节点属性
ctx 传递上下文参数
exists(final String path, Watcher watcher)
watch 自定义监视器对象
使用Zookeeper
导入约束
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
<!--zookeeper日志都隐藏在lo4j中-->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
连接到zookeeper
@Test
public void test() throws Exception {
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(IP, 5000, event -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
// 放开阻塞
countDownLatch.countDown();
}
});
// 因为zookeeper是异步的,主线程阻塞等待对象创建成功
countDownLatch.await();
// 该连接会话编号
System.out.println(zooKeeper.getSessionId());
}
@After
public void after() throws Exception {
// 释放资源
zooKeeper.close();
}
新增节点
分离重复代码
@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(IP, 5000, event -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
} else System.out.println("连接失败!");
countDownLatch.countDown();
});
countDownLatch.await();
}
@After
public void after() throws Exception {
zooKeeper.close();
}
同步方式
@Test
public void create() throws Exception {
String path = zooKeeper.create("/create", "create".getBytes(),
// ZooDefs.Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
// CreateMode.PERSISTENT 持久化节点
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
log.info(path);
log.info("创建节点");
}
ZooDefs.Ids使用
OPEN_ACL_UNSAFE 完全开放的权限
CREATOR_ALL_ACL 只有创建者才有权限
READ_ACL_UNSAFE 只能读取的权限
自定义,提供一个List<ACL> acls
@Test
public void create() throws Exception {
List<ACL> acls = new ArrayList<>();
// Id (授权模式,授权对象)
Id id = new Id("world", "anyone");
// ACL(权限信息,Id)
acls.add(new ACL(ZooDefs.Perms.READ, id));
acls.add(new ACL(ZooDefs.Perms.WRITE, id));
String path = zooKeeper.create("/create", "create".getBytes(), acls, CreateMode.PERSISTENT);
}
异步方式
和同步方式多了一个回调函数
@Test
public void create6() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper.create("/create", "create".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
// rc为0代表节点创建成功
System.out.println(rc);
// path和name描述节点路径
System.out.println(path);
System.out.println(name);
// 传递上下文,这里即"come here"字符串
System.out.println(ctx);
countDownLatch.countDown();
}, "come here");
// 异步,代码的执行和服务器是否接收到无关
countDownLatch.await();
}
Auth授权模式
账号密码
@Test
public void create2() throws Exception {
// 先添加授权用户
zooKeeper.addAuthInfo("digest", "mengnan:123456".getBytes());
// 给予cdrwa权限
zooKeeper.create("/create", "create".getBytes(),
Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
}
查看
getAcl /create
$ 'digest,'mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=
: cdrwa
digest授权模式
自制密文
@Test
public void create4() throws Exception {
// 先添加授权用户
List<ACL> acls = new ArrayList<ACL>();
Id id = new Id("digest", "mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=");
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zooKeeper.create("/create", "create".getBytes(), acls, CreateMode.PERSISTENT);
}
查看
getAcl /create/node10
$ 'digest,'mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=
: cdrwa
节点类型
@Test
public void create5() throws Exception {
String result = zooKeeper.create("/create", "create".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(result);
}
CreateMode.PERSISTENT 持久节点
CreateMode.PERSISTENT_SEQUENTIAL 持久化有序节点
CreateMode.EPHEMERAL 临时节点
CreateMode.EPHEMERAL_SEQUENTIAL 临时有序节点
更新节点
同步方式
@Test
public void update() throws KeeperException, InterruptedException {
Stat stat = zooKeeper.setData("/set/node", "node".getBytes(), -1);
System.out.println(stat.getVersion());
System.out.println(stat.getCtime());
}
异步方式
@Test
public void update2() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper.setData("/set/node", "node".getBytes(), -1,
(rc, path, ctx, stat) -> {
System.out.println(rc);
System.out.println(path);
System.out.println(ctx);
System.out.println(stat.getVersion());
countDownLatch.countDown();
}, "context");
countDownLatch.await();
}
删除节点
同步方式
@Test
public void delete() throws Exception {
zooKeeper.delete("/set/nod1", -1);
}
异步方式
@Test
public void update2() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper.delete("/set/node", -1, (rc, path, ctx) -> {
System.out.println(rc);
System.out.println(path);
System.out.println(ctx);
countDownLatch.countDown();
}, "context");
countDownLatch.await();
}
查看节点
同步方式
@Test
public void get() throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] data = zooKeeper.getData("/create", false, stat);
System.out.println(new String(data));
System.out.println(stat.getVersion());
}
异步方式
@Test
public void get2() {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper.getData("/create", false, (rc, path, ctx, data, stat) -> {
System.out.println(rc);
System.out.println(path);
System.out.println(ctx);
System.out.println(new String(data));
System.out.println(stat.getVersion());
countDownLatch.countDown();
}, "context");
countDownLatch.await();
}
查看子节点
同步方式
@Test
public void getChild() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/create", false);
System.out.println(children);
}
异步方式
@Test
public void getChild2() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper.getChildren("/create", false, (rc, path, ctx, children) -> {
System.out.println(rc);
System.out.println(path);
System.out.println(ctx);
System.out.println(children);
countDownLatch.countDown();
}, "context");
countDownLatch.await();
}
查看节点是否存在
同步方式
如果不存在,则返回null
@Test
public void exists() throws Exception {
// arg1:节点的路径
Stat stat = zooKeeper.exists("/exists", false);
if (stat != null)
System.out.println(stat.getVersion());
}
异步方式
@Test
public void exists2() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper.exists("/exists", false,
(rc, path, ctx, stat) -> {
System.out.println(rc);
System.out.println(path);
System.out.println(ctx);
if (stat != null) System.out.println(stat.getVersion());
countDownLatch.countDown();
}, "context");
countDownLatch.await();
}
Watcher
客户端先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当Zookeeper服务端监听的数据状态发送改变时,服务端会主动通知客户端,记者客户端的Watch管理器会触发相关的Watch来回调相应处理逻辑,从而完成整体的数据发布/订阅流程
特性
一次性 watcher是一次性的,一旦触发就会被移除,再次使用需要重新注册
客户端顺序回调 watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行
轻量级 WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点 路径,并不会得到数据节点变化前后的具体内容
时效性 watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知
使用
实现Watcher接口
public class ZKConnectionWatcher implements Watcher {
static CountDownLatch countDownLatch = new CountDownLatch(1);
static ZooKeeper zooKeeper;
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功");
System.out.println("path:" + event.getPath());
System.out.println("eventType" + event.getType());
countDownLatch.countDown();
}
}
}
Event.KeeperState枚举类型,是客户端与服务端连接状态发生变化时对应的通知类型
SyncConnected 客户端与服务器正常连接时
Disconnected 客户端与服务器断开连接时
Expired 会话session超时(超过构造方法中sessionTimeout定义的超时时间)
AuthFailed 身份认证失败时
使用ZooKeeper构造方法中定义的watcher
@Test
public void watcherExists() throws KeeperException, InterruptedException {
// true,使用连接对象中的watcher
Stat exists = zooKeeper.exists("/watcher", true);
if (exists != null) {
System.out.println(exists.getVersion());
}
// 手动阻塞,当该节点发生修改时执行构造方法中的回调函数
// 阻塞时即使发送了多次事件也只执行第一次,因为Watcher是一次性的
Thread.sleep(Integer.MAX_VALUE);
}
event.getType所有会返回列表,不同方法的监视器捕获到的事件不同,详细内容请参照JavaAPI部分
NodeCreated 节点创建
NodeDeleted 节点删除
NodeDataChanged 节点发送变化
NodeChildrenChanged 数据节点的子节点列表发生变更时
None 无状态
自定义watcher
watcher是一次性的
@Test
public void watcherExists2() throws KeeperException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 自定义监视器
zooKeeper.exists("/watcher", event -> {
System.out.println("使用自定义的监视器");
System.out.println("path:" + event.getPath());
System.out.println("eventType:" + event.getType());
countDownLatch.countDown();
});
countDownLatch.await();
}
实现消耗一次的同时创建一次Watcher
@Test
public void watcherExists3() throws KeeperException, InterruptedException {
Watcher watcher = new Watcher() {
@SneakyThrows
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath());
System.out.println(event.getType());
// 实现触发多次Watcher
byte[] data = zooKeeper.getData("/watcher", this, null);
System.out.println(new String(data));
}
};
byte[] data = zooKeeper.getData("/watcher", watcher, null);
System.out.println(new String(data));
Thread.sleep(Integer.MAX_VALUE);
}
可以同时为一个节点注册多个Watcher
@Test
public void watcherExists4() throws KeeperException, InterruptedException {
zooKeeper.exists("/watcher", event -> System.out.println(1));
zooKeeper.exists("/watcher", event -> System.out.println(2));
Thread.sleep(Integer.MAX_VALUE);
}
各种监听器代码完全相似,不重新记录
配置中心
当配置发生变化是自动完成配置缓存的改变
创建配置节点
create /config "config"
create /config/url "[IP地址]:3306"
create /config/username "[账号]"
create /config/passwrod "[密码]"
建立连接
@Data
public class ConfigCenter implements Watcher {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private ZooKeeper zooKeeper;
private final static String IP = "[IP地址]:2181";
// 配置信息
private String url;
private String username;
private String password;
@SneakyThrows
@Override
private void process(WatchedEvent event) {
if (event.getState() != Event.KeeperState.SyncConnected) {
System.out.println("连接失败" + event.getState());
return;
}
countDownLatch.countDown();
System.out.println("连接成功" + event.getState());
if (event.getType() == Event.EventType.NodeDataChanged) {
initValue();
}
}
// 1.建立连接
// 2.监听配置节点
}
监听配置节点
@SneakyThrows
public void initValue() {
zooKeeper = new ZooKeeper(IP, 5000, this);
// 阻塞线程,等待连接的创建
countDownLatch.await();
this.url = new String(zooKeeper.getData("/config/url", true, null));
this.username = new String(zooKeeper.getData("/config/username", true, null));
this.password = new String(zooKeeper.getData("/config/password", true, null));
// 只要修改配置信息就会打印到控制台上
System.out.println(this.getUsername());
System.out.println(this.getPassword());
System.out.println(this.getUrl());
System.out.println("-------获取修改后的配置信息成功-------");
}
测试
修改对应节点的值会触发Watcher
public static void main(String[] args) throws InterruptedException {
new ConfigCenter().initValue();
Thread.sleep(Integer.MAX_VALUE);
}
分布式唯一ID
实现
持久化的有序节点获取唯一ID,生成后返回唯一ID
建立连接
public class GloballyUniqueId implements Watcher {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private final ZooKeeper zooKeeper;
private final static String IP = "[IP地址]:2181";
private final static String defaultPath = "/uniqueId";
@SneakyThrows
@Override
public void process(WatchedEvent event) {
if (event.getState() != Event.KeeperState.SyncConnected) {
System.out.println("连接失败" + event.getState());
return;
}
countDownLatch.countDown();
System.out.println("连接成功" + event.getState());
}
@SneakyThrows
public GloballyUniqueId() {
zooKeeper = new ZooKeeper(IP, 5000, this);
countDownLatch.await();
}
// 1.建立连接
// 2.持久化有序节点获取返回唯一ID
}
生成分布式唯一ID
@SneakyThrows
public String getUniqueId() {
// 创建持久化有序节点
String path = zooKeeper.create(defaultPath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
// 截取后面的数字字符串就可以作为分布式环境下的唯一id
return path.substring(9);
}
测试
public static void main(String[] args) {
GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
// 全局ID各不相同,并且是之前的+1
for (int i = 1; i <= 15; i++) {
String uniqueId = globallyUniqueId.getUniqueId();
System.out.println(uniqueId);
}
}
分布式锁
思路
每个客户端往根节点下创建临时有序节点,客户端取得根节点下子节点,并进行排序,判断排在最前面的是否为本节点,如果自己的锁节点不在第一位,则监听自己前一位的锁节点
建立连接
public class ZookeeperLock {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private final ZooKeeper zooKeeper;
private static final String IP = "[IP地址]:2181";
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;
public ZookeeperLock() throws IOException, InterruptedException {
zooKeeper = new ZooKeeper(IP, 5000, event -> {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功");
countDownLatch.countDown();
}
});
countDownLatch.await();
}
// 1.创建节点
// 2.获取锁
// 3.释放锁
}
创建节点
private void createLock() throws KeeperException, InterruptedException {
Stat exists = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (exists == null) {
// 判断根节点是否存在
zooKeeper.create(LOCK_ROOT_PATH, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
// 创建临时有序子节点
lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("节点创建成功" + lockPath);
}
获取锁
private void attemptLock() throws KeeperException, InterruptedException {
// 获取根节点的所有子节点
List<String> children = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
// 排序
Collections.sort(children);
// 获取当前子节点的下标
int index = children.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
// 如果是0说明当前元素为第一个
if (index == 0) {
System.out.println("-------获取锁成功-------");
System.out.println("lockPath: " + lockPath);
// 如果不是第一个,则需要监视上一个节点是否被删除
} else {
String path = children.get(index - 1);
Stat exists = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
// exists如果为null,则说明在执行上两行的时候上一个节点被删除了(双重检索)
if (exists != null) {
synchronized (watcher) {
// 阻塞
watcher.wait();
}
}
// 递归
attemptLock();
}
}
// 上一个节点是否删除监视器对象
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
// 监听到删除事件,恢复线程
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (this) {
notifyAll();
}
}
}
};
释放锁
public void releaseLock() throws KeeperException, InterruptedException {
zooKeeper.delete(lockPath, -1);
zooKeeper.close();
System.out.println("锁已经释放" + lockPath);
}
测试
在一个线程中是同步进行的,在多个线程中是交替执行的
public class TicketSeller {
// 业务代码
private void sell() throws InterruptedException {
int sleepMillis = 5000;
Thread.sleep(sleepMillis);
System.out.println("收货结束");
}
public void sellTicketWithLock() throws Exception {
ZookeeperLock zookeeperLock = new ZookeeperLock();
zookeeperLock.acquireLock();
sell();
zookeeperLock.releaseLock();
}
public static void main(String[] args) throws Exception {
TicketSeller ticketSeller = new TicketSeller();
for (int i = 0; i < 10; i++) {
ticketSeller.sellTicketWithLock();
}
}
}
搭建集群
准备
docker-compose.yaml
version: '3'
services:
zoo1:
image: zookeeper:3.4.10
restart: always
container_name: zoo1
privileged: true
volumes:
- /data/zookeeper/zookeeper2181:/zookeeper-3.4.10/conf
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo2:
image: zookeeper:3.4.10
restart: always
container_name: zoo2
privileged: true
volumes:
- /data/zookeeper/zookeeper2182:/zookeeper-3.4.10/conf
ports:
- "2182:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo3:
image: zookeeper:3.4.10
restart: always
container_name: zoo3
privileged: true
volumes:
- /data/zookeeper/zookeeper2183:/zookeeper-3.4.10/conf
ports:
- "2183:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
启动
进入docker-compose.yaml路径后
# 启动
COMPOSE_PROJECT_NAME=zk_test docker-compose up
# 查看状态
COMPOSE_PROJECT_NAME=zk_test docker-compose ps
查看
依次执行,两个为follower,一个为leader,说明集群搭建成功
echo stat | nc 127.0.0.1 2183
# 登陆一个客户端,多个用逗号隔开
./zkCli.sh -server 127.0.0.1 2183
ZAB协议
Zookeepter Atomic Broadcast (zookeepter的原子广播)
Zookeeper有三种角色
ZAB协议工作原理,通过类似两阶段提交协议的方式解决数据一致性
leader从客户端收到一个写请求
leader生成一个新的事务并为它生成一个唯一的ZXID(事务ID)
leader将这个事务提议(propose)发送给所有follows节点
follower节点将收到的事务请求加入到历史队列中,并发送ack给leader
当leader收到大多数follower(半数以上节点)的ack消息,leader会发送commit请求
当follower收到commit请求时,从历史队列中将事务请求commit
Leader选举制度
服务器的状态
looking 当处于该状态时会被认为当前集群没有leader,因此需要选举
leading 领导者状态,leader
following 跟随者状态,follower
ovserving 观察者状态,observer
启动时选举
在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成 leader选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都试图找到leader,于是进入leader选举过程
每个server发出一个投票,由于是初始情况,server1和server2都会将自己作为leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用 (myid, zxid)来表示,此时server1的投票为(1, 0),server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器
集群中的每台服务器接收来自集群中各个服务器的投票并处理投票
对于server1、server2而言,都统计出集群中已经有两台机器接受 了(2, 0)的投票信息,此时便认为已经选出了leader为服务器ID更大的server2为leader
改变服务器状态
运行时选举
在zookeeper运行期间,leader与非leader服务器各司其职,即便当有非leader服务器宕机或新加入,此时也不会影响leader,但是如果leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮leader选举,其过程和启动时期的Leader选举过程基本 一致
这个时候每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定 server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3都会投自己,产生投票(1, 122),(3, 122),然后各自将投票发送给集群中所有机器
Observer及其配置
为需要观察者的节点添加配置
peerType=observer
为每个配置文件server的配置追加:observer
server.3=[IP地址]:2888:3888:observer
重启后查看状态为observer
# 状态变为observer
/zookeeper-3.4.10/bin/zkServer.sh status
$ ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: observer
连接集群
ZooKeeper zooKeeper = new ZooKeeper("[集群1]:[端口号],[集群2]:[端口号],[集群3]:[端口号]",
5000, (event -> {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
}
}));
countDownLatch.await();
System.out.println(zooKeeper.getSessionId());
zooKeeper.close();