做网站什么域名好广州顺德网站建设

当前位置: 首页 > news >正文

做网站什么域名好,广州顺德网站建设,深圳seo云哥,常州承接网站建设目录 一、Redis Stream1.1 场景1#xff1a;多个客户端可以同时接收到消息1.1.1 XADD - 向stream添加Entry#xff08;发消息 #xff09;1.1.2 XREAD - 从stream中读取Entry#xff08;收消息#xff09;1.1.3 XRANGE - 从stream指定区间读取Entry#xff08;收消息多个客户端可以同时接收到消息1.1.1 XADD - 向stream添加Entry发消息 1.1.2 XREAD - 从stream中读取Entry收消息1.1.3 XRANGE - 从stream指定区间读取Entry收消息 1.2 场景2多个客户端仅收到一部分消息分片sharded、消费组group1.2.1 XGROUP CREATE - 创建消费组1.2.2 XREADGROUP - 从消费组中读取消息1.2.3 XACK - 确认消息1.2.4 XPENDING - 读取PEL消息1.2.5 XCLAIM XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者1.2.6 统计命令 1.3 其他 二、Redisson Stream 一、Redis Stream 之前介绍过Redis Pub/Sub相关内容通过Redis Pub/Sub可以实现发布/订阅消息传递范式但是存在丢消息的可能而本文介绍的Redis Stream是一种可用来实现 可靠消息队列、支持消息分组类似Kafka Group 的数据结构。 关于Redis Stream的使用存在如下2个场景 场景1 多个客户端可以同时接收到消息场景2 多个客户端仅收到一部分消息分片sharded例如发送消息ABC客户端1收到AC客户端2收到B参考Kafka group概念。 关于场景1则可参考XADD、XREAD、XRANGE等相关命令的使用 关于场景2则需要了解XGROUP CREATE、XREADGROUP、XACK等相关命令的使用。 1.1 场景1多个客户端可以同时接收到消息 场景1中相关命令XADD、XREAD、XRANGE的使用汇总如下图
1.1.1 XADD - 向stream添加Entry发消息
向stream添加Entry多个key/value对XADD命令格式 XADD stream名称 id key1 value1 key2 value2 … 其中id为此次entry的唯一ID而key1 value1 key2 value2 …即为entry的具体内容 id为*则表示由Redis自动生成IDmillisecondsTime-sequenceNumber 亦可明确指定id。 示例 XADD mystream * name 罗 age 18 XADD mystream 1692632086370-0 name 刘 age 181.1.2 XREAD - 从stream中读取Entry收消息 从stream中读取entryXREAD命令格式 XREAD COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 STREAMS stream名称 上次接收的id 通过XADD添加一条消息多个执行XREAD的客户端都会读取到该消息 XREAD会从参数中指定的 上次接收的id 之后开始读取后续的消息 上次接受的id 可设置为$需配合BLOCK使用表示仅读取从阻塞开始后新添加的消息即不关心历史消息 上次接受的id 可设置为需要Redis版本7.4 RC1表示仅读取最后一条消息。 阻塞等待的毫秒数 如果为0则表示一直阻塞直到读取到一条消息。 示例

从头开始读取1条消息

XREAD STREAMS mystream 0# 从头开始读取2条消息 XREAD COUNT 2 STREAMS mystream 0-0

从指定消息ID之后开始读取2条消息

XREAD COUNT 2 STREAMS mystream 1692632086370-0# 最长阻塞5秒最多读取100条消息仅读取从阻塞开始后新添加的消息 XREAD BLOCK 5000 COUNT 100 STREAMS mystream $

继续从上次接受的id之后继续读取

XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3# 读取最后一条消息需要Redis版本7.4 RC1 XREAD STREAM mystream 1.1.3 XRANGE - 从stream指定区间读取Entry收消息 从stream指定区间起始ID范围正向读取EntryXRANGE命令格式 XRANGE stream名称 起始id 结束id COUNT 最多读取数量 按起始到结束正向返回消息 -表示最小ID表示最大ID 示例

返回全部消息从前到后依次返回

XRANGE mystream -

返回5条消息从前到后依次返回

XRANGE mystream - COUNT 5# 返回指定id包括指定id之后5条消息从前到后依次返回 XRANGE mystream 1718951980910-0 COUNT 5# 返回指定id不包括指定id之后5条消息从前到后依次返回 XRANGE mystream (1718951980910-0 COUNT 5从stream指定区间起始ID范围逆向读取EntryXREVRANGE命令格式 XREVRANGE stream名称 结束id 起始id COUNT 最多读取数量 按结束到起始逆向返回消息。 示例 返回全部消息从后到前逆向依次返回 XREVRANGE mystream -

返回2条消息从后到前逆向依次返回

XREVRANGE mystream - COUNT 21.2 场景2多个客户端仅收到一部分消息分片sharded、消费组group 场景2中相关命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM等使用汇总如下图 1.2.1 XGROUP CREATE - 创建消费组 给stream创建消费分组分组间彼此隔离分组内多个consumer会轮流消费消息分片XGROUP CREATE命令格式 XGROUP CREATE stream名称 group名称 起始读取id [MKSTREAM] 起始读取id 为0表示从头开始读取 起始读取id 为$表示从最后一条消息之后开始读取 MKSTREAM子命令是可选的表示自动创建stream。 示例

为mystream创建分组mygroup1且从最新消息开始消费XGROUP CREATE mystream mygroup1 $1.2.2 XREADGROUP - 从消费组中读取消息

以分组group读取stream中的消息group中每个客户端需要指定consumer名称多个consumer分摊group中的消息而多个group间彼此隔离XREADGROUP命令格式 XREADGROUP GROUP group名称 consumer名称 COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 [NOACK] STREAMS stream名称 上次接收的id PEL(Pending Entries List) 当使用XREADGROUP读取分组下消息时服务器会记住哪条消息发给了分组下的哪个消费者该记录存储在消费者组中称为PEL即已发送但尚未确认的消息ID列表。后续在消费者处理完消息后消费者必须手动调用XACK命令对消息ID进行确认以便从PEL中删除挂起的消息关于PEL的结构可参见下图截取自RedisInsight工具
上次接收的id 为表示消费者只希望接收从未传递给任何其他消费者的消息即给我新的信息号表示从当前消费组的last_delivered_id后面开始读。 上次接收的id 设为0或其他有效的id则表示仅读取 PEL当前consumer没有确认的消息 中指定id之后的消息。 NOACK子命令式可选的表示无需确认消息NOACK子命令适用于对可靠性要求不高、偶尔的消息丢失是可以接受的情况使用NOACK子命令可以避免将消息添加到PEL Pending Entries List相当于在读取消息后自动确认消息后续无需再调用XACK命令进行确认 示例

消费者c1阻塞读取mystream下分组mygroup1的最新消息直到读取到1条消息后解除阻塞

XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream # 消费者c1读取mystream下分组mygroup1的PEL消息即已投递给c1但c1未进行确认的消息列表 XREADGROUP GROUP mygroup1 c1 STREAMS mystream 01.2.3 XACK - 确认消息 确认stream下指定分组group的某条消息已被成功消费XACK命令格式 XACK stream名称 group名称 消息id 示例

确认1条消息

XACK mystream mygroup1 1719206857966-0 # 同时确认3条消息 XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-01.2.4 XPENDING - 读取PEL消息 读取stream中指定分组group的PEL挂起消息列表XPENDING命令格式 XPENDING stream名称 group名称 IDEL 空闲毫秒数 起始消息id 结束消息id 查询数量 consumer名称 示例

查询mystream下mygroup1分组的PEL列表

XPENDING mystream mygroup1# 查询mystream下mygroup1分组下的消费者c1的空闲9秒的最多10条PEL消息 XPENDING mystream mygroup1 IDLE 9000 - 10 c11.2.5 XCLAIM XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者 通过XPENDING查询出PEL消息已投递未确认后若原先消息对应的consumer已经挂掉没有能力继续处理消息则可通过XCLIAM将对应的消息转移给同分组下的其他consumer进行处理XCLAIM命令格式如下 XCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 消息id1 消息id2 转移后消息上次投递时间会重置为当前时间即消息空闲idle时间为0 默认会返回已经转移成功的消息内容且消息投递计数会加1 也可添加JUSTID子命令则只返回消息ID不返回消息内容且消息投递计数不变 若多个客户端同时通过XCLAIM转移同一条消息的所有权则只会有一个客户端转移成功。 Redis官方原文如下 Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also reset the idle time (since this is a new attempt at processing the message),two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case). 示例

mystream下mygroup1分组下的PEL消息1526569498055-0且空闲时长超过1小时则将其转移给消费者c2

XCLAIM mystream mygroup1 c2 3600000 1526569498055-0亦可通过XAUTOCLAIM将PEL中指定起始消息ID后的消息批量进行转移XAUTOCLIAM命令格式如下 XAUTOCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 起始消息id COUNT 消息数量 示例

扫描mystream下mygroup1分组下的所有PEL消息空闲时长超过1小时则最多转移25条消息给消费者c2

XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 251.2.6 统计命令

查询stream下的分组信息

XINFO GROUPS stream名称# 查询stream信息 XINFO STREAM stream名称# 查询stream下指定分组的消费者信息 XINFO CONSUMERS stream名称 group名称1.3 其他 删除stream中的消息 XDEL stream名称 id1 id2 … 查询stream中的消息entry数量 XLEN stream名称 压缩stream中的消息数据量 XTRIM stream名称 MAXLEN 保留的最近消息数量 XTRIM stream名称 MINID 消息ID小于此ID的消息均会被删除 二、Redisson Stream 在Redisson中可通过Stream实现Redis Stream 场景1 相关示例代码如下 Test void testStream() throws InterruptedException {String streamName mystream;MyMessage2 myMessage this.buildMyMessageWithTimestampId();//获取StreamRStreamString, Object stream this.redisson.getStream(streamName);//发消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info(stream[{}] add success, id: {}, streamName, entryId);//读消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0MapStreamMessageId, MapString, Object entries stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) - {log.info(stream[{}] read message: id{}, entry: {}, streamName, id, entryMap);});//读取区间内消息 - XRANGE mystream 0 entryId COUNT 10entries stream.range(10, StreamMessageId.ALL, entryId);entries.forEach((id, entryMap) - {log.info(stream[{}] range message: id{}, entry: {}, streamName, id, entryMap);}); }场景2 相关示例代码如下 Resource private RedissonClient redisson;Test void testStreamGroup() throws InterruptedException {String streamName mystream;String groupName mygroup1;String consumerName c1;MyMessage2 myMessage this.buildMyMessageWithTimestampId();//获取StreamRStreamString, Object stream this.redisson.getStream(streamName);//发消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info(stream[{}] add success, id: {}, streamName, entryId);//查询已存在的分组 - XINFO GROUPS mystreamListStreamGroup streamGroups stream.listGroups();streamGroups.forEach(streamGroup - {log.info(stream[{}] listGroups groupName: {}, streamName, streamGroup.getName());});Boolean existGroup streamGroups.stream().anyMatch(group - groupName.equals(group.getName()));if (!existGroup) {//创建分组 - XGROUP CREATE mygroup1 \(stream.createGroup(StreamCreateGroupArgs.name(groupName)//此处id支持NEWEST即\)ALL即0.id(StreamMessageId.ALL));log.info(stream[{}] createGroup success, groupName: {}, streamName, groupName);}//读分组消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream MapStreamMessageId, MapString, Object entries stream.readGroup(groupName, consumerName,//greaterThan即设置从哪个消息ID之后开始读取支持NEVER_DELIVERED即、ALL即0StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) - {log.info(stream[{}] readGroup groupName: {}, consumerName: {}, message: id{}, entry: {},streamName, groupName, consumerName, id, entryMap);});//读取PEL中未确认的消息 - XPENDING mystream mygroup1 - 100 c1MapStreamMessageId, MapString, Object streamMessageIdMapMap stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);streamMessageIdMapMap.forEach((id, entryMap) - {log.info(stream[{}] pendingRange groupName: {}, consumerName: {}, message: id{}, entry: {},streamName, groupName, consumerName, id, entryMap);//确认消息从PEL中移除 - XACK mystream mygroup1 1600000000000-0stream.ack(groupName, id);log.info(stream[{}] ack groupName: {}, consumerName: {}, message: id{},streamName, groupName, consumerName, id);});} 参考 Redis Stream https://redis.io/docs/latest/develop/data-types/streams/ https://redis.io/docs/latest/commands/xreadgroup/ Redisson Stream https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream