Skip to content

Commit

Permalink
Create fewer function objects in uses of AppendOnlyMap.changeValue
Browse files Browse the repository at this point in the history
  • Loading branch information
mateiz committed Oct 9, 2013
1 parent 0b35051 commit 12d5931
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
30 changes: 14 additions & 16 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,26 @@ case class Aggregator[K, V, C] (

/**
 * Merge all the values for each key in `iter` into combiners of type C, using
 * `createCombiner` for a key's first value and `mergeValue` for subsequent ones.
 *
 * @param iter stream of key-value pairs to aggregate
 * @return an iterator over one (key, combiner) pair per distinct key
 */
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
  val combiners = new AppendOnlyMap[K, C]
  // Allocate ONE update closure and reuse it for every element, rather than
  // creating a fresh function object per record. The closure reads the current
  // record through the captured `kv` var, which is reassigned before each call.
  var kv: Product2[K, V] = null
  val update = (hadValue: Boolean, oldValue: C) => {
    if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
  }
  // Manual while loop (instead of a for-comprehension) avoids the per-element
  // tuple destructuring and closure allocation on this hot path.
  while (iter.hasNext) {
    kv = iter.next()
    combiners.changeValue(kv._1, update)
  }
  combiners.iterator
}

/**
 * Merge already-built combiners for each key in `iter` using `mergeCombiners`.
 * A key's first combiner is kept as-is; later ones are folded in.
 *
 * @param iter stream of (key, combiner) pairs, e.g. from map-side aggregation
 * @return an iterator over one fully merged (key, combiner) pair per key
 */
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
  val combiners = new AppendOnlyMap[K, C]
  // Single reusable update closure (see combineValuesByKey): the current pair
  // is threaded through the captured `kc` var instead of a per-record lambda.
  var kc: (K, C) = null
  val update = (hadValue: Boolean, oldValue: C) => {
    if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
  }
  while (iter.hasNext) {
    kc = iter.next()
    combiners.changeValue(kc._1, update)
  }
  combiners.iterator
}
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]

// One shared update function for every changeValue call: on first sight of a
// key it creates one ArrayBuffer per co-grouped RDD, otherwise it returns the
// existing sequence unchanged. Hoisted out so no closure is built per lookup.
val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
  if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
}

// Fetch (creating on demand) the per-RDD buffers for key `k`.
val getSeq = (k: K) => {
  map.changeValue(k, update)
}

val ser = SparkEnv.get.serializerManager.get(serializerClass)
Expand Down

0 comments on commit 12d5931

Please sign in to comment.