有趣网站建设无聊做网站的技术

当前位置: 首页 > news >正文

有趣网站建设无聊,做网站的技术,设计托管网站建设,wordpress淘客宝主题- 消息队列的核心价值 - 解耦合。 异步处理 例如电商平台#xff0c;秒杀活动。一般流程会分为#xff1a;1: 风险控制、2#xff1a;库存锁定、3#xff1a;生成订单、4#xff1a;短信通知、5#xff1a;更新数据。 通过消息系统将秒杀活动业务拆分开#x…-     消息队列的核心价值    -  解耦合。 异步处理 例如电商平台秒杀活动。一般流程会分为1: 风险控制、2库存锁定、3生成订单、4短信通知、5更新数据。 通过消息系统将秒杀活动业务拆分开将不急需处理的业务放在后面慢慢处理流程改为1风险控制、2库存锁定、3:消息系统、4:生成订单、5短信通知、6更新数据。 流量的控制 1. 网关在接受到请求后就把请求放入到消息队列里面 2.后端的服务从消息队列里面获取到请求完成后续的秒杀处理流程。然后再给用户返回结果。优点控制了流量 缺点会让流程变慢。 -     Kafka 核心概念    -  生产者Producer 往Kafka集群生成数据消费者Consumer 往Kafka里面去获取数据处理数据、消费数据Kafka的数据是由消费者自己去拉去Kafka里面的数据主题topic分区partition 默认一个topic有一个分区partition自己可设置多个分区分区分散存储在服务器不同节点上。 -     集群架构    -  Kafka集群中一个kafka服务器就是一个broker Topic只是逻辑上的概念partition在磁盘上就体现为一个目录Consumer Group消费组 消费数据的时候都必须指定一个group id指定一个组的id假定程序A和程序B指定的group id号一样那么两个程序就属于同一个消费组特殊。 比如有一个主题topicA程序A去消费了这个topicA那么程序B就不能再去消费topicA程序A和程序B属于一个消费组 再比如程序A已经消费了topicA里面的数据现在还是重新再次消费topicA的数据是不可以的但是重新指定一个group id号以后可以消费。不同消费组之间没有影响。 消费组需自定义消费者名称程序自动生成独一无二。ControllerKafka节点里面的一个主节点。 -     数据性能    -  kafka写数据顺序写往磁盘上写数据时就是追加数据没有随机写的操作。经验: 如果一个服务器磁盘达到一定的个数磁盘也达到一定转数往磁盘里面顺序写追加写数据的速度和写内存的速度差不多生产者生产消息经过kafka服务先写到os cache 内存中然后经过sync顺序写到磁盘上。 -     零拷贝数据高性能    -  消费者读取数据流程 消费者发送请求给kafka服务 kafka服务去os cache缓存读取数据缓存没有就去磁盘读取数据 从磁盘读取了数据到os cache缓存中 os cache复制数据到kafka应用程序中 kafka将数据复制发送到socket cache中 socket cache通过网卡传输给消费者。
kafka linux sendfile技术 — 零拷贝 1.消费者发送请求给kafka服务
2.kafka服务去os cache缓存读取数据缓存没有就去磁盘读取数据
3.从磁盘读取了数据到os cache缓存中
4.os cache直接将数据发送给网卡
5.通过网卡将数据传输给消费者。 -     Kafka 日志分段保存    -  Kafka中一个主题一般会设置分区比如创建了一个topic_a然后创建的时候指定了这个主题有三个分区。其实在三台服务器上会创建三个目录。服务器1kafka1创建目录topic_a-0:。 目录下面是我们文件存储数据kafka数据就是message数据存储在log文件里。.log结尾的就是日志文件在kafka中把数据文件就叫做日志文件 。一个分区下面默认有n多个日志文件分段存储一个日志文件默认1G。 服务器2kafka2创建目录topic_a-1: 服务器3kafka3创建目录topic_a-2。另外搜索公众号Linux中文社区后台回复“私房菜”获取一份惊喜礼包。 -     二分查找定位数据    -  Kafka里面每一条消息都有自己的offset相对偏移量存在物理磁盘上面在position Position物理位置磁盘上面哪个地方也就是说一条消息就有两个位置offset相对偏移量相对位置position磁盘物理位置稀疏索引         Kafka中采用了稀疏索引的方式读取索引kafka每当写入了4k大小的日志.log就往index里写入一个记录索引。其中会采用二分查找 -     高并发网络设计 NIO    -  网络设计部分是kafka中设计最好的一个部分这也是保证Kafka高并发、高性能的原因对kafka进行调优就得对kafka原理比较了解尤其是网络设计部分。 Reactor 网络设计模式1 Reactor网络设计模式2 Reactor网络设计模式3 Kafka超高并发网络设计 -     Kafka 冗余副本保证高可用    -  在kafka里面分区是有副本的注0.8以前是没有副本机制的。创建主题时可以指定分区也可以指定副本个数。副本是有角色的leader partition1、写数据、读数据操作都是从leader partition去操作的。 它会维护一个ISRin-sync- replica 列表但是会根据一定的规则删除ISR列表里面的值 生产者发送来一个消息消息首先要写入到leader partition中 写完了以后还要把消息写入到ISR列表里面的其它分区写完后才算这个消息提交 follower partition从leader partition同步数据。 -     优秀架构思考    -  Kafka — 高并发、高可用、高性能 高可用多副本机制 高并发网络架构设计 三层架构多selector - 多线程 - 队列的设计NIO 高性能写数据 把数据先写入到OS Cache 写到磁盘上面是顺序写性能很高 读数据 根据稀疏索引快速定位到要消费的数据 零拷贝机制 减少数据的拷贝 减少了应用程序与操作系统上下文切换 -     Kafka 生产环境搭建    -  需求场景分析 电商平台需要每天10亿请求都要发送到Kafka集群上面。二八反正一般评估出来问题都不大。10亿请求 - 24 过来的一般情况下每天的12:00 到早上8:00 这段时间其实是没有多大的数据量的。80%的请求是用的另外16小时的处理的。16个小时处理 - 8亿的请求。16 * 0.2 3个小时 处理了8亿请求的80%的数据。 也就是说6亿的数据是靠3个小时处理完的。我们简单的算一下高峰期时候的qps6亿/3小时 5.5万/s qps5.5万。 10亿请求 * 50kb 46T 每天需要存储46T的数据。 一般情况下我们都会设置两个副本 46T * 2 92T Kafka里面的数据是有保留的时间周期保留最近3天的数据。92T * 3天 276T我这儿说的是50kb不是说一条消息就是50kb不是把日志合并了多条日志合并在一起通常情况下一条消息就几b也有可能就是几百字节。 -     物理机数量评估    -  1首先分析一下是需要虚拟机还是物理机 像Kafka mysql hadoop这些集群搭建的时候我们生产里面都是使用物理机。 2高峰期需要处理的请求总的请求每秒5.5万个其实一两台物理机绝对是可以抗住的。一般情况下我们评估机器的时候是按照高峰期的4倍的去评估。如果是4倍的话大概我们集群的能力要准备到 20万qps。这样子的集群才是比较安全的集群。大概就需要5台物理机。每台承受4万请求。 场景总结搞定10亿请求高峰期5.5万的qps,276T的数据需要5台物理机。 -     磁盘选择    -  搞定10亿请求高峰期5.5万的qps,276T的数据需要5台物理机。 1SSD固态硬盘还是需要普通的机械硬盘SSD硬盘性能比较好但是价格贵 SAS盘某方面性能不是很好但是比较便宜。SSD硬盘性能比较好指的是它随机读写的性能比较好。适合MySQL这样集群。但是其实他的顺序写的性能跟SAS盘差不多。 kafka的理解就是用的顺序写。所以我们就用普通的【机械硬盘】就可以了。 2需要我们评估每台服务器需要多少块磁盘 5台服务器一共需要276T 大约每台服务器 需要存储60T的数据。我们公司里面服务器的配置用的是 11块硬盘每个硬盘 7T。11 * 7T 77T。 77T * 5 台服务器 385T。 场景总结 搞定10亿请求需要5台物理机11SAS * 7T。 -     内存评估    -  搞定10亿请求需要5台物理机11SAS * 7T。 我们发现kafka读写数据的流程 都是基于os cache,换句话说假设咱们的os cashe无限大那么整个kafka是不是相当于就是基于内存去操作如果是基于内存去操作性能肯定很好。内存是有限的。 1尽可能多的内存资源要给 os cache。 2Kafka的代码用 核心的代码用的是scala写的客户端的代码java写的。都是基于jvm。所以我们还要给一部分的内存给jvm。Kafka的设计没有把很多数据结构都放在jvm里面。所以我们的这个jvm不需要太大的内存。根据经验给个10G就可以了。 NameNode: jvm里面还放了元数据几十GJVM一定要给得很大。比如给个100G。 假设我们这个10请求的这个项目一共会有100个topic。100 topic * 5 partition * 2 1000 partition 一个partition其实就是物理机上面的一个目录这个目录下面会有很多个.log的文件。 .log就是存储数据文件默认情况下一个.log文件的大小是1G。我们如果要保证 1000个partition 的最新的.log 文件的数据 如果都在内存里面这个时候性能就是最好。1000 * 1G 1000G内存. 我们只需要把当前最新的这个log 保证里面的25%的最新的数据在内存里面。250M * 1000 0.25 G* 1000 250G的内存。 250内存 / 5 50G内存 50G10G 60G内存。 64G的内存另外的4G操作系统本生是不是也需要内存。其实Kafka的jvm也可以不用给到10G这么多。评估出来64G是可以的。当然如果能给到128G的内存的服务器那就最好。 我刚刚评估的时候用的都是一个topic是5个partition但是如果是数据量比较大的topic可能会有10个partition。 总结搞定10亿请求需要5台物理机11SAS * 7T 需要64G的内存128G更好 -     CPU 压力评估    -  评估一下每台服务器需要多少cpu core(资源很有限)。 我们评估需要多少个cpu 依据就是看我们的服务里面有多少线程去跑。线程就是依托cpu 去运行的。如果我们的线程比较多但是cpu core比较少这样的话我们的机器负载就会很高性能不就不好。 评估一下kafka的一台服务器 启动以后会有多少线程 Acceptor线程 1 processor线程 3 6~9个线程 处理请求线程 8个 32个线程 定时清理的线程拉取数据的线程定时检查ISR列表的机制 等等。所以大概一个Kafka的服务启动起来以后会有一百多个线程。 cpu core 4个一遍来说几十个线程就肯定把cpu 打满了。cpu core 8个应该很轻松的能支持几十个线程。如果我们的线程是100多个或者差不多200个那么8 个 cpu core是搞不定的。所以我们这儿建议CPU core 16个。如果可以的话能有32个cpu core 那就最好。 结论kafka集群最低也要给16个cpu core如果能给到32 cpu core那就更好。2cpu * 8 16 cpu core 4cpu * 8 32 cpu core。 总结搞定10亿请求需要5台物理机11SAS * 7T 需要64G的内存128G更好需要16个cpu core32个更好。 -     网络需求评估    -  评估我们需要什么样网卡一般要么是千兆的网卡1G/s还有的就是万兆的网卡10G/s。 高峰期的时候 每秒会有5.5万的请求涌入5.55  大约是每台服务器会有1万个请求涌入。我们之前说的10000 * 50kb 488M  也就是每条服务器每秒要接受488M的数据。数据还要有副本副本之间的同步也是走的网络的请求。488 * 2 976m/s说明一下   很多公司的数据一个请求里面是没有50kb这么大的我们公司是因为主机在生产端封装了数据   然后把多条数据合并在一起了所以我们的一个请求才会有这么大。   说明一下   一般情况下网卡的带宽是达不到极限的如果是千兆的网卡我们能用的一般就是700M左右。   但是如果最好的情况我们还是使用万兆的网卡。   如果使用的是万兆的那就是很轻松。 -     集群规划    -  请求量规划物理机的个数 分析磁盘的个数选择使用什么样的磁盘 内存 cpu core 网卡就是告诉大家以后要是公司里面有什么需求进行资源的评估服务器的评估大家按照我的思路去评估 一条消息的大小 50kb - 1kb 500byte 1Mip 主机名 192.168.0.100 hadoop1 192.168.0.101 hadoop2 192.168.0.102 hadoop3。 主机的规划kafka集群架构的时候主从式的架构controller - 通过zk集群来管理整个集群的元数据。 zookeeper集群 hadoop1 hadoop2 hadoop3 kafka集群 理论上来讲我们不应该把kafka的服务于zk的服务安装在一起。但是我们这儿服务器有限。所以我们kafka集群也是安装在hadoop1 haadoop2 hadoop3。 -     Kafka 运维工具与命令    -  KafkaManager — 页面管理工具。 场景一topic数据量太大要增加topic数。 一开始创建主题的时候数据量不大给的分区数不多。 kafka-topics.sh –create –zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 –replication-factor 1 –partitions 1 –topic test6kafka-topics.sh –alter –zookeeper hadoop1:2181,hadoop2:2181,ha broker id hadoop1:0 hadoop2:1 hadoop3:2 假设一个partition有三个副本partition0a,b,c aleader partition bc:follower partition ISR:{a,b,c}如果一个follower分区 超过10秒 没有向leader partition去拉取数据那么这个分区就从ISR列表里面移除。 场景二核心topic增加副本因子 如果对核心业务数据需要增加副本因子 vim test.json脚本将下面一行json脚本保存 {“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]} 执行上面json脚本 kafka-reassign-partitions.sh –zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 –reassignment-json-file test.json –execute 场景三负载不均衡的topic手动迁移vi topics-to-move.json {“topics”: [{“topic”: “test01”}, {“topic”: “test02”}], “version”: 1} // 把你所有的topic都写在这里 kafka-reassgin-partitions.sh –zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 –topics-to-move-json-file topics-to-move.json –broker-list “5,6” –generate 把你所有的包括新加入的broker机器都写在这里就会说是把所有的partition均匀的分散在各个broker上包括新进来的broker此时会生成一个迁移方案可以保存到一个文件里去expand-cluster-reassignment.json kafka-reassign-partitions.sh –zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 –reassignment-json-file expand-cluster-reassignment.json –executekafka-reassign-partitions.sh –zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 –reassignment-json-file expand-cluster-reassignment.json –verify 这种数据迁移操作一定要在晚上低峰的时候来做因为他会在机器之间迁移数据非常的占用带宽资源–generate: 根据给予的Topic列表和Broker列表生成迁移计划。generate并不会真正进行消息迁移而是将消息迁移计划计算出来供execute命令使用。–execute: 根据给予的消息迁移计划进行迁移。–verify: 检查消息是否已经迁移完成。 场景四如果某个broker leader partition过多 正常情况下我们的leader partition在服务器之间是负载均衡。hadoop1 4 hadoop2 1 hadoop3 1。 现在各个业务方可以自行申请创建topic分区数量都是自动分配和后续动态调整的 kafka本身会自动把leader partition均匀分散在各个机器上这样可以保证每台机器的读写吞吐量都是均匀的但是也有例外。 那就是如果某些broker宕机会导致leader partition过于集中在其他少部分几台broker上 这会导致少数几台broker的读写请求压力过高其他宕机的broker重启之后都是folloer partition读写请求很低造成集群负载不均衡有一个参数auto.leader.rebalance.enable。 默认是true每隔300秒leader.imbalance.check.interval.seconds检查leader负载是否平衡 如果一台broker上的不均衡的leader超过了10%leader.imbalance.per.broker.percentage 就会对这个broker进行选举 配置参数auto.leader.rebalance.enable 默认是true leader.imbalance.per.broker.percentage: 每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值控制器会触发leader的平衡。 这个值表示百分比。10% leader.imbalance.check.interval.seconds默认值300秒。 -     Kafka 生产者发消息原理    -  生产者发送消息原理—基础案例演示 -     如何提升吞吐量    -  如何提升吞吐量参数一buffer.memory设置发送消息的缓冲区默认值是33554432就是32MB 参数二compression.type默认是none不压缩但是也可以使用lz4压缩效率还是不错的压缩之后可以减小数据量提升吞吐量但是会加大producer端的cpu开销 参数三batch.size设置batch的大小如果batch太小会导致频繁网络请求吞吐量下降。 如果batch太大会导致一条消息需要等待很久才能被发送出去而且会让内存缓冲区有很大压力过多数据缓冲在内存里默认值是16384就是16kb也就是一个batch满了16kb就发送出去一般在实际生产环境这个batch的值可以增大一些来提升吞吐量如果一个批次设置大了会有延迟。 一般根据一条消息大小来设置。如果我们消息比较少。配合使用的参数linger.ms这个值默认是0意思就是消息必须立即被发送但是这是不对的一般设置一个100毫秒之类的这样的话就是说这个消息被发送出去后进入一个batch如果100毫秒内这个batch满了16kb自然就会发送出去。 -     如何处理异常    -  LeaderNotAvailableException这个就是如果某台机器挂了此时leader副本不可用会导致你写入失败要等待其他follower副本切换为leader副本之后才能继续写入此时可以重试发送即可如果说你平时重启kafka的broker进程肯定会导致leader切换一定会导致你写入报错是LeaderNotAvailableException。 NotControllerException这个也是同理如果说Controller所在Broker挂了那么此时会有问题需要等待Controller重新选举此时也是一样就是重试即可。 NetworkException网络异常 timeout a. 配置retries参数他会自动重试的 b. 但是如果重试几次之后还是不行就会提供Exception给我们来处理了,我们获取到异常以后再对这个消息进行单独处理。我们会有备用的链路。发送不成功的消息发送到Redis或者写到文件系统中甚至是丢弃。 -     重试机制    -  重试会带来一些问题 消息会重复有的时候一些leader切换之类的问题需要进行重试设置retries即可但是消息重试会导致,重复发送的问题比如说网络抖动一下导致他以为没成功就重试了其实人家都成功了。 消息乱序消息重试是可能导致消息的乱序的因为可能排在你后面的消息都发送出去了。所以可以使用max.in.flight.requests.per.connection参数设置为1 这样可以保证producer同一时间只能发送一条消息。两次重试的间隔默认是100毫秒用retry.backoff.ms来进行设置 基本上在开发过程中靠重试机制基本就可以搞定95%的异常问题。 -     ACK 参数详情    -  producer端设置的 request.required.acks0只要请求已发送出去就算是发送完了不关心有没有写成功。性能很好如果是对一些日志进行分析可以承受丢数据的情况用这个参数性能会很好。request.required.acks1发送一条消息当leader partition写入成功以后才算写入成功。 不过这种方式也有丢数据的可能。request.required.acks-1需要ISR列表里面所有副本都写完以后这条消息才算写入成功。ISR1个副本。1 leader partition 1 follower partition kafka服务端min.insync.replicas1 如果我们不设置的话默认这个值是1 一个leader partition会维护一个ISR列表这个值就是限制ISR列表里面 至少得有几个副本比如这个值是2那么当ISR列表里面只有一个副本的时候。另外搜索公众号后端架构师后台回复“架构整洁”获取一份惊喜礼包。 往这个分区插入数据的时候会报错。设计一个不丢数据的方案数据不丢失的方案1)分区副本 2 2)acks -1 3)min.insync.replicas 2 还有可能就是发送有异常对异常进行处理。 -     自定义分区    -  分区1、没有设置key我们的消息就会被轮训的发送到不同的分区。2、设置了keykafka自带的分区器会根据key计算出来一个hash值这个hash值会对应某一个分区。如果key相同的那么hash值必然相同key相同的值必然是会被发送到同一个分区。但是有些比较特殊的时候我们就需要自定义分区了 public class HotDataPartitioner implements Partitioner {private Random random;Overridepublic void configure(MapString, ? configs) {random  new Random();}Overridepublic int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String key  (String)keyObj;List partitionInfoList  cluster.availablePartitionsForTopic(topic);//获取到分区的个数 0,12int partitionCount  partitionInfoList.size();//最后一个分区int hotDataPartition  partitionCount - 1;return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;}} 如何使用配置上这个类即可props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”); -     综合案例演示    -  消费组概念groupid相同就属于同一个消费组。 1每个consumer都要属于一个consumer.group就是一个消费组topic的一个分区只会分配给 一个消费组下的一个consumer来处理每个consumer可能会分配多个分区也有可能某个consumer没有分配到任何分区。 2如果想要实现一个广播的效果那只需要使用不同的group id去消费就可以。topicA: partition0、partition1 groupAconsumer1:消费 partition0 consuemr2:消费 partition1 consuemr3:消费不到数据 groupB: consuemr3:消费到partition0和partition1 3如果consumer group中某个消费者挂了此时会自动把分配给他的分区交给其他的消费者如果他又重启了那么又会把一些分区重新交还给他。 -     Kafka 消费组概念    -  groupid 相同就属于同一个消费组。 1每个consumer都要属于一个consumer.group就是一个消费组topic的一个分区只会分配给 一个消费组下的一个consumer来处理每个consumer可能会分配多个分区也有可能某个consumer没有分配到任何分区。 2如果想要实现一个广播的效果那只需要使用不同的group id去消费就可以。topicA: partition0、partition1 groupAconsumer1:消费 partition0 consuemr2:消费 partition1 consuemr3:消费不到数据 groupB: consuemr3:消费到partition0和partition1 3如果consumer group中某个消费者挂了此时会自动把分配给他的分区交给其他的消费者如果他又重启了那么又会把一些分区重新交还给他。 基础案例演示 -     偏移量管理    -  每个consumer内存里数据结构保存对每个topic的每个分区的消费offset定期会提交offset老版本是写入zk但是那样高并发请求zk是不合理的架构设计zk是做分布式系统的协调的轻量级的元数据存储不能负责高并发读写作为数据存储。 现在新的版本提交offset发送给kafka内部topic__consumer_offsets提交过去的时候 key是group.idtopic分区号value就是当前offset的值每隔一段时间kafka内部会对这个topic进行compact(合并)也就是每个group.idtopic分区号就保留最新数据。 consumer_offsets可能会接收高并发的请求所以默认分区50个(leader partitiron - 50 kafka)这样如果你的kafka部署了一个大的集群比如有50台机器就可以用50台机器来抗offset提交的请求压力. 消费者 - broker端的数据 message - 磁盘 - offset 顺序递增 从哪儿开始消费- offset 消费者offset。 -     偏移量监控工具介绍    -  web页面管理的一个管理软件(kafka Manager) 修改bin/kafka-run-class.sh脚本第一行增加JMX_PORT9988 重启kafka进程。 另一个软件主要监控的consumer的偏移量。就是一个jar包 java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb –offsetStorage kafka \根据版本偏移量存在kafka就填kafka存在zookeeper就填zookeeper –zk hadoop1:2181 –port 9004 –refresh 15.seconds –retain 2.days。 -     消费异常感知    -  heartbeat.interval.msconsumer心跳时间间隔必须得与coordinator保持心跳才能知道consumer是否故障了 然后如果故障之后就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作 session.timeout.mskafka多长时间感知不到一个consumer就认为他故障了默认是10秒 max.poll.interval.ms如果在两次poll操作之间超过了这个时间那么就会认为这个consume处理能力太弱了会被踢出消费组分区分配给别人去消费一般来说结合业务处理的性能来设置就可以了。 -     核心参数解释    -  fetch.max.bytes获取一条消息最大的字节数一般建议设置大一些默认是1M 其实我们在之前多个地方都见到过这个类似的参数意思就是说一条信息最大能多大 Producer 发送的数据一条消息最大多大 - 10M。 Broker 存储数据一条消息最大能接受多大 - 10M。 Consumer max.poll.records: 一次poll返回消息的最大条数默认是500条 connection.max.idle.msconsumer跟broker的socket连接如果空闲超过了一定的时间此时就会自动回收连接但是下次消费就要重新建立socket连接这个建议设置为-1不要去回收 enable.auto.commit: 开启自动提交偏移量 auto.commit.interval.ms: 每隔多久提交一次偏移量默认值5000毫秒 _consumer_offset auto.offset.resetearliest 当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费 topica - partition0:1000 partitino1:2000 latest 当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时从offset后开始消费只要有一个分区不存在已提交的offset则抛出异常。 -     综合案例演示    -  引入案例二手电商平台欢乐送根据用户消费的金额对用户星星进行累计。订单系统生产者 - Kafka集群里面发送了消息。会员系统消费者 - Kafak集群里面消费消息对消息进行处理。 group coordinator原理 面试题消费者是如何实现rebalance的— 根据coordinator实现 什么是coordinator 每个consumer group都会选择一个broker作为自己的coordinator他是负责监控这个消费组里的各个消费者的心跳以及判断是否宕机然后开启rebalance的。 如何选择coordinator机器 首先对groupId进行hash数字接着对consumer_offsets的分区数量取模默认是50_consumer_offsets的分区数可以通过offsets.topic.num.partitions来设置找到分区以后这个分区所在的broker机器就是coordinator机器。比如说groupId“myconsumer_group” - hash值数字- 对50取模 - 8 __consumer_offsets 这个主题的8号分区在哪台broker上面那一台就是coordinator 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset。 运行流程1每个consumer都发送JoinGroup请求到Coordinator 2然后Coordinator从一个consumer group中选择一个consumer作为leader3把consumer group情况发送给这个leader4接着这个leader会负责制定消费方案5通过SyncGroup发给Coordinator 6接着Coordinator就把消费方案下发给各个consumer他们会从指定的分区的 leader broker开始进行socket连接以及消费消息。 -     Rebalance 策略    -  consumer group靠coordinator实现了Rebalance。 这里有三种rebalance的策略range、round-robin、sticky。 比如我们消费的一个主题有12个分区p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11 假设我们的消费者组里面有三个消费者 range策略 range策略就是按照partiton的序号范围 p0~3 consumer1 p4~7 consumer2 p8~11 consumer3 默认就是这个策略 round-robin策略 就是轮询分配 consumer1:0,3,6,9 consumer2:1,4,7,10 consumer3:2,5,8,11 但是前面的这两个方案有个问题12 - 2 每个消费者会消费6个分区。假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3 这样的话原本在consumer2上的的p6,p7分区就被分配到了 consumer3上。 sticky策略 最新的一个sticky策略就是说尽可能保证在rebalance的时候让原本属于这个consumer 的分区还是属于他们然后把多余的分区再均匀分配过去这样尽可能维持原来的分区分配的策略。 consumer10-3 consumer2: 4-7 consumer3: 8-11 假设consumer3挂了 consumer10-38,9 consumer2: 4-710,11。 -     Broker 管理    -  Leo、hw含义 Kafka的核心原理 如何去评估一个集群资源 搭建了一套kafka集群 -》 介绍了简单的一些运维管理的操作。 生产者使用核心的参数 消费者原理使用的核心参数 broker内部的一些原理 核心的概念LEOHW LEO是跟offset偏移量有关系。 LEO在kafka里面无论leader partition还是follower partition统一都称作副本replica。 每次partition接收到一条消息都会更新自己的LEO也就是log end offsetLEO其实就是最新的offset 1 HW高水位 LEO有一个很重要的功能就是更新HW如果follower和leader的LEO同步了此时HW就可以更新 HW之前的数据对消费者是可见消息属于commit状态。HW之后的消息消费者消费不到。 Leo更新 hw更新 controller如何管理整个集群 1: 竞争controller的 /controller/id 2controller服务监听的目录/broker/ids/ 用来感知 broker上下线 /broker/topics/ 创建主题我们当时创建主题命令提供的参数ZK地址。/admin/reassign_partitions 分区重分配…… 延时任务 kafka的延迟调度机制扩展知识 我们先看一下kafka里面哪些地方需要有任务要进行延迟调度。第一类延时的任务比如说producer的acks-1必须等待leader和follower都写完才能返回响应。 有一个超时时间默认是30秒request.timeout.ms。所以需要在写入一条数据到leader磁盘之后就必须有一个延时任务到期时间是30秒延时任务 放到DelayedOperationPurgatory延时管理器中。 假如在30秒之前如果所有follower都写入副本到本地磁盘了那么这个任务就会被自动触发苏醒就可以返回响应结果给客户端了 否则的话这个延时任务自己指定了最多是30秒到期如果到了超时时间都没等到就直接超时返回异常。 第二类延时的任务follower往leader拉取消息的时候如果发现是空的此时会创建一个延时拉取任务 延时时间到了之后比如到了100ms就给follower返回一个空的数据然后follower再次发送请求读取消息 但是如果延时的过程中(还没到100ms)leader写入了消息这个任务就会自动苏醒自动执行拉取任务。 海量的延时任务需要去调度。 -     时间轮机制    -  什么会有要设计时间轮Kafka内部有很多延时任务没有基于JDK Timer来实现那个插入和删除任务的时间复杂度是O(nlogn) 而是基于了自己写的时间轮来实现的时间复杂度是O(1)依靠时间轮机制延时任务插入和删除O(1)。 时间轮是什么其实时间轮说白其实就是一个数组。tickMs:时间轮间隔 1ms wheelSize时间轮大小 20 intervaltimckMS * whellSize一个时间轮的总的时间跨度。20ms currentTime当时时间的指针。a:因为时间轮是一个数组所以要获取里面数据的时候靠的是index时间复杂度是O(1) b:数组某个位置上对应的任务用的是双向链表存储的往双向链表里面插入删除任务时间复杂度也是O1 举例插入一个8ms以后要执行的任务 19ms 3.多层级的时间轮 比如要插入一个110毫秒以后运行的任务。tickMs:时间轮间隔 20ms wheelSize时间轮大小 20 intervaltimckMS * whellSize一个时间轮的总的时间跨度。20ms currentTime当时时间的指针。第一层时间轮1ms * 20 第二层时间轮20ms * 20 第三层时间轮400ms * 20。