消息拉起主要入口为:KafkaConsumer#poll方法,其声明如下:
public ConsumerRecords<K, V> poll(final Duration timeout) { // @1return poll(time.timer(timeout), true); // @2
}
代码@1:参数为超时时间,使用 java 的 Duration 来定义。 代码@2:调用内部的 poll 方法。
KafkaConsumer#poll
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) { // @1acquireAndEnsureOpen(); // @2try {if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { // @3throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}// poll for new data until the timeout expiresdo {// @4client.maybeTriggerWakeup(); //@5if (includeMetadataInTimeout) { // @6 if (!updateAssignmentMetadataIfNeeded(timer)) {pty();}} else {while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) { log.warn("Still waiting for metadata");}}final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); // @7if (!records.isEmpty()) { if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { // @8client.pollNoWakeup();}return Consume(new ConsumerRecords<>(records)); // @9}} while (Expired()); pty();} finally {release();}
}
代码@1:首先先对其参数含义进行讲解。
代码@2:检查是否可以拉取消息,其主要判断依据如下:
代码@3:如果当前消费者未订阅任何主题或者没有指定队列,则抛出错误,结束本次消息拉取。
代码@4:使用 do while 结构循环拉取消息,直到超时或拉取到消息。
代码@5:避免在禁止禁用wakeup时,有请求想唤醒时则抛出异常,例如在下面的@8时,会禁用wakeup。
代码@6:更新相关元数据,为真正向 broker 发送消息拉取请求做好准备,该方法将在下面详细介绍,现在先简单介绍其核心实现点:
这里会有一个更新元数据是否占用消息拉取的超时时间,默认为 true。
代码@7:调用 pollForFetches 向broker拉取消息,该方法将在下文详细介绍。
代码@8:如果拉取到的消息集合不为空,再返回该批消息之前,如果还有挤压的拉取请求,可以继续发送拉取请求,但此时会禁用warkup,主要的目的是用户在处理消息时,KafkaConsumer 还可以继续向broker 拉取消息。
代码@9:执行消费拦截器。
接下来对上文提到的代码@6、@7进行详细介绍。
1.1 KafkaConsumer updateAssignmentMetadataIfNeeded 详解
KafkaConsumer#updateAssignmentMetadataIfNeeded
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {if (coordinator != null && !coordinator.poll(timer)) { // @1return false;}return updateFetchPositions(timer); // @2
}
要理解这个方法实现的用途,我们就必须依次对 coordinator.poll 方法与 updateFetchPositions 方法。
1.1.1 ConsumerCoordinator#poll
public boolean poll(Timer timer) {invokeCompletedOffsetCommitCallbacks(); // @1if (subscriptions.partitionsAutoAssigned()) { // @2pollHeartbeat(timer.currentTimeMs()); // @21if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { //@22return false;}if (rejoinNeededOrPending()) { // @23if (subscriptions.hasPatternSubscription()) { // @231if (adata.timeToAllowUpdate(time.milliseconds()) == 0) { questUpdate();}if (!sureFreshMetadata(timer)) { return false;}}if (!ensureActiveGroup(timer)) { // @232return false;}}} else { // @3if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {client.awaitMetadataUpdate(timer);}}maybeAutoCommitOffsetsAsync(timer.currentTimeMs()); // @4return true;
}
代码@1:执行已完成的 offset (消费进度)提交请求的回调函数。
代码@2:队列负载算法为自动分配(即 Kafka 根据消费者个数与分区书动态负载分区)的相关的处理逻辑。其实现关键点如下:
代码@3:用户手动为消费组指定负载的队列的相关处理逻辑,其实现关键如下:
代码@4:如果开启了自动提交消费进度,并且已到下一次提交时间,则提交。Kafka 消费者可以通过设置属性 enable.automit 来开启自动提交,该参数默认为 true,则默认会每隔 5s 提交一次消费进度,提交间隔可以通过参数 automit.interval.ms 设置。
接下来继续探讨 updateAssignmentMetadataIfNeeded (更新元数据)的第二个步骤,更新拉取位移。
1.1.2 updateFetchPositions 详解
KafkaConsumer#updateFetchPositions
private boolean updateFetchPositions(final Timer timer) {cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions(); if (cachedSubscriptionHashAllFetchPositions) { // @1return true;}if (coordinator != null && !freshCommittedOffsetsIfNeeded(timer)) // @2return setMissingPositions(); // setOffsetsIfNeeded(); // @4return true;
}
代码@1:如果订阅关系中的所有分区都有有效的位移,则返回 true。
代码@2:如果存在任意一个分区没有有效的位移信息,则需要向 broker 发送请求,从broker 获取该消费组,该分区的消费进度。相关的实现细节将在后续文章【Kafka 消费进度】专题文章中详细介绍。
代码@3:如果经过第二步,订阅关系中还某些分区还是没有获取到有效的偏移量,则使用偏移量重置策略进行重置,如果未配置,则抛出异常。
代码@4:发送一个异步请求去重置那些正等待重置位置的分区。有关 Kafka 消费消费进度、重平衡等知识将会在后续文章中深入探讨,本文只需了解 poll 消息的核心处理流程。
从 KafkaConsumer#poll 中流程可以看到,通过 updateAssignmentMetadataIfNeeded 对元数据、重平衡,更新拉取偏移量等工作处理完成后,下一步就是需要向 broker 拉取消息了,其实现入口为:KafkaConsumer 的 pollForFetches 方法。
1.2 消息拉取
KafkaConsumer#pollForFetches
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {long pollTimeout = coordinator == null ? ainingMs() :Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), ainingMs()); // @1// if data is available already, return it immediatelyfinal Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // @2if (!records.isEmpty()) {return records;}fetcher.sendFetches(); // @3// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { // @4pollTimeout = retryBackoffMs;}Timer pollTimer = time.timer(pollTimeout);client.poll(pollTimer, () -> {return !fetcher.hasCompletedFetches();}); // @5timer.update(pollTimer.currentTimeMs()); // @6if (coordinator != null && joinNeededOrPending()) { // @ptyMap();}return fetcher.fetchedRecords(); // @8}
代码@1:计算本次拉取的超时时间,其计算逻辑如下:
代码@2:如果数据已经拉回到本地,直接返回数据。将在下文详细介绍 Fetcher 的 fetchedRecords 方法。
代码@3:组装发送请求,并将存储在待发送请求列表中。
代码@4:如果已缓存的分区信息中存在某些分区缺少偏移量,如果拉取的超时时间大于失败重试需要阻塞的时间,则更新此次拉取的超时时间为失败重试需要的间隔时间,主要的目的是不希望在 poll 过程中被阻塞【后续会详细介绍 Kafka 拉取消息的线程模型,再来回顾一下这里】。
代码@5:通过调用NetworkClient 的 poll 方法发起消息拉取操作(触发网络读写)。
代码@6:更新本次拉取的时间。
代码@7:检查是需要重平衡。
代码@8:将从 broker 读取到的数据返回(即封装成消息)。
从上面消息拉取流程来看,有几个比较重要的方法,例如 Fetcher 类相关的方法,NetworkClient 的 poll 方法,那我们接下来来重点探讨。
我们先用一张流程图总结一下消息拉取的全过程:
接下来我们将重点看一下 KafkaConsumer 的 pollForFetches 详细过程,也就是需要详细探究 Fetcher 类的实现细节。
Fetcher 封装消息拉取的方法,可以看成是消息拉取的门面类。
2.1 类图
我们首先一一介绍一下 Fetcher 的核心属性与核心方法。
接下来我们将按照消息流程,一起来看一下 Fetcher 的核心方法。
2.2 Fetcher 核心方法
2.2.1 Fetcher#fetchedRecords
Fetcher#fetchedRecords
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); // @1int recordsRemaining = maxPollRecords; try {while (recordsRemaining > 0) { // @2if (nextInLineRecords == null || nextInLineRecords.isFetched) { // @3CompletedFetch completedFetch = completedFetches.peek();if (completedFetch == null) break;try {nextInLineRecords = parseCompletedFetch(completedFetch);} catch (Exception e) {FetchResponse.PartitionData partition = completedFetch.partitionData;if (fetched.isEmpty() && (ds == null || ds.sizeInBytes() == 0)) {completedFetches.poll();}throw e;}completedFetches.poll();} else { // @4List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);TopicPartition partition = nextInLineRecords.partition;if (!records.isEmpty()) {List<ConsumerRecord<K, V>> currentRecords = (partition);if (currentRecords == null) {fetched.put(partition, records);} else {List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());newRecords.addAll(currentRecords);newRecords.addAll(records);fetched.put(partition, newRecords);}recordsRemaining -= records.size();}}}} catch (KafkaException e) {if (fetched.isEmpty())throw e;}return fetched;
}
代码@1:首先先解释两个局部变量的含义:
代码@2:循环去取已经完成了 Fetch 请求的消息,该 while 循环有两个跳出条件:
代码@3、@4 主要完成从缓存中解析数据的两个步骤,初次运行的时候,会进入分支@3,然后从 调用 parseCompletedFetch 解析成 PartitionRecords 对象,然后代码@4的职责就是从解析 PartitionRecords ,将消息封装成 ConsumerRecord,返回给消费端线程处理。
代码@3的实现要点如下:
从上面可知,上述方法的核心方法是:parseCompletedFetch。
代码@4的实现要点无非就是调用 fetchRecords 方法,按分区组装成 Map<TopicPartition, List<ConsumerRecord<K, V>>>,供消费者处理,例如供业务处理。
接下来将重点探讨上述两个方法的实现细节。
2.2.1.1 Fetcher#parseCompletedFetch
在尝试探讨该方法之前,我们首先对其入参进行一个梳理,特别是先认识其主要数据结构。
1、CompletedFetch 相关类图
从上图可以看出,CompleteFetch 核心属性主要如下:
分区的数据是使用 PartitionData 来进行封装的。我们也来简单的了解一下其内部数据结果。
2、parseCompletedFetch 详解
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {TopicPartition tp = completedFetch.partition;FetchResponse.PartitionData<Records> partition = completedFetch.partitionData;long fetchOffset = completedFetch.fetchedOffset;PartitionRecords partitionRecords = null;Errors error = ;try {if (!subscriptions.isFetchable(tp)) { // @1log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);} else if (error == Errors.NONE) { // @2Long position = subscriptions.position(tp);if (position == null || position != fetchOffset) { // @21log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +"the expected offset {}", tp, fetchOffset, position);return null;}ace("Preparing to read {} bytes of data for partition {} with offset {}",ds.sizeInBytes(), tp, position);Iterator<? extends RecordBatch> batches = ds.batches().iterator(); // @22partitionRecords = new PartitionRecords(tp, completedFetch, batches);if (!batches.hasNext() && ds.sizeInBytes() > 0) { // @23if (sponseVersion < 3) {Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +" and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +"newer to avoid this issue. Alternately, increase the fetch size on the client (using " +ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",recordTooLargePartitions);} else {// This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +fetchOffset + ". Received a non-empty fetch response from the server, but no " +"complete records were found.");}}if (partition.highWatermark >= 0) { // ace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);subscriptions.updateHighWatermark(tp, partition.highWatermark);}if (partition.logStartOffset >= 0) { // ace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);subscriptions.updateLogStartOffset(tp, partition.logStartOffset);}if (partition.lastStableOffset >= 0) { // ace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);}} else if (error == Errors.NOT_LEADER_FOR_PARTITION ||error == Errors.REPLICA_NOT_AVAILABLE ||error == Errors.KAFKA_STORAGE_ERROR) { // @3log.debug("Error in fetch for partition {}: {}", tp, ptionName());questUpdate();} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { // @4log.warn("Received unknown topic or partition error in fetch for partition {}", tp);questUpdate();} else if (error == Errors.OFFSET_OUT_OF_RANGE) { // @5if (fetchOffset != subscriptions.position(tp)) {log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +"does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));} else if (subscriptions.hasDefaultOffsetResetPolicy()) {log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);questOffsetReset(tp);} else {throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));}} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { // @6log.warn("Not authorized to read from topic {}.", tp.topic());throw new TopicAuthorizationException(Collections.pic()));} else if (error == Errors.UNKNOWN_SERVER_ERROR) { log.warn("Unknown error fetching data for topic-partition {}", tp);} else {throw new IllegalStateException("Unexpected error code " + de() + " while fetching data");}} finally { // @7if (partitionRecords == d(tp, 0, 0);if (error != Errors.NONE)// we move the partition to the end if there was an error. This way, it's more likely that partitions for// the same topic can remain together (allowing for more efficient serialization).vePartitionToEnd(tp);}return partitionRecords;
}
上面的代码虽然比较长,其实整体还是比较简单,只是需要针对各种异常处理,打印对应的日志,接下来详细介绍该方法的实现关键点。
代码@1:判断该分区是否可拉取,如果不可拉取,则忽略这批拉取的消息,判断是可拉取的要点如下:
代码@2:该分支是处理正常返回的相关逻辑。其关键点如下:
从代码@3到@8 是多种异常信息的处理。 代码@3:如果出现如下3种错误码,则使用 debug 打印错误日志,并且向服务端请求元数据并更新本地缓存。
Kafka 认为上述错误是可恢复的,而且对消费不会造成太大影响,故只是用 debug 打印日志,然后更新本地缓存即可。
代码@4:如果出现 UNKNOWN_TOPIC_OR_PARTITION 未知主题与分区时,则使用 warn 级别输出错误日志,并更新元数据。
代码@5:针对 OFFSET_OUT_OF_RANGE 偏移量超过范围异常的处理逻辑,其实现关键点如下:
代码@6:如果是 TOPIC_AUTHORIZATION_FAILED 没有权限(ACL)则抛出异常。
代码@7:如果本次拉取的结果不是NONE(成功),并且是可恢复的,将该队列的订阅关系移动到消费者缓存列表的末尾。如果成功,则返回拉取到的分区数据,其封装对象为 PartitionRecords。
接下来我们再来看看 2.1.1 fetchedRecords 中的另外一个核心方法。
2.2.1.2 fetchRecords()
在介绍该方法之前同样先来看一下参数 PartitionRecords 的内部结构。
1、PartitionRecords 类图
主要的核心属性如下:
2、fetchRecords 详解
private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {if (!subscriptions.isAssigned(partitionRecords.partition)) { // @1// this can happen when a rebalance happened before fetched records are returned to the consumer's poll calllog.debug("Not returning fetched records for partition {} since it is no longer assigned",partitionRecords.partition);} else if (!subscriptions.isFetchable(partitionRecords.partition)) { // @2// this can happen when a partition is paused before fetched records are returned to the consumer's// poll call or if the offset is being resetlog.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",partitionRecords.partition);} else {long position = subscriptions.position(partitionRecords.partition); // @3if (FetchOffset == position) { // @4List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);long nextOffset = ace("Returning fetched records at offset {} for assigned partition {} and update " +"position to {}", position, partitionRecords.partition, nextOffset);subscriptions.position(partitionRecords.partition, nextOffset);Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel); if (partitionLag != null)dPartitionLag(partitionRecords.partition, partitionLag);Long lead = subscriptions.partitionLead(partitionRecords.partition);if (lead != null) {dPartitionLead(partitionRecords.partition, lead);}return partRecords;} else { // @5// these records aren't next in line based on the last consumed position, ignore them// they must be from an obsolete requestlog.debug("Ignoring fetched records for {} at offset {} since the current position is {}",partitionRecords.partition, FetchOffset, position);}}partitionRecords.drain();return emptyList();
}
代码@1:从 PartitionRecords 中提取消息之前,再次判断订阅消息中是否包含当前分区,如果不包含,则使用 debug 打印日志,很有可能是发生了重平衡。
代码@2:是否允许拉取,如果用户主动暂停消费,则忽略本次拉取的消息。备注:Kafka 消费端如果消费太快,可以进行限流。
代码@3:从本地消费者缓存中获取该队列已消费的偏移量,在发送拉取消息时,就是从该偏移量开始拉取的。
代码@4:如果本地缓存已消费偏移量与从服务端拉回的起始偏移量相等的话,则认为是一个有效拉取,否则则认为是一个过期的拉取,该批消息已被消费,见代码@5。如果是一个有效请求,则使用 sensors 收集统计信息,并返回拉取到的消息, 返回结果被封装在 List<ConsumerRecord<K, V>> 。
2.2.2 sendFetches
“发送” fetch 请求,注意这里并不会触发网络操作,而是组装拉取请求,将其放入网络缓存区。
Fetcher#sendFetches
public synchronized int sendFetches() {Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); // @1for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : Set()) { // @2final Node fetchTarget = Key();final FetchSessionHandler.FetchRequestData data = Value();final FetchRequest.Builder request = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, Send()).isolationLevel(isolationLevel).setMaxBytes(this.maxBytes).adata()).Forget()); // @3client.send(fetchTarget, request) // @4.addListener(new RequestFutureListener<ClientResponse>() {@Overridepublic void onSuccess(ClientResponse resp) { // @5synchronized (Fetcher.this) {@SuppressWarnings("unchecked")FetchResponse<Records> response = (FetchResponse<Records>) sponseBody();FetchSessionHandler handler = sessionHandler(fetchTarget.id());if (handler == null) {("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",fetchTarget.id());return;}if (!handler.handleResponse(response)) {return;}Set<TopicPartition> partitions = new HashSet<>(sponseData().keySet());FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : sponseData().entrySet()) {TopicPartition partition = Key();long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;FetchResponse.PartitionData<Records> fetchData = Value();completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, questHeader().apiVersion()));} // @questLatencyMs());}}public void onFailure(RuntimeException e) { // @7synchronized (Fetcher.this) {FetchSessionHandler handler = sessionHandler(fetchTarget.id());if (handler != null) {handler.handleError(e);}}}});}return fetchRequestMap.size();
}
~~~java
上面的方法比较长,其实现的关键点如下:
代码@1:通过调用 Fetcher 的 prepareFetchRequests 方法按节点组装拉取请求,将在后面详细介绍。代码@2:遍历上面的待发请求,进一步组装请求。下面就是分节点发送拉取请求。代码@3:构建 FetchRequest 拉取请求对象。代码@4:调用 NetworkClient 的 send 方法将其发送到发送缓存区,本文不会详细介绍网络方面的实现,但下文会截图说明拉取请求发送缓存区的一个关键点。代码@5:这里会注册事件监听器,当消息从 broker 拉取到本地后触发回调,即消息拉取请求收到返回结果后会将返回结果放入到completedFetches 中(代码@6),这就和上文消息拉取时 Fetcher 的 fetchedRecords 方法形成闭环。
代码@7:消息拉取一次处理。接下来详细介绍 prepareFetchRequests 方法。###### 2.2.2.1 Fetcher prepareFetchRequests 方法详解
~~~java
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>(); for (TopicPartition partition : fetchablePartitions()) { // @1Node node = metadata.partitionInfoIfCurrent(partition).map(PartitionInfo::leader).orElse(null); // @2if (node == null) { // questUpdate();} else if (client.isUnavailable(node)) { // @4client.maybeThrowAuthFailure(node);ace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);} else if (client.hasPendingRequests(node)) { // ace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);} else {// if there is a leader and no in-flight requests, issue a new fetchFetchSessionHandler.Builder builder = (node); // @7if (builder == null) {FetchSessionHandler handler = sessionHandler(node.id());if (handler == null) {handler = new FetchSessionHandler(logContext, node.id());sessionHandlers.put(node.id(), handler);}builder = wBuilder();fetchable.put(node, builder);}long position = this.subscriptions.position(partition);builder.add(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,this.fetchSize, pty()));log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel,partition, position, node);}}Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>(); for (Map.Entry<Node, FetchSessionHandler.Builder> entry : Set()) {reqs.Key(), Value().build());}return reqs;
}
代码@1:首先通过调用 fetchablePartitions() 获取可发起拉取任务的分区信息,下文简单介绍一下。
代码@2:如果该分区在客户端本地缓存中获取该分区的 Leader 节点信息。
代码@3:如果其 Leader 节点信息为空,则发起更新元数据请求,本次拉取任务将不会包含该分区。
代码@4:如果客户端与该分区的 Leader 连接为完成,如果是因为权限的原因则抛出ACL相关异常,否则打印日志,本次拉取请求不会包含该分区。
代码@5:判断该节点是否有挂起的拉取请求,即发送缓存区中是待发送的请求,如果有,本次将不会被拉取。
代码@6:构建拉取请求,分节点组织请求。
2.2.2.2 NetworkClient send 方法关键点
NetworkClient 的 send 方法只是将其放入 unsent 中。
与上文的 client.hasPendingRequests(node) 方法遥相呼应。
3、总结 上面的源码分析有点长,也有点枯燥,我们还是画一张流程图来进行总结。
本文发布于:2024-01-31 14:52:57,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170668397929313.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |