Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
18254ff
[LIVY-616] Livy Server discovery
o-shevchenko Jul 31, 2019
6526052
scalastyle import order and line lengths
o-shevchenko Jul 31, 2019
5f30372
reorganized imports to fit project scalastyle requirements
o-shevchenko Aug 1, 2019
9731aae
Trigger, flaky tests
o-shevchenko Aug 1, 2019
64dabaf
added Java/Scala API, scaladocs, small refactoring
o-shevchenko Aug 1, 2019
a7fd7d8
scalastyle
o-shevchenko Aug 1, 2019
aacfd9f
Trigger, flaky tests
o-shevchenko Aug 1, 2019
d693ea3
Trigger, flaky tests
o-shevchenko Aug 1, 2019
729beda
Trigger, flaky tests
o-shevchenko Aug 1, 2019
27a7ffc
Trigger, flaky tests
o-shevchenko Aug 1, 2019
104f66b
Trigger, flaky tests
o-shevchenko Aug 2, 2019
5baf435
make more configurable
o-shevchenko Aug 2, 2019
5d7ea28
fix typo
o-shevchenko Aug 2, 2019
728deb9
Trigger, flaky tests
o-shevchenko Aug 2, 2019
a71c0ef
honor livy.server.host
o-shevchenko Aug 2, 2019
1d0add4
remove package access
o-shevchenko Aug 2, 2019
7e57010
update scaladoc
o-shevchenko Aug 14, 2019
627e455
Trigger, flaky tests
o-shevchenko Aug 14, 2019
df5a395
refactored, changed class ZooKeeperManager to trait to be able to add…
o-shevchenko Aug 14, 2019
18c10ed
Logging already mixed to ZooKeeperManager
o-shevchenko Aug 14, 2019
7d655fc
fixed problem with host resolving
o-shevchenko Sep 2, 2019
e8dc350
fixed test
o-shevchenko Sep 2, 2019
5e88b2d
log message improved
o-shevchenko Sep 2, 2019
47d8852
fix scalastyle
o-shevchenko Sep 2, 2019
1250fea
fix scalastyle
o-shevchenko Sep 2, 2019
5c47b09
Trigger, flaky
o-shevchenko Sep 2, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/livy/LivyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,11 @@ public interface LivyClient {
*/
Future<?> addFile(URI uri);

/**
* Get Livy Server URI.
* URI will be propagated from LivyClientBuilder during creating LivyClient.
*
* @return A future with Livy Server URI
*/
Future<URI> getServerUri();
}
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/livy/LivyClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException {
}
}

/**
* Set Livy Server URI.
* This is possible to set it manually or get URI from LivyDiscoveryManager
* ({@code livy.zookeeper.url} should be configured).
*
* @param uri Livy Server URI
* @return this builder
*/
public LivyClientBuilder setURI(URI uri) {
config.setProperty(LIVY_URI_KEY, uri.toString());
return this;
Expand Down
5 changes: 5 additions & 0 deletions api/src/test/java/org/apache/livy/TestClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public Future<?> addFile(URI uri) {
throw new UnsupportedOperationException();
}

@Override
public Future<URI> getServerUri() {
throw new UnsupportedOperationException();
}

}

}
5 changes: 5 additions & 0 deletions client-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
<artifactId>httpmime</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>

<dependency>
<groupId>org.apache.livy</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;

import org.apache.livy.Job;
import org.apache.livy.JobHandle;
import org.apache.livy.LivyClient;
Expand All @@ -47,12 +50,14 @@ class HttpClient implements LivyClient {
private final int sessionId;
private final ScheduledExecutorService executor;
private final Serializer serializer;
private final Promise<URI> serverUriPromise;

private boolean stopped;

HttpClient(URI uri, HttpConf httpConf) {
this.config = httpConf;
this.stopped = false;
this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise();

// If the given URI looks like it refers to an existing session, then try to connect to
// an existing session. Note this means that any Spark configuration in httpConf will be
Expand All @@ -77,6 +82,7 @@ class HttpClient implements LivyClient {
ClientMessage create = new CreateClientRequest(sessionConf);
this.conn = new LivyConnection(uri, httpConf);
this.sessionId = conn.post(create, SessionInfo.class, "/").id;
serverUriPromise.setSuccess(uri);
}
} catch (Exception e) {
throw propagate(e);
Expand Down Expand Up @@ -145,6 +151,11 @@ public Future<?> addFile(URI uri) {
return addResource("add-file", uri);
}

@Override
public Future<URI> getServerUri() {
return serverUriPromise;
}

private Future<?> uploadResource(final File file, final String command, final String paramName) {
Callable<Void> task = new Callable<Void>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
testJob(false)
}

withClient("should get Livy Server URI") {
assume(Option(client.getServerUri.get()).isDefined)
}

withClient("should propagate errors from jobs") {
val errorMessage = "This job throws an error."
val (jobId, handle) = runJob(false, { id => Seq(
Expand Down
21 changes: 18 additions & 3 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@
# off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions.
# recovery: Livy persists session info to the state store. When Livy restarts, it recovers
# previous sessions from the state store.
# Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to
# configure the state store.
# Must set livy.server.recovery.state-store to needed state store (filesystem or zookeeper)
# Set livy.server.recovery.state-store.url for filesystem state store
# or livy.zookeeper.url for zookeeper state store.
# livy.server.recovery.mode = off

# Where Livy should store state to for recovery. Possible values:
Expand All @@ -112,8 +113,22 @@

# For filesystem state store, the path of the state store directory. Please don't use a filesystem
# that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
# livy.server.recovery.state-store.url =
# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
# livy.zookeeper.url =

# Livy Server discovery
# ZooKeeper quorum URLs, e.g. host1:port1,host2:port2
# livy.zookeeper.url =
# Name of base Livy znode. Default livy
# livy.zookeeper.namespace = livy
# Name of Livy Server znode. Uses livy.zookeeper.namespace as parent.
# By default, the full path to znode is /livy/server.uri
# livy.server.zookeeper.namespace = server.uri
# Number of trials to establish the connection to ZooKeeper quorum
# livy.server.zookeeper.connection.max.retries = 3
# Sleep time between connection retries to ZooKeeper quorum
# livy.server.zookeeper.connection.retry.interval.ms = 500

# If Livy can't find the yarn app within this time, consider it lost.
# livy.server.yarn.app-lookup-timeout = 120s
Expand Down
1 change: 1 addition & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public void onFailure(Throwable error) throws Exception {
return promise;
}

@Override
public Future<URI> getServerUri() {
return serverUriPromise;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ class LivyScalaClient(livyJavaClient: LivyClient) {
*/
def addFile(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uri)).poll()

/**
* Get Livy Server URI.
*
* @return A future with Livy Server URI
*/
def getServerUri: Future[URI] = new PollingContainer(livyJavaClient.getServerUri).poll()

private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable {

private val initialDelay = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ class ScalaClientTest extends FunSuite
ScalaClientTestUtils.assertTestPassed(sFuture, "test file")
}

test("test get uri") {
configureClient(true)
val getUriFuture = client.getServerUri
val uri = Await.result(getUriFuture, ScalaClientTestUtils.Timeout second)
assert(Option(uri).isDefined)
}

test("test add jar") {
configureClient(true)
val jar = File.createTempFile("test", ".resource")
Expand Down
27 changes: 25 additions & 2 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ object LivyConf {
* off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions.
* recovery: Livy persists session info to the state store. When Livy restarts, it recovers
* previous sessions from the state store.
* Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to
* Must set livy.server.recovery.state-store to needed state store (filesystem or zookeeper)
* Set livy.server.recovery.state-store.url for filesystem state store
* or livy.zookeeper.url for zookeeper state store.
* configure the state store.
*/
val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off")
Expand All @@ -187,10 +189,31 @@ object LivyConf {
/**
* For filesystem state store, the path of the state store directory. Please don't use a
* filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
* For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
* For zookeeper, use livy.zookeeper.url.
*/
val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")

/**
* ZooKeeper address witch will be used for Livy Server discovery
* and Zookeeper state store. e.g. host1:port1,host2:port2
*/
val LIVY_ZOOKEEPER_URL = Entry("livy.zookeeper.url", null)

// Name of base Livy znode.
val LIVY_ZOOKEEPER_NAMESPACE = Entry("livy.zookeeper.namespace", "livy")

// Number of trials to establish the connection to ZooKeeper quorum.
val LIVY_ZOOKEEPER_CONNECTION_MAX_RETRIES =
Entry("livy.server.zookeeper.connection.max.retries", 3)

// Sleep time between connection retries to ZooKeeper quorum.
val LIVY_ZOOKEEPER_CONNECTION_RETRY_INTERVAL =
Entry("livy.server.zookeeper.connection.retry.interval.ms", 500)

// Name of Livy Server znode. Uses LIVY_ZOOKEEPER_NAMESPACE as parent.
// By default, the full path to znode is /livy/server.uri
val LIVY_SERVER_ZOOKEEPER_NAMESPACE = Entry("livy.server.zookeeper.namespace", "server.uri")

// Livy will cache the max no of logs specified. 0 means don't cache the logs.
val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)

Expand Down
27 changes: 25 additions & 2 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
package org.apache.livy.server

import java.io.{BufferedInputStream, InputStream}
import java.net.InetAddress
import java.util.concurrent._
import java.net.{InetAddress, URI}
import java.util.EnumSet
import java.util.concurrent._
import javax.servlet._

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import org.apache.curator.framework.CuratorFramework
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.authentication.server._
import org.eclipse.jetty.servlet.FilterHolder
Expand All @@ -37,6 +38,7 @@ import org.scalatra.servlet.{MultipartConfig, ServletApiImplicits}

import org.apache.livy._
import org.apache.livy.server.batch.BatchSessionServlet
import org.apache.livy.server.discovery.LivyDiscoveryManager
import org.apache.livy.server.interactive.InteractiveSessionServlet
import org.apache.livy.server.recovery.{SessionStore, StateStore}
import org.apache.livy.server.ui.UIServlet
Expand Down Expand Up @@ -72,6 +74,8 @@ class LivyServer extends Logging {
maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE))
).toMultipartConfigElement

setServerUri(livyConf)

// Make sure the `spark-submit` program exists, otherwise much of livy won't work.
testSparkHome(livyConf)

Expand Down Expand Up @@ -385,6 +389,25 @@ class LivyServer extends Logging {
}
}

private[livy] def setServerUri(livyConf: LivyConf,
mockCuratorClient: Option[CuratorFramework] = None): Unit = {
if (Option(livyConf.get(LIVY_ZOOKEEPER_URL)).isDefined) {
val discoveryManager = LivyDiscoveryManager(livyConf, mockCuratorClient)
val host = resolvedSeverHost(livyConf)
val uri = new URI(s"http://$host:${livyConf.getInt(LivyConf.SERVER_PORT)}")
discoveryManager.setServerUri(uri)
}
}

private def resolvedSeverHost(livyConf: LivyConf) = {
val host = livyConf.get(LivyConf.SERVER_HOST)
if (host.equals(LivyConf.SERVER_HOST.dflt.toString)) {
InetAddress.getLocalHost.getHostAddress
} else {
host
}
}

private[livy] def testRecovery(livyConf: LivyConf): Unit = {
if (!livyConf.isRunningOnYarn()) {
// If recovery is turned on but we are not running on YARN, quit.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.server.discovery

import scala.reflect.{classTag, ClassTag}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.livy.sessions.SessionKindModule

protected[server] trait JsonMapper {
protected val mapper = new ObjectMapper()
.registerModule(DefaultScalaModule)
.registerModule(new SessionKindModule())

def serializeToBytes(value: Object): Array[Byte] = mapper.writeValueAsBytes(value)

def deserialize[T: ClassTag](json: Array[Byte]): T =
mapper.readValue(json, classTag[T].runtimeClass.asInstanceOf[Class[T]])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.server.discovery

import java.net.URI

import org.apache.curator.framework.CuratorFramework

import org.apache.livy.LivyConf

/**
* Livy Server Discovery manager.
* Stores information about Livy Server location in ZooKeeper.
* The address will be stored in
* "/{@code LIVY_ZOOKEEPER_NAMESPACE}/{@code LIVY_SERVER_ZOOKEEPER_NAMESPACE}" znode
* By default, the full path to znode is /livy/server.uri.
* Need to set {@code livy.zookeeper.url} to be able to get information from ZooKeeper.
*
* @param livyConf - Livy configurations
* @param mockCuratorClient - used for testing
*/
class LivyDiscoveryManager(val livyConf: LivyConf,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider providing a generic method to publishing configuration key/value to zookeeper?

In some other scenario, e.g. this PR #193 , a bunch of hive related configurations need to be published.

If we have a generic method publish configuration key/value, it may be easier to let other components to leverage the code.

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have one in ZooKeeperManager
LivyDiscovery is a hight level of abstraction which is used for better understanding, single responsibility and API simplification

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just refactored StateStore to move functionality for communication with ZK from ZooKeeperStateStore.scala to separated trait ZooKeeperManager.scala. In ZooKeeperStateStore we will use ZooKeeperManager to store data in ZK. ZooKeeperManager code was not changed except couple small improvements for configurations to make it more flexible.

We can use this trait everywhere when we need to store something to ZooKeeper since it's generic:

def setData(key: String, value: Object): Unit = {
    val prefixedKey = prefixKey(key)
    val data = serializeToBytes(value)
    if (exist(prefixedKey)) {
      curatorClient.setData().forPath(prefixedKey, data)
    } else {
      curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data)
    }
  }

  def getData[T: ClassTag](key: String): Option[T] = {
    val prefixedKey = prefixKey(key)
    if (exist(prefixedKey)) {
      Option(deserialize[T](curatorClient.getData().forPath(prefixedKey)))
    } else {
      None
    }
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZooKeeperManager should be used every time by design when we need to store something to ZK. I think we need to have one logic for ZK interaction for LivyServer discovery, ZK state store, Livy HA, Livy Thrift Server HA. That's why I moved this functionality to the separated trait, to make it reuseable for someone else.

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I have tried to make all configurations more generic and configurable on the top level to use it everywhere. As far as I see, for now, we have a couple of PRs with different ZK logic which can't be reused a lot and I see too specific confs like livy.server.thrift.zookeeper.quorum instead of simple common livy.zookeeper.url which can be reused. In this PR I want to not only add LivyServer discovery but also create init structure for subsequent work with ZooKeeper.

val mockCuratorClient: Option[CuratorFramework] = None)
extends ZooKeeperManager {

private val LIVY_SERVER_URI_KEY = livyConf.get(LivyConf.LIVY_SERVER_ZOOKEEPER_NAMESPACE)

/**
* Save Livy Server URI to ZooKeeper.
* @param address - URI address of Livy Server
*/
def setServerUri(address: URI): Unit = {
setData(LIVY_SERVER_URI_KEY, address)
}

/**
* Get Livy Server URI from ZooKeeper.
* @return Livy Server URI
*/
def getServerUri(): URI = {
getData[URI](LIVY_SERVER_URI_KEY).getOrElse(URI.create(""))
}
}

object LivyDiscoveryManager {

def apply(livyConf: LivyConf,
mockCuratorClient: Option[CuratorFramework] = None): LivyDiscoveryManager = {
new LivyDiscoveryManager(livyConf, mockCuratorClient)
}
}
Loading