网站建设 互成网络网页界面设计作品推荐

当前位置: 首页 > news >正文

网站建设 互成网络,网页界面设计作品推荐,网站怎么做超链接,seo教程技术资源文章目录 服务器动态上下线监听案例需求需求分析具体实现测试 Zookeeper分布式锁案例原生Zookeeper实现分布式锁Curator框架实现分布式锁 Zookeeper面试重点选举机制生产集群安装多少zk合适zk常用命令 服务器动态上下线监听案例 需求 某分布式系统中#xff0c;主节点可以有… 文章目录 服务器动态上下线监听案例需求需求分析具体实现测试 Zookeeper分布式锁案例原生Zookeeper实现分布式锁Curator框架实现分布式锁 Zookeeper面试重点选举机制生产集群安装多少zk合适zk常用命令 服务器动态上下线监听案例 需求 某分布式系统中主节点可以有多台可以动态上下线任意一台客户端都能实时感知 到主节点服务器的上下线。 需求分析 具体实现 1先在集群上创建/servers节点 [zk: localhost:2181(CONNECTED) 10] create /servers servers Created /servers2在Idea中创建包名com.yudan.case1 3服务器端向Zookeeper注册代码 import org.apache.zookeeper.;import java.io.IOException;public class DistributeServer {private String connectString hadoop102:2181,hadoop103:2181,hadoop104:2181;private int sessionTime 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeServer server new DistributeServer();// 1、获取zk连接server.getConnect();// 2、注册服务器到zk集群server.regist(args[0]);// 3、启动 业务逻辑睡觉server.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk new ZooKeeper(connectString, sessionTime, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {}});}// 注册到服务器private void regist(String hostname) throws InterruptedException, KeeperException {String create zk.create(/servers/hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname is online);}// 业务功能private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);} }4客户端代码 import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper;import java.io.IOException; import java.util.ArrayList; import java.util.List;public class DistributeClient {private String connectString hadoop102:2181,hadoop103:2181,hadoop104:2181;private int sessionTime 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeClient client new DistributeClient();// 1、获取zk连接client.getConnect();// 2、监听/servers下面子节点的增加和删除client.getServersList();// 3、业务逻辑睡觉client.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk new ZooKeeper(connectString, sessionTime, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {// 再次启动监听try {getServersList();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}});}// 获取服务器列表信息private void getServersList() throws InterruptedException, KeeperException {// 获取服务器子节点信息并且对父节点进行监听ListString children zk.getChildren(/servers, true);// 存储服务器信息列表 ArrayListString servers new ArrayList();// 遍历所有节点获取节点中的主机名称信息 for (String child : children) {byte[] data zk.getData(/servers/ child, false, null);servers.add(new String(data));}// 打印System.out.println(servers);}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);} }测试 1在Linux命令行上操作增加减少服务器 1启动DistributeClient 客户端 2在hadoop102上zk的客户端/servers目录上创建临时带序号节点 [zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 hadoop102 [zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 hadoop1033观察Idea控制台变化 [hadoop102, hadoop103]4执行删除操作 [zk: localhost:2181(CONNECTED) 8] delete /servers/hadoop1020000000000 5观察Idea控制台变化 [hadoop103] 2在Idea上操作增加减少服务器 1启动DistributeClient 客户端如果已经启动过不需要重启 2启动DistributeServer 服务 点击Edit Configurations… 在弹出的窗口中Program arguments输入想启动的主机例如hadoop102 回到DistributeServer的main方法右键在弹出的窗口中点击Run “DistributeServer.main()”观察DistributeServer控制台提示hadoop102 is online观察DistributeClient控制台提示hadoop102已经上线 Zookeeper分布式锁案例 什么叫做分布式锁呢 比如说进程1在使用该资源的时候会先去获得锁进程1获得锁以后会对该资源保持独占这样其他进程就无法访问该资源进程1用完该资源以后就将锁释放掉让其他进程来获得锁那么通过这个锁机制我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
原生Zookeeper实现分布式锁 1分布式锁实现 import org.apache.zookeeper.
; import org.apache.zookeeper.data.Stat;import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;public class DistributeLock {private final String connectString hadoop102:2181,hadoop103:2181,hadoop104:2181;private final int sessionTime 100000;private final ZooKeeper zk;// 当前client等待的子节点private String waitPath;// zookeeper节点等待private CountDownLatch waitLatch new CountDownLatch(1);// zookeeper连接private CountDownLatch connectLatch new CountDownLatch(1);// 当前client创建的子节点private String currentMode;// 和 zk 服务建立连接并创建根节点public DistributeLock() throws IOException, InterruptedException, KeeperException {// 1、获取连接zk new ZooKeeper(connectString, sessionTime, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果连接上zk 可以释放// 连接建立时, 打开latch, 唤醒wait在该latch上的线程if (watchedEvent.getState() Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch 需要释放// 发生了waitPath的删除事件if (watchedEvent.getType() Event.EventType.NodeDeleted watchedEvent.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待 zookeeper正常连接后往下走程序connectLatch.await();// 2、判断根节点/locks是否存在Stat stat zk.exists(/locks, false);if (stat null) {// 创建一下根节点zk.create(/locks,locks.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}// 对zk加锁public void zkLock() {// 创建对应的临时带序号节点try {currentMode zk.create(/locks/ seq-, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判断创建的节点是否是最小的序号节点如果是获取到锁如果不是, 监听他序号的前一个节点ListString children zk.getChildren(/locks, false);// 如果children 只有一个值那就直接获取锁如果有多个节点需要判断哪个节点最小if (children.size() 1) {return;} else {// 对children集合内的节点进行排序Collections.sort(children);// 获取节点名称 seq-String thisNode currentMode.substring(/locks/.length());// 通过seq- 获取到该节点在children集合中的位置int index children.indexOf(thisNode);// 判断if (index -1) {System.out.println(数据异常);} else if (index 0) {// 就一个节点可以获取锁了return;}else {// 需要监听前一个节点waitPath /locks/ children.get(index-1);zk.getData(waitPath,true,null);// 等待监听waitLatch.await();return;}}} catch (KeeperException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 对zk解锁public void unzkLock() {// 删除节点try {zk.delete(currentMode,-1);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}} }2分布式锁测试 1创建两个线程 import org.apache.zookeeper.KeeperException;import java.io.IOException;public class DistributeLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {// 创建分布式锁1final DistributeLock lock1 new DistributeLock();// 创建分布式锁2final DistributeLock lock2 new DistributeLock();new Thread(new Runnable() {Overridepublic void run() {// 获取锁对象try {lock1.zkLock();System.out.println(线程1 启动,获取到锁);Thread.sleep(5 * 1000);lock1.unzkLock();System.out.println(线程1 释放锁);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {Overridepublic void run() {// 获取锁对象try {lock2.zkLock();System.out.println(线程2 启动,获取到锁);Thread.sleep(5 * 1000);lock2.unzkLock();System.out.println(线程2 释放锁);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();} }2观察控制台变化 线程1获取锁 线程1释放锁 线程2获取锁 线程2释放锁Curator框架实现分布式锁 1原生的Java API开发存在的问题 1会话连接是异步的需要自己去处理。比如使用CountDownLatch 2Watch需要重复注册不然就不能生效 3开发的复杂性还是比较高的 4不支持多节点删除和创建。需要自己去递归 2Curator是一个专门解决分布式锁的框架解决了原生Java API开发分布式遇到的问题。 详情请查看官方文档https://curator.apache.org/index.html
3Curator 案例实操 1添加依赖 dependency groupIdorg.apache.curator/groupId artifactIdcurator-framework/artifactId version4.3.0/version /dependency dependency groupIdorg.apache.curator/groupId artifactIdcurator-recipes/artifactId version4.3.0/version /dependency dependency groupIdorg.apache.curator/groupId artifactIdcurator-client/artifactId version4.3.0/version /dependency 2代码实现 import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 new InterProcessMutex(getCuratorFramework(), /locks);// 创建分布式锁2InterProcessMutex lock2 new InterProcessMutex(getCuratorFramework(), /locks);new Thread(new Runnable() {Overridepublic void run() {try {lock1.acquire();System.out.println(线程1 获取到锁);lock1.acquire();System.out.println(线程1 再次获取到锁);Thread.sleep(5 * 1000);lock1.release();System.out.println(线程1 释放锁);lock1.release();System.out.println(线程1 再次释放锁);} catch (Exception e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {Overridepublic void run() {try {lock2.acquire();System.out.println(线程2 获取到锁);lock2.acquire();System.out.println(线程2 再次获取到锁);Thread.sleep(5 * 1000);lock2.release();System.out.println(线程2 释放锁);lock2.release();System.out.println(线程2 再次释放锁);} catch (Exception e) {throw new RuntimeException(e);}}}).start();}// 分布式锁初始化private static CuratorFramework getCuratorFramework() {// 重试策略初始时间3秒重试3次ExponentialBackoffRetry policy new ExponentialBackoffRetry(3000, 3);CuratorFramework client CuratorFrameworkFactory.builder().connectString(hadoop102:2181,hadoop103:2181,hadoop104:2181).connectionTimeoutMs(100000).sessionTimeoutMs(100000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println(zookeeper 启动成功);return client;} } 2观察控制台变化 线程1获取锁 线程1再次获取锁 线程1释放锁 线程1再次释放锁 线程2获取锁 线程2再次获取锁 线程2释放锁 线程2再次释放锁Zookeeper面试重点 选举机制 半数机制超过半数的投票通过即通过。 1第一次启动选举规则 投票过半数时服务器id大的胜出 2第二次启动选举规则 ①EPOCH大的直接胜出
②EPOCH相同事务id大的胜出
③事务id相同服务器id大的胜出 生产集群安装多少zk合适 安装奇数台。
生产经验 10台服务器3台zk20台服务器5台zk100台服务器11台zk200台服务器11台zk zk常用命令 ls、get、create、delete