Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/src/main/java/org/apache/livy/LivyClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public final class LivyClientBuilder {

public static final String LIVY_URI_KEY = "livy.uri";
public static final String LIVY_SESSION_ID_KEY = "livy.sessionId";

private static final ServiceLoader<LivyClientFactory> CLIENT_FACTORY_LOADER =
ServiceLoader.load(LivyClientFactory.class, classLoader());
Expand Down Expand Up @@ -96,11 +97,32 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException {
}
}

/**
* Sets the URI of the Livy server the client will connect to. If the URI contains
* <pre>sessions/{sessionId}</pre>, the client will connect to the specified existing session;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one last question: @ajbozarth @jerryshao @tmnd1991 I think that by adding this API, we might want to deprecate this "hacky" approach and eventually remove the support to it in next major release. Any thought on this?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I 100% agree on removing the hacky (and until now not even documented) approach. I left it to make the PR backward compatible

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we definitely need to leave it for now, but we can start deprecating it. Let's wait for others opinion .

* otherwise it will create a new session.
*
Comment thread
mgaido91 marked this conversation as resolved.
* @param uri The URI of Livy server.
* @return The builder itself.
*/
public LivyClientBuilder setURI(URI uri) {
config.setProperty(LIVY_URI_KEY, uri.toString());
return this;
}

/**
* Sets the session ID the client will connect to. If a session ID is set, the chosen session
* will be used with its own configurations, so Spark configurations will be ignored. If not set,
* a new session will be created when the client is built.
*
Comment thread
mgaido91 marked this conversation as resolved.
* @param sessionId The ID of the session to attach to.
* @return the builder itself.
*/
public LivyClientBuilder setSessionId(int sessionId) {
config.setProperty(LIVY_SESSION_ID_KEY, String.valueOf(sessionId));
return this;
}

public LivyClientBuilder setConf(String key, String value) {
if (value != null) {
config.setProperty(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.livy.Job;
import org.apache.livy.JobHandle;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;
import org.apache.livy.client.common.Serializer;
import static org.apache.livy.client.common.HttpMessages.*;

Expand All @@ -59,9 +60,17 @@ class HttpClient implements LivyClient {
// unused.
Matcher m = Pattern.compile("(.*)" + LivyConnection.SESSIONS_URI + "/([0-9]+)")
.matcher(uri.getPath());
String sessionIdFromConf = httpConf.get(LivyClientBuilder.LIVY_SESSION_ID_KEY);

try {
if (m.matches()) {
if (sessionIdFromConf != null && m.matches()) {
throw new IllegalArgumentException(
"Cannot set existing session both from URI and configuration");
} else if (sessionIdFromConf != null) {
this.conn = new LivyConnection(uri, httpConf);
this.sessionId = Integer.parseInt(sessionIdFromConf);
conn.post(null, SessionInfo.class, "/%d/connect", sessionId);
} else if (m.matches()) {
URI base = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(),
m.group(1), uri.getQuery(), uri.getFragment());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,34 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
testJob(false, response = Some(null))
}

withClient("should connect to existing sessions") {
var sid = client.asInstanceOf[HttpClient].getSessionId()
withClient("should connect to existing sessions using the URI") {
val sid = client.asInstanceOf[HttpClient].getSessionId()
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" +
s"${LivyConnection.SESSIONS_URI}/$sid"
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build()
newClient.stop(false)
verify(session, never()).stop()
}

withClient("should connect to existing sessions using the conf") {
val sid = client.asInstanceOf[HttpClient].getSessionId()
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}"
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).setSessionId(sid).build()
newClient.stop(false)
verify(session, never()).stop()
}

withClient("should throw an exception if the sessionId is set through conf and URI") {
val sid = client.asInstanceOf[HttpClient].getSessionId()
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" +
s"${LivyConnection.SESSIONS_URI}/$sid"
intercept[IllegalArgumentException]{
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).setSessionId(sid).build()
newClient.stop(false)
verify(session, never()).stop()
}
}

withClient("should tear down clients") {
client.stop(true)
verify(session, times(1)).stop()
Expand Down