Code 10.1


// Import the required classes
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession
object NaiveBayesExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession, setting the master to local mode
    val spark = SparkSession
      .builder
      .master("local")
      .appName("NaiveBayesExample")
      .getOrCreate()
    // Load data stored in libsvm format as a DataFrame
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    // Split the data into training and test sets (30% held out for testing)
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)
    // Train a naive Bayes model
    val model = new NaiveBayes()
      .fit(trainingData)
    // Classify the test set with the trained model
    val predictions = model.transform(testData)
    predictions.show()
    // Compare the prediction and label columns on the test set
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    // Compute and print the classification accuracy
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test set accuracy = $accuracy")
    // Stop the SparkSession
    spark.stop()
  }
}
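
Code 10.1 leaves NaiveBayes at its defaults. As a minimal sketch (the parameter values here are illustrative, not part of the original example), the smoothing and model type can be set explicitly before fitting; this variant reuses trainingData from Code 10.1:

// Illustrative variant of the model construction in Code 10.1:
// set additive (Laplace) smoothing and the model type explicitly.
val nb = new NaiveBayes()
  .setSmoothing(1.0)           // additive smoothing; 1.0 is also the default
  .setModelType("multinomial") // "bernoulli" is the other common choice
val model = nb.fit(trainingData)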


Code 10.2

// Import the required classes
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
object LinearRegressionWithElasticNetExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession, setting the master to local mode
    val spark = SparkSession
      .builder
      .appName("LinearRegressionWithElasticNetExample")
      .master("local")
      .getOrCreate()
    // Load data stored in libsvm format as a DataFrame
    val training = spark.read.format("libsvm")
      .load("data/mllib/sample_linear_regression_data.txt")
    // Create a linear regression instance
    val lr = new LinearRegression()
      .setMaxIter(10) // maximum of 10 iterations
      .setRegParam(0.3) // regularization parameter 0.3
      .setElasticNetParam(0.8) // elastic-net mixing parameter 0.8
    // Fit the linear regression model
    val lrModel = lr.fit(training)
    // Print the coefficients and intercept of the fitted model
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
    // Summarize the model over the training set and print some metrics
    val trainingSummary = lrModel.summary
    // Print the number of iterations
    println(s"numIterations: ${trainingSummary.totalIterations}")
    // Print the objective value at each iteration
    println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
    // Show the residuals (label - prediction)
    trainingSummary.residuals.show()
    // Print the root mean squared error (RMSE)
    println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
    // Print the R-squared value
    println(s"r2: ${trainingSummary.r2}")
    // Stop the SparkSession
    spark.stop()
  }
}
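
The summary printed in Code 10.2 describes the training set only. To score rows with the fitted model, call transform on it; a minimal sketch, reusing lrModel and the training DataFrame from Code 10.2 in place of genuinely new data:

// transform() appends a "prediction" column computed from the fitted
// coefficients and intercept.
val scored = lrModel.transform(training)
scored.select("features", "label", "prediction").show(5)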


Code 10.3

// Import the required classes
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.SparkSession

object DecisionTreeClassificationExample {
  	def main(args: Array[String]): Unit = {
		// Create a SparkSession, setting the master to local mode
		val spark = SparkSession
		  .builder
		  .appName("DecisionTreeClassificationExample")
		  .master("local")
		  .getOrCreate()
		// Load data stored in libsvm format as a DataFrame
		val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
		// Index labels, adding metadata to the label column
		// Fit on the whole dataset so the index includes all labels
		val labelIndexer = new StringIndexer()
		  .setInputCol("label")
		  .setOutputCol("indexedLabel")
		  .fit(data)
		// Automatically identify categorical features and index them
		val featureIndexer = new VectorIndexer()
		  .setInputCol("features")
		  .setOutputCol("indexedFeatures")
		  // Features with more than 4 distinct values are treated as continuous
		  .setMaxCategories(4)
		  .fit(data)
		// Split the data into training and test sets (30% held out for testing)
		val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
		// Create a decision tree classifier
		val dt = new DecisionTreeClassifier()
		  .setLabelCol("indexedLabel")
		  .setFeaturesCol("indexedFeatures")
		// Convert indexed labels back to original labels
		val labelConverter = new IndexToString()
		  .setInputCol("prediction")
		  .setOutputCol("predictedLabel")
		  .setLabels(labelIndexer.labels)
		// Chain the indexers and the tree in a Pipeline
		val pipeline = new Pipeline()
		  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
		// Train the model; this also runs the indexers
		val model = pipeline.fit(trainingData)
		// Make predictions on the test set
		val predictions = model.transform(testData)
		// Select example rows to display (5 here)
		predictions.select("predictedLabel", "label", "features").show(5)
		// Compare predicted and true labels and compute the test error
		val evaluator = new MulticlassClassificationEvaluator()
		  .setLabelCol("indexedLabel")
		  .setPredictionCol("prediction")
		  .setMetricName("accuracy")
		val accuracy = evaluator.evaluate(predictions)
		println(s"Test Error = ${(1.0 - accuracy)}")
		// Print the learned decision tree
		val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
		println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
		// Stop the SparkSession
		spark.stop()
  	}
}
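
Code 10.3 leaves the tree hyperparameters at their defaults. A hedged sketch of constructing the classifier with explicit values (the values shown are the Spark defaults, repeated here only for illustration):

// Illustrative variant of the classifier construction in Code 10.3.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxDepth(5)      // maximum tree depth (default 5)
  .setMaxBins(32)      // bins used when discretizing continuous features (default 32)
  .setImpurity("gini") // split criterion; "entropy" is the alternative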


Code 10.4

// Import the required classes
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.SparkSession
object DecisionTreeRegressionExample {
  	def main(args: Array[String]): Unit = {
		// Create a SparkSession, setting the master to local mode
		val spark = SparkSession
		  .builder
		  .appName("DecisionTreeRegressionExample")
		  .master("local")
		  .getOrCreate()
		// Load data stored in libsvm format as a DataFrame
		val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
		// Automatically identify categorical features and index them
		val featureIndexer = new VectorIndexer()
		  .setInputCol("features")
		  .setOutputCol("indexedFeatures")
		  // Features with more than 4 distinct values are treated as continuous
		  .setMaxCategories(4)
		  .fit(data)
		// Split the data into training and test sets (30% held out for testing)
		val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
		// Create a decision tree regressor
		val dt = new DecisionTreeRegressor()
		  .setLabelCol("label")
		  .setFeaturesCol("indexedFeatures")
		// Chain the indexer and the tree in a Pipeline
		val pipeline = new Pipeline()
		  .setStages(Array(featureIndexer, dt))
		// Train the model; this also runs the indexer
		val model = pipeline.fit(trainingData)
		// Make predictions on the test set
		val predictions = model.transform(testData)
		// Select example rows to display (5 here)
		predictions.select("prediction", "label", "features").show(5)
		// Compare the prediction and label columns and compute the test error
		val evaluator = new RegressionEvaluator()
		  .setLabelCol("label")
		  .setPredictionCol("prediction")
		  .setMetricName("rmse")
		// Compute and print the root mean squared error (RMSE)
		val rmse = evaluator.evaluate(predictions)
		println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
		// Print the learned regression tree
		val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
		println(s"Learned regression tree model:\n ${treeModel.toDebugString}")
		// Stop the SparkSession
		spark.stop()
  	}
}
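
RegressionEvaluator is not limited to RMSE; "mse", "mae", and "r2" are also supported metric names. A small sketch, reusing the predictions DataFrame from Code 10.4:

// Evaluate the same predictions with mean absolute error instead of RMSE.
val mae = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("mae")
  .evaluate(predictions)
println(s"Mean Absolute Error (MAE) on test data = $mae")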


Code 10.5

// Import the required classes
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.sql.SparkSession
object KMeansExample {
  	def main(args: Array[String]): Unit = {
		// Create a SparkSession, setting the master to local mode
		val spark = SparkSession
		  .builder
		  .appName(s"${this.getClass.getSimpleName}")
		  .master("local")
		  .getOrCreate()
		// Load data stored in libsvm format as a DataFrame
		val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
		// Train a k-means model with k = 2 and a fixed random seed
		val kmeans = new KMeans().setK(2).setSeed(1L)
		val model = kmeans.fit(dataset)
		// Assign each point in the dataset to a cluster
		val predictions = model.transform(dataset)
		// Evaluate the clustering by computing the silhouette score
		val evaluator = new ClusteringEvaluator()
		val silhouette = evaluator.evaluate(predictions)
		// Print the silhouette score based on squared Euclidean distance
		println(s"Silhouette with squared euclidean distance = $silhouette")
		// Print the cluster centers
		println("Cluster Centers: ")
		model.clusterCenters.foreach(println)
		// Stop the SparkSession
		spark.stop()
  	}
}
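
Code 10.5 fixes k = 2 in advance. When the cluster count is unknown, one common approach is to fit a model for each candidate k and compare silhouette scores; a minimal sketch, reusing dataset and evaluator from Code 10.5 (the range 2 to 5 is illustrative):

// Fit one model per candidate k and print the silhouette of each clustering;
// higher silhouette values indicate better-separated clusters.
for (k <- 2 to 5) {
  val m = new KMeans().setK(k).setSeed(1L).fit(dataset)
  println(s"k = $k, silhouette = ${evaluator.evaluate(m.transform(dataset))}")
}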


Code 10.6

// Import FPGrowth and SparkSession
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.SparkSession
object FPGrowthExample {
  	def main(args: Array[String]): Unit = {
		// Create a SparkSession via SparkSession.builder, setting the master and other configuration
		val spark = SparkSession.builder
		  .master("local")
		  .appName(s"${this.getClass.getSimpleName}")
		  .getOrCreate()
		// Import the implicits needed to create Datasets/DataFrames from local collections
		import spark.implicits._
		// Create a DataFrame of transactions with a single column named "items"
		val dataset = spark.createDataset(Seq(
		  "I1 I2 I5",
		  "I2 I4",
		  "I3 I4 I5",
		  "I1 I2 I4 I5",
		  "I4 I5")
		).map(t => t.split(" ")).toDF("items")
		// Create an FPGrowth instance: the input column is "items",
		// the minimum support is 0.4, and the minimum confidence is 0.6
		val fpgrowth = new FPGrowth().setItemsCol("items")
		  .setMinSupport(0.4).setMinConfidence(0.6)
		// Call fit() to produce an FPGrowthModel (a Transformer)
		val model = fpgrowth.fit(dataset)
		// Show the frequent itemsets found by the model
		model.freqItemsets.show()
		// Show the generated association rules
		model.associationRules.show()
		// Call transform() to predict consequents for each input itemset
		// from the mined rules, and show the results
		model.transform(dataset).show()
		// Stop the SparkSession
		spark.stop()
  	}
}
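
The final transform in Code 10.6 scores the training transactions themselves. The same model can suggest items for an unseen basket; a small sketch (the basket "I1 I2" is made up for illustration), reusing model and the spark.implicits._ import from Code 10.6:

// transform() adds a "prediction" column holding the consequents of every
// association rule whose antecedent is contained in the input itemset.
val newBasket = Seq("I1 I2").map(_.split(" ")).toDF("items")
model.transform(newBasket).show(false)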