spark-submit
First, check where spark-submit lives. As the listing below shows, /usr/bin/spark-submit is a symlink:
[root@master ~]# which spark-submit
/usr/bin/spark-submit
[root@master ~]# ll /usr/bin/spark-submit
lrwxrwxrwx 1 root root 30 Feb  6 10:50 /usr/bin/spark-submit -> /etc/alternatives/spark-submit
[root@master ~]# vi /usr/bin/spark-submit
Here is the content of that spark-submit wrapper script:
#!/bin/bash
# Reference: http://stackoverflow.com/questions/59895/can-a-bash-script-tell-what-directory-its-stored-in
SOURCE="${BASH_SOURCE[0]}"
BIN_DIR="$( dirname "$SOURCE" )"
## Resolve the symlink chain to find the real spark-submit script;
## -h tests that a file exists and is a symbolic link.
while [ -h "$SOURCE" ]
do
  ## readlink returns the link target, or nothing if the file is not a symlink.
  SOURCE="$(readlink "$SOURCE")"
  [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE"
  BIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
done
BIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"

LIB_DIR=$BIN_DIR/../lib
export HADOOP_HOME=$LIB_DIR/hadoop

# Autodetect JAVA_HOME if not defined
. $LIB_DIR/bigtop-utils/bigtop-detect-javahome

## At this point $LIB_DIR/spark/bin/spark-submit is
## /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark/bin/spark-submit
exec $LIB_DIR/spark/bin/spark-submit "$@"
The main job of this script is to locate the real spark-submit script. In this CDH release it lives at /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark/bin/spark-submit. Another way to find it is simply to run find / -name spark-submit.
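For readers who prefer to see the symlink resolution outside of bash, here is a minimal sketch in Scala using java.nio (not part of Spark or CDH; /usr/bin/spark-submit is just the example starting point from above) that does what the wrapper's while loop does, following the chain of links until it reaches a real file:

import java.nio.file.{Files, Path, Paths}

// Sketch: follow a chain of symlinks the way the CDH wrapper's while-loop does.
object ResolveSymlink {
  def main(args: Array[String]): Unit = {
    var p: Path = Paths.get("/usr/bin/spark-submit")
    while (Files.isSymbolicLink(p)) {
      val target = Files.readSymbolicLink(p)
      // A relative link target is resolved against the directory of the current link.
      p = if (target.isAbsolute) target else p.getParent.resolve(target).normalize()
    }
    println(s"real script: $p")
  }
}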
The real spark-submit script looks like this:
#!/usr/bin/env bash

SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

## spark-submit calls spark-class,
## passing it the class org.apache.spark.deploy.SparkSubmit
## together with the original arguments, e.g. deploy mode, executor-memory, ...
## Here $SPARK_HOME=/opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
As you can see, this script in turn calls the spark-class script. The first argument it passes is org.apache.spark.deploy.SparkSubmit, and $@ (the arguments originally given to /usr/bin/spark-submit) is forwarded to spark-class as well.
Next, look at the spark-class script. It loads Spark's environment configuration, locates the assembly jar spark-assembly-1.5.0-cdh5.6.0-hadoop2.6.0-cdh5.6.0.jar (taking CDH 5.6.0 with Spark 1.5.0 as the example), and then invokes org.apache.spark.launcher.Main to actually start the Spark application. Here is the spark-class script in detail:
#!/usr/bin/env bash

# Locate SPARK_HOME; here SPARK_HOME=/opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# Load load-spark-env.sh to pick up runtime environment settings,
# e.g. conf/spark-env.sh under the conf directory
. "$SPARK_HOME"/bin/load-spark-env.sh

# Locate JAVA_HOME and assign the java binary to RUNNER
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Locate the spark-assembly-1.5.0-cdh5.6.0-hadoop2.6.0-cdh5.6.0.jar file
# (CDH 5.6.0 with Spark 1.5.0 as the example); because spark-class puts it on the
# classpath here, you do not need to bundle that jar when submitting a job.
SPARK_ASSEMBLY_JAR=
if [ -f "$SPARK_HOME/RELEASE" ]; then
  ASSEMBLY_DIR="$SPARK_HOME/lib"
else
  ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
  echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
  echo "$ASSEMBLY_JARS" 1>&2
  echo "Please remove all but one jar." 1>&2
  exit 1
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
## org.apache.spark.launcher.Main is run here as the entry point that builds the
## command which will launch the Spark application.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
The key part is this snippet:
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
The script first runs "$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@", so org.apache.spark.launcher.Main is the first Spark class that actually gets executed. Everything launcher.Main prints is read by the while loop and appended to the CMD array; in other words, the command built by launcher.Main ends up in CMD. The script then runs that command with:

exec "${CMD[@]}"
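As the comment in spark-class notes, launcher.Main prints the elements of the final command separated by NUL characters, so arguments containing spaces or shell metacharacters survive the handoff intact. The following is a minimal sketch of that output contract (not Spark source; the command strings and the com.example.App class are made up); the bash loop while IFS= read -d '' -r ARG consumes exactly this kind of stream:

// Minimal sketch (not Spark source) of the NUL-delimited handoff used by
// launcher.Main: every element of the final command is printed followed by a
// NUL byte, so the loop in spark-class can rebuild the argument list without
// the shell re-splitting or re-quoting anything.
object NulDelimitedHandoff {
  def main(args: Array[String]): Unit = {
    // Hypothetical command such as a builder might produce (values are made up).
    val cmd = Seq(
      "/usr/java/default/bin/java",
      "-cp", "/path/with spaces/spark-assembly.jar",
      "org.apache.spark.deploy.SparkSubmit",
      "--class", "com.example.App")
    cmd.foreach { c =>
      print(c)
      print(0.toChar)   // NUL separator, the same contract launcher.Main follows
    }
  }
}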
launcher.Main
org.apache.spark.launcher.Main is the command-line interface to Spark's launcher library. After spark-class has run org.apache.spark.launcher.Main and executed exec "${CMD[@]}", org.apache.spark.deploy.SparkSubmit starts running. Part of the org.apache.spark.launcher.Main source:
public static void main(String[] argsArray) throws Exception {
  ...
  List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
  String className = args.remove(0);
  ...
  // Create the command builder
  AbstractCommandBuilder builder;
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      // Build the command that will run org.apache.spark.deploy.SparkSubmit
      builder = new SparkSubmitCommandBuilder(args);
    } catch (IllegalArgumentException e) {
      // ...... omitted
      builder = new SparkSubmitCommandBuilder(help);
    }
  } else {
    // Launch some other class
    builder = new SparkClassCommandBuilder(className, args);
  }

  List<String> cmd = builder.buildCommand(env);  // the builder parses the arguments and builds the command
  ...
  // Print the resulting command back to the calling shell script
  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}
Execution then enters the main entry point of org.apache.spark.deploy.SparkSubmit. Part of the SparkSubmit source:
def main(args: Array[String]): Unit = {
  // The parameters set at submission time, e.g. master, executorMemory, ...
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    // For a normal submission, submit(appArgs) is executed
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

main creates a SparkSubmitArguments object, which parses the arguments and initializes its members. Here we only follow the submit path.
SparkSubmitArguments
This class parses and encapsulates the arguments coming from the spark-submit script. Constructing new SparkSubmitArguments(args) triggers the following calls in order:
// Set parameters from command line arguments
try {
  // Parse the command-line arguments
  parse(args.toList)
} catch {
  // omitted
}
// Populate `sparkProperties` map from properties file
// Merge the values configured in the default properties file (spark-defaults.conf
// by default) with the values passed via --conf
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
// This also sets the action parameter, which defaults to SUBMIT
loadEnvironmentArguments()
// Verify that all properties required by the action have been set
validateArguments()

In order, the constructor therefore:
1. parses the arguments passed in by the spark-submit script;
2. merges the values from the default properties file with the values passed via --conf, with the --conf value winning;
3. drops every property whose key does not start with "spark.";
4. loads the remaining parameters from environment variables, Spark properties and so on; this sets action, which defaults to SUBMIT;
5. checks, depending on the action, that every property required by that action has been set; each action has its own validation method.
private def validateArguments(): Unit = {
  action match {
    case SUBMIT => validateSubmitArguments()
    case KILL => validateKillArguments()
    case REQUEST_STATUS => validateStatusRequestArguments()
  }
}
Precedence of Spark properties
Spark properties are resolved with the following precedence, from highest to lowest (a small sketch follows the list):
1. parameters set in the application code;
2. parameters set on the command line;
3. parameters set in the configuration file (spark-defaults.conf by default);
4. property values set in environment variables.
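As a quick illustration of this ordering, the sketch below (not Spark source; the property values are made up) treats each source as a map and lets higher-priority sources overwrite lower-priority ones, which is the net effect of the precedence listed above:

// Minimal sketch (not Spark source): the precedence above, expressed as maps where
// later (higher-priority) sources overwrite earlier ones. All values are made up.
object PropertyPrecedence {
  def main(args: Array[String]): Unit = {
    val fromEnv      = Map("spark.executor.memory" -> "1g", "spark.app.name" -> "envApp")
    val fromDefaults = Map("spark.executor.memory" -> "2g")   // spark-defaults.conf
    val fromCli      = Map("spark.executor.memory" -> "4g")   // --conf on the command line
    val fromApp      = Map("spark.executor.memory" -> "8g")   // set in the application code

    // Start from the lowest-priority source; ++ lets the right-hand side win.
    val effective = fromEnv ++ fromDefaults ++ fromCli ++ fromApp
    println(effective)   // spark.executor.memory -> 8g, spark.app.name -> envApp
  }
}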
SparkSubmit.submit

The work of this method falls into two steps:
First, it prepares the launch environment (classpath, system properties and application arguments) for a child main class chosen according to the cluster manager and deploy mode. Second, it uses that launch environment to invoke the child class's main method. Here is the implementation of submit:
private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    // args.proxyUser can be passed on the command line to submit the
    // application as the specified user
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            // The important call!
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception => // ....
      }
    } else {
      // The important call!
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      printStream.println("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}

One of prepareSubmitEnvironment's responsibilities is to set childMainClass, which determines how the application's main class will be invoked; submit then calls the internal doRunMain method, which in turn calls runMain.
SparkSubmit.prepareSubmitEnvironment
This method mainly prepares the application for submission and ultimately returns a tuple of four elements:
childArgs: the arguments to pass to the child process;
childClasspath: the classpath entries the child needs;
sysProps: a map of system properties;
childMainClass: the name of the main class to run, i.e. org.apache.spark.deploy.yarn.Client, org.apache.spark.deploy.Client, org.apache.spark.deploy.rest.RestSubmissionClient or the application's own main class. The Client classes are wrappers around userClass, and userClass is the application's main class name, that is, the main class of the Spark program you wrote yourself.

The signature of prepareSubmitEnvironment:

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
    : (Seq[String], Seq[String], Map[String, String], String) = {

It first determines the cluster manager, i.e. which cluster resource manager the program will run on:
val clusterManager: Int = args.master match {
  case m if m.startsWith("yarn") => YARN
  case m if m.startsWith("spark") => STANDALONE
  case m if m.startsWith("mesos") => MESOS
  case m if m.startsWith("local") => LOCAL
  case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
}

Then it determines the deploy mode of the application:
var deployMode: Int = args.deployMode match {
  case "client" | null => CLIENT
  case "cluster" => CLUSTER
  case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}

In client mode, childMainClass is set directly to the application's main class name:
// In client mode, launch the application main class directly and add the
// application jar and any other dependency jars to the classpath
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (isUserJar(args.primaryResource)) {
    childClasspath += args.primaryResource
  }
  if (args.jars != null) { childClasspath ++= args.jars.split(",") }
  if (args.childArgs != null) { childArgs ++= args.childArgs }
}

In standalone cluster mode, childMainClass is set to org.apache.spark.deploy.rest.RestSubmissionClient or org.apache.spark.deploy.Client:
if (args.isStandaloneCluster) {
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // In legacy standalone cluster mode, use Client as a wrapper around the user class
    childMainClass = "org.apache.spark.deploy.Client"
    if (args.supervise) { childArgs += "--supervise" }
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}

If the submission mode is yarn-cluster, childMainClass is set to org.apache.spark.deploy.yarn.Client:
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  // ................
}
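Pulling the branches above together, the decision can be summarized in a few lines. The sketch below is not Spark source: it covers only the cases quoted in this article (client mode, standalone cluster with and without REST, yarn-cluster), ignores Mesos cluster mode and the Python/R special cases, and chooseChildMainClass is a made-up helper name:

// Condensed sketch (not Spark source) of the childMainClass decision made by
// prepareSubmitEnvironment, based only on the cases quoted above.
object ChildMainClassSketch {
  def chooseChildMainClass(master: String, deployMode: String,
                           appMainClass: String, useRest: Boolean = true): String =
    (master, deployMode) match {
      case (_, "client") => appMainClass
      case (m, "cluster") if m.startsWith("spark") =>
        if (useRest) "org.apache.spark.deploy.rest.RestSubmissionClient"
        else "org.apache.spark.deploy.Client"
      case (m, "cluster") if m.startsWith("yarn") => "org.apache.spark.deploy.yarn.Client"
      case other => sys.error(s"not covered in this sketch: $other")
    }

  def main(args: Array[String]): Unit = {
    println(chooseChildMainClass("yarn-cluster", "cluster", "com.carol.SparkTest"))
    // -> org.apache.spark.deploy.yarn.Client
    println(chooseChildMainClass("yarn-client", "client", "com.carol.SparkTest"))
    // -> com.carol.SparkTest
  }
}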
SparkSubmit.runMain
The definition of runMain:

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {

verbose can be set on the command line with --verbose or -v; when it is enabled, runMain prints its parameters to the console:
if (verbose) {
  printStream.println(s"Main class:\n$childMainClass")
  printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
  printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
  printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
  printStream.println("\n")
}
For example, submitting in client mode with spark-submit -v --class com.carol.SparkTest --master yarn-client testSpark.jar 100 2 produces output like this:
Main class:
com.carol.SparkTest
Arguments:
100 2
--name
testSpark
--jar
file:/home/jars/testSpark.jar
--class
com.carol.SparkTest
System properties:
.....
Classpath elements:
.....
If the submission is in cluster mode instead, spark-submit -v --class com.carol.SparkTest --master yarn-cluster testSpark.jar 100 2, the output looks like this:
Main class:
org.apache.spark.deploy.yarn.Client
Arguments:
--name
testSpark
--jar
file:/home/jars/testSpark.jar
--class
com.carol.SparkTest
--arg
100
--arg
2
System properties:
.....
Classpath elements:
.....

runMain then adds the jars to the classpath:
for (jar <- childClasspath) {
  addJarToClasspath(jar, loader)
}

The Spark properties are then set as Java system properties (many code paths read parameters from system properties; for example, SparkConf loads them from the system when it is created):
for ((key, value) <- sysProps) {
  System.setProperty(key, value)
}

Next, the Class object for childMainClass is created:
try {
  mainClass = Utils.classForName(childMainClass)
} catch {
  // .....
}

Then the main method object is obtained:
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

And finally the main method is invoked:
try {
  mainMethod.invoke(null, childArgs.toArray)
} catch {
  // .....
}
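To make the reflection step concrete, here is a small, self-contained sketch (not Spark source; the Greeter class and the argument values are made up) that loads a class by name and calls its static main method, which is essentially what runMain does with childMainClass and childArgs (runMain uses Utils.classForName rather than Class.forName):

// Minimal sketch (not Spark source): load a "child main class" by name and
// invoke its main(Array[String]) by reflection, the way runMain invokes
// childMainClass with childArgs. The Greeter class here is made up.
object Greeter {
  def main(args: Array[String]): Unit = println(s"Hello, ${args.mkString(" ")}")
}

object ReflectiveInvoke {
  def main(args: Array[String]): Unit = {
    val childMainClass = "Greeter"                  // would be e.g. org.apache.spark.deploy.yarn.Client
    val childArgs      = Seq("Spark", "user")

    val mainClass  = Class.forName(childMainClass)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, childArgs.toArray)      // static method, so the receiver is null
  }
}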
At this point, the childMainClass that prepareSubmitEnvironment selected has been invoked. childMainClass can take one of the following values:
the application's own main class name (client mode);
org.apache.spark.deploy.rest.RestSubmissionClient (standalone cluster mode, REST gateway);
org.apache.spark.deploy.Client (legacy standalone cluster mode);
org.apache.spark.deploy.yarn.Client (yarn-cluster mode).
What follows from here is the process in which the Master and Workers start the DriverWrapper process, which in turn launches the application's main class.
That is all for this article. If you found it interesting, feel free to follow along, and let's keep learning and improving together!