1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
import spark.sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.graphx.{Edge, Graph, VertexId}
// Load the edge list ("<src> <dst>" per line) into a two-column DataFrame of
// string vertex IDs. Lines with fewer than two space-separated tokens (e.g.
// blank lines) are skipped instead of crashing the job with an
// ArrayIndexOutOfBoundsException.
val data = spark.sparkContext
  .textFile("/data/graph_sample")
  .map(_.split(" "))
  .filter(_.length >= 2)
  .map(items => (items(0), items(1)))
  .toDF("vi", "vj")
// Build a dictionary mapping every distinct vertex string (seen on either
// side of an edge) to a dense 0-based Long ID via zipWithIndex.
val dict = data
  .select("vi")
  .union(data.select("vj"))
  .distinct
  .rdd
  .zipWithIndex()
  .map { case (row, index) => (row.getString(0), index) }
  .toDF("id", "vid")
// Dictionary copy with columns renamed positionally for the source-vertex
// join: (vi, vid_i).
val dictVi = dict.toDF("vi", "vid_i")
// Dictionary copy with columns renamed positionally for the target-vertex
// join: (vj, vid_j).
val dictVj = dict.toDF("vj", "vid_j")
// Attach the numeric IDs for both endpoints of every edge: first resolve
// the source ("vi" -> vid_i), then the target ("vj" -> vid_j).
val dataWithSrcId = data.join(dictVi, Seq("vi"))
val data2 = dataWithSrcId.join(dictVj, Seq("vj"))
// One (vertexId, attr) pair per distinct numeric vertex ID; the vertex
// attribute is an unused empty string.
val vertices = data2
  .select("vid_i")
  .union(data2.select("vid_j"))
  .distinct
  .map(row => (row.getLong(0), ""))
// Emit each edge in both directions so the graph is effectively undirected;
// the edge attribute is an unused empty string.
val edges = data2
  .select("vid_i", "vid_j")
  .flatMap { row =>
    val src = row.getLong(0)
    val dst = row.getLong(1)
    Seq(Edge(src, dst, ""), Edge(dst, src, ""))
  }
// Assemble the GraphX graph; "" is the default attribute for any vertex
// referenced by an edge but absent from `vertices` (should not occur here,
// since `vertices` was derived from the same edge set).
val g = Graph(vertices.rdd, edges.rdd, "")
// Run GraphX connected components: each vertex is labelled with the lowest
// vertex ID present in its component.
val cc = g.connectedComponents()
// Join numeric IDs back to the original strings; result columns are
// (vid = numeric id, cid = component id, id = original string id).
val ret = cc.vertices.toDF("vid", "cid").join(dict, Seq("vid"))
|
评论系统未开启,无法评论!