Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.

Commit 9e778a3

Browse files
committed
Add NWeight workload
1 parent f2542d8 commit 9e778a3

13 files changed

Lines changed: 387 additions & 2 deletions

File tree

bin/functions/hibench_prop_env_mapping.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@
115115
# For NWeight
116116
MODEL_INPUT="hibench.nweight.model_path",
117117
EDGES="hibench.workload.edges",
118+
DEGREE="hibench.nweight.degree",
119+
MAX_OUT_EDGES="hibench.nweight.max_out_edges",
120+
NUM_PARTITION="hibench.nweight.partitions",
121+
STORAGE_LEVEL="hibench.nweight.storage_level",
122+
DISABLE_KRYO="hibench.nweight.disable_kryo",
123+
MODEL="hibench.nweight.model",
118124

119125
# For streaming bench
120126
# zkHelper

conf/10-data-scale-profile.conf

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,20 @@ hibench.dfsioe.bigdata.write.file_size 1000
216216

217217
#NWeight
218218
hibench.nweight.tiny.edges 100000
219+
hibench.nweight.tiny.degree 3
220+
hibench.nweight.tiny.max_out_edges 30
219221
hibench.nweight.small.edges 1000000
222+
hibench.nweight.small.degree 3
223+
hibench.nweight.small.max_out_edges 30
220224
hibench.nweight.large.edges 10000000
225+
hibench.nweight.large.degree 3
226+
hibench.nweight.large.max_out_edges 30
221227
hibench.nweight.huge.edges 100000000
228+
hibench.nweight.huge.degree 3
229+
hibench.nweight.huge.max_out_edges 30
222230
hibench.nweight.gigantic.edges 425000000
231+
hibench.nweight.gigantic.degree 3
232+
hibench.nweight.gigantic.max_out_edges 30
223233
hibench.nweight.bigdata.edges 4250000000
234+
hibench.nweight.bigdata.degree 3
235+
hibench.nweight.bigdata.max_out_edges 30

src/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
<jetty.version>8.1.14.v20131031</jetty.version>
3333
<scalatest.version>2.2.1</scalatest.version>
3434
<scalacheck.version>1.11.3</scalacheck.version>
35+
<fastutil.version>6.5.15</fastutil.version>
3536
</properties>
3637

3738
<repositories>

src/sparkbench/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@
9292
<artifactId>mahout-math</artifactId>
9393
<version>${mahout.version}</version>
9494
</dependency>
95+
<dependency>
96+
<groupId>it.unimi.dsi</groupId>
97+
<artifactId>fastutil</artifactId>
98+
<version>${fastutil.version}</version>
99+
</dependency>
95100
</dependencies>
96101
<build>
97102
<plugins>
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package com.intel.sparkbench.nweight

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{JobLogger, StatsReportListener}

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}

/**
 * Compute NWeight for Graph G(V, E) as defined below:
 *   Weight(1)(u, v) = edge(u, v)
 *   Weight(n)(u, v) = Sum (over {x | there are edges (u, x) and (x, v)}) Weight(n-1)(u, x) * Weight(1)(x, v)
 *
 * Input is given in text file format. Each line represents a node and all
 * out-edges of that node (edge weight specified):
 *   <vertex> <vertex1>:<weight1>, <vertex2>:<weight2> ...
 */
object NWeight extends Serializable {

  /**
   * Parse and validate the eight positional command-line arguments.
   *
   * Expected: <input> <output> <step> <maxOutEdges> <numPartitions>
   *           <storageLevel> <disableKryo> <model>
   * Exits with status 1 (after printing usage) when fewer than eight
   * arguments are supplied.
   *
   * @return (input, output, step, maxDegree, numPartitions, storageLevel,
   *         disableKryo, model)
   */
  def parseArgs(args: Array[String]) = {
    // BUG FIX: the original checked `args.length < 7` but reads args(7)
    // below, so an exactly-7-argument invocation crashed with
    // ArrayIndexOutOfBoundsException instead of printing usage. The usage
    // string was also missing the <disableKryo> argument.
    if (args.length < 8) {
      System.err.println("Usage: <input> <output> <step> <max Out edges> " +
        "<no. of result partitions> <storageLevel> <disableKryo> <model>")
      System.exit(1)
    }
    val input = args(0)
    val output = args(1)
    val step = args(2).toInt
    val maxDegree = args(3).toInt
    val numPartitions = args(4).toInt
    // Numeric code -> Spark StorageLevel; any unknown code falls back to
    // MEMORY_AND_DISK rather than failing.
    val storageLevel = args(5).toInt match {
      case 0 => StorageLevel.OFF_HEAP
      case 1 => StorageLevel.DISK_ONLY
      case 2 => StorageLevel.DISK_ONLY_2
      case 3 => StorageLevel.MEMORY_ONLY
      case 4 => StorageLevel.MEMORY_ONLY_2
      case 5 => StorageLevel.MEMORY_ONLY_SER
      case 6 => StorageLevel.MEMORY_ONLY_SER_2
      case 7 => StorageLevel.MEMORY_AND_DISK
      case 8 => StorageLevel.MEMORY_AND_DISK_2
      case 9 => StorageLevel.MEMORY_AND_DISK_SER
      case 10 => StorageLevel.MEMORY_AND_DISK_SER_2
      case _ => StorageLevel.MEMORY_AND_DISK
    }
    val disableKryo = args(6).toBoolean
    val model = args(7)

    (input, output, step, maxDegree, numPartitions, storageLevel, disableKryo, model)
  }

  /**
   * Entry point: configures the serializer and app name, creates the
   * SparkContext, and dispatches to the GraphX or Pregel implementation.
   */
  def main(args: Array[String]) {
    val (input, output, step, maxDegree, numPartitions, storageLevel, disableKryo, model) = parseArgs(args)

    // Kryo is enabled by default; the property must be set before the
    // SparkContext is constructed to take effect.
    if (!disableKryo) {
      System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    }
    val sparkConf = new SparkConf()
    if (model.toLowerCase == "graphx")
      sparkConf.setAppName("NWeightGraphX")
    else
      sparkConf.setAppName("NWeightPregel")
    val sc = new SparkContext(sparkConf)

    sc.addSparkListener(new JobLogger)
    sc.addSparkListener(new StatsReportListener)

    // Any model string other than "graphx" selects the Pregel implementation.
    if (model.toLowerCase == "graphx") {
      GraphxNWeight.nweight(sc, input, output, step, maxDegree, numPartitions, storageLevel)
    } else {
      PregelNWeight.nweight(sc, input, output, step, maxDegree, numPartitions, storageLevel)
    }
  }
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.intel.sparkbench.nweight

import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap

/**
 * Compute NWeight for Graph G(V, E) as defined below:
 *   Weight(1)(u, v) = edge(u, v)
 *   Weight(n)(u, v) = Sum (over {x | there are edges (u, x) and (x, v)}) Weight(n-1)(u, x) * Weight(1)(x, v)
 *
 * Input is given in text file format. Each line represents a node and all
 * out-edges of that node (edge weight specified):
 *   <vertex> <vertex1>:<weight1>, <vertex2>:<weight2> ...
 */
object GraphxNWeight extends Serializable {

  /**
   * Message function for mapReduceTriplets: for each triplet, send to the
   * source vertex a map of {target -> dstWeight * edgeWeight}, skipping
   * self-references.
   */
  def mapF(edge: EdgeTriplet[SizedPriorityQueue, Double]) = {
    val contrib = new Long2DoubleOpenHashMap()
    val edgeWeight = edge.attr
    val source = edge.srcId
    edge.dstAttr.foreach { case (target, wn) =>
      if (target != source)
        contrib.put(target, wn * edgeWeight)
    }
    Iterator((source, contrib))
  }

  /** Combine two partial weight maps by summing values key-wise into c1. */
  def reduceF(c1: Long2DoubleOpenHashMap, c2: Long2DoubleOpenHashMap) = {
    val it = c2.long2DoubleEntrySet().fastIterator()
    while (it.hasNext) {
      val entry = it.next()
      c1.put(entry.getLongKey(), c1.get(entry.getLongKey()) + entry.getDoubleValue())
    }
    c1
  }

  /**
   * Vertex-update function: replace the vertex's queue contents with the
   * top entries of the aggregated message (capacity-bounded by the queue).
   */
  def updateF(id: VertexId, vdata: SizedPriorityQueue, msg: Option[Long2DoubleOpenHashMap]) = {
    vdata.clear()
    msg match {
      case Some(weightMap) =>
        val it = weightMap.long2DoubleEntrySet().fastIterator()
        while (it.hasNext) {
          val entry = it.next()
          vdata.enqueue((entry.getLongKey(), entry.getDoubleValue()))
        }
      case None => // vertex received no message this round; leave it empty
    }
    vdata
  }

  /**
   * Run `step - 1` rounds of weight propagation over the input graph and
   * write one "vid dst1:w1 dst2:w2 ..." line per vertex to `output`.
   */
  def nweight(sc: SparkContext, input: String, output: String, step: Int,
      maxDegree: Int, numPartitions: Int, storageLevel: StorageLevel) {

    val partitioner = new HashPartitioner(numPartitions)

    // Parse "<src> <dst1>:<w1>, <dst2>:<w2> ..." lines into edges, keyed by
    // source id so edges are co-partitioned with their source vertices.
    val edges = sc.textFile(input, numPartitions).flatMap { line =>
      val parts = line.split("\\s+", 2)
      val srcId = parts(0).trim.toLong

      parts(1).split("[,\\s]+").filterNot(_.isEmpty).map { token =>
        val kv = token.split(":")
        (srcId, Edge(srcId, kv(0).trim.toLong, kv(1).toDouble))
      }
    }.partitionBy(partitioner).map(_._2)

    // Initial vertex attribute: the (capacity-bounded) out-edge list of
    // each vertex, i.e. Weight(1).
    val vertices = edges.map { e =>
      (e.srcId, (e.dstId, e.attr))
    }.groupByKey(partitioner).map { case (id, outEdges) =>
      val queue = new SizedPriorityQueue(maxDegree)
      outEdges.foreach(queue.enqueue)
      (id, queue)
    }

    var g = GraphImpl(vertices, edges, new SizedPriorityQueue(maxDegree), storageLevel, storageLevel).cache()

    var msg: RDD[(VertexId, Long2DoubleOpenHashMap)] = null
    for (i <- 2 to step) {
      msg = g.mapReduceTriplets(mapF _, reduceF _, Some(g.vertices, EdgeDirection.In))
      g = g.outerJoinVertices(msg)(updateF _).persist(storageLevel)
    }

    // Serialize each vertex as "vid dst1:w1 dst2:w2 ..." and save as text.
    g.vertices.map { case (vid, vdata) =>
      val sb = new StringBuilder
      sb.append(vid)

      vdata.foreach { case (dst, w) =>
        sb.append(' ')
        sb.append(dst)
        sb.append(':')
        sb.append(w)
      }
      sb.toString
    }.saveAsTextFile(output)
  }
}
99+
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.intel.sparkbench.nweight

import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap

/**
 * Compute NWeight for Graph G(V, E) as defined below:
 *   Weight(1)(u, v) = edge(u, v)
 *   Weight(n)(u, v) = Sum (over {x | there are edges (u, x) and (x, v)}) Weight(n-1)(u, x) * Weight(1)(x, v)
 *
 * Input is given in text file format. Each line represents a node and all
 * out-edges of that node (edge weight specified):
 *   <vertex> <vertex1>:<weight1>, <vertex2>:<weight2> ...
 */
object PregelNWeight extends Serializable {

  /**
   * Pregel send function: propagate to the source vertex a map of
   * {target -> dstWeight * edgeWeight}, skipping self-references.
   */
  def sendMsg(edge: EdgeTriplet[SizedPriorityQueue, Double]) = {
    val contrib = new Long2DoubleOpenHashMap()
    val edgeWeight = edge.attr
    val source = edge.srcId
    edge.dstAttr.foreach { case (target, wn) =>
      if (target != source)
        contrib.put(target, wn * edgeWeight)
    }
    Iterator((source, contrib))
  }

  /** Combine two partial weight maps by summing values key-wise into c1. */
  def mergMsg(c1: Long2DoubleOpenHashMap, c2: Long2DoubleOpenHashMap) = {
    val it = c2.long2DoubleEntrySet().fastIterator()
    while (it.hasNext) {
      val entry = it.next()
      c1.put(entry.getLongKey(), c1.get(entry.getLongKey()) + entry.getDoubleValue())
    }
    c1
  }

  /**
   * Pregel vertex program: replace the vertex's queue with the incoming
   * aggregated weights. An empty message (the Pregel initial message on the
   * first superstep) seeds the vertex with itself at weight 1.
   */
  def vProg(id: VertexId, vdata: SizedPriorityQueue, msg: Long2DoubleOpenHashMap) = {
    vdata.clear()
    if (msg.size > 0) {
      val it = msg.long2DoubleEntrySet().fastIterator()
      while (it.hasNext) {
        val entry = it.next()
        vdata.enqueue((entry.getLongKey(), entry.getDoubleValue()))
      }
    } else {
      vdata.enqueue((id, 1))
    }
    vdata
  }

  /**
   * Run `step` Pregel supersteps of weight propagation over the input graph
   * and write one "vid dst1:w1 dst2:w2 ..." line per vertex to `output`.
   */
  def nweight(sc: SparkContext, input: String, output: String, step: Int,
      maxDegree: Int, numPartitions: Int, storageLevel: StorageLevel) {

    val partitioner = new HashPartitioner(numPartitions)

    // Parse "<src> <dst1>:<w1>, <dst2>:<w2> ..." lines into edges, keyed by
    // source id so edges are co-partitioned with their source vertices.
    val edges = sc.textFile(input, numPartitions).flatMap { line =>
      val parts = line.split("\\s+", 2)
      val srcId = parts(0).trim.toLong

      parts(1).split("[,\\s]+").filterNot(_.isEmpty).map { token =>
        val kv = token.split(":")
        (srcId, Edge(srcId, kv(0).trim.toLong, kv(1).toDouble))
      }
    }.partitionBy(partitioner).map(_._2)

    var g = GraphImpl(edges, new SizedPriorityQueue(maxDegree), storageLevel, storageLevel).cache()

    g = Pregel(g, new Long2DoubleOpenHashMap, step, EdgeDirection.In)(vProg _, sendMsg _, mergMsg _)

    // Serialize each vertex as "vid dst1:w1 dst2:w2 ..." and save as text.
    g.vertices.map { case (vid, vdata) =>
      val sb = new StringBuilder
      sb.append(vid)

      vdata.foreach { case (dst, w) =>
        sb.append(' ')
        sb.append(dst)
        sb.append(':')
        sb.append(w)
      }
      sb.toString
    }.saveAsTextFile(output)
  }
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.intel.sparkbench.nweight

import it.unimi.dsi.fastutil.objects.ObjectHeaps

/**
 * A fixed-capacity min-heap of (vertexId, weight) pairs that retains the
 * `capacity` largest elements seen so far, ordered by weight first and
 * vertex id second. Used as the per-vertex attribute in the NWeight
 * workload to keep only the top-weighted neighbors.
 *
 * @param capacity maximum number of elements retained
 */
class SizedPriorityQueue(
    val capacity: Int) extends Traversable[(Long, Double)] with Serializable {
  // Backing array used as a binary min-heap; only the first `size_` slots
  // hold valid elements.
  protected val buf = new Array[(Long, Double)](capacity)
  // Orders by weight, then by vertex id, so the heap root is the smallest
  // retained element (the next eviction candidate).
  protected val comparator = new java.util.Comparator[(Long, Double)] with Serializable {
    override def compare(m1: (Long, Double), m2: (Long, Double)) =
      if (m1._2 < m2._2) -1
      else if (m1._2 > m2._2) 1
      else if (m1._1 < m2._1) -1
      else if (m1._1 > m2._1) 1
      else 0
  }

  // Number of valid elements currently in `buf`.
  protected var size_ = 0

  override def size() = size_

  /** Logically empty the queue; stale buffer contents are simply ignored. */
  def clear() {
    size_ = 0
  }

  /** Return a copy of all retained elements sorted in ascending order. */
  def fullySorted(): Array[(Long, Double)] = {
    // BUG FIX: the original used `buf.slice(0, size_ - 1)`, whose exclusive
    // upper bound silently dropped the last retained element.
    val slicedBuf = buf.slice(0, size_)
    java.util.Arrays.sort(slicedBuf, comparator)
    slicedBuf
  }

  /** Visit the retained elements in internal heap (unsorted) order. */
  def foreach[U](f: ((Long, Double)) => U): Unit = {
    for (i <- 0 until size_) f(buf(i))
  }

  /**
   * Insert a value. While under capacity, the value is simply added; once
   * full, it replaces the current minimum only if it compares greater, so
   * the queue always holds the top-`capacity` elements seen.
   */
  def enqueue(value: (Long, Double)) {
    if (size_ < capacity) {
      buf(size_) = value
      size_ = size_ + 1
      ObjectHeaps.upHeap(buf, size_, size_ - 1, comparator)
    } else if (comparator.compare(value, buf(0)) > 0) {
      buf(0) = value
      ObjectHeaps.downHeap(buf, size_, 0, comparator)
    }
  }

}

object SizedPriorityQueue {
  /** Build a queue of the given capacity pre-loaded with `elems`. */
  def apply(capacity: Int)(elems: (Long, Double)*) = {
    val q = new SizedPriorityQueue(capacity)
    // Pass an explicit tuple; the original `q.enqueue(i, v)` relied on
    // deprecated automatic argument tupling.
    for ((i, v) <- elems)
      q.enqueue((i, v))
    q
  }
}

src/sparkbench/src/main/scala/com/intel/sparkbench/nweight/datagen/NWeightDataGenerator.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ object NWeightDataGenerator {
3838
println(s"Total Records: $totalNumRecords")
3939
} else {
4040
System.err.println(
41-
s"Usage: $NWeightDataGenerator <MODEL_PATH> <OUTPUT_PATH> <NUM_RECORDS> <NUM_PARTITIONS>"
41+
s"Usage: $NWeightDataGenerator <MODEL_PATH> <OUTPUT_PATH> <NUM_RECORDS>"
4242
)
4343
System.exit(1)
4444
}

workloads/nweight/conf/00-nweight-default.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
# override configurations here
33
hibench.nweight.edges ${hibench.nweight.${hibench.scale.profile}.edges}
44
hibench.nweight.base.hdfs ${hibench.hdfs.data.dir}/NWeight
5+
hibench.nweight.degree ${hibench.nweight.${hibench.scale.profile}.degree}
6+
hibench.nweight.max_out_edges ${hibench.nweight.${hibench.scale.profile}.max_out_edges}
7+
hibench.nweight.partitions ${hibench.default.map.parallelism}
8+
hibench.nweight.storage_level 7
9+
hibench.nweight.disable_kryo false
10+
hibench.nweight.model graphx
511

612
# export for shell script
713
hibench.workload.input ${hibench.nweight.base.hdfs}/${hibench.workload.dir.name.input}

0 commit comments

Comments
 (0)