经典微网站爱客crm系统
- 作者: 五速梦信息网
- 时间: 2026年03月21日 10:35
当前位置: 首页 > news >正文
经典微网站,爱客crm系统,wordpress网站从零,郑州网站建设技术大纲 1.标志位修改场景优先使用volatile(服务优雅停机) 2.数值递增场景优先使用Atomic类(心跳计数器) 3.共享变量仅对当前线程可见的场景优先使用ThreadLocal(edits log的处理) 4.读多写少需要加锁的场景优先使用读写锁(服务注册表的并发读写) 5.尽量减少线程对锁占用时间…大纲 1.标志位修改场景优先使用volatile(服务优雅停机) 2.数值递增场景优先使用Atomic类(心跳计数器) 3.共享变量仅对当前线程可见的场景优先使用ThreadLocal(edits log的处理) 4.读多写少需要加锁的场景优先使用读写锁(服务注册表的并发读写) 5.尽量减少线程对锁占用时间(edits log分段加锁) 6.尽量减少线程对数据加锁的粒度(库存分段扣减) 7.尽量按不同功能进行锁分离(避免循环中加锁) 8.尽量减少高并发下线程对锁的竞争(多级缓存) 9.锁故障之死锁总结 10.无锁化编程 1.标志位修改场景优先使用volatile(服务优雅停机) (1)标志位修改等可见性场景优先使用volatile (2)通过volatile标志位实现服务优雅停机的案例 (1)标志位修改等可见性场景优先使用volatile 分布式系统、中间件系统都需要开启大量的线程进行处理。多线程访问共享变量时首先需要明确是并发写还是并发读。 如果仅仅只有一个线程会来写一个共享变量比如标志位。另外多个线程会来读取这个标志位的值那么此时优先使用volatile。 所以锁优化的原则是能不用锁就尽量不用锁。因为可能会导致锁争用和锁冲突从而导致系统吞吐量、性能大幅度降低。 (2)通过volatile标志位实现服务优雅停机的案例 比如下面的shutdown()方法就利用了volatile完成锁优化。服务优雅停机中会使用一个isRunning标志位该标志位会被心跳线程和批量拉取注册表的线程共享使用。但是只有某个线程执行shutdown()方法时才会修改isRunning标志位。其他情况都是由心跳线程和批量拉取注册表的线程读取isRunning标志位所以使用volatile。 //服务启动时负责与register-server进行通信 public class RegisterClient {public static final String SERVICE_NAME inventory-service;public static final String IP 192.168.31.207;public static final String HOSTNAME inventory01;public static final int PORT 9000;private static final Long HEARTBEAT_INTERVAL 30 * 1000L;//服务实例IDprivate String serviceInstanceId;//HTTP通信组件private HttpSender httpSender;//心跳线程private HeartbeatWorker heartbeatWorker;//服务实例是否在运行private volatile Boolean isRunning;//客户端缓存的注册表private CachedServiceRegistry registry;public RegisterClient() {this.serviceInstanceId UUID.randomUUID().toString().replace(-, );this.httpSender new HttpSender();this.heartbeatWorker new HeartbeatWorker();this.isRunning true;this.registry new CachedServiceRegistry(this, httpSender); }//启动RegisterClient组件public void start() {try {//开启一个线程去完成注册//如果注册完成后就进入while循环每隔30秒就发送一个心跳请求RegisterWorker registerWorker new RegisterWorker();registerWorker.start();registerWorker.join();//启动心跳线程定时发送心跳信息heartbeatWorker.start();//初始化客户端缓存的服务注册表组件this.registry.initialize();} catch (Exception e) {e.printStackTrace();}}//停止RegisterClient组件public void shutdown() {this.isRunning false;this.heartbeatWorker.interrupt();//通过interrupt中断心跳线程this.registry.destroy();//通过interrupt打断拉取注册表线程this.httpSender.cancel(SERVICE_NAME, serviceInstanceId);//通知服务端下线}//服务注册线程private class RegisterWorker extends Thread {Overridepublic void run() {RegisterRequest registerRequest new RegisterRequest();registerRequest.setServiceName(SERVICE_NAME);registerRequest.setIp(IP); registerRequest.setHostname(HOSTNAME);registerRequest.setPort(PORT); registerRequest.setServiceInstanceId(serviceInstanceId);RegisterResponse registerResponse httpSender.register(registerRequest);System.out.println(服务注册的结果是 registerResponse.getStatus() ……); }}//心跳线程private class HeartbeatWorker extends Thread {Overridepublic void run() {//如果注册成功就进入while循环HeartbeatRequest heartbeatRequest new HeartbeatRequest();heartbeatRequest.setServiceName(SERVICE_NAME); heartbeatRequest.setServiceInstanceId(serviceInstanceId);HeartbeatResponse heartbeatResponse null;while (isRunning) {try {heartbeatResponse httpSender.heartbeat(heartbeatRequest);System.out.println(心跳的结果为 heartbeatResponse.getStatus() ……);Thread.sleep(HEARTBEAT_INTERVAL); } catch (Exception e) { e.printStackTrace();}}}}//返回RegisterClient是否正在运行给批量拉取注册表的后台线程调用public Boolean isRunning() {return isRunning;} } 2.数值递增场景优先使用Atomic类(心跳计数器) (1)数值递增场景优先使用Atomic原子类 (2)服务心跳计数器使用Atomic原子类的案例 (1)数值递增场景优先使用Atomic原子类 多线程并发访问共享变量时首先判断变量是否只需保证可见性即可如果是则使用volatile优化。然后判断是否要保证原子性如果是则判断是简单数值累加还是变更操作。如果只需要简单进行数值累加那么则建议使用Atomic原子类否则才使用synchronized或者ReentrantLock这种重量级锁。Atomic原子类基于CAS机制实现了无锁化编程并发性比synchronized好。 (2)服务心跳计数器使用Atomic原子类的案例 服务保护机制中有个心跳测量计数器 //心跳测量计数器 public class HeartbeatCounter {//单例实例private static HeartbeatCounter instance new HeartbeatCounter();//最近一分钟的心跳次数private AtomicLong latestMinuteHeartbeatRate new AtomicLong(0L); //最近一分钟的时间戳private long latestMinuteTimestamp System.currentTimeMillis();private HeartbeatCounter() {Daemon daemon new Daemon();daemon.setDaemon(true); daemon.start();}//获取单例实例public static HeartbeatCounter getInstance() {return instance;}//增加一次最近一分钟的心跳次数//一旦加锁了后就会导致大量的线程并发的更新心跳次数从而导致加锁串行化//进而导致多线程并发性能的降低public /synchronized*/ void increment() {//如果服务实例很多比如1万个服务实例那么每秒可能都有很多个请求过来更新心跳//如果在这里加了synchronized的话会影响并发的性能//但是换成了AtomicLong原子类之后不加锁无锁化CAS操作保证原子性还可以多线程并发latestMinuteHeartbeatRate.incrementAndGet();}//获取最近一分钟的心跳次数public /synchronized*/ long get() {return latestMinuteHeartbeatRate.get();}private class Daemon extends Thread {Overridepublic void run() {while(true) {try {long currentTime System.currentTimeMillis();if (currentTime - latestMinuteTimestamp 60 * 1000) {while(true) {Long expectedValue latestMinuteHeartbeatRate.get();if (latestMinuteHeartbeatRate.compareAndSet(expectedValue, 0L)) {break;}}latestMinuteTimestamp System.currentTimeMillis();}Thread.sleep(1000); } catch (Exception e) {e.printStackTrace();}}}} } 3.共享变量仅对当前线程可见的场景优先使用ThreadLocal(edits log的处理) (1)共享变量仅对当前线程可见的场景优先使用ThreadLocal (2)分布式存储系统处理edits log时使用ThreadLocal (1)共享变量仅对当前线程可见的场景优先使用ThreadLocal 在多线程环境中当多个线程同时访问某个共享变量时如果希望对共享变量的操作仅对当前线程可见那么可使用ThreadLocal。 ThreadLocal会为每个线程提供独立的存储空间来存储共享变量的副本每个线程只会对共享变量的副本进行操作且该操作对其他线程不可见。 volatile、Atomic、ThreadLocal是锁优化的三大利器应当优先考虑。如果不能使用它们再考虑synchronized和ReentrantLock等这些重量级锁。 (2)分布式存储系统处理edits log时使用ThreadLocal //负责管理edits log日志的核心组件 public class FSEditlog {…//每个线程自己本地的txid副本private ThreadLocalLong localTxid new ThreadLocalLong();//记录edits log日志public void logEdit(String content) {//这里必须直接加锁有线程执行logSync()方法时这里没有其他线程能进来synchronized(this) {//获取全局唯一递增的txid代表了edits log的序号txidSeq;long txid txidSeq;localTxid.set(txid);//构造一条edits log对象EditLog log new EditLog(txid, content);//将edits log写入内存缓冲中不是直接刷入磁盘文件editLogBuffer.write(log); }logSync();}… } 4.读多写少需要加锁的场景优先使用读写锁(服务注册表的并发读写) (1)读多写少需要加锁的场景优先使用读写锁 (2)服务注册表的并发读写场景使用读写锁的案例 (1)读多写少需要加锁的场景优先使用读写锁 如果volatile、Atomic、ThreadLocal都不适用这时多线程并发访问一块共享数据就需要加锁了。但是此时会优先考虑使用读写锁而不是使用synchronized重量级锁。 比如在读多写少的场景下可以进行读写锁分离。读锁 - 大量的线程可以并发的读 写锁 - 有线程写数据时其他线程不能同时来写也不能同时来读。 (2)服务注册表的并发读写场景使用读写锁的案例 服务注册中心通过如下两种方式处理服务注册表 一.每隔30秒会拉取增量注册表 二.每隔30秒会处理心跳请求 拉取增量注册表属于读操作发现心跳异常时会更新注册表属于写操作。很明显大部分情况下都不会更新注册表数据所以这是读多写少的场景。 如果读取注册表和更新注册表使用synchronized重量级锁那么将会导致大量的读取注册表的操作都是串行化的从而降低服务注册中心的并发性能。 所以对服务注册表的读写操作使用读写锁进行分离。读取服务注册表时使用读锁更新服务注册表时使用写锁这样就能保证读操作都能并发执行。 //服务注册表 public class ServiceRegistry {public static final Long RECENTLY_CHANGED_ITEM_CHECK_INTERVAL 3000L;public static final Long RECENTLY_CHANGED_ITEM_EXPIRED 3 * 60 * 1000L;//注册表是一个单例private static ServiceRegistry instance new ServiceRegistry();//核心的内存数据结构注册表Mapkey是服务名称value是这个服务的所有的服务实例private MapString, MapString, ServiceInstance registry new HashMapString, MapString, ServiceInstance();//最近变更的服务实例的队列private LinkedListRecentlyChangedServiceInstance recentlyChangedQueue new LinkedListRecentlyChangedServiceInstance();//服务注册表的锁private ReentrantReadWriteLock lock new ReentrantReadWriteLock();private ReadLock readLock lock.readLock();private WriteLock writeLock lock.writeLock();//构造函数private ServiceRegistry() {//启动后台线程监控最近变更的队列RecentlyChangedQueueMonitor recentlyChangedQueueMonitor new RecentlyChangedQueueMonitor();recentlyChangedQueueMonitor.setDaemon(true); recentlyChangedQueueMonitor.start();}//获取服务注册表的单例实例public static ServiceRegistry getInstance() {return instance;}//加读锁public void readLock() {this.readLock.lock();}//释放读锁public void readUnlock() {this.readLock.unlock();}//加写锁public void writeLock() {this.writeLock.lock();}//释放写锁public void writeUnlock() {this.writeLock.unlock();}//服务注册public void register(ServiceInstance serviceInstance) {try {//加写锁this.writeLock();System.out.println(服务注册……【 serviceInstance 】);//将服务实例放入最近变更的队列中RecentlyChangedServiceInstance recentlyChangedItem new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REGISTER);recentlyChangedQueue.offer(recentlyChangedItem);System.out.println(最近变更队列 recentlyChangedQueue);//将服务实例放入注册表中MapString, ServiceInstance serviceInstanceMap registry.get(serviceInstance.getServiceName());if (serviceInstanceMap null) {serviceInstanceMap new HashMapString, ServiceInstance();registry.put(serviceInstance.getServiceName(), serviceInstanceMap);}serviceInstanceMap.put(serviceInstance.getServiceInstanceId(), serviceInstance);System.out.println(注册表 registry); } finally {this.writeUnlock();}}//从注册表删除一个服务实例public void remove(String serviceName, String serviceInstanceId) {try {//加写锁this.writeLock();System.out.println(服务摘除【 serviceName , serviceInstanceId 】);//获取服务实例MapString, ServiceInstance serviceInstanceMap registry.get(serviceName);ServiceInstance serviceInstance serviceInstanceMap.get(serviceInstanceId);//将服务实例变更信息放入队列中RecentlyChangedServiceInstance recentlyChangedItem new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REMOVE);recentlyChangedQueue.offer(recentlyChangedItem);System.out.println(最近变更队列 recentlyChangedQueue);//从服务注册表删除服务实例serviceInstanceMap.remove(serviceInstanceId);System.out.println(注册表 registry); } finally {this.writeUnlock();}}//获取服务实例public ServiceInstance getServiceInstance(String serviceName, String serviceInstanceId) {try {this.readLock();MapString, ServiceInstance serviceInstanceMap registry.get(serviceName);return serviceInstanceMap.get(serviceInstanceId);} finally {this.readUnlock(); }}//获取整个注册表public MapString, MapString, ServiceInstance getRegistry() {return registry;}//获取最近有变化的注册表public DeltaRegistry getDeltaRegistry() {Long totalCount 0L;for (MapString, ServiceInstance serviceInstanceMap : registry.values()) {totalCount serviceInstanceMap.size();}DeltaRegistry deltaRegistry new DeltaRegistry(recentlyChangedQueue, totalCount);return deltaRegistry;}//最近变化的服务实例class RecentlyChangedServiceInstance {//服务实例ServiceInstance serviceInstance;//发生变更的时间戳Long changedTimestamp;//变更操作String serviceInstanceOperation;public RecentlyChangedServiceInstance(ServiceInstance serviceInstance, Long changedTimestamp, String serviceInstanceOperation) {this.serviceInstance serviceInstance;this.changedTimestamp changedTimestamp;this.serviceInstanceOperation serviceInstanceOperation;}Overridepublic String toString() {return RecentlyChangedServiceInstance [serviceInstance serviceInstance , changedTimestamp changedTimestamp , serviceInstanceOperation serviceInstanceOperation ];}}//服务实例操作class ServiceInstanceOperation {public static final String REGISTER register;//注册public static final String REMOVE REMOVE;//删除}//最近变更队列的监控线程class RecentlyChangedQueueMonitor extends Thread {Overridepublic void run() {while(true) {try {try {writeLock.lock();RecentlyChangedServiceInstance recentlyChangedItem null;Long currentTimestamp System.currentTimeMillis();while ((recentlyChangedItem recentlyChangedQueue.peek()) ! null) {//判断如果一个服务实例变更信息已经在队列里存在超过3分钟了就从队列中移除if (currentTimestamp - recentlyChangedItem.changedTimestamp RECENTLY_CHANGED_ITEM_EXPIRED) {recentlyChangedQueue.poll();}}} finally {writeLock.unlock();}Thread.sleep(RECENTLY_CHANGED_ITEM_CHECK_INTERVAL);} catch (Exception e) {e.printStackTrace();} }}} }//微服务存活状态监控组件 public class ServiceAliveMonitor {//检查服务实例是否存活的间隔private static final Long CHECK_ALIVE_INTERVAL 60 * 1000L;//负责监控微服务存活状态的后台线程private Daemon daemon;public ServiceAliveMonitor() {this.daemon new Daemon();daemon.setDaemon(true); daemon.setName(ServiceAliveMonitor); }//启动后台线程public void start() {daemon.start();}//负责监控微服务存活状态的后台线程private class Daemon extends Thread {private ServiceRegistry registry ServiceRegistry.getInstance();Overridepublic void run() {MapString, MapString, ServiceInstance registryMap null;while (true) {try {//可以判断一下是否要开启自我保护机制SelfProtectionPolicy selfProtectionPolicy SelfProtectionPolicy.getInstance();if (selfProtectionPolicy.isEnable()) {Thread.sleep(CHECK_ALIVE_INTERVAL);continue;}//定义要删除的服务实例的集合ListServiceInstance removingServiceInstances new ArrayListServiceInstance();//开始读服务注册表的数据这个过程中其他线程可以读但不可以写try {//对整个服务注册表加读锁registry.readLock();registryMap registry.getRegistry();for (String serviceName : registryMap.keySet()) {MapString, ServiceInstance serviceInstanceMap registryMap.get(serviceName);for (ServiceInstance serviceInstance : serviceInstanceMap.values()) {//说明服务实例距离上一次发送心跳已经超过90秒了认为这个服务死了从注册表中摘除这个服务实例if (!serviceInstance.isAlive()) {removingServiceInstances.add(serviceInstance);} }}} finally {registry.readUnlock();}//将所有的要删除的服务实例从服务注册表删除for (ServiceInstance serviceInstance : removingServiceInstances) {registry.remove(serviceInstance.getServiceName(), serviceInstance.getServiceInstanceId());//更新自我保护机制的阈值synchronized(SelfProtectionPolicy.class) {selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}}Thread.sleep(CHECK_ALIVEINTERVAL);} catch (Exception e) {e.printStackTrace();}}}} }//负责接收register-client发送过来的请求的处理器 public class RegisterServerController {private ServiceRegistry registry ServiceRegistry.getInstance();//服务注册public RegisterResponse register(RegisterRequest registerRequest) {RegisterResponse registerResponse new RegisterResponse();try {//在注册表中加入这个服务实例ServiceInstance serviceInstance new ServiceInstance();serviceInstance.setHostname(registerRequest.getHostname()); serviceInstance.setIp(registerRequest.getIp()); serviceInstance.setPort(registerRequest.getPort()); serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId()); serviceInstance.setServiceName(registerRequest.getServiceName());registry.register(serviceInstance);//更新自我保护机制的阈值synchronized(SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() 2); selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85)); }registerResponse.setStatus(RegisterResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); registerResponse.setStatus(RegisterResponse.FAILURE); }return registerResponse;}//服务下线public void cancel(String serviceName, String serviceInstanceId) {//从服务注册中摘除实例registry.remove(serviceName, serviceInstanceId);//更新自我保护机制的阈值synchronized(SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2); selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85)); }}//发送心跳public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) { HeartbeatResponse heartbeatResponse new HeartbeatResponse();try {ServiceInstance serviceInstance registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());if (serviceInstance ! null) {serviceInstance.renew();}//记录一下每分钟的心跳的次数HeartbeatCounter heartbeatMeasuredRate HeartbeatCounter.getInstance();heartbeatMeasuredRate.increment();heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); heartbeatResponse.setStatus(HeartbeatResponse.FAILURE); }return heartbeatResponse;}//拉取全量注册表public Applications fetchFullRegistry() {try {registry.readLock();return new Applications(registry.getRegistry());} finally {registry.readUnlock();}}//拉取增量注册表public DeltaRegistry fetchDeltaRegistry() {try {registry.readLock();return registry.getDeltaRegistry();} finally {registry.readUnlock(); }} } 5.尽量减少线程对锁占用时间(edits log分段加锁) (1)尽可能减少线程对锁占用的时间 (2)edits log通过分段加锁减少锁占用时间的案例 (1)尽可能减少线程对锁占用的时间 无论是synchronized锁还是读写锁都要尽量保证加锁的时间比较短不能在加锁后执行耗时的磁盘文件读写、网络IO读写等操作长时间占有锁。一般会在操作内存中的数据时加锁而不会在操作数据库中的数据时加锁。否则可能会导致长时间占用锁导致线程并发的性能和吞吐量大幅度下降。 (2)edits log通过分段加锁减少锁占用时间的案例 分布式存储系统中的edits log分段加锁机制就是为了尽量减少锁占用的时间其中加锁的部分都是简单的变量判断执行时间都是很快的。 //负责管理edits log日志的核心组件 public class FSEditlog {//当前递增到的txid的序号private long txidSeq 0L;//内存双缓冲区private DoubleBuffer editLogBuffer new DoubleBuffer();//当前是否在将内存缓冲刷入磁盘中private volatile Boolean isSyncRunning false;//当前是否有线程在等待刷新下一批edits log到磁盘里去private volatile Boolean isWaitSync false;//在同步到磁盘中的最大的一个txidprivate volatile Long syncMaxTxid 0L;//每个线程自己本地的txid副本private ThreadLocalLong localTxid new ThreadLocalLong();//记录edits log日志public void logEdit(String content) {//这里直接加锁有线程执行logSync()方法时这里没有其他线程能进来synchronized(this) {//获取全局唯一递增的txid代表了edits log的序号txidSeq;long txid txidSeq;localTxid.set(txid);//构造一条edits log对象EditLog log new EditLog(txid, content);//将edits log写入内存缓冲中不是直接刷入磁盘文件editLogBuffer.write(log); }//尝试允许某一个执行logEdit()方法的线程一次性将内存缓冲中的数据刷入到磁盘文件中logSync();}//将内存缓冲中的数据刷入磁盘文件中//在这里尝试允许某一个线程一次性将内存缓冲中的数据刷入到磁盘文件中//相当于批量将内存缓冲的数据刷入磁盘private void logSync() {//再次尝试加锁只有一个线程能进来这个过程很快纳秒级别这里属于第一段加锁synchronized(this) {//如果当前正好有线程在刷内存缓冲到磁盘中去if (isSyncRunning) {//假如某个线程正在把txid 6,7,8,9,10,11,12的edits log从syncBuffer刷入磁盘//此时syncMaxTxid 12代表的是正在刷入磁盘的最大txid//那么刷盘的线程释放锁进行刷盘后这时来一个线程对应的txid 10此时它可以直接返回//因为它对应的edits log被刷盘的线程正在刷入或者已经刷入磁盘了这时txid 12的线程就不需要等待long txid localTxid.get();if (txid syncMaxTxid) {return;}//此时如果来的是一个txid 13的线程那么就会发现已经有线程在等待刷下一批数据到磁盘此时会直接返回if (isWaitSync) {return;}//此时如果来的是一个txid 14的线程并且刷盘还没刷完//那么就在这里等待或者成为下一个刷盘的线程只有一个线程在等isWaitSync true;while (isSyncRunning) {try {wait(2000);//释放锁并自己等2秒或者等别人唤醒} catch (Exception e) {e.printStackTrace(); }}isWaitSync false;}//交换两块缓冲区editLogBuffer.setReadyToSync();//然后保存当前要同步到磁盘中的最大txid此时editLogBuffer中的syncBuffer在交换完以后可能有多条数据//而且里面的edits log的txid一定是从小到大的此时要同步的txid 6,7,8,9,10,11,12所以syncMaxTxid 12syncMaxTxid editLogBuffer.getSyncMaxTxid();//设置当前正在同步到磁盘的标志位isSyncRunning true;}//释放锁开始同步内存缓冲的数据到磁盘文件里去//这个过程其实是比较慢基本上肯定是毫秒级了弄不好就要几十毫秒editLogBuffer.flush(); //这里属于另外一段加锁synchronized(this) {//同步完了磁盘之后就会将标志位复位再释放锁isSyncRunning false;//唤醒可能正在等待他同步完磁盘的线程notifyAll();}}//代表了一条edits log(内部类)private class EditLog {long txid;String content;public EditLog(long txid, String content) {this.txid txid;this.content content;}}//内存双缓冲(内部类)private class DoubleBuffer {//专门用来承载线程写入edits logLinkedListEditLog currentBuffer new LinkedListEditLog();//专门用来将数据同步到磁盘中去的一块缓冲LinkedListEditLog syncBuffer new LinkedListEditLog();//将edits log写到内存缓冲里去public void write(EditLog log) {currentBuffer.add(log);}//交换两块缓冲区为了同步内存数据到磁盘做准备public void setReadyToSync() {LinkedListEditLog tmp currentBuffer;currentBuffer syncBuffer;syncBuffer tmp;}//获取sync buffer缓冲区里的最大的一个txidpublic Long getSyncMaxTxid() {return syncBuffer.getLast().txid;}//将syncBuffer缓冲区中的数据刷入磁盘中public void flush() {for (EditLog log : syncBuffer) {System.out.println(将edit log写入磁盘文件中 log); //正常来说就是用文件输出流将数据写入磁盘文件中}syncBuffer.clear(); }} } 6.尽量减少线程对数据加锁的粒度(库存分段扣减) (1)尽可能减少线程对数据加锁的粒度 (2)每秒上千订单下库存分段扣减的案例 (1)尽可能减少线程对数据加锁的粒度 如果有一份共享数据里面包含了多个子数据。可以对完整的这份数据加锁线程只要访问这份数据就会竞争锁。也可以只对数据里的部分子数据进行加锁从而降低加锁的粒度减少竞争。比如库存扣减时可以仅仅对部分库存数据加锁而不是对完整的库存数据加锁。 一.edits log的分段加锁机制尽可能减少一个线程占用锁的时间 二.库存的分段扣减机制尽可能减少一个线程对数据加锁的粒度 (2)每秒上千订单下库存分段扣减的案例 一.不加锁时的库存超卖问题 假设订单系统部署在两台机器上某一时刻不同用户都要买10个商品于是这两台机器上的订单系统实例分别收到了不同用户的购买请求。然后每个订单系统实例都去数据库里查询检查发现当前商品库存是12个。于是每个订单系统实例都发送SQL到数据库里进行下单即扣减10个库存。其中一个实例将库存从12扣减为2另外一个实例将库存从2扣减为-8于是出现了库存超卖的问题。 二.用分布式锁解决库存超卖问题 分布式锁就是同一个锁key同一时间只能有一个客户端拿到锁这样只有获取到锁的客户端才能执行接下来的库存扣减逻辑。 当然对于库存超卖的问题除了使用分布式锁来解决之外也可以使用悲观锁、乐观锁、队列串行化、异步队列分散、Redis原子操作等来解决。 三.分布式锁的方案在高并发场景下的问题 一旦某个客户端的某个线程获取了某个商品的分布式锁那么该客户端的其他线程或其他客户端的所有线程对这个商品的下单请求就要进行串行化等待。 假设获取锁后到释放锁前查库存 - 创建订单 - 扣减库存只需20毫秒那么1秒(1000毫秒)只能容纳50个客户端线程对同一个商品的下单请求。 所以如果应对低并发、无秒杀场景的电商系统分布式锁还是可以解决的。因为并发量很低每秒不到10个请求不存在高并发秒杀单个商品的情况。但对于高并发秒杀商品的情况明显分布式锁只支持50个线程是不行的。 四.如何对分布式锁进行高并发优化 把库存数据分成很多个段每个段对应一个单独的锁。当多个线程并发修改库存数据时就可以并发修改不同段的库存数据从而避免同一时间只能有一个线程独占修改某商品的库存数据。 Java中的LongAdder类也采用了类似的思路进行分段CAS操作。当某段CAS操作失败时会自动迁移到下一个分段进行CAS操作从而减少在高并发场景下CAS操作共享数据出现线程长时间自旋的情况。 因此库存问题下的分布式锁优化思路如下 将一个库存拆分为N个库存段比如10个库存段每个库存段有100个库存。每个线程处理下单请求时会从10个库存段中挑选一个来进行加锁并扣减库存。这样就将竞争一个库存锁转变为竞争10个库存锁从而并发性能提升10倍。 如果一个分段库存是0那么就先释放锁然后遍历下一个分段直到找出分段库存不是0的再尝试加锁处理。如果一个分段库存大于购买数量那么就直接加锁扣减。如果一个分段库存小于购买数量那么就要对多个分段库存加锁合并扣减。 该分段加锁方案的不足就是实现比较复杂 首先要对一个库存数据分段存储比如本来一个库存字段要分为10个字段。其次在每次处理库存时需要写随机算法随机挑选一个分段来扣减。最后如果某个分段中的库存不足需要自动切换到下一个分段去处理。 public class RedisLockOptimizeDemo {public static void main(String[] args) throws Exception {Long goodsSkuId 1L;//IDLong purchaseCount 50L;//购买数量int stockSegmentSeq new Random().nextInt(10) 1;//随机的库存段InventoryDAO inventoryDAO new InventoryDAO();RLock lock new RLock(stock goodsSkuId _ stockSegmentSeq);//分段加锁后查分段的库存lock.lock();Long stock inventoryDAO.getStock(goodsSkuId, stockSegmentSeq);//如果查出来库存是0那么查下一个分段if (stock 0L) {lock.unlock();boolean foundOtherStockSegment false;for (int i 1; i 10; i) {if (i stockSegmentSeq) {continue;}lock new RLock(stock_ goodsSkuId _ i);lock.lock();stock inventoryDAO.getStock(goodsSkuId, i);if (stock ! 0) {stockSegmentSeq i;foundOtherStockSegment true;//该分段库存不为0那么走完后面的逻辑才unlockbreak;} else {//该分段库存为0那么就要unlock了继续遍历下一个分段lock.unlock();}}if (!foundOtherStockSegment) {System.out.println(商品库存不足);return;}}//如果库存分段正好大于要购买的数量if (stock purchaseCount) {inventoryDAO.updateStock(goodsSkuId, stockSegmentSeq, stock - purchaseCount);lock.unlock();return;}//代码执行到这里就说明当前这个分段的库存小于要购买的数量合并分段加锁Long totalStock stock;MapRLock, Long otherLocks new HashMapRLock, Long();for (int i 1; i 10; i) {if (i stockSegmentSeq) {continue;}RLock otherLock new RLock(stock_ goodsSkuId _ i);otherLock.lock();Long otherStock inventoryDAO.getStock(goodsSkuId, i);if (otherStock 0) {otherLock.unlock();continue;}totalStock otherStock;otherLocks.put(otherLock, otherStock);if (totalStock purchaseCount) {break;}}//尝试所有的其他分段之后还是无法满足购买数量需要对那些分段释放锁if (totalStock purchaseCount) {System.out.println(商品库存不足);for (Map.EntryRLock, Long otherLockEntry : otherLocks.entrySet()) {otherLockEntry.getKey().unlock();}return;}//先将最初加的那个分段库存扣减掉Long remainReducingStock purchaseCount - stock;inventoryDAO.updateStock(goodsSkuId, stockSegmentSeq, 0L);lock.unlock();for (Map.EntryRLock, Long otherLockEntry : otherLocks.entrySet()) {RLock otherLock otherLockEntry.getKey();int otherStockSegmentSeq Integer.valueOf(otherLock.name.substring(otherLock.name.length() - 1));Long otherStock otherLockEntry.getValue();if (otherStock remainReducingStock) {remainReducingStock - otherStock;inventoryDAO.updateStock(goodsSkuId, otherStockSegmentSeq, 0L);} else {remainReducingStock 0L;inventoryDAO.updateStock(goodsSkuId, otherStockSegmentSeq, otherStock - remainReducingStock);}otherLock.unlock();}}static class RLock {String name;public RLock(String name) { this.name name; }public void lock() { System.out.println(加分布式锁 name); }public void unlock() { System.out.println(释放分布式锁 name); }Overridepublic int hashCode() {final int prime 31;int result 1;result prime * result ((name null) ? 0 : name.hashCode());return result;}Overridepublic boolean equals(Object obj) {if (this obj) { return true; }if (obj null) { return false; }if (getClass() ! obj.getClass()) { return false; }RLock other (RLock) obj;if (name null) {if (other.name ! null) {return false;}} else if (!name.equals(other.name)) {return false;}return true;}}static class InventoryDAO {public Long getStock(Long goodsSkuId, Integer stockSegmentSeq) { return 1000L; }public void updateStock(Long goodsSkuId, Integer stockSegmentSeq, Long stock) { }} } 7.尽量按不同功能进行锁分离(避免循环中加锁) (1)尽可能按不同功能对锁进行分离 (2)避免在循环中频繁加锁以及释放锁 (1)尽可能按不同功能对锁进行分离 如果可以按照使用功能的不同将一把锁拆分为多把锁。那么在使用不同的功能时就可以使用不同的锁了从而减少线程竞争同一把锁时的冲突。 比如LinkedBlockingQueue有界阻塞队列就使用了两把锁。队列头部使用一把take锁队列尾部使用一把put锁。执行put()方法时先获取put锁然后再从队列尾部入队元素。执行take()方法时先获取take锁然后再从队列头部出队元素。这样同一时刻对阻塞队列的入队和出队操作就不会产生锁冲突。 (2)避免在循环中频繁加锁以及释放锁 尽量避免在for循环里频繁地加锁和释放锁。 8.尽量减少高并发下线程对锁的竞争(多级缓存) (1)高并发场景下如何减少线程对锁的竞争 (2)服务注册表通过多级缓存来降低锁竞争频率 (1)高并发场景下如何减少线程对锁的竞争 一.使用读写锁 读写锁主要用来解决读请求和读请求不会冲突可以并行执行但写请求和读请求还是会冲突写请求和写请求也是会冲突的。 二.按不同功能进行锁分离 比如按使用功能不同将一把锁拆分为多把锁。 三.减少锁占用的时间 比如分布式存储系统中的edits log分段锁。 四.数据分段加锁 比如分布式锁的分段库存扣减。 五.减少线程对锁的竞争频率 比如通过多级缓存来降低锁竞争的频率。 (2)服务注册表通过多级缓存来降低锁竞争频率 一.读写锁依然存在并发读写冲突 虽然服务注册中心已经使用了读写锁来处理读写服务注册表但是当需要对服务注册表进行写操作时可能会出现大量的读操作被阻塞。现在假设每分钟会出现并发读写冲突的次数是10次。 二.多级缓存的运行逻辑 因此可以使用多级缓存机制来优化服务注册表的读写并发冲突问题。服务注册表的多级缓存是一级缓存(只读缓存)二级缓存(读写缓存)。 所有的读请求都先从一级缓存(只读缓存)里读取数据。如果一级缓存(只读缓存)里没找到则从二级缓存(读写缓存)里查找。如果二级缓存(读写缓存)也没有找到则再从服务注册表中进行加载。 三.多级缓存的过期策略 一级缓存(读写缓存)中的数据的过期策略是通过一个定时任务每隔30s和二级缓存的数据保持同步(一致)。 二级缓存(读写缓存)中的数据的过期策略是当有服务实例发生注册、下线时就会主动过期(更新)其对应的二级缓存。此外构建二级缓存时会指定默认的过期时间为180秒180秒后自动过期。 四.多级缓存如何降低并发读写频率 一级缓存使用ConcurrentHashMap实现二级缓存使用Guava Cache实现。一级缓存主要用于降低每分钟的写频率二级缓存主要用于冷热数据分离。 当服务注册表的某个key对应的数据更新时二级缓存的数据会立即更新30秒后一级缓存的数据也一定会更新。 当服务注册表的某个key数据没有更新时二级缓存的数据180秒后就会过期过期的30秒后一级缓存的数据变为null。后续访问一级缓存时再从服务注册表中获取对应的数据写入二级缓存。 通过这两级缓存就可以大大降低每分钟并发读写缓存注册表的冲突次数比如一级缓存就可以将每分钟并发读写冲突的次数从10次降为2次因为一级缓存的数据最多每30秒更新一次。而二级缓存则实现冷数据180秒没访问则不缓存减少需要缓存的热数据数量。 五.多级缓存的本质 一级缓存是只读缓存二级缓存是读写缓存。在一级缓存(只读缓存)中存在很少的并发读写情况每分钟两次。在二级缓存(读写缓存)中会进行冷热数据分离冷数据不缓存。 服务注册表的数据更新之后最多30秒就会更新到一级缓存多级缓存就是牺牲读数据和写数据的实时一致性来降低读写竞争的频率。 public class MutilCache {//一级缓存(只读缓存)private final ConcurrentMapKey, Value readOnlyCacheMap new ConcurrentHashMapKey, Value();//二级缓存(读写缓存)private final LoadingCacheKey, Value readWriteCacheMap;public MultiCache() {this.readWriteCacheMap CacheBuilder.newBuilder().expireAfterWrite(180, TimeUnit.SECONDS).build(new CacheLoaderKey, Value() {Overridepublic Value load(Key key) {//从注册表中获取数据return getFromRegistry(key);}});…}//读取服务注册表中的某个key的值public Value get(final Key key) {return getValue(key);}public Value getValue(final Key key) {Value payload null;try {//首先读取一级缓存(只读缓存)的数据final Value currentPayload readOnlyCacheMap.get(key);if (currentPayload ! null) {payload currentPayload;} else {//然后再读二级缓存(读写缓存)的数据payload readWriteCacheMap.get(key);readOnlyCacheMap.put(key, payload);}} catch (Throwable t) {logger.error(Cannot get value for key : {}, key, t);}return payload;}private TimerTask getCacheUpdateTask() {return new TimerTask() {Overridepublic void run() {//遍历一级缓存(只读缓存)for (Key key : readOnlyCacheMap.keySet()) {Value cacheValue readWriteCacheMap.get(key);Value currentCacheValue readOnlyCacheMap.get(key);//判断二级缓存的数据是否和一级缓存的数据相同如果不相同则更新一级缓存的数据if (cacheValue ! currentCacheValue) {readOnlyCacheMap.put(key, cacheValue);}}}}}… } 9.锁故障之死锁总结 (1)产生死锁的例子 (2)死锁产生的必要条件 (3)如何解决死锁问题 (4)避免死锁的常见方式 (1)产生死锁的案例 案例一 如果在一个类的clint()方法中有耗时的操作那么可能会造成多个线程同时初始化该类时产生阻塞。 如果两个类在各自的静态语句块中都尝试通过反射初始化对方那么并发调用这两个类的初始化方法就可能会产生死锁。 案例二 简单的锁顺序死锁类不一样。
synchronized(left)synchronized(right)
synchronized(right)synchronized(left) 案例三 动态的锁顺序死锁类一样但对象实例不一样比如相互转账场景。 synchronized(fromAccount)synchronized(toAccount) (2)死锁产生的必要条件 不管是线程级别的死锁还是数据库级别的死锁只能通过人工干预解决。导致死锁的条件有如下四个这四个条件同时满足就会产生死锁。 条件一互斥条件 共享资源X和Y只能被一个线程占用。 条件二占有且等待条件 线程1已经取得共享资源X在等待共享资源Y的时候不释放共享资源X。 条件三不可抢占条件 其他线程不能强行抢占线程1占有的资源。 条件四循环等待条件 线程1在等待线程2占有的共享资源线程2在等待线程1占有的共享资源。 (3)如何解决死锁问题 只要破坏产生死锁的四个条件中的任意一个就可以避免产生死锁。其中互斥条件不可破坏因为这是互斥锁的基本约束其他三个条件都可破坏。 一.破坏占有且等待条件 要破坏在占有资源X的同时还在等待资源Y可以一次性申请资源X和资源Y。每个线程都一次性申请所有的资源这样就不存在占有一部分等待一部分了。 二.破坏不可抢占条件 当占用部分资源的线程进一步申请其他资源时如果申请不到则可以主动释放其占有的资源这样不可抢占条件就被破坏了。破坏不可抢占条件的核心是当前线程能够主动释放尝试占有的资源。 synchronized在申请不到资源的时候会直接进入阻塞状态一旦线程被阻塞之后就无法再释放已经占有的资源。Lock接口中的tryLock()方法可以尝试抢占锁抢占失败不会阻塞线程。所以synchronized不能实现破坏该条件但Lock.tryLock()可以实现。 三.破坏循环等待条件 把资源按照某种顺序编号所有锁资源的申请都按照某种顺序来获取。比如所有线程在申请资源时都是先申请资源编号小的再申请资源编号大的。 (4)避免死锁的常见方式 一.避免一个线程同时获得多个锁 二.避免一个线程在锁内同时占用多个资源尽量保证每个锁只占用一个资源 三.尝试使用定时锁使用Lock.tryLock(timeout)来替代使用内部锁 四.对于数据库锁加锁和解锁必须要在同一个数据库连接里 10.无锁化编程 (1)一写一读的无锁队列(内存屏障) (2)一写多读的无锁队列(volatile关键字) (3)多写多读的无锁队列(CAS机制) (3)多写多读的无锁队列(CAS机制) (4)多写多读的无锁栈(CAS机制) (5)多写多读的无锁链表 锁在Java中主要是指synchronized关键字和Lock接口锁在Linux中主要是指pthread和mutex。 (1)一写一读的无锁队列(内存屏障) 一写一读的无锁队列即Linux内核的kfifo队列。一写一读两个线程不需要锁只需要内存屏障。 (2)一写多读的无锁队列(volatile关键字) Disruptor是一个开源的并发框架能在无锁的情况下实现队列并发操作。Disruptor的RingBuffer之所以可以做到完全无锁是因为单线程写。RingBuffer有一个头指针对应一个生产者线程。RingBuffer有多个尾指针对应多个消费者线程。指针类型都是volatile的。 (3)多写多读的无锁队列(CAS机制) 基于CAS和链表就可以实现一个多写多读的无锁队列。链表有一个头指针head和一个尾指针tail。入队时对tail指针进行CAS操作出队时对head指针进行CAS操作。 (4)多写多读的无锁栈(CAS机制) 无锁栈比无锁队列的实现更简单只需要对head指针进行CAS操作即可。入栈时对head指针进行CAS操作出栈时也是对head指针进行CAS操作。 (5)多写多读的无锁链表 ConcurrentSkipListMap并发跳表查询就是基于无锁链表的。
- 上一篇: 经典网站欣赏wordpress评论框
- 下一篇: 经营地址怎么在国税网站做更改什么网站建设最简单
相关文章
-
经典网站欣赏wordpress评论框
经典网站欣赏wordpress评论框
- 技术栈
- 2026年03月21日
-
京紫元年网站建设广东省自然资源厅事务中心
京紫元年网站建设广东省自然资源厅事务中心
- 技术栈
- 2026年03月21日
-
京东网站拼图验证怎么做学校网站建设培训心得
京东网站拼图验证怎么做学校网站建设培训心得
- 技术栈
- 2026年03月21日
-
经营地址怎么在国税网站做更改什么网站建设最简单
经营地址怎么在国税网站做更改什么网站建设最简单
- 技术栈
- 2026年03月21日
-
经营类网页游戏大全免费建站网站 seo
经营类网页游戏大全免费建站网站 seo
- 技术栈
- 2026年03月21日
-
经营网站 备案信息管理系统百度问一问
经营网站 备案信息管理系统百度问一问
- 技术栈
- 2026年03月21日


