From b66968feac1a2d113a7acbe5c9b7d8f7e6e138a0 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Thu, 13 Aug 2020 17:19:47 -0700 Subject: [PATCH 01/16] Add yarn state to AppInfo --- .../org/apache/livy/server/batch/BatchSession.scala | 1 + .../livy/server/interactive/InteractiveSession.scala | 1 + .../main/scala/org/apache/livy/utils/SparkApp.scala | 10 ++++++++-- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 16f9d4d11..e068fb59f 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -182,6 +182,7 @@ class BatchSession( override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = { synchronized { debug(s"$this state changed from $oldState to $newState") + this.appInfo.appState = Some(newState) newState match { case SparkApp.State.RUNNING => _state = SessionState.Running diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index c4c273acd..302de3677 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -634,6 +634,7 @@ class InteractiveSession( override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = { synchronized { debug(s"$this app state changed from $oldState to $newState") + this.appInfo.appState = Some(newState) newState match { case SparkApp.State.FINISHED | SparkApp.State.FAILED => transition(SessionState.Dead()) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 9afe28162..dc5c4a3f1 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -24,12 +24,18 @@ import org.apache.livy.LivyConf object AppInfo { val DRIVER_LOG_URL_NAME = "driverLogUrl" val SPARK_UI_URL_NAME = "sparkUiUrl" + val APP_STATE_NAME = "appState" } -case class AppInfo(var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None) { +case class AppInfo( + var driverLogUrl: Option[String] = None, + var sparkUiUrl: Option[String] = None, + var appState: Option[SparkApp.State]) { import AppInfo._ def asJavaMap: java.util.Map[String, String] = - Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> sparkUiUrl.orNull).asJava + Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, + SPARK_UI_URL_NAME -> sparkUiUrl.orNull, + APP_STATE_NAME -> appState.map(s => s.toString).orNull).asJava } trait SparkAppListener { From 0bd8ede07d0ca8748016bf74ae5530407afb886d Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Thu, 13 Aug 2020 17:33:51 -0700 Subject: [PATCH 02/16] Fixed: Default to None --- server/src/main/scala/org/apache/livy/utils/SparkApp.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index dc5c4a3f1..a1edb6b91 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -30,7 +30,7 @@ object AppInfo { case class AppInfo( var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None, - var appState: Option[SparkApp.State]) { + var appState: Option[SparkApp.State] = None) { import AppInfo._ def asJavaMap: java.util.Map[String, String] = Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, From a1623c891e22938f26b754b742ea16ac2e2fbfb6 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Thu, 13 Aug 2020 18:33:41 -0700 Subject: [PATCH 03/16] Add some tests --- .../apache/livy/server/batch/BatchSessionSpec.scala | 4 ++++ .../server/interactive/InteractiveSessionSpec.scala | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 20e6136b0..dad7e293f 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -136,6 +136,10 @@ class BatchSessionSpec case SessionState.Killed(_) => true case _ => false }) should be (true) + (batch.appInfo.appState match { + case SparkApp.State.KILLED => true + case _ => false + }) should be (true) } def testRecoverSession(name: Option[String]): Unit = { diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index d13e68263..6d3834bb3 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -176,6 +176,17 @@ class InteractiveSessionSpec extends FunSpec session.state should (be(SessionState.Starting) or be(SessionState.Idle)) } + it ("should update appState once started") { + val mockApp = mock[SparkApp] + val sessionStore = mock[SessionStore] + session = createSession(sessionStore, Some(mockApp)) + session.start() + + eventually(timeout(10 seconds), interval(30 millis)) { + session.appInfo.appState shouldBe (SparkApp.State.RUNNING) + } + } + it("should propagate RSC configuration properties") { val livyConf = new LivyConf(false) .set(LivyConf.REPL_JARS, "dummy.jar") From 53a1b31117c1038f5b7eb2c110bdbe1bd668865d Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Fri, 14 Aug 2020 11:07:41 -0700 Subject: [PATCH 04/16] Test: Remove new tests --- .../livy/server/batch/BatchSessionSpec.scala | 8 ++++---- .../interactive/InteractiveSessionSpec.scala | 20 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index dad7e293f..dc9f58906 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -136,10 +136,10 @@ class BatchSessionSpec case SessionState.Killed(_) => true case _ => false }) should be (true) - (batch.appInfo.appState match { - case SparkApp.State.KILLED => true - case _ => false - }) should be (true) + //(batch.appInfo.appState match { + // case SparkApp.State.KILLED => true + // case _ => false + //}) should be (true) } def testRecoverSession(name: Option[String]): Unit = { diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index 6d3834bb3..a2851fd00 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -176,16 +176,16 @@ class InteractiveSessionSpec extends FunSpec session.state should (be(SessionState.Starting) or be(SessionState.Idle)) } - it ("should update appState once started") { - val mockApp = mock[SparkApp] - val sessionStore = mock[SessionStore] - session = createSession(sessionStore, Some(mockApp)) - session.start() - - eventually(timeout(10 seconds), interval(30 millis)) { - session.appInfo.appState shouldBe (SparkApp.State.RUNNING) - } - } + //it ("should update appState once started") { + // val mockApp = mock[SparkApp] + // val sessionStore = mock[SessionStore] + // session = createSession(sessionStore, Some(mockApp)) + // session.start() + // + // eventually(timeout(10 seconds), interval(30 millis)) { + // session.appInfo.appState shouldBe (SparkApp.State.RUNNING) + // } + //} it("should propagate RSC configuration properties") { val livyConf = new LivyConf(false) From 4edc259662632d788c7fb133b620242372344b17 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Fri, 14 Aug 2020 12:00:14 -0700 Subject: [PATCH 05/16] Remove commented tests --- .../apache/livy/server/batch/BatchSessionSpec.scala | 4 ---- .../server/interactive/InteractiveSessionSpec.scala | 11 ----------- 2 files changed, 15 deletions(-) diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index dc9f58906..20e6136b0 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -136,10 +136,6 @@ class BatchSessionSpec case SessionState.Killed(_) => true case _ => false }) should be (true) - //(batch.appInfo.appState match { - // case SparkApp.State.KILLED => true - // case _ => false - //}) should be (true) } def testRecoverSession(name: Option[String]): Unit = { diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index a2851fd00..d13e68263 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -176,17 +176,6 @@ class InteractiveSessionSpec extends FunSpec session.state should (be(SessionState.Starting) or be(SessionState.Idle)) } - //it ("should update appState once started") { - // val mockApp = mock[SparkApp] - // val sessionStore = mock[SessionStore] - // session = createSession(sessionStore, Some(mockApp)) - // session.start() - // - // eventually(timeout(10 seconds), interval(30 millis)) { - // session.appInfo.appState shouldBe (SparkApp.State.RUNNING) - // } - //} - it("should propagate RSC configuration properties") { val livyConf = new LivyConf(false) .set(LivyConf.REPL_JARS, "dummy.jar") From a642978095a6d1dcc1ff29572d7bdf13b5b7e44b Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Fri, 14 Aug 2020 17:08:15 -0700 Subject: [PATCH 06/16] New approach --- .../scala/org/apache/livy/server/batch/BatchSession.scala | 1 - .../livy/server/interactive/InteractiveSession.scala | 1 - .../main/scala/org/apache/livy/utils/SparkYarnApp.scala | 7 ++++--- .../scala/org/apache/livy/utils/SparkYarnAppSpec.scala | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index e068fb59f..16f9d4d11 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -182,7 +182,6 @@ class BatchSession( override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = { synchronized { debug(s"$this state changed from $oldState to $newState") - this.appInfo.appState = Some(newState) newState match { case SparkApp.State.RUNNING => _state = SessionState.Running diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 302de3677..c4c273acd 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -634,7 +634,6 @@ class InteractiveSession( override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = { synchronized { debug(s"$this app state changed from $oldState to $newState") - this.appInfo.appState = Some(newState) newState match { case SparkApp.State.FINISHED | SparkApp.State.FAILED => transition(SessionState.Dead()) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala index a245823e3..441fd9bc2 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala @@ -285,10 +285,11 @@ class SparkYarnApp private[utils] ( // Refresh application state val appReport = yarnClient.getApplicationReport(appId) yarnDiagnostics = getYarnDiagnostics(appReport) - changeState(mapYarnState( + val state = mapYarnState( appReport.getApplicationId, appReport.getYarnApplicationState, - appReport.getFinalApplicationStatus)) + appReport.getFinalApplicationStatus) + changeState(state) if (isProcessErrExit()) { if (killed) { @@ -304,7 +305,7 @@ class SparkYarnApp private[utils] ( val driverLogUrl = Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl) .toOption - AppInfo(driverLogUrl, Option(appReport.getTrackingUrl)) + AppInfo(driverLogUrl, Option(appReport.getTrackingUrl), Some(state)) } if (appInfo != latestAppInfo) { diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala index ddd97674a..61307d580 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala @@ -380,7 +380,7 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite { verify(mockAppReport, atLeast(1)).getTrackingUrl() verify(mockContainerReport, atLeast(1)).getLogUrl() verify(mockListener).appIdKnown(appId.toString) - verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl))) + verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl), Some(SparkApp.State.FINISHED))) } } } From 7c6c5e46209b536565f4f55ee26f63eac9962e25 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Fri, 14 Aug 2020 17:27:16 -0700 Subject: [PATCH 07/16] Unit test --- .../src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala index 61307d580..6c5a7bc2c 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala @@ -322,7 +322,7 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("should expose driver log url and Spark UI url") { + it("should expose driver log url, Spark UI url, and app state") { Clock.withSleepMethod(mockSleep) { val mockYarnClient = mock[YarnClient] val driverLogUrl = "DRIVER LOG URL" From e24feadf0976ff1878bba215e86e9facaff8a4ba Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Fri, 14 Aug 2020 18:03:53 -0700 Subject: [PATCH 08/16] Fixed: line too long --- .../test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala index 6c5a7bc2c..b7e7fe1d6 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala @@ -380,7 +380,8 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite { verify(mockAppReport, atLeast(1)).getTrackingUrl() verify(mockContainerReport, atLeast(1)).getLogUrl() verify(mockListener).appIdKnown(appId.toString) - verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl), Some(SparkApp.State.FINISHED))) + verify(mockListener).infoChanged( + AppInfo(Some(driverLogUrl), Some(sparkUiUrl), Some(SparkApp.State.FINISHED))) } } } From 572f52f3467d8b7cf6eeb65af9ab1ca88725b072 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Fri, 14 Aug 2020 18:10:12 -0700 Subject: [PATCH 09/16] Also test the servlet --- .../org/apache/livy/server/batch/BatchServletSpec.scala | 5 ++--- .../server/interactive/InteractiveSessionServletSpec.scala | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala index 5a84035b8..89b25a4b2 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala @@ -27,12 +27,11 @@ import scala.concurrent.duration.Duration import org.mockito.Mockito._ import org.scalatestplus.mockito.MockitoSugar.mock - import org.apache.livy.{LivyConf, Utils} import org.apache.livy.server.{AccessManager, BaseSessionServletSpec} import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions.{BatchSessionManager, SessionState} -import org.apache.livy.utils.AppInfo +import org.apache.livy.utils.{AppInfo, SparkApp} class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecoveryMetadata] { @@ -68,7 +67,7 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover val appId = "appid" val owner = "owner" val proxyUser = "proxyUser" - val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) + val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"), Some(SparkApp.State.RUNNING)) val log = IndexedSeq[String]("log1", "log2") val session = mock[BatchSession] diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala index 78407d5ad..c879fbf4b 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -33,14 +33,13 @@ import org.mockito.stubbing.Answer import org.scalatest.Entry import org.scalatest.concurrent.Eventually._ import org.scalatestplus.mockito.MockitoSugar.mock - import org.apache.livy.{ExecuteRequest, LivyConf} import org.apache.livy.client.common.HttpMessages.SessionInfo import org.apache.livy.rsc.driver.{Statement, StatementState} import org.apache.livy.server.AccessManager import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions._ -import org.apache.livy.utils.AppInfo +import org.apache.livy.utils.{AppInfo, SparkApp} class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { @@ -168,7 +167,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { val proxyUser = "proxyUser" val state = SessionState.Running val kind = Spark - val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) + val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"), Some(SparkApp.State.RUNNING)) val log = IndexedSeq[String]("log1", "log2") val session = mock[InteractiveSession] @@ -197,6 +196,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { view.kind shouldEqual kind.toString view.appInfo should contain (Entry(AppInfo.DRIVER_LOG_URL_NAME, appInfo.driverLogUrl.get)) view.appInfo should contain (Entry(AppInfo.SPARK_UI_URL_NAME, appInfo.sparkUiUrl.get)) + view.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, appInfo.appState.get)) view.log shouldEqual log.asJava } From 8ec2c200562b5d936caf7f217cbc71fbc788fbe8 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Sun, 16 Aug 2020 13:09:58 -0700 Subject: [PATCH 10/16] Clean up scalastyle --- .../org/apache/livy/server/batch/BatchServletSpec.scala | 6 +++++- .../interactive/InteractiveSessionServletSpec.scala | 8 ++++++-- .../scala/org/apache/livy/utils/SparkYarnAppSpec.scala | 2 +- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala index 89b25a4b2..c7eae3e18 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration.Duration import org.mockito.Mockito._ import org.scalatestplus.mockito.MockitoSugar.mock + import org.apache.livy.{LivyConf, Utils} import org.apache.livy.server.{AccessManager, BaseSessionServletSpec} import org.apache.livy.server.recovery.SessionStore @@ -67,7 +68,10 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover val appId = "appid" val owner = "owner" val proxyUser = "proxyUser" - val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"), Some(SparkApp.State.RUNNING)) + val appInfo = AppInfo( + Some("DRIVER LOG URL"), + Some("SPARK UI URL"), + Some(SparkApp.State.RUNNING)) val log = IndexedSeq[String]("log1", "log2") val session = mock[BatchSession] diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala index c879fbf4b..6871ad761 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.JavaConverters._ -import scala.concurrent.duration._ import scala.concurrent.Future +import scala.concurrent.duration._ import scala.language.postfixOps import org.json4s.jackson.Json4sScalaModule @@ -33,6 +33,7 @@ import org.mockito.stubbing.Answer import org.scalatest.Entry import org.scalatest.concurrent.Eventually._ import org.scalatestplus.mockito.MockitoSugar.mock + import org.apache.livy.{ExecuteRequest, LivyConf} import org.apache.livy.client.common.HttpMessages.SessionInfo import org.apache.livy.rsc.driver.{Statement, StatementState} @@ -167,7 +168,10 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { val proxyUser = "proxyUser" val state = SessionState.Running val kind = Spark - val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"), Some(SparkApp.State.RUNNING)) + val appInfo = AppInfo( + Some("DRIVER LOG URL"), + Some("SPARK UI URL"), + Some(SparkApp.State.RUNNING)) val log = IndexedSeq[String]("log1", "log2") val session = mock[InteractiveSession] diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala index b7e7fe1d6..d8947eb2b 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala @@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.scalatest.concurrent.Eventually import org.scalatest.FunSpec +import org.scalatest.concurrent.Eventually import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils} From 822cb329685ebfcc4f81c0f5798b85b691924715 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Sun, 16 Aug 2020 13:57:19 -0700 Subject: [PATCH 11/16] Fixed: Compare against string --- .../livy/server/interactive/InteractiveSessionServletSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala index 6871ad761..998cb895a 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -200,7 +200,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { view.kind shouldEqual kind.toString view.appInfo should contain (Entry(AppInfo.DRIVER_LOG_URL_NAME, appInfo.driverLogUrl.get)) view.appInfo should contain (Entry(AppInfo.SPARK_UI_URL_NAME, appInfo.sparkUiUrl.get)) - view.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, appInfo.appState.get)) + view.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, appInfo.appState.get.toString)) view.log shouldEqual log.asJava } From f4590d89623d791bac7c6e0f9c15fd853d3ccd55 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Sun, 16 Aug 2020 15:41:34 -0700 Subject: [PATCH 12/16] Test with JsonProperty --- server/src/main/scala/org/apache/livy/utils/SparkApp.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index a1edb6b91..96e5ab4b1 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -19,6 +19,8 @@ package org.apache.livy.utils import scala.collection.JavaConverters._ +import com.fasterxml.jackson.annotation.JsonProperty + import org.apache.livy.LivyConf object AppInfo { @@ -30,6 +32,7 @@ object AppInfo { case class AppInfo( var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None, + @JsonProperty var appState: Option[SparkApp.State] = None) { import AppInfo._ def asJavaMap: java.util.Map[String, String] = From 852d3db2f0ba10d11926347d0669476b6d09f8bf Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Mon, 17 Aug 2020 10:16:09 -0700 Subject: [PATCH 13/16] Another enumeration attempt --- server/src/main/scala/org/apache/livy/utils/SparkApp.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 96e5ab4b1..55a4bbe5d 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -19,7 +19,7 @@ package org.apache.livy.utils import scala.collection.JavaConverters._ -import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.module.scala.JsonScalaEnumeration import org.apache.livy.LivyConf @@ -32,7 +32,7 @@ object AppInfo { case class AppInfo( var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None, - @JsonProperty + @JsonScalaEnumeration(classOf[SparkApp.State]) var appState: Option[SparkApp.State] = None) { import AppInfo._ def asJavaMap: java.util.Map[String, String] = From 9079a1045574d1a798269fbe61cebf34cb5cdca4 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Mon, 17 Aug 2020 16:46:11 -0700 Subject: [PATCH 14/16] Enum type --- server/src/main/scala/org/apache/livy/utils/SparkApp.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 55a4bbe5d..922f0eee6 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -19,9 +19,11 @@ package org.apache.livy.utils import scala.collection.JavaConverters._ +import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.module.scala.JsonScalaEnumeration import org.apache.livy.LivyConf +import org.apache.livy.utils.SparkApp.StateTypeReference object AppInfo { val DRIVER_LOG_URL_NAME = "driverLogUrl" @@ -32,7 +34,7 @@ object AppInfo { case class AppInfo( var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None, - @JsonScalaEnumeration(classOf[SparkApp.State]) + @JsonScalaEnumeration(classOf[StateTypeReference]) var appState: Option[SparkApp.State] = None) { import AppInfo._ def asJavaMap: java.util.Map[String, String] = @@ -63,6 +65,8 @@ object SparkApp { } type State = State.Value + class StateTypeReference extends TypeReference[State.type] + /** * Return cluster manager dependent SparkConf. * From bc3e7c10ff03484361c42cd402a76c0f581a5079 Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Tue, 18 Aug 2020 12:56:21 -0700 Subject: [PATCH 15/16] Address comments --- .../main/scala/org/apache/livy/utils/SparkApp.scala | 2 +- .../interactive/InteractiveSessionServletSpec.scala | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 922f0eee6..7c8565f29 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -40,7 +40,7 @@ case class AppInfo( def asJavaMap: java.util.Map[String, String] = Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> sparkUiUrl.orNull, - APP_STATE_NAME -> appState.map(s => s.toString).orNull).asJava + APP_STATE_NAME -> appState.map(_.toString).orNull).asJava } trait SparkAppListener { diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala index 998cb895a..dce410730 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -202,6 +202,17 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { view.appInfo should contain (Entry(AppInfo.SPARK_UI_URL_NAME, appInfo.sparkUiUrl.get)) view.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, appInfo.appState.get.toString)) view.log shouldEqual log.asJava + + // Test case where appState=None. + val noStateAppInfo = AppInfo( + Some("DRIVER LOG URL"), + Some("SPARK UI URL")) + when(session.appInfo).thenReturn(noStateAppInfo) + val noStateView = servlet + .asInstanceOf[InteractiveSessionServlet] + .clientSessionView(session, req) + .asInstanceOf[SessionInfo] + noStateView.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, null)) } private def waitSession(): Unit = { From e8ceafb14788f1c09c1c35abe313fc6f60d68b4f Mon Sep 17 00:00:00 2001 From: Andrew Fogarty Date: Fri, 21 Aug 2020 19:00:06 -0700 Subject: [PATCH 16/16] Spacing --- .../livy/server/interactive/InteractiveSessionServletSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala index dce410730..d617e10e6 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -203,7 +203,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { view.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, appInfo.appState.get.toString)) view.log shouldEqual log.asJava - // Test case where appState=None. + // Test case where appState = None. val noStateAppInfo = AppInfo( Some("DRIVER LOG URL"), Some("SPARK UI URL"))