深圳个人网站制作阿里云服务器做网站django

当前位置: 首页 > news >正文

深圳个人网站制作,阿里云服务器做网站django,直播平台网站建设,杭州建设网站需要多少钱欢迎来到啾啾的博客#x1f431;。 记录学习点滴。分享工作思考和实用技巧#xff0c;偶尔也分享一些杂谈#x1f4ac;。 有很多很多不足的地方#xff0c;欢迎评论交流#xff0c;感谢您的阅读和评论#x1f604;。 目录 1 引言2 缓冲区2.1 消息在Partition内有序2.2 批… 欢迎来到啾啾的博客。 记录学习点滴。分享工作思考和实用技巧偶尔也分享一些杂谈。 有很多很多不足的地方欢迎评论交流感谢您的阅读和评论。 目录 1 引言2 缓冲区2.1 消息在Partition内有序2.2 批次消息ProducerBatch2.2.1 内存分配2.2.2 线程安全 3 发送消息Sender4 总结 1 引言 继续看Kafka源码看其是如何批量发送消息的。 2 缓冲区 当调用producer.send(record)时消息将先到缓冲区在缓冲区按照目标的Topic-Partition进行组织满足以条件后随批次发送给Broker。 // KafkaProducer.java public FutureRecordMetadata send(ProducerRecordK, V record, Callback callback) {// … 省略了部分代码 …return doSend(record, callback); // 转交给 doSend 方法 }private FutureRecordMetadata doSend(ProducerRecordK, V record, Callback callback) {// …// 1. 等待元数据更新如果需要的话// …// 2.【核心步骤】调用 RecordAccumulator 的 append 方法RecordAccumulator.RecordAppendResult result accumulator.append(tp,timestamp, key, value, headers, interceptors, remainingWaitMs);// …// 3. 唤醒 Sender 线程告诉他“可能有新活儿干了”this.sender.wakeup();// …return result.future; }我们看一下“缓冲区”RecordAccumulator。
2.1 消息在Partition内有序 RecordAccumulator维护了一个数据结构 private final ConcurrentMapTopicPartition, DequeProducerBatch batchesappend代码简化如下 // RecordAccumulator.java public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, …) {// 1. 获取该分区的批次队列 batches中获取没有则创建DequeProducerBatch dq getOrCreateDeque(tp);synchronized (dq) { // 对该分区的队列加锁保证线程安全// 2. 尝试追加到最后一个当前活跃的批次中ProducerBatch last dq.peekLast();if (last ! null) {FutureRecordMetadata future last.tryAppend(timestamp, key, value, headers, …);if (future ! null) {// 如果追加成功直接返回return new RecordAppendResult(future, dq.size() 1 || last.isFull(), false);}}// 3. 如果最后一个批次满了或者不存在就需要一个新的批次// 从 BufferPool 申请一块内存大小由 batch.size 配置决定ByteBuffer buffer free.allocate(batchSize, maxTimeToBlock);// 4. 创建一个新的 ProducerBatch (货运箱)ProducerBatch batch new ProducerBatch(tp, memoryRecordsBuilder, now);FutureRecordMetadata future batch.tryAppend(timestamp, key, value, headers, …); // 把当前消息放进去// 5. 将新的批次加入到队列的末尾dq.addLast(batch);// …return new RecordAppendResult(future, …);} }可以看到batches的value类型为Deque所以生产者可以维护发送时partition内的顺序结构。 但是在网络抖动时这样做还是不够时序性还是难以保障所以生产者还有别的配置: 每个连接上允许发送的未确认请求的最大数量 max.in.flight.requests.per.connection当 max.in.flight.requests.per.connection 1 时Sender 线程在发送完 Batch-1 后会阻塞自己直到 Batch-1 的请求得到响应成功或失败它绝不会在此期间发送 Batch-2。这样一来即使 Batch-1 需要重试Batch-2 也只能乖乖地在后面排队。这就从根本上杜绝了因重试导致乱序的可能。默认 max.in.flight.requests.per.connection 5。即它允许 Producer 在还没收到 Batch-1 的 ACK 时就继续发送 Batch-2、3、4、5。这极大地提升了吞吐量不用傻等但牺牲了顺序性。 一般max.in.flight.requests.per.connection还需要与生产者幂等性配合。 enable.idempotence true开启幂等后Producer 会被分配一个唯一的 Producer ID (PID)并且它发送的每一批消息都会带上一个从0开始递增的序列号。Broker 端会为每个 TopicPartition 维护这个 PID 和序列号。如果收到的消息序列号不是预期的下一个Broker 就会拒绝它。 // NetworkClient.java// 这个方法判断我们是否可以向某个节点发送更多数据 Override public boolean isReady(Node node, long now) {// … 省略了连接状态的检查 …// 检查在途请求数是否小于该连接配置的上限return !connectionStates.isBlackedOut(node.idString(), now) canSendRequest(node.idString(), now); }// canSendRequest 方法内部会调用 inFlightRequests.canSendMore() // InFlightRequests.java public boolean canSendMore(String nodeId) {// this.requests 是一个 MapString, DequeNetworkClient.InFlightRequest// 它记录了每个节点上所有在途已发送但未收到响应的请求DequeInFlightRequest queue requests.get(nodeId);// 如果队列为空当然可以发送if (queue null) {return true;}// 将在途请求数 与 从配置中读到的max.in.flight.requests.per.connection比较// this.maxInFlightRequestsPerConnection 就是你配置的那个值return queue.size() this.maxInFlightRequestsPerConnection; }2.2 批次消息ProducerBatch 可以看到在结构private final ConcurrentMapTopicPartition, DequeProducerBatch batches中批次消息被封装为ProducerBatch。 2.2.1 内存分配 这个类的核心是MemoryRecordsBuilder。 总所周知频繁地创建和销毁对象特别是大块的byte[]对GC非常不友好。MemoryRecordsBuilder内部管理者一个巨大的、连续的ByteBuffer。 这个 ByteBuffer 不是每次创建 ProducerBatch 时都 new 出来的。它是在 RecordAccumulator 初始化时(我们在上面的RecordAccumulator中有看到BufferPool)从一个叫 BufferPool 的内存池中借用 (allocate) 的。当 ProducerBatch 发送完毕这块内存会归还 (deallocate) 给池子供下一个 ProducerBatch 复用 当你调用 tryAppend 添加消息时消息的 key, value 等内容被直接序列化成字节并写入到这个 ByteBuffer 的末尾。它不是在发送时才做序列化而是在追加时就完成了。 池化对于那些需要频繁创建和销毁的、生命周期短暂的、昂贵的对象如数据库连接、线程、大块内存一定要使用池化技术。这能极大地降低GC压力提升系统稳定性。 Redis和Kafka都有共同的高频内存使用的特性也都设计了预分配和复用。Kafka生产者与其用多少申请多少不如一次性申请一块大内存然后通过内部的指针移动position, limit来管理这块内存的使用。 2.2.2 线程安全 ProducerBatch 会被多个线程访问 你的业务线程Producer主线程调用 tryAppend() 往里面写数据。Sender 线程检查它是否已满 (isFull)、是否超时 (isExpired)并最终把它发送出去。 ProducerBatch 内部有一个精密的“状态机”并用 volatile 和 synchronized 保护。 // ProducerBatch.java (简化后) private final ListThunk thunks; private final MemoryRecordsBuilder recordsBuilder;// 【关键状态】volatile 保证了多线程间的可见性 private volatile boolean closed; private int appends; // 记录追加次数public FutureRecordMetadata tryAppend(…) {// 【关键检查】在方法入口处检查状态快速失败if (this.closed) {return null; }// … 将消息写入 recordsBuilder …// … }// 这个方法会被 Sender 线程调用 public void close() {this.closed true; }// 当批次被确认后由 Sender 线程调用 public void done(long baseOffset, long logAppendTime, RuntimeException exception) {// for-each 循环是线程安全的因为 thunks 列表在 close 之后就不再被修改for (Thunk thunk : this.thunks) {try {// 【核心】执行每个 send() 调用对应的回调函数thunk.callback.onCompletion(metadata, exception);} catch (Exception e) {// …}} }总的来说是职责分离最小化锁的设计以保证线程安全。 3 发送消息Sender Sender 是一个实现了 Runnable 接口的类它在一个独立的线程里无限循环最终发送消息。 // Sender.java public void run() {while (running) {try {runOnce();} catch (Exception e) {// …}} }void runOnce() {// …// 1. 【核心】找出所有可以发送的批次// linger.ms 决定了可以等待的最长时间RecordAccumulator.ReadyCheckResult result this.accumulator.ready(cluster, now);// 2. 如果有准备好的节点分区就发送它们if (!result.readyNodes.isEmpty()) {// …// 从累加器中“榨干”所有准备好的批次MapInteger, ListProducerBatch batches this.accumulator.drain(cluster, result.readyNodes, …);// …// 将批次转换成网络请求并发送sendProducerData(now, … , batches);}// … }RecordAccumulator.ready() 方法是决定何时发送的关键。它会遍历所有的 ProducerBatch满足以下任意一个条件的批次就会被认为是 “ready”准备就绪 批次已满批次大小达到了 batch.size。等待超时批次从创建到现在等待的时间超过了 linger.ms。其他原因比如 Producer 被关闭或者有新的 Producer 加入导致需要立即发送等。 Sender 的工作模式是 不断地问accumulator.ready()有到linger.ms时间的或者装满batch.size的批次没有。然后依据节点列表通过NetworkClient发送ProducerBatch到Kafka Broker。 4 总结 Kafka Producer 在客户端内部通过 RecordAccumulator 维护了一个按 TopicPartition 分类的内存缓冲区。当用户调用 send() 方法时消息并不会立即发送而是被追加到对应分区的某个 ProducerBatch 中。一个独立的 Sender 线程在后台运行它会持续检查 RecordAccumulator 中的批次一旦某个批次满足了“大小达到 batch.size”或“等待时间超过 linger.ms”这两个条件之一Sender 线程就会将这个批次以及其他所有准备好的批次一同取出打包成一个请求通过网络一次性发送给 Broker从而实现批量发送极大地提升了吞吐能力。 这个设计是经典的 “空间换时间” 和 “攒一批再处理” 的思想通过牺牲一点点延迟linger.ms换取了巨大的吞吐量提升。理解了这个机制你就能更好地去配置 batch.size 和 linger.ms 这两个核心参数以平衡你的业务对吞吐和延迟的需求。