2024年2月6日发(作者:)
Kafka通讯协议指南
中英文术语对照
为避免歧义,大部分的英文术语找不到合适中文对应时都保持英文原文,Kafka中一些基本术语也使用英文,其中一部分通过括号加入英文原文;另外,文中可能使用到的中英文术语包括但不限于:
英文
Metadata
offset
Comsumer
Topic
API
中文
元数据
偏移量
消费者
主题
接口
Comsumer Group 消费者组
Coordinator 协调器
1 简介
此文档涵盖了Kafka 0.8及之前版本的通讯协议实现。其目的是提供一个包含的可请求的协议及其二进制格式以及如何正确使用他们来实现一个客户端的通讯协议文档。本文假设您已经了解了Kafka基本的设计以及术语。
0.7和更早的版本所使用的协议与此类似,但我们(希望)通过一次性地斩断兼容性,以便清理原有设计上的沉疴,并且泛化一些概念。
如果遇到无法理解的情况,请参照英文原文
2 概述
卡夫卡协议是相当简单的,只有六个核心的客户端请求的API:
1. 元数据(Metadata) – 描述可用的brokers,包括他们的主机和端口信息,并给出了每个broker上分别存有哪些分区;
2. 发送(Send) – 发送消息到broker;
3. 获取(Fetch) – 从broker获取消息,其中,一个获取数据,一个获取集群的元数据,还有一个获取topic的偏移量信息;
4. 偏移量(Offsets) – 获取给定topic的分区的可用偏移量信息;
5. 偏移量提交(Offset Commit) – 提交消费者组(Comsumer
Group)的一组偏移量;
6. 偏移量获取(Offset Fetch) – 获取一个消费者组的一组偏移量;
上述的API都将在下面详细说明。此外,从0.9版本开始,Kafka支持为消费者和Kafka连接进行分组管理。客户端API包括五个请求:
1. 分组协调者(GroupCoordinator) – 用来定位一个分组当前的协调者。
2. 加入分组(JoinGroup) – 成为某一个分组的一个成员,当分组不存在(没有一个成员时)创建分组。
3. 同步分组(SyncGroup) – 同步分组中所有成员的状态(例如分发分区分配信息(Partition Assignments)到各个组员)。
4. 心跳(Heartbeat) – 保持组内成员的活跃状态。
5. 离开分组(LeaveGroup) – 直接离开一个组。
最后,有几个管理API,可用于监控/管理的卡夫卡集群(KIP-4完成时,这个列表将增长):
1. 描述消费者组(DescribeGroups) – 用于检查一组群体的当前状态(如:查看消费者分区分配)。
2. 1.列出组(ListGroups) – 列出某一个broker当前管理的所有组
3 开始
网络
Kafka使用基于TCP的二进制协议。该协议定义了所有API的请求及响应消息。所有消息都是有长度限制的,并且由后面描述的基本类型组成。
客户端启动的socket连接,并且写入请求的消息序列和读回相应的响应消息。连接和断开时均不需要握手消息。如果保持你保持长连接,那么TCP协议本身将会节省很多TCP握手时间,但如果真的重新
建立连接,那么代价也相当小。
客户可能需要维持到多个broker的连接,因为数据是被分区的,而客户端需要和存储这些分区的broker服务器进行通讯。当然,一般而言,不需要为单个服务端和单个客户端间维护多个连接(即连接池技术)。
服务器的保证单一的TCP连接中,请求将被顺序处理,响应也将按该顺序返回。为保证broker的处理请求的顺序,单个连接同时也只会处理一个请求指令。请注意,客户端可以(也应该)使用非阻塞IO实现请求流水线,从而实现更高的吞吐量。也就是说,客户可以在等待上次请求应答的同时发送下个请求,因为待完成的请求将会在底层操作系统套接字缓冲区进行缓冲。除非特别说明,所有的请求是由客户端启动,并从服务器获取到相应的响应消息。
服务器能够配置请求大小的最大限制,超过这个限制将导致socket连接被断开。
分区和引导(Partitioning and bootstrapping)
Kafka是一个分区系统,所以不是所有的服务器都具有完整的数据集。主题(Topic)被分为P(预先定义的分区数量)个分区,每个分区被复制N(复制因子)份,Topic Partition根据顺序在“提交日志”中编号为0,1,…,P。
所有具有这种特性的系统都有一个如何制定某个特定数据应该被分配给哪个特定的分区的问题。Kafka中它由客户端直接控制分配策略,broker则没有特别的语义来决定消息发布到哪个分区。相反,生产者直接将消息发送到一个特定的分区,提取消息时,消费者也直接从某个特定的分区获取。如果两个生产者要使用相同的分区方案,那么他们必须用同样的方法来计算Key到分区映射关系。
这些发布或获取数据的请求必须发送到指定分区中作为leader的broker。此条件同时也会由broker保证,发送到不正确的broker的请求将会返回NotLeaderForPartition错误代码(后文所描述的)。
那么客户端如何找出哪些主题存在,他们有什么分区,以及这些
分区被哪些broker存取,以便它可以直接将请求发送到所在的主机?这个信息是动态的,因此你不能只是提供每个客户端一些静态映射文件。所有的Kafka broker都可以回答这个描述集群的当前状态的数据请求:有哪些主题,这些主题都有多少分区,哪个broker是这些分区的Leader,以及这些broker主机的地址和端口信息。
换句话说,客户端只需要找到一个broker,broker将会告知客户端所有其他存在的broker,以及这些broker上面的所有分区。这个broker本身也可能会掉线,因此客户端实现的最佳做法是保存两个或三个broker地址,从而来引导列表。用户可以选择使用负载均衡器或只是静态地配置两个或三个客户的Kafka主机。
客户并不需要轮询地查看集群是否已经改变;它可以等到它接收到所用的元数据是过时的错误信息时一次性更新元数据。这中错误有两种形式:(1)一个套接字错误指示客户端不能与特定的broker进行通信,(2)请求响应表明该broker不再是其请求数据分区的Leader的错误。
1. 轮询“起始”Kafka的URL列表,直到我们找到一个我们可以连接到的broker。获取集群元数据。
2. 处理获取数据或者存储消息请求,根据这些请求所发送的主题和分区,将这些请求发送到合适的broker。
3. 如果我们得到一个适当的错误(显示元数据已经过时时),刷新元数据,然后再试一次。
分区策略(Partitioning Strategies)
上面提到消息的分区分配是由生产者客户端控制,那么,为什么要把这个功能被暴露给最终用户?
在Kafka中,这样分区有两个目的:
1. 它平衡了broker的数据和请求负载
2. 它允许多个消费者之间处理分发消息的同时,能够维护本地状态,并且在分区中维持消息的顺序。我们称这种语义的分区(semantic partitioning)。
对于给定的使用场景下,你可能只关心其中的一个或两个。
为了实现简单的负载均衡,一个简单的策略是客户端发布消息是对所有broker进行轮询请求(round robin requests)。另一种选择,在那些生产者比消费者多的场景下,给每个客户机随机选择并发布消息到该分区。后一种的策略能够使用少得多的TCP连接。
语义分区是指使用关键字(key)来决定消息分配的分区。例如,如果你正在处理一个点击消息流时,可能需要通过用户ID来划分流,使得特定用户的所有数据会被单个消费者消费。要做到这一点,客户端可以采取与消息相关联的关键字,并使用关键字的某个Hash值来选择的传送的分区。
批处理(Batching)
我们的API鼓励将小的请求批量处理以提高效率。我们发现这能非常显著地提升性能。我们两个用来发送消息和获取消息的API,总是以一连串的消息工作,而不是单一的消息,从而鼓励批处理操作。聪明的客户端可以利用这一点,并支持“异步”操作模式,以此进行批处理哪些单独发送的消息,并把它们以较大的块进行发送。我们再进一步允许跨多个主题和分区的批处理,所以生产请求可能包含追加到许多分区的数据,一个读取请求可以一次性从多个分区提取数据的。
当然,如果他们喜欢,客户端实现者可以选择忽略这一点,所有消息一次都发送一个。
版本和兼容性(Versioning and Compatibility)
该协议的目的要达到在向后兼容的基础上渐进演化。我们的版本是基于每个API基础之上,每个版本包括一个请求和响应对。每个请求包含API Key,里面包含了被调用的API标识,以及表示这些请求和响应格式的版本号。
这样做的目的是允许客户端执行相应特定版本的请求。目标主要是为了在不允许停机的环境下进行更新,这种环境下,客户端和服务器不能一次性都切换所使用的API。
服务器将拒绝它不支持的版本的请求,并始终返回它期望收到的能够完成请求响应的版本的协议格式。预期的升级路径方式是,新功能将首先部署到服务器(老客户端无法完全利用他们的新功能),然后随着新的客户端的部署,这些新功能将逐步被利用。
目前,所有版本基线为0,当我们演进这些API时,我们将分别显示每个版本的格式。
4 通讯协议(The Protocol)
协议基本数据类型(Protocol Primitive Types)
The protocol is built out of the following primitive types.
该协议是建立在下列基本类型之上。
•
定长基本类型(Fixed Width Primitives)
•
int8, int16, int32, int64 – 不同精度(以bit数区分)的带符号整数,以大端(Big Endiam)方式存储.
•
变长基本类型(Variable Length Primitives)
•
bytes, string – 这些类型由一个表示长度的带符号整数N以及后续N字节的内容组成。长度如果为-1表示空(null). string 使用int16表示长度,bytes使用int32.
•
数组(Arrays)
•
这个类型用来处理重复的结构体数据。他们总是由一个代表元素个数int32整数N,以及后续的N个重复结构体组成,这些结构体自身是有其他的基本数据类型组成。我们后面会用BNF语法展示一个foo的结构体数组[foo]
请求格式语法要点(Notes on reading the request format
grammars)
后面的BNF确切地以上下文无关的语法展示了请求和响应的二进制格式。每个API都会一起给出请求和响应,以及所有的子定义(sub-definitions)。BNF使用没有经过缩写的便于阅读的名称(比如我使用一个符号化了得名称来定义了一个生产者错误码,即便它只
是int16整数)。一般在BNF中,一个序列表示一个连接,所以下面给出的MetadataRequest将是一个含有VersionId,然后clientId,然后TopicNames的阵列(每一个都有其自身的定义)。自定义类型一般使用驼峰法拼写,基本类型使用全小写方式乒协。当存在多中可能的自定义类型时,使用’|’符号分割,并且用括号表示分组。顶级定义不缩进,后续的子部分会被缩进。
一般的请求和响应格式(Common Request and Response
Structure)
所有请求和响应都从以下语法起源,其余的会在本文剩下部分中进行增量描述:
1.
2. RequestOrResponse
ResponseMessage)
3.
4.
5. Size => int32
6.
域(FIELD)
描述
=> Size (RequestMessage |
MessageSize 域给出了 后续请求或响应消息的字节(bytes)长MessageSize
度。客户端可以先读取4字节的长度N,然后读取并解析后续的N
字节请求内容。
请求(Requests)
所有请求都具有以下格式:
1.
2. RequestMessage => ApiKey
ClientId RequestMessage
3.
4.
5. ApiKey => int16
6.
7.
8. ApiVersion => int16
9.
10.
11. CorrelationId => int32
12.
ApiVersion CorrelationId
13.
14.
15.
16.
17.
RequestMessage
|
=> MetadataRequest
| OffsetRequest
|
| FetchRequest
ClientId => string
ProduceRequest
18.
域(FIELD)
ApiKey
OffsetCommitRequest | OffsetFetchRequest
描述
这是一个表示所调用的API的数字id(即它表示是一个元数据请求?生产请求?获取请求等).
这是该API的一个数字版本号。我们为每个API定义一个版本号,该版本号允许服务器根据版本号正确地解释请求内容。响ApiVersion
应消息也始终对应于所述请求的版本的格式。目前所有API的支持版本为0。
这是一个用户提供的整数。它将会被服务器原封不动地回传给CorrelationId
客户端。用于匹配客户机和服务器之间的请求和响应。
这是为客户端应用程序的自定义的标识。用户可以使用他们喜欢的任何标识符,他们会被用在记录错误时,监测统计信息等场景。例如,你可能不仅想要监视每秒的总体请求,还要根据客户端应用程序进行监视,那它就可以被用上(其中每一个都将驻留在多个服务器上)。这个ID作为特定的客户端对所有的请求的逻辑分组。
ClientId
下面我们就来描述各种请求和响应消息。
响应(Responses)
1.
2. Response => CorrelationId ResponseMessage
3.
4.
5. CorrelationId => int32
6.
7.
8. ResponseMessage
ProduceResponse
9.
域(FIELD) 描述
服务器传回给客户端它所提供用作关联请求和响应消息的整CorrelationId
数。
=> MetadataResponse
| OffsetResponse
|
| | FetchResponse
OffsetCommitResponse | OffsetFetchResponse
所有响应都是与请求成对匹配(例如,我们将发送回一个元数据请求,会得到一个元数据响应)。
消息集(Message sets)
生产和获取消息指令请求共享同一个消息集结构。在Kafka中,消息是由一个键值对以及少量相关的元数据组成。消息集知识一个有偏移量和大小信息的消息序列。这种格式正好即可用于在broker上的
磁盘上存储,也可用在线上数据交换。
消息集也是Kafka中的压缩单元,我们也允许消息递归包含压缩消息从而允许批量压缩。
注意, 在通讯协议中,消息集之前没有类似的其他数组元素的int32。
1.
2. MessageSet => [Offset MessageSize Message]
3.
4.
5. Offset => int64
6.
7.
8. MessageSize => int32
9.
消息格式
1.
2. Message => Crc MagicByte Attributes Key Value
3.
4.
5. Crc => int32
6.
7.
8. MagicByte => int8
9.
10.
11.
12.
13.
14.
15.
Key => bytes
Attributes => int8
16.
17.
18.
域(FIELD)
Offset
Crc
MagicByte
Value => bytes
描述
这是在Kafka中作为日志序列号使用的偏移量。当生产者发送消息,实际上它并不知道偏移量的具体值,这时候它可以填写任意值。
Crc是的剩余消息字节的CRC32值。broker和消费者可用来检查信息的完整性。
这是一个用于允许消息二进制格式的向后兼容演化的版本id。当前值是0。
这个字节保存有关信息的元数据属性。最低的2位包含用于消息Attributes
的压缩编解码器。其他位应该被设置为0。
Key
Value
Key是一个可选项,它主要用来进行指派分区。Key可以为null.
Value是消息的实际内容,类型是字节数组。Kafka支持本身递归包含,因此本身也可能是一个消息集。消息可以为null。
压缩(Compression)
Kafka支持压缩多条消息以提高效率,当然,这比压缩一条原始消息要来得复杂。因为单个消息可能没有足够的冗余信息以达到良好的压缩比,压缩的多条信息必须以特殊方式批量发送(当然,如果真的需要的话,你可以自己压缩批处理的一个消息)。要被发送的消息被包装(未压缩)在一个MessageSet结构中,然后将其压缩并存储在一个单一的“消息”中,一起保存的还有相应的压缩编解码集。接收系统通过解压缩得到实际的消息集。外层MessageSet应该只包含一个压缩的“消息”(详情见Kafka-1718)。
卡夫卡目前支持一下两种压缩编解码器编号:
压缩算法(COMPRESSION) 编码器编号(CODEC)
None
GZIP
Snappy
0
1
2
接口(The APIs)
本节将给出每个API的用法、二进制格式,以及它们的字段的含义的细节。
元数据接口(Metadata API)
这个API回答下列问题:
•
存在哪些主题(Topic)?
•
每个主题有几个分区(Partition)?
•
每个分区的•
这些Leader分别是哪个broker?
broker的地址和端口分别是什么?
这是唯一一个能发往集群中任意一个broker的请求消息。
因为可能有很多主题,客户端可以给一个的可选主题名列表,以便只返回主题元数据的一个子集。
返回的元数据是在分区级别,为了方便和以避免冗余,以主题为组集中在一起。每个分区的元数据中包含了leader以及所有副本以及正在同步的副本的信息。
注意: 如果broker配置中设置了””,
主题元数据请求将会以默认的复制因子和默认的分区数为参数创建主题。
主题元数据请求(Topic Metadata Request)
1.
2. TopicMetadataRequest => [TopicName]
3.
4.
5. TopicName => string
6.
域(FIELD) 描述
TopicName 要获取元数据的主题数组。 如果为空,就返回所有主题的元数据
元数据反馈(Metadata Response)
响应包含的每个分区的元数据,这些分区元数据以主题为组组装在一起。该元数据以broker id来指向具体的broker。每个broker有一个地址和端口。
1.
2. MetadataResponse => [Broker][TopicMetadata]
3.
4.
5. Broker => NodeId Host Port (any number of brokers may
be returned)
6.
7.
8. NodeId => int32
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
PartitionMetadata => PartitionErrorCode
TopicErrorCode => int16
TopicMetadata => TopicErrorCode TopicName
Port => int32
Host => string
[PartitionMetadata]
PartitionId Leader Replicas Isr
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
Replicas => [int32]
Leader => int32
PartitionId => int32
PartitionErrorCode => int16
38.
39.
域(FIELD)
Leader
Isr => [int32]
描述
该分区作为Leader节点的Kafka broker id。如果在一个Leader选举过程中,没有Leader存在,这个id将是-1。
副本集合中,所有处在与Leader跟随(“caught up”,表示数据已经完全复制到这些节点)状态的子集
kafka broker节点的id, 主机名, 端口信息
Replicas 该分区中,其他活着的作为slave的节点集合。
Isr
Broker
可能的错误码(Possible Error Codes)
•
UnknownTopic (3)
•
LeaderNotAvailable (5)
•
InvalidTopic (17)
•
TopicAuthorizationFailed (29)
生产者接口(Produce API)
生产者API用于将消息集发送到服务器。为了提高效率,它允许在单个请求中发送多个不同主题的不同分区的消息。
生产者API使用通用的消息集格式,但由于发送时还没有被分配偏移量,因此可以任意填写该值。
生产消息请求(Produce Request)
1.
2. ProduceRequest => RequiredAcks Timeout [TopicName
[Partition MessageSetSize MessageSet]]
3.
4.
5. RequiredAcks => int16
6.
7.
8. Timeout => int32
9.
10.
11.
12.
13.
14.
15.
域(FIELD) 描述
这个值表示服务端收到多少确认后才发送反馈消息给客户端。如果设置为0,那么服务端将不发送反馈消息(这是唯一RequiredAcks
的服务端不发送反馈消息的情况)。如果这个值为1,那么服务器将等到数据写入到本地日之后发送反馈消息。如果这个
Partition => int32
MessageSetSize => int32
值是-1,那么服务端将阻塞,知道这个消息被所有的同步副本写入后再反馈响应消息。对于其他大于1的值,服务端将会阻塞,直到收到这个数量的写入反馈后再反馈响应消息(但服务器不会等大于同步中副本的数量,即达到同步中复本个数后,会停止等待,即使所填的值大于这个副本个数)。
这个值提供了以毫秒为单位的超时时间,服务器可以在这个时间内可以等待接收所需的Ack确认的数目。超时并非一个确切的限制,有以下原因:(1)不包括网络延迟,(2)计时器开始在这一请求的处理开始,所以如果有很多请求,由于服务器负载而导致的排队等待时间将不被包括在内,(3)如果本地写入时间超过超时,我们将不会终止本地写操作,这样这个超时时间就不会得到遵守。要使硬超时时间,客户端应该使用套接字超时。
该数据将会发布到的主题名称
该数据将会发布到的分区
上面描述的标准格式的消息集合
Timeout
TopicName
Partition
MessageSet
MessageSetSize 后续消息集的长度,字节为单位
生产消息响应(Produce Response)
1.
2. ProduceResponse => [TopicName [Partition ErrorCode
Offset]]
3.
4.
5. TopicName => string
6.
7.
8. Partition => int32
9.
10.
11.
12.
13.
14.
15.
域
Topic 此响应对应的主题。
Partition 此响应对应的分区。
如果有,此分区对应的错误信息。错误以分区为单位提供,因为可ErrorCode 能存在给定的分区不可用或者被其他的主机维护(非Leader),但是其他的分区的请求操作成功的情况
Offset 追加到该分区的消息集中的分配给第一个消息的偏移量。
描述
ErrorCode => int16
Offset => int64
可能的错误码(Possible Error Codes):(未完待续 TODO)
获取消息接口(Fetch API)
获取消息接口用于获取一些主题分区的一个或多个的日志块。逻辑上根据指定主题,分区和消息起始偏移量开始获取一批消息。在一般情况下,返回消息的偏移量将大于或等于开始偏移量。然而,如果是压缩消息,有可能返回的消息的偏移量比起始偏移量小。这类的消
息的数量通常较少,并且调用者必须负责过滤掉这些消息。
获取数据指令请求遵循一个长轮询模型,如果没有足够数量的消息可用,它们可以阻塞一段时间。
作为优化,服务器被允许在消息集的末尾返回部分消息。客户应处理这种情况。
有一点要注意的是,获取消息API需要指定消费的分区。现在的问题是如何让消费者知道消费哪个分区?特别地,作为一组消费者,如何使得每个消费者获取分区的一个子集,并且平衡这些分区。我们使用zookeeper动态地为Scala和Java客户端完成这个任务。这种方法的缺点是,它需要一个相当胖的客户端并且需要客户端与zookeeper联系。我们尚未创建一个Kafka接口(API),允许该功能被移动到在服务器端并被更方便地访问。一个简单的消费者的客户端可以通过配置指定访问的分区,但这样将不能在某些消费者失效后做到分区的动态重新分配。我们希望能在下一个主要版本解决这一空白。
数据获取请求(Fetch Request)
1.
2. FetchRequest => ReplicaId MaxWaitTime MinBytes
[TopicName [Partition FetchOffset MaxBytes]]
3.
4.
5. ReplicaId => int32
6.
7.
8. MaxWaitTime => int32
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
FetchOffset => int64
Partition => int32
TopicName => string
MinBytes => int32
22.
23.
24.
域 描述
副本ID的是发起这个请求的副本节点ID。普通消费者客户端应该始终将其指定为-1,因为他们没有节点ID。其他broker设置ReplicaId
他们自己的节点ID。基于调试目的,以非代理身份模拟副本broker发出获取数据指令请求时,这个值填-2。
如果没有足够的数据可发送时,最大阻塞等待时间,以毫秒为单MaxWaitTime
位。
返回响应消息的最小字节数目,必须设置。如果客户端将此值设为0,服务器将会立即返回,但如果没有新的数据,服务端会返回一个空消息集。如果它被设置为1,则服务器将在至少一个分区收到一个字节的数据的情况下立即返回,或者等到超时时间达MinBytes 到。通过设置较高的值,结合超时设置,消费者可以在牺牲一点实时性能的情况下通过一次读取较大的字节的数据块从而提高的吞吐量(例如,设置MaxWaitTime至100毫秒,设置MinBytes为64K,将允许服务器累积数据达到64K前等待长达100ms再响应)。
TopicName 主题(topic)名称
Partition 获取数据的Partition id
FetchOffset 获取数据的起始偏移量
MaxBytes
此分区返回消息集所能包含的最大字节数。这有助于限制响应消息的大小。
MaxBytes => int32
获取消息响应(Fetch Response)
1.
2. FetchResponse => [TopicName [Partition ErrorCode
HighwaterMarkOffset MessageSetSize MessageSet]]
3.
4.
5. TopicName => string
6.
7.
8. Partition => int32
9.
10.
11.
12.
13.
14.
15.
16.
17.
MessageSetSize => int32
HighwaterMarkOffset => int64
ErrorCode => int16
18.
域
TopicName
Partition
描述
返回消息所对应的主题(Topic)名称。
返回消息所对应的分区id。
此分区日志中最末尾的偏移量。此信息可被客户端用来HighwaterMarkOffset
确定后面还有多少条消息。
MessageSetSize
MessageSet
此分区中消息集的字节长度
此分区获取到的消息集,格式与之前描述相同
可能的错误码(Possible Error Codes)
•
OFFSET_OUT_OF_RANGE (1)
•
UNKNOWN_TOPIC_OR_PARTITION (3)
•
NOT_LEADER_FOR_PARTITION (6)
•
REPLICA_NOT_AVAILABLE (9)
•
UNKNOWN (-1)
偏移量接口(又称ListOffset)(Offset API)
此API描述了一个主题分区的偏移量有效范围。生产者和获取数据API的请求必须发送到分区Leader所在的broker上,这需要通过使用元数据的API来确定。
响应包含分区的起始偏移量以及“日志末端偏移量”,即,将被追加到给定分区中的下一个消息的偏移量。
我们也觉得这个API是稍微有点时髦。
偏移量请求(Offset Request)
1.
2. OffsetRequest => ReplicaId [TopicName [Partition Time
MaxNumberOfOffsets]]
3.
4.
5. ReplicaId => int32
6.
7.
8. TopicName => string
9.
10.
11.
12.
13.
14.
15.
16.
17.
MaxNumberOfOffsets => int32
Time => int64
Partition => int32
18.
域
描述
用来请求一定时间(毫秒)前的所有消息。这里有两个特殊取值:-1表示获取最后一个offset(也就是后面即将到来消息的offset值); -2表Time
示获取最早的有效偏移量。注意,因为获取到偏移值都是降序排序,因此请求最早Offset的请求将总是返回一个值
偏移量响应(Offset Response)
1.
2. OffsetResponse => [TopicName [PartitionOffsets]]
3.
4.
5. PartitionOffsets => Partition ErrorCode [Offset]
6.
7.
8. Partition => int32
9.
10.
11.
12.
13.
14.
15.
ErrorCode => int16
Offset => int64
可能的错误吗(Possible Error Codes)
•
o
UNKNOWN_TOPIC_OR_PARTITION (3)
•
NOT_LEADER_FOR_PARTITION (6)
•
UNKNOWN (-1)
偏移量提交/获取接口(Offset Commit/Fetch API)
这些API使得偏移量的能够集中管理。了解更多偏移量管理。按照Kafka-993的评论,直到Kafka 0.8.1.1,这些API调用无法完全正常使用,他们这将在0.8.2版本中提供。
消费者组协调员请求(Group Coordinator Request)
消费者组(Consumer Group)偏移量信息,由一个特定的broker维护,这个broker称为消费者组协调员。即消费者需要向从这个特定的broker提交和获取偏移量。可以通过发出一组协调员发现请求从而获得当前协调员信息。
1.
2. GroupCoordinatorRequest => GroupId
3.
4.
5. GroupId => string
6.
消费者组协调员响应(Group Coordinator Response)
1.
2. GroupCoordinatorResponse => ErrorCode CoordinatorId
CoordinatorHost CoordinatorPort
3.
4.
5. ErrorCode => int16
6.
7.
8. CoordinatorId => int32
9.
10.
11.
12.
13.
14.
15.
可能的错误码(Possible Error Codes)
•
GROUP_COORDINATOR_NOT_AVAILABLE (15)
•
NOT_COORDINATOR_FOR_GROUP (16)
•
GROUP_AUTHORIZATION_FAILED (30)
CoordinatorHost => string
CoordinatorPort => int32
偏移量提交请求(Offset Commit Request)
1.
2. v0 (在0.8.1及之后的版本中支持)
3.
4.
5. OffsetCommitRequest => ConsumerGroupId [TopicName
[Partition Offset Metadata]]
6.
7.
8. ConsumerGroupId => string
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
Offset => int64
Partition => int32
TopicName => string
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
OffsetCommitRequest => ConsumerGroupId
v1 (在0.8.2及之后的版本中支持)
Metadata => string
ConsumerGroupGenerationId ConsumerId [TopicName [Partition
Offset TimeStamp Metadata]]
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
Partition => int32
TopicName => string
ConsumerId => string
ConsumerGroupGenerationId => int32
ConsumerGroupId => string
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
v2 (在0.8.3及之后的版本中支持)
Metadata => string
TimeStamp => int64
Offset => int64
64.
65.
OffsetCommitRequest => ConsumerGroup
RetentionTime ConsumerId ConsumerGroupGenerationId
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
[TopicName [Partition Offset Metadata]]
ConsumerGroupId => string
ConsumerGroupGenerationId => int32
ConsumerId => string
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
RetentionTime => int64
TopicName => string
Partition => int32
Offset => int64
Metadata => string
在V0和v1版本中,每个分区的时间戳作为提交时间戳定义,偏
移量协调员将保存消费者所提交的偏移量,直到当前时间超过提交时间戳+偏移量保留时间,此偏移量保留时间在broker配置中指定;如果时间错域没有设值,那么broker会将此值设定为接收到提交偏移量请求的时间,用户可以通过设置这个提交时间戳达到延长偏移量保存时间的目的。
在v2版本中,我们移除了时间戳域,但是增加了一个全局保存时间域(详情参见KAFKA-1634);broker会设置提交时间戳为接收到请求的时间,但是提交的偏移量能被保存到提交请求中用户指定的保存时间,如果这个保存时间没有设值,那么broker会使用默认的保存时间。
偏移量提交响应(Offset Commit Response)
1.
2. v0, v1 and v2:
3.
4.
5. OffsetCommitResponse
ErrorCode]]]
6.
7.
8. TopicName => string
9.
=> [TopicName [Partition
10.
11.
12.
13.
14.
15.
可能的错误码(Possible Error Codes)
•
OFFSET_METADATA_TOO_LARGE (12)
•
GROUP_LOAD_IN_PROGRESS (14)
•
GROUP_COORDINATOR_NOT_AVAILABLE (15)
•
NOT_COORDINATOR_FOR_GROUP (16)
•
ILLEGAL_GENERATION (22)
•
UNKNOWN_MEMBER_ID (25)
•
REBALANCE_IN_PROGRESS (27)
•
INVALID_COMMIT_OFFSET_SIZE (28)
•
TOPIC_AUTHORIZATION_FAILED (29)
•
GROUP_AUTHORIZATION_FAILED (30)
Partition => int32
ErrorCode => int16
偏移量获取请求(Offset Fetch Request)
根据KAFKA-1841的注释,V0和V1是在上是相同的,但V0(0.8.1或更高版本支持)从zookeeper读取的偏移量,而V1(0.8.2或更高版本支持)从卡夫卡读偏移。
1.
2. OffsetFetchRequest => ConsumerGroup [TopicName
[Partition]]
3.
4.
5. ConsumerGroup => string
6.
7.
8. TopicName => string
9.
10.
11.
12.
偏移量获取响应(Offset Fetch Response)
Partition => int32
1.
2. OffsetFetchResponse => [TopicName [Partition Offset
Metadata ErrorCode]]
3.
4.
5. TopicName => string
6.
7.
8. Partition => int32
9.
10.
11.
12.
13.
14.
15.
Metadata => string
Offset => int64
16.
17.
18.
请注意,消费者组下一个主题的分区如果没有偏移量,broker不会设定一个错误码(因为它不是一个真正的错误),但会返回空的元数据并将偏移字段为-1。
可能的错误码(Possible Error Codes)
•
UNKNOWN_TOPIC_OR_PARTITION (3) <- 只在
ErrorCode => int16
v0版本的请求中出现
•
GROUP_LOAD_IN_PROGRESS (14)
•
NOT_COORDINATOR_FOR_GROUP (16)
•
ILLEGAL_GENERATION (22)
•
UNKNOWN_MEMBER_ID (25)
•
TOPIC_AUTHORIZATION_FAILED (29)
•
GROUP_AUTHORIZATION_FAILED (30)
组籍管理接口(Group Membership API)
这些请求用于客户端参加卡夫卡所管理的消费者组。从更高层次上看,集群中每个消费者组都会分配一个broker(及消费者组协调员),以简化消费者组管理。一旦得到了组协调员地址(使用上面的消费者组协调员请求),组成员可以加入该组,同步状态,然后用心跳消息保持在组中的活跃状态。当客户端关闭时,它会使用离开组请求从消费者组中注销。此协议的语义在Kafka客户端分配协议中有详细描述。
组建管理接口的主要使用场景是消费者组,但这些请求也尽量设
计得一般化以便支持其他应用场景(例如,Kafka Connect组)。这种设计的带来的代价就是是一些特定的组语义(group semantics)被推到了客户端实现。例如,下面定义的JoinGroup和SyncGroup请求无明确定义的字段以支持消费者组分区分配。相反,它们在其中包含有一些通用的字节数组(byte arrays),用这些字节数组就可以使得分区分配切入在消费者客户端实现。
但是,虽然这种实现允许每个客户端来实现来定义分区方案,但是Kafka工具的兼容性要求这些客户端使用Kafka客户端使用的标准方案。例如,这个应用程序会假定用这种格式来显示分区分配。因此,我们建议客户遵循相同的模式,使这些工具对所有客户端实现都可以正常工作。
加入组请求(Join Group Request)
加入组请求用于让客户端成为组的成员。当新成员加入一个现有组,之前加入大的所有的会员必须通过发送一个新加入组的要求来重新入组。当成员第一次加入该组,成员编号将是空的(即“”),但重新加入的成员都应该使用与之前生成的相同的会员ID。
1.
2. JoinGroupRequest
3.
4.
5. GroupId => string
6.
=> GroupId SessionTimeout
MemberId ProtocolType GroupProtocols
7.
8. SessionTimeout => int32
9.
10.
11. MemberId => string
12.
13.
14. ProtocolType => string
15.
16.
17. GroupProtocols =>
ProtocolMetadata]
18.
19.
20. ProtocolName => string
21.
[ProtocolName
22.
23.
24.
ProtocolType字段定义了该组实现的嵌入协议。组协调器确保该组中的所有成员都支持相同的协议类型。组中包含的协议(GroupProtocols)字段中的协议名称和元数据的含义取决于协议类型。请注意,加入群请求允许多协议/元数据对。这使得滚动升级时无需停机。协调器会选择所有成员支持的一种协议,升级后的成员既包括新版本和老版本的协议,一旦所有成员都升级,协调器将选择列在数组中最前面的组协议(GroupProtocol)。
消费者组: 下文我们定义了消费者组使用的嵌入协议。我们建议所有消费者客户端实现遵循这个格式,以便Kafka工具能够对所有的客户端正常工作
1.
2. ProtocolType => "consumer"
3.
4.
5.
ProtocolMetadata => bytes
6.
7.
8. ProtocolName => AssignmentStrategy
9.
10.
11.
12.
13.
14.
15.
16.
17.
UserData
18.
19.
AssignmentStrategy => string
ProtocolMetadata => Version
Subscription
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
Version => int16
Subscription => [Topic]
Topic => string
UserData => bytes
UserData域的可以用来自定义分配策略。例如,在一个粘性分区策略实现中,这个字段可以包含之前的分配。在基于资源的分配策略,也可以包括每个运行消费者主机上的CPU个数等信息。
Kafka Connect使用“connect”的协议类型,和协议细节也是基于Connect的内部实现。
加入组响应(Join Group Response)
接收到来自该组中的所有成员组的加入组请求后,协调器将选择
一个成员作为Leader,并且选择所有成员支持的协议。Leader将收到会员的完整列表与选择的协议相关的元数据。其他追随者成员,会收到一个空会员数组。Leader需要检查每个成员的元数据,并且使用下文中描述的SyncGroup请求来分配状态。
一旦加加入组阶段完成,协调器会增加该组的GenerationId,这个Id是发送给每个成员的响应中的一个域,同时也会在心跳和偏移量提交请求中。当协调器重新平衡(rebalance)了一个组,协调器将发送一个错误码,表示客户端成员需要重新加入组。如果重新平衡完成前成员未重入组(rejoin),那么它将有一个旧generationId,在新的请求使用这个旧Id时,这将导致ILLEGAL_GENERATION错误。
1.
2. JoinGroupResponse
3.
4.
5. ErrorCode => int16
6.
7.
8. GenerationId => int32
9.
=> ErrorCode GenerationId
GroupProtocol LeaderId MemberId Members
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
MemberId => string
Members => [MemberId MemberMetadata]
MemberId => string
LeaderId => string
GroupProtocol => string
本文发布于:2024-02-06 22:03:24,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170722820662557.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |