Code 9.1
//Import the required packages
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object EstimatorTransformerParamExample {
def main(args: Array[String]): Unit = {
//SparkSession.builder creates a SparkSession instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("EstimatorTransformerParamExample")
.getOrCreate()
//Create the training set. createDataFrame() builds a DataFrame from a sequence of (label, vector) tuples;
//toDF() names the two columns "label" and "features".
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
//Create a LogisticRegression (Estimator) instance lr
val lr = new LogisticRegression()
//Print the parameters, their documentation, and default values
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")
//First way to set parameters: setter methods
lr.setMaxIter(10)
.setRegParam(0.01)
//Pass training to lr.fit() to train a LogisticRegression model, model1, which is a Transformer.
//The parameters set above are stored in lr.
val model1 = lr.fit(training)
//Print the parameters lr used during fit(). In the output, parameters appear as (name, value)
//pairs, where the name contains the unique ID of the LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")
//Second way to set parameters: a ParamMap
//A ParamMap specifies parameters as a mapping, here changing the maximum number of iterations
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) //Set a parameter value, overriding the value set earlier
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) //Set multiple parameter values
// ParamMaps can also be combined
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")
val paramMapCombined = paramMap ++ paramMap2
//Train a new model, model2 (a Transformer), using the paramMapCombined parameters
val model2 = lr.fit(training, paramMapCombined)
println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")
//Create the test set
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
//model2.transform() takes the test set and outputs a new DataFrame with prediction columns
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
spark.stop()
}
}
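Supplementary sketch (not part of the original listing): besides printing the parent Estimator's parameter map, the fitted LogisticRegressionModel exposes its learned coefficients and intercept. Assuming the lines are appended inside main() after model2 has been trained, they could read:
//Print the learned coefficients and intercept of model2
println(s"Coefficients: ${model2.coefficients} Intercept: ${model2.intercept}")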
Code 9.2
//Import the required packages
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object PipelineExample {
def main(args: Array[String]): Unit = {
//SparkSession.builder creates the instance spark and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("PipelineExample")
.getOrCreate()
//Create the training set with createDataFrame(); the columns are id, text, and label
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
//Instantiate the three stages (Tokenizer, HashingTF, LogisticRegression) and set their parameters
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
//Instantiate the Pipeline with its stages set to Array(tokenizer, hashingTF, lr)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
//pipeline.fit() takes the training data and produces a PipelineModel (a Transformer)
val model = pipeline.fit(training)
//Save the PipelineModel to a local path
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
//Save the unfitted Pipeline instance (an Estimator) to a local path
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
//Load the PipelineModel saved at the local path
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
//Create the test set with createDataFrame(); the columns are id and text
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
//model.transform() takes the test set and outputs a new DataFrame with prediction columns
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
spark.stop()
}
}
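Supplementary sketch (not part of the original listing): the reloaded sameModel should make the same predictions as the original model. Assuming the lines are appended inside main() before spark.stop(), they could read:
//Apply the reloaded PipelineModel to the test set as a sanity check
sameModel.transform(test)
.select("id", "text", "probability", "prediction")
.show(false)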
Code 9.3
//Import the required packages
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.sql.SparkSession
object TfIdfExample {
def main(args: Array[String]) {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("TfIdfExample")
.getOrCreate()
//Create the dataset with createDataFrame(); the columns are label and sentence
val sentenceData = spark.createDataFrame(Seq(
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")
//Create a Tokenizer (Transformer) instance,
//setting the input column to sentence and the output column to words
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
//Call tokenizer.transform() to produce a new DataFrame containing a words column
val wordsData = tokenizer.transform(sentenceData)
//Create a HashingTF (Transformer) instance,
//setting the input column to words, the output column to rawFeatures, and the number of features to 20
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
//Call hashingTF.transform() to produce a new DataFrame containing a rawFeatures column
val featurizedData = hashingTF.transform(wordsData)
//Create an IDF (Estimator) instance,
//setting the input column to rawFeatures and the output column to features
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
//Call idf.fit() to train an IDFModel (a Transformer)
val idfModel = idf.fit(featurizedData)
//Call the IDFModel's transform() to produce a new DataFrame containing a features column
val rescaledData = idfModel.transform(featurizedData)
//Print the label and features columns. show(false) disables truncation; by default truncate is true and only the first 20 characters of each value are shown
rescaledData.select("label", "features").show(false)
spark.stop()
}
}
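Supplementary note: Spark's IDF uses a smoothed formula, IDF(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing term t. A minimal sketch of this arithmetic for a term that appears in one of the three sentences above:
//IDF of a term occurring in 1 of 3 documents, using the smoothed formula above
val idfRareTerm = math.log((3.0 + 1.0) / (1.0 + 1.0)) //about 0.693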
Code 9.4
//Import the required packages
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object Word2VecExample {
def main(args: Array[String]) {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("Word2Vec example")
.getOrCreate()
//Create a DataFrame from the dataset; the column is named text
val documentDF = spark.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
//Create a Word2Vec (Estimator) instance,
//setting the input column to text, the output column to result, and the vector size to 3;
//setMinCount(0) sets the threshold to 0, so words occurring fewer times than this value are discarded
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
//word2Vec.fit() produces a Word2VecModel (a Transformer)
val model = word2Vec.fit(documentDF)
//model.transform() converts each document into a vector
val result = model.transform(documentDF)
//Print the results
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
spark.stop()
}
}
Code 9.5
//Import the required packages
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.SparkSession
object BinarizerExample {
def main(args: Array[String]): Unit = {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("BinarizerExample")
.getOrCreate()
//Create the dataset; createDataFrame() builds a DataFrame with columns id and feature
val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
//Create a Binarizer (Transformer) instance,
//setting the input column to feature, the output column to binarized_feature, and the threshold to 0.5
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
//Call binarizer.transform() to produce the result
val binarizedDataFrame = binarizer.transform(dataFrame)
//Print the threshold and the binarized result
println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
spark.stop()
}
}
Code 9.6
//Import the required packages
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
object MinMaxScalerExample {
def main(args: Array[String]): Unit = {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("MinMaxScalerExample")
.getOrCreate()
//Create the dataset; createDataFrame() builds a DataFrame with columns id and features
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(1, Vectors.dense(2.0, 1.1, 1.0)),
(2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
//Create a MinMaxScaler (Estimator) instance,
//setting the input column to features and the output column to scaledFeatures
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
//Call scaler.fit() to produce a MinMaxScalerModel (a Transformer)
val scalerModel = scaler.fit(dataFrame)
//Call scalerModel.transform() to produce the result
val scaledData = scalerModel.transform(dataFrame)
//Print the minimum, the maximum, and the min-max scaled result
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
spark.stop()
}
}
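Supplementary note: MinMaxScaler rescales each feature column to [getMin, getMax] (0.0 and 1.0 by default) via (e - colMin) / (colMax - colMin) * (max - min) + min. A minimal sketch applying this by hand to the first feature column of the dataset above:
//First column values 1.0, 2.0, 3.0 rescaled to [0, 1]
val firstCol = Seq(1.0, 2.0, 3.0)
val (lo, hi) = (firstCol.min, firstCol.max)
val rescaled = firstCol.map(v => (v - lo) / (hi - lo)) //Seq(0.0, 0.5, 1.0)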
Code 9.7
//Import the required packages
import java.util.Arrays
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
object VectorSlicerExample {
def main(args: Array[String]): Unit = {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("VectorSlicerExample")
.getOrCreate()
//Create a list of row vectors
//Vectors come in two forms: dense vectors and sparse vectors
//A sparse vector can be created in two ways:
//Vectors.sparse(size, array of indices, array of values corresponding to those indices)
//Vectors.sparse(size, Seq((index, value), (index, value), ..., (index, value)))
val data = Arrays.asList(
//equivalent to Vectors.dense(-2.0, 2.3, 0.0)
Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
Row(Vectors.dense(-2.0, 2.3, 0.0))
)
//Define named attributes (f1, f2, f3) so columns can be selected by string index
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures",
attrs.asInstanceOf[Array[Attribute]])
//Create the dataset; createDataFrame() builds a DataFrame whose column is named userFeatures
val dataset = spark.createDataFrame(data,
StructType(Array(attrGroup.toStructField())))
//Create a VectorSlicer instance,
//setting the input column to userFeatures and the output column to features
val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
//Set the integer index to 1, i.e. select the value at integer index 1 (integer indices start at 0)
//Set the string index to "f3", i.e. select the value of the attribute named "f3"
slicer.setIndices(Array(1)).setNames(Array("f3"))
//Call slicer.transform() to produce the sliced result
val output = slicer.transform(dataset)
output.show(false)
spark.stop()
}
}
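Supplementary sketch (not part of the original listing): the two Vectors.sparse forms described in the comments above, and the dense form, all describe the same vector. Assuming the org.apache.spark.ml.linalg.Vectors import from the listing is in scope:
//Both sparse forms and the dense form represent (-2.0, 2.3, 0.0)
val sv1 = Vectors.sparse(3, Array(0, 1), Array(-2.0, 2.3))
val sv2 = Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))
val dv = Vectors.dense(-2.0, 2.3, 0.0)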
Code 9.8
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SparkSession
object RFormulaExample {
def main(args: Array[String]): Unit = {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("RFormulaExample")
.getOrCreate()
//Create the dataset with createDataFrame(); the columns are id, country, hour, and clicked
val dataset = spark.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
//Create an RFormula instance,
//setting the R formula to clicked ~ country + hour, the features column to features, and the label column to label
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
//Call formula.fit() to produce an RFormulaModel (a Transformer),
//then call transform() to produce the result
val output = formula.fit(dataset).transform(dataset)
//Print the results
output.select("features", "label").show()
spark.stop()
}
}
Code 9.9
//Import the required packages
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
object ChiSqSelectorExample {
def main(args: Array[String]) {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("ChiSqSelectorExample")
.getOrCreate()
//Implicits required to convert local collections and RDDs to Datasets/DataFrames
import spark.implicits._
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
//Create a Dataset implicitly and convert it to a DataFrame with columns id, features, and clicked
val df = spark.createDataset(data).toDF("id", "features", "clicked")
//Create a ChiSqSelector (Estimator) instance,
//configured to select the single most predictive feature,
//with the features column set to features, the label column to clicked, and the output column to selectedFeatures
val selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
//Call selector.fit() to produce a ChiSqSelectorModel (a Transformer),
//then call transform() to produce the result
val result = selector.fit(df).transform(df)
//Print the results
println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features
selected")
result.show()
spark.stop()
}
}
Code 9.10
//Import the required packages
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object ModelSelectionViaCrossValidationExample {
def main(args: Array[String]): Unit = {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("ModelSelectionViaCrossValidationExample")
.getOrCreate()
//Create the training set; createDataFrame() builds a DataFrame with columns id, text, and label
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0),
(4L, "b spark who", 1.0),
(5L, "g d a y", 0.0),
(6L, "spark fly", 1.0),
(7L, "was mapreduce", 0.0),
(8L, "e spark program", 1.0),
(9L, "a e c l", 0.0),
(10L, "spark compile", 1.0),
(11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")
//Create a Pipeline instance with three stages: Tokenizer, HashingTF, and LogisticRegression
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
//Create a ParamGridBuilder instance to build the parameter grid
//hashingTF.numFeatures takes three candidate values and lr.regParam takes two,
//so the grid contains 3 * 2 = 6 parameter combinations for CrossValidator to choose from
val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
.addGrid(lr.regParam, Array(0.1, 0.01))
.build()
//Create a CrossValidator (Estimator) instance
//The Pipeline instance is wrapped inside the cross-validator, so every stage of the Pipeline can use the parameter grid;
//BinaryClassificationEvaluator's default evaluation metric is AUC (areaUnderROC)
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
.setParallelism(2)
//Call cv.fit() to train a CrossValidatorModel (a Transformer)
//and obtain the best set of parameters
val cvModel = cv.fit(training)
//Create the test set; createDataFrame() builds a DataFrame with only two columns, id and text
//The label is left to be predicted by the model
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
//Call cvModel.transform() to produce the probability and prediction columns,
//then print the results
cvModel.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
//Print the best parameter values of lrModel. In the Pipeline, the LogisticRegressionModel is the stage at index 2
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = bestModel.stages(2).asInstanceOf[LogisticRegressionModel]
println(lrModel.getRegParam)
println(lrModel.numFeatures)
spark.stop()
}
}
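Supplementary sketch (not part of the original listing): CrossValidatorModel also records the average evaluation metric for each parameter combination, which can be printed alongside the corresponding ParamMap. Assuming the lines are appended inside main() before spark.stop():
//Average AUC for each of the 6 parameter combinations
cv.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach { case (params, metric) =>
println(s"$params -> avg metric = $metric")
}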
Code 9.11
//Import the required packages
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.SparkSession
object ModelSelectionViaTrainValidationSplitExample {
def main(args: Array[String]): Unit = {
//SparkSession.builder creates an instance and sets the run mode and other configuration
val spark = SparkSession.builder
.master("local")
.appName("ModelSelectionViaTrainValidationSplitExample")
.getOrCreate()
//Create the dataset by loading a local file in "libsvm" format into a DataFrame
val data = spark.read.format("libsvm")
.load("data/mllib/sample_linear_regression_data.txt")
//Use randomSplit() to split the DataFrame into a training set and a test set
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)
//Create a LinearRegression (Estimator) instance and set the maximum number of iterations
val lr = new LinearRegression()
.setMaxIter(10)
//Create a ParamGridBuilder instance to build the parameter grid
//TrainValidationSplit will use the Evaluator to try every combination of parameter values and pick the best model
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.build()
//Create a TrainValidationSplit instance
//TrainValidationSplit requires the following to be set:
//an Estimator, a set of ParamMaps for that Estimator, and an Evaluator
val trainValidationSplit = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
//Use 80% of the data for training and 20% for validation
.setTrainRatio(0.8)
//Evaluate at most two parameter settings in parallel
.setParallelism(2)
//Call the TrainValidationSplit (Estimator) fit() method
//to train a TrainValidationSplitModel (a Transformer)
val model = trainValidationSplit.fit(training)
//Use the model with the best parameter combination to predict on the test set and print the results
model.transform(test)
.select("features", "label", "prediction")
.show()
spark.stop()
}
}
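Supplementary sketch (not part of the original listing): TrainValidationSplitModel records the validation metric for each parameter combination as well as the best model it selected. Assuming the lines are appended inside main() before spark.stop():
//Validation metric per parameter combination, plus the parameters of the selected best model
trainValidationSplit.getEstimatorParamMaps.zip(model.validationMetrics).foreach {
case (params, metric) => println(s"$params -> validation metric = $metric")
}
println(model.bestModel.extractParamMap())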