-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAmazonContainsReview.java
More file actions
252 lines (211 loc) · 9.35 KB
/
AmazonContainsReview.java
File metadata and controls
252 lines (211 loc) · 9.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import java.io.IOException;
import java.util.regex.*;
import java.util.Set;
import java.util.Map;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.*;
import com.google.gson.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
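/**
 * This Map-Reduce code goes through every Amazon review in rfox12:reviews.
 * Job 1 averages the "overall" rating per product, once over all of its reviews and once
 * over only the reviews whose text mentions "reviews" or "comments".
 * Job 2 computes, per product, the absolute difference between those two averages.
 * Job 3 counts how many products fall into each difference bin.
 */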
public class AmazonContainsReview extends Configured implements Tool {
// Just used for logging
protected static final Logger LOG = LoggerFactory.getLogger(AmazonContainsReview.class);
/**
 * Execution entry point: hands this tool to Hadoop's {@code ToolRunner} together with a
 * configuration created by {@code HBaseConfiguration}, then exits the JVM with the
 * status code the run produced.
 *
 * @param args command-line arguments passed on to {@code ToolRunner}
 * @throws Exception if the tool run throws
 */
public static void main(String[] args) throws Exception {
    Configuration hbaseConf = HBaseConfiguration.create();
    Tool tool = new AmazonContainsReview();
    System.exit(ToolRunner.run(hbaseConf, tool, args));
}
/**
 * Configures and runs three chained MapReduce jobs, each one reading the previous job's output.
 *
 * <ol>
 *   <li>"average": scans the HBase table {@code rfox12:reviews} and writes to {@code args[0]}.</li>
 *   <li>"differnce": reads {@code args[0]} and writes to {@code args[1]}.</li>
 *   <li>"bins": reads {@code args[1]} and writes to {@code args[2]}.</li>
 * </ol>
 *
 * The chain stops at the first failing job so that a later job never runs on missing or
 * partial input.
 *
 * @param args exactly three output paths, one per job, in execution order
 * @return 0 when all three jobs succeed, 1 when any job fails, -1 when the argument count is wrong
 * @throws Exception if a job cannot be configured, submitted or monitored
 */
@Override
public int run(String[] args) throws Exception {
    if (args.length != 3) {
        System.err.println("Need 3 arguments (one HDFS output path per job), got: " + args.length);
        return -1;
    }
    Path averagesPath = new Path(args[0]);
    Path differencesPath = new Path(args[1]);
    Path binsPath = new Path(args[2]);

    // ---- Job 1: per-product rating averages, read straight from HBase ----
    Job job1 = Job.getInstance(getConf(), "average");
    job1.setJarByClass(AmazonContainsReview.class);
    // By default we are going to scan every row in the table
    Scan scan = new Scan();
    scan.setCaching(500); // original author's note: 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false); // don't set to true for MR jobs
    // This helper configures how table data feeds into the "map" method
    TableMapReduceUtil.initTableMapperJob(
        "rfox12:reviews", // input HBase table name
        scan, // Scan instance to control CF and attribute selection
        AverageMapReduceMapper.class, // Mapper class
        Text.class, // Mapper output key
        Text.class, // Mapper output value
        job1, // This job
        true // Add dependency jars (keep this to true)
    );
    job1.setReducerClass(AverageMapReduceReducer.class);
    FileOutputFormat.setOutputPath(job1, averagesPath);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(Text.class);
    // Wait for the job to complete; do not continue the chain if it failed
    if (!job1.waitForCompletion(true)) {
        LOG.error("Job 'average' failed; skipping the remaining jobs");
        return 1;
    }

    // ---- Job 2: per-product difference between the two averages ----
    Job job2 = Job.getInstance(getConf(), "differnce");
    job2.setJarByClass(AmazonContainsReview.class);
    job2.setMapperClass(DiffMapReduceMapper.class);
    job2.setReducerClass(DiffMapReduceReducer.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job2, averagesPath);
    FileOutputFormat.setOutputPath(job2, differencesPath);
    if (!job2.waitForCompletion(true)) {
        LOG.error("Job 'differnce' failed; skipping the remaining job");
        return 1;
    }

    // ---- Job 3: count of products per difference bin ----
    Job job3 = Job.getInstance(getConf(), "bins");
    job3.setJarByClass(AmazonContainsReview.class);
    job3.setMapperClass(TotalMapReduceMapper.class);
    job3.setReducerClass(TotalMapReduceReducer.class);
    job3.setOutputKeyClass(Text.class);
    job3.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job3, differencesPath);
    FileOutputFormat.setOutputPath(job3, binsPath);
    return job3.waitForCompletion(true) ? 0 : 1;
}
/**
 * Job 1 mapper: reads the JSON review stored in column {@code cf:review_data} of each
 * scanned row and emits the review's "overall" rating keyed by its "asin" product id.
 *
 * Every successfully parsed review produces {@code (asin, overall)}. A review whose text
 * mentions "reviews" or "comments" (case-insensitive) additionally produces
 * {@code (asin + "Contains", overall)}.
 */
public static class AverageMapReduceMapper extends TableMapper<Text, Text> {
    private static final Logger LOG = LoggerFactory.getLogger(AverageMapReduceMapper.class);
    // Here are some static (hard coded) variables
    private static final byte[] CF_NAME = Bytes.toBytes("cf"); // the "column family" name
    private static final byte[] QUALIFIER = Bytes.toBytes("review_data"); // the column name
    // Lower-case terms searched for in the lower-cased review text
    private static final String[] SEARCH_TERMS = {"reviews", "comments"};
    private Counter rowsProcessed; // This will count number of rows processed
    private JsonParser parser; // This gson parser will help us parse JSON

    /** Called once before the task starts: creates the JSON parser and looks up the row counter. */
    @Override
    protected void setup(Context context) {
        parser = new JsonParser();
        rowsProcessed = context.getCounter("average", "Rows Processed");
    }

    /**
     * Called with every row scanned. Rows with a missing cell, unparsable JSON, or a missing
     * "asin"/"overall" field are logged and skipped. A missing or null "reviewText" is treated
     * as text that mentions nothing, so the review still counts towards the product's overall
     * average.
     *
     * @param rowKey  the HBase row key (not used)
     * @param value   the scanned row
     * @param context sink for the emitted key/value pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    public void map(ImmutableBytesWritable rowKey, Result value, Context context) throws InterruptedException, IOException {
        byte[] rawJson = value.getValue(CF_NAME, QUALIFIER);
        if (rawJson == null) {
            LOG.error("Error in MAP process: row has no cf:review_data cell");
            return;
        }

        String productID;
        String overall;
        boolean mentionsTerm;
        try {
            // Bytes.toString decodes with an explicit charset instead of the platform default
            JsonObject review = parser.parse(Bytes.toString(rawJson)).getAsJsonObject();
            productID = review.get("asin").getAsString();
            overall = review.get("overall").getAsString();
            mentionsTerm = mentionsSearchTerm(review.get("reviewText"));
        } catch (RuntimeException e) {
            // Only bad data is skipped here; write failures below are allowed to propagate
            LOG.error("Error in MAP process: {}", e.getMessage(), e);
            return;
        }

        if (mentionsTerm) {
            context.write(new Text(productID + "Contains"), new Text(overall));
        }
        context.write(new Text(productID), new Text(overall));
        rowsProcessed.increment(1);
    }

    /** Returns true when the given review text contains any search term, ignoring case. */
    private static boolean mentionsSearchTerm(JsonElement reviewText) {
        if (reviewText == null || reviewText.isJsonNull()) {
            return false;
        }
        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale
        String lowered = reviewText.getAsString().toLowerCase(java.util.Locale.ROOT);
        for (String term : SEARCH_TERMS) {
            if (lowered.contains(term)) {
                return true;
            }
        }
        return false;
    }
}
/**
 * Job 1 reducer: parses every value received for a key as a double and writes the
 * arithmetic mean of those values, as text, under the same key.
 */
public static class AverageMapReduceReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int seen = 0;
        double total = 0.0;
        for (Text rating : values) {
            total = total + Double.parseDouble(rating.toString());
            seen += 1;
        }
        Text mean = new Text(Double.toString(total / seen));
        context.write(key, mean);
    }
}
/**
 * Job 2 mapper: reads tab-separated {@code key<TAB>average} lines and re-emits each average
 * under its product id, with a trailing "Contains" marker removed from the key. Both
 * averages of one product therefore arrive at the same reducer call.
 */
public static class DiffMapReduceMapper extends Mapper<Object, Text, Text, Text> {
    private static final Logger LOG = LoggerFactory.getLogger(DiffMapReduceMapper.class);
    // Marker appended to the product id for the "mentions a search term" average
    private static final String CONTAINS_SUFFIX = "Contains";
    private Counter rowsProcessed; // This will count number of rows processed

    /** Called once before the task starts: looks up the row counter so map() can increment it. */
    @Override
    protected void setup(Context context) {
        rowsProcessed = context.getCounter("difference", "Rows Processed");
    }

    /**
     * Called with every input line. Lines without a tab separator are logged and skipped.
     *
     * @param key     the input key supplied by the input format (not used)
     * @param value   one line of text in the form {@code key<TAB>average}
     * @param context sink for the emitted key/value pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    public void map(Object key, Text value, Context context) throws InterruptedException, IOException {
        String[] kv = value.toString().split("\t");
        if (kv.length < 2) {
            LOG.error("Error in MAP process: expected key<TAB>value but got: {}", value);
            return;
        }
        String rawKey = kv[0];
        // Strip the marker only where it was appended, never from the middle of an id
        String productID = rawKey.endsWith(CONTAINS_SUFFIX)
            ? rawKey.substring(0, rawKey.length() - CONTAINS_SUFFIX.length())
            : rawKey;
        String overall = kv[1];
        context.write(new Text(productID), new Text(overall));
        rowsProcessed.increment(1);
    }
}
/**
 * Job 2 reducer: folds the values of a key with {@code gap = |gap - value|}, starting from 0,
 * and writes the result only when the key had more than one value. For exactly two
 * non-negative values a and b the result is |a - b|.
 */
public static class DiffMapReduceReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        double gap = 0.0;
        int seen = 0;
        for (Text average : values) {
            double current = Double.parseDouble(average.toString());
            gap = Math.abs(gap - current);
            seen += 1;
        }
        // A key with a single value has nothing to be compared against, so it is dropped
        if (seen <= 1) {
            return;
        }
        context.write(key, new Text(Double.toString(gap)));
    }
}
/**
 * Job 3 mapper: reads tab-separated {@code productId<TAB>difference} lines and emits
 * {@code (binLabel, "1")} for the first bin whose upper bound is at least the difference.
 */
public static class TotalMapReduceMapper extends Mapper<Object, Text, Text, Text> {
    private static final Logger LOG = LoggerFactory.getLogger(TotalMapReduceMapper.class);
    // Inclusive upper bound of each bin, in ascending order
    private static final int[] BINS = {2, 3, 5};
    // Label emitted for the bin at the same index in BINS.
    // NOTE(review): a difference of exactly 0 lands in the first bin although its label reads "0<x" - confirm intent
    private static final String[] BIN_LABELS = {"0<x≤2", "2<x≤3", "3<x≤5"};
    private Counter rowsProcessed; // This will count number of rows processed

    /** Called once before the task starts: looks up the row counter so map() can increment it. */
    @Override
    protected void setup(Context context) {
        rowsProcessed = context.getCounter("bins", "Rows Processed");
    }

    /**
     * Called with every input line. Lines without a tab separator or with a non-numeric
     * difference are logged and skipped. A difference above the last bin bound is logged
     * and emits nothing.
     *
     * @param key     the input key supplied by the input format (not used)
     * @param value   one line of text in the form {@code productId<TAB>difference}
     * @param context sink for the emitted key/value pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    public void map(Object key, Text value, Context context) throws InterruptedException, IOException {
        String[] kv = value.toString().split("\t");
        if (kv.length < 2) {
            LOG.error("Error in MAP process: expected key<TAB>value but got: {}", value);
            return;
        }

        double diff;
        try {
            // Parse once instead of on every loop iteration
            diff = Double.parseDouble(kv[1]);
        } catch (NumberFormatException e) {
            LOG.error("Error in MAP process: {}", e.getMessage(), e);
            return;
        }

        boolean binned = false;
        // Strictly less than BINS.length: the previous "<=" bound read past the end of the array
        for (int i = 0; i < BINS.length; i++) {
            if (diff <= BINS[i]) {
                context.write(new Text(BIN_LABELS[i]), new Text("1"));
                binned = true;
                break;
            }
        }
        if (!binned) {
            LOG.warn("Difference {} for key {} is outside every bin; nothing emitted", kv[1], kv[0]);
        }
        rowsProcessed.increment(1);
    }
}
/**
 * Job 3 reducer: counts how many values arrived for a key and writes that count, as text,
 * under the same key. The content of the values is never read.
 */
public static class TotalMapReduceReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int occurrences = 0;
        for (Text ignored : values) {
            occurrences += 1;
        }
        Text total = new Text(Integer.toString(occurrences));
        context.write(key, total);
    }
}
}