
Adding in the --addJars option to make SparkContext.addJar work on yarn and cleanup the classpaths
tgravescs committed Oct 3, 2013
1 parent 9d42468 commit 0fff4ee
Showing 5 changed files with 50 additions and 23 deletions.
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -643,10 +643,21 @@ class SparkContext(
       key = uri.getScheme match {
         case null | "file" =>
           if (env.hadoop.isYarnMode()) {
-            logWarning("local jar specified as parameter to addJar under Yarn mode")
-            return
+            // In order for this to work on yarn the user must specify the --addjars option to
+            // the client to upload the file into the distributed cache to make it show up in the
+            // current working directory.
+            val fileName = new Path(uri.getPath).getName()
+            try {
+              env.httpFileServer.addJar(new File(fileName))
+            } catch {
+              case e: Exception => {
+                logError("Error adding jar (" + e + "), was the --addJars option used?")
+                throw e
+              }
+            }
+          } else {
+            env.httpFileServer.addJar(new File(uri.getPath))
           }
-          env.httpFileServer.addJar(new File(uri.getPath))
         case _ =>
           path
       }
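A minimal sketch of the resolution the new YARN branch performs (the path is hypothetical; it assumes --addJars has already placed the jar in the container's working directory, so only the basename is handed to the HTTP file server):

    import java.io.File
    import java.net.URI
    import org.apache.hadoop.fs.Path

    val uri = new URI("/home/user/libs/mylib.jar")  // hypothetical argument passed to sc.addJar
    val fileName = new Path(uri.getPath).getName()  // "mylib.jar"
    // Resolved relative to the container's working directory, where --addJars localized the jar.
    val localJar = new File(fileName)
    println(localJar.getPath)                       // mylib.jar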
2 changes: 2 additions & 0 deletions docs/running-on-yarn.md
@@ -53,6 +53,7 @@ The command to launch the YARN Client is as follows:
       --worker-memory <MEMORY_PER_WORKER> \
       --worker-cores <CORES_PER_WORKER> \
       --queue <queue_name> \
+      --addJars <any_local_files_used_in_SparkContext.addJar> \
       --files <files_for_distributed_cache> \
       --archives <archives_for_distributed_cache>

@@ -88,3 +89,4 @@ The above starts a YARN Client programs which periodically polls the Application
 - We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed.
 - The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
 - The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
+- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
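For example, a hypothetical invocation following the skeleton above (the remaining required options are elided with "..."):

      ... \
        --queue default \
        --addJars /home/user/libs/mylib.jar \
        --files /home/user/conf/log4j.properties

The application's later call to SparkContext.addJar with that local jar then finds mylib.jar in the container's working directory.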
27 changes: 20 additions & 7 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,7 +127,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       originalPath: Path,
       replication: Short,
       localResources: HashMap[String,LocalResource],
-      fragment: String) = {
+      fragment: String,
+      appMasterOnly: Boolean = false): Unit = {
     val fs = FileSystem.get(conf)
     val newPath = new Path(dstDir, originalPath.getName())
     logInfo("Uploading " + originalPath + " to " + newPath)
@@ -149,6 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       pathURI = new URI(newPath.toString() + "#" + fragment);
     }
     val distPath = pathURI.toString()
+    if (appMasterOnly == true) return
     if (resourceType == LocalResourceType.FILE) {
       distFiles match {
         case Some(path) =>
@@ -223,6 +225,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
     }

+    // handle any add jars
+    if ((args.addJars != null) && (!args.addJars.isEmpty())){
+      args.addJars.split(',').foreach { case file: String =>
+        val tmpURI = new URI(file)
+        val tmp = new Path(tmpURI)
+        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
+          tmpURI.getFragment(), true)
+      }
+    }
+
     // handle any distributed cache files
     if ((args.files != null) && (!args.files.isEmpty())){
       args.files.split(',').foreach { case file: String =>
@@ -253,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

     val env = new HashMap[String, String]()

-    // If log4j present, ensure ours overrides all others
-    if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*")

-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
     Client.populateHadoopClasspath(yarnConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") =
@@ -279,6 +290,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}

// set the environment variables to be passed on to the Workers
if (distFiles != None) {
env("SPARK_YARN_CACHE_FILES") = distFiles.get
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
@@ -328,8 +340,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     // Add Xmx for am memory
     JAVA_OPTS += "-Xmx" + amMemory + "m "

-    JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
-      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+    JAVA_OPTS += " -Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "


     // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
@@ -345,6 +357,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
}

if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
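The addJars handling above walks a comma-separated list and localizes each jar for the application master only (the final true flags appMasterOnly). A standalone sketch of that traversal, with hypothetical file names and the actual upload elided:

    import java.net.URI
    import org.apache.hadoop.fs.Path

    val addJars = "lib/a.jar,lib/b.jar"            // hypothetical --addJars value
    addJars.split(',').foreach { file =>
      val uri = new URI(file)
      val name = new Path(uri.getPath).getName()   // basename the jar keeps in the container
      println("would upload " + file + " and localize it as " + name)
    }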
yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,6 +24,7 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}

 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
 class ClientArguments(val args: Array[String]) {
+  var addJars: String = null
   var files: String = null
   var archives: String = null
   var userJar: String = null
@@ -80,6 +81,10 @@ class ClientArguments(val args: Array[String]) {
           amQueue = value
           args = tail

+        case ("--addJars") :: value :: tail =>
+          addJars = value
+          args = tail
+
         case ("--files") :: value :: tail =>
           files = value
           args = tail
@@ -119,8 +124,9 @@ class ClientArguments(val args: Array[String]) {
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
" --files file Comma separated list of files to be distributed with the job.\n" +
" --archives archive Comma separated list of archives to be distributed with the job."
" --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
" --files files Comma separated list of files to be distributed with the job.\n" +
" --archives archives Comma separated list of archives to be distributed with the job."
)
System.exit(exitCode)
}
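The option handling above relies on Scala list pattern matching over the argument list. A hedged, self-contained sketch of the same pattern (the parse helper and its option subset are hypothetical, not the class's actual API):

    def parse(args: List[String], opts: Map[String, String] = Map.empty): Map[String, String] =
      args match {
        case "--addJars" :: value :: tail => parse(tail, opts + ("addJars" -> value))
        case "--files" :: value :: tail   => parse(tail, opts + ("files" -> value))
        case Nil                          => opts
        case _ :: tail                    => parse(tail, opts)  // skip anything unrecognized in this sketch
      }

    // parse(List("--addJars", "lib/a.jar,lib/b.jar"))
    // => Map(addJars -> lib/a.jar,lib/b.jar)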
yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
     }

-    JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
-      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+    JAVA_OPTS += " -Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+

     // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
@@ -215,15 +216,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()

-    // If log4j present, ensure ours overrides all others
-    if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
-      // Which is correct ?
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties")
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
-    }
-
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*")
     Client.populateHadoopClasspath(yarnConf, env)

     // allow users to specify some environment variables
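The classpath cleanup here mirrors the Client.scala change: containers now put the container working directory (and its jars) on the classpath instead of relative "./" entries. A rough sketch of the value that results, assuming Environment.PWD.$() expands to $PWD on Linux containers (not the actual Spark code):

    val pwd = "$PWD"                                  // assumed expansion of Environment.PWD.$()
    val entries = Seq(pwd, pwd + "/*")                // replaces the old "./", "./*", "$CLASSPATH" entries
    val classpath = entries.mkString(java.io.File.pathSeparator)
    println(classpath)                                // $PWD:$PWD/*  (Hadoop's classpath entries are appended after)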
