diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 456bec749..663c88dc6 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -100,6 +100,25 @@ # on user request and then livy server classpath automatically. # livy.repl.enable-hive-context = +# High Availability mode of Livy. Possible values: +# off: Default. Turn off High Availability. +# on: Livy servers elect one active leader through ZooKeeper; standby servers redirect +# incoming requests to the active one. +# livy.server.ha.zookeeper-url must be set when this is on. +# livy.server.ha.mode = off + +# The address of the ZooKeeper servers used for HA leader election. e.g. host1:port1,host2:port2 +# livy.server.ha.zookeeper-url = + +# The ids of all Livy servers used for HA, in the same order as livy.server.ha.server-hostnames +# livy.server.ha.server-ids = + +# The hostnames of all Livy servers used for HA; the i-th hostname belongs to the i-th id +# livy.server.ha.server-hostnames = + +# The list of server addresses for all livy servers used for HA +# livy.server.ha.server-addresses = + # Recovery mode of Livy. Possible values: # off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. # recovery: Livy persists session info to the state store. 
When Livy restarts, it recovers diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala index 2827dceab..28c4dc103 100644 --- a/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala +++ b/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala @@ -154,6 +154,7 @@ object MiniLivyMain extends MiniClusterBase { saveProperties(livyConf, new File(configPath + "/livy.conf")) val server = new LivyServer() + server.init() server.start() server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, false) // Write a serverUrl.conf file to the conf directory with the location of the Livy diff --git a/server/pom.xml b/server/pom.xml index bca185327..55603f0bd 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -251,6 +251,12 @@ test + + org.springframework + spring-web + 5.2.0.RELEASE + + org.scalatra scalatra-test_${scala.binary.version} diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 1b06fdebd..398395df6 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -196,6 +196,25 @@ object LivyConf { * zookeeper: Store state in a Zookeeper instance. */ val RECOVERY_STATE_STORE = Entry("livy.server.recovery.state-store", null) + + /** + * High Availability mode of Livy. Possible values: + * off: Default. Turn off High Availability. + * on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the + * correct state. + * Must set livy.server.ha.zookeeper-url to configure HA + */ + val HA_MODE = Entry("livy.server.ha.mode", "off") + + // For zookeeper, the address to the Zookeeper servers. e.g. 
host1:port1,host2:port2 + val HA_ZOOKEEPER_URL = Entry("livy.server.ha.zookeeper-url", "") + + // The ids of all servers used in HA + val HA_SERVER_IDS = Entry("livy.server.ha.server-ids", "") + + // The hostnames of all servers used in HA + val HA_SERVER_HOSTNAMES = Entry("livy.server.ha.server-hostnames", "") + /** * For filesystem state store, the path of the state store directory. Please don't use a * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. @@ -406,3 +425,4 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { } } + diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala new file mode 100644 index 000000000..2c3ef0c48 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.livy.server
+
+import java.io.Closeable
+import java.io.IOException
+import java.net.InetAddress
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.leader.LeaderLatch
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener
+import org.apache.curator.retry.RetryNTimes
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.LivyConf.Entry
+
+/** Configuration entries read only by the HA leader-election service. */
+object CuratorElectorService {
+  // Prefix of the ZooKeeper path that holds the leader latch ("/<prefix>/leader").
+  val HA_KEY_PREFIX_CONF = Entry("livy.server.ha.key-prefix", "livy_ha")
+  // Retry policy of the ZooKeeper client, written as "<retry count>,<sleep time in ms>".
+  val HA_RETRY_CONF = Entry("livy.server.ha.retry-policy", "5,100")
+}
+
+/** The two roles a Livy server can hold inside an HA group. */
+object HAState extends Enumeration {
+  type HAState = Value
+  val Active, Standby = Value
+}
+
+/**
+ * Takes part in a ZooKeeper leader election through a Curator LeaderLatch and restarts the
+ * wrapped LivyServer every time this instance is elected leader.
+ *
+ * @param livyConf configuration; livy.server.ha.zookeeper-url must not be empty
+ * @param livyServer the server whose lifecycle follows the election result
+ * @param mockCuratorClient test hook used instead of a real Curator client
+ * @param mockLeaderLatch test hook used instead of a real leader latch
+ * @throws IllegalArgumentException if the ZooKeeper url is empty or the retry policy is malformed
+ */
+class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer,
+    mockCuratorClient: Option[CuratorFramework] = None,
+    mockLeaderLatch: Option[LeaderLatch] = None)
+  extends LeaderLatchListener
+  with Logging
+{
+  import CuratorElectorService._
+  import HAState._
+
+  val haAddress = livyConf.get(LivyConf.HA_ZOOKEEPER_URL)
+  require(!haAddress.isEmpty, s"Please configure ${LivyConf.HA_ZOOKEEPER_URL.key}.")
+  val haKeyPrefix = livyConf.get(HA_KEY_PREFIX_CONF)
+  val retryValue = livyConf.get(HA_RETRY_CONF)
+  // a regex to match patterns like "m, n" where m and n both are integer values
+  val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
+  val retryPolicy = retryValue match {
+    case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
+    case _ => throw new IllegalArgumentException(
+      // Name the entry that was actually parsed, by its key, so the message points at the
+      // setting the operator has to correct.
+      s"${HA_RETRY_CONF.key} contains bad value: $retryValue. " +
+        "Correct format is <retry count>,<sleep time in ms>. e.g. 5,100")
+  }
+
+  var server : LivyServer = livyServer
+
+  val client: CuratorFramework = mockCuratorClient.getOrElse {
+    CuratorFrameworkFactory.newClient(haAddress, retryPolicy)
+  }
+  val leaderKey = s"/$haKeyPrefix/leader"
+
+  // Parallel lists: the i-th id belongs to the server with the i-th hostname.
+  val leaderIds = livyConf.configToSeq(LivyConf.HA_SERVER_IDS)
+  val leaderHostnames = livyConf.configToSeq(LivyConf.HA_SERVER_HOSTNAMES)
+
+  var leaderLatch = mockLeaderLatch.getOrElse {
+    new LeaderLatch(client, leaderKey, getCurrentId())
+  }
+  leaderLatch.addListener(this)
+
+  // Written by the latch listener callbacks and read by DomainRedirectionFilter while it
+  // serves requests, so updates have to be visible across threads.
+  @volatile var currentState = HAState.Standby
+
+  /** LeaderLatchListener callback: this instance has just become the leader. */
+  override def isLeader(): Unit = {
+    transitionToActive()
+  }
+
+  /** LeaderLatchListener callback: this instance has just lost leadership. */
+  override def notLeader(): Unit = {
+    transitionToStandby()
+  }
+
+  /**
+   * Looks up the id configured for the host this process runs on.
+   *
+   * @return the entry of livy.server.ha.server-ids found at the position where the local
+   *         canonical hostname appears in livy.server.ha.server-hostnames
+   * @throws IllegalArgumentException if the local hostname is not listed or has no matching id
+   */
+  def getCurrentId(): String = {
+    val currentHostname = InetAddress.getLocalHost().getCanonicalHostName()
+    debug("This server's current hostname is: " + currentHostname)
+    debug("The hostnames for valid leaders are: " + leaderHostnames)
+    // indexOf answers -1 for an unknown host; lift maps that (and an id list that is too
+    // short) to None instead of failing with a context-free IndexOutOfBoundsException.
+    val currentId = leaderIds.lift(leaderHostnames.indexOf(currentHostname)).getOrElse {
+      throw new IllegalArgumentException(
+        s"Hostname $currentHostname has no id: check ${LivyConf.HA_SERVER_HOSTNAMES.key} " +
+          s"and ${LivyConf.HA_SERVER_IDS.key}.")
+    }
+    debug("This server's designated ID is: " + currentId)
+    currentId
+  }
+
+  /**
+   * Resolves the hostname of the server that currently holds the leader latch.
+   *
+   * @return the configured hostname paired with the id reported by the latch
+   * @throws IllegalStateException if the reported id is not one of the configured ids
+   */
+  def getActiveHostname(): String = {
+    val activeLeaderId = leaderLatch.getLeader().getId()
+    leaderHostnames.lift(leaderIds.indexOf(activeLeaderId)).getOrElse {
+      throw new IllegalStateException(
+        s"Leader id '$activeLeaderId' is not listed in ${LivyConf.HA_SERVER_IDS.key}.")
+    }
+  }
+
+  /**
+   * Registers this service with the server, starts the server, the ZooKeeper client and the
+   * latch, then blocks the calling thread.
+   */
+  def start(): Unit = {
+    server.initHa(this)
+    transitionToStandby()
+
+    server.start()
+    client.start()
+    leaderLatch.start()
+
+    try {
+      // A thread joining itself does not return on its own; this parks the caller.
+      Thread.currentThread.join()
+    } finally {
+      transitionToStandby()
+    }
+  }
+
+  /** Drops back to standby, then releases the leader latch and the ZooKeeper client. */
+  def close(): Unit = {
+    transitionToStandby()
+    try {
+      leaderLatch.close()
+    } finally {
+      // Released even when closing the latch fails, so the connection is not leaked.
+      client.close()
+    }
+  }
+
+  /**
+   * Moves to the active role: restarts the wrapped server, then records the new state.
+   * Only logs when this instance is already active.
+   */
+  def transitionToActive(): Unit = {
+    info("Transitioning to Active state")
+    if (currentState == HAState.Active) {
+      info("Already in Active State")
+    } else {
+      server.restart()
+      currentState = HAState.Active
+      info("Transition complete")
+    }
+  }
+
+  /** Records the standby role; the wrapped server itself is left running. */
+  def transitionToStandby(): Unit = {
+    info("Transitioning to Standby state")
+    if (currentState == HAState.Standby) {
+      info("Already in Standby State")
+    } else {
+      currentState = HAState.Standby
+      info("Transition complete")
+    }
+  }
+}
+
+
diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala new file mode 100644 index 000000000..292fcd03c --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.livy.server
+
+import java.io.IOException
+import java.net.URL
+import javax.servlet.Filter
+import javax.servlet.FilterChain
+import javax.servlet.FilterConfig
+import javax.servlet.ServletContext
+import javax.servlet.ServletException
+import javax.servlet.ServletRequest
+import javax.servlet.ServletRequestWrapper
+import javax.servlet.ServletResponse
+import javax.servlet.http.HttpServletRequest
+import javax.servlet.http.HttpServletRequestWrapper
+import javax.servlet.http.HttpServletResponse
+
+import scala.util.control.NonFatal
+
+import org.springframework.web.util.UriComponentsBuilder
+
+import org.apache.livy.{LivyConf, Logging}
+
+/**
+ * Servlet filter for HA deployments. While this server is on standby every request is
+ * answered with a 307 redirect to the same URL on the active server; once this server is
+ * active, requests are passed down the filter chain untouched.
+ *
+ * @param haService election service that knows this server's role and the active hostname
+ */
+class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter
+  with Logging
+{
+
+  // NOTE(review): neither constant is read inside this class - confirm they are still needed.
+  val METHODS_TO_IGNORE = Set("GET", "OPTIONS", "HEAD")
+
+  val HEADER_NAME = "X-Requested-By"
+
+  /** @return true when the election service reports this server as the active one */
+  def isLeader(): Boolean = {
+    haService.currentState == HAState.Active
+  }
+
+  override def init(filterConfig: FilterConfig): Unit = {}
+
+  /**
+   * Lets the request through when this server is active, otherwise redirects the caller.
+   *
+   * @param request incoming request; handled as an HttpServletRequest when redirecting
+   * @param response outgoing response; handled as an HttpServletResponse when redirecting
+   * @param chain remainder of the filter chain, invoked only on the active server
+   */
+  override def doFilter(request: ServletRequest,
+      response: ServletResponse,
+      chain: FilterChain): Unit = {
+    if (isLeader()) {
+      chain.doFilter(request, response)
+    } else {
+      redirectToLeader(
+        request.asInstanceOf[HttpServletRequest],
+        response.asInstanceOf[HttpServletResponse])
+    }
+  }
+
+  /**
+   * Points the caller at the active server, or answers 503 when the active server cannot be
+   * determined.
+   */
+  private def redirectToLeader(
+      httpRequest: HttpServletRequest,
+      httpResponse: HttpServletResponse): Unit = {
+    // Ask the election service once per request and reuse the answer below.
+    val activeHostname = try {
+      Some(haService.getActiveHostname())
+    } catch {
+      case NonFatal(e) =>
+        info("Unable to determine the active Livy server: " + e)
+        None
+    }
+
+    activeHostname match {
+      case None =>
+        httpResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
+          "This is a standby Livy Instance and the active instance is not known.")
+
+      case Some(host) =>
+        debug("active leader's hostnames are:" + host)
+        debug("current id:" + haService.getCurrentId())
+        val requestURL = httpRequest.getRequestURL().toString()
+        debug("requested url: " + requestURL)
+
+        val activeURL = UriComponentsBuilder.fromHttpUrl(requestURL).host(host).toUriString()
+        // getRequestURL() leaves the query string out, so it is appended back by hand.
+        val queryOpt = Option(httpRequest.getQueryString())
+        val redirectURL = queryOpt.fold(activeURL)(query => activeURL + "?" + query)
+        debug("redirected url:" + redirectURL)
+
+        // Status and Location are set before the body is written: a container ignores
+        // headers that arrive after the response has been committed.
+        httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT)
+        httpResponse.setHeader("Location", redirectURL)
+        // scalastyle:off println
+        httpResponse.getWriter().println(
+          "This is a standby Livy Instance. The redirect url is: " + redirectURL)
+        // scalastyle:on println
+    }
+  }
+
+  override def destroy(): Unit = {}
+}
+
+
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 3e715bdf1..32e94afdd 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -27,6 +27,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future +import org.apache.curator.utils.CloseableUtils import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.authentication.server._ import org.eclipse.jetty.servlet.FilterHolder @@ -62,9 +63,13 @@ class LivyServer extends Logging { private var zkManager: Option[ZooKeeperManager] = None + private var interactiveSessionManager: InteractiveSessionManager = _ + private var batchSessionManager: BatchSessionManager = _ + private var sessionStore: SessionStore = _ + private var ugi: UserGroupInformation = _ - def start(): Unit = { + def init(): Unit = { livyConf = new LivyConf().loadFromFile("livy.conf") accessManager = new AccessManager(livyConf) @@ -154,9 +159,9 @@ class LivyServer extends Logging { } StateStore.init(livyConf, zkManager) - val sessionStore = new SessionStore(livyConf) - val batchSessionManager = new BatchSessionManager(livyConf, sessionStore) - val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore) + sessionStore = new SessionStore(livyConf) + batchSessionManager = new BatchSessionManager(livyConf, sessionStore) + interactiveSessionManager = 
new InteractiveSessionManager(livyConf, sessionStore) server = new WebServer(livyConf, host, port) server.context.setResourceBase("src/main/org/apache/livy/server") @@ -320,7 +325,22 @@ class LivyServer extends Logging { val accessHolder = new FilterHolder(new AccessFilter(accessManager)) server.context.addFilter(accessHolder, "/*", EnumSet.allOf(classOf[DispatcherType])) } + } + + def initHa(electorService: CuratorElectorService): Unit = { + // Start server HA leader election service if applicable + if (livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON) { + info("Starting HA connection") + + val redirectHolder = new FilterHolder(new DomainRedirectionFilter(electorService)) + server.context.addFilter(redirectHolder, "/*", EnumSet.allOf(classOf[DispatcherType])) + } + } + def start(): Unit = { + info("Starting HA connection") + interactiveSessionManager.recoverSessions() + batchSessionManager.recoverSessions() server.start() _thriftServerFactory.foreach { @@ -329,10 +349,7 @@ class LivyServer extends Logging { Runtime.getRuntime().addShutdownHook(new Thread("Livy Server Shutdown") { override def run(): Unit = { - info("Shutting down Livy server.") - zkManager.foreach(_.stop()) - server.stop() - _thriftServerFactory.foreach(_.stop()) + stop() } }) @@ -391,10 +408,21 @@ class LivyServer extends Logging { def stop(): Unit = { if (server != null) { - server.stop() + info("Shutting down Livy server.") + server.stop() + _thriftServerFactory.foreach(_.stop()) + if (livyConf.get(LivyConf.HA_MODE) != HighAvailabilitySettings.HA_ON) { + zkManager.foreach(_.stop()) + } } } + def restart(): Unit = + { + stop() + start() + } + def serverUrl(): String = { _serverUrl.getOrElse(throw new IllegalStateException("Server not yet started.")) } @@ -423,16 +451,31 @@ class LivyServer extends Logging { } } +object HighAvailabilitySettings { + val HA_ON = "on" + val HA_OFF = "off" +} + object LivyServer { def main(args: Array[String]): Unit = { val server = new LivyServer() - 
try { - server.start() - server.join() - } finally { - server.stop() + val livyConf = new LivyConf().loadFromFile("livy.conf") + + server.init() + if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON) { + info("Starting HA connection") + val electorService: CuratorElectorService = new CuratorElectorService(livyConf, server) + electorService.start() + } + else { + try { + server.start() + server.join() + } finally { + server.stop() + } } } - } + diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index f2548ac00..e2245a7a5 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -75,7 +75,6 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( protected[this] final val sessions = mutable.LinkedHashMap[Int, S]() private[this] final val sessionsByName = mutable.HashMap[String, S]() - private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK) private[this] final val sessionTimeoutCheckSkipBusy = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK_SKIP_BUSY) @@ -84,8 +83,15 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( private[this] final val sessionStateRetainedInSec = TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME)) - mockSessions.getOrElse(recover()).foreach(register) new GarbageCollector().start() + recoverSessions() + + def recoverSessions(): Unit = { + idCounter.set(0) + sessions.clear() + sessionsByName.clear() + mockSessions.getOrElse(recover()).foreach(register) + } def nextId(): Int = synchronized { val id = idCounter.getAndIncrement() @@ -209,7 +215,5 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( Thread.sleep(60 * 1000) } } - } - } diff --git 
a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala new file mode 100644 index 000000000..8b037b5fb --- /dev/null +++ b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.collection.JavaConverters._
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.api._
+import org.apache.curator.framework.listen.Listenable
+import org.apache.curator.framework.recipes.leader.LeaderLatch
+import org.apache.zookeeper.data.Stat
+import org.mockito.Mockito._
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.server.CuratorElectorService
+import org.apache.livy.server.HAState
+import org.apache.livy.server.LivyServer
+
+/**
+ * Drives the leader-election callbacks of CuratorElectorService by hand, against a mocked
+ * Curator client, leader latch and LivyServer, and checks the resulting state and restarts.
+ */
+class CuratorElectorServiceSpec extends FunSpec with LivyBaseUnitTestSuite {
+  describe("CuratorElectorService") {
+    case class TestFixture(electorService: CuratorElectorService)
+    // Only the ZooKeeper url is set: the service refuses to be built without it.
+    val conf = new LivyConf()
+    conf.set(LivyConf.HA_ZOOKEEPER_URL, "host")
+
+    // Builds the service on top of mocks, so no ZooKeeper connection is opened and the
+    // latch callbacks can be invoked directly by each test.
+    def withMock[R](testBody: TestFixture => R): R = {
+      val curatorClient = mock[CuratorFramework]
+      when(curatorClient.getUnhandledErrorListenable())
+        .thenReturn(mock[Listenable[UnhandledErrorListener]])
+      val leaderLatch = mock[LeaderLatch]
+
+      val server = mock[LivyServer]
+      val electorService = new CuratorElectorService(conf, server,
+        Some(curatorClient), Some(leaderLatch))
+
+      testBody(TestFixture(electorService))
+    }
+
+    it("should not start the server until it acquires leadership") {
+      withMock { f =>
+        f.electorService.currentState shouldBe HAState.Standby
+        verify(f.electorService.server, times(0)).start()
+        // Leadership acts on the server through restart(), so that must be untouched too.
+        verify(f.electorService.server, times(0)).restart()
+      }
+    }
+
+    it("should restart the livy server after acquiring leadership") {
+      withMock { f =>
+        f.electorService.isLeader()
+        f.electorService.currentState shouldBe HAState.Active
+        verify(f.electorService.server, times(1)).restart()
+      }
+    }
+
+    it("should be in standby state if it loses leadership") {
+      withMock { f =>
+        f.electorService.isLeader()
+        f.electorService.notLeader()
+        f.electorService.currentState shouldBe HAState.Standby
+        // Losing leadership must not trigger a restart of its own.
+        verify(f.electorService.server, times(1)).restart()
+      }
+    }
+
+    it("should restart the Livy Server again after reacquiring leadership") {
+      withMock { f =>
+        f.electorService.isLeader()
+        f.electorService.notLeader()
+        f.electorService.isLeader()
+        f.electorService.currentState shouldBe HAState.Active
+        verify(f.electorService.server, times(2)).restart()
+      }
+    }
+  }
+}
+
+
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 7f5e31e26..fde67ae43 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -232,6 +232,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit .thenReturn(validMetadata ++ invalidMetadata) val sm = new BatchSessionManager(conf, sessionStore) + sm.recoverSessions() sm.nextId() shouldBe nextId validMetadata.foreach { m => sm.get(m.get.id) shouldBe defined @@ -248,6 +249,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) + sm.recoverSessions() sm.get(sessionId) shouldBe defined Await.ready(sm.delete(sessionId).get, 30 seconds) @@ -263,6 +265,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) + sm.recoverSessions() sm.get(sessionId) shouldBe defined sm.shutdown() @@ -278,6 +281,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) + sm.recoverSessions() sm.get(sessionId) shouldBe defined sm.shutdown()