Spark 2.x Source Code Reading: SparkContext

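This article walks through the body of SparkContext's primary constructor, that is, its initialization sequence. The methods called along the way are left for later articles.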

    try {
      _conf = config.clone() // clone the configuration
      _conf.validateSettings() // validate the configuration

      if (!_conf.contains("spark.master")) { // fail if no master URL is set
        throw new SparkException("A master URL must be set in your configuration")
      }
      if (!_conf.contains("spark.app.name")) { // fail if no app name is set (there is no default)
        throw new SparkException("An application name must be set in your configuration")
      }

      _driverLogger = DriverLogger(_conf) // log output on the driver side

      setupDriverResources() // check the resources the driver was started with

      // log out spark.app.name in the Spark driver logs
      logInfo(s"Submitted application: $appName")

      // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
      if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
        throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
          "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
      }

      if (_conf.getBoolean("spark.logConf", false)) {
        logInfo("Spark configuration:\n" + _conf.toDebugString)
      }

      // Set Spark driver host and port system properties. This explicitly sets the configuration
      // instead of relying on the default value of the config constant.
      _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS)) // driver host address and port
      _conf.setIfMissing(DRIVER_PORT, 0)

      _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER)

      _jars = Utils.getUserJars(_conf)
      _files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
        .toSeq.flatten

      _eventLogDir =
        if (isEventLogEnabled) {
          val unresolvedDir = conf.get(EVENT_LOG_DIR).stripSuffix("/")
          Some(Utils.resolveURI(unresolvedDir))
        } else {
          None
        }

      _eventLogCodec = {
        val compress = _conf.get(EVENT_LOG_COMPRESS)
        if (compress && isEventLogEnabled) {
          Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
        } else {
          None
        }
      }

      _listenerBus = new LiveListenerBus(_conf) // the event (listener) bus

      // Initialize the app status store and listener before SparkEnv is created so that it gets
      // all events.
      val appStatusSource = AppStatusSource.createSource(conf)
      _statusStore = AppStatusStore.createLiveStore(conf, appStatusSource)
      listenerBus.addToStatusQueue(_statusStore.listener.get)

      // Create the Spark execution environment (cache, map output tracker, etc)
      _env = createSparkEnv(_conf, isLocal, listenerBus) // create the SparkEnv
      SparkEnv.set(_env)

      // If running the REPL, register the repl's output dir with the file server.
      _conf.getOption("spark.repl.class.outputDir").foreach { path =>
        val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
        _conf.set("spark.repl.class.uri", replUri)
      }

      _statusTracker = new SparkStatusTracker(this, _statusStore) // create the status tracker

      _progressBar =
        if (_conf.get(UI_SHOW_CONSOLE_PROGRESS)) {
          Some(new ConsoleProgressBar(this))
        } else {
          None
        }

      _ui =
        if (conf.get(UI_ENABLED)) {
          Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
            startTime))
        } else {
          // For tests, do not enable the UI
          None
        }
      // Bind the UI before starting the task scheduler to communicate
      // the bound port to the cluster manager properly
      _ui.foreach(_.bind())

      _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) // Hadoop configuration
      // Performance optimization: this dummy call to .size() triggers eager evaluation of
      // Configuration's internal `properties` field, guaranteeing that it will be computed and
      // cached before SessionState.newHadoopConf() uses `sc.hadoopConfiguration` to create
      // a new per-session Configuration. If `properties` has not been computed by that time
      // then each newly-created Configuration will perform its own expensive IO and XML
      // parsing to load configuration defaults and populate its own properties. By ensuring
      // that we've pre-computed the parent's properties, the child Configuration will simply
      // clone the parent's properties.
      _hadoopConfiguration.size() // forces the eager load described in the comment above

      // Add each JAR given through the constructor
      if (jars != null) { // register each jar through addJar
        jars.foreach(addJar)
      }

      if (files != null) { // register each file through addFile
        files.foreach(addFile)
      }

      _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key) // executor memory in MB, as an Int
        .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
        .orElse(Option(System.getenv("SPARK_MEM"))
        .map(warnSparkMem))
        // memoryStringToMb computes (JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt
        .map(Utils.memoryStringToMb)
        .getOrElse(1024) // default: 1024 MB

      // Convert java options to env vars as a work around
      // since we can't set env vars directly in sbt.
      for { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))
        value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
        executorEnvs(envKey) = value
      }
      Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
        executorEnvs("SPARK_PREPEND_CLASSES") = v
      }
      // The Mesos scheduler backend relies on this environment variable to set executor memory.
      // TODO: Set this only in the Mesos scheduler.
      executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" // kept in the executorEnvs map, in MB
      executorEnvs ++= _conf.getExecutorEnv
      executorEnvs("SPARK_USER") = sparkUser

      // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
      // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
      _heartbeatReceiver = env.rpcEnv.setupEndpoint( // start the heartbeat receiver
        HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

      // Create and start the scheduler
      val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) // create the task scheduler
      _schedulerBackend = sched
      _taskScheduler = ts
      _dagScheduler = new DAGScheduler(this)
      _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) // tell the HeartbeatReceiver that the task scheduler is set

      // create and start the heartbeater for collecting memory metrics
      _heartbeater = new Heartbeater(env.memoryManager,
        () => SparkContext.this.reportHeartBeat(),
        "driver-heartbeater",
        conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
      _heartbeater.start()

      // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
      // constructor
      _taskScheduler.start()

      _applicationId = _taskScheduler.applicationId()
      _applicationAttemptId = _taskScheduler.applicationAttemptId()
      _conf.set("spark.app.id", _applicationId)
      if (_conf.get(UI_REVERSE_PROXY)) {
        System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
      }
      _ui.foreach(_.setAppId(_applicationId))
      _env.blockManager.initialize(_applicationId)

      // The metrics system for Driver need to be set spark.app.id to app ID.
      // So it should start after we get app ID from the task scheduler and set spark.app.id.
      _env.metricsSystem.start() // start the metrics system
      // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
      _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

      _eventLogger =
        if (isEventLogEnabled) {
          val logger =
            new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
              _conf, _hadoopConfiguration)
          logger.start()
          listenerBus.addToEventLogQueue(logger)
          Some(logger)
        } else {
          None
        }

      // Optionally scale number of executors dynamically based on workload. Exposed for testing.
      val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
      _executorAllocationManager =
        if (dynamicAllocationEnabled) {
          schedulerBackend match {
            case b: ExecutorAllocationClient =>
              Some(new ExecutorAllocationManager(
                schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
            case _ =>
              None
          }
        } else {
          None
        }
      _executorAllocationManager.foreach(_.start()) // start the ExecutorAllocationManager (dynamic executor allocation)

      _cleaner =
        if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
          Some(new ContextCleaner(this))
        } else {
          None
        }
      _cleaner.foreach(_.start()) // start the ContextCleaner, which cleans up out-of-scope RDDs and similar state

      setupAndStartListenerBus() // register listeners and start the listener bus
      postEnvironmentUpdate() // post the environment-update event
      postApplicationStart() // post the application-start event

      // Post init
      _taskScheduler.postStartHook()
      _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
      _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
      _env.metricsSystem.registerSource(new JVMCPUSource())
      _executorAllocationManager.foreach { e =>
        _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
      }
      appStatusSource.foreach(_env.metricsSystem.registerSource(_))
      // Make sure the context is stopped if the user forgets about it. This avoids leaving
      // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
      // is killed, though.
      logDebug("Adding shutdown hook") // force eager creation of logger
      _shutdownHookRef = ShutdownHookManager.addShutdownHook(
        ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
        logInfo("Invoking stop() from shutdown hook")
        try {
          stop()
        } catch {
          case e: Throwable =>
            logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
        }
      }
    } catch {
      case NonFatal(e) =>
        logError("Error initializing SparkContext.", e)
        try {
          stop()
        } catch {
          case NonFatal(inner) =>
            logError("Error stopping SparkContext after init error.", inner)
        } finally {
          throw e
        }
    }
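The executor-memory lookup in the listing is a chain of fallbacks: the `spark.executor.memory` setting first, then the `SPARK_EXECUTOR_MEMORY` environment variable, then the legacy `SPARK_MEM` variable, and finally a default of 1024 MB. The sketch below reproduces only that fallback order in plain Scala so it runs without Spark on the classpath. It is an illustration under assumptions, not Spark's code: `ExecutorMemorySketch` and `resolveExecutorMemoryMb` are hypothetical names, plain `Map`s stand in for `SparkConf` and `System.getenv`, the `memoryStringToMb` here is a simplified stand-in for `Utils.memoryStringToMb` that handles only the b/k/m/g/t suffixes, and the deprecation warning issued by `warnSparkMem` is omitted.

```scala
// Minimal sketch of the executor-memory fallback chain (not Spark's implementation).
object ExecutorMemorySketch {

  // Simplified stand-in for Utils.memoryStringToMb.
  // Assumption: a bare number means bytes; only b/k/m/g/t suffixes are handled.
  def memoryStringToMb(str: String): Int = {
    val s = str.trim.toLowerCase
    val (digits, unit) = s.span(_.isDigit)
    require(digits.nonEmpty, s"Invalid memory string: $str")
    val value = digits.toLong
    val bytes: Long = unit.stripSuffix("b") match {
      case ""  => value
      case "k" => value * 1024L
      case "m" => value * 1024L * 1024L
      case "g" => value * 1024L * 1024L * 1024L
      case "t" => value * 1024L * 1024L * 1024L * 1024L
      case other =>
        throw new IllegalArgumentException(s"Unknown unit '$other' in: $str")
    }
    // Same arithmetic as the comment in the listing: bytes / 1024 / 1024, as an Int.
    (bytes / 1024 / 1024).toInt
  }

  // conf and env are plain maps standing in for SparkConf and the process environment.
  def resolveExecutorMemoryMb(
      conf: Map[String, String],
      env: Map[String, String]): Int = {
    conf.get("spark.executor.memory")            // 1. explicit configuration
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))  // 2. environment variable
      .orElse(env.get("SPARK_MEM"))              // 3. legacy environment variable
      .map(memoryStringToMb)
      .getOrElse(1024)                           // 4. default, in MB
  }
}
```

For example, `ExecutorMemorySketch.resolveExecutorMemoryMb(Map("spark.executor.memory" -> "2g"), Map("SPARK_EXECUTOR_MEMORY" -> "512m"))` picks the configuration value over the environment variable. Using `Option.orElse` keeps the precedence order readable as a top-to-bottom list, which is the same shape the constructor uses.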

 

Published: 2024-01-29 20:18:37

Article link: https://www.4u4v.net/it/170653072118021.html
