zookeepr入门

阅读: 评论:0

zookeepr入门

zookeepr入门

汇总篇:

工作原理篇:

架构预览:

如何选主?

顺序一致性


搭建和简单使用:

.html

.html

Java Api与maven依赖

zookeeper实现分布式锁

zookeeper架构美学:

CAP理论的含义:

.html

zookeeper适合做注册中心吗:

zookeeper是最终一致性,比如3台机器,leader将写入的数据同步到了两台,这个时候还有一个可能还是会读到老数据,但是可以通过sync方法强行实现数据一致性,及CAP理论中的C。

注册中心强调高可用即A,比如eureka就是AP的,而zookeeper是CP的。

故但是zookeeper不适合做注册中心,因为不能保证高可用,如果leader宕机之后,重新选举会耗费大量的时间,这个时候是一定时间段内的不可用的。故阿里的dubbo会在客户端使用缓存来解决。但是不适合不代表不能。注册中心推进阿里的nacos(支持AP和CP模型)

ZK分布式锁有几种实现方式?各自的优缺点是什么?

Zk分布式锁有两种实现方式。
一种比较简单,应对并发量不是很大的情况。
获得锁:创建一个临时节点,比如/lock,如果成功获得锁,如果失败没获得锁,返回false;
释放锁:删除/lock节点;
锁等待:使用监听机制,监听lock节点,如果lock节点被删除,重新去抢锁,否则一直等待。
第二种方式,这种方式比第一种复杂点,但解决了羊群效应问题。
获得锁:创建临时带序号的节点,排序,判断创建的节点是否是当前目录下最小的,如果最小获得锁结束;
如果不是,获得当前节点的前面一个节点名称,进入锁等待;
释放锁:删除创建的临时带序号节点;
锁等待:获取第一步的获的前一个节点名称,使用监听机制,监听这节点,当这个节点被删除的时候,重新去抢锁。

ZK的哪些基础能力构建出了哪些上层应用

福哥口诀法:数负命Ma集配分(使用场景:数据发布订阅、负载均衡、命名服务、Master 选举、集群管理、配置管理、分布式队列和分布式锁)
数据发布订阅:dubbo的rpc。
负载均衡:动态dns。
命名服务:全局唯一id。
Master 选举:搜索系统、Hbase。
集群管理:分布式日志收集系统。
配置管理:kafka、storm。
分布式队列。
分布式锁。
分布式协调:心跳检测、工作进度汇报和系统调度。
HA高可用性:hadoop、hdfs、yarn。

ZK宕机怎么办?

zookeeper节点宕机如何处理?

参考答案:
Zookeeper 本身也是集群,推荐配置不少于 3 个服务器。Zookeeper 自身也要保证当一个节点宕机时,其他节点会继续提供服务。

如果是一个 Follower 宕机,还有 2 台服务器提供访问,因为 Zookeeper 上的数据是有多个副本的,数据并不会丢失;

如果是一个 Leader 宕机,Zookeeper 会选举出新的 Leader。

ZK 集群的机制是只要超过半数的节点正常,集群就能正常提供服务。只有在 ZK节点挂得太多,只剩一半或不到一半节点能工作,集群才失效。

所以
3 个节点的 cluster 可以挂掉 1 个节点(leader 可以得到 2 票>1.5)
2 个节点的 cluster 就不能挂掉任何 1 个节点了(leader 可以得到 1 票<=1)


/*** 程序启动监听**/
@Component
public class SpringApplicationListener implements ApplicationRunner {private static final Logger logger = Logger(SpringApplicationListener.class);@Value("${zk.host}")private String zkHosts;@Value("${zk.user}")private String zkUser;@Value("${zk.password}")private String zkPassword;@Value("${zk.custom.path}")private String zkPath;private volatile ZooKeeper zk;@Overridepublic void run(ApplicationArguments args) throws Exception {//监听xxx_server的akka注册zk地址initialZookeeperListener();}private void initialZookeeperListener() {try {getZkClient();updateServers();} catch (Exception ex) {("error in zookeeper listener: {}", ex.getMessage(), ex);}}private void getZkClient() throws Exception {synchronized (this) {if (zk == null) {synchronized (this) {zk = new ZooKeeper(zkHosts, 3000, event -> {if (Type() == Watcher.Event.EventType.None) {return;}try {// 获取新的服务器列表,重新注册监听updateServers();} catch (Exception e) {("error in getZkClient:{}", e.getMessage(), e);}});}}}}private void updateServers() throws KeeperException, InterruptedException {zk.addAuthInfo("digest", (zkUser + ":" + zkPassword).getBytes());List<String> children = zk.getChildren(zkPath, true, null);if (CollectionUtils.isEmpty(children)) {logger.warn("zk node not has children: {}", zkPath);return;}String text;boolean first = true;for (String child : children) {String path = zkPushServerPath + "/" + child;byte[] raw = zk.getData(path, true, null);logger.warn("watch zk {} data: {}", path, new String(raw));//todo 业务逻辑first = false;}}
}
zk.host=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
zk.user=admin
zk.password=123456
zk.custom.path=/test_path

一、什么是zookeeper,有什么用

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户(来自百度百科)。

其主要的功能有

1.命名服务 2.配置管理 3.集群管理 4.分布式锁 5.队列管理

二、zookeeper的单机部署

1.下载并解压 zookeeper-3.4.

2.将conf目录下zoo_sample.cfg配置文件修改为

3.修改

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
#数据存放的位置(自主配置)
dataDir=/tmp/zookeeper/data
#日志存放的位置(新加的配置,默认在zookeeper的bin目录下的zookeeper.out)
dataLogDir=/tmp/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# .html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
~

三、zookeeper集群搭建(伪集群)

1.在服务器上解压三份zookeeper

2.分别将conf目录下zoo_sample.cfg配置文件修改为f,并修改f配置文件

由于是在一台服务器上做测试,所以为了避免端口冲突,修改了端口

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper0/data
# the port at which the clients will connect
clientPort=2180
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# .html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper1/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# .html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper2/data
# the port at which the clients will connect
clientPort=2182
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# .html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389

4.分别启动几个zookeeper,这样集群就搭建完成。

四、命令行操作zookeeper

  1.启动zookeeper : ./zkServer.sh start2.关闭zookeeper: ./zkServer.sh stop3.客户端连接 ./zkCli.sh -server localhost:21814.查询当前zookeeper的状态 ./zkServer.sh status5.客户端连接上zookeeper后,可以使用help命令来查询zookeeper的命令6.关闭与服务端的连接 : close7.连接服务端:connect 127.0.0.1:21818.创建节点 create /name value9.获取节点的信息 get /name10.列出节点 ls /name11.删除节点信息 delete /name12.列出节点 ls2 /name  是ls的加强版13.列出历史执行命令  history14.重新执行某个命令和history结合使用  redo 2015.sync 强制同步16.stat 查看节点信息17.显示配额 listquota /name18.设置配额 setquota /name19.删除配额 delquota /name20.addauth命令用于节点认证,使用方式:如addauth digest username:password21.setAcl命令用于设置节点AclAcl由三部分构成:1为scheme,2为user,3为permission,一般情况下表示为 `scheme:id:permissions`22. 获取节点的Acl,如getAcl /node112.退出客户端 quit

五、zookeeper的使用


import java.lang.management.ManagementFactory;import keeper.CreateMode;
import keeper.WatchedEvent;
import keeper.Watcher;
import keeper.ZooDefs.Ids;
import keeper.ZooKeeper;/*** 模拟集群服务器连接*/
public class ClusterClient implements Watcher, Runnable {private static String membershipRoot = "/Members";final ZooKeeper zk;public ClusterClient(String hostPort, String processId) throws Exception {zk = new ZooKeeper(hostPort, 2000, this);if (zk != null) {zk.create(membershipRoot + '/' + processId, Bytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);}}public synchronized void close() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void process(WatchedEvent event) {}public void run() {try {synchronized (this) {while (true) {wait();}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {this.close();}}public static void main(String[] args) throws Exception {String hostPort = "111.230.239.152:2180,111.230.239.152:2181,111.230.239.152:2182";String name = RuntimeMXBean().getName();String processId = name.substring(0, name.indexOf('@'));new ClusterClient(hostPort, processId).run();}
}

import java.io.IOException;
import java.util.List;import keeper.CreateMode;
import keeper.KeeperException;
import keeper.WatchedEvent;
import keeper.Watcher;
import keeper.ZooDefs.Ids;
import keeper.ZooKeeper;/***模拟集群监控**/
public class ClusterMonitor {private static String membershipRoot = "/Members";private final Watcher connectionWatcher;private final Watcher childrenWatcher;private ZooKeeper zk;boolean alive = true;public ClusterMonitor(String HostPort) throws Exception {connectionWatcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Type() == Watcher.Event.EventType.None&& State() == Watcher.Event.KeeperState.SyncConnected) {System.out.println("Client connect success !!!");}}};childrenWatcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Type() == Event.EventType.NodeChildrenChanged) {List<String> children;try {children = zk.getChildren(membershipRoot, this);System.out.println("Cluster Membership Change!!!");System.out.println("Members: " + children);}catch (Exception e) {e.printStackTrace();}}}};zk = new ZooKeeper(HostPort, 2000, connectionWatcher);if (zk.exists(membershipRoot, false) == null) {zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}List<String> children = zk.getChildren(membershipRoot, childrenWatcher);println("Members: " + children);}public synchronized void close() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}public void run() {try {synchronized (this) {while (alive) {wait();}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {this.close();}}public static void main(String[] args) throws Exception {String hostPort = "111.230.239.152:2180,111.230.239.152:2181,111.230.239.152:2182";new ClusterMonitor(hostPort).run();}
}

linux客户端:

[zk: localhost:2181(CONNECTED) 3] create /test data
Created /test
[zk: localhost:2181(CONNECTED) 2] getAcl /test
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 3] addauth digest user:password
[zk: localhost:2181(CONNECTED) 4] setAcl /test auth:user:password:cdrwa
[zk: localhost:2181(CONNECTED) 5] getAcl /test
'digest,'user:V28q/NynI4JI3Rk54h0r8O5kMug=
: cdrwa    
[zk: localhost:2181(CONNECTED) 0] ls /test
Authentication is not valid : /test
[zk: localhost:2181(CONNECTED) 4] addauth digest user:password
[zk: localhost:2181(CONNECTED) 5] ls /test
[] 
[zk: localhost:2181(CONNECTED) 6] create /test/leaf data
Created /test/leaf
[zk: localhost:2181(CONNECTED) 7] getAcl /test/leaf
'world,'anyone
: cdrwa

import urrent.CountDownLatch;
import keeper.WatchedEvent;
import keeper.Watcher;
import keeper.Watcher.Event.EventType;
import keeper.Watcher.Event.KeeperState;
import keeper.ZooKeeper;
import keeper.data.Stat;/*** 分布式配置中心demo* @author **/
public class ZooKeeperProSync implements Watcher {private static CountDownLatch connectedSemaphore = new CountDownLatch(1);private static ZooKeeper zk = null;private static Stat stat = new Stat();public static void main(String[] args) throws Exception {//zookeeper配置数据存放路径String path = "/username";//连接zookeeper并且注册一个默认的监听器zk = new ZooKeeper("192.168.31.100:2181", 5000, //new ZooKeeperProSync());//等待zk连接成功的通知connectedSemaphore.await();//获取path目录节点的配置数据,并注册默认的监听器System.out.println(new Data(path, true, stat)));Thread.sleep(Integer.MAX_VALUE);}public void process(WatchedEvent event) {if (KeeperState.SyncConnected == State()) {  //zk连接成功通知事件if (EventType.None == Type() && null == Path()) {untDown();} else if (Type() == EventType.NodeDataChanged) {  //zk目录节点数据变化通知事件try {System.out.println("配置已修改,新值为:" + new Path(), true, stat)));} catch (Exception e) {}}}}
}

本文发布于:2024-02-02 16:07:32,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170686125244915.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:入门   zookeepr
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23