ZooKeeper Java API
ZooKeeper是一个分布式应用程序协调服务,主要用于解决分布式集群中应用系统的一致性问题。它能提供类似文件系统的目录节点树方式的数据存储,主要用途是维护和监控所存数据的状态变化,以实现对集群的管理。
ZooKeeper提供了Java API操作接口
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency>
package com.xc.xcspringboot.test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
/**
* 对ZooKeeper节点数据的查询、删除和修改
*/
public class NodeTestDemo {
static ZooKeeper zk;
static {
String connectStr = "172.19.25.170:2181,172.19.25.171:2181,172.19.25.172:2181";
// 参数1:服务器连接字符串
// 参数2:连接超时时间
// 参数3:观察者对象(回调方法)
try {
zk = new ZooKeeper(connectStr, 3000, null);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
create();
// setNodeData();
// getNodeData();
// getNodeDataWatch();
// getNodeDataWatch2();
// deletePath();
existsPath();
}
/**
* 创建ZooKeeper节点,并设置元数据
*/
public static void create() throws Exception {
/*
* CreateMode 取值如下:
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点(自动编号)
* EPHEMERAL:临时节点,客户端断开连接时,这种节点会被自动删除
* EPHEMERAL_SEQUENTIAL:临时顺序节点
*/
String path = zk.create("/zk001", "zk001_data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("path " + path);
}
/**
* 修改节点数据
*/
public static void setNodeData() throws Exception {
Stat stat = zk.setData("/zk001", "zk001_data_new".getBytes(), -1);
//输出节点版本号
System.out.println("输出节点版本号 " + stat.getVersion());
}
/**
* 获取节点元数据
*/
public static void getNodeData() throws Exception {
Stat stat = new Stat();
//返回指定路径上的节点数据和节点状态,节点的状态会放入stat对象中
byte[] bytes = zk.getData("/zk001", null, stat);
//输出节点元数据
System.out.println("输出节点元数据 " + new String(bytes));
}
/**
* 获取节点数据,并加入观察者对象Watcher(一次监听)
*/
public static void getNodeDataWatch() throws Exception {
Stat stat = new Stat();
//返回指定路径上的节点数据和节点状态,节点的状态会放入stat对象中
byte[] bytes = zk.getData("/zk001", new Watcher() {
//实现process()方法
public void process(WatchedEvent event) {
System.out.println("实现process()方法 " + event.getType());
}
}, stat);
System.out.println("输出节点元数据 " + new String(bytes));
//改变节点数据,触发watch
zk.setData("/zk001", "zk001_data_testwatch".getBytes(), -1);
//为了验证是否触发了watch,不让程序结束
while (true) {
Thread.sleep(3000);
}
}
/**
* 获取节点数据,并加入观察者对象Watcher,实现持续监听
*/
public static void getNodeDataWatch2() throws Exception {
final Stat stat = new Stat();
//定义Watcher对象
Watcher watcher = new Watcher() {
//实现process()方法
public void process(WatchedEvent event) {
//输出事件类型
System.out.println("实现process()方法 " + event.getType());
//重新设置监听,参数this代表当前Watcher对象
try {
zk.getData("/zk001", this, stat);
} catch (Exception e) {
e.printStackTrace();
}
}
};
//返回指定路径上的节点数据和节点状态,并设置Watcher监听,节点的状态会放入stat对象中
byte[] bytes = zk.getData("/zk001", watcher, stat);
System.out.println("输出节点元数据 " + new String(bytes));
//为了验证是否触发了watch,不让程序结束
while (true) {
//改变节点数据,触发watch
zk.setData("/zk001", "zk001_data_testwatch2".getBytes(), -1);
Thread.sleep(3000);
}
}
/**
* 删除节点
*/
public static void deletePath() throws Exception {
//删除节点
zk.delete("/zk001", -1);
}
public static void existsPath() throws Exception {
Stat stat = zk.exists("/zk001", null);
System.out.println("exists " + stat.toString());
}
}
书籍: Hadoop大数据技术开发实战 6.4 ZooKeeper Java API操作
https://gitee.com/caoyeoo0/xc-springboot/blob/zookeeper/src/main/java/com/xc/xcspringboot/test/NodeTestDemo.java

浙公网安备 33010602011771号