舟山网站制作公司东莞哪家公司做网站好

当前位置: 首页 > news >正文

舟山网站制作公司,东莞哪家公司做网站好,龙华营销型网站建设,网站中的下拉菜单RocketMQ目前在国内应该是比较流行的MQ 了#xff0c;目前本人也在公司的项目中进行使用和研究#xff0c;借着这个机会#xff0c;分析一下RocketMQ 发送一条消息到存储一条消息的过程#xff0c;这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。 分析的总体…RocketMQ目前在国内应该是比较流行的MQ 了目前本人也在公司的项目中进行使用和研究借着这个机会分析一下RocketMQ 发送一条消息到存储一条消息的过程这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。 分析的总体技术范围发送到存储本文的主要目的是主要是为了认识一条消息并分析被发出且被存储的代码中关于 MQ 文件系统的优化设计等。 来自官方源码example的一段发送代码 DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName); producer.start(); Message msg new Message(TopicTest, TagA, OrderID188, Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult producer.send(msg); System.out.printf(%s%n, sendResult); producer.shutdown();直接看看send方法send 方法会设置一个默认的 timeout3秒。默认使用 SYNC 模式另外有Async和OneWay模式。需要处理方法签名中的 Client 端的异常网络异常Broker 端的异常线程中断异常。 DefaultMQProducerImpl 的 sendDefaultImpl方法就是发送的主要逻辑。 代码里有个地方可以提一下关于更新故障时间的策略RocketMQ有一个类 MQFaultStrategy用来处理MQ错误然后对 MQ Server 进行服务降级。 如果发送一条消息在550ms以内那么就不用降级如果550毫秒以外就进行容错降级熔断30 秒以此类推。 再看DefaultMQProducerImpl 的 sendKernelImpl发送到内核的方法实现。 先找到broker的地址。尝试压缩大于4M 的消息批量消息不压缩然后执行各种钩子。 Request对象存放数据Context 上下文对象存放调用上下文。 这里会设置一个消息生成时间即bornTimestamp后面使用消息轨迹的时候可以查看。 默认情况下如果采用SYNC 模式就调用 MQClientAPIImpl 来发送消息这一层还是在 Client 模块里在这一层会设置更详细的消息细节构造命令对象。最后调用 remotingClient的 invokeSync 发送消息。 MQClientAPIImpl的sendMessage这一层会给命令对象设置一个CmdCode叫SEND_MESSAGE这个东西就是一个和Broker的契约Broker会根据这个Code进行不同的策略。 Netty 会使用 Handler 处理出去的数据和返回的数据我们看看 Client 端 Netty 有哪些 Handler. Bootstrap handler this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()).handler(new ChannelInitializer() {public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null ! sslContext) {pipeline.addFirst(defaultEventExecutorGroup, sslHandler, sslContext.newHandler(ch.alloc()));log.info(Prepend SSL handler);} else {log.warn(Connections are insecure as SSLContext is null!);}}pipeline.addLast(defaultEventExecutorGroup,new NettyEncoder(),new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),new NettyClientHandler());}});使用了一个 EncoderDecoder空闲处理器连接管理器ClientHandler。 XXCoder就是对Cmd对象进行序列化和反序列化的这里的空闲使用的读写最大空闲时间为120s超过这个就会触发空闲事件。 RocketMQ就会关闭Channel 连接。而针对空闲事件进行处理的就是连接管理器了。连接管理器处理空闲、Close、Connect、异常等事件使用监听器模式不同的监听器对不同的事件进行处理。另外这里也许可以借鉴 EventBus每个事件可以设置多个监听器。 看了RocketMQ中 Netty 的设计再看看返回值处理就简单了NettyClientHandler 会在 channelRead0 方法处理 Netty Server 的返回值。对应 RMQ则是 processMessageReceived 方法。该方法很简洁 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd msg;if (cmd ! null) {switch (cmd.getType()) {case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:processResponseCommand(ctx, cmd);break;default:break;}}}其实这是一个模板方法固定算法由子类实现分为 Request 实现和 Response 实现。我们看看 Response 实现。 public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque cmd.getOpaque();final ResponseFuture responseFuture responseTable.get(opaque);if (responseFuture ! null) {responseFuture.setResponseCommand(cmd);responseTable.remove(opaque);if (responseFuture.getInvokeCallback() ! null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn(receive response, but not matched any request, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}}通过 cmd 对象的 Request ID 找到 Feature执行 responseFuture.putResponse设置返回值唤醒阻塞等待的发送线程。 这里还有一个 release 调用这个和异步发送有关默认最大同时 65535 个异步请求具体就不展开了。 到这里唤醒阻塞的发送线程返回数据客户端层面的发送就结束了。 看源码看到有个 SEND_MESSAGE Code是 Client 和 Broker Server 的一个约定代码我们看看这个代码在哪里用的。 在 broker 模块的 BrokerController 类中有个 registerProcessor 方法会将 SEND_MESSAGE Code 和一个 SendMessageProcessor 对象绑定。 NettyRemotingServer是处理Request 的类ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler处理器这个处理器的channelRead0方法会调用 NettyRemotingServer的父类processMessageReceived 方法。 从processorTable 里根据 Cmd Code也就是 SEND_MESSAGE 获取对应的 Processor 一部分是处理数据的对象一部分是这个对象所对应的线程池。用于异步处理逻辑防止阻塞 Netty IO线程。 doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); final RemotingCommand response pair.getObject1().processRequest(ctx, cmd); doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);前后都是执行一些钩子例如 ACL RocketMQ会有一个 BrokerController 类会注册 Code 和 Processor 的绑定关系BrokerController 也会把这些绑定注册到 Netty Server 中当 Netty Server 从 Socket 收到 Cmd 对象根据 Cmd 对象的 Code就可以找到对应 Processor 类对数据进行处理。 中间是处理 Request请求的。这个 processRequest 方法有很多的实现SendMessageProcessor的sendMessage 是处理消息的主要逻辑。 消息存储引擎这里我们看DefaultMessageStore的putMessage 实现。 putMessageResult this.brokerController.getMessageStore().putMessage(msgInner);由于RocketMQ写数据是PageCache里面写的因此如果写的慢就是 PageCache 忙这里忙的标准是如果锁文件的时间超过了 1 秒那就是忙。 if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); }最后调用 PutMessageResult result this.commitLog.putMessage(msg) 写数据。如果耗时超过 500 毫秒就会打印日志。这样我们排查问题的时候可以看看 storeStats 的日志。 result mappedFile.appendMessage(msg, this.appendMessageCallback)写完之后释放锁如果超过 500 毫秒打印 cost time 日志。 处理刷盘和slave 同步这里看刷盘策略和同步策略是 SYNC 还是 ASYNC。经过我的测试同步刷盘和异步刷盘的性能差距是 10 倍。 而 Slave 的数据同步如果用 SYNC 模式tps 最高也就 2000 多一丢度为什么内网两台机器 ping 一下都要 0.2 毫秒一秒最多 5000 次再加上处理逻辑 2000 已经到顶了网络成了瓶颈。 我们看看 mappedFile.appendMessage 方法的实现。一路追踪有个关键逻辑 在 appendMessagesInner 里 int currentPos this.wrotePosition.get(); if (currentPos this.fileSize) {ByteBuffer byteBuffer writeBuffer ! null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result null;if (messageExt instanceof MessageExtBrokerInner) {result cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {result cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp result.getStoreTimestamp();return result; }代码中使用了 mappedFile 从 Linux 映射的 MMap buffer对数据进行写入。我们看看 doAppend 方法。 如果是 SYNC 模式执行 CommitLog 的 handleDiskFlush 的方法时就会立刻刷盘并等待刷盘结果。如果是 ASYNC 模式执行 CommitLog 的 handleDiskFlush 的方法时会通知异步线程进行刷盘但不等待结果。 如果没有新数据则为 500ms 执行一次刷盘策略。 简单说下异步刷盘 默认刷盘 4 页Linux 一页是 4kb 数据4页就是 16kb。 如果写的数据减去已经刷的数据剩下的数据大于等于 4 页就执行刷盘执行 mappedByteBuffer.force() 或者 fileChannel.force(false); 分享资源 获取以上资源请访问开源项目 点击跳转