Code 10.1
// Import the required classes
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession
object NaiveBayesExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession and set the master to local mode
    val spark = SparkSession
      .builder
      .master("local")
      .appName("NaiveBayesExample")
      .getOrCreate()
    // Load data stored in LIBSVM format as a DataFrame
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    // Split the data into training and test sets (30% held out for testing)
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)
    // Train a naive Bayes model
    val model = new NaiveBayes()
      .fit(trainingData)
    // Use the trained model to classify the test set
    val predictions = model.transform(testData)
    predictions.show()
    // Compare the prediction column with the label column and compute the accuracy
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    // Print the classification accuracy on the test set
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test set accuracy = $accuracy")
    // Stop the SparkSession
    spark.stop()
  }
}
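The example above relies entirely on the NaiveBayes defaults. As a minimal sketch (nbWithParams and tunedModel are illustrative names, and trainingData is assumed to be the split produced in Code 10.1), the smoothing parameter and model type can also be set explicitly before fitting:

// Minimal sketch: explicitly setting the (default) smoothing and model type
val nbWithParams = new NaiveBayes()
  .setSmoothing(1.0)            // additive (Laplace) smoothing, default 1.0
  .setModelType("multinomial")  // "multinomial" or "bernoulli"
val tunedModel = nbWithParams.fit(trainingData)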
Code 10.2
// Import the required classes
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
object LinearRegressionWithElasticNetExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession and set the master to local mode
    val spark = SparkSession
      .builder
      .appName("LinearRegressionWithElasticNetExample")
      .master("local")
      .getOrCreate()
    // Load data stored in LIBSVM format as a DataFrame
    val training = spark.read.format("libsvm")
      .load("data/mllib/sample_linear_regression_data.txt")
    // Create a linear regression instance
    val lr = new LinearRegression()
      .setMaxIter(10)          // maximum number of iterations: 10
      .setRegParam(0.3)        // regularization parameter: 0.3
      .setElasticNetParam(0.8) // elastic net mixing parameter: 0.8
    // Fit the linear regression model
    val lrModel = lr.fit(training)
    // Print the coefficients and intercept of the fitted model
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
    // Summarize the model over the training set and print some metrics
    val trainingSummary = lrModel.summary
    // Print the number of iterations
    println(s"numIterations: ${trainingSummary.totalIterations}")
    // Print the objective value at each iteration
    println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
    // Show the residuals (label - predicted)
    trainingSummary.residuals.show()
    // Print the root mean squared error (RMSE)
    println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
    // Print the R-squared value
    println(s"r2: ${trainingSummary.r2}")
    // Stop the SparkSession
    spark.stop()
  }
}
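The elasticNetParam of 0.8 above mixes L1 and L2 regularization: a value of 0.0 corresponds to pure L2 (ridge) and 1.0 to pure L1 (lasso). A minimal sketch of the two extremes, assuming the training DataFrame from Code 10.2 and using ridge/lasso as illustrative names:

// Minimal sketch: the two extremes of the elastic net mixing parameter
val ridge = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.0) // pure L2 (ridge)
val lasso = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(1.0) // pure L1 (lasso)
val ridgeModel = ridge.fit(training)
val lassoModel = lasso.fit(training)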
Code 10.3
// Import the required classes
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.SparkSession
object DecisionTreeClassificationExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession and set the master to local mode
    val spark = SparkSession
      .builder
      .appName("DecisionTreeClassificationExample")
      .master("local")
      .getOrCreate()
    // Load data stored in LIBSVM format as a DataFrame
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    // Index the labels, adding metadata to the label column;
    // fit on the whole dataset so the index includes all labels
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)
    // Automatically identify categorical features and index them
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      // features with more than 4 distinct values are treated as continuous
      .setMaxCategories(4)
      .fit(data)
    // Split the data into training and test sets (30% held out for testing)
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    // Create a decision tree classifier
    val dt = new DecisionTreeClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
    // Convert the indexed labels back to the original labels
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)
    // Chain the indexers and the tree in a Pipeline
    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
    // Train the decision tree model
    val model = pipeline.fit(trainingData)
    // Make predictions on the test set
    val predictions = model.transform(testData)
    // Select the columns to display and the number of rows (5 here)
    predictions.select("predictedLabel", "label", "features").show(5)
    // Compare the true and predicted labels and compute the test error
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${(1.0 - accuracy)}")
    // Print the learned decision tree
    val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
    println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
    // Stop the SparkSession
    spark.stop()
  }
}
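The classifier in Code 10.3 uses the default tree settings. As a minimal sketch (tunedDt is an illustrative name), the tree depth and split criterion could be constrained before the estimator is placed into the Pipeline:

// Minimal sketch: constraining the decision tree before it goes into the Pipeline
val tunedDt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxDepth(5)       // limit the tree depth (default 5)
  .setImpurity("gini")  // split criterion: "gini" or "entropy"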
Code 10.4
// Import the required classes
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.SparkSession
object DecisionTreeRegressionExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession and set the master to local mode
    val spark = SparkSession
      .builder
      .appName("DecisionTreeRegressionExample")
      .master("local")
      .getOrCreate()
    // Load data stored in LIBSVM format as a DataFrame
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    // Automatically identify categorical features and index them
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      // features with more than 4 distinct values are treated as continuous
      .setMaxCategories(4)
      .fit(data)
    // Split the data into training and test sets (30% held out for testing)
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    // Create a decision tree regressor
    val dt = new DecisionTreeRegressor()
      .setLabelCol("label")
      .setFeaturesCol("indexedFeatures")
    // Chain the indexer and the tree in a Pipeline
    val pipeline = new Pipeline()
      .setStages(Array(featureIndexer, dt))
    // Train the decision tree model
    val model = pipeline.fit(trainingData)
    // Make predictions on the test set
    val predictions = model.transform(testData)
    // Select the columns to display and the number of rows (5 here)
    predictions.select("prediction", "label", "features").show(5)
    // Compare the prediction and label columns and compute the prediction error
    val evaluator = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")
    // Print the root mean squared error (RMSE)
    val rmse = evaluator.evaluate(predictions)
    println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
    // Print the learned regression tree
    val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
    println(s"Learned regression tree model:\n ${treeModel.toDebugString}")
    // Stop the SparkSession
    spark.stop()
  }
}
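The RegressionEvaluator in Code 10.4 is configured for RMSE only. A minimal sketch, reusing the evaluator and predictions from that listing, shows two of the other supported metric names ("mae" and "r2"):

// Minimal sketch: reusing the evaluator with other supported regression metrics
val mae = evaluator.setMetricName("mae").evaluate(predictions)
val r2 = evaluator.setMetricName("r2").evaluate(predictions)
println(s"MAE = $mae, R2 = $r2")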
Code 10.5
// Import the required classes
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.sql.SparkSession
object KMeansExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession and set the master to local mode
    val spark = SparkSession
      .builder
      .appName(s"${this.getClass.getSimpleName}")
      .master("local")
      .getOrCreate()
    // Load data stored in LIBSVM format as a DataFrame
    val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
    // Train a K-means model with K = 2 and a fixed random seed
    val kmeans = new KMeans().setK(2).setSeed(1L)
    val model = kmeans.fit(dataset)
    // Use the trained model to assign each point in the dataset to a cluster
    val predictions = model.transform(dataset)
    // Evaluate the clustering result
    val evaluator = new ClusteringEvaluator()
    val silhouette = evaluator.evaluate(predictions)
    // Print the silhouette score computed with squared Euclidean distance
    println(s"Silhouette with squared euclidean distance = $silhouette")
    // Print the cluster centers
    println("Cluster Centers: ")
    model.clusterCenters.foreach(println)
    // Stop the SparkSession
    spark.stop()
  }
}
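Code 10.5 fixes K = 2 in advance. A minimal sketch, assuming the dataset from Code 10.5 and using evaluatorK/kmModel as illustrative names, compares several values of K by their silhouette scores:

// Minimal sketch: comparing several values of K by silhouette score
val evaluatorK = new ClusteringEvaluator()
for (k <- 2 to 5) {
  val kmModel = new KMeans().setK(k).setSeed(1L).fit(dataset)
  val score = evaluatorK.evaluate(kmModel.transform(dataset))
  println(s"K = $k, silhouette = $score")
}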
Code 10.6
// Import FPGrowth and SparkSession
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.SparkSession
object FPGrowthExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession with SparkSession.builder and set the master to local mode
    val spark = SparkSession.builder
      .master("local")
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    // Import the implicits needed for Dataset/DataFrame conversions
    import spark.implicits._
    // Create a DataFrame of transactions with a single column named "items"
    val dataset = spark.createDataset(Seq(
      "I1 I2 I5",
      "I2 I4",
      "I3 I4 I5",
      "I1 I2 I4 I5",
      "I4 I5")
    ).map(t => t.split(" ")).toDF("items")
    // Create an FPGrowth instance:
    // set the input column to "items", the minimum support to 0.4 and the minimum confidence to 0.6
    val fpgrowth = new FPGrowth().setItemsCol("items")
      .setMinSupport(0.4).setMinConfidence(0.6)
    // Call fit() to produce an FPGrowthModel (a Transformer)
    val model = fpgrowth.fit(dataset)
    // Show the frequent itemsets found by the model
    model.freqItemsets.show()
    // Show the association rules generated from the frequent itemsets
    model.associationRules.show()
    // Call transform() to examine each transaction against the rules and show the results
    model.transform(dataset).show()
    spark.stop()
  }
}
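Beyond the default show() output, the result DataFrames of Code 10.6 can be inspected further. A minimal sketch, assuming the model variable from Code 10.6 and the column names freqItemsets(items, freq) and associationRules(antecedent, consequent, confidence):

// Minimal sketch: inspecting the mining results in more detail
model.freqItemsets.orderBy($"freq".desc).show(false)            // itemsets, most frequent first
model.associationRules.orderBy($"confidence".desc).show(false)  // rules, most confident first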