zookeeper

288

Zookeeper是一个经典的分布式数据一致性的解决方案致力于为分布式系统提供一个高性能、高可用且拥有严格顺序访问控制能力的分布式协调存储服务

优点

维护配置信息

随着分布式系统的兴起,由于许多服务都需要使用到配置文件,因此必须保证该配置服务的高可用性和各台服务器上配置数据的一致性。通常会将配置文件部署在一个集群上,然而一个集群动辄上千台服务器,此时如果再一台台服务器逐个修改配置文件那将是非常繁琐且危险的的操作,因此就需要—种服务,能够高效快速且可靠地完成配置项的更改等操作,并能够保证各配置项在每台服务器上的数据—致性。zookeeper就可以提供这样一种服务,其使用Zab这种一致性协议来保证一致性

分部式锁服务

一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性,多台服务器上运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作进行加锁,如果当前机器挂掉后,释放锁并fail over到其他的机器继续执行该服务

集群管理

一个集群有时会因为各种软硬件故障或者网络故障,出现某些服务器挂掉而被移除集群,而某些服务器加入到集群中的情况,zookeeper会将这些服务器加入/移出的情况通知给集群中的其他正常工作的服务器,以及时调整存储和计算等任务的分配和执行等,此外zookeeper还会对故障的服务器做出诊断并尝试修复

生成分布式唯一ID

在单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。但是分库分表后,就无法在依靠数据库的auto_incrernent属性来唯一标识—条记录了,此时可以用zookeeper在分布式环境下生成全局唯一ID

数据模型

zookeeper的数据节点可以视为树状结构,书中的各节点被称为znode,一个znode可以有多个子节点。使用路径path来定位某个znode。znode兼具文件目录两种功能,即像文件一样维护者数据、元信息、ACL、时间戳的数据结构,又可以向目录一样作为路径的标识一部分

数据类型.svg

  • 节点数据 zNode就像key,value的关系

  • 节点的子节点 children

  • 节点的状态 用来描述节点的创建、修改记录、cZxid、ctime等

节点状态属性

# 获取该节点的节点状态
stat path
  • cZxid 数据节点创建时的事务ID

对节点的所有写操作都会让zookeeper开启一个事务,并未每一个事务维护一个事务ID

  • ctime 数据节点创建的时间

  • mZxid 数据节点最后一次更新的事务ID

  • mtime 数据节点最后一次更新的时间

  • pZxid 数据节点的子节点组后一次被修改的事务ID

  • cversion 子节点的更改次数

  • dataVersion 节点数据的更改次数

  • aclVersion 节点的ACL更改次数

当前节点的权限列表被修改的次数

  • ephemeralOwner 是否是临时节点

如果节点是临时节点,则创建该节点会话的SessionID,如果是持久形节点则为0

连接zookeeper会建立一个会话,当会话结束临时节点将会自动删除,并且临时节点不允许拥有子节点

  • dataLength 数据内容的长度(字节)

  • numChildren 数据节点当前的子节点个数

安装

zookeeper的使用需要java环境,这里使用的是docker所以不需要考虑这个问题

不同版本的zookeeper配置文件目录不同,请参照dockerhub

非docker需要安装后启动/bin/zkServer.sh start启动(status查看状态,stop停止)

zookeeper无法直接通过-v挂载配置文件,否则会导致

/docker-entrypoint.sh: line 15: /conf/zoo.cfg: Permission denied

docker run -d --privileged=true \
 --name zookeeper \
-p 2181:2181 \
--restart=always \
zookeeper:3.4.10
​
docker exec -it [容器id] bash
​
# 执行zkCli.sh
/zookeeper-3.4.10/bin/zkCli.sh
​
# 查看服务状态
/zookeeper-3.4.10/bin/zkServer.sh status

修改zoo.cfg

# 对外暴露的服务端口号
clientPort=2181
# zookeeper数据的内存快照、即事务日志文件
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/log

节点命令

新增节点

path为键(路径) data为值

键必须由 / 开始

# -s为有序节点 -e为临时节点
create [-s] [-e] path data
​
# 通过键获取值
get path

不使用-e为持久化节点,即使退出服务数据依然存在

使用-e为临时节点,服务结束数据失效

-s会为节点名增加序列号,之后需要通过新的节点名获取值

持久化有序节点可以为分布式环境创建唯一ID

更新节点

path为键 data为修改后的值

set path data
​
# 校验修改
set path data [dataVersion]

基于dataVersion修改,若和当前节点的数据不符时,则修改不会生效(乐观锁)

dataVersion可以通过stat命令查看

高版本需要改为set path -v [dataVersion] data

删除节点

delete path [dataVersion]

同样若通过dataVersion删除,若不符合则删除不会生效

并且节点必须为空才能删除

意思是若存在/hadoop/node,则delete /haddop无法使用

强制删除

删除节点和所有子节点

rmr path

节点列表

# 查看该节点所有子节点
ls path
​
# 同时还可以查看该节点的状态
ls2 path

监听器命令

一个监听器的使用只能使用一次,也就是说下列命令只有第一次会生效

get path和stat path

# 为当前客户端的该节点注册监听事件
get path watch
stat path watch

当其它客户端修改了该节点后,会显示以下 可以通过该特性捕获最新的配置信息

WatchedEvent state:SyncConnected type:NodeDataChanged path: [节点名]

ls和ls2 path

# 监听当前节点的子节点
ls patch watch
ls2 patch watch

当其它客户端修改创建了该节点的子节点,会显示以下

WatchedEvent state:SyncConnected type:NodeChildrenChanged path: [节点名]

ACL权限控制

Access Control List访问控制列表 一个节点可以同时使用多种授权模式 zookeeper通过授权策略scheme,授权对象id,授权策略permission来进行权限控制

授权模式

  • world 登陆zookeeper的所有用户(默认)

  • ip 使用ip进行认证

  • auth 对已添加的用户进行认证

  • digest 使用用户密码进行认证

授权权限

默认cdrwa

权限

ACL简写

描述

create

c

可以创建子节点

delete

d

可以删除子节点(仅下一级节点)

read

r

可以读取节点数据及显示子节点列表

write

w

可以设置节点数据

admin

a

可以设置节点访问控制列表权限

world授权模式

# 查看当前节点的权限
getAcl path

# 设置权限,授权模式为world
setAcl path world:[授权对象]:[授权权限]

实例

# 设置/node这个节点,world模式anyone用户drwa权限
setAcl /node world:anyone:drwa

# 这个时候无法创建子节点
create /node/nodeChild "test"
$ Authentication is not valid : /node/nodeChild 

IP授权模式

通过./zkCli.sh -server [ip] 连接远程zookeeper

# 设置权限,授权模式为ip
setAcl path ip:[ip地址]:[授权权限]

实例

# 为节点授权IP地址,多个用逗号连接
setAcl /node ip:106.14.x.xxx:cdrwa

# 获取值
get /node
$ test
# 其它IP,无权获取
get /node
$ Authentication is not valid : /node

Auth授权模式

# 创建一个用户
addauth digest [账号]:[密码]

# auth
setAcl path auth:[账号]:[授权权限]

# 查看digest
getAcl path

实例

addauth digest mengnan:123456

setAcl /node auth:mengnan:cdrwa

getAcl /node
$ 'digest,'mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=
: cdrwa

# 读取成功
get /node
$ test

# 如果退出客户端后则无法再获取
get /node
$ Authentication is not valid : /node

digest授权模式

通过手动创建的密文加密

echo -n mengnan:123456 | openssl dgst -binary -sha1 | openssl base64
$ D99QUIAbXdkPPr9XjWrAKYDTp1c=

为一个节点使用digest授权模式

setAcl path digest:[用户名]:[上述密文]:[授权权限]

实例

setAcl path digest:mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=:cdrwa

超级管理员

设定一个超级管理员,可以任意访问所有节点

手动加密

echo -n super:admin | openssl dgst -binary -sha1 | openssl base64
$ xQJmxLMiHGwaqBvst5y6rkB6HQs=

修改zkServer.sh

# 通过vi的/nohup找到,增加Dzookeeper.DigestAuthenticationProvider.superDigest
nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" 
"-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" 
"-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="\

为指定用户添加权限,之后该次连接对所有节点有任意访问权限

addauth digest super:admin

JavaAPI

构造方法

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
  • connectString 连接的ip:端口号(集群用逗号隔开)

  • sessionTimeout 超时毫秒时间

  • watcher 监视器对象,通过监视器对象返回连接状态

新增节点

create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
  • path 需要创建的zNode路径

  • data 需要存储的数据

  • acl 节点的访问控制列表

  • createMode 节点的类型(临时/持久化等)

create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode,  StringCallback cb, Object ctx)
  • StringCallback 异步回调

  • ctx 回调的参数传递的上下文参数

更新节点

setData(final String path, byte data[], int version)
  • path zNode路径

  • data data路径

  • version 验证版本号,如果为-1则跳过验证

setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
  • cb(rc, path, ctx, stat) 回调函数

rc 为0代表成功

path 修改的路径

ctx 上下文参数路径

  • ctx 传递上下文参数

删除节点

delete(final String path, int version)
  • path zNode路径

  • version 验证版本号,如果为-1则跳过验证

delete(final String path, int version, VoidCallback cb, Object ctx)
  • cb(rc, path, ctx) 回调函数

rc 为0代表成功

path 删除的路径

ctx 上下文参数对象

  • ctx 传递上下文参数

查看节点

getData可以捕获到的事件 NodeDeleted节点删除,NodeDataChanged节点内容发生变化

getData(String path, boolean watch, Stat stat)
  • path zNode路径

  • watch 是否使用连接对象中注册的监视器

  • stat 返回zNode的元数据

getData(String path, boolean watch, DataCallback cb, Object ctx)
  • cb(rc, path, ctx, data, stat) 回调函数

rx 为0代表读取成功

path节点的路径

ctx 上下文参数对象

data 元数据

stat 节点属性

  • ctx 传递上下文参数

getData(final String path, Watcher watcher, Stat stat)
  • watcher 自定义监视器对象

  • stat 返回zNode的元数据

查看子节点

getChildren可以捕捉到的事件 NodeChildrenChanged子节点发生变化 NodeDeleted节点删除

getChildren(String path, boolean watch)
  • path 路径

  • watch 是否使用连接对象中注册的监视器

getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
  • cb(rc, path, ctx, children) 回调函数

rc 为0代表读取成功

path 节点路径

ctx 上下文参数

children 子节点中的节点

  • ctx 传递上下文参数

getChildren(String path, Watcher watch)
  • watch 自定义监视器对象

查看节点是否存在

exists可以捕获到的事件 NodeCreated节点创建,NodeDeleted节点删除,NodeDataChanged 节点内容发送变化

exists(String path, boolean watch)
  • path zNode路径

  • watch 是否使用连接对象中注册的监视器

exists(String path, boolean watch, StatCallback cb, Object ctx)
  • cb(rc, path, ctx, stat) 回调函数

rc 为0代表读取成功

path 节点路径

ctx 上下文参数

stat 节点属性

  • ctx 传递上下文参数

exists(final String path, Watcher watcher)
  • watch 自定义监视器对象

使用Zookeeper

导入约束

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>
<!--zookeeper日志都隐藏在lo4j中-->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
</dependency>

连接到zookeeper

@Test
public void test() throws Exception {
    // 计数器对象
    CountDownLatch countDownLatch = new CountDownLatch(1);
    zooKeeper = new ZooKeeper(IP, 5000, event -> {
        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
            System.out.println("连接创建成功!");
            // 放开阻塞
            countDownLatch.countDown();
        }
    });
    // 因为zookeeper是异步的,主线程阻塞等待对象创建成功
    countDownLatch.await();
    // 该连接会话编号
    System.out.println(zooKeeper.getSessionId());
}
@After
public void after() throws Exception {
    // 释放资源
    zooKeeper.close();
}

新增节点

分离重复代码

@Before
public void before() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    zooKeeper = new ZooKeeper(IP, 5000, event -> {
        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
            System.out.println("连接创建成功!"); 
        } else System.out.println("连接失败!"); 
        countDownLatch.countDown();
    });
    countDownLatch.await();
}
@After
public void after() throws Exception {
    zooKeeper.close();
}

同步方式

@Test
public void create() throws Exception {
    String path = zooKeeper.create("/create", "create".getBytes(),
            // ZooDefs.Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
            // CreateMode.PERSISTENT 持久化节点
            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    log.info(path);
    log.info("创建节点");
}

ZooDefs.Ids使用

  • OPEN_ACL_UNSAFE 完全开放的权限

  • CREATOR_ALL_ACL 只有创建者才有权限

  • READ_ACL_UNSAFE 只能读取的权限

  • 自定义,提供一个List<ACL> acls

@Test
public void create() throws Exception {
    List<ACL> acls = new ArrayList<>();
    // Id (授权模式,授权对象)
    Id id = new Id("world", "anyone");
    // ACL(权限信息,Id)
    acls.add(new ACL(ZooDefs.Perms.READ, id));
    acls.add(new ACL(ZooDefs.Perms.WRITE, id));
    String path = zooKeeper.create("/create", "create".getBytes(), acls, CreateMode.PERSISTENT);
}

异步方式

和同步方式多了一个回调函数

@Test
public void create6() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    zooKeeper.create("/create", "create".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
            (rc, path, ctx, name) -> {
                // rc为0代表节点创建成功
                System.out.println(rc);
                // path和name描述节点路径
                System.out.println(path);
                System.out.println(name);
			   // 传递上下文,这里即"come here"字符串
                System.out.println(ctx);
                countDownLatch.countDown();
            }, "come here");
    // 异步,代码的执行和服务器是否接收到无关
    countDownLatch.await();
}

Auth授权模式

账号密码

@Test
public void create2() throws Exception {
    // 先添加授权用户
    zooKeeper.addAuthInfo("digest", "mengnan:123456".getBytes());
    // 给予cdrwa权限
    zooKeeper.create("/create", "create".getBytes(),
            Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
}

查看

getAcl /create
$ 'digest,'mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=
: cdrwa

digest授权模式

自制密文

@Test
public void create4() throws Exception {
    // 先添加授权用户
    List<ACL> acls = new ArrayList<ACL>();
    Id id = new Id("digest", "mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=");
    acls.add(new ACL(ZooDefs.Perms.ALL, id));
    zooKeeper.create("/create", "create".getBytes(), acls, CreateMode.PERSISTENT);
}

查看

getAcl /create/node10
$ 'digest,'mengnan:D99QUIAbXdkPPr9XjWrAKYDTp1c=
: cdrwa

节点类型

@Test
public void create5() throws Exception {
    String result = zooKeeper.create("/create", "create".getBytes(),
            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    System.out.println(result);
}
  • CreateMode.PERSISTENT 持久节点

  • CreateMode.PERSISTENT_SEQUENTIAL 持久化有序节点

  • CreateMode.EPHEMERAL 临时节点

  • CreateMode.EPHEMERAL_SEQUENTIAL 临时有序节点

更新节点

同步方式

@Test
public void update() throws KeeperException, InterruptedException {
    Stat stat = zooKeeper.setData("/set/node", "node".getBytes(), -1);
    System.out.println(stat.getVersion());
    System.out.println(stat.getCtime());
}

异步方式

@Test
public void update2() throws InterruptedException {
  CountDownLatch countDownLatch = new CountDownLatch(1);
  zooKeeper.setData("/set/node", "node".getBytes(), -1,
          (rc, path, ctx, stat) -> {
            System.out.println(rc);
            System.out.println(path);
            System.out.println(ctx);
            System.out.println(stat.getVersion());
            countDownLatch.countDown();
          }, "context");
  countDownLatch.await();
}

删除节点

同步方式

@Test
public void delete() throws Exception {
    zooKeeper.delete("/set/nod1", -1);
}

异步方式

@Test
public void update2() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    zooKeeper.delete("/set/node", -1, (rc, path, ctx) -> {
        System.out.println(rc);
        System.out.println(path);
        System.out.println(ctx);
        countDownLatch.countDown();
    }, "context");
    countDownLatch.await();
}

查看节点

同步方式

@Test
public void get() throws KeeperException, InterruptedException {
    Stat stat = new Stat();
    byte[] data = zooKeeper.getData("/create", false, stat);
    System.out.println(new String(data));
    System.out.println(stat.getVersion());
}

异步方式

@Test
public void get2() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    zooKeeper.getData("/create", false, (rc, path, ctx, data, stat) -> {
        System.out.println(rc);
        System.out.println(path);
        System.out.println(ctx);
        System.out.println(new String(data));
        System.out.println(stat.getVersion());
        countDownLatch.countDown();
    }, "context");
    countDownLatch.await();
}

查看子节点

同步方式

@Test
public void getChild() throws KeeperException, InterruptedException {
    List<String> children = zooKeeper.getChildren("/create", false);
    System.out.println(children);
}

异步方式

@Test
public void getChild2() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    zooKeeper.getChildren("/create", false, (rc, path, ctx, children) -> {
        System.out.println(rc);
        System.out.println(path);
        System.out.println(ctx);
        System.out.println(children);
        countDownLatch.countDown();
    }, "context");
    countDownLatch.await();
}

查看节点是否存在

同步方式

如果不存在,则返回null

@Test
public void exists() throws Exception {
    // arg1:节点的路径
    Stat stat = zooKeeper.exists("/exists", false);
    if (stat != null)
        System.out.println(stat.getVersion());
}

异步方式

@Test
public void exists2() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    zooKeeper.exists("/exists", false,
            (rc, path, ctx, stat) -> {
                System.out.println(rc);
                System.out.println(path);
                System.out.println(ctx);
                if (stat != null) System.out.println(stat.getVersion());
                countDownLatch.countDown();
            }, "context");
    countDownLatch.await();
}

Watcher

客户端先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当Zookeeper服务端监听的数据状态发送改变时,服务端会主动通知客户端,记者客户端的Watch管理器会触发相关的Watch来回调相应处理逻辑,从而完成整体的数据发布/订阅流程

watcher架构.svg

特性

  • 一次性 watcher是一次性的,一旦触发就会被移除,再次使用需要重新注册

  • 客户端顺序回调 watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行

  • 轻量级 WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点 路径,并不会得到数据节点变化前后的具体内容

  • 时效性 watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知

使用

实现Watcher接口

public class ZKConnectionWatcher implements Watcher {
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    static ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("连接创建成功");
            System.out.println("path:" + event.getPath());
            System.out.println("eventType" + event.getType());
            countDownLatch.countDown();
        }
    }
}

Event.KeeperState枚举类型,是客户端与服务端连接状态发生变化时对应的通知类型

  • SyncConnected 客户端与服务器正常连接时

  • Disconnected 客户端与服务器断开连接时

  • Expired 会话session超时(超过构造方法中sessionTimeout定义的超时时间)

  • AuthFailed 身份认证失败时

使用ZooKeeper构造方法中定义的watcher

@Test
public void watcherExists() throws KeeperException, InterruptedException {
    // true,使用连接对象中的watcher
    Stat exists = zooKeeper.exists("/watcher", true);
    if (exists != null) {
        System.out.println(exists.getVersion());
    }
    // 手动阻塞,当该节点发生修改时执行构造方法中的回调函数
    // 阻塞时即使发送了多次事件也只执行第一次,因为Watcher是一次性的
    Thread.sleep(Integer.MAX_VALUE);
}

event.getType所有会返回列表,不同方法的监视器捕获到的事件不同,详细内容请参照JavaAPI部分

  • NodeCreated 节点创建

  • NodeDeleted 节点删除

  • NodeDataChanged 节点发送变化

  • NodeChildrenChanged 数据节点的子节点列表发生变更时

  • None 无状态

自定义watcher

watcher是一次性的

@Test
public void watcherExists2() throws KeeperException, InterruptedException {
  CountDownLatch countDownLatch = new CountDownLatch(1);
  // 自定义监视器
  zooKeeper.exists("/watcher", event -> {
    System.out.println("使用自定义的监视器");
    System.out.println("path:" + event.getPath());
    System.out.println("eventType:" + event.getType());
    countDownLatch.countDown();
  });
  countDownLatch.await();
}

实现消耗一次的同时创建一次Watcher

@Test
public void watcherExists3() throws KeeperException, InterruptedException {
    Watcher watcher = new Watcher() {
        @SneakyThrows
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event.getPath());
            System.out.println(event.getType());
            // 实现触发多次Watcher
            byte[] data = zooKeeper.getData("/watcher", this, null);
            System.out.println(new String(data));
        }
    };
    byte[] data = zooKeeper.getData("/watcher", watcher, null);
    System.out.println(new String(data));
    Thread.sleep(Integer.MAX_VALUE);
}

可以同时为一个节点注册多个Watcher

@Test
public void watcherExists4() throws KeeperException, InterruptedException {
    zooKeeper.exists("/watcher", event -> System.out.println(1));
    zooKeeper.exists("/watcher", event -> System.out.println(2));
    Thread.sleep(Integer.MAX_VALUE);
}

各种监听器代码完全相似,不重新记录

配置中心

当配置发生变化是自动完成配置缓存的改变

创建配置节点

create /config "config"
create /config/url "[IP地址]:3306"
create /config/username "[账号]"
create /config/passwrod "[密码]"

建立连接

@Data
public class ConfigCenter implements Watcher {
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private ZooKeeper zooKeeper;
    private final static String IP = "[IP地址]:2181";

    // 配置信息
    private String url;
    private String username;
    private String password;
  	
    @SneakyThrows
    @Override
    private void process(WatchedEvent event) {
        if (event.getState() != Event.KeeperState.SyncConnected) {
            System.out.println("连接失败" + event.getState());
            return;
        }
        countDownLatch.countDown();
        System.out.println("连接成功" + event.getState());
        if (event.getType() == Event.EventType.NodeDataChanged) {
            initValue();
        }
    }
    
    // 1.建立连接
    // 2.监听配置节点
}

监听配置节点

@SneakyThrows
public void initValue() {
    zooKeeper = new ZooKeeper(IP, 5000, this);
    // 阻塞线程,等待连接的创建
    countDownLatch.await();
    this.url = new String(zooKeeper.getData("/config/url", true, null));
    this.username = new String(zooKeeper.getData("/config/username", true, null));
    this.password = new String(zooKeeper.getData("/config/password", true, null));
	// 只要修改配置信息就会打印到控制台上
    System.out.println(this.getUsername());
    System.out.println(this.getPassword());
    System.out.println(this.getUrl());
    System.out.println("-------获取修改后的配置信息成功-------");
}

测试

修改对应节点的值会触发Watcher

public static void main(String[] args) throws InterruptedException {
    new ConfigCenter().initValue();
    Thread.sleep(Integer.MAX_VALUE);
}

分布式唯一ID

实现

持久化的有序节点获取唯一ID,生成后返回唯一ID

建立连接

public class GloballyUniqueId implements Watcher {
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private final ZooKeeper zooKeeper;
    private final static String IP = "[IP地址]:2181";
    private final static String defaultPath = "/uniqueId";

    @SneakyThrows
    @Override
    public void process(WatchedEvent event) {
        if (event.getState() != Event.KeeperState.SyncConnected) {
            System.out.println("连接失败" + event.getState());
            return;
        }
        countDownLatch.countDown();
        System.out.println("连接成功" + event.getState());
    }

    @SneakyThrows
    public GloballyUniqueId() {
        zooKeeper = new ZooKeeper(IP, 5000, this);
        countDownLatch.await();
    }

	// 1.建立连接
    // 2.持久化有序节点获取返回唯一ID
}

生成分布式唯一ID

@SneakyThrows
public String getUniqueId() {
    // 创建持久化有序节点
    String path = zooKeeper.create(defaultPath, null, Ids.OPEN_ACL_UNSAFE,
                                   CreateMode.PERSISTENT_SEQUENTIAL);
    // 截取后面的数字字符串就可以作为分布式环境下的唯一id
    return path.substring(9);
}

测试

public static void main(String[] args) {
    GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
    // 全局ID各不相同,并且是之前的+1
    for (int i = 1; i <= 15; i++) {
        String uniqueId = globallyUniqueId.getUniqueId();
        System.out.println(uniqueId);
    }
}

分布式锁

思路

每个客户端往根节点下创建临时有序节点,客户端取得根节点下子节点,并进行排序,判断排在最前面的是否为本节点,如果自己的锁节点不在第一位,则监听自己前一位的锁节点

建立连接

public class ZookeeperLock {
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private final ZooKeeper zooKeeper;
    private static final String IP = "[IP地址]:2181";
    private static final String LOCK_ROOT_PATH = "/Locks";
    private static final String LOCK_NODE_NAME = "Lock_";
    private String lockPath;

    public ZookeeperLock() throws IOException, InterruptedException {
        zooKeeper = new ZooKeeper(IP, 5000, event -> {
            if (event.getState() == Event.KeeperState.SyncConnected) {
                System.out.println("连接创建成功");
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }
    
    // 1.创建节点
    // 2.获取锁
    // 3.释放锁
}

创建节点

private void createLock() throws KeeperException, InterruptedException {
    Stat exists = zooKeeper.exists(LOCK_ROOT_PATH, false);
    if (exists == null) {
        // 判断根节点是否存在
        zooKeeper.create(LOCK_ROOT_PATH, null, Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
    }
    // 创建临时有序子节点
    lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, null,
            Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("节点创建成功" + lockPath);
}

获取锁

private void attemptLock() throws KeeperException, InterruptedException {
    // 获取根节点的所有子节点
    List<String> children = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
    // 排序
    Collections.sort(children);
    // 获取当前子节点的下标
    int index = children.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
    // 如果是0说明当前元素为第一个
    if (index == 0) {
        System.out.println("-------获取锁成功-------");
        System.out.println("lockPath: " + lockPath);
        // 如果不是第一个,则需要监视上一个节点是否被删除
    } else {
        String path = children.get(index - 1);
        Stat exists = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
        // exists如果为null,则说明在执行上两行的时候上一个节点被删除了(双重检索)
        if (exists != null) {
            synchronized (watcher) {
                // 阻塞
                watcher.wait();
            }
        }
        // 递归
        attemptLock();
    }
}
// 上一个节点是否删除监视器对象
private final Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        // 监听到删除事件,恢复线程
        if (event.getType() == Event.EventType.NodeDeleted) {
            synchronized (this) {
                notifyAll();
            }
        }
    }
};

释放锁

public void releaseLock() throws KeeperException, InterruptedException {
    zooKeeper.delete(lockPath, -1);
    zooKeeper.close();
    System.out.println("锁已经释放" + lockPath);
}

测试

在一个线程中是同步进行的,在多个线程中是交替执行的

public class TicketSeller {
    // 业务代码
    private void sell() throws InterruptedException {
        int sleepMillis = 5000;
        Thread.sleep(sleepMillis);
        System.out.println("收货结束");
    }

    public void sellTicketWithLock() throws Exception {
        ZookeeperLock zookeeperLock = new ZookeeperLock();
        zookeeperLock.acquireLock();
        sell();
        zookeeperLock.releaseLock();
    }

    public static void main(String[] args) throws Exception {
        TicketSeller ticketSeller = new TicketSeller();
        for (int i = 0; i < 10; i++) {
            ticketSeller.sellTicketWithLock();
        }
    }
}

搭建集群

准备

docker-compose.yaml

version: '3'
services:
    zoo1:
        image: zookeeper:3.4.10
        restart: always
        container_name: zoo1
        privileged: true
        volumes:
        	- /data/zookeeper/zookeeper2181:/zookeeper-3.4.10/conf
        ports:
            - "2181:2181"
        environment:
            ZOO_MY_ID: 1
            ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
 
    zoo2:
        image: zookeeper:3.4.10
        restart: always
        container_name: zoo2
        privileged: true
        volumes:
        	- /data/zookeeper/zookeeper2182:/zookeeper-3.4.10/conf
        ports:
            - "2182:2181"
        environment:
            ZOO_MY_ID: 2
            ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
 
    zoo3:
        image: zookeeper:3.4.10
        restart: always
        container_name: zoo3
        privileged: true
        volumes:
        	- /data/zookeeper/zookeeper2183:/zookeeper-3.4.10/conf
        ports:
            - "2183:2181"
        environment:
            ZOO_MY_ID: 3
            ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

启动

进入docker-compose.yaml路径后

# 启动
COMPOSE_PROJECT_NAME=zk_test docker-compose up

# 查看状态
COMPOSE_PROJECT_NAME=zk_test docker-compose ps

查看

依次执行,两个为follower,一个为leader,说明集群搭建成功

echo stat | nc 127.0.0.1 2183

# 登陆一个客户端,多个用逗号隔开
./zkCli.sh -server 127.0.0.1 2183

ZAB协议

Zookeepter Atomic Broadcast (zookeepter的原子广播)

Zookeeper有三种角色

角色

描述

领导者

领导者负责进行投票发起和决议,更新系统状态

学习者(跟随者)

跟随者用于接受客户请求并向客户端返回结果,在选择过程中参与投票

学习者(观察者)

观察者可以接受客户端连接,将请求转发给领导者,但观察者不参与投票,只是同步领导的状态

客户端

请求发起方

集群.svg

ZAB协议工作原理,通过类似两阶段提交协议的方式解决数据一致性

  • leader从客户端收到一个写请求

  • leader生成一个新的事务并为它生成一个唯一的ZXID(事务ID)

  • leader将这个事务提议(propose)发送给所有follows节点

  • follower节点将收到的事务请求加入到历史队列中,并发送ack给leader

  • 当leader收到大多数follower(半数以上节点)的ack消息,leader会发送commit请求

  • 当follower收到commit请求时,从历史队列中将事务请求commit

Leader选举制度

服务器的状态

  • looking 当处于该状态时会被认为当前集群没有leader,因此需要选举

  • leading 领导者状态,leader

  • following 跟随者状态,follower

  • ovserving 观察者状态,observer

启动时选举

在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成 leader选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都试图找到leader,于是进入leader选举过程

  • 每个server发出一个投票,由于是初始情况,server1和server2都会将自己作为leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用 (myid, zxid)来表示,此时server1的投票为(1, 0),server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器

  • 集群中的每台服务器接收来自集群中各个服务器的投票并处理投票

    对于server1、server2而言,都统计出集群中已经有两台机器接受 了(2, 0)的投票信息,此时便认为已经选出了leader为服务器ID更大的server2为leader

  • 改变服务器状态

运行时选举

在zookeeper运行期间,leader与非leader服务器各司其职,即便当有非leader服务器宕机或新加入,此时也不会影响leader,但是如果leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮leader选举,其过程和启动时期的Leader选举过程基本 一致

这个时候每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定 server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3都会投自己,产生投票(1, 122),(3, 122),然后各自将投票发送给集群中所有机器

Observer及其配置

为需要观察者的节点添加配置

peerType=observer

为每个配置文件server的配置追加:observer

server.3=[IP地址]:2888:3888:observer

重启后查看状态为observer

# 状态变为observer
/zookeeper-3.4.10/bin/zkServer.sh status
$ ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: observer

连接集群

ZooKeeper zooKeeper = new ZooKeeper("[集群1]:[端口号],[集群2]:[端口号],[集群3]:[端口号]",
        5000, (event -> {
  if (event.getState() == Event.KeeperState.SyncConnected) {
    System.out.println("连接成功");
    countDownLatch.countDown();
  }
}));
countDownLatch.await();
System.out.println(zooKeeper.getSessionId());
zooKeeper.close();

未完待续

Curator