Merge pull request #332 from JakubSroka/linear_reg
added linear regression
OlivierBlanvillain authored Sep 30, 2018
2 parents 0e71fc1 + 2775be3 commit 542c0c8
Showing 6 changed files with 303 additions and 0 deletions.
70 changes: 70 additions & 0 deletions ml/src/main/scala/frameless/ml/internals/LinearInputsChecker.scala
@@ -0,0 +1,70 @@
package frameless
package ml
package internals

import org.apache.spark.ml.linalg._
import shapeless.ops.hlist.Length
import shapeless.{HList, LabelledGeneric, Nat, Witness}

import scala.annotation.implicitNotFound

/**
* Evidence that `Inputs` is a valid input type for a linear regression algorithm.
*/
@implicitNotFound(
msg = "Cannot prove that ${Inputs} is a valid input type. " +
"Input type must only contain a field of type Double (the label), a field of type " +
"org.apache.spark.ml.linalg.Vector (the features), and an optional field of type Float (the weight)."
)
trait LinearInputsChecker[Inputs] {
val featuresCol: String
val labelCol: String
val weightCol: Option[String]
}

object LinearInputsChecker {

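// Inputs with exactly two fields: a Double label and a Vector of features.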
implicit def checkLinearInputs[
Inputs,
InputsRec <: HList,
LabelK <: Symbol,
FeaturesK <: Symbol](
implicit
i0: LabelledGeneric.Aux[Inputs, InputsRec],
i1: Length.Aux[InputsRec, Nat._2],
i2: SelectorByValue.Aux[InputsRec, Double, LabelK],
i3: Witness.Aux[LabelK],
i4: SelectorByValue.Aux[InputsRec, Vector, FeaturesK],
i5: Witness.Aux[FeaturesK]
): LinearInputsChecker[Inputs] = {
new LinearInputsChecker[Inputs] {
val labelCol: String = implicitly[Witness.Aux[LabelK]].value.name
val featuresCol: String = implicitly[Witness.Aux[FeaturesK]].value.name
val weightCol: Option[String] = None
}
}

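// Inputs with exactly three fields: a Double label, a Vector of features, and a Float weight.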
implicit def checkLinearInputs2[
Inputs,
InputsRec <: HList,
LabelK <: Symbol,
FeaturesK <: Symbol,
WeightK <: Symbol](
implicit
i0: LabelledGeneric.Aux[Inputs, InputsRec],
i1: Length.Aux[InputsRec, Nat._3],
i2: SelectorByValue.Aux[InputsRec, Vector, FeaturesK],
i3: Witness.Aux[FeaturesK],
i4: SelectorByValue.Aux[InputsRec, Double, LabelK],
i5: Witness.Aux[LabelK],
i6: SelectorByValue.Aux[InputsRec, Float, WeightK],
i7: Witness.Aux[WeightK]
): LinearInputsChecker[Inputs] = {
new LinearInputsChecker[Inputs] {
val labelCol: String = implicitly[Witness.Aux[LabelK]].value.name
val featuresCol: String = implicitly[Witness.Aux[FeaturesK]].value.name
val weightCol: Option[String] = Some(implicitly[Witness.Aux[WeightK]].value.name)
}
}

}
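
For illustration, here is a minimal sketch of input types this checker accepts and rejects. The case class names are hypothetical; fields are matched by type, not by name:

import org.apache.spark.ml.linalg.Vector

// Accepted: a Double label and a Vector of features, in any order and under any names.
case class HouseData(price: Double, features: Vector)

// Accepted: the same two fields plus a Float weight.
case class WeightedHouseData(price: Double, features: Vector, weight: Float)

// Rejected at compile time (no Double label field):
// case class BadData(price: String, features: Vector)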
16 changes: 16 additions & 0 deletions ml/src/main/scala/frameless/ml/params/linears/LossStrategy.scala
@@ -0,0 +1,16 @@
package frameless
package ml
package params
package linears

/**
* <a href="https://en.wikipedia.org/wiki/Mean_squared_error">SquaredError</a> measures the average of the squares of the errors, that is,
* the average squared difference between the estimated values and the actual values.
*
* <a href="https://en.wikipedia.org/wiki/Huber_loss">Huber loss</a> is a loss function that is less sensitive to outliers in the data than
* the squared error loss.
*/
sealed abstract class LossStrategy private[ml](val sparkValue: String)
object LossStrategy {
case object SquaredError extends LossStrategy("squaredError")
case object Huber extends LossStrategy("huber")
}
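
A usage sketch, assuming the TypedLinearRegression estimator added later in this pull request and the hypothetical HouseData type from above:

val huberLr = TypedLinearRegression[HouseData].setLoss(LossStrategy.Huber)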
25 changes: 25 additions & 0 deletions ml/src/main/scala/frameless/ml/params/linears/Solver.scala
@@ -0,0 +1,25 @@
package frameless
package ml
package params
package linears

/**
* Solver algorithm used for optimization:
* - "l-bfgs" denotes Limited-memory BFGS, a limited-memory quasi-Newton
* optimization method.
* - "normal" denotes using the Normal Equation as an analytical solution to the linear
* regression problem. This solver is limited to
* `LinearRegression.MAX_FEATURES_FOR_NORMAL_SOLVER`.
* - "auto" (default) means that the solver algorithm is selected automatically:
* the Normal Equations solver is used when possible, falling back to iterative
* optimization methods when needed.
*/

sealed abstract class Solver private[ml](val sparkValue: String)
object Solver {
case object LBFGS extends Solver("l-bfgs")
case object Auto extends Solver("auto")
case object Normal extends Solver("normal")
}
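
Selecting a solver works the same way; again a sketch with the hypothetical HouseData type:

val normalLr = TypedLinearRegression[HouseData].setSolver(Solver.Normal)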

52 changes: 52 additions & 0 deletions ml/src/main/scala/frameless/ml/regression/TypedLinearRegression.scala
@@ -0,0 +1,52 @@
package frameless
package ml
package regression

import frameless.ml.internals.LinearInputsChecker
import frameless.ml.params.linears.{LossStrategy, Solver}
import frameless.ml.{AppendTransformer, TypedEstimator}
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}

/**
* <a href="https://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> is a linear approach to modelling the relationship
* between a scalar response (or dependent variable) and one or more explanatory variables.
*/
final class TypedLinearRegression[Inputs] private[ml](
lr: LinearRegression,
labelCol: String,
featuresCol: String,
weightCol: Option[String]
) extends TypedEstimator[Inputs, TypedLinearRegression.Outputs, LinearRegressionModel] {

val estimatorWithoutWeight: LinearRegression = lr
.setLabelCol(labelCol)
.setFeaturesCol(featuresCol)
.setPredictionCol(AppendTransformer.tempColumnName)

val estimator: LinearRegression = weightCol.fold(estimatorWithoutWeight)(estimatorWithoutWeight.setWeightCol)

def setRegParam(value: Double): TypedLinearRegression[Inputs] = copy(lr.setRegParam(value))
def setFitIntercept(value: Boolean): TypedLinearRegression[Inputs] = copy(lr.setFitIntercept(value))
def setStandardization(value: Boolean): TypedLinearRegression[Inputs] = copy(lr.setStandardization(value))
def setElasticNetParam(value: Double): TypedLinearRegression[Inputs] = copy(lr.setElasticNetParam(value))
def setMaxIter(value: Int): TypedLinearRegression[Inputs] = copy(lr.setMaxIter(value))
def setTol(value: Double): TypedLinearRegression[Inputs] = copy(lr.setTol(value))
def setSolver(value: Solver): TypedLinearRegression[Inputs] = copy(lr.setSolver(value.sparkValue))
def setAggregationDepth(value: Int): TypedLinearRegression[Inputs] = copy(lr.setAggregationDepth(value))
def setLoss(value: LossStrategy): TypedLinearRegression[Inputs] = copy(lr.setLoss(value.sparkValue))
def setEpsilon(value: Double): TypedLinearRegression[Inputs] = copy(lr.setEpsilon(value))

private def copy(newLr: LinearRegression): TypedLinearRegression[Inputs] =
new TypedLinearRegression[Inputs](newLr, labelCol, featuresCol, weightCol)

}

object TypedLinearRegression {
case class Outputs(prediction: Double)
case class Weight(weight: Double)


def apply[Inputs](implicit inputsChecker: LinearInputsChecker[Inputs]): TypedLinearRegression[Inputs] = {
new TypedLinearRegression(new LinearRegression(), inputsChecker.labelCol, inputsChecker.featuresCol, inputsChecker.weightCol)
}
}
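
An end-to-end usage sketch with the hypothetical HouseData type; the values are made up for illustration and assume an implicit SparkSession and the suite's TypedEncoder instances are in scope:

import frameless.TypedDataset
import org.apache.spark.ml.linalg.{Vector, Vectors}

case class HouseData(price: Double, features: Vector)

val trainingData = TypedDataset.create(Seq(
  HouseData(100000.0, Vectors.dense(75.0)),
  HouseData(150000.0, Vectors.dense(150.0))
))

val lr = TypedLinearRegression[HouseData].setMaxIter(10)

// fit() returns a delayed Spark job; run() executes it.
val model = lr.fit(trainingData).run()

// transform() appends the Double prediction as an extra column.
val predictions = model.transform(trainingData)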
13 changes: 13 additions & 0 deletions ml/src/test/scala/frameless/ml/Generators.scala
@@ -1,6 +1,7 @@
package frameless
package ml

import frameless.ml.params.linears.{LossStrategy, Solver}
import frameless.ml.params.trees.FeatureSubsetStrategy
import org.apache.spark.ml.linalg.{Matrices, Matrix, Vector, Vectors}
import org.scalacheck.{Arbitrary, Gen}
@@ -41,4 +42,16 @@ object Generators {
)
}

implicit val arbLossStrategy: Arbitrary[LossStrategy] = Arbitrary {
Gen.const(LossStrategy.SquaredError)
}

implicit val arbSolver: Arbitrary[Solver] = Arbitrary {
Gen.oneOf(
Gen.const(Solver.LBFGS),
Gen.const(Solver.Auto),
Gen.const(Solver.Normal)
)
}

}
127 changes: 127 additions & 0 deletions ml/src/test/scala/frameless/ml/regression/TypedLinearRegressionTests.scala
@@ -0,0 +1,127 @@
package frameless
package ml
package regression

import frameless.ml.params.linears.{LossStrategy, Solver}
import org.apache.spark.ml.linalg._
import org.scalacheck.Arbitrary
import org.scalacheck.Prop._
import org.scalatest.Matchers._
import org.scalatest.MustMatchers
import shapeless.test.illTyped

class TypedLinearRegressionTests extends FramelessMlSuite with MustMatchers {

implicit val arbVectorNonEmpty: Arbitrary[Vector] = Arbitrary(Generators.arbVector.arbitrary)

test("fit() returns a correct TypedTransformer") {
val prop = forAll { x2: X2[Double, Vector] =>
val lr = TypedLinearRegression[X2[Double, Vector]]
val ds = TypedDataset.create(Seq(x2))

val model = lr.fit(ds).run()
val pDs = model.transform(ds).as[X3[Double, Vector, Double]]

pDs.select(pDs.col('a), pDs.col('b)).collect.run() == Seq(x2.a -> x2.b)
}
val prop2 = forAll { x2: X2[Vector, Double] =>
val lr = TypedLinearRegression[X2[Vector, Double]]
val ds = TypedDataset.create(Seq(x2))
val model = lr.fit(ds).run()
val pDs = model.transform(ds).as[X3[Vector, Double, Double]]

pDs.select(pDs.col('a), pDs.col('b)).collect.run() == Seq(x2.a -> x2.b)
}

def prop3[A: TypedEncoder: Arbitrary] = forAll { x3: X3[Vector, Double, A] =>
val lr = TypedLinearRegression[X2[Vector, Double]]
val ds = TypedDataset.create(Seq(x3))
val model = lr.fit(ds).run()
val pDs = model.transform(ds).as[X4[Vector, Double, A, Double]]

pDs.select(pDs.col('a), pDs.col('b), pDs.col('c)).collect.run() == Seq((x3.a, x3.b, x3.c))
}

check(prop)
check(prop2)
check(prop3[String])
check(prop3[Double])
}

test("param setting is retained") {
import Generators.{arbLossStrategy, arbSolver}

val prop = forAll { (lossStrategy: LossStrategy, solver: Solver) =>
val lr = TypedLinearRegression[X2[Double, Vector]]
.setAggregationDepth(10)
.setEpsilon(4)
.setFitIntercept(true)
.setLoss(lossStrategy)
.setMaxIter(23)
.setRegParam(1.2)
.setStandardization(true)
.setTol(2.3)
.setSolver(solver)

val ds = TypedDataset.create(Seq(X2(0D, Vectors.dense(0D))))
val model = lr.fit(ds).run()

model.transformer.getAggregationDepth == 10 &&
model.transformer.getEpsilon == 4.0 &&
model.transformer.getLoss == lossStrategy.sparkValue &&
model.transformer.getMaxIter == 23 &&
model.transformer.getRegParam == 1.2 &&
model.transformer.getTol == 2.3 &&
model.transformer.getSolver == solver.sparkValue
}

check(prop)
}

test("create() compiles only with correct inputs") {
illTyped("TypedLinearRegressor.create[Double]()")
illTyped("TypedLinearRegressor.create[X1[Double]]()")
illTyped("TypedLinearRegressor.create[X2[Double, Double]]()")
illTyped("TypedLinearRegressor.create[X3[Vector, Double, Int]]()")
illTyped("TypedLinearRegressor.create[X2[Vector, String]]()")
}

test("TypedLinearRegressor should fit straight line ") {
case class Point(features: Vector, labels: Double)

val ds = Seq(
X2(new DenseVector(Array(1.0)): Vector, 1.0),
X2(new DenseVector(Array(2.0)): Vector, 2.0),
X2(new DenseVector(Array(3.0)): Vector, 3.0),
X2(new DenseVector(Array(4.0)): Vector, 4.0),
X2(new DenseVector(Array(5.0)): Vector, 5.0),
X2(new DenseVector(Array(6.0)): Vector, 6.0)
)

val ds2 = Seq(
X3(new DenseVector(Array(1.0)): Vector, 2F, 1.0),
X3(new DenseVector(Array(2.0)): Vector, 2F, 2.0),
X3(new DenseVector(Array(3.0)): Vector, 2F, 3.0),
X3(new DenseVector(Array(4.0)): Vector, 2F, 4.0),
X3(new DenseVector(Array(5.0)): Vector, 2F, 5.0),
X3(new DenseVector(Array(6.0)): Vector, 2F, 6.0)
)

val tds = TypedDataset.create(ds)

val lr = TypedLinearRegression[X2[Vector, Double]]
.setMaxIter(10)

val model = lr.fit(tds).run()

val tds2 = TypedDataset.create(ds2)

val lr2 = TypedLinearRegression[X3[Vector, Float, Double]]
.setMaxIter(10)

val model2 = lr2.fit(tds2).run()

model.transformer.coefficients shouldEqual new DenseVector(Array(1.0))
model2.transformer.coefficients shouldEqual new DenseVector(Array(1.0))
}
}
