一起做网店的网站重庆网站建设

当前位置: 首页 > news >正文

一起做网店的网站,重庆网站建设,烟台企业网站怎么优化,局机关建设网站的意义前言 首先Redis执行命令是单线程的#xff0c;所以可以利用Redis实现分布式锁#xff0c;而对于Redis单线程的问题#xff0c;是其线程模型的问题#xff0c;本篇重点是对目前流行的工具Redisson怎么去实现的分布式锁进行深入理解#xff1b;开始之前#xff0c;我们可以…前言 首先Redis执行命令是单线程的所以可以利用Redis实现分布式锁而对于Redis单线程的问题是其线程模型的问题本篇重点是对目前流行的工具Redisson怎么去实现的分布式锁进行深入理解开始之前我们可以下你思考一个问题Redisson的实现方式有何不同为什么 使用 引入依赖 dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.25.0/version /dependency添加配置 Autowiredprivate RedisProperties redisProperties;Beanpublic RedissonClient redisClient() {Config config new Config();config.setTransportMode(TransportMode.NIO);SingleServerConfig singleServerConfig config.useSingleServer();singleServerConfig.setAddress(String.format(redis://%s:%s, redisProperties.getHost(), redisProperties.getPort()));singleServerConfig.setPassword(redisProperties.getPassword());return Redisson.create(config);}加锁 private final RedissonClient redissonClient;public void lock() throws Exception {RLock lock redissonClient.getLock(PRODUCT_LOCK_KEY id);if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {// 业务} else {// 业务} } 如上使用是最简单的方式Redission它底层封装了很多逻辑但如果说要redis客户端实现你要怎么实现 Redis中本就有支持分布式锁的命令setnx对应RedisTemplate中使用如下 redisTemplate.opsForValue().setIfAbsent(lockkey, , 1, TimeUnit.SECONDS);使用Redis实现分布式锁需要注意的是不要产生死锁所以使用Redis实现分布式锁有两种方式 setnx命令lua脚本 两种方式都是一个操作完成keyvalue的设置以及过期时间的设置你是否有相关他们的实现是否都一样或者说这个实现可以有其他实现方式 源码 原理 lua脚本保证多个命令的原子性采用hash数据结构key为锁的名称field是线程对应的名称也因为这个数据结构也支持可重入锁定时延时的操作避免死锁看门狗 lock() 我们先看lock()方法tryLock()和lock()底层是一样的所以我们只看lock方法 Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}} 位置org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean) 传参是(-1, null, false) /** leaseTime 过期时间 unit过期时间单位 interruptibly 信号量是否打断线程 */ private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 取线程ID后面作为锁的一个标志long threadId Thread.currentThread().getId();// 加锁// 两个步骤// 1. 利用lua脚本设置锁与过期时间原子操作过期时间lockWatchdogTimeout 30 * 1000 30秒// 2. 执行过期时间刷新这里的刷新时利用回调的方式 延迟执行实现的定时任务lockWatchdogTimeout/3 10秒Long ttl tryAcquire(-1, leaseTime, unit, threadId);// lua脚本中加锁成功返回nil对应redis中是null加锁失败则返回的是已存在的锁的过期时间// 所以这里返回null就是加锁成功了就不再往下走了if (ttl null) {return;}// 加锁成功的在上面就已经结束了所以下面的都是加锁失败时走的// 这里它订阅了一个channel参数threadId无用别被误导了它订阅的名称时固定的// 为什么这里要订阅可以思考一下CompletableFutureRedissonLockEntry future subscribe(threadId);// 检查是否超时pubSub.timeout(future);RedissonLockEntry entry;// 这里的interruptibly应该时程序一次时是否结束而不是一直在执行中if (interruptibly) {entry commandExecutor.getInterrupted(future);} else {entry commandExecutor.get(future);}try {// 这里就是一个自旋 加锁while (true) {// 每次循环都进行加锁操作ttl tryAcquire(-1, leaseTime, unit, threadId);// 同样如果这里时null那么就是加锁成功了if (ttl null) {break;}// 加锁失败返回了锁的过期时间if (ttl 0) {try {// 信号量 unsafe.park // 信号量本地更改线程共享变量状态达到加锁的目的// unsafe.park利用park方法将加锁失败信号量更改失败的线程进行挂起// 为什么要挂起这个问题应该不用多说entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 这个分支时ttl 0也就是过期时间为负数也就是锁失效了// 对本地加锁之后在下一次循环redis加锁if (interruptibly) {entry.getLatch().acquire();} else {entry.getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(future, threadId);} // get(lockAsync(leaseTime, unit));} 上面的步骤就是Redisson大致逻辑 lua脚本加锁记录线程id防止非本线程解锁成功则退出添加订阅对应的channel这里的订阅是异步的 唤醒当收到channel的通知后也就是上一个锁解锁了从第6步醒来 自旋lua脚本加锁与第一步一样存在订阅后上一个锁解锁了就不用再挂起线程失败挂起unsafe.park加锁成功后取消订阅 lua脚本加锁 简单说一下这个脚本做了什么lua脚本类似js redis.call是调用redis命令第一个参数是redis的命令第二个参数是参数 它先是exists判断了key以及hash数据结构了的field是否存在 如果存在对field递增为什么要递增思考一下并设置过期时间返回返回nil 如果不存在调用redis命令pttl获取key的过期时间并返回 那如果我们自己写lua脚本呢 redis 2.6之后支持lua脚本一个脚本执行这个脚本是原子性的所以脚本里的多个命令是原子性的。 如果说要执行批量的命令可以使用piple但是管道的话它并不是原子性的他只是一次性把批量的命令发给了redis。 LUA脚本格式 eval 脚本 KEYS[1…N] ARGV[1…N] count key[1…N] argv[1…N]KEYS[1…N]key的展位符多个时序号递增从1开始 ARGV[1…N]value的展位符多个时序号递增从1开始 count是对应key输入的个数 key[1…N]对应KEYS[0…N]的key argv[1…N]对应ARGV[0…N]的value 这里就大概简单的说明了一下使得看本篇的朋友能够理解详细的还请百度 看门狗 使用过Redisson的朋友应该都听过“看门狗”为什么Redisson加锁要看门狗呢 如果我们使用lock()方法不设置过期时间那么应该是永不过期 好如果说加锁成功在解锁时出了意外如服务异常退出或宕机导致没有解锁那么这个锁就需要人工干预了这是有问题的。 所以在Redisson中它并不是永不过期当我们使用lock()方法时它的参数时-1但在最终执行lua脚本时传入了默认参数 位置org.redisson.RedissonLock#tryAcquireAsync 过期时间时internalLockLeaseTime 可以看到在初始化时它便赋予了该值一个默认的30秒 它并没有将锁设置为无限时间而是30秒又怎么保证锁的有效 所以它还有一个续约的操作对未使用unlock的锁进行时间延迟这一操作是为了保证未来某一时刻如果出现服务或其他问题导致解锁失败产生死锁这样的一个情况。 来看代码 private RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFutureLong ttlRemainingFuture;// leaseTime-1表示永不过期if (leaseTime 0) {// 对应方法tryLock(过期时间, 时间单位)ttlRemainingFuture tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 对应方法lock()// 注意它达到参数是internalLockLeaseTime上面说了他是30秒// internalLockLeaseTime lockWatchdogTimeout 30 * 1000;ttlRemainingFuture tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 这里是上面ttlRemainingFuture回调结果处理// 如果出现异常就unlock解锁CompletionStageLong s handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture new CompletableFutureWrapper(s); // 这里是定时刷新任务CompletionStageLong f ttlRemainingFuture.thenApply(ttlRemaining - {// 执行成功返回的时null所以这里以null为加锁成功的标志if (ttlRemaining null) {if (leaseTime 0) {internalLockLeaseTime unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper(f);}handleNoSync它只是针对异常做了处理正常情况下只是进行了封装而unlocakInnerAysnc也是在异常时的一个回调 return是结果封装 再回到刷新的部分 它是在加锁完后的一个回调方法ttlRemaining它就是上面执行的结果null或者是过期时间 private RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFutureLong ttlRemainingFuture;// leaseTime-1表示永不过期if (leaseTime 0) {// 对应方法tryLock(过期时间, 时间单位)ttlRemainingFuture tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 对应方法lock()// 注意它达到参数是internalLockLeaseTime上面说了他是30秒// internalLockLeaseTime lockWatchdogTimeout 30 * 1000;ttlRemainingFuture tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 这里是上面ttlRemainingFuture回调结果处理// 如果出现异常就unlock解锁CompletionStageLong s handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture new CompletableFutureWrapper(s); // 这里是定时刷新任务CompletionStageLong f ttlRemainingFuture.thenApply(ttlRemaining - {// 执行成功返回的时null所以这里以null为加锁成功的标志if (ttlRemaining null) {if (leaseTime 0) {// leaseTime 0 表示加锁时设置了过期时间// 而ternalLockLeaseTime存在默认值这里就是取消了默认值internalLockLeaseTime unit.toMillis(leaseTime);} else {// 那这里执行 过期时间的定时刷新scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper(f);}过期时间刷新的步骤 创建ExpirationEntry可以看作时一个上下文对象他是延时的标识原子操作添加上下文ExpirationEntry设置当前线程id到上下文创建Timeout延时任务延时10秒延时任务执行异步 根据客户端id和锁获取上下文通过上下文获取到线程id异步执行延时lua脚本执行回调成功回调本身回调第四步失败移除上下文取消延时 protected void scheduleExpirationRenewal(long threadId) {// 1. 创建ExpirationEntry可以看作时一个上下文对象他是延时的标识ExpirationEntry entry new ExpirationEntry();// 2. 原子操作添加上下文ExpirationEntry// 要进行过期时间刷新就要先添加这个对象可以理解为一个刷新的标志ExpirationEntry oldEntry EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);// 3. 设置当前线程id到上下文if (oldEntry ! null) {// 不为空说明这个对象已经存在已经有其他线程执行了过期刷新oldEntry.addThreadId(threadId);} else {// 为空则是不存在该映射对象锁不存在entry.addThreadId(threadId);try {// 执行过期时间刷新// 4. 创建Timeout延时任务延时10秒// 5. 延时任务执行异步// 5.1 根据客户端id和锁获取上下文// 5.2 通过上下文获取到线程id// 5.3 异步执行延时lua脚本// 5.4 执行回调成功回调本身回调第四步失败移除上下文取消延时renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}}下面我们进入renewExpiration方法 private void renewExpiration() {// EXPIRATION_RENEWAL_MAP 在上一层方法中添加了刷新的上下文标志ExpirationEntry ee EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee null) {// 如果这里为null就说明已经有其他线程移除了那么就是不需要刷新锁不存在了return;}// 4. 创建Timeout延时任务延时10秒// 注意这里的时间 internalLockLeaseTime / 3// 之前说过internalLockLeaseTime lockWatchdogTimeout 30 * 1000;// 所以这里是延迟10秒Timeout task getServiceManager().newTimeout(new TimerTask() {// 5. 延时任务执行异步Overridepublic void run(Timeout timeout) throws Exception {// 5.1 根据客户端id和锁获取上下文// entryName是客户端id线程id// 每次执行都要检查刷新的标志存在就说明锁还在需要执行ExpirationEntry ent EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent null) {return;}// 5.2 通过上下文获取到线程idLong threadId ent.getFirstThreadId();if (threadId null) {return;}// 5.3 异步执行延时lua脚本CompletionStageBoolean future renewExpirationAsync(threadId);// 当延时完成后执行这个方法也是个回调future.whenComplete((res, e) - {if (e ! null) {log.error(Cant update lock {} expiration, getRawName(), e);// 异常时移除刷新的标志// 也是避免死锁的一个方式EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}// 5.4 成功回调本身回调第四步失败移除上下文取消延时if (res) {// 延时成功回调当前这个方法以实现定时任务renewExpiration();} else {// 没有成功也是移除刷新的标志// 这里没有成功的情况是锁已经不存在了对应的定时任务就应该停止// 两个步骤// 1. task.cancel()通过上下文获取到任务然后取消// 2. EXPIRATION_RENEWAL_MAP.remove(getEntryName());cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// 将定时任务放到entry中也就是放到了定时任务的上下文中// 在下一次时通过上下文获取到这个定时任务ee.setTimeout(task);}其实他延时的逻辑也是一个lua脚本 漏了一点 由Redisson实现JDK的Timeout类加回调完成的一个定时任务当当前任务执行完后又执行本身方法创建一个延迟任务也就实现了一个定时任务