Code 8.1

class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
	val index: VertexIdToIndexMap,
	val values: Array[VD],
	val mask: BitSet,
	private val activeSet: Option[VertexSet] = None )

Code 8.2

class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
	val srcIds: Array[VertexId],
	val dstIds: Array[VertexId],
	val data: Array[ED],
	val index: PrimitiveKeyOpenHashMap[VertexId,Int] )

Code 8.3

scala> import org.apache.spark.graphx._ // First, import GraphX
scala> val myVertices = sc.parallelize(Array((1L,"Tom"),(2L,"Marry"),(3L,"Jack"),(4L,"Cody"),(5L,"Adam"),(6L,"Bob"))) // Construct the VertexRDD

scala> val myEdges = sc.parallelize(Array(Edge(1L,2L,"Colleague"),Edge(2L,3L,"Child"),Edge(2L,4L,"Child"),Edge(4L,5L,"Colleague"),Edge(4L,6L,"Colleague"),Edge(5L,6L,"Friend"))) // Construct the EdgeRDD

scala> val myGraph = Graph(myVertices, myEdges) // Construct the graph Graph[VD,ED]
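Once the graph is built, its size can be checked directly. A minimal sketch, not part of the original listing; for the example data above both counts should be 6:

scala> myGraph.numVertices  // expected: 6
scala> myGraph.numEdges     // expected: 6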


Code 8.4

scala> myGraph.vertices.collect
res0: Array[(org.apache.spark.graphx.VertexId, String)] = Array((1,Tom), (2,Marry), (3,Jack), (4,Cody), (5,Adam), (6,Bob))

Code 8.5

scala> myGraph.edges.collect
res1: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,2,Colleague), Edge(2,3,Child), Edge(2,4,Child), Edge(4,5,Colleague), Edge(4,6,Colleague), Edge(5,6,Friend))

Code 8.6

scala> myGraph.triplets.collect
res2: Array[org.apache.spark.graphx.EdgeTriplet[String,String]] = Array(((1,Tom),(2,Marry),Colleague), ((2,Marry),(3,Jack),Child), ((2,Marry),(4,Cody),Child), ((4,Cody),(5,Adam),Colleague), ((4,Cody),(6,Bob),Colleague), ((5,Adam),(6,Bob),Friend))

Code 8.7


scala> myGraph.inDegrees.collect
res3: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((2,1), (3,1), (4,1), (5,1), (6,2))
		

Code 8.8

// Define a max function that returns the vertex with the larger degree from a pair of vertices
scala> def max(a :(VertexId,Int), b :(VertexId,Int)): (VertexId,Int)={ 
         if(a._2 > b._2) a else b
      }
max: (a: (org.apache.spark.graphx.VertexId, Int), b: (org.apache.spark.graphx.VertexId, Int))(org.apache.spark.graphx.VertexId, Int)

// Use reduce to fold over the pairs and find the vertex with the largest degree
scala> myGraph.degrees.reduce(max)
res4: (org.apache.spark.graphx.VertexId, Int) = (4,3)

Code 8.9

scala> val outDegrees: VertexRDD[Int] = myGraph.outDegrees
outDegrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[25] at RDD at VertexRDD.scala:57
scala> val degreeGraph = myGraph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) => outDegOpt match {
          case Some(outDeg) => outDeg
          case None => 0  // Vertices with no degree information get 0
         }
      }
degreeGraph: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@11544ddd
scala> degreeGraph.vertices.collect
res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,2), (1,1), (6,0), (3,0), (5,1), (2,2))

Code 8.10

scala> myGraph.mapTriplets(t => (t.attr, t.attr=="Colleague" && t.srcAttr.toLowerCase.contains("o"))).triplets.collect
res6: Array[org.apache.spark.graphx.EdgeTriplet[String,(String, Boolean)]] = Array(((1,Tom),(2,Marry),(Colleague,true)), ((2,Marry),(3,Jack),(Child,false)), ((2,Marry),(4,Cody),(Child,false)), ((4,Cody),(5,Adam),(Colleague,true)), ((4,Cody),(6,Bob),(Colleague,true)), ((5,Adam),(6,Bob),(Friend,false)))
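mapVertices plays the same role for vertex attributes that mapTriplets plays for triplets, and it appears later in Code 8.15 and Code 8.16 without introduction. A minimal sketch against the example graph (the upper-casing transformation is only illustrative):

scala> myGraph.mapVertices((id, name) => name.toUpperCase).vertices.collect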


Code 8.11

scala> val subGraph = myGraph.subgraph(each => each.attr == "Colleague")
subGraph: org.apache.spark.graphx.Graph[String,String] = org.apache.spark.graphx.impl.GraphImpl@48cbb4c5

scala> subGraph.vertices.collect
res7: Array[(org.apache.spark.graphx.VertexId, String)] = Array((4,Cody), (1,Tom), (6,Bob), (3,Jack), (5,Adam), (2,Marry))

scala> subGraph.edges.collect
res8: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,2,Colleague), Edge(4,5,Colleague), Edge(4,6,Colleague))
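Besides the edge predicate used above, subgraph also accepts a vertex predicate; vertices that fail it are removed together with their incident edges. A hedged sketch, with predicate choices that are only illustrative:

scala> val subGraph2 = myGraph.subgraph(epred = _.attr == "Colleague", vpred = (id, name) => id != 1L)
scala> subGraph2.edges.collect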


Code 8.12

def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
      : VertexRDD[Msg]

Code 8.13

scala> myGraph.aggregateMessages[Int](_.sendToSrc(1), _+_).collect
res9: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,1), (2,2), (4,2), (5,1))
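Sending the message to the destination vertex instead counts incoming edges, which should reproduce the inDegrees result of Code 8.7 (a minimal sketch, not in the original listing):

scala> myGraph.aggregateMessages[Int](_.sendToDst(1), _+_).collect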

Code 8.14

def pregel[A: ClassTag]
         (initialMsg: A,
          maxIterations: Int = Int.MaxValue,
          activeDirection: EdgeDirection = EdgeDirection.Either)
         (vprog: (VertexId, VD, A) => VD,
          sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
          mergeMsg: (A, A) => A)
         : Graph[VD, ED]


Code 8.15


scala> val g=Pregel(myGraph.mapVertices((vid,vd) => 0),0,activeDirection=EdgeDirection.Out)((id:VertexId, vd:Int, a:Int) => math.max(vd,a),(et:EdgeTriplet[Int,String]) => Iterator((et.dstId,et.srcAttr+1)),(a:Int,b:Int) => math.max(a,b))

scala> g.vertices.collect
res10: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,0), (2,1), (3,2), (4,2), (5,3), (6,4))

Code 8.16

// Define the sendMsg function
scala> def sendMsg(ec:EdgeContext[Int,String,Int]):Unit = {
      ec.sendToDst(ec.srcAttr+1)
      }
// Define the mergeMsg function
scala> def mergeMsg(a:Int,b:Int):Int = {
      math.max(a,b)
      }
scala> def propagateEdgeCount(g:Graph[Int,String]):Graph[Int,String] = {
         val verts = g.aggregateMessages[Int](sendMsg,mergeMsg) // Generate the new vertex set
         val g2 = Graph(verts,g.edges) // Build a new graph from the new vertex set
         // Join the new graph g2 with the original graph g to check whether any vertex distance changed
         val check = g2.vertices.join(g.vertices).
         map(x=>x._2._1-x._2._2).
         reduce(_+_)
         // If any distance changed, continue the recursion; otherwise return the graph
         if(check>0)
           propagateEdgeCount(g2)
         else
           g
       }
// Initialize the distances by setting every vertex value to 0
scala> val newGraph = myGraph.mapVertices((_,_)=>0)
scala> propagateEdgeCount(newGraph).vertices.collect
res11: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,0), (2,1), (3,2), (4,2), (5,3), (6,4))


Code 8.17

package org.apache.spark.examples.graphx
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SparkSession
object PageRankExample {
  	def main(args: Array[String]): Unit = {
		// Create a SparkSession.
		val spark = SparkSession
		  .builder
		  // When running on a local machine, add master("local")
		  .appName(s"${this.getClass.getSimpleName}").master("local")
		  .getOrCreate()
		val sc = spark.sparkContext
		// Load the edge data and create the Graph
		val graph = GraphLoader.edgeListFile(sc, "followers.txt")
		// Run PageRank
		val ranks = graph.pageRank(0.0001).vertices
		// Join the ranks with the usernames and print the result
		val users = sc.textFile("users.txt").map { line =>
			  val fields = line.split(",")
			  (fields(0).toLong, fields(1))
		}
		val ranksByUsername = users.join(ranks).map {
		  	case (id, (username, rank)) => (username, rank)
		}
		println(ranksByUsername.collect().mkString("\n"))
		spark.stop()
  	}
}

Code 8.18

scala> import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

scala> val graph = GraphLoader.edgeListFile(sc, "/usr/local/spark-2.3.0-bin-hadoop2.7/data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
graph: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@428169d

scala> val triCounts = graph.triangleCount().vertices // Count the triangles through each vertex
triCounts: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[128] at RDD at VertexRDD.scala:57
// Join the triangle counts with the usernames
scala> val users = sc.textFile("/usr/local/spark-2.3.0-bin-hadoop2.7/data/graphx/users.txt").map {line =>
     | val fields = line.split(",")
     | (fields(0).toLong, fields(1))
     | }
users: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[133] at map at <console>:27

scala> val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
     | (username, tc)
     | }
triCountByUsername: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[137] at map at <console>:30
// Print the result
scala> println(triCountByUsername.collect().mkString("\n"))
(justinbieber,0)
(BarackObama,0)
(matei_zaharia,1)
(jeresig,1)
(odersky,1)
(ladygaga,0)

Code 8.19

scala> import org.apache.spark.graphx._  // Import the GraphX package
import org.apache.spark.graphx._
// Build the example graph for connected components and cache it
scala> val g=Graph(sc.makeRDD((1L to 7L).map((_,""))),
sc.makeRDD(Array(Edge(2L,5L,""),Edge(5L,3L,""),Edge(3L,2L,""),
Edge(4L,5L,""),Edge(6L,7L,"")))).cache

// Run the connected components algorithm, then use a map transformation and grouping by component ID to display the result
scala> g.connectedComponents.vertices.map(_.swap).groupByKey.map(_._2).collect
res12: Array[Iterable[org.apache.spark.graphx.VertexId]] = Array(CompactBuffer(1), CompactBuffer(6, 7), CompactBuffer(4, 3, 5, 2))
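Because every vertex is labeled with the smallest vertex id in its connected component, counting the distinct labels gives the number of components; for the graph above this should be 3. A minimal sketch, not part of the original listing:

scala> g.connectedComponents.vertices.map(_._2).distinct.count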


Code 8.20

scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._

// Construct the VertexRDD
scala> val v = sc.makeRDD(Array((1L,""),(2L,""),(3L,""),(4L,""),(5L,""),(6L,""),(7L,""),(8L,"")))
v: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:27

// Construct the EdgeRDD
scala> val e = sc.makeRDD(Array(Edge(1L,2L,""),Edge(2L,3L,""),Edge(3L,4L,""),
Edge(4L,1L,""),Edge(1L,3L,""),Edge(2L,4L,""),Edge(4L,5L,""),
Edge(5L,6L,""),Edge(6L,7L,""),Edge(7L,8L,""),Edge(8L,5L,""),
Edge(5L,7L,""),Edge(6L,8L,"")))
e: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[1] at makeRDD at <console>:27

// Call LabelPropagation's run() method and display the result sorted by vertex ID in ascending order
scala> lib.LabelPropagation.run(Graph(v,e),5).vertices.collect.sortWith(_._1<_._1)
res13: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = Array((1,2), (2,1), (3,2), (4,1), (5,2), (6,1), (7,2), (8,3))


Code 8.21

scala> import org.apache.spark.graphx._  // Import the GraphX package
import org.apache.spark.graphx._

// Construct the EdgeRDD of the graph
scala> val edges = sc.makeRDD (Array(Edge(1L,5L,5.0), Edge(1L,6L,4.0), Edge(2L,6L,5.0), Edge(2L,7L,5.0), Edge(3L,5L,5.0), Edge(3L,6L,2.0), Edge(4L,5L,4.0), Edge(4L,6L,4.0)))  
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = ParallelCollectionRDD[0] at makeRDD at <console>:27

// Specify the algorithm's hyperparameters; see the parameter-settings table
scala> val conf=new lib.SVDPlusPlus.Conf(2,10,0,5,0.007,0.007,0.005,0.015)
conf: org.apache.spark.graphx.lib.SVDPlusPlus.Conf = org.apache.spark.graphx.lib.SVDPlusPlus$Conf@2674ca88

// Run the SVD++ algorithm to obtain the returned model: the reprocessed input graph and the mean rating of the dataset
scala> val (g,mean)=lib.SVDPlusPlus.run(edges,conf)
g: org.apache.spark.graphx.Graph[(Array[Double], Array[Double], Double, Double),Double] = org.apache.spark.graphx.impl.GraphImpl@3ce4eb42
mean: Double = 4.25

// Define the pred() function; its inputs are the model parameters, a user id, and the item (movie) to predict
scala> def pred (g:Graph[(Array[Double], Array[Double], Double, Double), Double ], mean:Double, u:Long, i:Long)={
      val user=g.vertices.filter(_._1 == u).collect()(0)._2
      val item=g.vertices.filter(_._1 == i).collect()(0)._2
      mean+user._3+item._3+item._1.zip(user._2).map(x => x._1 * x._2).reduce(_+_)
      }
pred: (g: org.apache.spark.graphx.Graph[(Array[Double], Array[Double], Double, Double),Double], mean: Double, u: Long, i: Long)Double

// Part of SVDPlusPlus's initialization uses random numbers, so for the same input the predictions may differ slightly between runs
scala> pred(g,mean,4L,7L)
res14: Double = 5.935786807208753


Code 8.22

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId}
import scala.reflect.ClassTag
object Paths {        // Single-source shortest paths
  	def dijkstra[VD: ClassTag](g : Graph[VD, Double], origin: VertexId) = {
		// Initialization: the vertex attribute is a (Boolean, Double, Long) triple, where the Boolean marks whether the vertex has been visited, the Double is the vertex's distance from the origin, and the Long is the id of the previous vertex
		var g2 = g.mapVertices((vid, _) => (false, if(vid == origin) 0 else Double.MaxValue, -1L))
		for(i <- 1L to g.vertices.count()) {
			  // Among the unvisited vertices, find the one closest to the origin
			  val currentVertexId = g2.vertices.filter(! _._2._1).reduce((a,b) => if (a._2._2 < b._2._2) a else b)._1
			  // Update the Double (distance) values of currentVertexId's neighboring vertices
			  val newDistances = g2.aggregateMessages[(Double, Long)](
				triplet => if(triplet.srcId == currentVertexId && !triplet.dstAttr._1) {    // Only send messages to vertices that have not been finalized
				  triplet.sendToDst((triplet.srcAttr._2 + triplet.attr, triplet.srcId))
				},(x, y) => if(x._1 < y._1) x else y ,
				TripletFields.All)
			  g2 = g2.outerJoinVertices(newDistances) {       // Update the graph
					case (vid, vd, Some(newSum)) => (vd._1 ||
					  vid == currentVertexId, math.min(vd._2, newSum._1), if(vd._2 <= newSum._1) vd._3 else newSum._2 )
					case (vid, vd, None) => (vd._1|| vid == currentVertexId, vd._2, vd._3)
			  }
		}
		g.outerJoinVertices(g2.vertices)( (vid, srcAttr, dist) => (srcAttr, dist.getOrElse((false, Double.MaxValue, -1L))._2) )
	}
   	def main(args: Array[String]): Unit ={
		val conf = new SparkConf().setAppName("ShortPaths").setMaster("local[4]") // Use four local threads to simulate a distributed cluster
		val sc = new SparkContext(conf) // The next two lines suppress logging
		Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
		Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
		val myVertices = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
		val initialEdges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
		  Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
		val myEdges = initialEdges.filter(e => e.srcId != e.dstId).flatMap(e => Array(e, Edge(e.dstId, e.srcId, e.attr))).distinct()  // Remove self-loops, turn the directed graph into an undirected one, and drop duplicate edges
		val myGraph = Graph(myVertices, myEdges).cache()
		println(dijkstra(myGraph, 1L).vertices.map(x => (x._1, x._2)).collect().mkString(" | "))
  }
}


Code 8.23

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
object TSP {
  	def greedy[VD](g: Graph[VD, Double], origin: VertexId) = {
		// Initialization: the vertex attribute marks whether the vertex has been visited; the edge attribute is (weight, used-in-tour flag)
		var g2: Graph[Boolean, (Double, Boolean)] = g.mapVertices((vid, vd) => vid == origin).mapTriplets {
		  et => (et.attr, false)
    	}
		var nextVertexId = origin
		var edgesAreAvailable = true
		type tripletType = EdgeTriplet[Boolean, (Double, Boolean)]
		do {
			// Candidate edges: unused edges incident to the current vertex whose far endpoint is unvisited
			val availableEdges = g2.triplets.filter { et => !et.attr._2 && (et.srcId == nextVertexId && !et.dstAttr || et.dstId == nextVertexId && !et.srcAttr) }
			edgesAreAvailable = availableEdges.count > 0
			if (edgesAreAvailable) {
				// Pick the candidate edge with the smallest weight
				val smallestEdge = availableEdges.min()(new Ordering[tripletType]() {
					override def compare(a: tripletType, b: tripletType) = {
						Ordering[Double].compare(a.attr._1, b.attr._1)
					}
				})
				// Move to the other endpoint of the chosen edge, mark it visited, and mark the edge as used
				nextVertexId = Seq(smallestEdge.srcId, smallestEdge.dstId).filter(_ != nextVertexId).head
				g2 = g2.mapVertices((vid, vd) => vd || vid == nextVertexId).mapTriplets { et =>
					(et.attr._1, et.attr._2 ||
					(et.srcId == smallestEdge.srcId && et.dstId == smallestEdge.dstId))
				}
			}
		} while (edgesAreAvailable)
		g2
  	}
  	def main(args: Array[String]): Unit = {
		val conf = new SparkConf().setAppName("ShortPaths").setMaster("local[4]")
		val sc = new SparkContext(conf) // The next two lines suppress logging
		Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
		Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
		val myVertices = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
		val initialEdges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
		  Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0), Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
		  Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
		val myEdges = initialEdges.filter(e => e.srcId != e.dstId).flatMap(e => Array(e, Edge(e.dstId, e.srcId, e.attr))).distinct()
		val myGraph = Graph(myVertices, myEdges).cache()
		println(greedy(myGraph, 1L).vertices.map(x => (x._1, x._2)).collect().mkString(" | "))
  }
}


Code 8.24

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
object Prime {
  	// Minimum spanning tree
  	def prime[VD: scala.reflect.ClassTag](g: Graph[VD, Double], origin: VertexId) = {
		// Initialization: the vertex attribute is a (Boolean, Double, Long) triple, where the Boolean marks whether the vertex has been visited, the Double is the cost of connecting this vertex, and the Long is the id of the previous vertex
		 var g2 = g.mapVertices((vid, _) => (false, if (vid == origin) 0 else Double.MaxValue, -1L))
		 for (i <- 1L to g.vertices.count()) {
			  // Among the unvisited vertices, find the one with the smallest cost
			   val currentVertexId = g2.vertices.filter(!_._2._1).reduce((a, b) => if (a._2._2 < b._2._2) a else b)._1
			  // Update the Double (cost) values of currentVertexId's neighboring vertices
			   val newDistances = g2.aggregateMessages[(Double, Long)](
				 triplet => if (triplet.srcId == currentVertexId && !triplet.dstAttr._1) { // Only send messages to vertices that have not been finalized
				   	triplet.sendToDst((triplet.attr, triplet.srcId))
				 },
				  (x, y) => if (x._1 < y._1) x else y,
				  TripletFields.All
			   )
			  // Update the graph
			   g2 = g2.outerJoinVertices(newDistances) {
					 case (vid, vd, Some(newSum)) => (vd._1 || vid == currentVertexId, math.min(vd._2, newSum._1), if (vd._2 <= newSum._1) vd._3 else newSum._2)
					 case (vid, vd, None) => (vd._1 || vid == currentVertexId, vd._2, vd._3)
			   }
		 }
       g.outerJoinVertices(g2.vertices)((vid, srcAttr, dist) => (srcAttr, dist.getOrElse((false, Double.MaxValue, -1L))._2,
            dist.getOrElse((false, Double.MaxValue, -1L))._3))
  }
def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("ShortPaths").setMaster("local[4]")
      val sc = new SparkContext(conf) // The next two lines suppress logging
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      val myVertices = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
      val initialEdges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
                   Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0), Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
                   Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
      val myEdges = initialEdges.filter(e => e.srcId != e.dstId).flatMap(e => Array(e, Edge(e.dstId, e.srcId, e.attr))).distinct()
      val myGraph = Graph(myVertices, myEdges).cache()
      println(prime(myGraph,1L).vertices.map(x => (x._1, x._2)).collect().mkString(" | "))
  }
}

Code 8.25

scala> import org.apache.spark.graphx._ // Import the GraphX package
scala> val graph=GraphLoader.edgeListFile(sc,"Cit-HepTh.txt")
scala> graph.inDegrees.reduce((a,b)=>if(a._2>b._2) a else b)
res15: (org.apache.spark.graphx.VertexId, Int) = (9711200,2414)

Code 8.26


scala> graph.vertices.take(5) // First inspect the vertex data format in the graph; the default vertex attribute is 1
res16: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((9405166,1), (108150,1), (110163,1), (204100,1), (9407099,1))

scala> val v=graph.pageRank(0.001).vertices

scala> v.reduce((a,b)=>if(a._2>b._2) a else b)
res17: (org.apache.spark.graphx.VertexId, Double) = (9207016,85.27317386053808)
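To list more than the single top-ranked vertex, the rank RDD can be sorted; a minimal sketch using standard RDD operations, not part of the original listing:

scala> v.sortBy(_._2, ascending = false).take(5)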



Code 8.27


scala> graph.personalizedPageRank(9207016,0.001).vertices.filter(_._1!=9207016).reduce((a,b) => if(a._2>b._2) a else b)
res20: (org.apache.spark.graphx.VertexId, Double) = (9201015,0.09211875000000003)

Code 8.28


import java.io.PrintWriter
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Twitter_test {
  	def main(args: Array[String]): Unit = {
     	val conf = new SparkConf().setAppName("Twittter Influencer").setMaster("local[*]")
     	val sparkContext = new SparkContext(conf)
		sparkContext.setLogLevel("ERROR")
		// The path of the text file depends on where it is actually stored
    	 val twitterData = sparkContext.textFile("twitter-graph-data.txt")
     	// Extract the followee and follower data from the text file, respectively
     	val followeeVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
        	val user = arr(0).replace("((", "")
        	val id = arr(1).replace(")", "")
        	(id.toLong, user)
     	}
     	val followerVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
        	val user = arr(2).replace("(", "")
        	val id = arr(3).replace("))", "")
        	(id.toLong, user)
		}
		// Next, use the Spark GraphX API to build the graph from the extracted data.
     	val vertices = followeeVertices.union(followerVertices)
     	val edges: RDD[Edge[String]] = twitterData.map(_.split(",")).map { arr =>
        	val followeeId = arr(1).replace(")", "").toLong
        	val followerId = arr(3).replace("))", "").toLong
        	Edge(followeeId, followerId, "follow")
      	}
		val defaultUser = ("") //提供了一个默认输入
		val graph = Graph(vertices, edges, defaultUser)
		// Use Spark GraphX's Pregel API to perform a breadth-first traversal
     	val subGraph = graph.pregel("", 2, EdgeDirection.In)((_, attr, msg) =>
                  attr + "," + msg,
                  triplet => Iterator((triplet.srcId, triplet.dstAttr)),
                  (a, b) => (a + "," + b))
     	// Find the user with the largest number of followers of followers
     	val lengthRDD = subGraph.vertices.map(vertex => (vertex._1, vertex._2.split(",").distinct.length - 2))
                 .max()(new Ordering[Tuple2[VertexId, Int]]() {
                    override def compare(x: (VertexId, Int), y: (VertexId, Int)): Int =
                    Ordering[Int].compare(x._2, y._2)
                  })
     	val userId = graph.vertices.filter(_._1 == lengthRDD._1).map(_._2).collect().head
     	println(userId + " has maximum influence on network with " + lengthRDD._2 + " influencers.")
     	val pw = new PrintWriter("Twitter_graph.gexf")
     	pw.close()
     	sparkContext.stop()
  	}
}