-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathTableStatsSinglePathMain.scala
More file actions
73 lines (59 loc) · 1.94 KB
/
TableStatsSinglePathMain.scala
File metadata and controls
73 lines (59 loc) · 1.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.cloudera.sa.examples.tablestats
import com.cloudera.sa.examples.tablestats.model.{FirstPassStatsModel}
import org.apache.spark._
import org.apache.spark.sql.DataFrame
import scala.collection.mutable
/**
* Created by ted.malaska on 6/27/15.
*/
object TableStatsSinglePathMain {

  /**
   * Entry point: computes per-column value-frequency statistics for a
   * Parquet table in a single pass and prints the aggregated model.
   *
   * Usage: TableStatsSinglePathMain &lt;inputPath&gt; [L]
   *   - inputPath: path to the Parquet data
   *   - "L" as the second argument runs Spark in local mode
   */
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("TableStatsSinglePathMain <inputPath>")
      return
    }
    val inputPath = args(0)

    // BUG FIX: the original used args(1).eq("L"). `eq` is reference
    // equality (AnyRef.eq); command-line strings are not interned, so the
    // check was effectively always false and local mode could never be
    // enabled. Use value equality (==) instead.
    val runLocal = args.length == 2 && args(1) == "L"

    // Build the context as a single expression instead of mutating a var.
    val sc: SparkContext =
      if (runLocal) {
        val sparkConfig = new SparkConf()
        // Compression disabled to make local debugging/inspection easier.
        sparkConfig.set("spark.broadcast.compress", "false")
        sparkConfig.set("spark.shuffle.compress", "false")
        sparkConfig.set("spark.shuffle.spill.compress", "false")
        new SparkContext("local", "TableStatsSinglePathMain", sparkConfig)
      } else {
        new SparkContext(new SparkConf().setAppName("TableStatsSinglePathMain"))
      }

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    //Part A: load the Parquet table as a DataFrame
    val df = sqlContext.parquetFile(inputPath)

    val firstPassStats = getFirstPassStat(df)

    //Part E: report the aggregated statistics
    println(firstPassStats)

    //Part F: release cluster resources
    sc.stop()
  }

  /**
   * Computes first-pass statistics over every cell of the DataFrame.
   *
   * Emits one ((columnIdx, cellValue), 1) pair per cell, counts them
   * word-count style, then folds the counts into a single
   * [[FirstPassStatsModel]] (one partial model per partition, merged with
   * the model's += operator).
   *
   * @param df the input DataFrame
   * @return the aggregated first-pass statistics model
   */
  def getFirstPassStat(df: DataFrame): FirstPassStatsModel = {
    val schema = df.schema

    //Part B: one ((columnIdx, cellValue), count) record per cell
    val columnValueCounts = df.flatMap { r =>
      (0 until schema.length).map { idx =>
        //((columnIdx, cellValue), count)
        ((idx, r.get(idx)), 1L)
      }
    }.reduceByKey(_ + _) //This is like word count

    //Part C: fold each partition's counts into one partial stats model
    val firstPassStats = columnValueCounts.mapPartitions[FirstPassStatsModel] { it =>
      val firstPassStatsModel = new FirstPassStatsModel()
      it.foreach { case ((columnIdx, columnVal), count) =>
        firstPassStatsModel += (columnIdx, columnVal, count)
      }
      Iterator(firstPassStatsModel)
    }.reduce { (a, b) => //Part D: merge the partial models pairwise
      a += b
      a
    }

    firstPassStats
  }
}