Merge pull request #19 from aarondav/master-zk
Standalone Scheduler fault tolerance using ZooKeeper

This patch implements full distributed fault tolerance for standalone scheduler Masters.
There is only one Master acting as Leader at a time, and it alone serves scheduling
requests. If this Leader crashes, another Master is eventually elected, reconstructs the
state of the failed Master, and continues serving scheduling requests.

Leader election is performed using the ZooKeeper leader election pattern. We try to minimize
the use of ZooKeeper and the assumptions we make about its behavior, so a layer of
retries and session monitoring sits on top of the ZooKeeper client.
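
For intuition, here is a minimal sketch of the standard ZooKeeper leader-election recipe
(ephemeral sequential znodes) that this pattern refers to. The class name, election path,
node prefix, and session timeout are illustrative, not the code added by this patch:

    import scala.collection.JavaConverters._
    import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

    class LeaderElectionSketch(zkUrl: String, electionPath: String) extends Watcher {
      private val zk = new ZooKeeper(zkUrl, 30000, this)

      // Each candidate Master registers an ephemeral sequential znode under the
      // (pre-created) election path; the lowest sequence number is the Leader.
      private val myNode = zk.create(electionPath + "/master_", new Array[Byte](0),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)

      def isLeader: Boolean = {
        val children = zk.getChildren(electionPath, false).asScala.sorted
        myNode == electionPath + "/" + children.head
      }

      // Session events (disconnect, expiry) arrive here; the patch layers retries
      // and session monitoring on top of raw calls like the ones above.
      override def process(event: WatchedEvent): Unit = {}
    }

If the Leader's session expires, its ephemeral znode disappears and the next candidate in
sequence takes over.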

Master failover follows directly from the single-node Master recovery via the file
system (patch d5a96fe), save that the Master state is stored in ZooKeeper instead.
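
One way to picture that reuse is a pluggable store for the Master's recovery state, with a
local directory (d5a96fe) or ZooKeeper as the backend. The trait and method names below are
hypothetical, not the abstraction this commit actually introduces:

    // Hedged sketch: enough state about Workers and Applications is persisted to
    // rebuild the Master's data structures after failover.
    trait RecoveryStateStore {
      def persist(name: String, bytes: Array[Byte]): Unit   // e.g. "app_<id>", "worker_<id>"
      def unpersist(name: String): Unit
      def readAll(): Seq[Array[Byte]]                        // replayed by the new Leader
    }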

Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url
to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory
to an appropriate directory accessible by the Master, we keep the behavior from d5a96fe.
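
For example, assuming the properties are passed to the Master as Java system properties
(how Spark configuration was supplied at the time); the ZooKeeper quorum and recovery
directory shown are made up:

    // ZooKeeper-based recovery:
    System.setProperty("spark.deploy.recoveryMode", "ZOOKEEPER")
    System.setProperty("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")

    // Or single-node recovery via the file system (the d5a96fe behavior):
    // System.setProperty("spark.deploy.recoveryMode", "FILESYSTEM")
    // System.setProperty("spark.deploy.recoveryDirectory", "/var/spark/recovery")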

Additionally, places where a Master could be specified by a spark:// URL can now take
comma-delimited lists to specify backup masters. Note that this is only used for registration
of NEW Workers and application Clients. Once a Worker or Client has registered with the
Master Leader, it is "in the system" and will never need to register again.
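
For example, an application could list every Master in a single URL (hypothetical hosts
and ports); it registers with whichever Master is currently the Leader:

    import org.apache.spark.SparkContext

    object MultiMasterExample {
      // The comma-delimited form is split into one spark:// URL per Master.
      val sc = new SparkContext("spark://host1:7077,host2:7077", "FaultTolerantApp")
    }
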
mateiz committed Oct 11, 2013
2 parents cd08f73 + 66c2063 commit c71499b
Showing 45 changed files with 1,947 additions and 175 deletions.
2 changes: 0 additions & 2 deletions bin/stop-slaves.sh
@@ -17,8 +17,6 @@
# limitations under the License.
#

# Starts the master on the machine this script is executed on.

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -156,7 +156,7 @@ class SparkContext(
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r
val SPARK_REGEX = """spark://(.*)""".r
//Regular expression for connection to Mesos cluster
val MESOS_REGEX = """(mesos://.*)""".r

@@ -172,7 +172,8 @@ class SparkContext(

case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend)
scheduler

@@ -188,8 +189,8 @@
val scheduler = new ClusterScheduler(this)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
val masterUrls = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
29 changes: 18 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -21,12 +21,14 @@ import scala.collection.immutable.List

import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.util.Utils


private[deploy] sealed trait DeployMessage extends Serializable

/** Contains messages sent between Scheduler actor nodes. */
private[deploy] object DeployMessages {

// Worker to Master
@@ -52,17 +54,20 @@ private[deploy] object DeployMessages {
exitStatus: Option[Int])
extends DeployMessage

case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])

case class Heartbeat(workerId: String) extends DeployMessage

// Master to Worker

case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage

case class RegisterWorkerFailed(message: String) extends DeployMessage

case class KillExecutor(appId: String, execId: Int) extends DeployMessage
case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage

case class LaunchExecutor(
masterUrl: String,
appId: String,
execId: Int,
appDesc: ApplicationDescription,
@@ -76,9 +81,11 @@ private[deploy] object DeployMessages {
case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage

case class MasterChangeAcknowledged(appId: String)

// Master to Client

case class RegisteredApplication(appId: String) extends DeployMessage
case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage

// TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -94,14 +101,19 @@ private[deploy] object DeployMessages {

case object StopClient

// Master to Worker & Client

case class MasterChanged(masterUrl: String, masterWebUiUrl: String)

// MasterWebUI To Master

case object RequestMasterState

// Master to MasterWebUI

case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
status: MasterState) {

Utils.checkHost(host, "Required hostname")
assert (port > 0)
@@ -123,12 +135,7 @@ private[deploy] object DeployMessages {
assert (port > 0)
}

// Actor System to Master

case object CheckForWorkerTimeOut

case object RequestWebUIPort

case class WebUIPortResponse(webUIBoundPort: Int)
// Actor System to Worker

case object SendHeartbeat
}
34 changes: 34 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

/**
* Used to send state on-the-wire about Executors from Worker to Master.
* This state is sufficient for the Master to reconstruct its internal data structures during
* failover.
*/
private[spark] class ExecutorDescription(
val appId: String,
val execId: Int,
val cores: Int,
val state: ExecutorState.Value)
extends Serializable {

override def toString: String =
"ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)
}