Listing 9.1


// Import the required packages
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object EstimatorTransformerParamExample {
  def main(args: Array[String]): Unit = {
    // SparkSession.builder creates a SparkSession instance and sets the master URL, application name, and other configuration
    val spark = SparkSession.builder
      .master("local")
      .appName("EstimatorTransformerParamExample")
      .getOrCreate()
    // Create the training set. createDataFrame() builds a DataFrame from a sequence of
    // (label, vector) tuples, and toDF() names the two columns "label" and "features".
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")
    // Create a LogisticRegression (Estimator) instance lr
    val lr = new LogisticRegression()
    // Print the parameters, their documentation, and their default values
    println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")
    // First way to set parameters: setter methods
    lr.setMaxIter(10)
      .setRegParam(0.01)
    // Pass the training set to lr.fit() to train a LogisticRegressionModel, model1,
    // which is a Transformer; the parameters set above are stored in lr
    val model1 = lr.fit(training)
    // Print the parameters lr used during fit(). They appear as (name: value) pairs,
    // where the name contains the unique ID of this LogisticRegression instance
    println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")
    // Second way to set parameters: a ParamMap
    // A ParamMap maps parameters to values, here changing the maximum number of iterations
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter, 30)  // Specify one Param; this overwrites the maxIter set just above
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params at once
    // ParamMaps can also be combined
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")
    val paramMapCombined = paramMap ++ paramMap2
    // Train a new model, model2 (a Transformer), using the combined ParamMap
    val model2 = lr.fit(training, paramMapCombined)
    println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")
    // Create the test set
    val test = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")
    // Call model2.transform() on the test set to produce a new DataFrame with prediction columns
    model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features, $label) -> prob=$prob, prediction=$prediction")
      }
    spark.stop()
  }
}
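
A note on the ParamMap mechanics in Listing 9.1: parameters supplied to fit() through a ParamMap take precedence over values set on the Estimator itself, and a later put() for the same Param overwrites an earlier binding. A minimal sketch, reusing the listing's lr, model2 and paramMapCombined (the two reads below are illustrative additions, not part of the listing):

// paramMapCombined carries maxIter = 30, which overrides the earlier lr.setMaxIter(10)
println(model2.parent.extractParamMap()(lr.maxIter))     // expected: 30
// Combining two ParamMaps with ++ keeps the bindings of both
println(paramMapCombined.contains(lr.probabilityCol))    // expected: true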
	

Listing 9.2

// Import the required packages
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object PipelineExample {
  def main(args: Array[String]): Unit = {
    // SparkSession.builder creates the instance spark and sets the master URL and other configuration
    val spark = SparkSession.builder
      .master("local")
      .appName("PipelineExample")
      .getOrCreate()
    // Create the training set with createDataFrame(); the columns are id, text, and label
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")
    // Instantiate three stages (Tokenizer, HashingTF, LogisticRegression) and set their parameters
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)
    // Instantiate a Pipeline whose stages are Array(tokenizer, hashingTF, lr)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))
    // Call pipeline.fit() on the training data to produce a PipelineModel (a Transformer)
    val model = pipeline.fit(training)
    // Save the fitted PipelineModel to a local path
    model.write.overwrite().save("/tmp/spark-logistic-regression-model")
    // Save the unfitted Pipeline (an Estimator) to a local path
    pipeline.write.overwrite().save("/tmp/unfit-lr-model")
    // Load the PipelineModel back from the local path
    val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
    // Create the test set with createDataFrame(); the columns are id and text
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")
    // Call model.transform() on the test set to produce a new DataFrame with prediction columns
    model.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
    spark.stop()
  }
}
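
Listing 9.2 saves the unfitted Pipeline but never reads it back. A minimal sketch of doing so, assuming it runs after the listing has written /tmp/unfit-lr-model and that the training DataFrame is still in scope:

// Reload the unfitted Pipeline (still an Estimator) and fit it again
val samePipeline = Pipeline.load("/tmp/unfit-lr-model")
val refitModel = samePipeline.fit(training)   // produces a fresh PipelineModel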


Listing 9.3

// Import the required packages
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.sql.SparkSession
object TfIdfExample {
  	def main(args: Array[String]) {
		// SparkSession.builder creates the instance and sets the master URL and other configuration
    	val spark = SparkSession.builder
		  .master("local")
		  .appName("TfIdfExample")
		  .getOrCreate()
		// Create the dataset with createDataFrame(); the columns are label and sentence
		val sentenceData = spark.createDataFrame(Seq(
		  (0.0, "Hi I heard about Spark"),
		  (0.0, "I wish Java could use case classes"),
		  (1.0, "Logistic regression models are neat")
		)).toDF("label", "sentence")
		// Create a Tokenizer (Transformer) instance
		// with input column sentence and output column words
		val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
		// Call tokenizer.transform() to produce a new DataFrame with a words column
		val wordsData = tokenizer.transform(sentenceData)
		// Create a HashingTF (Transformer) instance with input column words,
		// output column rawFeatures, and 20 features (dimensions)
		val hashingTF = new HashingTF()
		  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
		// Call hashingTF.transform() to produce a new DataFrame with a rawFeatures column
		val featurizedData = hashingTF.transform(wordsData)
		// Create an IDF (Estimator) instance
		// with input column rawFeatures and output column features
		val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
		// Call idf.fit() to train an IDFModel (a Transformer)
		val idfModel = idf.fit(featurizedData)
		// Call idfModel.transform() to produce a new DataFrame with a features column
		val rescaledData = idfModel.transform(featurizedData)
		// Print the label and features columns; show(false) turns off truncation
		// (by default truncate is true and only the first 20 characters of each cell are shown)
		rescaledData.select("label", "features").show(false)
		spark.stop()
  	}
}
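
For reference, the weight the IDFModel in Listing 9.3 applies is fully determined by the corpus statistics: idf(t) = log((numDocs + 1) / (docFreq(t) + 1)), and the values stored in the features column are the raw term frequencies from HashingTF multiplied by this weight. A minimal sketch (idfWeight is an illustrative helper, not Spark API):

def idfWeight(numDocs: Long, docFreq: Long): Double =
  math.log((numDocs + 1.0) / (docFreq + 1.0))
// With the 3 sentences in the listing, a term that appears in exactly one document
// gets the weight log(4 / 2), roughly 0.693
println(idfWeight(3, 1))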


Listing 9.4

// Import the required packages
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object Word2VecExample {
  def main(args: Array[String]) {
    // SparkSession.builder creates the instance and sets the master URL and other configuration
    val spark = SparkSession.builder
      .master("local")
      .appName("Word2Vec example")
      .getOrCreate()
    // Create a DataFrame from the dataset; the column name is text
    val documentDF = spark.createDataFrame(Seq(
      "Hi I heard about Spark".split(" "),
      "I wish Java could use case classes".split(" "),
      "Logistic regression models are neat".split(" ")
    ).map(Tuple1.apply)).toDF("text")
    // Create a Word2Vec (Estimator) instance with input column text, output column result,
    // and vector size 3; setMinCount(0) keeps every word regardless of frequency
    // (words occurring fewer than minCount times would otherwise be discarded)
    val word2Vec = new Word2Vec()
      .setInputCol("text")
      .setOutputCol("result")
      .setVectorSize(3)
      .setMinCount(0)
    // Call word2Vec.fit() to train a Word2VecModel (a Transformer)
    val model = word2Vec.fit(documentDF)
    // Call model.transform() to turn each document into a vector
    val result = model.transform(documentDF)
    // Print the results
    result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
      println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
    spark.stop()
  }
}
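
Word2VecModel represents each document as the element-wise average of the vectors of its words, which is why every result vector in Listing 9.4 has the dimension given by setVectorSize. A minimal sketch, reusing the listing's model, of two further methods the model offers:

// One (word, vector) row per vocabulary word learned from the corpus
model.getVectors.show(false)
// The two words closest to "Spark" in the learned vector space, with their similarity scores
model.findSynonyms("Spark", 2).show(false)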


Listing 9.5

// Import the required packages
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.SparkSession
object BinarizerExample {
  	def main(args: Array[String]): Unit = {
		// SparkSession.builder creates the instance and sets the master URL and other configuration
		val spark = SparkSession.builder
		  .master("local")
		  .appName("BinarizerExample")
		  .getOrCreate()
		// Create the dataset and build a DataFrame with createDataFrame(); the columns are id and feature
		val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
		val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
		// Create a Binarizer (Transformer) instance with input column feature,
		// output column binarized_feature, and threshold 0.5
		val binarizer: Binarizer = new Binarizer()
		  .setInputCol("feature")
		  .setOutputCol("binarized_feature")
		  .setThreshold(0.5)
		// Call binarizer.transform() to produce the binarized result
		val binarizedDataFrame = binarizer.transform(dataFrame)
		// Print the threshold and the binarized result
		println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
    	binarizedDataFrame.show()
    	spark.stop()
   }
}
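
The rule Binarizer applies to each value is simple: values strictly greater than the threshold become 1.0, all others become 0.0. A minimal sketch (binarize is an illustrative helper, not Spark API):

def binarize(x: Double, threshold: Double): Double = if (x > threshold) 1.0 else 0.0
// For the listing's data and threshold 0.5: 0.1 -> 0.0, 0.8 -> 1.0, 0.2 -> 0.0
println(Seq(0.1, 0.8, 0.2).map(binarize(_, 0.5)))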


Listing 9.6

// Import the required packages
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
object MinMaxScalerExample {
  	def main(args: Array[String]): Unit = {
		// SparkSession.builder creates the instance and sets the master URL and other configuration
		val spark = SparkSession.builder
		  .master("local")
		  .appName("MinMaxScalerExample")
		  .getOrCreate()
		// Create the dataset and build a DataFrame with createDataFrame(); the columns are id and features
		val dataFrame = spark.createDataFrame(Seq(
		  (0, Vectors.dense(1.0, 0.1, -1.0)),
		  (1, Vectors.dense(2.0, 1.1, 1.0)),
		  (2, Vectors.dense(3.0, 10.1, 3.0))
		)).toDF("id", "features")
		// Create a MinMaxScaler (Estimator) instance
		// with input column features and output column scaledFeatures
		val scaler = new MinMaxScaler()
		  .setInputCol("features")
		  .setOutputCol("scaledFeatures")
		// Call scaler.fit() to train a MinMaxScalerModel (a Transformer)
		val scalerModel = scaler.fit(dataFrame)
		// Call scalerModel.transform() to produce the rescaled result
		val scaledData = scalerModel.transform(dataFrame)
		// Print the target range and the rescaled result
		println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
		scaledData.select("features", "scaledFeatures").show()
		spark.stop()
  	}
}
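
For reference, the per-column rule MinMaxScalerModel applies is fixed once fit() has collected each column's minimum and maximum. A minimal sketch (minMaxRescale is an illustrative helper, not Spark API):

// x is one feature value, (colMin, colMax) come from fit(), (lo, hi) from getMin/getMax
def minMaxRescale(x: Double, colMin: Double, colMax: Double,
                  lo: Double = 0.0, hi: Double = 1.0): Double =
  if (colMax == colMin) 0.5 * (hi + lo)   // a constant column maps to the midpoint of the range
  else (x - colMin) / (colMax - colMin) * (hi - lo) + lo
// Example: the first feature column in Listing 9.6 has min 1.0 and max 3.0, so 2.0 rescales to 0.5
println(minMaxRescale(2.0, 1.0, 3.0))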


Listing 9.7


// Import the required packages
import java.util.Arrays
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
object VectorSlicerExample {
  	def main(args: Array[String]): Unit = {
		// SparkSession.builder creates the instance and sets the master URL and other configuration
		val spark = SparkSession.builder
		  .master("local")
		  .appName("VectorSlicerExample")
		  .getOrCreate()
		// Create an array of row vectors
		// Vectors come in two forms: dense vectors and sparse vectors
		// A sparse vector can be created in either of two ways:
		// Vectors.sparse(size, array of indices, array of values matching those indices)
		// Vectors.sparse(size, Seq((index, value), (index, value), ..., (index, value)))
		val data = Arrays.asList(
		  // The sparse form below is equivalent to Vectors.dense(-2.0, 2.3, 0.0)
		  Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
		  Row(Vectors.dense(-2.0, 2.3, 0.0))
		)
		// Define attribute (string) names f1, f2, f3 for the vector slots
		val defaultAttr = NumericAttribute.defaultAttr
		val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
		val attrGroup = new AttributeGroup("userFeatures",
		  attrs.asInstanceOf[Array[Attribute]])
		// Create the dataset: createDataFrame() builds a DataFrame whose single column is userFeatures
		val dataset = spark.createDataFrame(data,
		  StructType(Array(attrGroup.toStructField())))
		// Create a VectorSlicer instance
		// with input column userFeatures and output column features
		val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
		// setIndices(Array(1)) selects the value at integer index 1 (indices start from 0)
		// setNames(Array("f3")) selects the value whose attribute name is "f3"
		slicer.setIndices(Array(1)).setNames(Array("f3"))
		// Call slicer.transform() to produce the sliced result
		val output = slicer.transform(dataset)
		output.show(false)
		spark.stop()
   }
}
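
The two Vectors.sparse forms mentioned in the comments of Listing 9.7 build identical vectors. A minimal, standalone sketch comparing them with the dense form:

import org.apache.spark.ml.linalg.Vectors

val byArrays = Vectors.sparse(3, Array(0, 1), Array(-2.0, 2.3))   // (size, indices, values)
val byPairs  = Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))        // (size, Seq((index, value), ...))
val dense    = Vectors.dense(-2.0, 2.3, 0.0)
println(byArrays)   // (3,[0,1],[-2.0,2.3])
println(byPairs)    // (3,[0,1],[-2.0,2.3])
println(dense)      // [-2.0,2.3,0.0]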


Listing 9.8

import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SparkSession
object RFormulaExample {
  	def main(args: Array[String]): Unit = {
		// SparkSession.builder creates the instance and sets the master URL and other configuration
		val spark = SparkSession.builder
		  .master("local")
		  .appName("RFormulaExample")
		  .getOrCreate()
		// Create the dataset with createDataFrame(); the columns are id, country, hour, and clicked
		val dataset = spark.createDataFrame(Seq(
		  (7, "US", 18, 1.0),
		  (8, "CA", 12, 0.0),
		  (9, "NZ", 15, 0.0)
		)).toDF("id", "country", "hour", "clicked")
		// Create an RFormula instance with the R formula clicked ~ country + hour,
		// features column features, and label column label
		val formula = new RFormula()
		  .setFormula("clicked ~ country + hour")
		  .setFeaturesCol("features")
		  .setLabelCol("label")
		// Call formula.fit() to produce an RFormulaModel (a Transformer),
		// then call transform() to produce the result
		val output = formula.fit(dataset).transform(dataset)
		// Print the features and label columns
		output.select("features", "label").show()
		spark.stop()
  }
}
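
In the formula of Listing 9.8, the numeric column hour is used as-is while the string column country is one-hot encoded (one category is dropped as the reference level), so features has three slots here. RFormula also supports the "." and "-" operators; a minimal sketch, reusing the listing's dataset (the formula string below is an illustrative alternative, not part of the listing):

// "clicked ~ . - id" means: predict clicked from all remaining columns except id,
// which yields the same feature set as "clicked ~ country + hour" for this dataset
val formulaAll = new RFormula()
  .setFormula("clicked ~ . - id")
  .setFeaturesCol("features")
  .setLabelCol("label")
formulaAll.fit(dataset).transform(dataset).select("features", "label").show()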


Listing 9.9

// Import the required packages
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
object ChiSqSelectorExample {
  	def main(args: Array[String]) {
		// SparkSession.builder creates the instance and sets the master URL and other configuration
	  	val spark = SparkSession.builder
		  .master("local")
		  .appName("ChiSqSelectorExample")
		  .getOrCreate()
		// Implicits needed to convert local collections and RDDs to Datasets/DataFrames
		import spark.implicits._
		val data = Seq(
		  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
		  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
		  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
		)
		// Build the DataFrame from data; the columns are id, features, and clicked
		val df = spark.createDataset(data).toDF("id", "features", "clicked")
		// Create a ChiSqSelector (Estimator) instance that keeps the single most
		// predictive feature (setNumTopFeatures(1)), with features column features,
		// label column clicked, and output column selectedFeatures
		val selector = new ChiSqSelector()
		  .setNumTopFeatures(1)
		  .setFeaturesCol("features")
		  .setLabelCol("clicked")
		  .setOutputCol("selectedFeatures")
		// Call selector.fit() to produce a ChiSqSelectorModel (a Transformer),
		// then call transform() to produce the result
		val result = selector.fit(df).transform(df)
		// Print the result
		println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
		result.show()
		spark.stop()
  }
}


Listing 9.10

// Import the required packages
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object ModelSelectionViaCrossValidationExample {
  	def main(args: Array[String]): Unit = {
		// SparkSession.builder creates the instance and sets the master URL and other configuration
		val spark = SparkSession.builder
		  .master("local")
		  .appName("ModelSelectionViaCrossValidationExample")
		  .getOrCreate()
		// Create the training set with createDataFrame(); the columns are id, text, and label
    	val training = spark.createDataFrame(Seq(
		  (0L, "a b c d e spark", 1.0),
		  (1L, "b d", 0.0),
		  (2L, "spark f g h", 1.0),
		  (3L, "hadoop mapreduce", 0.0),
		  (4L, "b spark who", 1.0),
		  (5L, "g d a y", 0.0),
		  (6L, "spark fly", 1.0),
		  (7L, "was mapreduce", 0.0),
		  (8L, "e spark program", 1.0),
		  (9L, "a e c l", 0.0),
		  (10L, "spark compile", 1.0),
		  (11L, "hadoop software", 0.0)
		)).toDF("id", "text", "label")
		// Set up a Pipeline with three stages: Tokenizer, HashingTF, and LogisticRegression
    	val tokenizer = new Tokenizer()
		  .setInputCol("text")
		  .setOutputCol("words")
    	val hashingTF = new HashingTF()
		  .setInputCol(tokenizer.getOutputCol)
		  .setOutputCol("features")
    	val lr = new LogisticRegression()
      	  .setMaxIter(10)
    	val pipeline = new Pipeline()
      	  .setStages(Array(tokenizer, hashingTF, lr))
		// Create a ParamGridBuilder instance to build a parameter grid
		// hashingTF.numFeatures takes three candidate values and lr.regParam takes two,
		// so the grid has 3 * 2 = 6 parameter combinations for CrossValidator to search over
		val paramGrid = new ParamGridBuilder()
         .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
         .addGrid(lr.regParam, Array(0.1, 0.01))
         .build()
        // Create a CrossValidator (Estimator) instance
        // Embedding the Pipeline in the CrossValidator lets it tune parameters for every Pipeline stage;
        // BinaryClassificationEvaluator's default metric is the area under the ROC curve (areaUnderROC)
        val cv = new CrossValidator()
		  .setEstimator(pipeline)
		  .setEvaluator(new BinaryClassificationEvaluator)
		  .setEstimatorParamMaps(paramGrid)
		  .setNumFolds(2)
      	  .setParallelism(2)
		// Call cv.fit() to train a CrossValidatorModel (a Transformer)
		// and obtain the best-performing parameter combination
		val cvModel = cv.fit(training)
		// Create the test set with createDataFrame(); it has only the two columns id and text,
		// and the model's predictions will supply the predicted label
    	val test = spark.createDataFrame(Seq(
		  (4L, "spark i j k"),
		  (5L, "l m n"),
		  (6L, "mapreduce spark"),
		  (7L, "apache hadoop")
		)).toDF("id", "text")
		// Call cvModel.transform() to add probability and prediction columns,
		// then print the results
    	cvModel.transform(test)
		  .select("id", "text", "probability", "prediction")
		  .collect()
		  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
			println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      	   }
		// Print the parameter values of the best LogisticRegressionModel; within the Pipeline it is the stage at index 2
		val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
		val lrModel = bestModel.stages(2).asInstanceOf[LogisticRegressionModel]
		println(lrModel.getRegParam)
		println(lrModel.numFeatures)
		spark.stop()
    }
}
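
To see how each of the 3 * 2 = 6 parameter combinations in Listing 9.10 fared, the fitted CrossValidatorModel keeps the average cross-validated metric per combination. A minimal sketch, reusing the listing's cvModel:

// Pair each candidate ParamMap with its average areaUnderROC across the folds
cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach { case (params, metric) =>
  println(s"$params -> average metric = $metric")
}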


Listing 9.11

// Import the required packages
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.SparkSession
object ModelSelectionViaTrainValidationSplitExample {
  	def main(args: Array[String]): Unit = {
		// SparkSession.builder creates the instance and sets the master URL and other configuration
		val spark = SparkSession.builder
		  .master("local")
		  .appName("ModelSelectionViaTrainValidationSplitExample")
		  .getOrCreate()
		// Create the dataset: load the local file in "libsvm" format into a DataFrame
		val data = spark.read.format("libsvm")
		  .load("data/mllib/sample_linear_regression_data.txt")
		// Use randomSplit() to split the DataFrame into a training set and a test set
		val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)
		// Create a LinearRegression (Estimator) instance and set the maximum number of iterations
		val lr = new LinearRegression()
		  .setMaxIter(10)
		// Create a ParamGridBuilder instance to build a parameter grid
		// TrainValidationSplit will try every combination of these values and use the Evaluator to pick the best model
		val paramGrid = new ParamGridBuilder()
		  .addGrid(lr.regParam, Array(0.1, 0.01))
		  .addGrid(lr.fitIntercept)
		  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
		  .build()
		// Create a TrainValidationSplit instance, which requires:
		// an Estimator, a set of ParamMaps for that Estimator, and an Evaluator
		val trainValidationSplit = new TrainValidationSplit()
		  .setEstimator(lr)
		  .setEvaluator(new RegressionEvaluator)
		  .setEstimatorParamMaps(paramGrid)
		  // Use 80% of the data for training and 20% for validation
		  .setTrainRatio(0.8)
		  // Evaluate at most two parameter settings in parallel
		  .setParallelism(2)
		// Call fit() on the TrainValidationSplit (an Estimator)
		// to train a TrainValidationSplitModel (a Transformer)
		val model = trainValidationSplit.fit(training)
		// Use the model with the best parameter combination to predict on the test set, then print the results
    	model.transform(test)
      	  .select("features", "label", "prediction")
          .show()
        spark.stop()
  }
}