Commit 001d13f

Merge branch 'master' into fast-map
Conflicts:
	core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
mateiz committed Oct 10, 2013
2 parents 12d5931 + 320418f commit 001d13f
Showing 18 changed files with 470 additions and 118 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -231,7 +231,7 @@ class SparkContext(
}
taskScheduler.start()

@volatile private var dagScheduler = new DAGScheduler(taskScheduler)
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()

ui.start()
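The widened visibility above is what lets code elsewhere in the org.apache.spark package — such as the updated SparkListenerSuite later in this commit — reach the scheduler and its listener bus directly. A minimal sketch of that access pattern (the helper object is hypothetical; waitUntilEmpty is the method this commit adds to SparkListenerBus below):

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Hypothetical test helper: compiles only because SparkContext.dagScheduler
// (and the listener bus inside DAGScheduler) are now private[spark] rather
// than private, so anything under the org.apache.spark package can use them.
object ListenerBusDrain {
  def drain(sc: SparkContext, timeoutMillis: Int) {
    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(timeoutMillis),
      "listener bus did not empty within " + timeoutMillis + " ms")
  }
}
```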
@@ -34,7 +34,7 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
override def getValue: Long = application.duration
})

metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("cores"), new Gauge[Int] {
override def getValue: Int = application.coresGranted
})

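This rename (and the matching ones in the master, worker, executor, DAGScheduler and BlockManager sources below) drops the unit component that used to be appended to every key; where a unit is still worth keeping, it is folded into the final component instead (memUsed_MB, read_bytes, and so on). Codahale's MetricRegistry.name simply joins its arguments with dots, so the effect on the published keys is easy to see — a small standalone illustration, not part of the diff:

```scala
import com.codahale.metrics.MetricRegistry

object MetricNameDemo {
  def main(args: Array[String]) {
    println(MetricRegistry.name("cores", "number"))      // before: "cores.number"
    println(MetricRegistry.name("cores"))                 // after:  "cores"
    println(MetricRegistry.name("memory", "memUsed_MB"))  // unit kept in the last component
  }
}
```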
@@ -26,17 +26,17 @@ private[spark] class MasterSource(val master: Master) extends Source {
val sourceName = "master"

// Gauge for worker numbers in cluster
metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
override def getValue: Int = master.workers.size
})

// Gauge for application numbers in cluster
metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] {
override def getValue: Int = master.apps.size
})

// Gauge for waiting application numbers in cluster
metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
override def getValue: Int = master.waitingApps.size
})
}
@@ -25,27 +25,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()

metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})

// Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})

// Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("memUsed_MB"), new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})

// Gauge for cores free of this worker
metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] {
override def getValue: Int = worker.coresFree
})

// Gauge for memory free of this worker
metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("memFree_MB"), new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
}
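For a sense of what the renamed worker gauges look like when scraped, here is a hedged sketch that dumps a registry once with Codahale's console reporter. The reporter wiring is illustrative only; in Spark the MetricsSystem attaches the configured sinks and prefixes each key with the source name, so a sink typically sees keys such as worker.memUsed_MB.

```scala
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

object MetricsDump {
  // Print the current gauges of any registry to stdout once. Given the
  // registry built above, this lists "executors", "coresUsed", "memUsed_MB",
  // "coresFree" and "memFree_MB" with their current values.
  def dumpOnce(registry: MetricRegistry) {
    val reporter = ConsoleReporter.forRegistry(registry)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build()
    reporter.report()
  }
}
```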
@@ -43,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
val sourceName = "executor.%s".format(executorId)

// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getActiveCount()
})

// Gauge for executor thread pool's approximate total number of tasks that have been completed
metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})

// Gauge for executor thread pool's current number of threads
metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getPoolSize()
})

// Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})

// Gauge for file system stats of this executor
for (scheme <- Array("hdfs", "file")) {
registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
}
}
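The registerFileSystemStat helper used here lives above this hunk and is not shown in the diff. As a rough sketch of what such a helper plausibly does (assumed, not quoted from the file): it looks up Hadoop's per-scheme FileSystem statistics and exposes one counter as a gauge, falling back to a default when that scheme has not been touched yet.

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem
import scala.collection.JavaConverters._

// Sketch only: names and structure are assumptions, not the file's contents.
class FileSystemStatSource(metricRegistry: MetricRegistry) {
  private def fileStats(scheme: String): Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics.asScala.find(_.getScheme == scheme)

  def registerFileSystemStat[T](scheme: String, name: String,
      f: FileSystem.Statistics => T, default: T) {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(default)
    })
  }
}
```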
@@ -114,7 +114,7 @@ class DAGScheduler(

private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]

private val listenerBus = new SparkListenerBus()
private[spark] val listenerBus = new SparkListenerBus()

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
@@ -27,23 +27,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
val metricRegistry = new MetricRegistry()
val sourceName = "%s.DAGScheduler".format(sc.appName)

metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
})

metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.running.size
})

metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.waiting.size
})

metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.nextJobId.get()
})

metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
}
@@ -70,5 +70,23 @@ private[spark] class SparkListenerBus() extends Logging {
queueFullErrorMessageLogged = true
}
}

/**
* Waits until there are no more events in the queue, or until the specified time has elapsed.
* Used for testing only. Returns true if the queue has emptied and false if the specified time
* elapsed before the queue emptied.
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!eventQueue.isEmpty()) {
if (System.currentTimeMillis > finishTime) {
return false
}
/* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
* add overhead in the general case. */
Thread.sleep(10)
}
return true
}
}

@@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.esotericsoftware.kryo.{KryoException, Kryo}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.twitter.chill.ScalaKryoInstantiator
import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}

import org.apache.spark.{SerializableWritable, Logging}
import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel}
@@ -39,7 +39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
def newKryoOutput() = new KryoOutput(bufferSize)

def newKryo(): Kryo = {
val instantiator = new ScalaKryoInstantiator
val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()
val classLoader = Thread.currentThread.getContextClassLoader

@@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
StorageLevel.MEMORY_ONLY,
PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock("1", ByteBuffer.allocate(1)),
GetBlock("1")
GetBlock("1"),
1 to 10,
1 until 10,
1L to 10L,
1L until 10L
)

for (obj <- toRegister) kryo.register(obj.getClass)
@@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
case _: Exception => println("Failed to register spark.kryo.registrator")
}

// Register Chill's classes; we do this after our ranges and the user's own classes to let
// our code override the generic serializers in Chill for things like Seq
new AllScalaRegistrar().apply(kryo)

kryo.setClassLoader(classLoader)

// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
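The spark.kryo.registrator hook referenced above — and exercised by the KryoSerializerSuite further down — is how an application slots its own classes in between Spark's built-in registrations and Chill's AllScalaRegistrar. A minimal, hedged example of such a registrator; the class and the registered types are illustrative, only the KryoRegistrator trait and the two property keys come from Spark:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Illustrative application type to register.
case class Point(x: Double, y: Double)

class MyAppRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Point])
    kryo.register(classOf[Array[Point]])
  }
}

// Enabled before the SparkContext is created, as the suite below also does:
//   System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//   System.setProperty("spark.kryo.registrator", classOf[MyAppRegistrator].getName)
```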
@@ -28,23 +28,23 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
val metricRegistry = new MetricRegistry()
val sourceName = "%s.BlockManager".format(sc.appName)

metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
maxMem / 1024 / 1024
}
})

metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
remainingMem / 1024 / 1024
}
})

metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -53,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
}
})

metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val diskSpaceUsed = storageStatusList
@@ -23,15 +23,9 @@ import scala.collection.mutable
import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.SparkContext._

/**
*
*/

class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {

// TODO: This test has a race condition since the DAGScheduler now reports results
// asynchronously. It needs to be updated for that patch.
ignore("local metrics") {
test("local metrics") {
sc = new SparkContext("local[4]", "test")
val listener = new SaveStageInfo
sc.addSparkListener(listener)
@@ -45,7 +39,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc

val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
d.count()
Thread.sleep(1000)
val WAIT_TIMEOUT_MILLIS = 10000
assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (1)

val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -57,18 +52,25 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc

d4.collectAsMap()

Thread.sleep(1000)
assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be (4)
listener.stageInfos.foreach {stageInfo =>
//small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
listener.stageInfos.foreach { stageInfo =>
/* small test, so some tasks might take less than 1 millisecond, but average should be greater
* than 0 ms. */
checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration")
checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, stageInfo + " executorRunTime")
checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, stageInfo + " executorDeserializeTime")
checkNonZeroAvg(
stageInfo.taskInfos.map{_._2.executorRunTime.toLong},
stageInfo + " executorRunTime")
checkNonZeroAvg(
stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong},
stageInfo + " executorDeserializeTime")
if (stageInfo.stage.rdd.name == d4.name) {
checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
checkNonZeroAvg(
stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
stageInfo + " fetchWaitTime")
}

stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
taskMetrics.shuffleWriteMetrics should be ('defined)
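checkNonZeroAvg and isStage are helpers defined elsewhere in this suite and are not part of the hunk. As a rough sketch of what the average check asserts (assumed, not quoted from the file):

```scala
object StageMetricChecks {
  // Fail with `msg` unless the mean of the sampled values is strictly positive.
  def checkNonZeroAvg(samples: Traversable[Long], msg: String) {
    assert(samples.nonEmpty && samples.sum / samples.size.toDouble > 0.0, msg)
  }
}
```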
@@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
}

test("ranges") {
val ser = (new KryoSerializer).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
assert(ser.serialize(t).limit < 100)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
check(1 until 1000000)
check(1 until 1000000 by 2)
check(1L to 1000000L)
check(1L to 1000000L by 2L)
check(1L until 1000000L)
check(1L until 1000000L by 2L)
check(1.0 to 1000000.0 by 1.0)
check(1.0 to 1000000.0 by 2.0)
check(1.0 until 1000000.0 by 1.0)
check(1.0 until 1000000.0 by 2.0)
}

test("custom registrator") {
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)

24 changes: 20 additions & 4 deletions docs/mllib-guide.md
@@ -144,10 +144,9 @@ Available algorithms for clustering:

# Collaborative Filtering

[Collaborative
filtering](http://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering)
[Collaborative filtering](http://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering)
is commonly used for recommender systems. These techniques aim to fill in the
missing entries of a user-product association matrix. MLlib currently supports
missing entries of a user-item association matrix. MLlib currently supports
model-based collaborative filtering, in which users and products are described
by a small set of latent factors that can be used to predict missing entries.
In particular, we implement the [alternating least squares
@@ -158,7 +157,24 @@ following parameters:
* *numBlocks* is the number of blocks used to parallelize computation (set to -1 to auto-configure).
* *rank* is the number of latent factors in our model.
* *iterations* is the number of iterations to run.
* *lambda* specifies the regularization parameter in ALS.
* *lambda* specifies the regularization parameter in ALS.
* *implicitPrefs* specifies whether to use the *explicit feedback* ALS variant or one adapted for *implicit feedback* data
* *alpha* is a parameter applicable to the implicit feedback variant of ALS that governs the *baseline* confidence in preference observations

## Explicit vs Implicit Feedback

The standard approach to matrix factorization based collaborative filtering treats
the entries in the user-item matrix as *explicit* preferences given by the user to the item.

It is common in many real-world use cases to only have access to *implicit feedback*
(e.g. views, clicks, purchases, likes, shares etc.). The approach used in MLlib to deal with
such data is taken from
[Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433).
Essentially instead of trying to model the matrix of ratings directly, this approach treats the data as
a combination of binary preferences and *confidence values*. The ratings are then related
to the level of confidence in observed user preferences, rather than explicit ratings given to items.
The model then tries to find latent factors that can be used to predict the expected preference of a user
for an item.
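
A hedged usage sketch of the two variants follows, assuming the ALS API in org.apache.spark.mllib.recommendation and, for the implicit case, a trainImplicit overload taking (ratings, rank, iterations, lambda, alpha); the input path and parameter values are made up for illustration:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object AlsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "AlsExample")

    // Each input line: userId,productId,rating (or an implicit-feedback count).
    val ratings = sc.textFile("data/ratings.csv").map { line =>
      val fields = line.split(',')
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }

    val rank = 10
    val iterations = 20
    val lambda = 0.01

    // Explicit feedback: entries are treated as ratings to be reconstructed.
    val explicitModel = ALS.train(ratings, rank, iterations, lambda)

    // Implicit feedback: entries are treated as confidence-weighted binary
    // preferences; alpha sets the baseline confidence in the observations.
    val alpha = 1.0
    val implicitModel = ALS.trainImplicit(ratings, rank, iterations, lambda, alpha)

    // Predicted preference of user 1 for product 42 under each model.
    println(explicitModel.predict(1, 42))
    println(implicitModel.predict(1, 42))
  }
}
```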

Available algorithms for collaborative filtering:

2 changes: 1 addition & 1 deletion docs/python-programming-guide.md
@@ -16,7 +16,7 @@ This guide will show how to use the Spark features described there in Python.
There are a few key differences between the Python and Scala APIs:

* Python is dynamically typed, so RDDs can hold objects of multiple types.
* PySpark does not yet support a few API calls, such as `lookup`, `sort`, and non-text input files, though these will be added in future releases.
* PySpark does not yet support a few API calls, such as `lookup` and non-text input files, though these will be added in future releases.

In PySpark, RDDs support the same methods as their Scala counterparts but take Python functions and return Python collection types.
Short functions can be passed to RDD methods using Python's [`lambda`](http://www.diveintopython.net/power_of_introspection/lambda_functions.html) syntax: