Skip to content
This repository was archived by the owner on Jan 3, 2023. It is now read-only.

Commit 035887f

Browse files
rui-mophilo-he
authored and committed
Set tmp dir for compression action by scheduler and make bufSize configurable with human-readable value (#2070)
1 parent bdfae36 commit 035887f

3 files changed

Lines changed: 47 additions & 14 deletions

File tree

smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/CompressionAction.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.smartdata.conf.SmartConfKeys;
3434
import org.smartdata.model.CompressionFileInfo;
3535
import org.smartdata.model.CompressionFileState;
36+
import org.smartdata.utils.StringUtil;
3637

3738
import java.io.IOException;
3839
import java.io.InputStream;
@@ -63,7 +64,6 @@ public class CompressionAction extends HdfsAction {
6364
public static final String BUF_SIZE = "-bufSize";
6465
public static final String COMPRESS_IMPL = "-compressImpl";
6566
private static List<String> compressionImplList = Arrays.asList("Lz4","Bzip2","Zlib","snappy");
66-
private static final String COMPRESS_DIR = "/system/ssm/compress_tmp";
6767

6868
private String filePath;
6969
private Configuration conf;
@@ -78,6 +78,9 @@ public class CompressionAction extends HdfsAction {
7878
private CompressionFileInfo compressionFileInfo;
7979
private CompressionFileState compressionFileState;
8080

81+
private String compressionTmpPath;
82+
public static final String COMPRESSION_TMP = "-compressionTmp";
83+
8184
@Override
8285
public void init(Map<String, String> args) {
8386
super.init(args);
@@ -89,14 +92,17 @@ public void init(Map<String, String> args) {
8992
SmartConfKeys.SMART_COMPRESSION_MAX_SPLIT,
9093
SmartConfKeys.SMART_COMPRESSION_MAX_SPLIT_DEFAULT);
9194
this.xAttrName = SmartConstants.SMART_FILE_STATE_XATTR_NAME;
92-
9395
this.filePath = args.get(FILE_PATH);
94-
if (args.containsKey(BUF_SIZE)) {
95-
this.UserDefinedbuffersize = Integer.valueOf(args.get(BUF_SIZE));
96+
if (args.containsKey(BUF_SIZE) && !args.get(BUF_SIZE).isEmpty()) {
97+
this.UserDefinedbuffersize = (int) StringUtil.parseToByte(args.get(BUF_SIZE));
9698
}
9799
if (args.containsKey(COMPRESS_IMPL)) {
98100
this.compressionImpl = args.get(COMPRESS_IMPL);
99101
}
102+
if (args.containsKey(COMPRESSION_TMP)) {
103+
// this is a temp file kept for compressing a file.
104+
this.compressionTmpPath = args.get(COMPRESSION_TMP);
105+
}
100106
}
101107

102108
@Override
@@ -120,8 +126,6 @@ protected void execute() throws Exception {
120126
if (srcFile.getLen() == 0) {
121127
compressionFileInfo = new CompressionFileInfo(false, compressionFileState);
122128
} else {
123-
String tempPath = COMPRESS_DIR + filePath + "_" + "aid" + getActionId() +
124-
"_" + System.currentTimeMillis();
125129
short replication = srcFile.getReplication();
126130
long blockSize = srcFile.getBlockSize();
127131
long fileSize = srcFile.getLen();
@@ -142,12 +146,12 @@ protected void execute() throws Exception {
142146

143147
DFSInputStream dfsInputStream = dfsClient.open(filePath);
144148

145-
OutputStream compressedOutputStream = dfsClient.create(tempPath,
149+
OutputStream compressedOutputStream = dfsClient.create(compressionTmpPath,
146150
true, replication, blockSize);
147151
compress(dfsInputStream, compressedOutputStream);
148-
HdfsFileStatus destFile = dfsClient.getFileInfo(tempPath);
152+
HdfsFileStatus destFile = dfsClient.getFileInfo(compressionTmpPath);
149153
compressionFileState.setCompressedLength(destFile.getLen());
150-
compressionFileInfo = new CompressionFileInfo(true, tempPath, compressionFileState);
154+
compressionFileInfo = new CompressionFileInfo(true, compressionTmpPath, compressionFileState);
151155
}
152156
compressionFileState.setBufferSize(bufferSize);
153157
appendLog("Final compression bufferSize = " + bufferSize);

smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CompressionScheduler.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626
import org.smartdata.SmartContext;
27+
import org.smartdata.conf.SmartConf;
28+
import org.smartdata.conf.SmartConfKeys;
2729
import org.smartdata.hdfs.HadoopUtil;
2830
import org.smartdata.hdfs.action.HdfsAction;
2931
import org.smartdata.metastore.MetaStore;
@@ -34,6 +36,8 @@
3436
import org.smartdata.model.FileState;
3537
import org.smartdata.model.CompressionFileState;
3638
import org.smartdata.model.LaunchAction;
39+
import org.smartdata.model.action.ScheduleResult;
40+
import org.smartdata.protocol.message.LaunchCmdlet;
3741

3842
import java.io.IOException;
3943
import java.net.URI;
@@ -48,15 +52,25 @@ public class CompressionScheduler extends ActionSchedulerService {
4852
private final URI nnUri;
4953
private MetaStore metaStore;
5054
private static final List<String> actions = Arrays.asList("compress");
55+
public static String COMPRESSION_DIR;
56+
public static final String COMPRESSION_TMP = "-compressionTmp";
57+
public static final String COMPRESSION_TMP_DIR = "compress_tmp/";
58+
private SmartConf conf;
5159

5260
public static final Logger LOG =
5361
LoggerFactory.getLogger(CompressionScheduler.class);
5462

5563
public CompressionScheduler(SmartContext context, MetaStore metaStore)
5664
throws IOException {
5765
super(context, metaStore);
66+
this.conf = context.getConf();
5867
this.metaStore = metaStore;
5968
nnUri = HadoopUtil.getNameNodeUri(getContext().getConf());
69+
70+
String ssmTmpDir = conf.get(
71+
SmartConfKeys.SMART_WORK_DIR_KEY, SmartConfKeys.SMART_WORK_DIR_DEFAULT);
72+
ssmTmpDir = ssmTmpDir + (ssmTmpDir.endsWith("/") ? "" : "/");
73+
CompressionScheduler.COMPRESSION_DIR = ssmTmpDir + COMPRESSION_TMP_DIR;
6074
}
6175

6276
@Override
@@ -136,6 +150,16 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo, int action
136150
}
137151
}
138152

153+
@Override
154+
public ScheduleResult onSchedule(CmdletInfo cmdletInfo, ActionInfo actionInfo,
155+
LaunchCmdlet cmdlet, LaunchAction action, int actionIndex) {
156+
// For compression, add compressionTmp argument
157+
String tmpName = createTmpName(action);
158+
action.getArgs().put(COMPRESSION_TMP, COMPRESSION_DIR + tmpName);
159+
actionInfo.getArgs().put(COMPRESSION_TMP, COMPRESSION_DIR + tmpName);
160+
return ScheduleResult.SUCCESS;
161+
}
162+
139163
@Override
140164
public void onActionFinished(CmdletInfo cmdletInfo, ActionInfo actionInfo, int actionIndex) {
141165
if (actionInfo.isFinished()) {

smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestCompressionAction.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,18 @@ public void init() throws Exception {
4040
super.init();
4141
}
4242

43-
protected void compressoin(String filePath, long bufferSize) throws IOException {
43+
protected void compression(String filePath, String bufferSize) throws IOException {
4444
CompressionAction compressionAction = new CompressionAction();
4545
compressionAction.setDfsClient(dfsClient);
4646
compressionAction.setContext(smartContext);
4747
Map<String, String> args = new HashMap<>();
4848
args.put(compressionAction.FILE_PATH, filePath);
49-
args.put(compressionAction.BUF_SIZE, "" + bufferSize);
49+
args.put(compressionAction.BUF_SIZE, bufferSize);
50+
// set a tmp dir for compression
51+
String COMPRESS_DIR = "/system/ssm/compress_tmp";
52+
String tempPath = COMPRESS_DIR + filePath + "_" + "aid" + compressionAction.getActionId()
53+
+ "_" + System.currentTimeMillis();
54+
args.put(compressionAction.COMPRESSION_TMP, tempPath);
5055
// args.put(CompressionAction.COMPRESS_IMPL, "Lz4");
5156
// args.put(CompressionAction.COMPRESS_IMPL,"Bzip2");
5257
// args.put(CompressionAction.COMPRESS_IMPL,"Zlib");
@@ -70,24 +75,24 @@ public void testInit() throws IOException {
7075

7176
@Test
7277
public void testExecute() throws Exception {
73-
7478
String filePath = "/testCompressFile/fadsfa/213";
7579
int bufferSize = 1024 * 128;
7680
// String compressionImpl = "Lz4";
7781
// String compressionImpl = "Bzip2";
7882
// String compressionImpl = "Zlib";
7983
byte[] bytes = TestCompressionAction.BytesGenerator.get(bufferSize);
80-
8184
short replication = 3;
8285
long blockSize = DEFAULT_BLOCK_SIZE;
86+
8387
// Create HDFS file
8488
OutputStream outputStream = dfsClient.create(filePath, true,
8589
replication, blockSize);
8690
outputStream.write(bytes);
8791
outputStream.close();
8892

8993
// Generate compressed file
90-
compressoin(filePath, bufferSize);
94+
String bufferSizeForCompression = "10MB";
95+
compression(filePath, bufferSizeForCompression);
9196

9297
// Check HdfsFileStatus
9398
HdfsFileStatus fileStatus = dfsClient.getFileInfo(filePath);

0 commit comments

Comments (0)