Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@
# JAAS login context name for ZooKeeper SASL authentication.
# livy.server.zk.sasl.login-context = Client

# The SSL configuration for zookeeper. When livy.server.zk.client.secure=true, the following
# are required: livy.keystore, livy.server.zk.ssl.keyStore.password,
# livy.server.zk.ssl.truststore.location, livy.server.zk.ssl.truststore.password.
# livy.server.zk.client.secure=false
# livy.server.zk.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# livy.server.zk.ssl.keyStore.password=
# livy.server.zk.ssl.truststore.location=
# livy.server.zk.ssl.truststore.password=

# If Livy can't find the yarn app within this time, consider it lost.
# livy.server.yarn.app-lookup-timeout = 120s
# When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,13 @@ object LivyConf {

val SESSION_ALLOW_CUSTOM_CLASSPATH = Entry("livy.server.session.allow-custom-classpath", false)

val LIVY_ZK_CLIENT_SOCKET = Entry("livy.server.zk.clientCnxnSocket",
"org.apache.zookeeper.ClientCnxnSocketNetty")
val LIVY_ZK_KEYSTORE_PASS = Entry("livy.server.zk.ssl.keyStore.password", null)
val LIVY_ZK_TRUSTSTORE_FILE = Entry("livy.server.zk.ssl.truststore.location", null)
val LIVY_ZK_TRUSTSTORE_PASS = Entry("livy.server.zk.ssl.truststore.password", null)
val LIVY_ZK_CLIENT_SECURE = Entry("livy.server.zk.client.secure", false)

val SPARK_MASTER = "spark.master"
val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
val SPARK_JARS = "spark.jars"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener}
import org.apache.curator.retry.RetryNTimes
import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.zookeeper.client.ZKClientConfig

import org.apache.livy.LivyConf
import org.apache.livy.Logging
Expand Down Expand Up @@ -63,6 +64,38 @@ class ZooKeeperManager(
"Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
}

if (livyConf.getBoolean(LivyConf.LIVY_ZK_CLIENT_SECURE)) {
Seq(
(LivyConf.SSL_KEYSTORE, livyConf.get(LivyConf.SSL_KEYSTORE)),
(LivyConf.LIVY_ZK_KEYSTORE_PASS, livyConf.get(LivyConf.LIVY_ZK_KEYSTORE_PASS)),
(LivyConf.LIVY_ZK_TRUSTSTORE_FILE, livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_FILE)),
(LivyConf.LIVY_ZK_TRUSTSTORE_PASS, livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_PASS))
).foreach { case (entry, value) =>
require(value != null && !value.trim.isEmpty,
s"Please config ${entry.key} when ${LivyConf.LIVY_ZK_CLIENT_SECURE.key}=true.")
}
}

private[recovery] def createZKClientConfig = {
val clientConfig = new ZKClientConfig

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: when secure is true, can we strict require() the four SSL fields (truststore location/password, ssl keystore, keystore password) since operator missing out on some configs would cause retry loop to zk connection

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Arnav for the review! I have implemented your suggestion and updated the code.

clientConfig.setProperty("zookeeper.client.secure", "true")
clientConfig.setProperty("zookeeper.clientCnxnSocket",
livyConf.get(LivyConf.LIVY_ZK_CLIENT_SOCKET))
clientConfig.setProperty("zookeeper.ssl.keyStore.location",
livyConf.get(LivyConf.SSL_KEYSTORE))
clientConfig.setProperty("zookeeper.ssl.keyStore.password",
livyConf.get(LivyConf.LIVY_ZK_KEYSTORE_PASS))
clientConfig.setProperty("zookeeper.ssl.keyStore.type",
livyConf.get(LivyConf.SSL_KEYSTORE_TYPE))
clientConfig.setProperty("zookeeper.ssl.trustStore.location",
livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_FILE))
clientConfig.setProperty("zookeeper.ssl.trustStore.password",
livyConf.get(LivyConf.LIVY_ZK_TRUSTSTORE_PASS))
clientConfig.setProperty("zookeeper.ssl.trustStore.type",
livyConf.get(LivyConf.SSL_KEYSTORE_TYPE))
clientConfig
}

private val curatorClient = mockCuratorClient.getOrElse {
if (livyConf.getBoolean(LivyConf.ZK_SASL_ENABLED)) {
System.setProperty("zookeeper.sasl.client", "true")
Expand All @@ -73,10 +106,13 @@ class ZooKeeperManager(
info(s"ZooKeeper SASL authentication enabled with login context: " +
s"${Option(loginContext).getOrElse("Client")}")
}
CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.retryPolicy(retryPolicy)
.build()
val builder = CuratorFrameworkFactory.builder()
builder.connectString(zkAddress)
builder.retryPolicy(retryPolicy)
if (livyConf.getBoolean(LivyConf.LIVY_ZK_CLIENT_SECURE)) {
builder.zkClientConfig(createZKClientConfig)
}
builder.build()
}

curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,17 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
val key = "key"
val prefixedKey = s"/livy/$key"

def withMock[R](testBody: TestFixture => R): R = {
val curatorClient = mock[CuratorFramework]
when(curatorClient.getUnhandledErrorListenable())
def mockCurator(): CuratorFramework = {
val cc = mock[CuratorFramework]
when(cc.getUnhandledErrorListenable())
.thenReturn(mock[Listenable[UnhandledErrorListener]])
when(curatorClient.getConnectionStateListenable())
when(cc.getConnectionStateListenable())
.thenReturn(mock[Listenable[ConnectionStateListener]])
cc
}

def withMock[R](testBody: TestFixture => R): R = {
val curatorClient = mockCurator()
val zkManager = new ZooKeeperManager(conf, Some(curatorClient))
zkManager.start()
val stateStore = new ZooKeeperStateStore(conf, zkManager)
Expand Down Expand Up @@ -191,9 +196,7 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
}

it("should register a ConnectionStateListener that handles all connection states") {
val curatorClient = mock[CuratorFramework]
when(curatorClient.getUnhandledErrorListenable())
.thenReturn(mock[Listenable[UnhandledErrorListener]])
val curatorClient = mockCurator()
val listenable = mock[Listenable[ConnectionStateListener]]
when(curatorClient.getConnectionStateListenable()).thenReturn(listenable)

Expand All @@ -219,5 +222,99 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
System.getProperty("zookeeper.sasl.client") shouldBe null
System.getProperty("zookeeper.sasl.clientconfig") shouldBe null
}

describe("SSL config") {
case class SslTestFixture(zkManager: ZooKeeperManager, curatorClient: CuratorFramework)

def makeSslConf(): LivyConf = {
val c = new LivyConf()
c.set(LivyConf.RECOVERY_STATE_STORE_URL, "/tmp/livy")
c.set(LivyConf.SSL_KEYSTORE, "/tmp/keystore.jks")
c.set(LivyConf.SSL_KEYSTORE_PASSWORD, "keystorePass")
c.set(LivyConf.SSL_KEY_PASSWORD, "keyPass")
c.set(LivyConf.SSL_KEYSTORE_TYPE, "JKS")
c.set(LivyConf.LIVY_ZK_KEYSTORE_PASS, "keystorePass")
c.set(LivyConf.LIVY_ZK_TRUSTSTORE_FILE, "/tmp/truststore.jks")
c.set(LivyConf.LIVY_ZK_TRUSTSTORE_PASS, "truststorePass")
c
}

def withSslMock[R](sslConf: LivyConf)(testBody: SslTestFixture => R): R = {
val curatorClient = mockCurator()
val zkManager = new ZooKeeperManager(sslConf, Some(curatorClient))
zkManager.start()
testBody(SslTestFixture(zkManager, curatorClient))
}

it("createZKClientConfig should set secure flag and socket class") {
withSslMock(makeSslConf()) { f =>
verify(f.curatorClient).start()
val zkConfig = f.zkManager.createZKClientConfig
zkConfig.getProperty("zookeeper.client.secure") shouldBe "true"
zkConfig.getProperty("zookeeper.clientCnxnSocket") shouldBe
"org.apache.zookeeper.ClientCnxnSocketNetty"
}
}

it("createZKClientConfig should set keystore location, password and type from LivyConf") {
withSslMock(makeSslConf()) { f =>
verify(f.curatorClient).start()
val zkConfig = f.zkManager.createZKClientConfig
zkConfig.getProperty("zookeeper.ssl.keyStore.location") shouldBe "/tmp/keystore.jks"
zkConfig.getProperty("zookeeper.ssl.keyStore.password") shouldBe "keystorePass"
zkConfig.getProperty("zookeeper.ssl.keyStore.type") shouldBe "JKS"
}
}

it("createZKClientConfig should set truststore location, password and type from LivyConf") {
withSslMock(makeSslConf()) { f =>
verify(f.curatorClient).start()
val zkConfig = f.zkManager.createZKClientConfig
zkConfig.getProperty("zookeeper.ssl.trustStore.location") shouldBe
"/tmp/truststore.jks"
zkConfig.getProperty("zookeeper.ssl.trustStore.password") shouldBe "truststorePass"
// trustStore.type reuses SSL_KEYSTORE_TYPE
zkConfig.getProperty("zookeeper.ssl.trustStore.type") shouldBe "JKS"
}
}

it("should build successfully when LIVY_ZK_CLIENT_SECURE is enabled") {
val sslConf = makeSslConf()
sslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, true)
noException should be thrownBy {
val zkManager = new ZooKeeperManager(sslConf, Some(mockCurator()))
zkManager.start()
zkManager.stop()
}
}

it("should build successfully when LIVY_ZK_CLIENT_SECURE is disabled") {
val noSslConf = new LivyConf()
noSslConf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
noSslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, false)
noException should be thrownBy {
val zkManager = new ZooKeeperManager(noSslConf, Some(mockCurator()))
zkManager.start()
zkManager.stop()
}
}

Seq(
(LivyConf.SSL_KEYSTORE, "keystore location"),
(LivyConf.LIVY_ZK_KEYSTORE_PASS, "keystore password"),
(LivyConf.LIVY_ZK_TRUSTSTORE_FILE, "truststore location"),
(LivyConf.LIVY_ZK_TRUSTSTORE_PASS, "truststore password")
).foreach { case (entry, label) =>
it(s"should fail fast when LIVY_ZK_CLIENT_SECURE is enabled but $label is missing") {
val sslConf = makeSslConf()
sslConf.set(LivyConf.LIVY_ZK_CLIENT_SECURE, true)
sslConf.set(entry, null)
val thrown = the[IllegalArgumentException] thrownBy {
new ZooKeeperManager(sslConf)
}
thrown.getMessage should include(entry.key)
}
}
}
}
}
Loading