ML Pipeline Models
Model abstract class is a Transformer with the optional Estimator that has produced it (as a transient parent field).
```
model: DataFrame =[predict]=> DataFrame (with predictions)
```
Note: The Estimator is optional and is available only after fit (of the Estimator) has been executed, since a model is its result.
As a Transformer it takes a DataFrame and transforms it to a result DataFrame with a prediction column added.
There are two direct implementations of the Model class that are not directly related to a concrete ML algorithm:

- PipelineModel
- PredictionModel
PipelineModel
Caution: PipelineModel is a private[ml] class.
PipelineModel is a Model of the Pipeline estimator. Once fit, you can use the resulting model like any other model to transform datasets (as DataFrame).
A very interesting use case of PipelineModel is when a Pipeline is made up of Transformer instances.
```scala
// Transformer #1
import org.apache.spark.ml.feature.Tokenizer
val tok = new Tokenizer().setInputCol("text")

// Transformer #2
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF().setInputCol(tok.getOutputCol).setOutputCol("features")

// Fuse the Transformers in a Pipeline
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(tok, hashingTF))

val dataset = Seq((0, "hello world")).toDF("id", "text")

// Since there's no fitting, any dataset works fine
val featurize = pipeline.fit(dataset)

// Use the pipelineModel as a series of Transformers
scala> featurize.transform(dataset).show(false)
+---+-----------+------------------------+--------------------------------+
|id |text       |tok_8aec9bfad04a__output|features                        |
+---+-----------+------------------------+--------------------------------+
|0  |hello world|[hello, world]          |(262144,[71890,72594],[1.0,1.0])|
+---+-----------+------------------------+--------------------------------+
```
PredictionModel
PredictionModel is an abstract class to represent a model for prediction algorithms like regression and classification (which have their own specialized models – details coming up below).

PredictionModel is basically a Transformer with a predict method to calculate predictions (that end up in the prediction column).

PredictionModel belongs to the org.apache.spark.ml package.
```scala
import org.apache.spark.ml.PredictionModel
```
The contract of the PredictionModel class requires that every custom implementation defines the predict method (with FeaturesType being the type of the features column).
```scala
predict(features: FeaturesType): Double
```
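For illustration, here is a minimal sketch of a custom PredictionModel. The MeanModel class, its uid, and its mean-of-features "prediction" are all made up for this example; the sketch assumes the Spark 1.6-era API where features are org.apache.spark.mllib.linalg.Vector values.

```scala
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// A hypothetical model whose "prediction" is simply the mean of the feature values
class MeanModel(override val uid: String)
  extends PredictionModel[Vector, MeanModel] {

  // predict is the only method the PredictionModel contract requires
  override protected def predict(features: Vector): Double =
    features.toArray.sum / features.size

  override def copy(extra: ParamMap): MeanModel = defaultCopy(extra)
}

// transform validates the features column and adds a prediction column (Double)
val data = Seq((0.0, Vectors.dense(1.0, 2.0, 3.0))).toDF("label", "features")
new MeanModel("meanModel").transform(data).show
```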
The direct, less algorithm-specific extensions of the PredictionModel class are:

- ClassificationModel
- RegressionModel
As a custom Transformer it comes with its own custom transform method.

Internally, transform first ensures that the type of the features column matches the type of the model and adds the prediction column of type Double to the schema of the result DataFrame.

It then creates the result DataFrame and adds the prediction column with a predictUDF function applied to the values of the features column.
Caution: FIXME A diagram to show the transformation from one DataFrame (on the left) to another (on the right) with an arrow to represent the transformation method.
Tip: Enable DEBUG logging for a model's logger to see what happens inside. Refer to Logging.
ClassificationModel
ClassificationModel is a PredictionModel that transforms a DataFrame with mandatory features, label, and rawPrediction (of type Vector) columns to a DataFrame with a prediction column added.
Note: A Model with ClassifierParams parameters, e.g. ClassificationModel, requires that a DataFrame have the mandatory features, label (of type Double), and rawPrediction (of type Vector) columns.
ClassificationModel comes with its own transform (as a Transformer) and predict (as a PredictionModel).
The following is a list of the known ClassificationModel custom implementations (as of March 24, 2016):

- ProbabilisticClassificationModel (the abstract parent of the following classification models)
  - DecisionTreeClassificationModel (final)
  - LogisticRegressionModel
  - NaiveBayesModel
  - RandomForestClassificationModel (final)
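To see one of these in action, here is a short sketch using LogisticRegressionModel (the training data is made up for illustration): fit produces the model, and its transform adds the rawPrediction and prediction columns.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors

// A tiny, made-up training set with the mandatory label and features columns
val training = Seq(
  (0.0, Vectors.dense(0.0, 1.0)),
  (1.0, Vectors.dense(1.0, 0.0))).toDF("label", "features")

// fit gives a LogisticRegressionModel, i.e. a concrete ClassificationModel
val lrModel = new LogisticRegression().fit(training)

// transform adds rawPrediction (a Vector) and prediction (a Double)
lrModel.transform(training).select("label", "rawPrediction", "prediction").show(false)
```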
RegressionModel
RegressionModel is a PredictionModel that transforms a DataFrame with mandatory label, features, and prediction columns.

It comes with no methods or values of its own and so is more of a marker abstract class (to combine different features of regression models under one type).
LinearRegressionModel
LinearRegressionModel represents a model produced by a LinearRegression estimator. It transforms the required features column of type org.apache.spark.mllib.linalg.Vector.
Note: It is a private[ml] class, so what you, a developer, may eventually work with is the more general RegressionModel, and since RegressionModel is just a marker no-method abstract class, it is more a PredictionModel.
As a linear regression model that extends LinearRegressionParams it expects the following schema of an input DataFrame:

- label (required)
- features (required)
- prediction
- regParam
- elasticNetParam
- maxIter (Int)
- tol (Double)
- fitIntercept (Boolean)
- standardization (Boolean)
- weightCol (String)
- solver (String)
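Most of these are parameters you set on the LinearRegression estimator, and the model inherits them through LinearRegressionParams. A minimal sketch (all values are arbitrary, for illustration only):

```scala
import org.apache.spark.ml.regression.LinearRegression

// Arbitrary parameter values, for illustration only
val lr = new LinearRegression()
  .setMaxIter(50)            // maxIter (Int)
  .setRegParam(0.01)         // regParam
  .setElasticNetParam(0.5)   // elasticNetParam
  .setTol(1e-6)              // tol (Double)
  .setFitIntercept(true)     // fitIntercept (Boolean)
  .setStandardization(true)  // standardization (Boolean)
```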
(New in 1.6.0) LinearRegressionModel is also an MLWritable (so you can save it to persistent storage for later reuse).
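A minimal sketch of what MLWritable gives you, assuming model is a fitted LinearRegressionModel (as in the example below) and the path is made up:

```scala
// Save the fitted model to a (hypothetical) path
model.write.overwrite().save("/tmp/linear-regression-model")

// ...and load it back later via the companion object (MLReadable)
import org.apache.spark.ml.regression.LinearRegressionModel
val sameModel = LinearRegressionModel.load("/tmp/linear-regression-model")
```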
With DEBUG logging enabled (see above) you can see the following messages in the logs when transform is called and transforms the schema.
```
16/03/21 06:55:32 DEBUG LinearRegressionModel: Input schema: {"type":"struct","fields":[{"name":"label","type":"double","nullable":false,"metadata":{}},{"name":"features","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
16/03/21 06:55:32 DEBUG LinearRegressionModel: Expected output schema: {"type":"struct","fields":[{"name":"label","type":"double","nullable":false,"metadata":{}},{"name":"features","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"prediction","type":"double","nullable":false,"metadata":{}}]}
```
The implementation of predict for LinearRegressionModel calculates dot(v1, v2) of two Vectors of the same size – features and coefficients (of DenseVector or SparseVector types) – and adds intercept.
Note: The coefficients Vector and intercept Double are an integral part of LinearRegressionModel as the required input parameters of the constructor.
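A worked sketch of that computation follows; the values are made up, and the dot product is computed by hand (mllib's internal BLAS helper is private to Spark):

```scala
import org.apache.spark.mllib.linalg.Vectors

val features     = Vectors.dense(1.0, 2.0, 3.0)
val coefficients = Vectors.dense(0.5, 0.5, 0.5)
val intercept    = 1.0

// prediction = dot(features, coefficients) + intercept
val prediction = features.toArray.zip(coefficients.toArray)
  .map { case (f, c) => f * c }
  .sum + intercept
// prediction: Double = 4.0
```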
LinearRegressionModel Example
```scala
// Create a (sparse) Vector
import org.apache.spark.mllib.linalg.Vectors
val indices = 0 to 4
val elements = indices.zip(Stream.continually(1.0))
val sv = Vectors.sparse(elements.size, elements)

// Create a proper DataFrame
val ds = sc.parallelize(Seq((0.5, sv))).toDF("label", "features")

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

// Importing LinearRegressionModel and being explicit about the type of model value
// is for learning purposes only
import org.apache.spark.ml.regression.LinearRegressionModel
val model: LinearRegressionModel = lr.fit(ds)

// Use the same ds - just for learning purposes
scala> model.transform(ds).show
+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.5|(5,[0,1,2,3,4],[1...|       0.5|
+-----+--------------------+----------+
```
RandomForestRegressionModel
RandomForestRegressionModel is a PredictionModel with a features column of type Vector.

Interestingly, DataFrame transformation (as part of the Transformer contract) uses SparkContext.broadcast to send itself to the nodes in a Spark cluster and calculates predictions (as the prediction column) on features.
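A short sketch of producing a RandomForestRegressionModel, in the same spark-shell style as the other examples (the training data is made up):

```scala
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.mllib.linalg.Vectors

// A tiny, made-up training set with label and features columns
val trainDF = Seq(
  (0.0, Vectors.dense(0.0, 0.1)),
  (0.0, Vectors.dense(0.1, 0.2)),
  (1.0, Vectors.dense(0.9, 0.8)),
  (1.0, Vectors.dense(1.0, 0.9))).toDF("label", "features")

val rfModel = new RandomForestRegressor().fit(trainDF)

// transform broadcasts the model to the executors and adds the prediction column
rfModel.transform(trainDF).show(false)
```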
KMeansModel
KMeansModel is a Model of the KMeans algorithm. It belongs to the org.apache.spark.ml.clustering package.
```scala
// See spark-mllib-estimators.adoc#KMeans
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.DataFrame

val kmeans: KMeans = ???
val trainingDF: DataFrame = ???
val kmModel = kmeans.fit(trainingDF)

// Know the cluster centers
scala> kmModel.clusterCenters
res0: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.3], [0.1,0.1])

val inputDF = Seq((0.0, Vectors.dense(0.2, 0.4))).toDF("label", "features")
scala> kmModel.transform(inputDF).show(false)
+-----+---------+----------+
|label|features |prediction|
+-----+---------+----------+
|0.0  |[0.2,0.4]|0         |
+-----+---------+----------+
```