Flink Source Code Series (TaskExecutor Registering with the ResourceManager [Flink's internal RM, not the YARN RM])

The previous installment ended at the main method of YarnTaskExecutorRunner. We continue the analysis from there.

1.YarnTaskExecutorRunner#main->YarnTaskExecutorRunner#runTaskManagerSecurely->TaskManagerRunner#runTaskManagerSecurely

	public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
		replaceGracefulExitWithHaltIfConfigured(configuration);
		// load plugins and initialize the file systems with them
		final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
		FileSystem.initialize(configuration, pluginManager);

		// install the security context and run the TaskManager inside it
		SecurityUtils.install(new SecurityConfiguration(configuration));

		SecurityUtils.getInstalledContext().runSecured(() -> {
			runTaskManager(configuration, pluginManager);
			return null;
		});
	}

2. TaskManagerRunner#runTaskManager

	public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
		final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(
			configuration,
			pluginManager,
			TaskManagerRunner::createTaskExecutorService);

		taskManagerRunner.start();
	}

①new TaskManagerRunner

Constructs the TaskManagerRunner.

②taskManagerRunner.start()

Starts it.

3.TaskManagerRunner#start->TaskExecutorToServiceAdapter#start->RpcEndpoint#start

rpcServer.start()

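This starts the RPC server: it sends a message that tells the underlying AkkaRpcActor to switch to the START state. Once the endpoint has started, its onStart method is invoked, so we go straight to TaskExecutor#onStart.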
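The relationship between start() and onStart can be illustrated with a small sketch. Everything here (SketchEndpoint, SketchTaskExecutor, the State enum and the string message) is a hypothetical stand-in rather than Flink's real classes. It only shows the idea that start() merely delivers a control message, and that the lifecycle hook runs when the state switches.

```java
import java.util.ArrayList;
import java.util.List;

public class EndpointStartSketch {

    /** Hypothetical stand-in for the actor state; not Flink's real type. */
    enum State { STOPPED, STARTED }

    /** Hypothetical endpoint: start() only sends a message, the "actor" reacts to it. */
    abstract static class SketchEndpoint {
        private State state = State.STOPPED;
        final List<String> events = new ArrayList<>();

        /** Asks the (simulated) actor to switch to the START state. */
        public final void start() {
            handleControlMessage("START");
        }

        private void handleControlMessage(String message) {
            if ("START".equals(message) && state == State.STOPPED) {
                state = State.STARTED;
                events.add("state=STARTED");
                onStart(); // lifecycle hook implemented by subclasses
            }
        }

        State state() {
            return state;
        }

        protected abstract void onStart();
    }

    /** Hypothetical subclass playing the role of TaskExecutor. */
    static class SketchTaskExecutor extends SketchEndpoint {
        @Override
        protected void onStart() {
            events.add("onStart: startTaskExecutorServices");
        }
    }

    public static void main(String[] args) {
        SketchTaskExecutor te = new SketchTaskExecutor();
        te.start();
        te.start(); // a second START message is ignored in this sketch
        System.out.println(te.events);
    }
}
```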

4.TaskExecutor#onStart->TaskExecutor#startTaskExecutorServices

	private void startTaskExecutorServices() throws Exception {
		try {
			// start by connecting to the ResourceManager
			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

			// tell the task slot table who's responsible for the task slot actions
			taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

			// start the job leader service
			jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

			fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
		} catch (Exception e) {
			handleStartTaskExecutorServicesException(e);
		}
	}

5.StandaloneLeaderRetrievalService#start->ResourceManagerLeaderListener#notifyLeaderAddress (an inner class of TaskExecutor)->TaskExecutor#notifyOfNewResourceManagerLeader->TaskExecutor#reconnectToResourceManager->TaskExecutor#tryConnectToResourceManager->TaskExecutor#connectToResourceManager

	private void connectToResourceManager() {
		assert(resourceManagerAddress != null);
		assert(establishedResourceManagerConnection == null);
		assert(resourceManagerConnection == null);

		log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

		// everything the RM needs to know about this TaskExecutor
		final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
			getAddress(),
			getResourceID(),
			unresolvedTaskManagerLocation.getDataPort(),
			JMXService.getPort().orElse(-1),
			hardwareDescription,
			memoryConfiguration,
			taskManagerConfiguration.getDefaultSlotResourceProfile(),
			taskManagerConfiguration.getTotalResourceProfile());

		resourceManagerConnection =
			new TaskExecutorToResourceManagerConnection(
				log,
				getRpcService(),
				taskManagerConfiguration.getRetryingRegistrationConfiguration(),
				resourceManagerAddress.getAddress(),
				resourceManagerAddress.getResourceManagerId(),
				getMainThreadExecutor(),
				new ResourceManagerRegistrationListener(),
				taskExecutorRegistration);
		resourceManagerConnection.start();
	}

6.RegisteredRpcConnection#start

	public void start() {
		checkState(!closed, "The RPC connection is already closed");
		checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");

		final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();

		if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
			newRegistration.startRegistration();
		} else {
			// concurrent start operation
			newRegistration.cancel();
		}
	}

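①createNewRegistration

Creates the registration with which the TaskExecutor (TE) registers at the ResourceManager (RM).

②startRegistration

Starts that registration.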
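The compare-and-set guard in start() can be tried out in isolation. The following is a minimal sketch using the JDK's AtomicReferenceFieldUpdater. SketchConnection and SketchRegistration are hypothetical stand-ins, and the checkState preconditions of the real method are deliberately left out so that the losing branch of the compare-and-set can be observed from a single thread.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class ConnectionStartSketch {

    /** Hypothetical stand-in for RetryingRegistration. */
    static class SketchRegistration {
        volatile boolean started;
        volatile boolean canceled;

        void startRegistration() {
            started = true;
        }

        void cancel() {
            canceled = true;
        }
    }

    /** Hypothetical stand-in for RegisteredRpcConnection. */
    static class SketchConnection {
        // The updater needs a volatile field, referenced by name.
        private static final AtomicReferenceFieldUpdater<SketchConnection, SketchRegistration> REGISTRATION_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                SketchConnection.class, SketchRegistration.class, "pendingRegistration");

        private volatile SketchRegistration pendingRegistration;

        /** Returns the registration it created so that callers can inspect what happened to it. */
        SketchRegistration start() {
            SketchRegistration newRegistration = new SketchRegistration();
            if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
                // we won: the field was still null, so this registration becomes the pending one
                newRegistration.startRegistration();
            } else {
                // somebody else already installed a registration, so ours is discarded
                newRegistration.cancel();
            }
            return newRegistration;
        }
    }

    public static void main(String[] args) {
        SketchConnection connection = new SketchConnection();
        SketchRegistration first = connection.start();
        SketchRegistration second = connection.start();
        System.out.println("first started:   " + first.started);
        System.out.println("second canceled: " + second.canceled);
    }
}
```

Only one registration can ever be installed as the pending one. Any registration that loses the race is canceled before it does any work.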

7.RetryingRegistration#startRegistration

	public void startRegistration() {
		if (canceled) {
			// we already got canceled
			return;
		}

		try {
			// trigger resolution of the target address to a callable gateway
			final CompletableFuture<G> rpcGatewayFuture;

			if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
				rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
					targetAddress,
					fencingToken,
					targetType.asSubclass(FencedRpcGateway.class));
			} else {
				rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
			}

			// upon success, start the registration attempts
			CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync(
				(G rpcGateway) -> {
					log.info("Resolved {} address, beginning registration", targetName);
					register(rpcGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
				},
				rpcService.getExecutor());

			// upon failure, retry, unless this is cancelled
			rpcGatewayAcceptFuture.whenCompleteAsync(
				(Void v, Throwable failure) -> {
					if (failure != null && !canceled) {
						final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);
						if (log.isDebugEnabled()) {
							log.debug(
								"Could not resolve {} address {}, retrying in {} ms.",
								targetName,
								targetAddress,
								retryingRegistrationConfiguration.getErrorDelayMillis(),
								strippedFailure);
						} else {
							log.info(
								"Could not resolve {} address {}, retrying in {} ms: {}",
								targetName,
								targetAddress,
								retryingRegistrationConfiguration.getErrorDelayMillis(),
								strippedFailure.getMessage());
						}

						startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
					}
				},
				rpcService.getExecutor());
		}
		catch (Throwable t) {
			completionFuture.completeExceptionally(t);
			cancel();
		}
	}

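①rpcService.connect

Resolves the target address to a callable gateway.

②register

Once the gateway has been resolved, the registration attempts begin.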
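The resolve-then-register flow with its retry on failure can be sketched with plain CompletableFuture and a ScheduledExecutorService. This is a simplified illustration under stated assumptions, not Flink's implementation: SketchRegistration, the connect supplier, the fixed 10 ms delay and the string "gateway" are all hypothetical, and the real class additionally retries the registration call itself with growing timeouts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryingResolveSketch {

    /** Hypothetical helper: resolves a "gateway" and retries after a fixed delay on failure. */
    static class SketchRegistration {
        private final Supplier<CompletableFuture<String>> connect;
        private final ScheduledExecutorService executor;
        private final long errorDelayMillis;
        private final CompletableFuture<String> completionFuture = new CompletableFuture<>();
        private volatile boolean canceled;

        SketchRegistration(Supplier<CompletableFuture<String>> connect,
                           ScheduledExecutorService executor,
                           long errorDelayMillis) {
            this.connect = connect;
            this.executor = executor;
            this.errorDelayMillis = errorDelayMillis;
        }

        CompletableFuture<String> getFuture() {
            return completionFuture;
        }

        void cancel() {
            canceled = true;
            completionFuture.cancel(false); // no effect if the future is already completed
        }

        void startRegistration() {
            if (canceled) {
                return;
            }
            try {
                // step 1: resolve the target address to a gateway
                CompletableFuture<String> gatewayFuture = connect.get();
                // step 2: upon success, start registering
                CompletableFuture<Void> acceptFuture = gatewayFuture.thenAcceptAsync(this::register, executor);
                // step 3: upon failure, try again later unless canceled
                acceptFuture.whenCompleteAsync((v, failure) -> {
                    if (failure != null && !canceled) {
                        Runnable retry = this::startRegistration;
                        executor.schedule(retry, errorDelayMillis, TimeUnit.MILLISECONDS);
                    }
                }, executor);
            } catch (Throwable t) {
                completionFuture.completeExceptionally(t);
                cancel();
            }
        }

        private void register(String gateway) {
            completionFuture.complete("registered via " + gateway);
        }
    }

    /** Runs the sketch against a resolver that fails a given number of times before succeeding. */
    static String demo(int failuresBeforeSuccess) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            Supplier<CompletableFuture<String>> connect = () -> {
                int attempt = calls.incrementAndGet();
                if (attempt <= failuresBeforeSuccess) {
                    CompletableFuture<String> failed = new CompletableFuture<>();
                    failed.completeExceptionally(new IllegalStateException("address not resolvable yet"));
                    return failed;
                }
                return CompletableFuture.completedFuture("gateway@attempt-" + attempt);
            };
            SketchRegistration registration = new SketchRegistration(connect, executor, 10L);
            registration.startRegistration();
            return registration.getFuture().get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(2));
    }
}
```

With two simulated resolution failures, the third attempt resolves the gateway and completes the registration future.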

8.RetryingRegistration#register->ResourceManagerRegistration#invokeRegistration (an inner class of TaskExecutorToResourceManagerConnection)->ResourceManager#registerTaskExecutorInternal

	private RegistrationResponse registerTaskExecutorInternal(
			TaskExecutorGateway taskExecutorGateway,
			TaskExecutorRegistration taskExecutorRegistration) {
		ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
		if (oldRegistration != null) {
			// TODO :: suggest old taskExecutor to stop itself
			log.debug("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId.getStringWithMetadata());

			// remove old task manager registration from slot manager
			slotManager.unregisterTaskManager(
				oldRegistration.getInstanceID(),
				new ResourceManagerException(String.format("TaskExecutor %s re-connected to the ResourceManager.", taskExecutorResourceId.getStringWithMetadata())));
		}

		final WorkerType newWorker = workerStarted(taskExecutorResourceId);

		String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
		if (newWorker == null) {
			log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did " +
				"not recognize it", taskExecutorResourceId.getStringWithMetadata(), taskExecutorAddress);
			return new RegistrationResponse.Decline("unrecognized TaskExecutor");
		} else {
			WorkerRegistration<WorkerType> registration = new WorkerRegistration<>(
				taskExecutorGateway,
				newWorker,
				taskExecutorRegistration.getDataPort(),
				taskExecutorRegistration.getJmxPort(),
				taskExecutorRegistration.getHardwareDescription(),
				taskExecutorRegistration.getMemoryConfiguration());

			log.info("Registering TaskManager with ResourceID {} ({}) at ResourceManager", taskExecutorResourceId.getStringWithMetadata(), taskExecutorAddress);
			taskExecutors.put(taskExecutorResourceId, registration);

			taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
				@Override
				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
					// the ResourceManager will always send heartbeat requests to the
					// TaskManager
				}

				@Override
				public void requestHeartbeat(ResourceID resourceID, Void payload) {
					taskExecutorGateway.heartbeatFromResourceManager(resourceID);
				}
			});

			return new TaskExecutorRegistrationSuccess(
				registration.getInstanceID(),
				resourceId,
				clusterInformation);
		}
	}

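①getResourceId

Gets the TE's resource ID.

②taskExecutors.remove

Removes any previously cached registration for this TE.

③slotManager.unregisterTaskManager

Removes the old TaskManager registration from the SlotManager.

④workerStarted

Callback invoked when a worker has started. It returns the corresponding WorkerType, or null if the framework does not recognize the worker.

⑤getTaskExecutorAddress

Gets the TE's address.

⑥new WorkerRegistration<>

Constructs the WorkerRegistration.

⑦log.info and taskExecutors.put

Logs that the TM is being registered at the RM, then puts the registration into the cache.

⑧taskManagerHeartbeatManager.monitorTarget

Starts monitoring the TM as a heartbeat target.

⑨new TaskExecutorRegistrationSuccess

Creates and returns the registration-success response.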
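The bookkeeping in the steps above (drop the old entry, check whether the worker is recognized, cache the new entry, answer with success or decline) can be condensed into a small sketch. SketchResourceManager, its string-based IDs and its string responses are hypothetical simplifications. Slot management and heartbeats are omitted.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RegistrationSketch {

    /** Hypothetical, heavily simplified stand-in for the ResourceManager's bookkeeping. */
    static class SketchResourceManager {
        // resource ID -> TaskExecutor address; plays the role of the taskExecutors cache
        private final Map<String, String> taskExecutors = new HashMap<>();
        // IDs the "framework" recognizes; plays the role of workerStarted(...) returning non-null
        private final Set<String> recognizedWorkers;
        // counts how often an old registration was replaced
        int replacedRegistrations;

        SketchResourceManager(String... recognizedWorkers) {
            this.recognizedWorkers = new HashSet<>(Arrays.asList(recognizedWorkers));
        }

        String registerTaskExecutor(String resourceId, String address) {
            // always drop a stale registration first
            String oldAddress = taskExecutors.remove(resourceId);
            if (oldAddress != null) {
                replacedRegistrations++;
            }

            // decline workers the framework did not start
            if (!recognizedWorkers.contains(resourceId)) {
                return "DECLINE: unrecognized TaskExecutor";
            }

            // cache the new registration and report success
            taskExecutors.put(resourceId, address);
            return "SUCCESS: " + resourceId + " at " + address;
        }

        int registeredCount() {
            return taskExecutors.size();
        }
    }

    public static void main(String[] args) {
        SketchResourceManager rm = new SketchResourceManager("tm-1");
        System.out.println(rm.registerTaskExecutor("tm-1", "akka://host-a"));
        System.out.println(rm.registerTaskExecutor("tm-1", "akka://host-b")); // re-connect replaces the old entry
        System.out.println(rm.registerTaskExecutor("tm-9", "akka://host-c")); // unknown worker is declined
    }
}
```

Note that the old entry is removed before the recognition check, so a re-connecting TE never leaves two registrations behind.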

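Because this is an RPC call and a registration-success response has been sent back, the onRegistrationSuccess callback on the TaskExecutor side will be invoked. We will analyze the rest in the next installment.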

Overview

The source-code flow covered in this installment is shown in the following flow chart:

See you next time!

Published: 2024-02-01 15:44:01
