中间件
路由与 web 服务器
- 阿里基于 Nginx 研发的 Tengine
- 阿里内部的集中式路由服务 VipServer
RPC 框架
- grpc
- Thrift
- 阿里的 HSF
- Dubbo
- 节点说明
- Consumer
- 需要调用远程服务的服务消费方 Registry 注册中心
- Provider
- 服务提供方
- Container
- 服务运行的容器
- Monitor
- 监控中心
- Consumer
- 大致流程:
- 首先服务提供者 Provider 启动然后向注册中心注册自己所能提供的服务。
- 服务消费者 Consumer 启动向注册中心订阅自己所需的服务。
- 然后注册中心将提供者元信息通知给 Consumer, 之后 Consumer 因为已经从注册中心获取提供者的地址,因此可以通过负载均衡选择一个 Provider 直接调用。
- 之后服务提供方元数据变更的话注册中心会把变更推送给服务消费者。
- 服务提供者和消费者都会在内存中记录着调用的次数和时间,然后定时的发送统计数据到监控中心。
- 注意:
- 注册中心和监控中心是可选的,可以直接在配置文件里面写然后提供方和消费方直连。
- 注册中心、提供方和消费方之间都是⻓连接,和监控方不是⻓连接,并且消费方是直接调用提供方,不经过注册中心。
- 注册中心和监控中心宕机了也不会影响到已经正常运行的提供者和消费者,因为消费者有本地缓存提供者的信息。
- 分层:
- Service,业务层,就是咱们开发的业务逻辑层。
- Config,配置层,主要围绕 ServiceConfig 和 ReferenceConfig,初始化配置信息。
- Proxy,代理层,服务提供者还是消费者都会生成一个代理类,使得服务接口透明化,代理层做远程调用和返回结果。
- Register,注册层,封装了服务注册和发现。
- Cluster,路由和集群容错层,负责选取具体调用的节点,处理特殊的调用要求和负责远程调用失败的容错措施。
- Monitor,监控层,负责监控统计调用时间和次数。
- Portocol,远程调用层,主要是封装 RPC 调用,主要负责管理 Invoker,Invoker 代表一个抽象封装了的执行体。
- Exchange,信息交换层,用来封装请求响应模型,同步转异步。
- Transport,网络传输层,抽象了网络传输的统一接口,这样用户想用 Netty 就用 Netty,想用 Mina 就用 Mina。
- Serialize,序列化层,将数据序列化成二进制流,当然也做反序列化。
- 调用过程:
- 服务暴露过程
- 首先 Provider 启动,通过 Proxy 组件根据具体的协议 Protocol 将需要暴露出去的接口封装成 Invoker, Invoker 是 Dubbo 一个很核心的组件,代表一个可执行体。
- 然后再通过 Exporter 包装一下,这是为了在注册中心暴露自己套的一层,然后将 Exporter 通过 Registry 注册到注册中心。 这就是整体服务暴露过程。
- 消费过程
- 首先消费者启动会向注册中心拉取服务提供者的元信息,然后调用流程也是从 Proxy 开始,毕竟都需要代理才能无感知。
- Proxy 持有一个 Invoker 对象,调用 invoke 之后需要通过 Cluster 先从 Directory 获取所有可调用的远程服务的 Invoker 列表,如果配置了某些路由规则,比如某个接口只能调用某个节点的那就再过滤一遍 Invoker 列表。
- 剩下的 Invoker 再通过 LoadBalance 做负载均衡选取一个。然后再经过 Filter 做一些统计什么的,再通过 Client 做数据传输,比如用 Netty 来传输。
- 传输需要经过 Codec 接口做协议构造,再序列化。最终发往对应的服务提供者。
- 服务提供者接收到之后也会进行 Codec 协议处理,然后反序列化后将请求扔到线程池处理。某个线程会根据请求找到对应的 Exporter ,而找到 Exporter 其实就是找到了 Invoker,但是还会有一层层 Filter,经过一层层过滤链之后最终调用实现类然后原路返回结果。
- 服务暴露过程
- 负载均衡策略
- 随机 Random LoadBalance
- 按照权重设置的大小,随机
- 轮询 RoundRobin LoadBalance
- 例如:a b c,a 执行完 b 执行然后c,然后在到 a
- 最少活跃调用数(权重)LeastActive LoadBalance
- 活跃数指调用前后计数差,优先调用高的,相同活跃数的随机。使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
- 一致性 Hash ConsistentHash LoadBalance
- 相同参数总是发送到同一个提供者,如果这个提供者挂掉了,它会根据它的虚拟节点,平摊到其它服务者,不会引起巨大的变动
- 随机 Random LoadBalance
- 相关问题
- Dubbo 和 Spring Cloud 有什么区别?
- 通信方式不同
- Dubbo 使用的是 RPC 通信,而 Spring Cloud 使用的是 HTTP RESTFul 方式。
- 组成部分不同
- 通信方式不同
- 当一个服务接口有多种实现时怎么做?
- 当一个接口有多种实现时,可以用 group 属性来分组,服务提供方和消费方都指定同一个 group 即可。
- 服务上线怎么兼容旧版本?
- 可以用版本号(version)过渡,多个不同版本的服务注册到注册中心,版本号不同的服务相互间不引用。这个和服务分组的概念有一点类似。
- Dubbo 和 Spring Cloud 有什么区别?
- 节点说明
- SOFA-RPC
消息中间件
- 消息队列通信的模式
- 点对点模式
- 点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。点对点模型的的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。
- 发布订阅模式
- 发布订阅模式是一个基于消息送的消息传送模型,该模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者(类似微信公众号)。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息!但是 consumer1、consumer2、consumer3 由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是 8M/s、5M/s、2M/s,如果队列推送的速度为 5M/s,则 consumer3 无法承受!如果队列推送的速度为 2M/s,则 consumer1、consumer2 会出现资源的极大浪费!
- 点对点模式
- 消息队列使用场景
- 解耦
- 解耦是消息队列要解决的最本质问题。
- 最终一致性
- 最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。
- 最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。
- 广播
- 消息队列的基本功能之一是进行广播。
- 有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
- 错峰与流控
- 典型的使用场景就是秒杀业务用于流量削峰场景。
- 解耦
- 常用的消息队列
- Apache Kafka
- Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力。
- 架构
- Producer
- Producer 即生产者,消息的产生者,是消息的入口。
- Kafka cluster
- Broker
- Broker 是 Kafka 实例,每个服务器上有一个或多个 Kafka 的实例,我们姑且认为每个 broker 对应一台服务器。每个 Kafka 集群内的 broker 都有一个不重复的编号,如图中的 broker-0、broker-1 等……
- Controller Broker
- 在 Kafka 早期版本,对于分区和副本的状态的管理依赖于 zookeeper 的 Watcher 和队列:每一个 broker 都会在 zookeeper 注册 Watcher,所以 zookeeper 就会出现大量的 Watcher, 如果宕机的 broker 上的 partition 很多比较多,会造成多个 Watcher 触发,造成集群内大规模调整;每一个 replica 都要去再次 zookeeper 上注册监视器,当集群规模很大的时候,zookeeper 负担很重。这种设计很容易出现脑裂和羊群效应以及 zookeeper 集群过载。
- 新版本该变了这种设计,使用 Kafka Controller,Leader 会向 zookeeper 上注册 Watcher,其他 broker 几乎不用监听 zookeeper 的状态变化。
- Kafka 集群中多个 broker,有一个会被选举为 controller leader,负责管理整个集群中分区和副本的状态,比如 partition 的 leader 副本故障,由 controller 负责为该 partition 重新选举新的 leader 副本;当检测到 ISR 列表发生变化,由 controller 通知集群中所有 broker 更新其 MetadataCache 信息;或者增加某个 topic 分区的时候也会由 controller 管理分区的重新分配工作。
- 当 broker 启动的时候,都会创建 KafkaController 对象,但是集群中只能有一个 leader 对外提供服务,这些每个节点上的 KafkaController 会在指定的 zookeeper 路径下创建临时节点,只有第一个成功创建的节点的 KafkaController 才可以成为 leader,其余的都是 follower。当 leader 故障后,所有的 follower 会收到通知,再次竞争在该路径下创建节点从而选举新的 leader。
- Controller Broker 的具体作用
- 创建、删除主题,增加分区并分配 leader 分区
- 集群 Broker 管理(新增 Broker、Broker 主动关闭、Broker 故障)
- preferred leader 选举
- 分区重分配
- Topic
- 消息的主题,可以理解为消息的分类,Kafka 的数据就保存在 topic。在每个 broker 上都可以创建多个 topic。
- Partition
- Topic 的分区,每个 topic 可以有多个分区,分区的作用是做负载,提高 Kafka 的吞吐量。同一个 topic 在不同的分区的数据是不重复的,partition 的表现形式就是一个一个的文件夹!
- 分区的主要目的
- 方便扩展
- 因为一个 topic 可以有多个 partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
- 提高并发
- 以 partition 为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
- 方便扩展
- Partition 结构
- Partition 在服务器上的表现形式就是一个一个的文件夹,每个 partition 的文件夹下面会有多组 segment 文件,每组 segment 文件又包含 .index 文件、.log 文件、.timeindex 文件(早期版本中没有)三个文件,
- log 文件就实际是存储 message 的地方
- index 和 timeindex 文件为索引文件,用于检索消息。
- 这个 partition 有三组 segment 文件,每个 log 文件的大小是一样的,但是存储的 message 数量是不一定相等的(每条的 message 大小不一致)。文件的命名是以该 segment 最小 offset 来命名的,如 000.index 存储 offset 为 0~368795 的消息,kafka 就是利用分段+索引的方式来解决查找效率的问题。
- Partition 在服务器上的表现形式就是一个一个的文件夹,每个 partition 的文件夹下面会有多组 segment 文件,每组 segment 文件又包含 .index 文件、.log 文件、.timeindex 文件(早期版本中没有)三个文件,
- Replication
- 每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在 Kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
- Broker
- Message
- 每一条发送的消息主体。
- Message 结构
- 消息主要包含消息体、消息大小、offset、压缩类型等等
- offset
- offset 是一个占 8byte 的有序 id 号,它可以唯一确定每条消息在 parition 内的位置!
- 消息大小
- 消息大小占用 4byte,用于描述消息的大小。
- 消息体
- 消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
- offset
- 消息主要包含消息体、消息大小、offset、压缩类型等等
- Consumer
- 消费者,即消息的消费方,是消息的出口。
- Consumer Group
- 我们可以将多个消费组组成一个消费者组,在 Kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic 的不同分区的数据,这也是为了提高 Kafka 的吞吐量!
- GroupCoordinator
- 每个 consumer group 都会选择一个 broker 作为自己的 coordinator,他是负责监控整个消费组里的各个分区的心跳,以及判断是否宕机,和开启 rebalance 的。
- 如何选择 coordinator 机器
- 首先对 group id 进行 hash,接着对 __consumer_offsets 的分区数量进行取模,默认分区数量是 50
- __consumer_offsets 的分区数量可以通过 offsets.topic.num.partitions 来设置,找到分区以后,这个分区所在的 broker 机器就是 coordinator 机器。
- __consumer_offsets topic 了,它是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 默认有三个副本,而具体的一个 group 的消费情况要存储到哪一个 partition 上,是根据 $abs(GroupId.hashCode()) % NumPartitions$ 来计算的(其中,NumPartitions 是 __consumer_offsets 的 partition 数,默认是50个)。
- 对于 consumer group 而言,是根据其 group.id 进行 hash 并计算得到其具对应的 partition 值,该 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator,GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息。
- Zookeeper
- Kafka 集群依赖 zookeeper 来保存集群的的元信息,来保证系统的可用性。
- 具体功能
- 对于 broker
- 记录状态
- zookeeper 记录了所有 broker 的存活状态,broker 会向 zookeeper 发送心跳请求来上报自己的状态。
- zookeeper 维护了一个正在运行并且属于集群的 broker 列表。
- 控制器选举
- kafka 集群中有多个 broker,其中有一个会被选举为控制器。
- 控制器负责管理整个集群所有分区和副本的状态,例如某个分区的 leader 故障了,控制器会选举新的 leader。
- 从多个 broker 中选出控制器,这个工作就是 zookeeper 负责的。
- 限额权限
- kafka 允许一些 client 有不同的生产和消费的限额。
- 这些限额配置信息是保存在 zookeeper 里面的。
- 所有 topic 的访问控制信息也是由 zookeeper 维护的。
- 记录 ISR
- ISR(in-sync replica) 是 partition 的一组同步集合,就是所有 follower 里面同步最积极的那部分。
- 一条消息只有被 ISR 中的成员都接收到,才被视为“已同步”状态。
- 只有处于 ISR 集合中的副本才有资格被选举为 leader。
- zookeeper 记录着 ISR 的信息,而且是实时更新的,只要发现其中有成员不正常,马上移除。
- node 和 topic 注册
- zookeeper 保存了所有 node 和 topic 的注册信息,可以方便的找到每个 broker 持有哪些 topic。
- node 和 topic 在 zookeeper 中是以临时节点的形式存在的,只要与 zookeeper 的 session 一关闭,他们的信息就没有了。
- topic 配置
- zookeeper 保存了 topic 相关配置,例如 topic 列表、每个 topic 的 partition 数量、副本的位置等等。
- 记录状态
- 对于 consumer
- offset
- kafka 老版本中,consumer 的消费偏移量是默认存储在 zookeeper 中的。
- 新版本中,这个工作由 kafka 自己做了,kafka 专门做了一个 offset manager。
- 注册
- 和 broker 一样,consumer 也需要注册。
- consumer 会自动注册,注册的方式也是创建一个临时节点,consumer down 了之后就会自动销毁。
- 分区注册
- kafka 的每个 partition 只能被消费组中的一个 consumer 消费,kafka 必须知道所有 partition 与 consumer 的关系。
- offset
- 对于 broker
- 相关问题
- Kafka 为什么要放弃 Zookeeper?
- confluent 社区发表了一篇文章,主要讲述了 Kafka 未来的 2.8 版本将要放弃 Zookeeper,这对于 Kafka 用户来说,是一个重要的改进。之前部署 Kafka 就必须得部署 Zookeeper,而之后就只要单独部署 Kafka 就行了。
- Kafka 本身就是一个分布式系统,但是需要另一个分布式系统来管理,复杂性无疑增加了。
- 运维复杂度
- Controller 故障处理
- Kafaka 依赖一个单一 Controller 节点跟 Zookeeper 进行交互,如果这个 Controller 节点发生了故障,就需要从 broker 中选择新的 Controller。
- 新的 Controller 选举成功后,会重新从 Zookeeper 拉取元数据进行初始化,并且需要通知其他所有的 broker 更新 ActiveControllerId。老的 Controller 需要关闭监听、事件处理线程和定时任务。分区数非常多时,这个过程非常耗时,而且这个过程中 Kafka 集群是不能工作的。
- 分区瓶颈
- 当分区数增加时,Zookeeper 保存的元数据变多,Zookeeper 集群压力变大,达到一定级别后,监听延迟增加,给 Kafaka 的工作带来了影响。
- 升级
- KIP-500 用 Quorum Controller 代替之前的 Controller,Quorum 中每个 Controller 节点都会保存所有元数据,通过 KRaft 协议保证副本的一致性。这样即使 Quorum Controller 节点出故障了,新的 Controller 迁移也会非常快。
- 官方介绍,升级之后,Kafka 可以轻松支持百万级别的分区。
- Kafaka 计划在 3.0 版本会兼容 Zookeeper Controller 和 Quorum Controller,这样用户可以进行灰度测试。
- Kafka 为什么要放弃 Zookeeper?
- Producer
- 重平衡机制
- 重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配 topic 中的每一个分区。比如一个 topic 有 100 个分区,一个消费者组内有 20 个消费者,在协调者的控制下让组内每一个消费者分配到 5 个分区,这个分配的过程就是重平衡。
- 重平衡的触发条件
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者。注意这里的减少有很大的可能是被动的,就是某个消费者崩溃退出了
- 主题的分区数发生变更,kafka 目前只支持增加分区,当增加的时候就会触发重平衡
- 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡
- 重平衡策略
- Range
- 具体实现位于,package org.apache.kafka.clients.consumer.RangeAssignor。
- 把若干个连续的分区分配给消费者,如存在分区 1-5,假设有 3 个消费者,则消费者 1 负责分区 1-2,消费者 2 负责分区 3-4,消费者 3 负责分区 5。
- RoundRobin
- 具体实现位于,package org.apache.kafka.clients.consumer.RoundRobinAssignor。
- 就是把所有分区逐个分给消费者,如存在分区 1-5,假设有 3 个消费者,则分区 1->消费 1,分区 2->消费者 2,分区 3>消费者 3,分区 4>消费者 1,分区 5->消费者 2。
- Sticky
- Sticky 分配策略是最新的也是最复杂的策略,其具体实现位于 package org.apache.kafka.clients.consumer.StickyAssignor。
- 这种分配策略是在 0.11.0 才被提出来的,主要是为了一定程度解决上面提到的重平衡非要重新分配全部分区的问题。称为粘性分配策略。
- Range
- 重平衡过程
- 消费端重平衡流程
- Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的。
- “群主”就是第一个加入群组的消费者。消费者第一次加入群组时,它会向群组协调器发送一个 JoinGroup 的请求,如果是第一个,则此消费者被指定为“群主”。
- “群主”就是第一个加入群组的消费者。消费者第一次加入群组时,它会向群组协调器发送一个 JoinGroup 的请求,如果是第一个,则此消费者被指定为“群主”。
- 群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。
- 群主分配完成之后,把分配情况发送给群组协调器。
- 群组协调器再把这些信息发送给消费者。每一个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息。
- Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的。
- Broker 端重平衡
- 新成员加入组
- 组成员主动离组
- 组成员崩溃离组
- 组成员提交位移
- 新成员加入组
- 消费端重平衡流程
- 避免重平衡
- 未及时发送心跳
- 第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出” Group 而引发的。因此,你需要仔细地设置 session.timeout.ms 和 heartbeat.interval.ms 的值。
- Consumer 消费时间过长
- 第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。
- 未及时发送心跳
- ISR 机制
- Kafka 提供了数据复制算法保证,如果 leader 发生故障或挂掉,一个新 leader 被选举并被接受客户端的消息成功写入。Kafka 确保从同步副本列表中选举一个副本为 leader,或者说 follower 追赶 leader 数据。leader 负责维护和跟踪 ISR(In-Sync Replicas 的缩写,表示副本同步队列)中所有 follower 滞后的状态。当 producer 发送一条消息到 broker 后,leader 写入消息并复制到所有 follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的 follower 限制,重要的是快速检测慢副本,如果 follower “落后”太多或者失效,leader 将会把它从 ISR 中删除。
- 相关概念
- AR:所有的副本(replicas)统称为 Assigned Replicas
- ISR:in-Sync Replicas,这个是指副本同步队列
- OSR:follower 从 leader 同步数据有一些延迟,任意一个超过阈值都会把 follower 剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中
- HW:HighWatermark,是指 consumer 能够看到的此 partition 的位置
- LEO:LogEndOffset,表示每个 partition 的 log 最后一条 Message 的位置
- 机制原理
- 每个 replica 都有自己的 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。
- Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。
- 同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式极大的影响了吞吐率。
- 异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据。
- 流程
- 自动给每个 Partition 维护一个 ISR 列表,这个列表里一定会有 Leader,然后还会包含跟 Leader 保持同步的 Follower。也就是说,只要 Leader 的某个 Follower 一直跟他保持数据同步,那么就会存在于 ISR 列表里。
- 但是如果 Follower 因为自身发生一些问题,导致不能及时的从 Leader 同步数据过去,那么这个 Follower 就会被认为是“out-of-sync”,从 ISR 列表里踢出去。
- 每个 replica 都有自己的 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。
- 生效时机
- 当 acks 参数设置为 all 时,producer 需要等待 ISR 中的所有 follower 都确认接收到数据后才算一次发送完成,可靠性最高。
- 工作流程
- 发送数据
- Producer 在写入数据的时候永远的找 leader,不会直接将数据写入 follower。
- 消息写入 leader 后,follower 是主动的去 leader 进行同步的!producer 采用 push 模式将数据发布到 broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!
- 相关问题
- 如果某个 topic 有多个 partition,producer 又怎么知道该将数据发往哪个 partition 呢?
- partition 在写入的时候可以指定需要写入的 partition,如果有指定,则写入对应的 partition。
- 如果没有指定 partition,但是设置了数据的 key,则会根据 key 的值 hash 出一个 partition。
- 如果既没指定 partition,又没有设置 key,则会轮询选出一个 partition。
- producer 在向 kafka 写入消息的时候,怎么保证消息不丢失呢?
- 通过 ACK 应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认 Kafka 接收到数据,这个参数可设置的值为 0、1、-1。
- 0 代表 producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
- 1 代表 producer 往集群发送数据只要 leader 应答就可以发送下一条,只确保 leader 发送成功。
- -1 代表只有当 ISR 中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果 leader 副本挂了,当 follower 副本被选为 leader 副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息后才能再次发送消息。
- 通过 ACK 应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认 Kafka 接收到数据,这个参数可设置的值为 0、1、-1。
- 如果往不存在的 topic 写数据,能不能写入成功呢?
- Kafka 会自动创建 topic,分区和副本的数量根据默认配置都是 1。
- 如果某个 topic 有多个 partition,producer 又怎么知道该将数据发往哪个 partition 呢?
- 保存数据
- Kafka 将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka 初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
- 存储策略
- 基于时间,默认配置是 168 小时(7 天)。
- 基于大小,默认配置是 1073741824(1G)。
- 需要注意的是,kafka 读取特定消息的时间复杂度是
O(1)
,所以这里删除过期的文件并不会提高 kafka 的性能!
- 消费数据
- Kafka 采用的是点对点的模式,消费者主动的去 kafka 集群拉取消息,与 producer 相同的是,消费者在拉取消息的时候也是找 leader 去拉取。
- 同一个消费组的消费者可以消费同一 topic 下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!
- 消费者组内的消费者小于 partition 数量的情况,所以会出现某个消费者消费多个 partition 数据的情况,消费的速度也就不及只处理一个 partition 的消费者的处理速度!
- 建议消费者组的 consumer 的数量与 partition 的数量一致!
- 相关问题
- 查找消息的时候是怎么利用 segment+offset 配合查找的呢?假如现在需要查找一个 offset 为 368801 的 message 是什么样的过程呢?
- 先找到 offset 的 368801 的 message 所在的 segment 文件(利用二分法查找),这里找到的就是在第二个 segment 文件。
- 打开找到的 segment 中的 .index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1,我们要查找的 offset 为 368801 的 message 在该 index 内的偏移量为 368796+5=368801,所以这里要查找的相对 offset 为 5)。由于该文件采用的是稀疏索引的方式存储着相对 offset 及对应 message 物理偏移量的关系,所以直接找相对 offset 为 5 的索引找不到,这里同样利用二分法查找相对 offset 小于或者等于指定的相对 offset 的索引条目中最大的那个相对 offset,所以找到的是相对 offset 为 4 的这个索引。
- 根据找到的相对 offset 为 4 的索引确定 message 存储的物理偏移位置为 256。打开数据文件,从位置为 256 的那个地方开始顺序扫描直到找到 offset 为 368801 的那条 Message。
- 这套机制是建立在 offset 为有序的基础上,利用 segment+有序 offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!
- 从 kafka 读取数据后,数据会自动删除吗?
- 不会,kafka 中数据的删除跟有没有消费者消费完全无关。数据的删除,只跟 kafka broker 上面上面的这两个配置有关:
log.retention.hours=48 #数据最多保存48小时
log.retention.bytes=1073741824 #数据最多1G
- 不会,kafka 中数据的删除跟有没有消费者消费完全无关。数据的删除,只跟 kafka broker 上面上面的这两个配置有关:
- 查找消息的时候是怎么利用 segment+offset 配合查找的呢?假如现在需要查找一个 offset 为 368801 的 message 是什么样的过程呢?
- 发送数据
- log 的清除策略以及压缩策略
- 日志的分段存储,一方面能够减少单个文件内容的大小,另一方面,方便 kafka 进行日志清理。
- 清理策略有两个:
- 根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程
- 根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka 会启动一个后台线程,定期检查是否存在可以删除的消息
- 当其中任意一个达到要求,都会执行删除。
- 日志压缩策略:
- 通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动启动 Cleaner 线程池,定期将相同的 key 进行合并,只保留最新的 value 值。
- 原理
- producer
- 整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和发送线程。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息收集器(RecordAccumulator,也称为消息累加器)中。发送线程负责从消息收集器中获取消息并将其发送到 Kafka 中。
- 主线程中发送过来的消息都会被追加到消息收集器的某个双端队列(Deque)中,在其的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。
- ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。
- broker
- consumer
- producer
- 相关问题
- 为什么要使用 kafka?
- 缓冲和削峰
- 解耦和扩展性
- 冗余
- 健壮性
- 异步通信
- Kafka 是如何做到消息不丢失或不重复的?
- 生产者数据的不丢失
- 要使用带回调方法的 API。
- 在 kafka 发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。
- 设置参数 acks=-1。
- 设置参数 retries=3。
- 参数 retries 表示生产者生产消息的重试次数。
- 这里 retries=3 是一个建议值,一般情况下能满足足够的重试次数就能重试成功。但是如果重试失败了,对异常处理时就可以把消息保存到其他可靠的地方,如磁盘、数据库、远程缓存等,然后等到服务正常了再继续发送消息。
- 设置参数 retry.backoff.ms=300。
- retry.backoff.ms 指消息生产超时或失败后重试的间隔时间,单位是毫秒。
- 消费者数据的不丢失
- 从 kafka 拉取消息下来,由于自动的提交模式已经提交了 offset,但消费者是没有真正消费成功的,并且消费者可能日常发布重启或者挂掉了,那这条消息就丢了。
- 如何费者数据的不丢失解决?
- 关闭自动提交,改成手动提交,每次数据处理完后,再提交。
- 如何费者数据的不丢失解决?
- kafka 自己记录了每次消费的 offset 数值,下次继续消费的时候,会接着上次的 offset 进行消费。
- 而 offset 的信息在 kafka0.8 版本之前保存在 zookeeper 中,在 0.8 版本之后保存到 topic 中,即使消费者在运行过程中挂掉了,再次启动的时候会找到 offset 的值,找到之前消费消息的位置,接着消费,由于 offset 的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
- 如何解决重复消费问题?
- 关闭自动提交,改成手动提交,每次数据处理完后,再提交。消费的接口幂等处理。
- 如何解决重复消费问题?
- 从 kafka 拉取消息下来,由于自动的提交模式已经提交了 offset,但消费者是没有真正消费成功的,并且消费者可能日常发布重启或者挂掉了,那这条消息就丢了。
- broker 的数据不丢失
- 每个 broker 中的 partition 我们一般都会设置有 replication(副本)的个数,生产者写入的时候首先根据分发策略(有 partition 按 partition,有 key 按 key,都没有就轮询)写入到 leader 中,follower(副本)再跟 leader 同步数据,这样有了备份,也可以保证消息数据的不丢失。
- 设置 replication.factor >1。
- replication.factor 这个参数表示分区副本的个数,这里我们要将其设置为大于 1 的数,这样当 leader 副本挂了,follower 副本还能被选为 leader 副本继续接收消息。
- 设置 min.insync.replicas >1。
- min.insync.replicas 指的是 ISR 最少的副本数量,原理同上,也需要大于 1 的副本数量来保证消息不丢失。
- 设置 unclean.leader.election.enable = false。
- unclean.leader.election.enable 指是否能把非 ISR 集合中的副本选举为 leader 副本。unclean.leader.election.enable = true,也就是说允许非 ISR 集合中的 follower 副本成为 leader 副本。
- 设置 replication.factor >1。
- 每个 broker 中的 partition 我们一般都会设置有 replication(副本)的个数,生产者写入的时候首先根据分发策略(有 partition 按 partition,有 key 按 key,都没有就轮询)写入到 leader 中,follower(副本)再跟 leader 同步数据,这样有了备份,也可以保证消息数据的不丢失。
- 在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从而实现的是一种主写主读的生产消费模型。
- Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:
- 数据一致性问题
- 数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X,之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
- 延时问题
- 类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
- 数据一致性问题
- 而 kafka 的主写主读的优点就很多了:
- 可以简化代码的实现逻辑,减少出错的可能;
- 将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;
- 没有延时的影响;
- 在副本稳定的情况下,不会出现数据不一致的情况。
- 生产者数据的不丢失
- 磁盘存储的性能问题
- 为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。
- 磁盘读取时间:
- 寻道时间,表示磁头在不同磁道之间移动的时间。
- 旋转延迟,表示在磁道找到时,中轴带动盘面旋转到合适的扇区开头处。
- 传输时间,表示盘面继续转动,实际读取数据的时间。
- 顺序读写,磁盘会预读,预读即在读取的起始地址连续读取多个页面,主要时间花费在了传输时间,而这个时间两种读写可以认为是一样的。
- 随机读写,因为数据没有在一起,将预读浪费掉了。需要多次寻道和旋转延迟。而这个时间可能是传输时间的许多倍。
- 磁盘读取时间:
- 零拷贝
- 消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。
- 操作系统将数据从磁盘读入到内核空间的页缓存:
- 应用程序将数据从内核空间读入到用户空间缓存中
- 应用程序将数据写回到内核空间到 socket 缓存中
- 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
- 操作系统将数据从磁盘读入到内核空间的页缓存:
- 通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的unix操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API
- 使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
- 消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。
- 页缓存
- 页缓存是操作系统实现的一种主要的磁盘缓存,但凡设计到缓存的,基本都是为了提升 I/O 性能,所以页缓存是用来减少磁盘 I/O 操作的。
- 磁盘高速缓存有两个重要因素:
- 访问磁盘的速度要远低于访问内存的速度,若从处理器 L1 和 L2 高速缓存访问则速度更快。
- 数据一旦被访问,就很有可能短时间内再次访问。正是由于基于访问内存比磁盘快的多,所以磁盘的内存缓存将给系统存储性能带来质的飞越。
- 当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。
- 同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。
- Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在 Kafka 中同样提供了同步刷盘及间断性强制刷盘(fsync),可以通过 log.flush.interval.messages 和 log.flush.interval.ms 参数来控制。
- 同步刷盘能够保证消息的可靠性,避免因为宕机导致页缓存数据还未完成同步时造成的数据丢失。
- 但是实际使用上,我们没必要去考虑这样的因素以及这种问题带来的损失,消息可靠性可以由多副本来解决,同步刷盘会带来性能的影响。
- 同步刷盘能够保证消息的可靠性,避免因为宕机导致页缓存数据还未完成同步时造成的数据丢失。
- 为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。
- Kafka 为何兵法吞吐量高?
- 生产端
- 通过消息压缩、消息批量缓存发送、异步解耦等方面提升吞吐量
- 服务端
- 采用的优化技术比较多,比如网络层的 Reactor 设计提升了网络层的吞吐;顺序写、页缓存、零拷贝时利用操作系统的优化点来实现存储层读写的吞吐量
- 消费端
- 通过线程异步解耦的方式提升了拉取消息的效率,进而提升消费者的吞吐量
- 生产端
- 为什么要使用 kafka?
- Apache RabbitMQ
- 概念
- broker:每个节点运行的服务程序,功能为维护该节点的队列的增删以及转发队列操作请求。
- master queue:每个队列都分为一个主队列和若干个镜像队列。
- mirror queue:镜像队列,作为 master queue 的备份。在 master queue 所在节点挂掉之后,系统把 mirror queue 提升为 master queue,负责处理客户端队列操作请求。注意,mirror queue 只做镜像,设计目的不是为了承担客户端读写压力。
- 架构
- 工作流程
- 队列消费
- 有两个 consumer 消费队列 A,这两个 consumer 连在了集群的不同机器上。RabbitMQ 集群中的任何一个节点都拥有集群上所有队列的元信息,所以连接到集群中的任何一个节点都可以,主要区别在于有的 consumer 连在 master queue 所在节点,有的连在非 master queue 节点上。
- 因为 mirror queue 要和 master queue 保持一致,故需要同步机制,正因为一致性的限制,导致所有的读写操作都必须都操作在 master queue 上,然后由 master 节点同步操作到 mirror queue 所在的节点。即使 consumer 连接到了非 master queue 节点,该 consumer 的操作也会被路由到 master queue 所在的节点上,这样才能进行消费。
- 队列生产
- 原理和消费一样,如果连接到非 master queue 节点,则路由过去。
- 队列消费
- RabbitMQ 的不足:由于 master queue 单节点,导致性能瓶颈,吞吐量受限。虽然为了提高性能,内部使用了 Erlang 这个语言实现,但是终究摆脱不了架构设计上的致命缺陷。
- 概念
- NSQ
- 阿里孵化开源的 Apache RocketMQ
- ActiveMQ
- 比较
特性 ActiveMQ RabbitMQ RocketMQ Kafka 单机吞吐量 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 10万级,RocketMQ也是可以支撑高吞吐的一种MQ 10万级别,这是kafka最大的优点,就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景 topic数量对吞吐量的影响 topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降。这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic topic从几十个到几百个的时候,吞吐量会大幅度下降。所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 时效性 ms级 微秒级,这是rabbitmq的一大特点,延迟是最低的 ms级 延迟在ms级以内 可用性 高,基于主从架构实现高可用性 高,基于主从架构实现高可用性 非常高,分布式架构 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 功能支持 MQ领域的功能极其完备 基于erlang开发,所以并发能力很强,性能极其好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 优劣势总结 非常成熟,功能强大,在业内大量的公司以及项目中都有应用。偶尔会有较低概率丢失消息。而且现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本,而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用 erlang语言开发,性能极其好,延时很低;吞吐量到万级,MQ功能比较完备;而且开源提供的管理界面非常棒,用起来很好用;社区相对比较活跃,几乎每个月都发布几个版本分;在国内一些互联网公司近几年用rabbitmq也比较多一些;但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且erlang开发,国内有几个公司有实力做erlang源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。 接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障。日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景,而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控。社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ挺好的 kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展;同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量;而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略,这个特性天然适合大数据实时计算以及日志收集
- Apache Kafka
缓存服务
- 阿里 Tair
- 业界的 Redis
- Memcached
- Ehcache
配置中心
- 阿里 Nacos
- 携程 Apollo
- 百度 Disconf
分布式事务
- 阿里 seata
- 腾讯 DTF
任务调度
- 阿里 SchedulerX
- 业界 xxl-job
- 大众点评员工徐雪里于 2015 年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。
- 架构
- 当当 elastic-job
- 当当开发的弹性分布式任务调度系统,功能丰富强大,采用 zookeeper 实现分布式协调,实现任务高可用以及分片,并且可以支持云开发,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。基于 Quartz ⼆次开发的。
- 有赞 TSP
数据库层
- 用于支持弹性扩容和分库分表的 TDDL
- 数据库连接池 Driud
- Binlog 同步的 Canal
- Canal 是阿里巴巴旗下的一款开源项目,纯 Java 开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了 MySQL(也支持 MariaDB)。
- Mycat
- 相关问题
- Sharding-JDBC 和 Mycat 的区别?
- 工作层次:Sharding-JDBC 实现了 JDBC 协议,工作在 JDBC 层;Mycat 可以当做一个 MySQL 数据库使用,其实就是在 Proxy 层的。
- 运行方式:Sharding-JDBC 只需要在工程中导入一个 Sharding-JDBC 的 jar 包,然后在配置文件中配置相应的数据源和分片策略即可;Mycat 则是需要单独提供一个端口为 8066 的服务,然后在 Mycat 的配置文件中配置相关的数据源和分片策略。
- 开发方式:Sharding-JDBC 只需要在配置文件中进行配置即可使用;Mycat 需要在其配置文件中修改数据源等一系列参数。
- 运维成本:Sharding-JDBC 的运维成本低,java 开发人员的维护成本高;Mycat 运维成本高,得配置 Mycat 的一系列参数以及高可用负载均衡的配置,需要一定的运维实力。
- 支持的语言:Sharding-JDBC 只支持 java 语言;Mycat 支持实现了 JDBC 规范的语言。
- Sharding-JDBC 和 Mycat 的区别?
- 相关问题
其他
- Zookeeper
- Zookeeper 是一个开源的分布式协调服务,由雅虎公司创建,由于最初雅虎公司的内部研究小组的项目大多以动物的名字命名,所以后来就以 Zookeeper(动物管理员)来命名了,而就是由 Zookeeper 来负责这些分布式组件环境的协调工作。
- 可以用 ZooKeeper 来做:统一配置管理、统一命名服务、分布式锁、集群管理。
- ZooKeeper 的数据结构,跟 Unix 文件系统非常类似,可以看做是一颗树,每个节点叫做 ZNode。每一个节点可以通过路径来标识
- Znode 类型:
- 短暂/临时(Ephemeral)
- 当客户端和服务端断开连接后,所创建的 Znode(节点)会自动删除
- 持久(Persistent)
- 当客户端和服务端断开连接后,所创建的 Znode(节点)不会删除
- 临时顺序
- ZK 会自动在这两种节点之后增加一个数字的后缀,而路径 + 数字后缀是能保证唯一的,这数字后缀的应用场景可以实现诸如分布式队列,分布式公平锁等。
- 持久顺序
- ZK 会自动在这两种节点之后增加一个数字的后缀,而路径 + 数字后缀是能保证唯一的,这数字后缀的应用场景可以实现诸如分布式队列,分布式公平锁等。
- 容器
- 容器节点是 3.5 以后新增的节点类型,只要在调用 create 方法时,指定 CreateMode 为 CONTAINER 即可创建容器的节点类型,容器节点的表现形式和持久节点是一样的,但是区别是 ZK 服务端启动后,会有一个单独的线程去扫描,所有的容器节点,当发现容器节点的子节点数量为 0 时,会自动删除该节点,除此之外和持久节点没有区别,官方注释给出的使用场景是 Container nodes are special purpose nodes useful for recipes such as leader, lock, etc. 说可以用在 leader 或者锁的场景中。
- 持久 TTL、持久顺序 TTL
- 带有存活时间。就是当该节点下面没有子节点的话,超过了 TTL 指定时间后就会被自动删除,特性跟上面的容器节点很像,只是容器节点没有超时时间而已,但是 TTL 启用是需要额外的配置(这个之前也有提过)配置是 zookeeper.extendedTypesEnabled 需要配置成 true,否则的话创建 TTL 时会收到 Unimplemented 的报错
- 短暂/临时(Ephemeral)
- ACL(access control list 访问控制列表)
- zookeeper 在分布式系统中承担中间件的作用,它管理的每一个节点上都可能存储着重要的信息,因为应用可以读取到任意节点,这就可能造成安全问题,ACL 的作用就是帮助 zookeeper 实现权限控制。
- zookeeper 的权限控制基于节点,每个 znode 可以有不同的权限。
- 子节点不会继承父节点的权限,访问不了该节点,并不代表访问不到其子节点。
- Schema: 鉴权策略
- world
- 默认方式,相当于全世界都能访问
- digest
- 即: “用户名+密码” 这种认证方式,也是业务中常用的
- ip
- 使用 IP 认证的方式
- auth
- 代表已经认证通过的用户(cli 中可以通过 addauth digest user:pwd 来添加当前上下文中的授权用户)
- world
- 授权对象
- world
- 只有一个 ID:“anyone”
- digest
- 自定义,通常是用户名:密码,在 ACl 中使用时,表达式将是 username:base64 编码的 SHA1.例如”admin:u53OoA8hprX59uwFsvQBS3QuI00=”(明文密码为123456)
- ip
- 通常是一个 Ip 地址或者是 Ip 段, 例如 192.168.xxx.xxx 或者 192.168.xxx.xxx/xxx
- super
- 与 digest 模式一样
- world
- 权限
- create
- 创建权限,授予权限的对象可以在数据节点下创建子节点;
- read
- 读取权限,授予权限的对象可以读取该节点的内容以及子节点的信息;
- write
- 更新权限,授予权限的对象可以更新该数据节点;
- delete
- 删除权限,授予权限的对象可以删除该数据节点的子节点;
- admin
- 管理者权限,授予权限的对象可以对该数据节点体进行 ACL 权限设置。
- create
- Znode 类型:
- 集群
- 角色介绍
- Leader
- Leader 不直接接受 client 的请求,但接受由其他 Follower 和 Observer 转发过来的 Client 请求,此外,Leader 还负责投票的发起和决议,即时更新状态和数据。
- Follower
- Follower 角色接受客户端请求并返回结果,参与 Leader 发起的投票和选举,但不具有写操作的权限。
- Observer
- Observer 角色接受客户端连接,将写操作转给 Leader,但 Observer 不参与投票(即不参加一致性协议的达成),只同步 Leader 节点的状态,Observer 角色是为集群系统扩展而生的。
- Leader
- ZAB(Zookeeper Atomic BroadCast)原子广播协议
- 在 zookeeper 中,只有一台服务器机器作为 leader 机器,所以当客户端链接到机器的某一个节点时
- 当这个客户端提交的是读取数据请求,那么当前连接的机器节点,就会把自己保存的数据返回出去。
- 当这个客户端提交的是写数据请求时,首先会看当前连接的节点是不是 leader 节点,如果不是 leader 节点则会转发出去到 leader 机器的节点上,由 leader 机器写入,然后广播出去通知其他的节点过来同步数据
- 在 ZAB 中的三个重点数据
- Zxid:是 zookeeper 中的事务 ID,总长度为 64 位的长度的 Long 类型数据。其中有两部分构成前 32 位是 epoch 后 32 位是 xid
- Epoch:每一个 leader 都会有一个这个值,表示当前 leader 获取到的最大 N 值,可以理解为“年代”
- Xid:事务 ID,表示当前 zookeeper 集群当前提交的事物 ID 是多少(watch 机制),方便选举的过程后不会出现事务重复执行或者遗漏等一些特殊情况。
- 在 zookeeper 中,只有一台服务器机器作为 leader 机器,所以当客户端链接到机器的某一个节点时
- 角色介绍
- 监听器
- 常见的监听场景有以下两项:
- 监听 Znode 节点的数据变化
- 监听子节点的增减变化
- 常见的监听场景有以下两项:
- 用途
- 统一配置管理
- 问题描述
- 比如我们现在有三个系统 A、B、C,他们有三份配置,分别是 ASystem.yml、BSystem.yml、CSystem.yml,然后,这三份配置又非常类似,很多的配置项几乎都一样。
- 此时,如果我们要改变其中一份配置项的信息,很可能其他两份都要改。并且,改变了配置项的信息很可能就要重启系统
- 于是,我们希望把 ASystem.yml、BSystem.yml、CSystem.yml 相同的配置项抽取出来成一份公用的配置 common.yml,并且即便 common.yml 改了,也不需要系统 A、B、C 重启。
- 做法
- 我们可以将 common.yml 这份配置放在 ZooKeeper 的 Znode 节点中,系统 A、B、C 监听着这个 Znode 节点有无变更,如果变更了,及时响应。
- 问题描述
- 统一命名服务
- 问题描述
- 统一命名服务的理解其实跟域名一样,是我们为这某一部分的资源给它取一个名字,别人通过这个名字就可以拿到对应的资源。
- 比如说,现在我有一个域名 www.java3y.com,但我这个域名下有多台机器:
- 192.168.1.1、192.168.1.2、192.168.1.3、192.168.1.4
- 别人访问 www.java3y.com 即可访问到我的机器,而不是通过 IP 去访问。
- 做法
- 问题描述
- 分布式锁
- 做法
- 系统 A、B、C 都去访问 /locks 节点
- 访问的时候会创建带顺序号的临时/短暂(EPHEMERAL_SEQUENTIAL)节点,比如,系统 A 创建了 id_000000 节点,系统 B 创建了 id_000002 节点,系统 C 创建了 id_000001 节点。
- 接着,拿到 /locks 节点下的所有子节点(id_000000,id_000001,id_000002),判断自己创建的是不是最小的那个节点
- 如果是,则拿到锁。
- 释放锁:执行完操作后,把创建的节点给删掉
- 如果不是,则监听比自己要小 1 的节点变化
- 如果是,则拿到锁。
- 例子
- 系统 A 拿到 /locks 节点下的所有子节点,经过比较,发现自己(id_000000),是所有子节点最小的。所以得到锁。
- 系统 B 拿到 /locks 节点下的所有子节点,经过比较,发现自己(id_000002),不是所有子节点最小的。所以监听比自己小 1 的节点 id_000001 的状态。
- 系统 C 拿到 /locks 节点下的所有子节点,经过比较,发现自己(id_000001),不是所有子节点最小的。所以监听比自己小 1 的节点 id_000000 的状态。
- 等到系统 A 执行完操作以后,将自己创建的节点删除(id_000000)。通过监听,系统 C 发现 id_000000 节点已经删除了,发现自己已经是最小的节点了,于是顺利拿到锁。
- 做法
- 集群状态
- 做法
- 三个系统 A、B、C,在 ZooKeeper 中创建临时节点
- 只要系统 A 挂了,那 /groupMember/A 这个节点就会删除,通过监听 groupMember 下的子节点,系统 B 和 C 就能够感知到系统 A 已经挂了。
- 除了能够感知节点的上下线变化,ZooKeeper 还可以实现动态选举 Master 的功能。
- 如果想要实现动态选举 Master 的功能,Znode 节点的类型是带顺序号的临时节点(EPHEMERAL_SEQUENTIAL)就好了。
- Zookeeper 会每次选举最小编号的作为 Master,如果 Master 挂了,自然对应的 Znode 节点就会删除。然后让新的最小编号作为 Master,这样就可以实现动态选举的功能了。
- 做法
- 统一配置管理
- 相关问题
- 说说 Watcher 监听机制和它的原理?
- Zookeeper 可以提供分布式数据的发布/订阅功能,依赖的就是 Watcher 监听机制。
- 客户端可以向服务端注册 Watcher 监听,服务端的指定事件触发之后,就会向客户端发送一个事件通知。
- 特性:
- 一次性:一旦一个 Watcher 触发之后,Zookeeper 就会将它从存储中移除
- 客户端串行:客户端的 Watcher 回调处理是串行同步的过程,不要因为一个 Watcher 的逻辑阻塞整个客户端
- 轻量:Watcher 通知的单位是 WatchedEvent,只包含通知状态、事件类型和节点路径,不包含具体的事件内容,具体的时间内容需要客户端主动去重新获取数据
- 流程
- 客户端向服务端注册 Watcher 监听
- 保存 Watcher 对象到客户端本地的 WatcherManager 中
- 服务端 Watcher 事件触发后,客户端收到服务端通知,从 WatcherManager 中取出对应 Watcher 对象执行回调逻辑
- Zookeeper 是如何保证数据一致性的?
- Zookeeper 通过 ZAB 原子广播协议来实现数据的最终顺序一致性,他是一个类似 2PC 两阶段提交的过程。
- 由于 Zookeeper 只有 Leader 节点可以写入数据,如果是其他节点收到写入数据的请求,则会将之转发给 Leader 节点。
- 主要流程:
- Leader 收到请求之后,将它转换为一个 proposal 提议,并且为每个提议分配一个全局唯一递增的事务 ID:zxid,然后把提议放入到一个 FIFO 的队列中,按照 FIFO 的策略发送给所有的 Follower
- Follower 收到提议之后,以事务日志的形式写入到本地磁盘中,写入成功后返回 ACK 给 Leader
- Leader 在收到超过半数的 Follower 的 ACK 之后,即可认为数据写入成功,就会发送 commit 命令给 Follower 告诉他们可以提交 proposal 了
- ZAB 包含两种基本模式,崩溃恢复和消息广播
- 整个集群服务在启动、网络中断或者重启等异常情况的时候,首先会进入到崩溃恢复状态,此时会通过选举产生 Leader 节点,当集群过半的节点都和 Leader 状态同步之后,ZAB 就会退出恢复模式。之后,就会进入消息广播的模式。
- Zookeeper 如何进行 Leader 选举的?
- Leader 的选举可以分为两个方面,同时选举主要包含事务 zxid 和 myid,节点主要包含 LEADING\FOLLOWING\LOOKING 3个状态。
- 不同时期选举
- 服务启动期间的选举
- 过程
- 首先,每个节点都会对自己进行投票,然后把投票信息广播给集群中的其他节点
- 节点接收到其他节点的投票信息,然后和自己的投票进行比较,首先 zxid 较大的优先,如果 zxid 相同那么则会去选择 myid 更大者,此时大家都是 LOOKING 的状态
- 投票完成之后,开始统计投票信息,如果集群中过半的机器都选择了某个节点机器作为 leader,那么选举结束
- 最后,更新各个节点的状态,leader 改为 LEADING 状态,follower 改为 FOLLOWING 状态
- 过程
- 服务运行期间的选举
- 如果开始选举出来的 leader 节点宕机了,那么运行期间就会重新进行 leader 的选举。
- leader 宕机之后,非 observer 节点都会把自己的状态修改为 LOOKING 状态,然后重新进入选举流程
- 生成投票信息(myid,zxid),同样,第一轮的投票大家都会把票投给自己,然后把投票信息广播出去
- 接下来的流程和上面的选举是一样的,都会优先以 zxid,然后选择 myid,最后统计投票信息,修改节点状态,选举结束
- 服务启动期间的选举
- 选举之后又是怎样进行数据同步的?
- 实际上 Zookeeper 在选举之后,Follower 和 Observer(统称为 Learner)就会去向 Leader 注册,然后就会开始数据同步的过程。
- 数据同步包含 3 个主要值和 4 种形式。
- 3 个主要值
- PeerLastZxid:Learner 服务器最后处理的 ZXID
- minCommittedLog:Leader 提议缓存队列中最小 ZXID
- maxCommittedLog:Leader 提议缓存队列中最大 ZXID
- 4 种形式
- 直接差异化同步(DIFF 同步)
- 流程
- 首先 Leader 向 Learner 发送 DIFF 指令,代表开始差异化同步,然后把差异数据(从 PeerLastZxid 到 maxCommittedLog 之间的数据)提议 proposal 发送给 Learner
- 发送完成之后发送一个 NEWLEADER 命令给 Learner,同时 Learner 返回 ACK 表示已经完成了同步
- 接着等待集群中过半的 Learner 响应了 ACK 之后,就发送一个 UPTODATE 命令,Learner 返回 ACK,同步流程结束
- 流程
- 先回滚再差异化同步(TRUNC+DIFF 同步)
- 问题描述
- 如果 Leader 刚生成一个 proposal,还没有来得及发送出去,此时 Leader 宕机,重新选举之后作为 Follower,但是新的 Leader 没有这个 proposal 数据。
- 例子
- 假设现在的 Leader 是 A,minCommittedLog=1,maxCommittedLog=3,刚好生成的一个 proposal 的 ZXID=4,然后挂了。
- 重新选举出来的 Leader 是 B,B 之后又处理了 2 个提议,然后 minCommittedLog=1,maxCommittedLog=5。
- 这时候A的 PeerLastZxid=4,在(1,5)之间。
- 处理方式
- A 要进行事务回滚,相当于抛弃这条数据,并且回滚到最接近于 PeerLastZxid 的事务,对于 A 来说,也就是 PeerLastZxid=3。
- 流程
- 流程和 DIFF 一致,只是会先发送一个 TRUNC 命令,然后再执行差异化 DIFF 同步。
- 问题描述
- 仅回滚同步(TRUNC 同步)
- 针对 PeerLastZxid 大于 maxCommittedLog 的场景,流程和上述一致,事务将会被回滚到 maxCommittedLog 的记录。
- 例子
- 可以认为 TRUNC+DIFF 中的例子,新的 Leader B没有处理提议,所以 B 中 minCommittedLog=1,maxCommittedLog=3。
- 所以 A 的 PeerLastZxid=4 就会大于 maxCommittedLog 了,也就是 A 只需要回滚就行了,不需要执行差异化同步 DIFF 了。
- 全量同步(SNAP 同步)
- 适用于两个场景:
- PeerLastZxid 小于 minCommittedLog
- Leader 服务器上没有提议缓存队列,并且 PeerLastZxid 不等于 Leader 的最大 ZXID
- 适用于两个场景:
- 直接差异化同步(DIFF 同步)
- 有可能会出现数据不一致的问题吗?
- 查询不一致
- 因为 Zookeeper 是过半成功即代表成功,假设我们有 5 个节点,如果 123 节点写入成功,如果这时候请求访问到 4 或者 5 节点,那么有可能读取不到数据,因为可能数据还没有同步到 4、5 节点中,也可以认为这算是数据不一致的问题。
- 解决方案可以在读取前使用 sync 命令。
- leader 未发送 proposal 宕机
- 这也就是数据同步说过的问题。
- leader 刚生成一个 proposal,还没有来得及发送出去,此时 leader 宕机,重新选举之后作为 follower,但是新的 leader 没有这个 proposal。
- 这种场景下的日志将会被丢弃。
- leader 发送 proposal 成功,发送 commit 前宕机
- 如果发送 proposal 成功了,但是在将要发送 commit 命令前宕机了,如果重新进行选举,还是会选择 zxid 最大的节点作为 leader,因此,这个日志并不会被丢弃,会在选举出 leader 之后重新同步到其他节点当中。
- 查询不一致
- 如果作为注册中心,Zookeeper 和 Eureka、Consul、Nacos 有什么区别?
- 说说 Watcher 监听机制和它的原理?