class ShuffleDependency[K, V, C](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
  // Obtain a new shuffleId
  val shuffleId: Int = _rdd.context.newShuffleId()
  // Register the shuffle with the ShuffleManager
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
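For a quick illustration of where a ShuffleDependency appears (a minimal sketch, assuming a running spark-shell with its SparkContext sc), a shuffle operator such as reduceByKey creates a ShuffleDependency on its parent RDD, which can be inspected through the public dependencies field:

// A minimal sketch (assumes a spark-shell session with SparkContext sc).
// reduceByKey introduces a shuffle, so the resulting RDD carries a ShuffleDependency.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val reduced = pairs.reduceByKey(_ + _)
println(reduced.dependencies.head)   // prints an org.apache.spark.ShuffleDependency instance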
Code 5.5
scala> val data = Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:26
scala> distData.partitions.size
res25: Int = 1
scala> val disData2 = sc.parallelize(data,4)
disData2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:26
scala> disData2.partitions.size
res30: Int = 4
Code 5.6
scala> val data1 = sc.makeRDD(Array("changsha","is a beautiful city","yes"))
data1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at makeRDD at <console>:24
scala> data1.collect
res31: Array[String] = Array(changsha, is a beautiful city, yes)
Code 5.7
// Read all files under the directory
val text1 = sc.textFile("/my/directory/*")
// Use a wildcard to read only files of a given type
val text2 = sc.textFile("/my/directory/*.txt")
// Compressed files can be read in the same way
val text3 = sc.textFile("/my/directory/*.gz")
// Read several files from different paths at once (the paths are passed as one comma-separated string)
val text4 = sc.textFile("/my/directory/test1.txt,/my/directory/test2.txt")
Code 5.8
scala> val data = sc.textFile("/usr/spark/spark-2.3.0-bin-hadoop2.7/README.md",3)
data: org.apache.spark.rdd.RDD[String] = /usr/spark/spark-2.3.0-bin-hadoop2.7/README.md MapPartitionsRDD[7] at textFile at <console>:24
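Note that the second argument of textFile is only a minimum number of partitions; the actual count can be larger, depending on how the input file is split. A quick check (a sketch, the exact value depends on the file):

data.partitions.size   // at least 3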
Code 5.9
scala> val rdd=sc.parallelize(Array(1,2,3))
scala> val rdd1 = rdd.map(x=>x.to(3)).collect
rdd1: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3), Range(2, 3), Range(3))
Code 5.10
scala> val rdd=sc.parallelize(Array(1,2,3))
scala> val rdd2 = rdd.flatMap(x=>x.to(3)).collect
rdd2: Array[Int] = Array(1, 2, 3, 2, 3, 3)
Code 5.11
scala> val rdd=sc.parallelize(Array(1,2,3,4))
scala> val line1 = rdd.filter(x=>x>2).foreach(println)
3
4
Code 5.12
scala> val rdd_a = sc.parallelize(Array("apple","orange","pineapple","pineapple"))
rdd_a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd_b = sc.parallelize(Array("apple","orange","grape"))
rdd_b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> val rdd_union = rdd_a.union(rdd_b).foreach(println)
apple
orange
pineapple
pineapple
apple
orange
grape
Code 5.13
scala> val rdd_distinct = rdd_a.distinct().foreach(println)
orange
pineapple
apple
Code 5.14
scala> rdd_a.partitions.size
res4: Int = 1
scala> val rdd_re = rdd_a.repartition(4)
rdd_re: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[29] at repartition at <console>:25
scala> rdd_re.partitions.size
res7: Int = 4
Code 5.15
scala> val rdd_sub = rdd_b.subtract(rdd_a).foreach(println)
grape
Code 5.16
scala> val rdd3 = sc.parallelize(Array((1,3),(2,6),(2,2),(3,6)))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24
scala> val rdd_gbk = rdd3.groupByKey().foreach(println)
(1,CompactBuffer(3))
(3,CompactBuffer(6))
(2,CompactBuffer(6, 2))
Code 5.17
scala> val rdd1 = sc.parallelize(Array((1,2),(2,3),(2,6),(3,8),(3,10)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = rdd1.reduceByKey((x,y)=>x+y).foreach(println)
(1,2)
(3,18)
(2,9)
Code 5.18
scala> val rdd3 = rdd1.sortByKey(true).foreach(println)
(1,2)
(2,3)
(2,6)
(3,8)
(3,10)
Code 5.19
scala> val rdd1 = sc.parallelize(Array(("a",1),("b",1),("c",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(Array(("a",2),("b",2),("c",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> val rdd3 = rdd1.join(rdd2).foreach(println)
(a,(1,2))
(b,(1,2))
(c,(1,2))
Code 5.20
scala> val rdd4 = sc.parallelize(Array(3,4,5,6,7))
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
scala> val rdd5 = rdd4.reduce((x,y)=>x+y)
rdd5: Int = 25
Code 5.21
scala> val data = sc.parallelize(Array(("A",1),("B",2),("C",3)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> data.fold(("V0",2))((A,B)=>(A._1 + "@" + B._1,A._2+B._2))
res5: (String, Int) = (V0@V0@A@B@C,10)
Code 5.22
scala> val data1 = sc.parallelize(Array(1,2,3,4,5))
data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> data1.fold(0)((A,B)=>A+B)
res7: Int = 15
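The result of fold depends on the partitioning, because the zero value is applied once inside every partition and once more when the per-partition results are merged on the driver. A minimal sketch (the exact string depends on the order in which partition results are merged):

// With 3 partitions the zero value ("V0", 2) enters the computation four times
// (once per partition plus once when merging), so the Int part is 2 * 4 + (1 + 2 + 3) = 14.
val data3 = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)), 3)
data3.fold(("V0", 2))((a, b) => (a._1 + "@" + b._1, a._2 + b._2))
// e.g. (V0@V0@A@V0@B@V0@C, 14); the string part may vary with the merge order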
Code 5.23
scala> val data = sc.parallelize(Array(("A",1),("B",2),("C",3)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> data.aggregate(("V0",2))((A,B)=>(A._1 + "@" + B._1,A._2+B._2),(A,B)=>(A._1 + "$" + B._1,A._2+B._2))
res10: (String, Int) = (V0$V0@A@B@C,10)
scala> val rdd5 = sc.parallelize(Array(("a",1),("b",2),("a",5),("b",6)))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
scala> rdd5.countByKey
res10: scala.collection.Map[String,Long] = Map(a -> 2, b -> 2)
Code 5.30
scala> val rdd6 = sc.parallelize(Array("a","b","c","d"))
rdd6: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> rdd6.saveAsTextFile("/home/ubuntu01/TextFile1")
// The contents of the text file part-00000 in the TextFile1 directory are:
a
b
c
d
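The saved directory can be read back with textFile (a sketch reusing the output path above):

sc.textFile("/home/ubuntu01/TextFile1").collect()
// returns the saved lines, e.g. Array(a, b, c, d)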
Code 5.31
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:26
scala> println(rdd.count())
3
scala> println(rdd.collect().mkString(" , "))
Hadoop , Spark , Hive
Code 5.32
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:26
scala> rdd.cache()
res22: rdd.type = ParallelCollectionRDD[11] at parallelize at <console>:26
scala> println(rdd.count())
3
scala> println(rdd.collect().mkString(" , "))
Hadoop , Spark , Hive
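cache() is shorthand for persisting the RDD at the default MEMORY_ONLY storage level; the same effect can be obtained explicitly with persist, and the cached blocks can be released with unpersist (a minimal sketch, reusing the list defined above):

import org.apache.spark.storage.StorageLevel
val rdd2 = sc.parallelize(list)
rdd2.persist(StorageLevel.MEMORY_ONLY)   // equivalent to rdd2.cache()
rdd2.unpersist()                         // release the cached blocks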
Code 5.33
// Create a directory for storing the checkpoint data
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
  throw new SparkException("Failed to create checkpoint path " + path)
}
// Create a broadcast variable holding the Hadoop configuration
val broadcastedConf = rdd.context.broadcast(
  new SerializableWritable(rdd.context.hadoopConfiguration))
// Launch a new job to compute the RDD and write the results under path
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
// Create a CheckpointRDD from the result path
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
// Save the result and clear the original RDD's dependencies, partition info, etc.
RDDCheckpointData.synchronized {
  cpFile = Some(path.toString)
  cpRDD = Some(newRDD)           // the CheckpointRDD backing this RDDCheckpointData
  rdd.markCheckpointed(newRDD)   // clear the original RDD's dependencies
  cpState = Checkpointed         // mark the checkpoint as completed
}
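The code above is internal to Spark. From the user's point of view, checkpointing is triggered by setting a checkpoint directory and calling checkpoint() before the RDD is materialized (a minimal sketch; the directory path is only an example):

sc.setCheckpointDir("/tmp/spark-checkpoint")   // example path
val nums = sc.parallelize(1 to 5)
nums.cache()          // avoids recomputing the RDD when the checkpoint job runs
nums.checkpoint()     // mark the RDD for checkpointing
nums.count()          // the first action materializes the RDD and writes the checkpoint files
println(nums.isCheckpointed)   // true once the checkpoint has been written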
Code 5.34
import org.apache.spark.{SparkConf, SparkContext}

object rank {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("rank").setMaster("local")
    val sc = new SparkContext(conf)
    // Only log errors
    sc.setLogLevel("ERROR")
    // Read several files at once
    val lines = sc.textFile("class1.txt,class2.txt,class3.txt")
    var num = 0
    val result = lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
      .map(line => {
        val fields = line.split(",")
        val userid = fields(1)       // the second field of each line is the student id (userid)
        val core = fields(2).toInt   // the third field is the score (core)
        val classs = fields(3)       // the fourth field is the class (classs)
        // Assemble the data as (score, (class, userid))
        (core, (classs, userid))
      })
    println("rank" + "\t" + "class" + "\t" + "\t" + "userid" + "\t" + "core" + "\n")
    // Sort by score in descending order and print the top 10
    result.sortByKey(false).take(10).foreach(x => {
      num = num + 1
      println(num + "\t\t" + x._2._1 + "\t\t" + x._2._2 + "\t" + x._1)
    })
  }
}
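The program assumes that every input line has four comma-separated fields, of which the second is the student id, the third the score and the fourth the class; the first field is not used. A hypothetical format, for illustration only:

// Hypothetical input format (illustration only):
// <field0>,<userid>,<score>,<class>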
Code 5.35
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object mobineNum {
  def main(args: Array[String]) {
    // The appName parameter is the application name; it is shown in the Spark web UI.
    val conf = new SparkConf().setAppName("mobineNum")
    // Master is the URL of a Spark, Mesos, or YARN cluster, or the special string "local" to run in local mode.
    conf.setMaster("local")
    // sc is the SparkContext, i.e. the runtime context; conf is passed to it as a parameter
    val sc = new SparkContext(conf)
    // Read a text file through sc and turn the input file into an RDD;
    // the path is relative to the project directory
    val lines = sc.textFile("A.txt")
    // Split each line
    val splited = lines.map(line => {
      // Split the record on commas
      val fields = line.split(",")
      // The first field is the mobile number
      val mobile = fields(0)
      // The third field is the base station (lac) id
      val lac = fields(2)
      // The fourth field is the connection flag
      val tp = fields(3)
      // The second field is the timestamp; connect events (tp == "1") are negated
      // so that summing per key yields the connection time
      val time = if (tp == "1") -fields(1).toLong else fields(1).toLong
      // Assemble the data into a new RDD with the following format
      ((mobile, lac), time)
    })
    // Group and aggregate: sum the times of the same mobile number at the same base station
    val reduced = splited.reduceByKey(_ + _)
    val lmt = reduced.map(x => {
      // In ((mobile, lac), time), x._1._2 is the lac, x._1._1 the mobile and x._2 the time;
      // reshape into (lac, (mobile, time))
      (x._1._2, (x._1._1, x._2))
    })
    // Load the base station information
    val lacInfo = sc.textFile("B.txt")
    // Parse the base station data
    val splitedLacInfo = lacInfo.map(line => {
      val fields = line.split(",")
      // The first field is the base station id
      val id = fields(0)
      // The second field is the longitude
      val x = fields(1)
      // The third field is the latitude
      val y = fields(2)
      // Assemble into (lac, (longitude, latitude))
      (id, (x, y))
    })
    // Join the two RDDs
    val joined = lmt.join(splitedLacInfo)
    println(joined.collect().toBuffer)
    sc.stop()
  }
}
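The program assumes that each line of A.txt has four comma-separated fields (mobile number, timestamp, base station id, connection flag) and that each line of B.txt has three (base station id, longitude, latitude). Hypothetical formats, for illustration only:

// Hypothetical input formats (illustration only):
// A.txt: <mobile>,<time>,<lac>,<flag>
// B.txt: <lac>,<longitude>,<latitude>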