diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 9afe28162..7c8565f29 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -19,17 +19,28 @@ package org.apache.livy.utils import scala.collection.JavaConverters._ +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.module.scala.JsonScalaEnumeration + import org.apache.livy.LivyConf +import org.apache.livy.utils.SparkApp.StateTypeReference object AppInfo { val DRIVER_LOG_URL_NAME = "driverLogUrl" val SPARK_UI_URL_NAME = "sparkUiUrl" + val APP_STATE_NAME = "appState" } -case class AppInfo(var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None) { +case class AppInfo( + var driverLogUrl: Option[String] = None, + var sparkUiUrl: Option[String] = None, + @JsonScalaEnumeration(classOf[StateTypeReference]) + var appState: Option[SparkApp.State] = None) { import AppInfo._ def asJavaMap: java.util.Map[String, String] = - Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> sparkUiUrl.orNull).asJava + Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, + SPARK_UI_URL_NAME -> sparkUiUrl.orNull, + APP_STATE_NAME -> appState.map(_.toString).orNull).asJava } trait SparkAppListener { @@ -54,6 +65,8 @@ object SparkApp { } type State = State.Value + class StateTypeReference extends TypeReference[State.type] + /** * Return cluster manager dependent SparkConf. * diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala index a245823e3..441fd9bc2 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala @@ -285,10 +285,11 @@ class SparkYarnApp private[utils] ( // Refresh application state val appReport = yarnClient.getApplicationReport(appId) yarnDiagnostics = getYarnDiagnostics(appReport) - changeState(mapYarnState( + val state = mapYarnState( appReport.getApplicationId, appReport.getYarnApplicationState, - appReport.getFinalApplicationStatus)) + appReport.getFinalApplicationStatus) + changeState(state) if (isProcessErrExit()) { if (killed) { @@ -304,7 +305,7 @@ class SparkYarnApp private[utils] ( val driverLogUrl = Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl) .toOption - AppInfo(driverLogUrl, Option(appReport.getTrackingUrl)) + AppInfo(driverLogUrl, Option(appReport.getTrackingUrl), Some(state)) } if (appInfo != latestAppInfo) { diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala index 5a84035b8..c7eae3e18 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala @@ -32,7 +32,7 @@ import org.apache.livy.{LivyConf, Utils} import org.apache.livy.server.{AccessManager, BaseSessionServletSpec} import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions.{BatchSessionManager, SessionState} -import org.apache.livy.utils.AppInfo +import org.apache.livy.utils.{AppInfo, SparkApp} class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecoveryMetadata] { @@ -68,7 +68,10 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover val appId = "appid" val owner = "owner" val proxyUser = "proxyUser" - val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) + val appInfo = AppInfo( + Some("DRIVER LOG URL"), + Some("SPARK UI URL"), + Some(SparkApp.State.RUNNING)) val log = IndexedSeq[String]("log1", "log2") val session = mock[BatchSession] diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala index 78407d5ad..d617e10e6 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.JavaConverters._ -import scala.concurrent.duration._ import scala.concurrent.Future +import scala.concurrent.duration._ import scala.language.postfixOps import org.json4s.jackson.Json4sScalaModule @@ -40,7 +40,7 @@ import org.apache.livy.rsc.driver.{Statement, StatementState} import org.apache.livy.server.AccessManager import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions._ -import org.apache.livy.utils.AppInfo +import org.apache.livy.utils.{AppInfo, SparkApp} class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { @@ -168,7 +168,10 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { val proxyUser = "proxyUser" val state = SessionState.Running val kind = Spark - val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) + val appInfo = AppInfo( + Some("DRIVER LOG URL"), + Some("SPARK UI URL"), + Some(SparkApp.State.RUNNING)) val log = IndexedSeq[String]("log1", "log2") val session = mock[InteractiveSession] @@ -197,7 +200,19 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { view.kind shouldEqual kind.toString view.appInfo should contain (Entry(AppInfo.DRIVER_LOG_URL_NAME, appInfo.driverLogUrl.get)) view.appInfo should contain (Entry(AppInfo.SPARK_UI_URL_NAME, appInfo.sparkUiUrl.get)) + view.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, appInfo.appState.get.toString)) view.log shouldEqual log.asJava + + // Test case where appState = None. + val noStateAppInfo = AppInfo( + Some("DRIVER LOG URL"), + Some("SPARK UI URL")) + when(session.appInfo).thenReturn(noStateAppInfo) + val noStateView = servlet + .asInstanceOf[InteractiveSessionServlet] + .clientSessionView(session, req) + .asInstanceOf[SessionInfo] + noStateView.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, null)) } private def waitSession(): Unit = { diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala index ddd97674a..d8947eb2b 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala @@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.scalatest.concurrent.Eventually import org.scalatest.FunSpec +import org.scalatest.concurrent.Eventually import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils} @@ -322,7 +322,7 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("should expose driver log url and Spark UI url") { + it("should expose driver log url, Spark UI url, and app state") { Clock.withSleepMethod(mockSleep) { val mockYarnClient = mock[YarnClient] val driverLogUrl = "DRIVER LOG URL" @@ -380,7 +380,8 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite { verify(mockAppReport, atLeast(1)).getTrackingUrl() verify(mockContainerReport, atLeast(1)).getLogUrl() verify(mockListener).appIdKnown(appId.toString) - verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl))) + verify(mockListener).infoChanged( + AppInfo(Some(driverLogUrl), Some(sparkUiUrl), Some(SparkApp.State.FINISHED))) } } }