package com.intel.sparkbench.nweight

import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap

/**
 * Compute NWeight for graph G(V, E), defined as:
 *   Weight(1)(u, v) = edge(u, v)
 *   Weight(n)(u, v) = Sum (over {x | there are edges (u, x) and (x, v)}) Weight(n-1)(u, x) * Weight(1)(x, v)
 *
 * Input is a text file. Each line holds a node and all of that node's
 * out-edges with their weights:
 *   <vertex> <vertex1>:<weight1>, <vertex2>:<weight2> ...
 */
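// A tiny worked example of the recurrence (illustrative only, not part
// of the benchmark's data): with edges 1->2:2.0, 2->3:3.0 and 1->3:1.0,
// Weight(2)(1, 3) = Weight(1)(1, 2) * Weight(1)(2, 3) = 2.0 * 3.0 = 6.0.
// In the input format above, vertex 1's line would read "1 2:2.0,3:1.0".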

object GraphxNWeight extends Serializable {

  // For each triplet, combine the destination's accumulated (n-1)-step
  // weights with the edge weight to produce candidate n-step weights
  // for the source, skipping paths that loop back to the source itself.
  def mapF(edge: EdgeTriplet[SizedPriorityQueue, Double]) = {
    val m = new Long2DoubleOpenHashMap()
    val w1 = edge.attr
    val id = edge.srcId
    edge.dstAttr.foreach { case (target, wn) =>
      if (target != id)
        m.put(target, wn * w1)
    }
    Iterator((id, m))
  }

  // Merge two candidate maps by summing the weights per target vertex;
  // this relies on Long2DoubleOpenHashMap returning 0.0 for absent keys.
  def reduceF(c1: Long2DoubleOpenHashMap, c2: Long2DoubleOpenHashMap) = {
    c2.long2DoubleEntrySet()
      .fastIterator()
      .foreach(pair => c1.put(pair.getLongKey, c1.get(pair.getLongKey) + pair.getDoubleValue))
    c1
  }

  // Refill the vertex's bounded priority queue from the merged message,
  // keeping only the top-weighted targets.
  def updateF(id: VertexId, vdata: SizedPriorityQueue, msg: Option[Long2DoubleOpenHashMap]) = {
    vdata.clear()
    msg.foreach { weightMap =>
      weightMap.long2DoubleEntrySet().fastIterator().foreach { pair =>
        val src = pair.getLongKey
        val wn = pair.getDoubleValue
        vdata.enqueue((src, wn))
      }
    }
    vdata
  }
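
  // Note: SizedPriorityQueue is defined elsewhere in this benchmark.
  // From its usage in this file it is assumed to be a bounded collection
  // of (VertexId, Double) pairs supporting enqueue, clear and foreach,
  // capped at the capacity passed to its constructor so that only the
  // highest-weighted entries are retained.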

  def nweight(sc: SparkContext, input: String, output: String, step: Int,
      maxDegree: Int, numPartitions: Int, storageLevel: StorageLevel): Unit = {

    val part = new HashPartitioner(numPartitions)

    // Parse each line "<vertex> <vertex1>:<weight1>, <vertex2>:<weight2> ..."
    // into Edge records, keyed by source id so that edges land in the
    // same partitions as their source vertices.
    val edges = sc.textFile(input, numPartitions).flatMap { line =>
      val fields = line.split("\\s+", 2)
      val src = fields(0).trim.toLong

      fields(1).split("[,\\s]+").filter(_.nonEmpty).map { pairStr =>
        val pair = pairStr.split(":")
        val (dest, weight) = (pair(0).trim.toLong, pair(1).toDouble)
        (src, Edge(src, dest, weight))
      }
    }.partitionBy(part).map(_._2)

    // Seed each vertex with its direct out-edges (Weight(1)), capped at
    // maxDegree by the bounded priority queue.
    val vertices = edges.map { e =>
      (e.srcId, (e.dstId, e.attr))
    }.groupByKey(part).map { case (id, seq) =>
      val vdata = new SizedPriorityQueue(maxDegree)
      seq.foreach(vdata.enqueue(_))
      (id, vdata)
    }

    var g = GraphImpl(vertices, edges, new SizedPriorityQueue(maxDegree), storageLevel, storageLevel).cache()

    // Each round extends the path length by one: destination attributes
    // flow back to sources (messages travel against edge direction).
    var msg: RDD[(VertexId, Long2DoubleOpenHashMap)] = null
    for (i <- 2 to step) {
      msg = g.mapReduceTriplets(mapF _, reduceF _, Some((g.vertices, EdgeDirection.In)))
      g = g.outerJoinVertices(msg)(updateF _).persist(storageLevel)
    }

    // Emit one line per vertex: "<vertex> <target>:<weight> ...".
    g.vertices.map { case (vid, vdata) =>
      val s = new StringBuilder
      s.append(vid)

      vdata.foreach { r =>
        s.append(' ')
        s.append(r._1)
        s.append(':')
        s.append(r._2)
      }
      s.toString
    }.saveAsTextFile(output)
  }
}
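
// A minimal, hypothetical driver sketch, not part of the benchmark:
// it assumes a local SparkContext and illustrative input/output paths.
object GraphxNWeightExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "GraphxNWeightExample")
    // Compute up to 3-step weights, keeping at most 10 neighbours per
    // vertex, over 4 partitions.
    GraphxNWeight.nweight(sc, "nweight_input", "nweight_output",
      step = 3, maxDegree = 10, numPartitions = 4,
      storageLevel = StorageLevel.MEMORY_AND_DISK)
    sc.stop()
  }
}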