From 6789b31ba2ada6062521d356d7e30dbfeeab611e Mon Sep 17 00:00:00 2001 From: Sigee Date: Fri, 20 Sep 2024 11:38:10 +0200 Subject: [PATCH 1/4] Replace String.indexOf() calls with String.contains() method for improved readability --- rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java index d9c90f5d7..fdf13254d 100644 --- a/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java +++ b/rsc/src/test/java/org/apache/livy/rsc/rpc/TestRpc.java @@ -108,7 +108,7 @@ public void testClientServer() throws Exception { client.call(new ErrorCall(errorMsg)).get(10, TimeUnit.SECONDS); } catch (ExecutionException ee) { assertTrue(ee.getCause() instanceof RpcException); - assertTrue(ee.getCause().getMessage().indexOf(errorMsg) >= 0); + assertTrue(ee.getCause().getMessage().contains(errorMsg)); } // Test from server to client too. @@ -169,7 +169,7 @@ public void testNotDeserializableRpc() throws Exception { client.call(new NotDeserializable(42)).get(10, TimeUnit.SECONDS); } catch (ExecutionException ee) { assertTrue(ee.getCause() instanceof RpcException); - assertTrue(ee.getCause().getMessage().indexOf("KryoException") >= 0); + assertTrue(ee.getCause().getMessage().contains("KryoException")); } } From b25a60e3b96193481deebd9fb97fd74ef046e8db Mon Sep 17 00:00:00 2001 From: Sigee Date: Fri, 20 Sep 2024 11:40:58 +0200 Subject: [PATCH 2/4] Remove unnecessary explicit type parameters --- .../main/java/org/apache/livy/client/http/HttpClient.java | 2 +- .../main/java/org/apache/livy/client/http/HttpConf.java | 2 +- rsc/src/main/java/org/apache/livy/rsc/RSCClient.java | 4 ++-- rsc/src/main/java/org/apache/livy/rsc/RSCConf.java | 2 +- .../main/java/org/apache/livy/rsc/driver/RSCDriver.java | 2 +- rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java | 4 ++-- rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java | 8 ++++---- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index f40148f94..1211a5288 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -170,7 +170,7 @@ public Void call() throws Exception { private JobHandleImpl sendJob(final String command, Job job) { final ByteBuffer serializedJob = serializer.serialize(job); - JobHandleImpl handle = new JobHandleImpl(config, conn, sessionId, executor, serializer); + JobHandleImpl handle = new JobHandleImpl<>(config, conn, sessionId, executor, serializer); handle.start(command, serializedJob); return handle; } diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpConf.java b/client-http/src/main/java/org/apache/livy/client/http/HttpConf.java index 2ae7475b5..0ce66b1cf 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpConf.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpConf.java @@ -94,7 +94,7 @@ boolean isSpnegoEnabled() { // Maps deprecated key to DeprecatedConf with the same key. // There are no deprecated configs without alternatives currently. private static final Map deprecatedConfigs - = Collections.unmodifiableMap(new HashMap()); + = Collections.unmodifiableMap(new HashMap<>()); protected Map getConfigsWithAlternatives() { return configsWithAlternatives; diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index ee9c9012f..51f84b4c5 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -336,10 +336,10 @@ private class ClientProtocol extends BaseProtocol { JobHandleImpl submit(Job job) { final String jobId = UUID.randomUUID().toString(); - Object msg = new JobRequest(jobId, job); + Object msg = new JobRequest<>(jobId, job); final Promise promise = eventLoopGroup.next().newPromise(); - final JobHandleImpl handle = new JobHandleImpl(RSCClient.this, + final JobHandleImpl handle = new JobHandleImpl<>(RSCClient.this, promise, jobId); jobs.put(jobId, handle); diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 4c45956d7..3624307a5 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -168,7 +168,7 @@ public String findLocalAddress() throws IOException { // Maps deprecated key to DeprecatedConf with the same key. // There are no deprecated configs without alternatives currently. private static final Map deprecatedConfigs - = Collections.unmodifiableMap(new HashMap()); + = Collections.unmodifiableMap(new HashMap<>()); protected Map getConfigsWithAlternatives() { return configsWithAlternatives; diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index b5b99f624..dd2ede881 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -394,7 +394,7 @@ Serializer serializer() { void jobFinished(String jobId, T result, Throwable error) { LOG.debug("Send job({}) result to Client.", jobId); - broadcast(new JobResult(jobId, result, error)); + broadcast(new JobResult<>(jobId, result, error)); } void jobStarted(String jobId) { diff --git a/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java b/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java index 5fce16410..e4871f528 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java +++ b/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java @@ -108,7 +108,7 @@ public static Promise createClient( .connect(host, port); final Promise promise = eloop.next().newPromise(); - final AtomicReference rpc = new AtomicReference(); + final AtomicReference rpc = new AtomicReference<>(); // Set up a timeout to undo everything. final Runnable timeoutTask = new Runnable() { @@ -221,7 +221,7 @@ static Rpc createEmbedded(RpcDispatcher dispatcher) { private volatile RpcDispatcher dispatcher; private final Map, Method> handlers = new ConcurrentHashMap<>(); - private final Collection rpcCalls = new ConcurrentLinkedQueue(); + private final Collection rpcCalls = new ConcurrentLinkedQueue<>(); private volatile Rpc.MessageHeader lastHeader; private Rpc(RSCConf config, Channel channel, EventExecutorGroup egroup) { diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java b/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java index e6161ed81..ac4cb7ebd 100644 --- a/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java +++ b/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java @@ -38,7 +38,7 @@ public class TestJobHandle { @Test public void testStateChanges() throws Exception { - JobHandleImpl handle = new JobHandleImpl(client, promise, "job"); + JobHandleImpl handle = new JobHandleImpl<>(client, promise, "job"); handle.addListener(listener); assertTrue(handle.changeState(JobHandle.State.QUEUED)); @@ -57,7 +57,7 @@ public void testStateChanges() throws Exception { @Test public void testFailedJob() throws Exception { - JobHandleImpl handle = new JobHandleImpl(client, promise, "job"); + JobHandleImpl handle = new JobHandleImpl<>(client, promise, "job"); handle.addListener(listener); Throwable cause = new Exception(); @@ -70,7 +70,7 @@ public void testFailedJob() throws Exception { @Test public void testSucceededJob() throws Exception { - JobHandleImpl handle = new JobHandleImpl(client, promise, "job"); + JobHandleImpl handle = new JobHandleImpl<>(client, promise, "job"); handle.addListener(listener); Object result = new Exception(); @@ -83,7 +83,7 @@ public void testSucceededJob() throws Exception { @Test public void testImmediateCallback() throws Exception { - JobHandleImpl handle = new JobHandleImpl(client, promise, "job"); + JobHandleImpl handle = new JobHandleImpl<>(client, promise, "job"); assertTrue(handle.changeState(JobHandle.State.QUEUED)); handle.addListener(listener); verify(listener).onJobQueued(handle); From a7d5075122de458d7a3f5538b708a4bc2504e351 Mon Sep 17 00:00:00 2001 From: Sigee Date: Fri, 20 Sep 2024 11:47:13 +0200 Subject: [PATCH 3/4] Replace try-finally statement with try-with-resources --- .../main/java/org/apache/livy/LivyClientBuilder.java | 5 +---- .../main/java/org/apache/livy/rsc/ContextLauncher.java | 10 ++-------- .../java/org/apache/livy/rsc/driver/RSCDriver.java | 7 ++----- .../apache/livy/rsc/driver/RSCDriverBootstrapper.java | 5 +---- .../test/java/org/apache/livy/rsc/TestSparkClient.java | 4 +--- .../java/org/apache/livy/test/apps/SimpleSparkApp.java | 5 +---- 6 files changed, 8 insertions(+), 28 deletions(-) diff --git a/api/src/main/java/org/apache/livy/LivyClientBuilder.java b/api/src/main/java/org/apache/livy/LivyClientBuilder.java index 1d7ec0183..8b6fe8167 100644 --- a/api/src/main/java/org/apache/livy/LivyClientBuilder.java +++ b/api/src/main/java/org/apache/livy/LivyClientBuilder.java @@ -85,11 +85,8 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException { for (String file : confFiles) { URL url = classLoader().getResource(file); if (url != null) { - Reader r = new InputStreamReader(url.openStream(), UTF_8); - try { + try (Reader r = new InputStreamReader(url.openStream(), UTF_8)) { config.load(r); - } finally { - r.close(); } } } diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index c59136d55..1251a0f04 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -286,11 +286,8 @@ private static File writeConfToFile(RSCConf conf) throws IOException { File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf"); if (sparkDefaults.isFile()) { Properties sparkConf = new Properties(); - Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8); - try { + try (Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8)) { sparkConf.load(r); - } finally { - r.close(); } for (String key : sparkConf.stringPropertyNames()) { @@ -305,11 +302,8 @@ private static File writeConfToFile(RSCConf conf) throws IOException { Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE)); //file.deleteOnExit(); - Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8); - try { + try (Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8)) { confView.store(writer, "Livy App Context Configuration"); - } finally { - writer.close(); } return file; diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index dd2ede881..0e4fbc5ba 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -206,16 +206,13 @@ public void onSaslComplete(Rpc client) { }); // The RPC library takes care of timing out this. - Rpc callbackRpc = Rpc.createClient(livyConf, server.getEventLoopGroup(), - launcherAddress, launcherPort, clientId, secret, this).get(); - try { + try (Rpc callbackRpc = Rpc.createClient(livyConf, server.getEventLoopGroup(), + launcherAddress, launcherPort, clientId, secret, this).get()) { callbackRpc.call(new RemoteDriverAddress(server.getAddress(), server.getPort())).get( livyConf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("Timed out sending address to Livy server, shutting down."); throw te; - } finally { - callbackRpc.close(); } // At this point we install the idle timeout handler, in case the Livy server fails to connect diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriverBootstrapper.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriverBootstrapper.java index 0b591a3fc..761281aaf 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriverBootstrapper.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriverBootstrapper.java @@ -54,11 +54,8 @@ public static void main(String[] args) throws Exception { throw new IllegalArgumentException("File name " + fileName + "is not a legal file name."); } - Reader r = new InputStreamReader(new FileInputStream(propertyFile), UTF_8); - try { + try (Reader r = new InputStreamReader(new FileInputStream(propertyFile), UTF_8)) { props.load(r); - } finally { - r.close(); } break; diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java index 6bf8f6ea7..c8e44f00a 100644 --- a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java +++ b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java @@ -403,8 +403,7 @@ void config(Properties conf) { public void testKillServerWhileSparkSubmitIsRunning() throws Exception { Properties conf = createConf(true); LivyClient client = null; - PipedInputStream stubStream = new PipedInputStream(new PipedOutputStream()); - try { + try (PipedInputStream stubStream = new PipedInputStream(new PipedOutputStream())) { Process mockSparkSubmit = mock(Process.class); when(mockSparkSubmit.getInputStream()).thenReturn(stubStream); when(mockSparkSubmit.getErrorStream()).thenReturn(stubStream); @@ -446,7 +445,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { throw e; } finally { ContextLauncher.mockSparkSubmit = null; - stubStream.close(); if (client != null) { client.stop(true); } diff --git a/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java b/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java index ef8e1a6cb..7675982f7 100644 --- a/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java +++ b/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java @@ -40,8 +40,7 @@ public static void main(String[] args) throws Exception { exitAfterOutput = Boolean.parseBoolean(args[1]); } - JavaSparkContext sc = new JavaSparkContext(); - try { + try (JavaSparkContext sc = new JavaSparkContext()) { List data = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog"); @@ -54,8 +53,6 @@ public static void main(String[] args) throws Exception { Thread.sleep(60 * 60 * 1000); } } - } finally { - sc.close(); } } From 67b684afe1d487899e8271a999dd2917c2aa6a16 Mon Sep 17 00:00:00 2001 From: Sigee Date: Fri, 20 Sep 2024 11:54:29 +0200 Subject: [PATCH 4/4] Replace Anonymous types with lambda --- .../apache/livy/client/common/Serializer.java | 30 ++++--- .../apache/livy/client/http/HttpClient.java | 31 +++---- .../livy/client/http/JobHandleImpl.java | 42 ++++------ .../org/apache/livy/rsc/ContextLauncher.java | 84 +++++++------------ .../java/org/apache/livy/rsc/RSCClient.java | 15 ++-- .../main/java/org/apache/livy/rsc/Utils.java | 13 ++- .../org/apache/livy/rsc/driver/RSCDriver.java | 9 +- .../java/org/apache/livy/rsc/rpc/Rpc.java | 63 +++++--------- .../org/apache/livy/rsc/rpc/RpcServer.java | 9 +- .../org/apache/livy/rsc/TestSparkClient.java | 18 ++-- 10 files changed, 121 insertions(+), 193 deletions(-) diff --git a/client-common/src/main/java/org/apache/livy/client/common/Serializer.java b/client-common/src/main/java/org/apache/livy/client/common/Serializer.java index 2f879ac3d..3814c721c 100644 --- a/client-common/src/main/java/org/apache/livy/client/common/Serializer.java +++ b/client-common/src/main/java/org/apache/livy/client/common/Serializer.java @@ -18,6 +18,7 @@ package org.apache.livy.client.common; import java.io.ByteArrayOutputStream; +import java.lang.invoke.SerializedLambda; import java.nio.ByteBuffer; import com.esotericsoftware.kryo.Kryo; @@ -41,23 +42,20 @@ public class Serializer { private final ThreadLocal kryos; public Serializer(final Class... klasses) { - this.kryos = new ThreadLocal() { - @Override - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - int count = 0; - for (Class klass : klasses) { - kryo.register(klass, REG_ID_BASE + count); - count++; - } - kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy( - new StdInstantiatorStrategy())); - kryo.register(java.lang.invoke.SerializedLambda.class); - kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer()); - kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); - return kryo; + this.kryos = ThreadLocal.withInitial(() -> { + Kryo kryo = new Kryo(); + int count = 0; + for (Class klass : klasses) { + kryo.register(klass, REG_ID_BASE + count); + count++; } - }; + kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy( + new StdInstantiatorStrategy())); + kryo.register(SerializedLambda.class); + kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer()); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + return kryo; + }); } public Object deserialize(ByteBuffer data) { diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index 1211a5288..f95e30d1a 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -84,13 +84,10 @@ class HttpClient implements LivyClient { // Because we only have one connection to the server, we don't need more than a single // threaded executor here. - this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "HttpClient-" + sessionId); - t.setDaemon(true); - return t; - } + this.executor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "HttpClient-" + sessionId); + t.setDaemon(true); + return t; }); this.serializer = new Serializer(); @@ -146,24 +143,18 @@ public Future addFile(URI uri) { } private Future uploadResource(final File file, final String command, final String paramName) { - Callable task = new Callable() { - @Override - public Void call() throws Exception { - conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command); - return null; - } + Callable task = () -> { + conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command); + return null; }; return executor.submit(task); } private Future addResource(final String command, final URI resource) { - Callable task = new Callable() { - @Override - public Void call() throws Exception { - ClientMessage msg = new AddResource(resource.toString()); - conn.post(msg, Void.class, "/%d/%s", sessionId, command); - return null; - } + Callable task = () -> { + ClientMessage msg = new AddResource(resource.toString()); + conn.post(msg, Void.class, "/%d/%s", sessionId, command); + return null; }; return executor.submit(task); } diff --git a/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java b/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java index d39dfe994..d5e74a784 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java +++ b/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java @@ -134,38 +134,32 @@ protected Throwable error() { } void start(final String command, final ByteBuffer serializedJob) { - Runnable task = new Runnable() { - @Override - public void run() { - try { - ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark"); - JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command); - - if (isCancelPending) { - sendCancelRequest(status.id); - } - - jobId = status.id; + Runnable task = () -> { + try { + ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark"); + JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command); - pollTask = executor.schedule(new JobPollTask(initialPollInterval), - initialPollInterval, TimeUnit.MILLISECONDS); - } catch (Exception e) { - setResult(null, e, State.FAILED); + if (isCancelPending) { + sendCancelRequest(status.id); } + + jobId = status.id; + + pollTask = executor.schedule(new JobPollTask(initialPollInterval), + initialPollInterval, TimeUnit.MILLISECONDS); + } catch (Exception e) { + setResult(null, e, State.FAILED); } }; executor.submit(task); } private void sendCancelRequest(final long id) { - executor.submit(new Runnable() { - @Override - public void run() { - try { - conn.post(null, Void.class, "/%d/jobs/%d/cancel", sessionId, id); - } catch (Exception e) { - setResult(null, e, State.FAILED); - } + executor.submit(() -> { + try { + conn.post(null, Void.class, "/%d/jobs/%d/cancel", sessionId, id); + } catch (Exception e) { + setResult(null, e, State.FAILED); } }); } diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index 1251a0f04..bbd570468 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -123,12 +123,7 @@ public void onFailure(Throwable error) throws Exception { // Set up a timeout to fail the promise if we don't hear back from the context // after a configurable timeout. - Runnable timeoutTask = new Runnable() { - @Override - public void run() { - connectTimeout(handler); - } - }; + Runnable timeoutTask = () -> connectTimeout(handler); this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask, conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS); } catch (Exception e) { @@ -226,14 +221,11 @@ private static ChildProcess startDriver(final RSCConf conf, Promise promise) } else if (conf.getBoolean(CLIENT_IN_PROCESS)) { // Mostly for testing things quickly. Do not do this in production. LOG.warn("!!!! Running remote driver in-process. !!!!"); - Runnable child = new Runnable() { - @Override - public void run() { - try { - RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() }); - } catch (Exception e) { - throw Utils.propagate(e); - } + Runnable child = () -> { + try { + RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() }); + } catch (Exception e) { + throw Utils.propagate(e); } }; return new ChildProcess(conf, promise, child, confFile); @@ -346,12 +338,9 @@ private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) { LOG.warn("Connection established but promise is already finalized."); } - ctx.executor().submit(new Runnable() { - @Override - public void run() { - dispose(); - ContextLauncher.this.dispose(false); - } + ctx.executor().submit(() -> { + dispose(); + ContextLauncher.this.dispose(false); }); } @@ -380,25 +369,22 @@ public ChildProcess(RSCConf conf, Promise promise, final Process childProc, F this.child = childProc; this.confFile = confFile; - Runnable monitorTask = new Runnable() { - @Override - public void run() { - try { - RSCClientFactory.childProcesses().incrementAndGet(); - int exitCode = child.waitFor(); - if (exitCode != 0) { - LOG.warn("Child process exited with code {}.", exitCode); - fail(new IOException(String.format("Child process exited with code %d.", exitCode))); - } - } catch (InterruptedException ie) { - LOG.warn("Waiting thread interrupted, killing child process."); - Thread.interrupted(); - child.destroy(); - } catch (Exception e) { - LOG.warn("Exception while waiting for child process.", e); - } finally { - RSCClientFactory.childProcesses().decrementAndGet(); + Runnable monitorTask = () -> { + try { + RSCClientFactory.childProcesses().incrementAndGet(); + int exitCode = child.waitFor(); + if (exitCode != 0) { + LOG.warn("Child process exited with code {}.", exitCode); + fail(new IOException(String.format("Child process exited with code %d.", exitCode))); } + } catch (InterruptedException ie) { + LOG.warn("Waiting thread interrupted, killing child process."); + Thread.interrupted(); + child.destroy(); + } catch (Exception e) { + LOG.warn("Exception while waiting for child process.", e); + } finally { + RSCClientFactory.childProcesses().decrementAndGet(); } }; this.monitor = monitor(monitorTask, childId); @@ -435,25 +421,19 @@ public void detach() { } private Thread monitor(final Runnable task, int childId) { - Runnable wrappedTask = new Runnable() { - @Override - public void run() { - try { - task.run(); - } finally { - confFile.delete(); - } + Runnable wrappedTask = () -> { + try { + task.run(); + } finally { + confFile.delete(); } }; Thread thread = new Thread(wrappedTask); thread.setDaemon(true); thread.setName("ContextLauncher-" + childId); - thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.warn("Child task threw exception.", e); - fail(e); - } + thread.setUncaughtExceptionHandler((t, e) -> { + LOG.warn("Child task threw exception.", e); + fail(e); }); thread.start(); return thread; diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index 51f84b4c5..668d0a1d5 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -358,15 +358,12 @@ public void onFailure(Throwable error) throws Exception { promise.tryFailure(error); } }); - promise.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Promise p) { - if (jobId != null) { - jobs.remove(jobId); - } - if (p.isCancelled() && !rpc.isDone()) { - rpc.cancel(true); - } + promise.addListener((GenericFutureListener>) p -> { + if (jobId != null) { + jobs.remove(jobId); + } + if (p.isCancelled() && !rpc.isDone()) { + rpc.cancel(true); } }); return handle; diff --git a/rsc/src/main/java/org/apache/livy/rsc/Utils.java b/rsc/src/main/java/org/apache/livy/rsc/Utils.java index 3c8a5e664..ca0b9de5f 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/Utils.java +++ b/rsc/src/main/java/org/apache/livy/rsc/Utils.java @@ -99,14 +99,11 @@ public static String stackTraceAsString(Throwable t) { } public static void addListener(Future future, final FutureListener lsnr) { - future.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future f) throws Exception { - if (f.isSuccess()) { - lsnr.onSuccess(f.get()); - } else { - lsnr.onFailure(f.cause()); - } + future.addListener((GenericFutureListener>) f -> { + if (f.isSuccess()) { + lsnr.onSuccess(f.get()); + } else { + lsnr.onFailure(f.cause()); } }); } diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index 0e4fbc5ba..4e9044544 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -242,12 +242,9 @@ private void setupIdleTimeout() { return; } - Runnable timeoutTask = new Runnable() { - @Override - public void run() { - LOG.warn("Shutting down RSC due to idle timeout ({}).", livyConf.get(SERVER_IDLE_TIMEOUT)); - shutdown(); - } + Runnable timeoutTask = () -> { + LOG.warn("Shutting down RSC due to idle timeout ({}).", livyConf.get(SERVER_IDLE_TIMEOUT)); + shutdown(); }; ScheduledFuture timeout = server.getEventLoopGroup().schedule(timeoutTask, livyConf.getTimeAsMs(SERVER_IDLE_TIMEOUT), TimeUnit.MILLISECONDS); diff --git a/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java b/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java index e4871f528..4ee7ff858 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java +++ b/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java @@ -111,39 +111,28 @@ public static Promise createClient( final AtomicReference rpc = new AtomicReference<>(); // Set up a timeout to undo everything. - final Runnable timeoutTask = new Runnable() { - @Override - public void run() { - promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection.")); - } - }; + final Runnable timeoutTask = () -> promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection.")); final ScheduledFuture timeoutFuture = eloop.schedule(timeoutTask, config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS); // The channel listener instantiates the Rpc instance when the connection is established, // and initiates the SASL handshake. - cf.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture cf) throws Exception { - if (cf.isSuccess()) { - SaslClientHandler saslHandler = new SaslClientHandler(config, clientId, promise, - timeoutFuture, secret, dispatcher); - Rpc rpc = createRpc(config, saslHandler, (SocketChannel) cf.channel(), eloop); - saslHandler.rpc = rpc; - saslHandler.sendHello(cf.channel()); - } else { - promise.setFailure(cf.cause()); - } + cf.addListener((ChannelFutureListener) cf1 -> { + if (cf1.isSuccess()) { + SaslClientHandler saslHandler = new SaslClientHandler(config, clientId, promise, + timeoutFuture, secret, dispatcher); + Rpc rpc1 = createRpc(config, saslHandler, (SocketChannel) cf1.channel(), eloop); + saslHandler.rpc = rpc1; + saslHandler.sendHello(cf1.channel()); + } else { + promise.setFailure(cf1.cause()); } }); // Handle cancellation of the promise. - promise.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Promise p) { - if (p.isCancelled()) { - cf.cancel(true); - } + promise.addListener((GenericFutureListener>) p -> { + if (p.isCancelled()) { + cf.cancel(true); } }); @@ -429,25 +418,19 @@ public Future call(final Object msg, Class retType) { try { final long id = rpcId.getAndIncrement(); final Promise promise = egroup.next().newPromise(); - final ChannelFutureListener listener = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture cf) { - if (!cf.isSuccess() && !promise.isDone()) { - LOG.warn("Failed to send RPC, closing connection.", cf.cause()); - promise.setFailure(cf.cause()); - discardRpcCall(id); - close(); - } - } + final ChannelFutureListener listener = cf -> { + if (!cf.isSuccess() && !promise.isDone()) { + LOG.warn("Failed to send RPC, closing connection.", cf.cause()); + promise.setFailure(cf.cause()); + discardRpcCall(id); + close(); + } }; registerRpcCall(id, promise, msg.getClass().getName()); - channel.eventLoop().submit(new Runnable() { - @Override - public void run() { - channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener); - channel.writeAndFlush(msg).addListener(listener); - } + channel.eventLoop().submit(() -> { + channel.write(new MessageHeader(id, MessageType.CALL)).addListener(listener); + channel.writeAndFlush(msg).addListener(listener); }); return promise; diff --git a/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcServer.java b/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcServer.java index 14694fbc9..21b30c844 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcServer.java +++ b/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcServer.java @@ -145,12 +145,9 @@ public void initChannel(SocketChannel ch) throws Exception { final Rpc newRpc = Rpc.createServer(saslHandler, config, ch, group); saslHandler.rpc = newRpc; - Runnable cancelTask = new Runnable() { - @Override - public void run() { - LOG.warn("Timed out waiting for hello from client."); - newRpc.close(); - } + Runnable cancelTask = () -> { + LOG.warn("Timed out waiting for hello from client."); + newRpc.close(); }; saslHandler.cancelTask = group.schedule(cancelTask, config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java index c8e44f00a..90e7c06af 100644 --- a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java +++ b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java @@ -410,22 +410,16 @@ public void testKillServerWhileSparkSubmitIsRunning() throws Exception { // Block waitFor until process.destroy() is called. final CountDownLatch waitForCalled = new CountDownLatch(1); - when(mockSparkSubmit.waitFor()).thenAnswer(new Answer() { - @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { - waitForCalled.await(); - return 0; - } + when(mockSparkSubmit.waitFor()).thenAnswer((Answer) invocation -> { + waitForCalled.await(); + return 0; }); // Verify process.destroy() is called. final CountDownLatch destroyCalled = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - destroyCalled.countDown(); - return null; - } + doAnswer((Answer) invocation -> { + destroyCalled.countDown(); + return null; }).when(mockSparkSubmit).destroy(); ContextLauncher.mockSparkSubmit = mockSparkSubmit;