Listing 8.1
class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
val index: VertexIdToIndexMap,
val values: Array[VD],
val mask: BitSet,
private val activeSet: Option[VertexSet] = None )
Listing 8.2
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
val srcIds: Array[VertexId],
val dstIds: Array[VertexId],
val data: Array[ED],
val index: PrimitiveKeyOpenHashMap[VertexId, Int])
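Both classes store attributes column-wise in parallel arrays. The following is a minimal illustrative sketch of that layout; it is a simplification for explanation only, not GraphX's actual implementation.
//Illustrative only: a simplified column-oriented edge store in the spirit of EdgePartition
class SimpleEdgePartition[ED](
val srcIds: Array[Long], //source vertex id of edge i
val dstIds: Array[Long], //destination vertex id of edge i
val data: Array[ED]) { //attribute of edge i
def edge(i: Int): (Long, Long, ED) = (srcIds(i), dstIds(i), data(i))
}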
Listing 8.3
scala> import org.apache.spark.graphx._ //first import GraphX
scala> val myVertices = sc.parallelize(Array((1L,"Tom"),(2L,"Marry"),(3L,"Jack"),(4L,"Cody"),(5L,"Adam"),(6L,"Bob"))) //build the vertex RDD
scala> val myEdges = sc.parallelize(Array(Edge(1L,2L,"Colleague"),Edge(2L,3L,"Child"),Edge(2L,4L,"Child"),Edge(4L,5L,"Colleague"),Edge(4L,6L,"Colleague"),Edge(5L,6L,"Friend"))) //build the edge RDD
scala> val myGraph = Graph(myVertices, myEdges) //build the graph Graph[VD,ED]
Listing 8.4
scala> myGraph.vertices.collect
res0: Array[(org.apache.spark.graphx.VertexId, String)] = Array((1,Tom), (2,Marry), (3,Jack), (4,Cody), (5,Adam), (6,Bob))
Listing 8.5
scala> myGraph.edges.collect
res1: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,2,Colleague), Edge(2,3,Child), Edge(2,4,Child), Edge(4,5,Colleague), Edge(4,6,Colleague), Edge(5,6,Friend))
Listing 8.6
scala> myGraph.triplets.collect
res2: Array[org.apache.spark.graphx.EdgeTriplet[String,String]] = Array(((1,Tom),(2,Marry),Colleague), ((2,Marry),(3,Jack),Child), ((2,Marry),(4,Cody),Child), ((4,Cody),(5,Adam),Colleague), ((4,Cody),(6,Bob),Colleague), ((5,Adam),(6,Bob),Friend))
Listing 8.7
scala> myGraph.inDegrees.collect
res3: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((2,1), (3,1), (4,1), (5,1), (6,2))
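The out-degree and total-degree views can be inspected the same way; a quick sketch on the same graph (the ordering of the collected array may vary):
scala> myGraph.outDegrees.collect //expected pairs: (1,1), (2,2), (4,2), (5,1); vertices 3 and 6 have no outgoing edges
scala> myGraph.degrees.collect //in-degree plus out-degree per vertex, e.g. (2,3) and (4,3)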
Listing 8.8
//define a max function that returns the vertex with the larger degree of the two
scala> def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if(a._2 > b._2) a else b
}
max: (a: (org.apache.spark.graphx.VertexId, Int), b: (org.apache.spark.graphx.VertexId, Int))(org.apache.spark.graphx.VertexId, Int)
//use reduce to fold the pairs and find the vertex with the largest degree of all
scala> myGraph.degrees.reduce(max)
res4: (org.apache.spark.graphx.VertexId, Int) = (4,3)
Listing 8.9
scala> val outDegrees: VertexRDD[Int] = myGraph.outDegrees
outDegrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[25] at RDD at VertexRDD.scala:57
scala> val degreeGraph = myGraph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) => outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 //no out-degree information, so default to 0
}
}
degreeGraph: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@11544ddd
scala> degreeGraph.vertices.collect
res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,2), (1,1), (6,0), (3,0), (5,1), (2,2))
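joinVertices is the related operator that keeps the original vertex attribute type; a hedged sketch on the same outDegrees RDD (the attribute format here is chosen only for illustration):
scala> val labeled = myGraph.joinVertices(outDegrees)((id, name, deg) => name + ":" + deg)
scala> labeled.vertices.collect //e.g. (1,Tom:1); vertices with no entry in outDegrees, such as 3 and 6, keep their old attribute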
Listing 8.10
scala> myGraph.mapTriplets(t => (t.attr, t.attr == "Colleague" && t.srcAttr.toLowerCase.contains("o"))).triplets.collect
res6: Array[org.apache.spark.graphx.EdgeTriplet[String,(String, Boolean)]] = Array(((1,Tom),(2,Marry),(Colleague,true)), ((2,Marry),(3,Jack),(Child,false)), ((2,Marry),(4,Cody),(Child,false)), ((4,Cody),(5,Adam),(Colleague,true)), ((4,Cody),(6,Bob),(Colleague,true)), ((5,Adam),(6,Bob),(Friend,false)))
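mapVertices and mapEdges transform vertex and edge attributes in the same way; a brief sketch:
scala> myGraph.mapVertices((id, name) => name.toUpperCase).vertices.collect //e.g. (1,TOM), (2,MARRY)
scala> myGraph.mapEdges(e => e.attr.length).edges.collect //each edge attribute becomes the length of its relationship string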
Listing 8.11
scala> val subGraph = myGraph.subgraph(each => each.attr == "Colleague")
subGraph: org.apache.spark.graphx.Graph[String,String] = org.apache.spark.graphx.impl.GraphImpl@48cbb4c5
scala> subGraph.vertices.collect
res7: Array[(org.apache.spark.graphx.VertexId, String)] = Array((4,Cody), (1,Tom), (6,Bob), (3,Jack), (5,Adam), (2,Marry))
scala> subGraph.edges.collect
res8: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,2,Colleague), Edge(4,5,Colleague), Edge(4,6,Colleague))
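subgraph also accepts a vertex predicate vpred, and edges incident to a removed vertex are dropped automatically; a sketch on the same graph:
scala> val noMarry = myGraph.subgraph(vpred = (id, name) => name != "Marry")
scala> noMarry.vertices.count //5: vertex 2 is filtered out
scala> noMarry.edges.count //3: the three edges touching vertex 2 are dropped as well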
Listing 8.12
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
Listing 8.13
scala> myGraph.aggregateMessages[Int](_.sendToSrc(1), _+_).collect
res9: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,1), (2,2), (4,2), (5,1))
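sendToDst works symmetrically, and the EdgeContext also exposes the edge attribute; a sketch that counts the "Colleague" relationships touching each vertex:
scala> val colleagueCounts = myGraph.aggregateMessages[Int](
     | ctx => if (ctx.attr == "Colleague") { ctx.sendToSrc(1); ctx.sendToDst(1) },
     | _ + _)
scala> colleagueCounts.collect //expected pairs: (1,1), (2,1), (4,2), (5,2), (6,2)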
Listing 8.14
def pregel[A: ClassTag]
(initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
Listing 8.15
scala> val g = Pregel(myGraph.mapVertices((vid, vd) => 0), 0, activeDirection = EdgeDirection.Out)(
     | (id: VertexId, vd: Int, a: Int) => math.max(vd, a),
     | (et: EdgeTriplet[Int, String]) => Iterator((et.dstId, et.srcAttr + 1)),
     | (a: Int, b: Int) => math.max(a, b))
scala> g.vertices.collect
res10: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,0), (2,1), (3,2), (4,2), (5,3), (6,4))
Listing 8.16
//define the sendMsg function
scala> def sendMsg(ec: EdgeContext[Int,String,Int]): Unit = {
ec.sendToDst(ec.srcAttr+1)
}
//define the mergeMsg function
scala> def mergeMsg(a: Int, b: Int): Int = {
math.max(a,b)
}
scala> def propagateEdgeCount(g:Graph[Int,String]):Graph[Int,String] = {
val verts = g.aggregateMessages[Int](sendMsg, mergeMsg) //produce the new vertex set
val g2 = Graph(verts, g.edges) //build a new graph from the new vertex set
//join the new graph g2 with the original graph g to check whether any vertex distance changed
val check = g2.vertices.join(g.vertices).
map(x => x._2._1 - x._2._2).
reduce(_+_)
//if any distance changed, keep recursing; otherwise return the graph
if(check>0)
propagateEdgeCount(g2)
else
g
}
//initialize the distance values by setting every vertex attribute to 0
scala> val newGraph = myGraph.mapVertices((_,_) => 0)
scala> propagateEdgeCount(newGraph).vertices.collect
res11: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,0), (2,1), (3,2), (4,2), (5,3), (6,4))
Listing 8.17
package org.apache.spark.examples.graphx
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SparkSession
object PageRankExample {
def main(args: Array[String]): Unit = {
// create a SparkSession
val spark = SparkSession
.builder
//master("local") is needed when the code runs on a local machine
.appName(s"${this.getClass.getSimpleName}").master("local")
.getOrCreate()
val sc = spark.sparkContext
// load the edge list and build the graph
val graph = GraphLoader.edgeListFile(sc, "followers.txt")
// run PageRank
val ranks = graph.pageRank(0.0001).vertices
// join the ranks with the usernames and print the result
val users = sc.textFile("users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
println(ranksByUsername.collect().mkString("\n"))
spark.stop()
}
}
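pageRank(0.0001) iterates until the ranks converge within the given tolerance. GraphX also offers a fixed-iteration variant; the sketch below could replace the pageRank call in the listing (the iteration count is chosen arbitrarily here):
// run exactly 20 PageRank iterations instead of iterating to a tolerance
val staticRanks = graph.staticPageRank(20).vertices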
Listing 8.18
scala> import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
scala> val graph = GraphLoader.edgeListFile(sc, "/usr/local/spark-2.3.0-bin-hadoop2.7/data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
graph: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@428169d
scala> val triCounts = graph.triangleCount().vertices //count the triangles through each vertex
triCounts: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[128] at RDD at VertexRDD.scala:57
//join the triangle counts with the usernames
scala> val users = sc.textFile("/usr/local/spark-2.3.0-bin-hadoop2.7/data/graphx/users.txt").map {line =>
| val fields = line.split(",")
| (fields(0).toLong, fields(1))
| }
users: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[133] at map at <console>:27
scala> val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
| (username, tc)
| }
triCountByUsername: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[137] at map at <console>:30
//print the result
scala> println(triCountByUsername.collect().mkString("\n"))
(justinbieber,0)
(BarackObama,0)
(matei_zaharia,1)
(jeresig,1)
(odersky,1)
(ladygaga,0)
Listing 8.19
scala> import org.apache.spark.graphx._ //import the GraphX package
import org.apache.spark.graphx._
//build the example graph for connected components and cache it
scala> val g=Graph(sc.makeRDD((1L to 7L).map((_,""))),
sc.makeRDD(Array(Edge(2L,5L,""),Edge(5L,3L,""),Edge(3L,2L,""),
Edge(4L,5L,""),Edge(6L,7L,"")))).cache
//run the connected components algorithm, then group the vertices by component id for display
scala> g.connectedComponents.vertices.map(_.swap).groupByKey.map(_._2).collect
res12: Array[Iterable[org.apache.spark.graphx.VertexId]] = Array(CompactBuffer(1), CompactBuffer(6, 7), CompactBuffer(4, 3, 5, 2))
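connectedComponents ignores edge direction; when direction matters, stronglyConnectedComponents can be used instead (a sketch, with an arbitrarily chosen iteration bound):
scala> g.stronglyConnectedComponents(10).vertices.map(_.swap).groupByKey.map(_._2).collect
//expected grouping: 2, 3 and 5 lie on a directed cycle and share one component; 1, 4, 6 and 7 are singletons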
Listing 8.20
scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._
//build the vertex RDD
scala> val v = sc.makeRDD(Array((1L,""),(2L,""),(3L,""),(4L,""),(5L,""),(6L,""),(7L,""),(8L,"")))
v: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:27
//build the edge RDD
scala> val e = sc.makeRDD(Array(Edge(1L,2L,""),Edge(2L,3L,""),Edge(3L,4L,""),
Edge(4L,1L,""),Edge(1L,3L,""),Edge(2L,4L,""),Edge(4L,5L,""),
Edge(5L,6L,""),Edge(6L,7L,""),Edge(7L,8L,""),Edge(8L,5L,""),
Edge(5L,7L,""),Edge(6L,8L,"")))
e: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[1] at makeRDD at <console>:27
//call LabelPropagation's run() method and sort the result by vertex id
scala> lib.LabelPropagation.run(Graph(v,e),5).vertices.collect.sortWith(_._1<_._1)
res13: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = Array((1,2), (2,1), (3,2), (4,1), (5,2), (6,1), (7,2), (8,3))
Listing 8.21
scala> import org.apache.spark.graphx._ //import the GraphX package
import org.apache.spark.graphx._
//build the graph's edge RDD
scala> val edges = sc.makeRDD (Array(Edge(1L,5L,5.0), Edge(1L,6L,4.0), Edge(2L,6L,5.0), Edge(2L,7L,5.0), Edge(3L,5L,5.0), Edge(3L,6L,2.0), Edge(4L,5L,4.0), Edge(4L,6L,4.0)))
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = ParallelCollectionRDD[0] at makeRDD at <console>:27
//set the algorithm's hyperparameters; see the parameter table for reference
scala> val conf=new lib.SVDPlusPlus.Conf(2,10,0,5,0.007,0.007,0.005,0.015)
conf: org.apache.spark.graphx.lib.SVDPlusPlus.Conf = org.apache.spark.graphx.lib.SVDPlusPlus$Conf@2674ca88
//run SVD++; it returns the reprocessed input graph and the mean rating of the data set
scala> val (g,mean)=lib.SVDPlusPlus.run(edges,conf)
g: org.apache.spark.graphx.Graph[(Array[Double], Array[Double], Double, Double),Double] = org.apache.spark.graphx.impl.GraphImpl@3ce4eb42
mean: Double = 4.25
//define a pred() function whose inputs are the model parameters, a user id, and the item (movie) to predict
scala> def pred(g: Graph[(Array[Double], Array[Double], Double, Double), Double], mean: Double, u: Long, i: Long) = {
val user=g.vertices.filter(_._1 == u).collect()(0)._2
val item=g.vertices.filter(_._1 == i).collect()(0)._2
mean+user._3+item._3+item._1.zip(user._2).map(x => x._1 * x._2).reduce(_+_)
}
pred: (g: org.apache.spark.graphx.Graph[(Array[Double], Array[Double], Double, Double),Double], mean: Double, u: Long, i: Long)Double
//part of the SVDPlusPlus initialization is random, so predictions for the same input may differ between runs
scala> pred(g,mean,4L,7L)
res14: Double = 5.935786807208753
Listing 8.22
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId}
import scala.reflect.ClassTag
object Paths { //single-source shortest paths
def dijkstra[VD: ClassTag](g : Graph[VD, Double], origin: VertexId) = {
//initialize: the vertex attribute is (Boolean, Double, Long); the Boolean marks whether the vertex is finalized, the Double is its distance from the origin, and the Long is the id of the previous vertex
var g2 = g.mapVertices((vid, _) => (false, if(vid == origin) 0 else Double.MaxValue, -1L))
for(i <- 1L to g.vertices.count()) {
//among the unvisited vertices, pick the one closest to the origin
val currentVertexId = g2.vertices.filter(! _._2._1).reduce((a,b) => if (a._2._2 < b._2._2) a else b)._1
//update the distance values of the vertices adjacent to currentVertexId
val newDistances = g2.aggregateMessages[(Double, Long)](
triplet => if(triplet.srcId == currentVertexId && !triplet.dstAttr._1) { //only message vertices that are not yet finalized
triplet.sendToDst((triplet.srcAttr._2 + triplet.attr, triplet.srcId))
},(x, y) => if(x._1 < y._1) x else y ,
TripletFields.All)
g2 = g2.outerJoinVertices(newDistances) { //update the graph
case (vid, vd, Some(newSum)) => (vd._1 ||
vid == currentVertexId, math.min(vd._2, newSum._1), if(vd._2 <= newSum._1) vd._3 else newSum._2 )
case (vid, vd, None) => (vd._1|| vid == currentVertexId, vd._2, vd._3)
}
}
g.outerJoinVertices(g2.vertices)( (vid, srcAttr, dist) => (srcAttr, dist.getOrElse((false, Double.MaxValue, -1L))._2) )
}
def main(args: Array[String]): Unit ={
val conf = new SparkConf().setAppName("ShortPaths").setMaster("local[4]")//指定四个本地线程数目,来模拟分布式集群
val sc = new SparkContext(conf) //屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val myVertices = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
val initialEdges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
val myEdges = initialEdges.filter(e => e.srcId != e.dstId).flatMap(e => Array(e, Edge(e.dstId, e.srcId, e.attr))).distinct() //drop self-loops, add reverse edges to make the graph undirected, and remove duplicates
val myGraph = Graph(myVertices, myEdges).cache()
println(dijkstra(myGraph, 1L).vertices.map(x => (x._1, x._2)).collect().mkString(" | "))
}
}
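GraphX also ships an unweighted shortest-paths routine in its lib package; it counts hops rather than summing edge weights, so it complements rather than replaces the Dijkstra code above. A sketch, assuming the myGraph value built in main:
import org.apache.spark.graphx.lib.ShortestPaths
//hop counts from every vertex to landmark vertex 1L (edge weights are ignored)
val hops = ShortestPaths.run(myGraph, Seq(1L)).vertices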
Listing 8.23
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
object TSP {
def greedy[VD](g: Graph[VD, Double], origin: VertexId) = {
var g2: Graph[Boolean, (Double, Boolean)] = g.mapVertices((vid, vd) => vid == origin).mapTriplets {
et => (et.attr, false)
}
var nextVertexId = origin
var edgesAreAvailable = true
type tripletType = EdgeTriplet[Boolean, (Double, Boolean)]
do {
val availableEdges = g2.triplets.filter { et => !et.attr._2 && (et.srcId == nextVertexId && !et.dstAttr || et.dstId == nextVertexId && !et.srcAttr) }
edgesAreAvailable = availableEdges.count > 0
if (edgesAreAvailable) {
val smallestEdge = availableEdges.min()(new Ordering[tripletType]() {
override def compare(a: tripletType, b: tripletType) = {
Ordering[Double].compare(a.attr._1, b.attr._1)
}
})
nextVertexId = Seq(smallestEdge.srcId, smallestEdge.dstId).filter(_ != nextVertexId).head
g2 = g2.mapVertices((vid, vd) => vd || vid == nextVertexId).mapTriplets { et =>
(et.attr._1, et.attr._2 ||
(et.srcId == smallestEdge.srcId && et.dstId == smallestEdge.dstId))
}
}
} while (edgesAreAvailable)
g2
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ShortPaths").setMaster("local[4]")
val sc = new SparkContext(conf) //suppress the log output below
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val myVertices = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
val initialEdges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0), Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
val myEdges = initialEdges.filter(e => e.srcId != e.dstId).flatMap(e => Array(e, Edge(e.dstId, e.srcId, e.attr))).distinct()
val myGraph = Graph(myVertices, myEdges).cache()
println(greedy(myGraph, 1L).vertices.map(x => (x._1, x._2)).collect().mkString(" | "))
}
}
Listing 8.24
import TSP.greedy
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
object Prime {
//minimum spanning tree
def prime[VD: scala.reflect.ClassTag](g: Graph[VD, Double], origin: VertexId) = {
//initialize: the vertex attribute is (Boolean, Double, Long); the Boolean marks whether the vertex has been added to the tree, the Double is the cost of adding it, and the Long is the id of the previous vertex
var g2 = g.mapVertices((vid, _) => (false, if (vid == origin) 0 else Double.MaxValue, -1L))
for (i <- 1L to g.vertices.count()) {
//among the vertices not yet in the tree, pick the one with the smallest cost
val currentVertexId = g2.vertices.filter(!_._2._1).reduce((a, b) => if (a._2._2 < b._2._2) a else b)._1
//update the cost values of the vertices adjacent to currentVertexId
val newDistances = g2.aggregateMessages[(Double, Long)](
triplet => if (triplet.srcId == currentVertexId && !triplet.dstAttr._1) { //only message vertices that are not yet in the tree
triplet.sendToDst((triplet.attr, triplet.srcId))
},
(x, y) => if (x._1 < y._1) x else y,
TripletFields.All
)
//update the graph
g2 = g2.outerJoinVertices(newDistances) {
case (vid, vd, Some(newSum)) => (vd._1 || vid == currentVertexId, math.min(vd._2, newSum._1), if (vd._2 <= newSum._1) vd._3 else newSum._2)
case (vid, vd, None) => (vd._1 || vid == currentVertexId, vd._2, vd._3)
}
}
g.outerJoinVertices(g2.vertices)((vid, srcAttr, dist) => (srcAttr, dist.getOrElse((false, Double.MaxValue, -1L))._2,
dist.getOrElse((false, Double.MaxValue, -1L))._3))
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ShortPaths").setMaster("local[4]")
val sc = new SparkContext(conf) //suppress the log output below
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val myVertices = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
val initialEdges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0), Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
val myEdges = initialEdges.filter(e => e.srcId != e.dstId).flatMap(e => Array(e, Edge(e.dstId, e.srcId, e.attr))).distinct()
val myGraph = Graph(myVertices, myEdges).cache()
println(prime(myGraph,1L).vertices.map(x => (x._1, x._2)).collect().mkString(" | "))
}
}
Listing 8.25
scala> import org.apache.spark.graphx._ //import the GraphX package
scala> val graph=GraphLoader.edgeListFile(sc,"Cit-HepTh.txt")
scala> graph.inDegrees.reduce((a,b)=>if(a._2>b._2) a else b)
res15: (org.apache.spark.graphx.VertexId, Int) = (9711200,2414)
Listing 8.26
scala> graph.vertices.take(5) //first inspect the vertex data format; the default vertex attribute is 1
res16: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((9405166,1), (108150,1), (110163,1), (204100,1), (9407099,1))
scala> val v=graph.pageRank(0.001).vertices
scala> v.reduce((a,b)=>if(a._2>b._2) a else b)
res17: (org.apache.spark.graphx.VertexId, Double) = (9207016,85.27317386053808)
Listing 8.27
scala> graph.personalizedPageRank(9207016,0.001).vertices.filter(_._1!=9207016).reduce((a,b) => if(a._2>b._2) a else b)
res20: (org.apache.spark.graphx.VertexId, Double) = (9201015,0.09211875000000003)
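A fixed-iteration variant of personalized PageRank is available as well; a sketch (the iteration count is chosen arbitrarily):
scala> graph.staticPersonalizedPageRank(9207016, 50).vertices.filter(_._1 != 9207016).reduce((a, b) => if (a._2 > b._2) a else b)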
Listing 8.28
import java.io.PrintWriter
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Twitter_test {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Twitter Influencer").setMaster("local[*]")
val sparkContext = new SparkContext(conf)
sparkContext.setLogLevel("ERROR")
//the path of the text file depends on where it is actually stored
val twitterData = sparkContext.textFile("twitter-graph-data.txt")
//extract the followee and follower data from the text file
val followeeVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
val user = arr(0).replace("((", "")
val id = arr(1).replace(")", "")
(id.toLong, user)
}
val followerVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
val user = arr(2).replace("(", "")
val id = arr(3).replace("))", "")
(id.toLong, user)
}
//next, build the graph from the extracted data with the Spark GraphX API
val vertices = followeeVertices.union(followerVertices)
val edges: RDD[Edge[String]] = twitterData.map(_.split(",")).map { arr =>
val followeeId = arr(1).replace(")", "").toLong
val followerId = arr(3).replace("))", "").toLong
Edge(followeeId, followerId, "follow")
}
val defaultUser = "" //provide a default vertex attribute
val graph = Graph(vertices, edges, defaultUser)
//use Spark GraphX's Pregel API for a breadth-first traversal
val subGraph = graph.pregel("", 2, EdgeDirection.In)((_, attr, msg) =>
attr + "," + msg,
triplet => Iterator((triplet.srcId, triplet.dstAttr)),
(a, b) => (a + "," + b))
//find the user with the most followers of followers
val lengthRDD = subGraph.vertices.map(vertex => (vertex._1, vertex._2.split(",").distinct.length - 2))
.max()(new Ordering[Tuple2[VertexId, Int]]() {
override def compare(x: (VertexId, Int), y: (VertexId, Int)): Int =
Ordering[Int].compare(x._2, y._2)
})
val userId = graph.vertices.filter(_._1 == lengthRDD._1).map(_._2).collect().head
println(userId + " has maximum influence on network with " + lengthRDD._2 + " influencers.")
val pw = new PrintWriter("Twitter_graph.gexf")
pw.close()
sparkContext.stop()
}
}