Skip to content
17 changes: 15 additions & 2 deletions server/src/main/scala/org/apache/livy/utils/SparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,28 @@ package org.apache.livy.utils

import scala.collection.JavaConverters._

import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration

import org.apache.livy.LivyConf
import org.apache.livy.utils.SparkApp.StateTypeReference

object AppInfo {
val DRIVER_LOG_URL_NAME = "driverLogUrl"
val SPARK_UI_URL_NAME = "sparkUiUrl"
val APP_STATE_NAME = "appState"
}

case class AppInfo(var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None) {
case class AppInfo(
var driverLogUrl: Option[String] = None,
var sparkUiUrl: Option[String] = None,
@JsonScalaEnumeration(classOf[StateTypeReference])
var appState: Option[SparkApp.State] = None) {
import AppInfo._
def asJavaMap: java.util.Map[String, String] =
Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> sparkUiUrl.orNull).asJava
Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull,
SPARK_UI_URL_NAME -> sparkUiUrl.orNull,
APP_STATE_NAME -> appState.map(_.toString).orNull).asJava
}

trait SparkAppListener {
Expand All @@ -54,6 +65,8 @@ object SparkApp {
}
type State = State.Value

class StateTypeReference extends TypeReference[State.type]

/**
* Return cluster manager dependent SparkConf.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,11 @@ class SparkYarnApp private[utils] (
// Refresh application state
val appReport = yarnClient.getApplicationReport(appId)
yarnDiagnostics = getYarnDiagnostics(appReport)
changeState(mapYarnState(
val state = mapYarnState(
appReport.getApplicationId,
appReport.getYarnApplicationState,
appReport.getFinalApplicationStatus))
appReport.getFinalApplicationStatus)
changeState(state)

if (isProcessErrExit()) {
if (killed) {
Expand All @@ -304,7 +305,7 @@ class SparkYarnApp private[utils] (
val driverLogUrl =
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
.toOption
AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
AppInfo(driverLogUrl, Option(appReport.getTrackingUrl), Some(state))
}

if (appInfo != latestAppInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.livy.{LivyConf, Utils}
import org.apache.livy.server.{AccessManager, BaseSessionServletSpec}
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.{BatchSessionManager, SessionState}
import org.apache.livy.utils.AppInfo
import org.apache.livy.utils.{AppInfo, SparkApp}

class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecoveryMetadata] {

Expand Down Expand Up @@ -68,7 +68,10 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover
val appId = "appid"
val owner = "owner"
val proxyUser = "proxyUser"
val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
val appInfo = AppInfo(
Some("DRIVER LOG URL"),
Some("SPARK UI URL"),
Some(SparkApp.State.RUNNING))
val log = IndexedSeq[String]("log1", "log2")

val session = mock[BatchSession]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps

import org.json4s.jackson.Json4sScalaModule
Expand All @@ -40,7 +40,7 @@ import org.apache.livy.rsc.driver.{Statement, StatementState}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions._
import org.apache.livy.utils.AppInfo
import org.apache.livy.utils.{AppInfo, SparkApp}

class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {

Expand Down Expand Up @@ -168,7 +168,10 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
val proxyUser = "proxyUser"
val state = SessionState.Running
val kind = Spark
val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
val appInfo = AppInfo(
Some("DRIVER LOG URL"),
Some("SPARK UI URL"),
Some(SparkApp.State.RUNNING))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also test None case?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added verification that appState=None gets serialized to "appState": null.

val log = IndexedSeq[String]("log1", "log2")

val session = mock[InteractiveSession]
Expand Down Expand Up @@ -197,7 +200,19 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
view.kind shouldEqual kind.toString
view.appInfo should contain (Entry(AppInfo.DRIVER_LOG_URL_NAME, appInfo.driverLogUrl.get))
view.appInfo should contain (Entry(AppInfo.SPARK_UI_URL_NAME, appInfo.sparkUiUrl.get))
view.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, appInfo.appState.get.toString))
view.log shouldEqual log.asJava

// Test case where appState = None.
val noStateAppInfo = AppInfo(
Some("DRIVER LOG URL"),
Some("SPARK UI URL"))
when(session.appInfo).thenReturn(noStateAppInfo)
val noStateView = servlet
.asInstanceOf[InteractiveSessionServlet]
.clientSessionView(session, req)
.asInstanceOf[SessionInfo]
noStateView.appInfo should contain (Entry(AppInfo.APP_STATE_NAME, null))
}

private def waitSession(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.concurrent.Eventually
import org.scalatest.FunSpec
import org.scalatest.concurrent.Eventually
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils}
Expand Down Expand Up @@ -322,7 +322,7 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
}
}

it("should expose driver log url and Spark UI url") {
it("should expose driver log url, Spark UI url, and app state") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val driverLogUrl = "DRIVER LOG URL"
Expand Down Expand Up @@ -380,7 +380,8 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
verify(mockAppReport, atLeast(1)).getTrackingUrl()
verify(mockContainerReport, atLeast(1)).getLogUrl()
verify(mockListener).appIdKnown(appId.toString)
verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl)))
verify(mockListener).infoChanged(
AppInfo(Some(driverLogUrl), Some(sparkUiUrl), Some(SparkApp.State.FINISHED)))
}
}
}
Expand Down