ML Pipeline Models
The Model abstract class is a Transformer with an optional Estimator that has produced it (as a transient parent field).
```
model: DataFrame =[predict]=> DataFrame (with predictions)
```
Note: The parent Estimator is optional and is available only after fit (of that Estimator) has been executed, i.e. when a model is its result.
As a Transformer, a model takes a DataFrame and transforms it into a result DataFrame with a prediction column added.
There are two direct implementations of the Model class that are not directly related to a concrete ML algorithm:

- PipelineModel
- PredictionModel
PipelineModel
Caution: PipelineModel is a private[ml] class.
PipelineModel is a Model of the Pipeline estimator.

Once fit, you can use the resulting model as any other model to transform datasets (as DataFrames).

A very interesting use case of PipelineModel is when a Pipeline is made up of Transformer instances only; fitting then involves no training, and the resulting model simply applies the transformers in sequence.
```scala
// Transformer #1
import org.apache.spark.ml.feature.Tokenizer
val tok = new Tokenizer().setInputCol("text")

// Transformer #2
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF().setInputCol(tok.getOutputCol).setOutputCol("features")

// Fuse the Transformers in a Pipeline
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(tok, hashingTF))

val dataset = Seq((0, "hello world")).toDF("id", "text")

// Since there's no fitting, any dataset works fine
val featurize = pipeline.fit(dataset)

// Use the pipelineModel as a series of Transformers
scala> featurize.transform(dataset).show(false)
+---+-----------+------------------------+--------------------------------+
|id |text       |tok_8aec9bfad04a__output|features                        |
+---+-----------+------------------------+--------------------------------+
|0  |hello world|[hello, world]          |(262144,[71890,72594],[1.0,1.0])|
+---+-----------+------------------------+--------------------------------+
```
PredictionModel
PredictionModel is an abstract class to represent a model for prediction algorithms like regression and classification (that have their own specialized models – details coming up below).
PredictionModel is basically a Transformer with a predict method to calculate predictions (that end up in the prediction column).

PredictionModel belongs to the org.apache.spark.ml package.
```scala
import org.apache.spark.ml.PredictionModel
```
The contract of the PredictionModel class requires that every custom implementation define the predict method (with FeaturesType being the type of the features).
```scala
predict(features: FeaturesType): Double
```
The direct less-algorithm-specific extensions of the PredictionModel class are:

- ClassificationModel
- RegressionModel
As a custom Transformer, a PredictionModel comes with its own transform method.
Internally, transform first ensures that the type of the features column matches the type of the model and adds the prediction column of type Double to the schema of the result DataFrame.
It then creates the result DataFrame and adds the prediction column with a predictUDF function applied to the values of the features column.
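The following is a minimal sketch of that flow, not Spark's actual source; the predict stub and the transformSketch name are illustrative only:

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

// Hypothetical stand-in for a concrete model's predict(features: FeaturesType): Double
def predict(features: Vector): Double = ???

// Conceptually, transform wraps predict in a UDF and appends
// the prediction column (of type Double) to the input DataFrame
def transformSketch(dataset: DataFrame): DataFrame = {
  val predictUDF = udf { features: Vector => predict(features) }
  dataset.withColumn("prediction", predictUDF(dataset("features")))
}
```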
Caution: FIXME A diagram to show the transformation from one DataFrame (on the left) to another (on the right) with an arrow to represent the transform method.
Tip: Enable DEBUG logging level for the respective model's logger to see what happens inside, e.g. for LinearRegressionModel add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.ml.regression.LinearRegressionModel=DEBUG. Refer to Logging.
ClassificationModel
ClassificationModel is a PredictionModel that transforms a DataFrame with mandatory features, label, and rawPrediction (of type Vector) columns to a DataFrame with a prediction column added.
Note: A Model with ClassifierParams parameters, e.g. ClassificationModel, requires that a DataFrame have the mandatory features, label (of type Double), and rawPrediction (of type Vector) columns.
ClassificationModel comes with its own transform (as a Transformer) and predict (as a PredictionModel); see the example after the following list.

The following is a list of the known ClassificationModel custom implementations (as of March 24, 2016):
- ProbabilisticClassificationModel (the abstract parent of the following classification models)
  - DecisionTreeClassificationModel (final)
  - LogisticRegressionModel
  - NaiveBayesModel
  - RandomForestClassificationModel (final)
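A minimal sketch of one of these in action, assuming a Spark shell (with implicits imported); the toy dataset is illustrative only:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.classification.LogisticRegression

// A two-row toy dataset with the mandatory label and features columns
val training = Seq(
  (0.0, Vectors.dense(0.0, 1.0)),
  (1.0, Vectors.dense(1.0, 0.0))).toDF("label", "features")

val model = new LogisticRegression().fit(training)

// transform adds rawPrediction, probability, and prediction columns
model.transform(training).select("label", "prediction").show
```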
RegressionModel
RegressionModel is a PredictionModel that transforms a DataFrame with mandatory label, features, and prediction columns.
It defines no methods or values of its own and so is more of a marker abstract class (to combine different regression models under one type).
LinearRegressionModel
LinearRegressionModel represents a model produced by a LinearRegression estimator. It transforms the required features column of type org.apache.spark.mllib.linalg.Vector.
Note: LinearRegressionModel is a private[ml] class, so what you, a developer, will eventually work with is the more general RegressionModel; and since RegressionModel is just a marker abstract class with no methods, it is effectively a PredictionModel.
As a linear regression model that extends LinearRegressionParams, it expects the following schema of an input DataFrame:
- label (required)
- features (required)
- prediction
- regParam
- elasticNetParam
- maxIter (Int)
- tol (Double)
- fitIntercept (Boolean)
- standardization (Boolean)
- weightCol (String)
- solver (String)
(New in 1.6.0) LinearRegressionModel is also an MLWritable (so you can save it to persistent storage for later reuse).
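A minimal sketch of saving and loading, assuming model is a fitted LinearRegressionModel (as in the example below) and the path is illustrative only:

```scala
// Save the fitted model to persistent storage (the path is illustrative)
model.save("/tmp/linear-regression-model")

// ...and load it back later
import org.apache.spark.ml.regression.LinearRegressionModel
val sameModel = LinearRegressionModel.load("/tmp/linear-regression-model")
```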
With DEBUG logging enabled (see above), you can see the following messages in the logs when transform is called, as it transforms the schema.
```
16/03/21 06:55:32 DEBUG LinearRegressionModel: Input schema: {"type":"struct","fields":[{"name":"label","type":"double","nullable":false,"metadata":{}},{"name":"features","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
16/03/21 06:55:32 DEBUG LinearRegressionModel: Expected output schema: {"type":"struct","fields":[{"name":"label","type":"double","nullable":false,"metadata":{}},{"name":"features","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"prediction","type":"double","nullable":false,"metadata":{}}]}
```
The implementation of predict for LinearRegressionModel calculates the dot product of two Vectors of the same size, features and coefficients (of DenseVector or SparseVector types), and adds the intercept.
Note: The coefficients Vector and intercept Double are an integral part of LinearRegressionModel as the required input parameters of the constructor.
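In other words, prediction = dot(features, coefficients) + intercept. A plain-Scala sketch of the arithmetic (Spark itself uses its internal BLAS routines); predictSketch is an illustrative name:

```scala
// prediction = dot(features, coefficients) + intercept
def predictSketch(features: Array[Double], coefficients: Array[Double], intercept: Double): Double = {
  require(features.length == coefficients.length, "vectors must be of the same size")
  (features, coefficients).zipped.map(_ * _).sum + intercept
}

predictSketch(Array(1.0, 2.0), Array(0.5, 0.25), 0.1)  // 1.0 * 0.5 + 2.0 * 0.25 + 0.1 = 1.1
```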
LinearRegressionModel Example
```scala
// Create a (sparse) Vector
import org.apache.spark.mllib.linalg.Vectors
val indices = 0 to 4
val elements = indices.zip(Stream.continually(1.0))
val sv = Vectors.sparse(elements.size, elements)

// Create a proper DataFrame
val ds = sc.parallelize(Seq((0.5, sv))).toDF("label", "features")

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

// Importing LinearRegressionModel and being explicit about the type of model value
// is for learning purposes only
import org.apache.spark.ml.regression.LinearRegressionModel
val model: LinearRegressionModel = lr.fit(ds)

// Use the same ds - just for learning purposes
scala> model.transform(ds).show
+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.5|(5,[0,1,2,3,4],[1...|       0.5|
+-----+--------------------+----------+
```
RandomForestRegressionModel
RandomForestRegressionModel is a PredictionModel with a features column of type Vector.

Interestingly, the DataFrame transformation (as part of the Transformer contract) uses SparkContext.broadcast to send itself to the nodes in a Spark cluster and calculates predictions (as the prediction column) on features.
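A conceptual sketch of that broadcast-then-predict pattern (not Spark's actual source; SimpleModel and transformSketch are illustrative stand-ins, since the real model's predict is package-private):

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

// Illustrative stand-in for a model with a predict method
case class SimpleModel(weights: Vector) extends Serializable {
  def predict(features: Vector): Double =
    features.toArray.zip(weights.toArray).map { case (f, w) => f * w }.sum
}

// The model broadcasts itself once, so every executor predicts against a local copy
def transformSketch(model: SimpleModel, dataset: DataFrame): DataFrame = {
  val bcastModel = dataset.sqlContext.sparkContext.broadcast(model)
  val predictUDF = udf { features: Vector => bcastModel.value.predict(features) }
  dataset.withColumn("prediction", predictUDF(dataset("features")))
}
```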
KMeansModel
KMeansModel is a Model of the KMeans algorithm.

It belongs to the org.apache.spark.ml.clustering package.
```scala
// See spark-mllib-estimators.adoc#KMeans
val kmeans: KMeans = ???
val trainingDF: DataFrame = ???
val kmModel = kmeans.fit(trainingDF)

// Know the cluster centers
scala> kmModel.clusterCenters
res0: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.3], [0.1,0.1])

val inputDF = Seq((0.0, Vectors.dense(0.2, 0.4))).toDF("label", "features")
scala> kmModel.transform(inputDF).show(false)
+-----+---------+----------+
|label|features |prediction|
+-----+---------+----------+
|0.0  |[0.2,0.4]|0         |
+-----+---------+----------+
```