Skip to content

Commit f012581

Browse files
authored
Add "transformWith" to Task API which performs transformation by executing a task (#295)
Add transformWith to Task API which performs transformation by executing a task
1 parent 21188dd commit f012581

6 files changed

Lines changed: 139 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
v5.1.2
2+
------
3+
14
v5.1.1
25
------
6+
* Add "transformWith" to Task API which performs transformation by executing a task
37

48
v5.1.0
59
------

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version=5.1.0
1+
version=5.1.1
22
group=com.linkedin.parseq
33
org.gradle.parallel=true

subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,82 @@ default <R> Task<R> transform(final Function1<Try<T>, Try<R>> func) {
845845
return transform("transform: " + _taskDescriptor.getDescription(func.getClass().getName()), func);
846846
}
847847

848+
/**
849+
* Create a new task that will transform the result of this task.
850+
* Returned task will complete with value calculated by a task returned by the function.
851+
*
852+
* This is similar to {@link #transform(String, Function1)} except the transformation is done by executing the task returned
853+
* by the function.
854+
*
855+
* <blockquote><pre>
856+
* boolean writeToDB(String content) {...}
857+
*
858+
* Task{@code <String>} pictureBase64= ...
859+
*
860+
* // this task will complete with either complete successfully
861+
* // with uploadResult being true or false, or fail with MyLibException
862+
* Task{@code <Boolean>} uploadResult = pictureBase64.transformWith("transformUsingATask", t {@code ->} {
863+
* if (!t.isFailed()) {
864+
* return Task.blocking(() -> writeToDB(t.get()), executor));
865+
* }
866+
* return Task.failure(new MyLibException(t.getError());
867+
* });
868+
* <img src="doc-files/transformWith-1.png" height="90" width="296"/>
869+
*
870+
* @param desc description
871+
* @param func function to be applied to the result of this task which returns new task
872+
* to be executed
873+
* @param <R> value type of the returned task returned by function <code>func<</code>
874+
* @return a new task which will apply given function on result of either successful and failed completion of this task
875+
* to get instance of a task which will be executed next
876+
*/
877+
default <R> Task<R> transformWith(final String desc, final Function1<Try<T>, Task<R>> func) {
878+
ArgumentUtil.requireNotNull(func, "function");
879+
final Task<T> that = this;
880+
Task<R> transformWithTask = async(desc, context -> {
881+
final SettablePromise<R> result = Promises.settable();
882+
final Task<R> transform = async("transform", ctx -> {
883+
final SettablePromise<R> transformResult = Promises.settable();
884+
if (that.isFailed() && (Exceptions.isCancellation(that.getError()))) {
885+
//cancellations will not be propagated as other errors to the function to get the task to execute.
886+
transformResult.fail(that.getError());
887+
}
888+
else {
889+
final Try<T> tryT = Promises.toTry(that);
890+
try {
891+
Task<R> r = func.apply(tryT);
892+
if (r == null) {
893+
throw new RuntimeException(desc + " returned null");
894+
}
895+
Promises.propagateResult(r, transformResult);
896+
ctx.run(r);
897+
} catch (Throwable t) {
898+
transformResult.fail(t);
899+
}
900+
}
901+
return transformResult;
902+
});
903+
transform.getShallowTraceBuilder().setSystemHidden(true);
904+
transform.getShallowTraceBuilder().setTaskType(TaskType.TRANSFORM.getName());
905+
Promises.propagateResult(transform, result);
906+
context.after(that).run(transform);
907+
context.run(that);
908+
return result;
909+
});
910+
transformWithTask.getShallowTraceBuilder().setTaskType(TaskType.WITH_TRANSFORM.getName());
911+
return transformWithTask;
912+
913+
}
914+
915+
/**
916+
* Equivalent to {@code transformWith("transformWith", func)}.
917+
* @see #transformWith(String, Function1)
918+
*/
919+
default <R> Task<R> transformWith(final Function1<Try<T>, Task<R>> func) {
920+
return transformWith("transform: " + _taskDescriptor.getDescription(func.getClass().getName()), func);
921+
}
922+
923+
848924
/**
849925
* Creates a new task that will handle failure of this task.
850926
* Early completion due to cancellation is not considered to be a failure so it will not be recovered.

subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public enum TaskType {
1616
WITH_TIMEOUT ("withTimeout"),
1717
RECOVER ("recover"),
1818
WITH_RECOVER ("withRecover"),
19+
TRANSFORM ("transform"),
20+
WITH_TRANSFORM ("withTransform"),
1921
WITH_DELAY ("withDelay");
2022

2123
private final String _name;
26.1 KB
Loading

subprojects/parseq/src/test/java/com/linkedin/parseq/AbstractTaskTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,62 @@ public void testTransformFailureToFailure() {
439439
assertSame(transformed.getError(), failureReason);
440440
}
441441

442+
@Test
443+
public void testTransformWith_SuccessToFailure() {
444+
String msg = "transform failed";
445+
Task<String> success = getSuccessTask();
446+
Task<Integer> transformed = success.transformWith(tryT ->
447+
Task.callable(() -> {throw new RuntimeException(msg);}));
448+
try {
449+
runAndWait("AbstractTaskTest.testTransformWith_SuccessToFailure", transformed);
450+
} catch (Exception ex) {
451+
assertEquals(ex.getCause().getMessage(), msg);
452+
}
453+
}
454+
455+
@Test
456+
public void testTransformWith_SuccessToSuccess() {
457+
Task<String> success = getSuccessTask();
458+
Task<Integer> transformed = success.transformWith(tryT ->
459+
Task.callable(() -> tryT.get().length()));
460+
runAndWait("AbstractTaskTest.testTransformWith_SuccessToSuccess", transformed);
461+
assertEquals(transformed.get().intValue(), success.get().length());
462+
}
463+
464+
@Test
465+
public void testTransformWith_FailureToSuccess() {
466+
int returnValue = 100;
467+
Task<String> failed = getFailureTask();
468+
Task<Integer> transformed = failed.transformWith(tryT ->
469+
Task.callable(() -> returnValue));
470+
runAndWait("AbstractTaskTest.testTransformWith_FailureToSuccess", transformed);
471+
}
472+
473+
@Test
474+
public void testTransformWith_FailureToFailure() {
475+
String msg = "transform failed";
476+
Task<String> failed = getFailureTask();
477+
Task<Integer> transformed = failed.transformWith(tryT ->
478+
Task.callable(() -> {throw new RuntimeException(msg);}));
479+
try {
480+
runAndWait("AbstractTaskTest.testTransformWith_FailureToFailure", transformed);
481+
} catch (Exception ex) {
482+
assertEquals(ex.getCause().getMessage(), msg);
483+
}
484+
}
485+
486+
@Test
487+
public void testTransformWith_Cancelled() {
488+
Task<String> cancelled = getCancelledTask().recoverWith(e -> Task.callable("recover success", () -> "recovered"));
489+
try {
490+
runAndWait("AbstractTaskTest.testTransformWith_Cancelled", cancelled);
491+
fail("should have failed");
492+
} catch (Exception ex) {
493+
assertTrue(cancelled.isFailed());
494+
assertTrue(Exceptions.isCancellation(cancelled.getError()));
495+
}
496+
}
497+
442498
@Test
443499
public void testFlatten() {
444500
Task<Task<String>> nested = Task.callable(() -> getSuccessTask());

0 commit comments

Comments
 (0)