命令操作
- 启动 、停止、查看状态
./zkServer.sh start
./zkServer.sh stop
./zkServer.sh status
- 连接客户端
./zkCli.sh -server localhost:2181
- 节点命令
javaAPI操作
- 引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
- API操作
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
public class CuratorTest {
private static RetryPolicy retry ;
private static CuratorFramework client;
/**
* 创建
*/
@Test
void create() throws Exception {
String path = client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/xyx4/sd1", "china".getBytes());
System.out.println("path = " + path);
}
/**
* 查询
*/
@Test
void query() throws Exception {
// 查询数据 get
byte[] bytes = client.getData().forPath("/xyx4/sd");
System.out.println(new String(bytes));
// 查询子节点 ls
List<String> list = client.getChildren().forPath("/");
// 查询子节点状态信息 ls -s
Stat status = new Stat(); //节点信息会放在这个对象里
client.getData().storingStatIn(status).forPath("/xyx");
}
/**
* 修改数据
*/
@Test
void set() throws Exception {
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/xyx4/sd");
int version = status.getVersion();
client.setData().withVersion(version).forPath("/xyx4/sd", "zhouyu3".getBytes());
}
/**
* 删除
*/
@Test
void delete() throws Exception{
// 1.删除单个节点
client.delete().forPath("/xyx4/sd1");
// 2.删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/xyx4/sd");
// 3.必须成功的删除
client.delete().guaranteed().forPath("/xyx3");
// 4.回调
client.delete().guaranteed().inBackground((q1,q2) -> {
// 回调函数 执行删除后自动执行
}).forPath("/xyx3");
}
@BeforeEach
void setUp() {
retry = new ExponentialBackoffRetry(3000, 10);
client = CuratorFrameworkFactory.builder().connectString("82.157.174.50:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry)
.namespace("zyw")
.build();
client.start();
}
@AfterEach
void tearDown() {
if (client != null) {
client.close();
}
}
}
Watch监听
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class CuratorTest1 {
private static RetryPolicy retry;
private static CuratorFramework client;
/**
* 给指定一个节点注册监听器
*/
@Test
void testNode() throws Exception {
// 1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(client, "/");
nodeCache.getListenable().addListener(() -> {
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
});
// 3.开启监听,如果设置为true,则开启监听是,加载缓冲数据
nodeCache.start(true);
while (true);
}
/**
* 监听某个节点的所有子节点,不感知自己的变化
*/
@Test
void testChildren() throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/", true);
pathChildrenCache.getListenable().addListener((client, event) -> {
System.out.println("子节点变化了");
// 获取类型
PathChildrenCacheEvent.Type type = event.getType();
if (PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(type)) {
// 变更后的数据
byte[] data = event.getData().getData();
}
});
pathChildrenCache.start();
while (true);
}
/**
* 监听某个节点自己和所有子节点
*/
@Test
void testTreeCache() throws Exception{
TreeCache treeCache = new TreeCache(client, "/");
treeCache.getListenable().addListener((client, event) -> {
// TODO
});
treeCache.start();
while (true);
}
@BeforeEach
void setUp() {
retry = new ExponentialBackoffRetry(3000, 10);
client = CuratorFrameworkFactory.builder().connectString("82.157.174.50:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry)
.namespace("zyw")
.build();
client.start();
}
@AfterEach
void tearDown() {
if (client != null) {
client.close();
}
}
}
ZooKeeper分布式锁
原理
Curator的五种锁方案
InterProcessMutex 演示
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class TicketsSell implements Runnable {
private static int tickets = 50;
private InterProcessMutex lock;
public TicketsSell() {
RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("82.157.174.50:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry)
.build();
client.start();
lock = new InterProcessMutex(client, "/lock");
}
@Override
public void run() {
while (true) {
try {
// 获取锁
boolean isLock = lock.acquire(300, TimeUnit.SECONDS);
if (isLock) {
if (tickets > 0) {
System.out.println(Thread.currentThread().getName() + ": " + tickets);
tickets--;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
ZooKeeper集群
介绍
例如集群有5台zookeeper服务, 按顺序启动, 3号超过半数,3号就是Leader, 4和5号启动也不会变成Leader
3个服务的集群, 如果2个follower挂了, leader虽然没有挂, 但也无法对外提供服务.