Code 5.1

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
	//returns the partitions of the parent RDD that the given partition (partitionId) of the child RDD depends on
	def getParents(partitionId: Int): Seq[Int]
	override def rdd: RDD[T] = _rdd
}

Code 5.2

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
	override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
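
As a quick illustration (a minimal sketch assuming a running spark-shell with sc available; the variable names are illustrative), a map transformation keeps its parent's partitioning one-to-one, so the dependency of the resulting RDD is a OneToOneDependency:

val parent = sc.parallelize(1 to 4, 2)
val mapped = parent.map(_ + 1)
//dependencies returns the RDD's list of Dependency objects; its head here is a OneToOneDependency
println(mapped.dependencies.head)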

Code 5.3

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
	extends NarrowDependency[T](rdd) {
	override def getParents(partitionId: Int): List[Int] = {
		//outStart is the starting position of this range in the child RDD (the UnionRDD)
		//length is the number of partitions of the parent RDD
		//inStart is the starting position of this range in the parent RDD
		if (partitionId >= outStart && partitionId < outStart + length) {
			List(partitionId - outStart + inStart)
		} else {
			Nil
		}
	}
}
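
For context, RangeDependency is created by UnionRDD when it concatenates the partitions of its parent RDDs. The sketch below is a simplified rendering of UnionRDD.getDependencies (adapted from the Spark source for illustration) and shows how inStart, outStart and length are filled in:

override def getDependencies: Seq[Dependency[_]] = {
	val deps = new ArrayBuffer[Dependency[_]]
	var pos = 0
	for (rdd <- rdds) {
		//each parent occupies a contiguous range of the UnionRDD's partitions, starting at pos
		deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
		pos += rdd.partitions.length
	}
	deps
}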


Code 5.4

class ShuffleDependency[K, V, C](
	@transient _rdd: RDD[_ <: Product2[K, V]],
	val partitioner: Partitioner,
	val serializer: Option[Serializer] = None,
	val keyOrdering: Option[Ordering[K]] = None,
	val aggregator: Option[Aggregator[K, V, C]] = None,
	val mapSideCombine: Boolean = false)
	extends Dependency[Product2[K, V]] {
		override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
		//obtain a new shuffleId
		val shuffleId: Int = _rdd.context.newShuffleId()
		//register the shuffle with the ShuffleManager
		val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
			shuffleId, _rdd.partitions.size, this)
		_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
	}
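
As a quick check (a minimal sketch assuming a running spark-shell; the variable names are illustrative), a wide transformation such as reduceByKey introduces a shuffle, so the first dependency of the resulting RDD is a ShuffleDependency:

val pairs = sc.parallelize(Array((1, 2), (1, 3), (2, 4)))
val reduced = pairs.reduceByKey(_ + _)
//the head of the dependency list is a ShuffleDependency
println(reduced.dependencies.head)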

Code 5.5

scala> val data = Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:26

scala> distData.partitions.size
res25: Int = 1

scala> val disData2 = sc.parallelize(data,4)
disData2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:26

scala> disData2.partitions.size
res30: Int = 4

Code 5.6

scala> val data1 = sc.makeRDD(Array("changsha","is a beautiful city","yes"))
data1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at makeRDD at <console>:24

scala> data1.collect
res31: Array[String] = Array(changsha, is a beautiful city, yes)

Code 5.7

//read all files under the directory
val text1 = sc.textFile("/my/directory/*")
//use a wildcard to read only files of a given type
val text2 = sc.textFile("/my/directory/*.txt")
//compressed files can be read as well
val text3 = sc.textFile("/my/directory/*.gz")
//read several files from different paths at once (passed as one comma-separated string)
val text4 = sc.textFile("/my/directory/test1.txt,/my/directory/test2.txt")

Code 5.8

scala> val data = sc.textFile("/usr/spark/spark-2.3.0-bin-hadoop2.7/README.md",3)
data: org.apache.spark.rdd.RDD[String] = /usr/spark/spark-2.3.0-bin-hadoop2.7/README.md MapPartitionsRDD[7] at textFile at <console>:24

Code 5.9

scala> val rdd=sc.parallelize(Array(1,2,3))
scala> val rdd1 = rdd.map(x=>x.to(3)).collect
rdd1: Array[scala.collection.immutable.Range.Inclusive]=Array(Range(1, 2, 3), Range(2, 3), Range(3))

Code 5.10

scala> val rdd=sc.parallelize(Array(1,2,3))
scala> val rdd2 = rdd.flatMap(x=>x.to(3)).collect
rdd2: Array[Int] = Array(1, 2, 3, 2, 3, 3)

Code 5.11

scala> val rdd=sc.parallelize(Array(1,2,3,4))
scala> val line1 = rdd.filter(x=>x>2).foreach(println)
3
4

Code 5.12

scala> val rdd_a = sc.parallelize(Array("apple","orange","pineapple","pineapple"))
rdd_a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd_b = sc.parallelize(Array("apple","orange","grape"))
rdd_b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> val rdd_union = rdd_a.union(rdd_b).foreach(println)
apple
orange
pineapple
pineapple
apple
orange
grape


Code 5.13

scala> val rdd_distinct = rdd_a.distinct().foreach(println) 
orange
pineapple
apple

Code 5.14

scala> rdd_a.partitions.size
res4: Int = 1

scala> val rdd_re = rdd_a.repartition(4)
rdd_re: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[29] at repartition at <console>:25

scala> rdd_re.partitions.size
res7: Int = 4

Code 5.15

scala> val rdd_sub = rdd_b.subtract(rdd_a).foreach(println)
grape

Code 5.16

scala> val rdd3 = sc.parallelize(Array((1,3),(2,6),(2,2),(3,6)))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24

scala> val rdd_gbk = rdd3.groupByKey().foreach(println)
(1,CompactBuffer(3))
(3,CompactBuffer(6))
(2,CompactBuffer(6, 2))

Code 5.17

scala> val rdd1 = sc.parallelize(Array((1,2),(2,3),(2,6),(3,8),(3,10)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = rdd1.reduceByKey((x,y)=>x+y).foreach(println)
(1,2)
(3,18)
(2,9)

Code 5.18

scala> val rdd3 = rdd1.sortByKey(true).foreach(println)
(1,2)
(2,3)
(2,6)
(3,8)
(3,10)

Code 5.19

scala> val rdd1 = sc.parallelize(Array(("a",1),("b",1),("c",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array(("a",2),("b",2),("c",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val rdd3 = rdd1.join(rdd2).foreach(println)
(a,(1,2))
(b,(1,2))
(c,(1,2))

Code 5.20

scala> val rdd4 = sc.parallelize(Array(3,4,5,6,7))
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> val rdd5 = rdd4.reduce((x,y)=>x+y)
rdd5: Int = 25

Code 5.21

scala> val data = sc.parallelize(Array(("A",1),("B",2),("C",3)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> data.fold(("V0",2))((A,B)=>(A._1 + "@" + B._1,A._2+B._2))
res5: (String, Int) = (V0@V0@A@B@C,10)

Code 5.22

scala> val data1 = sc.parallelize(Array(1,2,3,4,5))
data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> data1.fold(0)((A,B)=>A+B)
res7: Int = 15

Code 5.23

scala> val data = sc.parallelize(Array(("A",1),("B",2),("C",3)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> data.aggregate(("V0",2))((A,B)=>(A._1 + "@" + B._1,A._2+B._2),(A,B)=>(A._1 + "$" + B._1,A._2+B._2))
res10: (String, Int) = (V0$V0@A@B@C,10)

Code 5.24

scala> rdd4.collect
res1: Array[Int] = Array(3, 4, 5, 6, 7)

Code 5.25

scala> rdd4.count
res2: Long = 5

Code 5.26

scala> rdd4.first
res3: Int = 3

Code 5.27

scala> rdd4.take(3)
res4: Array[Int] = Array(3, 4, 5)

Code 5.28

scala> rdd4.foreach(println)
3
4
5
6
7

Code 5.29

scala> val rdd5 = sc.parallelize(Array(("a",1),("b",2),("a",5),("b",6)))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> rdd5.countByKey
res10: scala.collection.Map[String,Long] = Map(a -> 2, b -> 2)

Code 5.30

scala> val rdd6 = sc.parallelize(Array("a","b","c","d"))
rdd6: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> rdd6.saveAsTextFile("/home/ubuntu01/TextFile1")
//viewing the text file part-00000 under TextFile1, its contents are:
a
b
c
d

Code 5.31

scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:26

scala> println(rdd.count())
3

scala> println(rdd.collect().mkString(" , "))
Hadoop , Spark , Hive

Code 5.32

scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:26

scala> rdd.cache()
res22: rdd.type = ParallelCollectionRDD[11] at parallelize at <console>:26

scala> println(rdd.count())
3

scala> println(rdd.collect().mkString(" , "))
Hadoop , Spark , Hive

Code 5.33

//create a directory to hold the checkpoint data
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
	throw new SparkException("Failed to create checkpoint path " + path)
}
//create a broadcast variable holding the Hadoop configuration
val broadcastedConf = rdd.context.broadcast(
	new SerializableWritable(rdd.context.hadoopConfiguration))
//start a new job to compute the RDD and write the results under path
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
//create a CheckpointRDD from the result path
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
//save the result and clear the original RDD's dependencies, partition info, etc.
RDDCheckpointData.synchronized {
	cpFile = Some(path.toString)
	cpRDD = Some(newRDD)	//the CheckpointRDD held by this RDDCheckpointData
	rdd.markCheckpointed(newRDD)	//clear the original RDD's dependencies
	cpState = Checkpointed	//mark the checkpoint state as completed
}
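
The code above is the internal write path; from the user's side, checkpointing is triggered as in the minimal sketch below (the checkpoint directory path is illustrative): set a checkpoint directory, mark the RDD, and run an action so that the checkpoint data is actually written.

sc.setCheckpointDir("/tmp/checkpoint")	//directory that will hold the checkpoint data
val rdd = sc.parallelize(1 to 100)
rdd.checkpoint()	//mark the RDD for checkpointing
rdd.count()	//the first action on the RDD writes the checkpoint files
println(rdd.isCheckpointed)	//true after the action completes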

Code 5.34

import org.apache.spark.{SparkConf, SparkContext}

object rank {
	def main(args: Array[String]): Unit = {
		val conf = new SparkConf().setAppName("rank").setMaster("local")
		val sc = new SparkContext(conf)
		//only log messages at ERROR level
		sc.setLogLevel("ERROR")
		//read several files at once
		val lines = sc.textFile("class1.txt,class2.txt,class3.txt")
		var num = 0
		val result = lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
			.map(line => {
				val fields = line.split(",")
				val userid = fields(1)	//the second field of each line is the student id (userid)
				val core = fields(2).toInt	//the third field is the score (core)
				val classs = fields(3)	//the fourth field is the class (classs)
				//re-assemble the data as (score, (class, userid))
				(core, (classs, userid))
			})
		println("rank" + "\t" + "class" + "\t" + "\t" + "userid" + "\t" + "core" + "\n")
		//sort by score in descending order and print the top 10
		result.sortByKey(false).take(10).foreach(x => {
			num = num + 1
			println(num + "\t\t" + x._2._1 + "\t\t" + x._2._2 + "\t" + x._1)
		})
	}
}

Code 5.35

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object mobineNum {
	def main(args: Array[String]) {
		//the appName parameter is the application name shown on Spark's web UI
		val conf = new SparkConf().setAppName("mobineNum")
		//Master is the URL of a Spark, Mesos or YARN cluster, or the special string "local" to run in local mode
		conf.setMaster("local")
		//sc is the SparkContext, i.e. the execution context; it is built from conf
		val sc = new SparkContext(conf)
		//read a text file through sc and turn it into an RDD
		//the path is relative to the project directory
		val lines = sc.textFile("A.txt")
		//split each line
		val splited = lines.map(line => {
			//each record is comma separated
			val fields = line.split(",")
			//the first field is the mobile number
			val mobile = fields(0)
			//the third field is the base station id
			val lac = fields(2)
			//the fourth field is the connection status
			val tp = fields(3)
			//the second field is the timestamp; convert it to Long, negated when tp == "1"
			val time = if (tp == "1") -fields(1).toLong else fields(1).toLong
			//re-assemble the data into a new RDD of ((mobile, lac), time)
			((mobile, lac), time)
		})
		//group and aggregate: sum the times of the same mobile number at the same base station
		val reduced = splited.reduceByKey(_ + _)
		val lmt = reduced.map(x => {
			//in ((mobile, lac), time), x._1._2 is lac, x._1._1 is mobile and x._2 is time;
			//re-key the records as (base station id, (mobile, time))
			(x._1._2, (x._1._1, x._2))
		})

		//read the information of each base station
		val lacInfo = sc.textFile("B.txt")
		//parse the base station data
		val splitedLacInfo = lacInfo.map(line => {
			val fields = line.split(",")
			//the first field is the base station id
			val id = fields(0)
			//the second field is the base station's longitude
			val x = fields(1)
			//the third field is the base station's latitude
			val y = fields(2)
			//assemble as (base station id, (longitude, latitude))
			(id, (x, y))
		})
		//join the two RDDs on the base station id
		val joined = lmt.join(splitedLacInfo)
		println(joined.collect().toBuffer)
		sc.stop()
	}
}