Merge remote-tracking branch 'tgravescs/sparkYarnDistCache'
Closes #11

Conflicts:
	docs/running-on-yarn.md
	yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
mateiz committed Oct 11, 2013
2 parents c71499b + 0fff4ee commit 8f11c36
Showing 6 changed files with 275 additions and 49 deletions.
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -672,10 +672,21 @@ class SparkContext(
key = uri.getScheme match {
case null | "file" =>
if (env.hadoop.isYarnMode()) {
logWarning("local jar specified as parameter to addJar under Yarn mode")
return
// In order for this to work on YARN, the user must specify the --addJars option to
// the client to upload the file into the distributed cache so that it shows up in the
// current working directory.
val fileName = new Path(uri.getPath).getName()
try {
env.httpFileServer.addJar(new File(fileName))
} catch {
case e: Exception => {
logError("Error adding jar (" + e + "), was the --addJars option used?")
throw e
}
}
} else {
env.httpFileServer.addJar(new File(uri.getPath))
}
env.httpFileServer.addJar(new File(uri.getPath))
case _ =>
path
}
9 changes: 8 additions & 1 deletion docs/running-on-yarn.md
@@ -34,6 +34,8 @@ Environment variables:

System Properties:
* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the Spark master, and also the number of tries it waits for the SparkContext to be initialized. Default is 10.
* 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
* 'spark.yarn.preserve.staging.files', set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than deleting them.
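
Both of these are read as plain JVM system properties. A minimal sketch of how they are consumed (the object name is invented for illustration; the property names and defaults mirror the Client and ApplicationMaster code further down in this commit):

```scala
// Hypothetical helper showing how the two properties are read; the defaults match
// the reads in Client.prepareLocalResources and ApplicationMaster.cleanupStagingDir.
object YarnSubmitProps {
  val replication: Short =
    System.getProperty("spark.yarn.submit.file.replication", "3").toShort
  val preserveStagingFiles: Boolean =
    System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
}
```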

# Launching Spark on YARN

@@ -51,7 +53,10 @@ The command to launch the YARN Client is as follows:
--worker-memory <MEMORY_PER_WORKER> \
--worker-cores <CORES_PER_WORKER> \
--name <application_name> \
--queue <queue_name>
--queue <queue_name> \
--addJars <any_local_files_used_in_SparkContext.addJar> \
--files <files_for_distributed_cache> \
--archives <archives_for_distributed_cache>

For example:

@@ -84,3 +89,5 @@ The above starts a YARN Client program which periodically polls the Application
- When your application instantiates a Spark context it must use a special "yarn-standalone" master URL. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above.
- We do not request container resources based on the number of cores. Thus the number of cores given via command line arguments cannot be guaranteed.
- The local directories used by Spark will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
- The --files and --archives options support specifying file names with the # syntax, similar to Hadoop. For example, you can specify --files localtest.txt#appSees.txt: this uploads the file you have locally named localtest.txt into HDFS, but links to it by the name appSees.txt, so your application should reference it as appSees.txt when running on YARN.
- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files. A short sketch of both options follows.
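
As a rough sketch of the last two notes, a hypothetical application might look like the following. The jar and file names are invented, and it assumes the job was submitted with --addJars my-extra-lib.jar --files localtest.txt#appSees.txt:

```scala
import org.apache.spark.SparkContext

// Hypothetical application for a job submitted with:
//   --addJars my-extra-lib.jar --files localtest.txt#appSees.txt
object DistCacheExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("yarn-standalone", "DistCacheExample")

    // Local jar: works only because --addJars linked it into the container's
    // working directory through the distributed cache.
    sc.addJar("my-extra-lib.jar")

    // HDFS, HTTP, HTTPS and FTP URIs do not need --addJars.
    sc.addJar("hdfs:///libs/other-lib.jar")

    // The fragment name given to --files is the name the application sees at runtime.
    val firstLine = scala.io.Source.fromFile("appSees.txt").getLines().next()
    println("First line of appSees.txt: " + firstLine)

    sc.stop()
  }
}
```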
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,22 +17,25 @@

package org.apache.spark.deploy.yarn

import java.io.IOException;
import java.net.Socket
import java.security.PrivilegedExceptionAction
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import scala.collection.JavaConversions._
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.Utils
import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
import scala.collection.JavaConversions._

class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {

@@ -43,18 +46,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var appAttemptId: ApplicationAttemptId = null
private var userThread: Thread = null
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private val fs = FileSystem.get(yarnConf)

private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
private var uiAddress: String = ""
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true


def run() {
// Set up the directories so things go to YARN-approved directories rather
// than user-specified ones and /tmp
System.setProperty("spark.local.dir", getLocalDirs())

// Use priority 30, as it's higher than HDFS'. It's the same priority MapReduce uses.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

appAttemptId = getApplicationAttemptId()
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
resourceManager = registerWithResourceManager()

// Workaround until hadoop moves to something which has
@@ -183,6 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// It needs the shutdown hook to set SUCCEEDED
successed = true
} finally {
logDebug("finishing main")
isLastAMRetry = true;
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
@@ -229,8 +242,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}



private def allocateWorkers() {
try {
logInfo("Allocating " + args.numWorkers + " workers.")
@@ -329,6 +340,40 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
resourceManager.finishApplicationMaster(finishReq)

}

/**
* clean up the staging directory.
*/
private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_JAR_PATH")).getParent()
if (stagingDirPath == null) {
logError("Staging directory is null")
return
}
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
}
} catch {
case e: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, e)
}
}

// The shutdown hook that runs when a signal is received AND during normal
// close of the JVM.
class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {

def run() {
logInfo("AppMaster received a signal.")
// we need to clean up staging dir before HDFS is shut down
// make sure we don't delete it until this is the last AM
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}

}

@@ -368,6 +413,8 @@ object ApplicationMaster {
// Add a shutdown hook - as a best-effort attempt in case users do not call sc.stop or do System.exit.
// We should not really have to do this, but it helps YARN to evict resources earlier.
// Not to mention, it prevents the Client from declaring failure even though we exited properly.
// Note that this will unfortunately not properly clean up the staging files because it gets called too
// late and the filesystem is already shut down.
if (modified) {
Runtime.getRuntime().addShutdownHook(new Thread with Logging {
// This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
160 changes: 138 additions & 22 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,7 +45,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val credentials = UserGroupInformation.getCurrentUser().getCredentials();
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
private var distFiles = None: Option[String]
private var distFilesTimeStamps = None: Option[String]
private var distFilesFileSizes = None: Option[String]
private var distArchives = None: Option[String]
private var distArchivesTimeStamps = None: Option[String]
private var distArchivesFileSizes = None: Option[String]

def run() {
init(yarnConf)
@@ -57,7 +63,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

verifyClusterResources(newApp)
val appContext = createApplicationSubmissionContext(appId)
val localResources = prepareLocalResources(appId, "spark")
val localResources = prepareLocalResources(appId, ".sparkStaging")
val env = setupLaunchEnv(localResources)
val amContainer = createContainerLaunchContext(newApp, localResources, env)

@@ -109,10 +115,73 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setApplicationName(args.appName)
return appContext
}

def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = {

/**
* Copy the local file into HDFS and configure it to be distributed with the
* job via the distributed cache.
* If a fragment is specified, the file will be referenced as that fragment.
*/
private def copyLocalFile(
dstDir: Path,
resourceType: LocalResourceType,
originalPath: Path,
replication: Short,
localResources: HashMap[String,LocalResource],
fragment: String,
appMasterOnly: Boolean = false): Unit = {
val fs = FileSystem.get(conf)
val newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
fs.copyFromLocalFile(false, true, originalPath, newPath)
fs.setReplication(newPath, replication);
val destStatus = fs.getFileStatus(newPath)

val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(resourceType)
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
var pathURI: URI = new URI(newPath.toString() + "#" + originalPath.getName());
if ((fragment == null) || (fragment.isEmpty())){
localResources(originalPath.getName()) = amJarRsrc
} else {
localResources(fragment) = amJarRsrc
pathURI = new URI(newPath.toString() + "#" + fragment);
}
val distPath = pathURI.toString()
if (appMasterOnly == true) return
if (resourceType == LocalResourceType.FILE) {
distFiles match {
case Some(path) =>
distFilesFileSizes = Some(distFilesFileSizes.get + "," +
destStatus.getLen().toString())
distFilesTimeStamps = Some(distFilesTimeStamps.get + "," +
destStatus.getModificationTime().toString())
distFiles = Some(path + "," + distPath)
case _ =>
distFilesFileSizes = Some(destStatus.getLen().toString())
distFilesTimeStamps = Some(destStatus.getModificationTime().toString())
distFiles = Some(distPath)
}
} else {
distArchives match {
case Some(path) =>
distArchivesTimeStamps = Some(distArchivesTimeStamps.get + "," +
destStatus.getModificationTime().toString())
distArchivesFileSizes = Some(distArchivesFileSizes.get + "," +
destStatus.getLen().toString())
distArchives = Some(path + "," + distPath)
case _ =>
distArchivesTimeStamps = Some(destStatus.getModificationTime().toString())
distArchivesFileSizes = Some(destStatus.getLen().toString())
distArchives = Some(distPath)
}
}
}

def prepareLocalResources(appId: ApplicationId, sparkStagingDir: String): HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
val locaResources = HashMap[String, LocalResource]()
// Upload Spark and the application JAR to the remote file system
// Add them as local resources to the AM
val fs = FileSystem.get(conf)
@@ -125,33 +194,69 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}

val pathSuffix = sparkStagingDir + "/" + appId.toString() + "/"
val dst = new Path(fs.getHomeDirectory(), pathSuffix)
val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort

if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
dstFs.addDelegationTokens(delegTokenRenewer, credentials);
}
val localResources = HashMap[String, LocalResource]()

Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (! localPath.isEmpty()) {
val src = new Path(localPath)
val pathSuffix = appName + "/" + appId.getId() + destName
val dst = new Path(fs.getHomeDirectory(), pathSuffix)
logInfo("Uploading " + src + " to " + dst)
fs.copyFromLocalFile(false, true, src, dst)
val destStatus = fs.getFileStatus(dst)

// get tokens for anything we upload to hdfs
if (UserGroupInformation.isSecurityEnabled()) {
fs.addDelegationTokens(delegTokenRenewer, credentials);
}
val newPath = new Path(dst, destName)
logInfo("Uploading " + src + " to " + newPath)
fs.copyFromLocalFile(false, true, src, newPath)
fs.setReplication(newPath, replication);
val destStatus = fs.getFileStatus(newPath)

val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(LocalResourceType.FILE)
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst))
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
locaResources(destName) = amJarRsrc
localResources(destName) = amJarRsrc
}
}

// handle any add jars
if ((args.addJars != null) && (!args.addJars.isEmpty())){
args.addJars.split(',').foreach { case file: String =>
val tmpURI = new URI(file)
val tmp = new Path(tmpURI)
copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
tmpURI.getFragment(), true)
}
}

// handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
args.files.split(',').foreach { case file: String =>
val tmpURI = new URI(file)
val tmp = new Path(tmpURI)
copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
tmpURI.getFragment())
}
}

// handle any distributed cache archives
if ((args.archives != null) && (!args.archives.isEmpty())) {
args.archives.split(',').foreach { case file:String =>
val tmpURI = new URI(file)
val tmp = new Path(tmpURI)
copyLocalFile(dst, LocalResourceType.ARCHIVE, tmp, replication,
localResources, tmpURI.getFragment())
}
}

UserGroupInformation.getCurrentUser().addCredentials(credentials);
return locaResources
return localResources
}

def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
@@ -160,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

val env = new HashMap[String, String]()

// If log4j present, ensure ours overrides all others
if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
Apps.addToEnvironment(env, Environment.CLASSPATH.name,
Environment.PWD.$() + Path.SEPARATOR + "*")

Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
Client.populateHadoopClasspath(yarnConf, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
@@ -186,6 +290,18 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}

// set the environment variables to be passed on to the Workers
if (distFiles != None) {
env("SPARK_YARN_CACHE_FILES") = distFiles.get
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = distFilesFileSizes.get
}
if (distArchives != None) {
env("SPARK_YARN_CACHE_ARCHIVES") = distArchives.get
env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = distArchivesTimeStamps.get
env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = distArchivesFileSizes.get
}

// allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
