Part 1 理论部分
1 什么是Zookeeper
Zookeeper是一个分布式协调服务,提供了协调分布式应用中的服务的能力,可以对分布式项目中的服务实现服务注册和发现、服务治理。对外暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)等,简化分布式项目中服务协调及其管理的难度,提供高性能的分布式协调服务。
Zookeeper本身可以单机模式安装运行,不过它的优点在于通过分布式Zookeeper集群(一个Leader,多个Follower),可以基于一定的策略来保证Zookeeper集群的稳定性和可用性,从而实现分布式项目的可靠性。
需要注意:
1、Zookeeper是为其他分布式项目服务的
2、Zookeeper本质就是一个分布式协调服务程序(只要有半数以上节点存活,zk就能正常服务)
3、Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、分布式锁、统一配置管理、统一名称服务等
4、虽然说可以提供各种服务,但是zookeeper在底层其实只提供了两个功能:
(1) 管理(存储,读取)用户在程序中提交的数据(类似namenode中存放的metadata);
(2) 为用户程序提供数据节点监听服务;
2 Zookeeper集群机制
Zookeeper集群是中心化集群,集群中的角色: Leader主节点 和 Follower从节点
Zookeeper集群可用的原则:只要集群中有半数以上节点存活,并且集群当前的状态为非选举状态,集群就能提供服务
3 Zookeeper集群特性
1、中心化集群,一主多从:zookeeper集群由一个leader主节点和多个follower从节点组成的
2、全局数据一致性:每个server保存一份相同的数据副本,client无论连接到哪个server,读取的数据都是一致的
3、读写分离:leader负责写操作和更新请求转发,follower负责读操作和执行leader的命令
4、更新请求有序进行:来自同一个client的更新请求将按其发送顺序依次执行
5、数据更新原子性:一次数据更新要么成功,要么失败,必须有结果。
6、数据读取实时性:在一定的时间范围内,client总是能读到最新的数据。
4 Zookeeper数据结构
1、zookeeper采用层次化的目录结构,命名符合常规文件系统规范(类似文件系统)
2、每个节点在zookeeper中叫做znode,并且都有一个全局唯一的路径标识
3、znode可以包含数据和子节点(但是EPHEMERAL短暂类型的节点不能有子节点)
4、znode节点有两种类型:ephemeral短暂、persistent持久
短暂(ephemeral):客户端断开连接会自动删除
(create -e /app1/test1 “test1” 客户端断开连接zk删除ephemeral类型节点)
持久(persistent):客户端断开连接不会删除
(create -s /app1/test2 “test2” 客户端断开连接zk不删除persistent类型节点)
5、znode有四种形式的目录节点(默认是persistent持久)
PERSISTENT (持久)(默认)
PERSISTENT_SEQUENTIAL(持久序列/test0000000019)
EPHEMERAL(短暂)
EPHEMERAL_SEQUENTIAL(短暂序列/test0000000025)
6、创建znode时可以设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
7、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
5 Zookeeper应用场景
5.1 数据发布和订阅(配置中心)
数据发布和订阅,即所谓的配置中心,顾名思义就是发布者将数据发布到zookeeper节点上,供订阅者动态获取数据,实现配置信息的集中管理和动态更新。
例如全局的配置信息、基于SOA设计的面向服务的项目中的服务地址列表等就非常适合使用。
5.2 软负载均衡
在分布式环境中,为了保证高可用,通常同一个应用或同一个服务的提供方都会在不同的服务器上部署多份,从而搭建服务集群,达到对等服务。而消费者就需要在这些对等的服务中选择一个服务来执行相关的业务逻辑,这就是软负载均衡。
软负载均衡比较典型的实现案例就是MQ消息中间件中的生产者和消费者之间的交互,比如Kafka和 MetaQ等消息中间件就是通过zookeeper实现的生产者和消费者之间的软负载均衡。
MetaQ软负载均衡的实现机制:
生产者负载均衡:MetaQ生产者在发送消息之前必须选择broker集群中的一台broker上的一个分区来发送消息,因此MetaQ在运行过程中,会把集群中所有的broker和对应的分区信息全部注册到zookeeper指定节点上。默认的实现策略是一个依次轮询的过程,生产者在通过zookeeper获取broker上的分区列表之后,会按照brokerId和partition的顺序组织成一个有序的分区列表,发送的时候会按照从头到尾循环往复的方式选择一个分区来发送消息。
消费者负载均衡:在消费过程中,MetaQ消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ需要保证所有的分区都有消费者消费,所以MetaQ消费者的消费策略是:
1.每个分区针对同一个group只挂载一个消费者。
2.如果同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。
3.如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。
注意:在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表实现),然后会重新进行软负载均衡,保证所有的分区都有消费者进行消费。
5.3 命名服务(Naming Service)
命名服务也是分布式系统中比较常见的一类场景。在分布式系统中,通过使用命名服务,客户端能够根据指定名字来获取资源或服务的地址、提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,远程对象等等——这些都可以统称他们为名字(Name),其中比较常见的就是一些基于SOA设计的面向服务的项目中的服务地址列表。通过调用zookeeper提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。
阿里巴巴开源的分布式服务框架Dubbo中使用zookeeper来作为其命名服务、维护全局服务地址列表的工具。在Dubbo实现中:
服务提供方在启动的时候,会向zookeeper上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。
服务消费方在启动的时候,会订阅/dubbo/${serviceName}/providers目录下提供者的URL地址,并向/dubbo/${serviceName} /consumers目录下写入自己消费者的URL地址。
注意:所有向zookeeper上注册的地址本质都是临时节点,服务提供方和消费方其实都是连接到zookeeper的客户端,这样就能够保证服务提供方和消费方能够自动感知zookeeper server端资源的变化。
另外,Dubbo还有针对服务粒度的监控,实现方法是订阅/dubbo/${serviceName}目录下所有的服务提供方和消费方的信息。
5.4 分布式通知和协调
zookeeper中持有watcher注册和异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,从而能对数据变更进行实时处理。
通常的实现方法是:不同系统都对zookeeper上同一个znode进行注册,监听znode的变化(包括znode本身数据及其子节点),当某一个系统update了这个znode,其他的系统能够及时收到通知,并作出相应处理。
zookeeper实现分布式通知和协调的底层实现机制:
1.心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过zookeeper上某个节点进行关联,大大减少系统耦合。
2.系统调度模式:在系统调度模式中,系统由控制台和推送系统两部分组成。控制台的职责是控制推送系统进行相应的推送工作,管理人员在控制台做的一些操作,实际上是修改了zookeeper上某些节点的状态,而zookeeper就把这些节点的变化通知给推送系统,推送系统负责注册Watcher的客户端,然后执行相应的推送任务。
3.工作汇报模式:类似于CDN任务分发系统实现机制:子任务启动后,在zookeeper上注册一个临时节点,并且定时将自己的任务进度进行汇报(定时将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。
总之,使用zookeeper提供的分布式通知和协调能力能够大大降低系统之间的耦合度。
5.5 分布式锁
基于zookeeper实现分布式锁,主要得益于zookeeper保证了数据的强一致性。zookeeper实现的分布式锁可以分为两类:保持独占和控制时序。
1.保持独占,就是同一时刻所有试图来获取这个锁的客户端,最终只有一个能成功获取到这把锁。通常的做法是把 zookeeper上的一个 znode 看作是一把锁,通过 create znode 的方式来获取锁。所有客户端同时创建 /distribute_lock 节点,最终只会有一个客户端能创建成功,这是因为znode路径是全局唯一的,所以最终只有能成功创建节点的客户端获取到了这把锁并得到执行机会,其他客户端都不会被安排执行。
2.控制时序,就是同一时刻所有试图来获取这个锁的客户端,最终都会被安排执行,只是会有全局时序。控制时序的实现方式和保持独占的实现方式基本类似,只是在控制时序这种实现方式中, /distribute_lock 节点已经预先存在,客户端在它下面创建临时有序的节点(可以通过节点的属性控制:CreateMode.EPHEMERAL_SEQUENCE 来指定)。zookeeper的父节点(/distribute_lock)维护一份单调递增的计数器sequence来保证子节点创建的时序性,从而实现了每个客户端获取锁的全局时序。
5.6 集群管理和Leader选举
zookeeper集群管理的核心是集群机器监控:集群机器监控通常用于对集群中机器状态、机器在线率有较高要求的场景,能够快速对集群中的机器变化作出响应。在这样的场景下,往往会有一个监控系统,可以实时监测集群中的机器是否存活。监控系统过去的实现做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报“我还活着”。这种做法可行,但是存在两个比较明显的问题:
1. 集群中机器有变动的时候,牵连修改的东西比较多。
2. 有一定的延时。
利用zookeeper的两个特性,就可以实现另一种集群机器存活性监控系统:
1. 客户端在节点znode上注册一个Watcher,这样如果znode的子节点发生变化,会通知该客户端。
2. 在注册了watcher的节点上创建EPHEMERAL短暂类型的子节点,一旦服务器和客户端的会话结束或过期,该节点就会被自动删除。
例如,监控系统在/clusterServers节点上注册一个Watcher,以后每当动态增加一个新服务器的时候,就往/clusterServers节点下创建一个 EPHEMERAL短暂类型的子节点:/clusterServers/{hostname}. 这样,监控系统就能够实时知道机器的增减情况,后续处理的就是监控系统自己的业务了。
Leader选举是zookeeper集群管理中最为经典的应用场景。
在分布式环境中,相同的业务应用分布在不同的服务器上,有些通用的业务逻辑(例如一些耗时的计算,网络I/O处理等),往往只需要让整个集群中的某一台机器(Leader)执行,其余机器(Follower)可以共享这个执行结果,这样可以大大减少重复劳动,提高性能。于是如何进行Leader选举就是在这种场景下碰到的主要问题。再比如,zookeeper server端集群如果Leader节点宕机了,如何在剩余可用的节点中选举出新的Leader节点,以保证后续zookeeper服务的正常使用,也是我们需要考虑的。
zookeeper集群的Leader选举分为server端的集群选举和客户端的集群选举。
1.
server端的集群选举实现机制:
之前提到,zookeeper集群在选举过程中,会导致整个zookeeper集群短暂不可用,这样做的目的是保证不同zookeeper节点的数据强一致性,因为zookeeper对于节点的数据一致性控制是非常严格的,是典型的分布式CP模式的实现。
server端的集群选举实现过程:
1 先比较每个zookeeper节点的zxid,zxid谁最大谁就是leader节点
2 若zxid相同,则比较zxid相同的节点的myid,myid谁最大谁就是leader节点
当Leader节点宕机后会重新进行选举,选举实现策略同理。
2.客户端的集群选举实现机制:分为静态选举和动态选举
2.1静态选举:利用zookeeper具备的数据强一致性和保持独占的特性,能够保证在分布式高并发情况下节点创建的全局唯一性。即:同时有多个客户端请求创建/currentMaster节点,最终一定只有一个客户端能够创建成功,谁创建成功谁就是客户端Leader。利用这个特性,就能很轻易的在分布式环境中进行客户端的集群选举。
2.2动态选举:需要用到EPHEMERAL_SEQUENCE短暂序列形式节点的特性。
之前提到,在静态选举中,同一时刻所有客户端创建同一个节点的请求,最终只有一个客户端能够创建成功(保持独占)。而动态选举则是允许所有请求都能够创建成功,但是得有个全局顺序(控制时序)。于是所有的客户端请求最终在zookeeper上创建的一系列节点可能是这样: /currentMaster/{sessionId}-1,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3……等等。每次选取顺序号最小的那个机器作为客户端Leader,如果这个机器挂了,基于EPHEMERAL短暂类型节点的特性,这个机器在zookeeper上创建的节点会被直接删除,删除之后,其他机器中顺序号最小的机器就是新的客户端Leader,以此类推。
Leader选举的应用场景:
1. 在全文检索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此之间的索引数据一致。因此可以让集群中的Leader来进行全量索引的生成,然后同步到集群中的其它机器。另外,Leader选举的容灾机制是,可以随时手动指定Leader,也就是说当应用在zookeeper中无法获取Leader信息时,可以通过比如http方式,指定一台机器作为客户端Leader。
2. 在HBase中,也是使用zookeeper来实现的Leader动态选举。在HBase实现中,会在zookeeper上存储一些ROOT表的地址和Leader的地址,每个RegionServer也会把自己以临时顺序编号节点(EPHEMERAL_SEQUENCE短暂序列形式)的方式注册到zookeeper中,使得Leader可以随时感知到各个RegionServer的存活状态。同时,一旦Leader出现问题,会按照顺序号重新选举出一个新的Leader来运行,从而避免了Leader的单点问题。
Part 2 实践部分
Zookeeper安装
Zookeeper windows环境安装
环境要求:必须要有jdk环境,本次使用jdk1.8
1.安装jdk
2.安装Zookeeper. 在官网Apache ZooKeeper下载zookeeper.我下载的是zookeeper-3.4.6版本。
解压zookeeper-3.4.6至D:\machine\zookeeper-3.4.6.
在D:\machine 新建data及log目录。
3.ZooKeeper的安装模式分为三种,分别为:单机模式(stand-alone)、集群模式和集群伪分布模式。ZooKeeper 单机模式的安装相对比较简单,如果第一次接触ZooKeeper的话,建议安装ZooKeeper单机模式或者集群伪分布模式。
安装单击模式。 至D:\machine\zookeeper-3.4.6\conf 复制 zoo_sample.cfg 并粘贴到当前目录下,命名zoo.cfg.
Java操作Zookeeper
Zookeeper说明
创建节点(znode) 方法:
create:
提供了两套创建节点的方法,同步和异步创建节点方式。
同步方式:
参数1,节点路径《名称) : InodeName (不允许递归创建节点,也就是说在父节点不存在
的情况下,不允许创建子节点)
参数2,节点内容: 要求类型是字节数组(也就是说,不支持序列化方式,如果需要实现序
列化,可使用java相关序列化框架,如Hessian、Kryo框架)
参數3,节点权限: 使用Ids.OPEN_ACL_UNSAFE开放权限即可。(这个参数一般在权展
没有太高要求的场景下,没必要关注)
参数4,节点类型: 创建节点的类型: CreateMode,提供四种首点象型
PERSISTENT 持久化节点 PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1 EPHEMERAL 临时节点, 客户端session超时这类节点就会被自动删除 EPHEMERAL_SEQUENTIAL 临时自动编号节点 |
Maven依赖信息
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> |
Zookeeper客户端连接
public class Test001 { //连接地址 private static final String ADDRES = "127.0.0.1:2181"; //session 会话 private static final int SESSION_OUTTIME = 2000; //信号量,阻塞程序执行,用户等待zookeeper连接成功,发送成功信号, private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(ADDRES, SESSION_OUTTIME, new Watcher() { public void process(WatchedEvent event) { // 获取事件状态 KeeperState keeperState = event.getState(); // 获取事件类型 EventType eventType = event.getType(); if (KeeperState.SyncConnected == keeperState) { if (EventType.None == eventType) { countDownLatch.countDown(); System.out.println("zk 启动连接..."); } } } }); // 进行阻塞 countDownLatch.await(); String result = zk.create("/itmayeidu_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(result); zk.close(); } } |
创建Zookeeper节点信息
String result = zk.create("/ittcf_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("result:" + result);
String result = zk.create("/ittcf_temp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("result:" + result); |
Watcher
在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)。
什么是Watcher接口
同一个事件类型在不同的通知状态中代表的含义有所不同,表7-3列举了常见的通知状态和事件类型。
表7-3 Watcher通知状态与事件类型一览
KeeperState |
EventType |
触发条件 |
说明 |
|
None |
客户端与服务端成功建立连接 |
|
SyncConnected |
NodeCreated |
Watcher监听的对应数据节点被创建 |
|
|
NodeDeleted |
Watcher监听的对应数据节点被删除 |
此时客户端和服务器处于连接状态 |
|
NodeDataChanged |
Watcher监听的对应数据节点的数据内容发生变更 |
|
|
NodeChildChanged |
Wather监听的对应数据节点的子节点列表发生变更 |
|
Disconnected |
None |
客户端与ZooKeeper服务器断开连接 |
此时客户端和服务器处于断开连接状态 |
Expired |
Node |
会话超时 |
此时客户端会话失效,通常同时也会受到SessionExpiredException异常 |
AuthFailed |
None |
通常有两种情况,1:使用错误的schema进行权限检查 2:SASL权限检查失败 |
通常同时也会收到AuthFailedException异常 |
表7-3中列举了ZooKeeper中最常见的几个通知状态和事件类型。
回调方法process()
process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。process方法的定义如下:
abstract public void process(WatchedEvent event);
这个回调方法的定义非常简单,我们重点看下方法的参数定义:WatchedEvent。
WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path),其数据结构如图7-5所示。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。
提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。
服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。
需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。举个例子来说,当/zk-book这个节点的数据发生变更时,服务端会发送给客户端一个“ZNode数据内容变更”事件,客户端只能够接收到如下信
Watcher代码
public class ZkClientWatcher implements Watcher { // 集群连接地址 private static final String CONNECT_ADDRES = "192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181"; // 会话超时时间 private static final int SESSIONTIME = 2000; // 信号量,让zk在连接之前等待,连接成功后才能往下走. private static final CountDownLatch countDownLatch = new CountDownLatch(1); private static String LOG_MAIN = "【main】 "; private ZooKeeper zk; public void createConnection(String connectAddres, int sessionTimeOut) { try { zk = new ZooKeeper(connectAddres, sessionTimeOut, this); System.out.println(LOG_MAIN + "zk 开始启动连接服务器...."); countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } public boolean createPath(String path, String data) { try { this.exists(path, true); this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 判断指定节点是否存在 * * @param path * 节点路径 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } public boolean updateNode(String path,String data) throws KeeperException, InterruptedException { exists(path, true); this.zk.setData(path, data.getBytes(), -1); return false; } public void process(WatchedEvent watchedEvent) { // 获取事件状态 KeeperState keeperState = watchedEvent.getState(); // 获取事件类型 EventType eventType = watchedEvent.getType(); // zk 路径 String path = watchedEvent.getPath(); System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path); // 判断是否建立连接 if (KeeperState.SyncConnected == keeperState) { if (EventType.None == eventType) { // 如果建立建立成功,让后程序往下走 System.out.println(LOG_MAIN + "zk 建立连接成功!"); countDownLatch.countDown(); } else if (EventType.NodeCreated == eventType) { System.out.println(LOG_MAIN + "事件通知,新增node节点" + path); } else if (EventType.NodeDataChanged == eventType) { System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改...."); } else if (EventType.NodeDeleted == eventType) { System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除...."); } } System.out.println("--------------------------------------------------------"); } public static void main(String[] args) throws KeeperException, InterruptedException { ZkClientWatcher zkClientWatcher = new ZkClientWatcher(); zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME); // boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064"); zkClientWatcher.updateNode("/pa2","12345678"); } } |
使用Zookeeper实现负载均衡原理
思路
使用Zookeeper实现负载均衡原理,服务器端将启动的服务注册到,zk注册中心上,采用临时节点。客户端从zk节点上获取最新服务节点信息,本地使用负载均衡算法,随机分配服务器。
创建项目工程
Maven依赖
<dependencies> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.8</version> </dependency> </dependencies> |
创建Server服务端
ZkServerSocket服务
//##ServerScoekt服务端 public class ZkServerSocket implements Runnable { private int port = 18080; public static void main(String[] args) throws IOException { int port = 18080; ZkServerScoekt server = new ZkServerScoekt(port); Thread thread = new Thread(server); thread.start(); } public ZkServerScoekt(int port) { this.port = port; } public void run() { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); System.out.println("Server start port:" + port); Socket socket = null; while (true) { socket = serverSocket.accept(); new Thread(new ServerHandler(socket)).start(); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (serverSocket != null) { serverSocket.close(); } } catch (Exception e2) { } } } } |
ServerHandler
public class ServerHandler implements Runnable { private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = in.readLine(); if (body == null) break; System.out.println("Receive : " + body); out.println("Hello, " + body); } } catch (Exception e) { if (in != null) { try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (out != null) { out.close(); } if (this.socket != null) { try { this.socket.close(); } catch (IOException e1) { e1.printStackTrace(); } this.socket = null; } } } } |
ZkServerClient
public class ZkServerClient { public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client= new ZkServerClient(); BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String name; try { name = console.readLine(); if ("exit".equals(name)) { System.exit(0); } client.send(name); } catch (IOException e) { e.printStackTrace(); } } } // 注册所有server public static void initServer() { listServer.clear(); listServer.add("127.0.0.1:18080"); } // 获取当前server信息 public static String getServer() { return listServer.get(0); }
public void send(String name) { String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(cfg[0], Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println(name); while (true) { String resp = in.readLine(); if (resp == null) break; else if (resp.length() > 0) { System.out.println("Receive : " + resp); break; } } } catch (Exception e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } |
改造ZkServerScoekt
public class ZkServerScoekt implements Runnable { private static int port = 18081; public static void main(String[] args) throws IOException { ZkServerScoekt server = new ZkServerScoekt(port); Thread thread = new Thread(server); thread.start(); } public ZkServerScoekt(int port) { this.port = port; } public void regServer() { // 向ZooKeeper注册当前服务器 ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000); String path = "/test/server" + port; if (client.exists(path)) client.delete(path); client.createEphemeral(path, "127.0.0.1:" + port); } public void run() { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); regServer(); System.out.println("Server start port:" + port); Socket socket = null; while (true) { socket = serverSocket.accept(); new Thread(new ServerHandler(socket)).start(); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (serverSocket != null) { serverSocket.close(); } } catch (Exception e2) { } } } } |
改造ZkServerClient
public class ZkServerClient { public static List<String> listServer = new ArrayList<String>(); public static String parent = "/test"; public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String name; try { name = console.readLine(); if ("exit".equals(name)) { System.exit(0); } client.send(name); } catch (IOException e) { e.printStackTrace(); } } } // 注册所有server public static void initServer() { // listServer.add("127.0.0.1:18080"); final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 6000, 1000); List<String> children = zkClient.getChildren(parent); getChilds(zkClient, children); // 监听事件 zkClient.subscribeChildChanges(parent, new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { getChilds(zkClient, currentChilds); } }); } private static void getChilds(ZkClient zkClient, List<String> currentChilds) { listServer.clear(); for (String p : currentChilds) { String pathValue = (String) zkClient.readData(parent + "/" + p); listServer.add(pathValue); } serverCount = listServer.size(); System.out.println("从zk读取到信息:" + listServer.toString()); } // 请求次数 private static int reqestCount = 1; // 服务数量 private static int serverCount = 0; // 获取当前server信息 public static String getServer() { // 实现负载均衡 String serverName = listServer.get(reqestCount % serverCount); ++reqestCount; return serverName; } public void send(String name) { String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(cfg[0], Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println(name); while (true) { String resp = in.readLine(); if (resp == null) break; else if (resp.length() > 0) { System.out.println("Receive : " + resp); break; } } } catch (Exception e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } |
DubboAdmin部署
将dubbo-admin.zip放入到TomcatWebapps目录下,修改dubbo.properties中的Zookeeper连接地址即可。
本文部分素材转载自蚂蚁课堂