logo头像
Snippet 博客主题

Graphx中处理字符串类型的ID

Graphx中的结点ID只能是Long型的,但是在实际的业务中有时会遇到字符串类型的ID,这时需要建立一个结点ID的映射。


生成样例网络

使用python随机生成100条字符串类型的边


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from random import randint

vertices = ['v_'+str(i) for i in range(1000)]

edges = []
while len(edges) != 100:
i = randint(0, 1000)
j = randint(0, 1000)
if i == j:
continue
else:
edges.append((vertices[i], vertices[j]))

for i, j in edges:
print '%s %s' % (i, j)

执行连通图算法

通过zipWithIndex()函数可以给每个节点进行编码,然后在构建网络时使用编码后的id来作为节点id


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import spark.sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.graphx.{Graph, VertexId, Edge}

// 加载数据,数据格式为每行:vi vj
val data = spark.sparkContext.textFile("/data/graph_sample").map{line =>
val items = line.split(" ")
(items(0), items(1))
}.toDF("vi", "vj")

// 建立映射
val dict = data.select("vi").union(data.select("vj")).distinct.rdd
.zipWithIndex().map {
case (Row(id: String), index) =>
(id, index)
}.toDF("id", "vid")

val dictVi = dict.withColumnRenamed("id", "vi").withColumnRenamed("vid", "vid_i")

val dictVj = dict.withColumnRenamed("id", "vj").withColumnRenamed("vid", "vid_j")

val data2 = data.join(dictVi, Seq("vi")).join(dictVj, Seq("vj"))

// 构造网络
val vertices = data2.select("vid_i")
.union(data2.select("vid_j"))
.distinct
.map{case Row(id: VertexId)=>(id, "")}

val edges = data2.select("vid_i", "vid_j")
.flatMap{
case Row(vidi: Long, vidj: Long) =>
Array(Edge(vidi, vidj, ""), Edge(vidj, vidi, ""))
}

val g = Graph(vertices.rdd, edges.rdd, "")

// 求联通子图
val cc = g.connectedComponents()

// 结点ID映射回原来的ID
val ret = cc.vertices.toDF("vid", "cid").join(dict, Seq("vid"))

遇到的坑

在线上实际使用的时候,遇到过这样的一个问题:对于一个结点,开始时是一个ID,但是在执行过程中它的ID变了。猜测产生这个问题的原因是Catalyst优化错误导致的,最后采用了一种强行中断sql优化的方式:将映射好的ID存入hive中,然后再从hive读取进来。相关问题可以参考:GraphFrames的一个issue

评论系统未开启,无法评论!