香河县住房和城乡建设局网站国外的营销网站有哪些

当前位置: 首页 > news >正文

香河县住房和城乡建设局网站,国外的营销网站有哪些,公司内账管理系统,网络培训学校一、概述 消息队列 定义 消息队列模型#xff1a;一种分布式系统中的消息传递方案#xff0c;由消息队列、生产者和消费者组成消息队列#xff1a;负责存储和管理消息的中间件#xff0c;也称为消息代理#xff08;Message Broker#xff09;生产者#xff1a;负责 产…一、概述 消息队列 定义 消息队列模型一种分布式系统中的消息传递方案由消息队列、生产者和消费者组成消息队列负责存储和管理消息的中间件也称为消息代理Message Broker生产者负责 产生并发送 消息到队列的应用程序消费者负责从队列 获取并处理 消息的应用程序功能实现消息发送和处理的解耦支持异步通信提高系统的可扩展性和可靠性主流消息队列解决方案 RabbitMQ轻量级支持多种协议适合中小规模应用RocketMQ阿里开源高性能适合大规模分布式应用 Stream 定义StreamRedis 5.0 引入的一种数据类型用于处理高吞吐量的消息流、事件流等场景功能按时间顺序 ”添加、读取、消费“ 消息支持消费者组、消息确认等功能 二、Stream 工作流程 写入消息 生产者通过 XADD 向 Stream 中添加消息。每条消息自动获得唯一的 ID按时间顺序存入 Stream。创建消费者组 如果使用消费者组首先需要通过 XGROUP CREATE 创建消费者组。消费者组会根据时间顺序将消息分配给组内的消费者。读取消息 消费者使用 XREADGROUP 命令读取 Stream 中的消息。消息按规则分配给不同消费者处理每个消费者读取到不同的消息。确认消息 消费者在处理完消息后使用 XACK 命令确认消息表示该消息已成功处理。如果消息未确认例如消费者崩溃或超时它将保持在 Pending 状态等待重新分配给其他消费者。重新分配未确认消息 如果消息在一定时间内没有被确认其他消费者可以读取未确认的消息并进行处理。可通过 XPENDING 命令查看未确认消息或在消费者组中设置时间阈值自动重新分配。删除消费者组 不再需要消费者组时使用 XGROUP DESTROY 命令删除消费者组 三、Stream 实现 消费者组模式 定义Redis Streams 的一部分用于处理消息的分布式消费优点 消息分流多消费者争抢消息加快消费速度避免消息堆积消息标示避免消息漏读消费者读取消息后不马上销毁加入 consumerGroup 维护的 pending list 队列等待 ACK消息确认通过消息 ACK 机制保证消息至少被消费一次可以阻塞读取避免盲等实现方法 通过 Stream 数据类型实现消息队列命令以 “X” 开头 常用命令 XGROUP CREATE key groupName ID [MKSTREAM] 功能创建消费者组参数 key队列名称groupName组名称ID起始 ID 标识\( 表示队列中最后一个消息0 表示队列中第一个消息MKSTREAM队列不存在则创建队列 XGROUP DESTORY key groupName 功能删除指定消费者组 XGROUP CREATECONSUMER key groupName consumerName 功能添加组中消费者 XGROUP DELCONSUMER key groupName consumerName 功能删除组中消费者 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …] 功能读取组中的消息gourp消费者组名称consumer消费者名称(不存在则自动创建)count本次查询的最大数量BLOCK milliseconds当没有消息时最长等待时间NOACK无需手动 ACK获取到消息后自动确认STREAMS KEY指定队列名称ID获取消息的起始 ID 表示从下一个未消费消息开始 (常用) XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ] 功能获取 pending-list 中的消息IDLE获取消息后、确认消息前的这段时间空闲时间超过 min-idle-time 则取出start获取的最小目标 IDend获取的最大目标 IDcount获取的数量consumer获取 consumer 的 pending-list XACK key group ID [ ID … ] 功能确认从组中读取的消息已被处理key队列名称group组名称ID消息的 ID 表格版命令 命令 命令功能XGROUP CREATE key groupName ID [MKSTREAM]创建消费者组XGROUP DESTORY key groupName删除指定消费者组XGROUP CREATECONSUMER key groupName consumerName添加组中消费者XGROUP DELCONSUMER key groupName consumerName删除组中消费者XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]读取组中的消息ID 填写 “” 则读取第一条未读消息XACK key group ID [ ID … ]确认从组中读取的消息已被处理 属性 属性名定义key队列名称groupName消费者组名称ID起始 ID 标示\) 代表队列中最后一个消息0 代表第一个消息MKSTREAM队列不存在时自动创建队列BLOCK milliseconds没有消息时的最大等待时长NOACK无需手动 ACK获取到消息后自动确认STREAMS key指定队列名称 运行逻辑 while(true) {// 尝试监听队列使用阻塞模式最长等待 2000 msObject msg redis.call(XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 );if(msg null) {continue;}try {// 处理消息完成后一定要 ACKhandleMessage(msg);} catch (Exception e) {while(true) {// 重新读取阻塞队列消息Object msg redis.call(XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0);if(msg null) // 如果阻塞队中的消息已经全部处理则退出pending-listbreak;try {handleMessage(msg); // 重新处理 pending-list 中的消息} catch (Exception e){continue; // 如果还出错, 则继续重新读取}}} }四、示例 目标消息队列实现数据库异步修改数据库将下单 message 缓存在 redis 中减小下单操作对数据库的冲击 项目结构 RedisConfig 配置类创建消费者组是一次性的操作适合放在配置类中VoucherOrderHandler 内部类消费者的逻辑和订单业务相关因此适合放在 VoucherOrderServiceImpl 中多线程启动逻辑消费者线程的启动与订单业务密切相关直接放在 VoucherOrderServiceImpl 类中更符合职责分离原则src/main/java ├── com/example │ ├── config │ │ └── RedisConfig.java // Redis 配置类包含消费者组初始化 │ ├── service │ │ ├── VoucherOrderService.java │ │ └── impl │ │ └── VoucherOrderServiceImpl.java // 包含 VoucherOrderHandler 内部类 │ ├── entity │ │ └── VoucherOrder.java // 优惠券订单实体 │ ├── utils │ │ └── BeanUtil.java // 用于 Map 转 Bean 的工具类 │ └── controller │ └── VoucherOrderController.java // 如果有 Controller创建消费者组config.RedisConfig Bean public void initStreamGroup() {// 检查是否存在消费者组 g1try {stringRedisTemplate.opsForStream().createGroup(stream.orders, g1);} catch (RedisSystemException e) {// 如果 group 已存在抛出异常可忽略log.warn(消费者组 g1 已存在);} }创建消费者线程 位置作为 VoucherOrderServiceImpl 内的预构造部分PostConstruct public void startConsumers() {for (int i 0; i 5; i) { // 5 个线程模拟多个消费者new Thread(new VoucherOrderHandler()).start();} }添加消息到消息队列 src/main/resources/lua/SECKILL_SCRIPT.lua –1. 参数列表 –1.1. 优惠券id local voucherId ARGV[1] –1.2. 用户id local userId ARGV[2] –1.3. 订单id local orderId ARGV[3]–2. 数据key local stockKey seckill:stock: .. voucherId –2.1. 库存key local orderKey seckill:order .. voucherId –2.2. 订单key–3. 脚本业务 –3.1. 判断库存是否充足 get stockKey if( tonumber( redis.call(GET, stockKey) ) 0 ) thenreturn 1 end –3.2. 判断用户是否重复下单 SISMEMBER orderKey userId if( redis.call( SISMEMBER, orderKey, userId ) 1 ) thenreturn 2 end –3.4 扣库存 incrby stockKey -1 redis.call( INCRBY, stockKey, -1 ) –3.5 下单(保存用户) sadd orderKey userId redis.call( SADD, orderKey, userId ) – 3.6. 发送消息到队列中 redis.call( XADD, stream.orders, *, userId, userId, voucherId, voucherId, id, orderId )创建消费者类ServiceImpl 位置作为 VoucherOrderServiceImpl 内的私有类// 在ServiceImpl中创建一个VoucherOrderHandler消费者类,专门用于处理消息队列中的消息 private class VoucherOrderHandler implements Runnable {Overridepublic void run() {while (true) {try {// 1. 获取消息队列中的订单信息ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create( stream.order, ReadOffset.lastConsumed()));// 2. 没有消息则重新监听if (list null || list.isEmpty() ) continue;// 3. 获取消息中的 voucherOrderMapRecordString, Object, Object record list.get(0);MapObject, Object value record.getValue();VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 创建订单createVoucherOrder(voucherOrder);// 5. 确认当前消息已消费 XACKstringRedisTemplate.opsForStream().acknowledge(s1, g1, record.getId());} catch ( Exception e) {log.error(处理订单异常, e);// 6. 处理订单失败则消息会加入pending-list,继续处理pending-listhandlePendingList();}}}// 处理pending-list中的消息private void handlePendingList() {while(true) {try {// 1. 消费pending-list中的消息ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1), // 消费者此消息的消费者StreamReadOptions.empty().count(1), // StreamOffset.create(stream.order, ReadOffset.from(0)) // 从pending-list的第一条消息开始读);// 2. 退出条件, list 为空 - pending-list 已全部处理if(list null || list.isEmpty()) break;// 3. 获取消息中的 voucherOrderMapRecordString, Object, Object record list.get(0);MapObject, Object value record.getValue();VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 创建订单createVoucherOrder(voucherOrder);// 5. 确认消息已消费(XACK)stringRedisTemplate.opsForStream().acknowledge(s1, g1, record.getId());} catch (Exception e) {log.error(处理pendding订单异常, e);try{Thread.sleep(20); // 如果发生异常则休眠一会再重新消费pending-list中的消息} catch (Exception e2) {e.printStackTrace(); }}}} }创建消息方法 目标用户通过这个方法发送一条创建订单的 Message 给 Redis Stream// 创建Lua脚本对象 private static final DefaultRedisScriptLong SECKILL_SCRIPT;// Lua脚本初始化 (通过静态代码块) static {SECKILL_SCRIPT new DefaultRedisScript();SECKILL_SCRIPT.setLocation(new ClassPathResource(lua/SECKILL_SCRIPT.lua));SECKILL_SCRIPT.setResultType(Long.class); }Override public void createVoucherOrder(Long voucherId, Long userId) {// 生成订单 ID模拟long orderId System.currentTimeMillis();// 执行 Lua 脚本Long result stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(), // 使用空的 key 列表voucherId.toString(), userId.toString(), String.valueOf(orderId));// 根据 Lua 脚本返回结果处理if (result 1) {throw new RuntimeException(库存不足);} else if (result 2) {throw new RuntimeException(不能重复下单);}// 如果脚本执行成功则订单消息会进入 Redis Stream消费者组会自动处理System.out.println(订单创建成功); }(缺陷) 单消费者模式 常用命令 XADD key [NOMKSTREAM] [MAXLEN | MINID [|~] threshold [LIMIT count] * | ID field value [field value …]XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]缺陷有消息漏读风险 五、其他消息队列方案 (缺陷) List 实现 优点 不受 JVM 内存上限限制因为利用 Redis 存储数据安全 因为基于 List 结构本身是数据存储基于 Redis 持久化机制消息有序性通过 List 结构的 LPUSH BRPOP 命令实现顺序缺点 消息丢失BRPOP 的时候如果宕机则消息会丢失只支持单消费者 (缺陷) PubSub 实现 定义 Publish Subscribe 模型一种消息队列模型生产者向指定的 channel 来 public 消息消费者从 subscribe 的 channel 中接收消息功能支持多消费者模式多个消费者可以同时 subscribe 一个 channel优点采用发布订阅模型支持多生产者、消费者缺点 不支持数据持久化无法避免消息丢失消息堆积有上限超出时数据丢失 三种消息队列对比